In [1]:
from process_framework.steps.geospatial.assign_esri_api_response import ArcGisApiTransformer
from process_framework import Reference

In [2]:
from pandas import DataFrame
# Reference wrapping the input DataFrame
subject = Reference(DataFrame)

# three sample rows; nhle_id is moved out of the columns and becomes the index
df = DataFrame(
    [
        (1010136, 2, 'Malmesbury Abbey'),
        (1022264, 1, 'The Beeches'),
        (1000457, 4, 'Westonbirt'),
    ],
    columns=['nhle_id', 'heritage_category_uid', 'name'],
).set_index('nhle_id')

subject.set(df)
subject.value

Unnamed: 0_level_0,heritage_category_uid,name
nhle_id,Unnamed: 1_level_1,Unnamed: 2_level_1
1010136,2,Malmesbury Abbey
1022264,1,The Beeches
1000457,4,Westonbirt


In [3]:
from geopandas import GeoDataFrame
from pandas import DataFrame, Series
from requests import Response
from shapely.geometry import shape
from process_framework.references.reference import Reference

# heritage_category_uid -> layer id (kept as a string), one pair per category
HERITAGE_CATEGORY_UID_TO_LAYER = dict(
    [
        (1, '3'),    # Listing
        (2, '6'),    # Scheduling
        (3, '9'),    # Wreck
        (4, '7'),    # Park and Garden
        (5, '8'),    # Battlefield
        (6, '10'),   # World Heritage Site
        (7, '5'),    # Certificate of Immunity
        (8, '4'),    # Building Preservation Notice
    ]
)

# Reference typed for a GeoDataFrame; passed to the transformer below as its `assign_to` target
assign_to = Reference(GeoDataFrame)

