In [1]:
import os,sys
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd
import shapely
import pandas as pd
import numpy as np
import xarray as xr
import dask_geopandas
from tqdm import tqdm
sys.path.append('c://projects//osm-flex/src') 

from rasterstats import point_query

In [2]:
data_path = 'c://data//CEED'
input_data = os.path.join(data_path,'input_data')
bucco_path = os.path.join(data_path,'coastal_bucco_exact')
osm_path = os.path.join(data_path,'coastal_osm_exact')

In [3]:
def raster_to_vector(xr_raster):
    """
    Convert a raster to a vector representation.

    Args:
        xr_raster (xarray.DataArray): Input raster data as xarray.DataArray.

    Returns:
        gpd.GeoDataFrame: Vector representation of the input raster.
    """

    # Convert xarray raster to pandas DataFrame
    df = xr_raster.to_dataframe()

    # Filter DataFrame to select rows where band_data > 0
    df_1 = df.loc[df.band_data > 0].reset_index()

    # Create a Shapely Point geometry column from x and y values
    df_1['geometry'] = shapely.points(df_1.x.values, df_1.y.values)

    # Remove unnecessary columns from the DataFrame
    df_1 = df_1.drop(['x', 'y', 'band', 'spatial_ref'], axis=1)

    # Calculate the resolution of the raster
    resolution = xr_raster.x[1].values - xr_raster.x[0].values

    # Buffer the Point geometries by half of the resolution with square caps
    df_1.geometry = shapely.buffer(df_1.geometry, distance=resolution/2, cap_style='square').values

    # Convert the DataFrame to a GeoDataFrame
    return gpd.GeoDataFrame(df_1)      

def zonal_stats(vector, raster_in):
    """
    Calculate zonal statistics of a raster dataset based on a vector dataset.
    
    Parameters:
    - vector_in (str): Path to the vector dataset file (in Parquet format).
    - raster_in (str): Path to the raster dataset file (in NetCDF format).
    
    Returns:
    - pandas.Series: A series containing the zonal statistics values corresponding to each centroid point in the vector dataset.
    """
    
    # Open the raster dataset using the xarray library
    raster = xr.open_dataset(raster_in, engine="rasterio")
    
    # Progress bar setup for obtaining values
    tqdm.pandas(desc='obtain values')
    
    # Clip the raster dataset to the bounding box of the vector dataset
    raster_clip = raster.rio.clip_box(vector.total_bounds[0], vector.total_bounds[1], vector.total_bounds[2], vector.total_bounds[3])
    
    # Convert the clipped raster dataset to a vector representation
    raster_vector = raster_to_vector(raster_clip)
    
    # Create a dictionary mapping each index to its corresponding band data value
    band_data_dict = dict(zip(list(raster_vector.index), raster_vector['band_data'].values))
    
    # Construct an STRtree from the vector geometry values
    tree = shapely.STRtree(raster_vector.geometry.values)
    
    # Apply a function to calculate zonal statistics for each centroid point in the vector dataset
    return vector.centroid.progress_apply(lambda x: band_data_dict[tree.query(x, predicate='intersects')[0]])


def vector_point_query(x, coastal_CLC_tree, band_data_dict):
    """
    Perform a point query on a vector dataset based on specific conditions.

    Parameters:
    - x (GeoDataFrame): A GeoDataFrame representing a single feature.
    - coastal_CLC_tree (shapely.STRtree): STRtree object constructed from the coastal CLC vector geometry values.
    - band_data_dict (dict): A dictionary mapping indices to their corresponding band data values.

    Returns:
    - int: The band data value corresponding to the point query, or -9999 if the conditions are not met.
    """

    if x.land_use == 5:
        try:
            # Perform an intersection query using the centroid of the feature
            match = coastal_CLC_tree.query(x.centroid, predicate='intersects')
            return band_data_dict[match[0]]
        except:
            # Return -9999 if no intersection is found
            return -9999
    else:
        # Return -9999 if the land use condition is not met
        return -9999
    
