### Function versions 08-05-23

In [None]:
def get_mean_NDVI(point_of_interest_file, ndvi_raster_file, buffer_type=None, buffer_dist=None, network_file=None, network_type=None,
                  trip_time=None, travel_speed=None, output_dir=os.getcwd()):
    # Read and process user input, check conditions
    poi = gpd.read_file(point_of_interest_file)
    if all(poi['geometry'].geom_type == 'Point') or all(poi['geometry'].geom_type == 'Polygon'):
        geom_type = poi.iloc[0]['geometry'].geom_type
    else:
        raise ValueError("Please make sure all geometries are of 'Point' type or all geometries are of 'Polygon' type and re-run the function")

    if not poi.crs.is_projected:
        raise ValueError("The CRS of the PoI dataset is currently geographic, please transform it into a local projected CRS and re-run the function")
    else:
        epsg = poi.crs.to_epsg()

    if poi['id'].isnull().values.any():
        poi['id'] = poi['id'].fillna(pd.Series(range(1, len(poi) + 1))).astype(int)

    ndvi_src = rioxarray.open_rasterio(ndvi_raster_file)
    if not ndvi_src.rio.crs.to_epsg() == epsg:
                    print("Adjusting CRS of NDVI file to match with Point of Interest CRS...")
                    ndvi_src.rio.write_crs(f'EPSG:{epsg}', inplace=True)
                    print("Done")

    # Make sure all points of interest are within or do at least intersect (in case of polygons) the NDVI raster provided
    if not all(geom.within(sg.box(*ndvi_src.rio.bounds())) for geom in poi['geometry']):
        if geom_type == "Point":
            raise ValueError("Not all points of interest are within the NDVI file provided, please make sure they are and re-run the function")
        else:
            if not all(geom.intersects(sg.box(*ndvi_src.rio.bounds())) for geom in poi['geometry']):
                raise ValueError("Not all polygons of interest are within, or do at least partly intersect, with the area covered by the NDVI file provided, please make sure they are/do and re-run the function")
            else:
                warnings.warn("Not all polygons of interest are completely within the area covered by the NDVI file provided, results will be based on intersecting part of polygons involved") 

    # Create buffers (or not) based on user input        
    if geom_type == "Point":
        # Make sure all points of interest are within the NDVI raster provided
        if not all(geom.within(sg.box(*ndvi_src.rio.bounds())) for geom in poi['geometry']):
            raise ValueError("Not all points of interest are within the NDVI file provided, please make sure they are and re-run the function")
    
        if buffer_type not in ["euclidian", "network"]:
            raise ValueError("Please make sure that the buffer_type argument is set to either 'euclidian' or 'network' and re-run the function")
    
        if buffer_type == "euclidian":
            if not isinstance(buffer_dist, int) or (not buffer_dist > 0):
                raise TypeError("Please make sure that the buffer distance is set as a positive integer")             

            aoi_gdf = gpd.GeoDataFrame(geometry=poi['geometry'].buffer(buffer_dist))
        else:
            if network_file is not None and (not os.path.splitext(network_file)[1] == ".gpkg"):
                raise ValueError("Please provide the network file in '.gpkg' format")
            elif network_file is not None and (os.path.splitext(network_file)[1] == ".gpkg"):
                network = gpd.read_file(network_file, layer='edges')
            else:
                network = None

            if network is None:
                if not isinstance(buffer_dist, int) or (not buffer_dist > 0):
                    raise TypeError("Please make sure that the buffer distance is set as a positive integer")

                if network_type not in ["walk", "bike", "drive", "all"]:
                    raise ValueError("Please make sure that the network_type argument is set to either 'walk', 'bike, 'drive' or 'all', and re-run the function")
                
                if not isinstance(travel_speed, int) or (not travel_speed > 0):
                    raise TypeError("Please make sure that the travel speed is set as a positive integer")

                if not isinstance(trip_time, int) or (not trip_time > 0):
                    raise TypeError("Please make sure that the trip time is set as a positive integer")             
        
                meters_per_minute = travel_speed * 1000 / 60  # km per hour to m per minute
                epsg_transformer = pyproj.Transformer.from_crs(f"epsg:{epsg}", "epsg:4326")

                # Transform the geometry column to lat-lon coordinates
                aoi_geometry = []
                for geom in poi['geometry']:
                    latlon = epsg_transformer.transform(geom.x, geom.y) # Transform point geometry into latlon for OSMnx
                    graph = ox.graph_from_point(latlon, network_type=network_type, dist=buffer_dist) # Retrieve street network for desired network type and buffer distance surrounding poi
                    nodes = ox.graph_to_gdfs(graph, edges=False) # Create geodataframe which contains only the nodes of the street network
                    x, y = nodes["geometry"].unary_union.centroid.xy # Calculate centroid of node geometries 
                    center_node = ox.distance.nearest_nodes(graph, x[0], y[0]) # Find node which is closest to centroid as base for next steps
                    graph = ox.project_graph(graph) # Project street network graph back to original poi CRS
                    graph_epsg = graph.graph['crs'].to_epsg() # Store EPSG code used for previous step
                    for _, _, _, data in graph.edges(data=True, keys=True): # Calculate the time it takes to cover each edge's distance
                        data["time"] = data["length"] / meters_per_minute
                    isochrone_poly = make_iso_poly(graph, center_node=center_node, trip_time=trip_time) # See separate function for line by line explanation
                    aoi_geometry.append(isochrone_poly)

                aoi_gdf = gpd.GeoDataFrame(geometry=aoi_geometry, crs=f"EPSG:{graph_epsg}").to_crs(f"EPSG:{epsg}")    
            else:
                if not network.crs.to_epsg() == epsg:
                    print("Adjusting CRS of Network file to match with Point of Interest CRS...")
                    network.to_crs(f'EPSG:{epsg}', inplace=True)
                    print("Done")

                # Create bounding box for network file
                bbox_network = network.unary_union.convex_hull

                if not all(geom.within(bbox_network) for geom in poi['geometry']):
                     raise ValueError("Not all points of interest are within the network file provided, please make sure they are and re-run the function")

                aoi_gdf = gpd.GeoDataFrame(geometry=[bbox_network], crs=f'EPSG:{epsg}')
    else:
        aoi_gdf = gpd.GeoDataFrame(geometry=poi['geometry'])

    # Calculate mean NDVI values
    poi['mean_NDVI'] = aoi_gdf.apply(lambda row: ndvi_src.rio.clip([row.geometry]).clip(min=0).mean().values.round(3), axis=1)

    print("Writing results to new geopackage file in specified directory...")
    input_filename, ext = os.path.splitext(point_of_interest_file)
    poi.to_file(os.path.join(output_dir, f"{input_filename}_ndvi_added.gpkg"), driver="GPKG")
    print("Done")
    
    return poi