class DemoArcGisApiTransformer(ArcGisApiTransformer):
    """ this is, effectively, the NHLE polygon API connector; a useful demonstrator

    Each batch of the subject DataFrame is turned into a `layerDefs` query (one
    layer definition per mapped `heritage_category_uid`), and the responses are
    flattened into a GeoDataFrame indexed by `ListEntry`.
    """
    def __init__(self, subject: Reference[DataFrame], assign_to: Reference[GeoDataFrame], endpoint_url: str, *,
                 batch_size: int = 1000, payload_args: dict | None = None, verify_session: bool = False,
                 max_retries: int = 5, retry_backoff_factor: float = 10,
                 esri_crs: int = 27700, out_crs: int = 4326, buffer: float = 0):
        """ set up the transformer

        `subject`, `assign_to`, `endpoint_url`, `batch_size`, `payload_args`,
        `verify_session`, `max_retries` and `retry_backoff_factor` are forwarded
        unchanged to `ArcGisApiTransformer.__init__`.

        :param esri_crs: CRS set on the geometries built from the API responses
        :param out_crs: CRS the resulting GeoDataFrame is reprojected to
        :param buffer: distance handed to `GeoDataFrame.buffer`; it is applied
            before reprojection, i.e. while the data is still in `esri_crs`
        """
        super().__init__(subject, assign_to, endpoint_url, batch_size=batch_size, max_retries=max_retries, retry_backoff_factor=retry_backoff_factor, payload_args=payload_args, verify_session=verify_session)
        self.esri_crs = esri_crs
        self.out_crs = out_crs
        self.buffer = buffer

    def get_layer_def(self, layer_id: str, group: DataFrame | Series) -> dict:
        """ get a layerDef for a DataFrame grouped by layer_id

        Only the index of `group` is read; its values are used as the
        `ListEntry` ids of the `where` clause.
        """
        _ids = group.index.values

        return dict(
            layerId=layer_id,
            where=f"area_ha > 0 AND ListEntry IN ({','.join(str(_id) for _id in _ids)})",
            outFields="ListEntry,geometry"
        )


    def get_layer_defs_for_batch(self, batch: DataFrame) -> list[dict]:
        """ get layer_defs for the supplied `batch` `DataFrame` by mapping `heritage_category_uid` to an Open Data Hub `layer_id` """
        assert 'heritage_category_uid' in batch.columns
        # NOTE(review): a uid missing from HERITAGE_CATEGORY_UID_TO_LAYER maps to NaN and
        # pandas' groupby drops NaN keys by default, so such rows are not queried - confirm intended
        groups = (
            batch.assign(layer_id=batch.heritage_category_uid.map(HERITAGE_CATEGORY_UID_TO_LAYER))
            .groupby('layer_id')
        )
        return [self.get_layer_def(layer_id, group) for layer_id, group in groups]


    def get_payload_for_batch(self, batch: DataFrame) -> dict:
        """ get a payload of {layerDefs=[layerdef_1, layerdef_2, etc.], f=format} for a batch

        `layerDefs` is serialised as a JSON array of objects.
        """
        import json  # local import so the class does not depend on another notebook cell

        layer_defs = self.get_layer_defs_for_batch(batch)
        return dict(
            # json.dumps rather than str(): str() yields a single-quoted Python repr, which is not valid JSON
            layerDefs=json.dumps(layer_defs),
            f='json'
        )


    @staticmethod
    def esri_geometry_to_geojson(geometry: dict) -> dict:
        """ convert an esri json formatted `geometry` of rings into a geojson polygon of coordinates

        :param geometry: esri json geometry; must contain a `rings` entry
        :return: `{'type': 'Polygon', 'coordinates': <rings>}`
        :raises AssertionError: when `geometry` has no `rings` entry
        """
        # this assumes we always have polygons - is this always true?
        # NOTE(review): the rings are copied across as-is; if esri can return several exterior
        # rings in one geometry they would be read as holes of the first ring - confirm
        assert 'rings' in geometry, "expected an esri json geometry with a 'rings' property"
        # GeoJSON type names are case-sensitive: the spec spells it 'Polygon'
        polygon = dict(type='Polygon')
        polygon['coordinates'] = geometry['rings']
        return polygon


    def get_geodataframe_for_responses(self, responses: list[Response]) -> GeoDataFrame:
        """ convert API responses into a GeoDataFrame

        The result is indexed by `ListEntry` (unique), holds one dissolved
        geometry per entry and is expressed in `self.out_crs`.

        NOTE(review): `responses` is read as a list of records that each carry
        a `layers` field - presumably the decoded JSON bodies; confirm against
        the base class.
        """
        # explode layers: one row per layer across all responses
        layers = DataFrame.from_records(responses).layers.explode()
        layers = DataFrame.from_records(layers.values, index='id')

        # explode features, drop 'NA' features (layers that returned nothing)
        # NOTE(review): when no layer returned any feature the attribute access below fails - confirm desired handling
        features = layers.features.explode().dropna()
        features = DataFrame.from_records(features.values)

        # get geodataframe from features
        gdf = GeoDataFrame(
            data=DataFrame.from_records(features.attributes.values, index=features.index),
            index=features.index,
            geometry=features.geometry.map(self.esri_geometry_to_geojson).map(shape),
            crs=self.esri_crs
        )

        # dissolve by ListEntry (get unique index, merge shapes with same index)
        gdf = gdf.dissolve(by='ListEntry')
        gdf.geometry = gdf.geometry.make_valid()
        # buffer while still in esri_crs, then reproject to the output CRS
        gdf.geometry = gdf.buffer(self.buffer)
        gdf = gdf.to_crs(self.out_crs) #type: ignore
        assert gdf.index.is_unique, 'index of gdf should be unique'
        return gdf


# subject, assign_to and endpoint_url are the three leading positional parameters
transformer = DemoArcGisApiTransformer(
    subject,
    assign_to,
    'https://services-eu1.arcgis.com/ZOdPfBS3aqqDYPUQ/ArcGIS/rest/services/National_Heritage_List_for_England_NHLE_v02_VIEW/FeatureServer/query',
)

# run the step, then display whatever it stored in `assign_to`
transformer.do()
assign_to.value

Unnamed: 0_level_0,geometry
ListEntry,Unnamed: 1_level_1
1000457,"POLYGON ((-2.21946 51.61988, -2.2193 51.61979,..."
1010136,"POLYGON ((-2.09767 51.58527, -2.09766 51.58512..."


In [8]:
from process_framework.steps.geospatial.assign_wkt_geometry import TransformGeometryToWkt

# Reference typed for a Series; handed to the step as its `assign_to` target
wkts = Reference(Series)

# the step reads from the GeoDataFrame Reference filled in by the previous cell
to_wkt = TransformGeometryToWkt[GeoDataFrame](subject=assign_to, assign_to=wkts, rounding_precision=3, buffer=0)
to_wkt.do()

wkts

Reference[Series](ListEntry
1000457    POLYGON ((-2.219 51.62, -2.219 51.62, -2.219 5...
1010136    POLYGON ((-2.098 51.585, -2.098 51.585, -2.097...
dtype: object)