def final_land_use(x):
    """
    Determine the final land use based on the coastal land use and land use values of a feature.

    Parameters:
    - x (pandas.Series): A pandas Series representing a single feature.

    Returns:
    - int: The final land use value, which is either the coastal land use or the land use value of the feature.
    """

    if x.coastal_land_use == -9999:
        return x.land_use
    else:
        return x.coastal_land_use  
    
def iso2_to_iso3(iso2):
    
    dict_ =  {
            "NL" : "NLD",
            "FR" : "FRA",
            "DE" : "NLD",
            "SW" : "SWE",
            "ES" : "ESP",
            "PT" : "PRT",
            "IT" : "ITA",
            "HR" : "HRV",
            "GR" : "GRC",
            "MT" : "MLT",
            "DK" : "DNK",
            "BE" : "BEL",
            "PL" : "POL",
            "CY" : "CYP",
            "FI" : "FIN",
            "IE" : "IRL",
            "EE" : "EST",
            "LV" : "LVA",
            "RO" : "ROU",
            "BG" : "BGR",
            "LT" : "LTU"    
            }
    
    return dict_[iso2]

### Load nuts2 file

In [4]:
%%time
nuts_europe = gpd.read_file(os.path.join(input_data,'NUTS_RG_03M_2021_3035.shp'))
nuts2_europe = nuts_europe.loc[nuts_europe.LEVL_CODE == 2].reset_index(drop=True)
nuts0_europe = nuts_europe.loc[nuts_europe.LEVL_CODE == 0].reset_index(drop=True)

CPU times: total: 1.42 s
Wall time: 1.47 s


### LOAD CLC data

In [5]:
CLC_path = os.path.join(input_data,'u2018_clc2018_v2020_20u1_raster100m','DATA','U2018_CLC2018_V2020_20u1.tif')
coastal_CLC_path = os.path.join(input_data,'CZ_2018_DU004_3035_V010.parquet')

### Get coastal countries

In [6]:
country_codes = [x.split('.')[0][:3] for x in os.listdir(bucco_path) if x.endswith('.parquet')]

In [7]:
def prepare_buildings(nuts2,nuts2_europe,bucco_path,CLC_path,coastal_CLC_path):

    # get country iso2
    country_iso2 = nuts2[:2]

    #continue if not in a coastal country
    try: 
        iso2_to_iso3(country_iso2)
    except:
        return None

    # read nuts2 geometry
    nuts2_geom = nuts2_europe.loc[nuts2_europe.NUTS_ID==nuts2].geometry.values[0]

    # load buildings for the country
    gdf_bucco = gpd.read_parquet(os.path.join(bucco_path,'{}_bucco.parquet'.format(iso2_to_iso3(country_iso2))))

    # bounding box clip
    bbox_buildings = gdf_bucco.iloc[gdf_bucco.centroid.clip(nuts2_geom.bounds).index].reset_index(drop=True)

    if len(bbox_buildings) == 0:
        return None
    
    # prepare geometry to improve speed of intersect
    shapely.prepare(bbox_buildings.geometry.values)

    # exact intersect of nuts2 with buildings
    nuts2_buildings = bbox_buildings.loc[shapely.intersects(bbox_buildings.geometry.values,nuts2_geom)].reset_index(drop=True)   

    if len(nuts2_buildings) == 0:
        return None    
    
    # get land use information from CLC 2018
    nuts2_buildings['land_use'] = zonal_stats(nuts2_buildings,CLC_path)

    # read coastal corine land cover layer
    coastal_CLC = gpd.read_parquet(coastal_CLC_path)
    coastal_CLC_tree = shapely.STRtree(coastal_CLC.geometry.values)
    band_data_dict = dict(zip(list(coastal_CLC.index), coastal_CLC['CODE_4_18'].values))

    # get centroids to speed up intersect
    nuts2_buildings['centroid'] = nuts2_buildings.centroid

    # get port values
    tqdm.pandas(desc='obtain port values')
    nuts2_buildings['coastal_land_use'] = nuts2_buildings.progress_apply(lambda x: vector_point_query(x,coastal_CLC_tree,band_data_dict),axis=1)
    
    # get unique use type per building
    tqdm.pandas(desc='get unique use type')
    nuts2_buildings['use_type'] = nuts2_buildings.progress_apply(lambda x: final_land_use(x),axis=1)

    nuts2_buildings = nuts2_buildings.drop(['centroid','land_use','coastal_land_use'],axis=1)
    
    return nuts2_buildings