In [None]:
def get_landcover_percentages(point_of_interest_file, landcover_raster_file, buffer_type=None, buffer_dist=None, network_file=None, 
                              network_type=None, trip_time=None, travel_speed=None, output_dir=os.getcwd()):
    # Read and process user input, check conditions
    poi = gpd.read_file(point_of_interest_file)
    if all(poi['geometry'].geom_type == 'Point') or all(poi['geometry'].geom_type == 'Polygon'):
        geom_type = poi.iloc[0]['geometry'].geom_type
    else:
        raise ValueError("Please make sure all geometries are of 'Point' type or all geometries are of 'Polygon' type and re-run the function")

    if not poi.crs.is_projected:
        raise ValueError("The CRS of the PoI dataset is currently geographic, please transform it into a local projected CRS and re-run the function")
    else:
        epsg = poi.crs.to_epsg()

    if poi['id'].isnull().values.any():
        poi['id'] = poi['id'].fillna(pd.Series(range(1, len(poi) + 1))).astype(int)

    landcover_src = rioxarray.open_rasterio(landcover_raster_file)
    if not landcover_src.rio.crs.to_epsg() == epsg:
                    print("Adjusting CRS of Land Cover file to match with Point of Interest CRS...")
                    landcover_src.rio.write_crs(f'EPSG:{epsg}', inplace=True)
                    print("Done")

    # Make sure all points of interest are within or do at least intersect (in case of polygons) the NDVI raster provided
    if not all(geom.within(sg.box(*landcover_src.rio.bounds())) for geom in poi['geometry']):
        if geom_type == "Point":
            raise ValueError("Not all points of interest are within the Land Cover file provided, please make sure they are and re-run the function")
        else:
            if not all(geom.intersects(sg.box(*landcover_src.rio.bounds())) for geom in poi['geometry']):
                raise ValueError("Not all polygons of interest are within, or do at least partly intersect, with the area covered by the Land Cover file provided, please make sure they are/do and re-run the function")
            else:
                warnings.warn("Not all polygons of interest are completely within the area covered by the Land Cover file provided, results will be based on intersecting part of polygons involved")

    # Create buffers (or not) based on user input        
    if geom_type == "Point":
        # Make sure all points of interest are within the NDVI raster provided
        if not all(geom.within(sg.box(*landcover_src.rio.bounds())) for geom in poi['geometry']):
            raise ValueError("Not all points of interest are within the Land Cover file provided, please make sure they are and re-run the function")

        if buffer_type not in ["euclidian", "network"]:
            raise ValueError("Please make sure that the buffer_type argument is set to either 'euclidian' or 'network' and re-run the function")
    
        if buffer_type == "euclidian":
            if not isinstance(buffer_dist, int) or (not buffer_dist > 0):
                raise TypeError("Please make sure that the buffer distance is set as a positive integer")             

            aoi_gdf = gpd.GeoDataFrame(geometry=poi['geometry'].buffer(buffer_dist))
        else:
            if network_file is not None and (not os.path.splitext(network_file)[1] == ".gpkg"):
                raise ValueError("Please provide the network file in '.gpkg' format")
            elif network_file is not None and (os.path.splitext(network_file)[1] == ".gpkg"):
                network = gpd.read_file(network_file, layer='edges')
            else:
                network = None

            if network is None:
                if not isinstance(buffer_dist, int) or (not buffer_dist > 0):
                    raise TypeError("Please make sure that the buffer distance is set as a positive integer")

                if network_type not in ["walk", "bike", "drive", "all"]:
                    raise ValueError("Please make sure that the network_type argument is set to either 'walk', 'bike, 'drive' or 'all', and re-run the function")
                
                if not isinstance(travel_speed, int) or (not travel_speed > 0):
                    raise TypeError("Please make sure that the travel speed is set as a positive integer")

                if not isinstance(trip_time, int) or (not trip_time > 0):
                    raise TypeError("Please make sure that the trip time is set as a positive integer")             
        
                meters_per_minute = travel_speed * 1000 / 60  # km per hour to m per minute
                epsg_transformer = pyproj.Transformer.from_crs(f"epsg:{epsg}", "epsg:4326")

                # Transform the geometry column to lat-lon coordinates
                aoi_geometry = []
                for geom in poi['geometry']:
                    latlon = epsg_transformer.transform(geom.x, geom.y) # Transform point geometry into latlon for OSMnx
                    graph = ox.graph_from_point(latlon, network_type=network_type, dist=buffer_dist) # Retrieve street network for desired network type and buffer distance surrounding poi
                    nodes = ox.graph_to_gdfs(graph, edges=False) # Create geodataframe which contains only the nodes of the street network
                    x, y = nodes["geometry"].unary_union.centroid.xy # Calculate centroid of node geometries 
                    center_node = ox.distance.nearest_nodes(graph, x[0], y[0]) # Find node which is closest to centroid as base for next steps
                    graph = ox.project_graph(graph) # Project street network graph back to original poi CRS
                    graph_epsg = graph.graph['crs'].to_epsg() # Store EPSG code used for previous step
                    for _, _, _, data in graph.edges(data=True, keys=True): # Calculate the time it takes to cover each edge's distance
                        data["time"] = data["length"] / meters_per_minute
                    isochrone_poly = make_iso_poly(graph, center_node=center_node, trip_time=trip_time) # See separate function for line by line explanation
                    aoi_geometry.append(isochrone_poly)

                aoi_gdf = gpd.GeoDataFrame(geometry=aoi_geometry, crs=f"EPSG:{graph_epsg}").to_crs(f"EPSG:{epsg}")    
            else:
                if not network.crs.to_epsg() == epsg:
                    print("Adjusting CRS of Network file to match with Point of Interest CRS...")
                    network.to_crs(f'EPSG:{epsg}', inplace=True)
                    print("Done")

                # Create bounding box for network file
                bbox_network = network.unary_union.convex_hull

                if not all(geom.within(bbox_network) for geom in poi['geometry']):
                     raise ValueError("Not all points of interest are within the network file provided, please make sure they are and re-run the function")

                aoi_gdf = gpd.GeoDataFrame(geometry=[bbox_network], crs=f'EPSG:{epsg}')
    else:
        aoi_gdf = gpd.GeoDataFrame(geometry=poi['geometry'])

    # apply the function to each geometry in the GeoDataFrame and create a new Pandas Series
    landcover_percentages_series = aoi_gdf.geometry.apply(lambda x: pd.Series(calculate_landcover_percentages(landcover_src=landcover_src, geometry=x)))
    # rename the columns with the land cover class values
    landcover_percentages_series.columns = ["class_" + str(col) for col in landcover_percentages_series.columns]
    # concatenate the new series to the original dataframe
    poi = pd.concat([poi, landcover_percentages_series], axis=1)

    print("Writing results to new geopackage file in specified directory...")
    input_filename, ext = os.path.splitext(point_of_interest_file)
    poi.to_file(os.path.join(output_dir, f"{input_filename}_LCperc_added.gpkg"), driver="GPKG")
    print("Done")

    return poi