In [11]:
def prepare_cis(nuts2,nuts2_europe,osm_path):

    # get country iso2
    country_iso2 = nuts2[:2]

    #continue if not in a coastal country
    try: 
        iso2_to_iso3(country_iso2)
    except:
        return None

    # read nuts2 geometry
    nuts2_geom = nuts2_europe.loc[nuts2_europe.NUTS_ID==nuts2].geometry.values[0]

    # load osm data for the country
    country_cis = gpd.read_parquet(os.path.join(osm_path,'{}_cis.parquet'.format(iso2_to_iso3(country_iso2))))                          

    # list of critical infrastructure types
    cis = ['healthcare','education','gas','oil','telecom','water','wastewater','power','rail','road','air']
    
    cis_nuts = {}
    for i_cis in cis:
        sub_cis = country_cis.loc[i_cis] 
        sub_cis = sub_cis.to_crs(3035)    
        
        # bounding box clip
        bbox_sub_cis = sub_cis.iloc[sub_cis.centroid.clip(nuts2_geom.bounds).index].reset_index(drop=True)

        # prepare geometry to improve speed of intersect
        shapely.prepare(bbox_sub_cis.geometry.values)

        # exact intersect of nuts2 with buildings
        nuts2_cis = bbox_sub_cis.loc[shapely.intersects(bbox_sub_cis.geometry.values,nuts2_geom)].reset_index(drop=True)   
        
        # drop duplicate geometries
        nuts2_cis = nuts2_cis.iloc[nuts2_cis.geometry.to_wkt().drop_duplicates().index].reset_index(drop=True)
        
        cis_nuts[i_cis] = nuts2_cis
    
    return gpd.GeoDataFrame(pd.concat(cis_nuts))

def nuts2_exposure(nuts2):
    
    data_path = 'c://data//CEED'
    input_data = os.path.join(data_path,'input_data')
    bucco_path = os.path.join(data_path,'coastal_bucco_exact')
    osm_path = os.path.join(data_path,'coastal_osm_exact')
    
    nuts_europe = gpd.read_file(os.path.join(input_data,'NUTS_RG_03M_2021_3035.shp'))
    nuts2_europe = nuts_europe.loc[nuts_europe.LEVL_CODE == 2].reset_index(drop=True)
    
    nuts2_buildings = prepare_buildings(nuts2,nuts2_europe,bucco_path,CLC_path,coastal_CLC_path)

    nuts2_cis = prepare_cis(nuts2,nuts2_europe,osm_path)

    all_asset_types = ['buildings','healthcare','education','gas','oil','telecom','water','wastewater','power','rail','road','air']
    
    combine_all = {}
    for asset_type in all_asset_types:
        try:
            if asset_type == 'buildings':
                combine_all[asset_type] = nuts2_buildings

            else:
                combine_all[asset_type] = nuts2_cis.loc[asset_type]
        except:
            continue
            
    # save file
    out_path = os.path.join(data_path,'nuts2_CEED','{}_CEED.parquet'.format(nuts2)
    
    gpd.GeoDataFrame(pd.concat(combine_all)).to_parquet(out_path)        
    
    return gpd.GeoDataFrame(pd.concat(combine_all))        

In [None]:
%%time

nuts_pilots = ['FRI3','ES21','ITC3','ES52']
for nuts2 in nuts_pilots:
    nuts2_combined = nuts2_exposure(nuts2)


KeyboardInterrupt

