#### Impedance calculation ('edge effect' of biodiversity stressors)
This **optional** block accounts for the negative impact that some human-made objects, or stressors, have on biodiversity and habitat connectivity. This impact can be represented as overhead landscape impedance, added to the intrinsic resistance of landscapes to species movement:
- the simplified schema implies that each LULC type maps to exactly one value of landscape impedance (for example, 1000 in urban areas and 10 in forests)
- the enriched schema implies that impedance in each pixel is the simple value plus an overhead impedance that declines with distance from the stressors. For example, the landscape impedance within roads and urban areas is 1000, while in forests it is 10 + overhead. The overhead is close to 1000 in forests adjacent to urban areas and declines with distance from the stressor.

The character and pace of decline (the decay rate) of the edge effect are defined in the configuration files. The user should review these values and change them if needed, because the decline of negative impact can vary significantly depending on study area, species, stressors and land-use/land-cover.

It is up to the user whether to use this tool to create a detailed representation of landscape impedance or to apply the simplified schema.
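
As a minimal sketch of the enriched schema, assume an exponential decline in which `lambda_decay` acts as a distance scale. This functional form, the helper name `enrich_impedance` and its arguments are illustrative assumptions, not the formula confirmed by this tool. Under that assumption, the overhead is added to the base impedance and the result is capped at the impedance of the stressor itself:

```python
import numpy as np

def enrich_impedance(base, distance, stressor_impedance, lambda_decay):
    """Hypothetical helper: base impedance plus a distance-decaying overhead."""
    base = np.asarray(base, dtype=float)
    distance = np.asarray(distance, dtype=float)
    # assumed form: overhead equals the stressor impedance at distance 0
    # and decays exponentially with distance
    overhead = stressor_impedance * np.exp(-distance / lambda_decay)
    # cap the result so that it never exceeds the stressor impedance
    return np.minimum(base + overhead, stressor_impedance)

# forest pixels (impedance 10) at 0, 500 and 5000 map units from an urban area (impedance 1000)
enriched = enrich_impedance([10, 10, 10], [0, 500, 5000], 1000, 500)
```

With these example values the pixel adjacent to the stressor is capped at 1000, while the most distant pixel stays close to its intrinsic value of 10.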

**INPUT**
- Impedance raster dataset (GeoTIFF)
- LULC raster dataset (GeoTIFF)
- OSM raster datasets with rasterised spatial features (GeoTIFF)
- configuration of edge effect from user: which LULC categories cause edge effect (0...\*) and which OSM spatial features can cause edge effect (0...\*), defined in CSV
- parameters of edge effect from stressors (YAML configuration file)

**OUTPUT**
- enriched impedance raster dataset with applied edge effect (GeoTIFF)

**NOTES**
- gdal_proximity computes proximity between centroids of corresponding raster pixels (for example, proximity to roads is measured from the centroid of the first road pixel, not from the edge of the road).
r.grow.distance (GRASS) might be used instead of GDAL to generate the distances to the nearest points of target pixels, but it is unclear whether GDAL represents the actual distances better.
- Outputs from Open Street Map can have a larger spatial extent than the initial raster datasets (land-use/land-cover). This is not an issue as long as the LULC bounding box is fully contained within the bounding boxes of the OSM outputs.
Moreover, the bounding box can vary across Open Street Map outputs, depending on the geometry of the spatial features covering the LULC dataset.
- Known issue: the entire output raster dataset was filled with No Data values when calculating raster proximity (only for the last stressor from LULC in the loop and for all stressors from OSM).
14/10/2024: solved for the last stressor from LULC (more FlushCache() and ...= none statements). To avoid similar issues, initialise the VRT dataset after running gdal.RasterProximity and take the No Data value directly from the input raster dataset (stressors).
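
To illustrate the first note, here is a brute-force sketch of centroid-to-centroid distances in NumPy. It is not how gdal_proximity is implemented; the function name `centroid_distances` and the 25 m pixel size in the example are assumptions chosen for illustration, and the approach is only practical for very small arrays.

```python
import numpy as np

def centroid_distances(stressor_mask, pixel_size):
    """Distance from each pixel centroid to the nearest stressor pixel centroid."""
    stressor_mask = np.asarray(stressor_mask, dtype=bool)
    if not stressor_mask.any():
        raise ValueError("The mask contains no stressor pixels.")
    rows, cols = np.indices(stressor_mask.shape)
    # centroid coordinates in map units (the origin is arbitrary here)
    x = (cols + 0.5) * pixel_size
    y = (rows + 0.5) * pixel_size
    sx, sy = x[stressor_mask], y[stressor_mask]
    # pairwise distances with shape (number of pixels, number of stressor pixels)
    d = np.hypot(x.ravel()[:, None] - sx[None, :],
                 y.ravel()[:, None] - sy[None, :])
    return d.min(axis=1).reshape(stressor_mask.shape)

# one row of four pixels, the first one is a stressor (for example, a road)
distances = centroid_distances([[True, False, False, False]], pixel_size=25)
```

The pixel next to the stressor gets a distance of one full pixel size (25), not the half pixel that separates its centroid from the stressor edge.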

***TODO - to put visualisation (or in software paper)***

##### Dependencies and configuration

In [1]:
import geopandas as gpd
import yaml
import warnings
import os
import sys
import numpy as np
from collections import OrderedDict # to order entries in YAML
import re
import copy
from typing import Optional
from anytree import Node, RenderTree # pip install anytree

# TODO - implement logging

# installing GDAL
try:
    from osgeo import ogr, osr, gdal
except ImportError as e:
    print("GDAL/OGR modules are not available. Please make sure GDAL is installed correctly.") # changed from sys.exit
    raise e
# debug: uncomment to enable GDAL debug logging
# gdal.SetConfigOption('CPL_DEBUG', 'ON')

Specify the configuration (brought from previous Notebooks and updated):

In [2]:
# function to write YAML back to the file
def save_yaml_config(yaml_data, filepath='config.yml'):
    with open(filepath, 'w') as yaml_file:
        yaml.dump(yaml_data, yaml_file, default_flow_style=False)
        print(f"Updated YAML configuration saved to {filepath}")

with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f) # then use safe load to avoid issues with input files

year = config.get('year')
if year is None: # covers both a missing key and an empty value
    warnings.warn("Year variable is not found in the configuration file.")

lulc_template = config.get('lulc')

# substitute year from the configuration file
lulc = lulc_template.format(year=year)
print(f"Input raster to be used for processing is {lulc}.")

# open additional config file for updating impedance datasets
with open('config_impedance.yaml', 'r') as f:
    config_impedance = yaml.safe_load(f)

# ensure 'initial_lulc' exists and has an 'enabled' field (various cases)
# note: boolean False is used, because the string 'false' is truthy and is dumped to YAML as a quoted string
if config_impedance.get('initial_lulc') is None:
    # create 'initial_lulc' with 'enabled' set to False if it doesn't exist or is None
    config_impedance['initial_lulc'] = {'enabled': False}
elif config_impedance['initial_lulc'].get('enabled') is None:
    # if 'enabled' doesn't exist in 'initial_lulc' or is None, set it to False
    config_impedance['initial_lulc']['enabled'] = False

# specify paths from config
lulc_dir = config.get('lulc_dir')
impedance_dir = config.get('impedance_dir')
vector_dir = config.get('vector_dir')
output_dir = config.get('output_dir')
output_dir = os.path.normpath(output_dir)

print("Initial structure of the configuration file for impedance dataset:") # debug
print(yaml.dump(config_impedance, default_flow_style=False)) # debug

Input raster to be used for processing is lulc_ukceh_25m_2018.tif.
Initial structure of the configuration file for impedance dataset:
initial_lulc:
  enabled: true
vector:
  enabled: true
  types: true



Specify parent, output directories and Python path to search for scripts, modules:

In [3]:
parent_dir = os.getcwd()
print (f"Parent directory: {parent_dir}")
sys.path.append(parent_dir)

# create the output directory if it does not exist
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Created directory: {output_dir}")

Parent directory: C:\Users\kriukovv\Documents\pilot_2\preprocessing


Initialise dictionaries for stressors with parameters of edge effect:

In [4]:
# initialize the 'initial_lulc' dictionary
initial_lulc = {}
# ensure 'enabled' is the first key
initial_lulc['enabled'] = True

Let's define a function to extract the maximum value from a raster dataset. In our case, it will be the initial impedance dataset.

In [5]:
def get_max_from_tif(ds):
    """
    Extracts the maximum value from a GDAL raster dataset using GDAL's internal functions.

    INPUT (arguments):
        ds (gdal.Dataset): GDAL dataset object representing the raster.

    OUTPUT (returns):
        float: The maximum value in the raster.
    """
    # check if the dataset is valid
    if ds is None:
        raise ValueError("The dataset is invalid or couldn't be opened.")
    # get the first raster band (assuming a single-band raster)
    band = ds.GetRasterBand(1)
    if band is None:
        raise ValueError("The raster band could not be retrieved.")

    # get the statistics for the band: [min, max, mean, std_dev]
    # approx_ok=True allows GDAL to use overviews or a subset of the data;
    # pass False as the first argument if an exact maximum is required
    stats = band.GetStatistics(True, True)  # (approx_ok=True, force=True)
    # the maximum value is the second item in the stats list
    max_value = stats[1]
    # note: the dataset is not closed here, the caller keeps its own reference
    return max_value

#### Configuration template

Let's define variables for each stressor with the parameters of edge-effect decay. These variables will be written to the configuration file:
**TODO - to make these variables flexible for different stressors**
- ***types***, which defines whether stressor has any subcategories (eg, primary roads). *True, False, None or string*
- ***decline_type***, which defines whether edge effect from a stressor declines exponentially (**exp_decline**) or proportionally (**prop_decline**). *None or string*
- ***lambda_decay***, which defines the parameter of **exponential** decline
- ***k_value***, which defines the parameter of **proportional** decline
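
A parameter block built from these variables can be checked before it is used. The sketch below is only an illustration: the helper name `validate_stressor_params` and the rule that decay parameters must be positive numbers are assumptions, not requirements stated by this workflow.

```python
# mapping of each decline type to the parameter it needs
ALLOWED_DECLINE = {'exp_decline': 'lambda_decay', 'prop_decline': 'k_value'}

def validate_stressor_params(params):
    """Return a list of problems found in one stressor's parameter block."""
    decline_type = params.get('decline_type')
    if decline_type is None:
        return []  # no decline type chosen, nothing to validate
    if decline_type not in ALLOWED_DECLINE:
        return [f"unknown decline_type: {decline_type!r}"]
    param_name = ALLOWED_DECLINE[decline_type]
    value = (params.get(decline_type) or {}).get(param_name)
    # assumption: the parameter has to be a positive number (booleans are rejected)
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if not is_number or value <= 0:
        return [f"{param_name} must be a positive number, got {value!r}"]
    return []

ok = validate_stressor_params(
    {'decline_type': 'exp_decline', 'exp_decline': {'lambda_decay': 500}})
bad_type = validate_stressor_params({'decline_type': 'linear'})
bad_value = validate_stressor_params(
    {'decline_type': 'prop_decline', 'prop_decline': {'k_value': -1}})
```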

In [6]:
# initialise variables with data type hints
types: Optional[str] = None
decline_type: Optional[str] = 'exp_decline' # 'exp_decline' or 'prop_decline'
lambda_decay: float = 500
k_value: float = 500

Let's define the dictionary template for the configuration YAML file (for each stressor). We are using variables defined above.

In [7]:
params_placeholder = {
    'types': types, # specify whether category of stressors has particular types different in parameters (for example, primary and secondary roads)
    'decline_type': decline_type,  # user will choose from 'exp_decline' and 'prop_decline'
    'exp_decline': {
        'lambda_decay': lambda_decay  # placeholder for exponential decay value
    },
    'prop_decline': {
        'k_value': k_value  # placeholder for proportional decline value
    }
}
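
The template is later copied for each stressor with `copy.deepcopy`. The small sketch below (with a shortened, illustrative template) shows why: when the same dictionary object is reused, a change made for one stressor leaks into the others, and PyYAML dumps such repeated objects as anchors and aliases (for example, `&id001`).

```python
import copy

# shortened, illustrative version of the template
template = {'decline_type': 'exp_decline', 'exp_decline': {'lambda_decay': 500}}

# the same object is referenced twice
shared = {'stressor_a': template, 'stressor_b': template}
# each stressor receives its own independent copy
independent = {'stressor_a': copy.deepcopy(template),
               'stressor_b': copy.deepcopy(template)}

# change the parameter for the first stressor only
shared['stressor_a']['exp_decline']['lambda_decay'] = 100
independent['stressor_a']['exp_decline']['lambda_decay'] = 100

leaked = shared['stressor_b']['exp_decline']['lambda_decay']          # also changed
isolated = independent['stressor_b']['exp_decline']['lambda_decay']   # unchanged
```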

Finally, we should initialise the list with stressor rasters and their yaml aliases to use it further:

In [8]:
stressor_rasters = [] # stressor raster path
yaml_stressors = []  # initialise list to collect unique names of stressors for YAML
stressor_dict = {}  # mapping stressor raster path to YAML alias

#### INPUT DATA - LULC
Stressors can be extracted in two ways:
- from LULC raster dataset (for example, urban areas can be considered stressors)
- from Open Street Map vector datasets obtained at previous steps (have been rasterised in previous Jupyter Notebooks)

LULC categories which cause edge effect are defined in the CSV file with impedance data. Let's define the path to this file and read it as a dataframe through geopandas:

In [9]:
impedance = config.get('impedance')
if impedance is not None:
    print(f"Using auxiliary tabular data from {impedance}.")
    impedance_csv = os.path.join(parent_dir,impedance_dir,impedance) # define path
    impedance = gpd.read_file(impedance_csv) # read CSV file through geopandas as a dataframe
else:
    # warning, not error: stressors might come from the CSV file pointing out LULC categories, from the OSM vector dataset, or from both
    warnings.warn("No valid auxiliary tabular data found. Impact from stressors will be estimated from vector features only.")

Using auxiliary tabular data from reclassification_ukceh.csv.
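
For readers without geopandas, the selection of stressor codes from such a table can be sketched with the standard library. The column names `lulc` and `edge_effect` are the ones used later in this Notebook; the sample rows and the helper name `edge_effect_codes` are made up for illustration.

```python
import csv
import io

def edge_effect_codes(csv_text):
    """Return LULC codes flagged with edge_effect == 1 in a CSV table."""
    reader = csv.DictReader(io.StringIO(csv_text))
    # values are read as strings, so they are cast to integers explicitly
    return [int(row['lulc']) for row in reader if int(row['edge_effect']) == 1]

# hypothetical table content
sample = "lulc,edge_effect\n10,0\n20,1\n21,1\n"
codes = edge_effect_codes(sample)
```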


Next, we define the path to the input LULC file from the path variables and then normalise it (to avoid inconsistent use of slashes):

In [10]:
lulc = os.path.join(parent_dir,lulc_dir,lulc)
lulc = os.path.normpath(lulc)

Let's open the LULC as an array (and extract its no data value and data type for logging):

In [11]:
# open LULC
lulc = gdal.Open(lulc)
band = lulc.GetRasterBand(1)
band_array = band.ReadAsArray()
nodata_value = band.GetNoDataValue()

print("NoData value:", nodata_value) # debug
band_data_type = band.DataType
print("Data type of the band:", gdal.GetDataTypeName(band_data_type))# debug

NoData value: 0.0
Data type of the band: Byte


Now, get the geotransform and projection specifications from the input raster:

In [12]:
geotransform = lulc.GetGeoTransform()
projection = lulc.GetProjection()

We should extract the codes of LULC types causing edge effect on habitats from the input CSV dataset:
1. Check if 'initial_lulc' key in the configuration file is enabled. Otherwise, we will not consider any data from CSV data on stressors.
2. Extract LULC codes causing edge effect
3. Iterate over each LULC code in the list, check if it is filled in the YAML file. If not, copy the template structure for each LULC code
4. Create a mask for each LULC code (to filter out other LULC codes). So, each intermediate output will contain only one type of stressor and only one unique value (for example, LULC = 20)
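
Step 4 can be sketched in isolation with NumPy. The helper name `mask_lulc_code` and the tiny example array are illustrative assumptions; the code may arrive as a string (as read from CSV), so it is cast to an integer first.

```python
import numpy as np

def mask_lulc_code(band_array, lulc_code, nodata_value):
    """Keep only pixels equal to lulc_code, replace all others with nodata."""
    mask = band_array == int(lulc_code)  # the code may be a string such as '20'
    masked = np.where(mask, band_array, nodata_value)
    # np.where may promote the data type, so cast back to the input type
    return masked.astype(band_array.dtype)

lulc_array = np.array([[20, 21], [0, 20]], dtype=np.uint8)
only_20 = mask_lulc_code(lulc_array, '20', 0)
```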

In [13]:
# create an empty list to store LULC codes which cause negative impact on habitats and edge effect
edge_effect_list = []
# 1. check if initial_lulc is enabled
if config_impedance.get('initial_lulc', {}).get('enabled') is True:
    print("Some categories from the input LULC dataset are considered as stressors...")
    # convert datatype of 'edge_effect' column into integer one if needed
    impedance['edge_effect'] = impedance['edge_effect'].astype(int)

    # 2. iterate through each row in dataframe
    for index, row in impedance.iterrows():
        # check if the value in 'edge_effect' column is 1 - user specified that these LULC are affecting habitats
        if row['edge_effect'] == 1:
            # record the value from 'lulc' column
            edge_effect_list.append(row['lulc'])
            print(f"LULC code = {row['lulc']} is causing edge effect.")
    print (f"LULC type codes causing edge effect on habitats are: {edge_effect_list}")
    print("-"*40)
    
    # 3. iterate over each LULC code in edge_effect_list
    for lulc_code in edge_effect_list:
        # convert lulc_code to string to match YAML keys
        lulc_code_str = f"stressor_lulc_{lulc_code}"

        # check if the current lulc_code has corresponding settings in the YAML file
        if lulc_code_str not in config_impedance['initial_lulc']:
            # if not found, create new keys for the LULC code with placeholders
            print(f"No specific settings found for LULC code {lulc_code}. Creating placeholder values.")
            
            # cast the placeholder dictionary into initial_lulc for a specific LULC code
            initial_lulc[lulc_code_str] = copy.deepcopy(params_placeholder) # deep copy, otherwise YAML creates placeholders like &id001
            
            # inform the user to fill in the parameters
            print(f"New entry for LULC code {lulc_code} created in the YAML file with default values. Please fill in the values you think are more relevant.")
            # print(f"Settings for LULC code {lulc_code}:\n{config_impedance['initial_lulc'][lulc_code_str]}")
        else:
            print(f"Settings for LULC code {lulc_code} are filled in the YAML file.")
            # if settings exist, retain them
            initial_lulc[lulc_code_str] = config_impedance['initial_lulc'][lulc_code_str]
        # note: config_impedance['initial_lulc'] is updated only after the loop;
        # overwriting it here would discard existing settings of LULC codes not processed yet
        
        # 4. create a mask for the current LULC code
        mask = (band_array == int(lulc_code))
        if np.any(mask):
            print(f"True values are present in the mask for LULC code: {lulc_code}.")
        else:
            print(f"No True values are present in the mask for LULC code: {lulc_code}.")

        # apply mask to LULC
        masked_data = np.where(mask, band_array, nodata_value)
        if np.any(masked_data != nodata_value):
            print(f"Valid data is present in masked data for LULC code: {lulc_code}.")
        else:
            print(f"Masked data contains only nodata values for LULC code: {lulc_code}.")

        #  create unique output raster path for each LULC code
        output_raster_path = os.path.join(parent_dir, output_dir, f'{lulc_code_str}.tif')
        # APPEND outputs with stressors to the list
        stressor_rasters.append(output_raster_path)
        yaml_stressors.append(lulc_code_str)
        stressor_dict[output_raster_path] = lulc_code_str  # mapping stressor raster path to LULC code

        # create output raster file
        driver = gdal.GetDriverByName('GTiff')
        out_dataset = driver.Create(output_raster_path, lulc.RasterXSize, lulc.RasterYSize, 1, band.DataType)
        out_dataset.SetGeoTransform(geotransform)
        out_dataset.SetProjection(projection)

        # write the masked data to the new raster file
        out_band = out_dataset.GetRasterBand(1)
        out_band.WriteArray(masked_data)
        nodata_value_int = int(nodata_value)
        out_band.SetNoDataValue(nodata_value_int)

        # flush data to disk
        out_band.FlushCache() # note: without this call the last output will be invalid
        out_dataset.FlushCache()

        out_band = None
        out_dataset = None

        print(f"Masked LULC data for code {lulc_code} affecting habitats with edge effect saved to: {output_raster_path}")
        print("-" * 40)

    print(f"All stressors from initial LULC dataset saved successfully: {stressor_rasters}")
    print("-" * 40)
    
    # after processing all LULC codes, save the updated YAML configuration
    config_impedance['initial_lulc'] = initial_lulc
    save_yaml_config(config_impedance, 'config_impedance.yaml')
else:
    print("No LULC categories from the input LULC raster dataset are considered stressors. Therefore, stressors will be extracted from vector data only.")
    print("-" * 40)

Some categories from the input LULC dataset are considered as stressors...
LULC code = 20 is causing edge effect.
LULC code = 21 is causing edge effect.
LULC type codes causing edge effect on habitats are: ['20', '21']
----------------------------------------
No specific settings found for LULC code 20. Creating placeholder values.
New entry for LULC code 20 created in the YAML file with default values. Please fill in the values you think are more relevant.
True values are present in the mask for LULC code: 20.
Valid data is present in masked data for LULC code: 20.
Masked LULC data for code 20 affecting habitats with edge effect saved to: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\stressor_lulc_20.tif
----------------------------------------
No specific settings found for LULC code 21. Creating placeholder values.
New entry for LULC code 21 created in the YAML file with default values. Please fill in the values you think are more relevant.
True values are present in

Let's define input impedance GeoTIFF dataset which will be enriched with edge effect:

In [14]:
impedance_tif_template = config.get('impedance_tif')
impedance_tif = impedance_tif_template.format(year=year) # substitute year from the configuration file

Next, we define the vector datasets containing OSM spatial features, which will also be used to enrich the impedance dataset (**from previous Notebooks**). Let's define the merged GeoPackage file first:

In [15]:
osm_data_template = config.get('osm_data')
if osm_data_template is not None:
    osm_data = osm_data_template.format(year=year)
    user_vector = None
    vector_refine = osm_data # define a new variable which will be equal either osm_data or user_vector (depending on the configuration file)
    print ("Input raster dataset will be enriched with OSM data.")
else:
    osm_data = None
    warnings.warn("OSM data not found in the configuration file.") 
    
    user_vector_template = config.get('user_vector')
    if user_vector_template is not None:
        user_vector = user_vector_template.format(year=year)
        vector_refine = user_vector
        print ("Input raster dataset will be enriched with user-specified data.")
    else:
        # if neither OSM dataset, nor user dataset specified in the config file
        user_vector = None
        vector_refine = None
        warnings.warn("Neither OSM data nor user specified data found in the configuration file.")

if vector_refine is None:
    raise ValueError("No valid input vector data found. Both OSM data and user-specified data are missing.")

Input raster dataset will be enriched with OSM data.


Let's define paths for impedance_tif and vector dataset:

In [16]:
vector_refine = os.path.join(parent_dir,vector_dir,vector_refine)
impedance_tif = os.path.join(parent_dir,impedance_dir,impedance_tif)

vector_refine = os.path.normpath(vector_refine)
impedance_tif = os.path.normpath(impedance_tif)

To debug, print used vector input dataset:

In [17]:
print(f"Using vector file to refine raster data: {vector_refine}") # debug

Using vector file to refine raster data: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\input\vector\osm_merged_2018.gpkg


Let's extract layer names from geopackage file:

In [18]:
# open and read geopackage
vector_data = ogr.Open(vector_refine, update=0)  # update=0 means read-only mode
layer_count = vector_data.GetLayerCount() # get the number of layers
layers = [] # initialise list with layer names

# extract and print layer names
print(f"Layers in the GeoPackage ({vector_refine}) to extract stressors are:")
for i in range(layer_count):
    layer = vector_data.GetLayerByIndex(i)
    layer_name = layer.GetName()
    layers.append(layer_name)
print(layers) # debug
print("-" *40)

Layers in the GeoPackage (C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\input\vector\osm_merged_2018.gpkg) to extract stressors are:
['railways', 'roads', 'waterbodies', 'waterways']
----------------------------------------


Next, let's define rasterised temporary outputs and append them to a list:

In [19]:
roads = os.path.join(parent_dir,output_dir,f'roads_{year}.tif')
railways = os.path.join(parent_dir,output_dir,f'railways_{year}.tif')
waterbodies = os.path.join(parent_dir,output_dir,f'waterbodies_{year}.tif')
waterways = os.path.join(parent_dir,output_dir,f'waterways_{year}.tif')
rasters_temp = [roads, railways, waterbodies, waterways] # appending temporary outputs to a list

Now, let's extract the maximum value of impedance raster for further processing, calling the function defined above. The maximum value of enriched impedance raster cannot be larger than this value.

In [20]:
if os.path.isfile(impedance_tif): # the path is always defined at this point, so check that the file exists
    impedance_ds = gdal.Open(impedance_tif) # open raster impedance dataset
    impedance_max = get_max_from_tif(impedance_ds) # call function from above
    print (f"Impedance raster GeoTIFF dataset used is {impedance_tif}") # debug
    print (f"Maximum value of impedance dataset: {impedance_max}") # debug
else:
    raise FileNotFoundError(f"Impedance raster GeoTIFF dataset '{impedance_tif}' is not found! Please check the configuration file.") # stop execution

Impedance raster GeoTIFF dataset used is C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\input\impedance\impedance_lulc_ukceh_25m_2018.tif
Maximum value of impedance dataset: 1000.0
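
If the raster has already been read into an array, the maximum can be cross-checked with NumPy while excluding No Data pixels. This is a sketch under assumptions: the helper name `max_ignoring_nodata` and the example values (including 9999 as the No Data value) are hypothetical.

```python
import numpy as np

def max_ignoring_nodata(array, nodata_value=None):
    """Maximum of an array, excluding pixels equal to nodata_value."""
    array = np.asarray(array)
    valid = array if nodata_value is None else array[array != nodata_value]
    if valid.size == 0:
        raise ValueError("The array contains no valid pixels.")
    return float(valid.max())

# hypothetical impedance values with 9999 as No Data
example_max = max_ignoring_nodata([[10, 1000], [9999, 50]], nodata_value=9999)
```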


Let's list OSM features that cause edge effect (from rasters_temp): **TODO - to cast it to config.yaml??**

In [21]:
osm_stressors = [roads, railways]
osm_stressors_names = [] # initialise list with names of stressors
for stressor in osm_stressors:
    osm_stressors_name = os.path.splitext(os.path.basename(stressor))[0].split('_')[0] # extract the base name without path and extension
    osm_stressors_names.append(osm_stressors_name)  # add the name to the list of stressor names

Some of the stressors can also vary in decay parameters. For example, the edge effect from tertiary roads will probably decline much faster than from motorways. Since some OSM features can be distinguished by their edge effect, we should define their types. For example, we can group road features into five types that differ in edge effect, while we assume that the edge effect is the same for all railway features (see above).
**TODO - to link to previous Notebooks**

In [22]:
osm_stresTypes_Names = []  # track dynamically created variables
# dynamically create variables based on base names with the suffix 'Types'
for stressor_name in osm_stressors_names:
    # create a dynamic variable with 'Types' suffix
    variable_name = f"{stressor_name}Types"
    globals()[variable_name] = None  # placeholder, the actual types are set explicitly below
    osm_stresTypes_Names.append(variable_name)  # track created variable names

# explicitly set dynamically created variables
roadsTypes = ['trunk', 'motorway', 'primary', 'secondary', 'tertiary'] # TODO - to link to previous Notebook
railwaysTypes = None # TODO - to link to previous Notebook

print("Dynamically created variables with types of stressors:")
print(osm_stresTypes_Names)

# debug: uncomment to print the dynamically created variables
# print("Dynamically created variables:")
# for var_name in osm_stresTypes_Names:
#     print(f"{var_name}: {globals()[var_name]}")

Dynamically created variables with types of stressors:
['roadsTypes', 'railwaysTypes']

Let's write the stressors from Open Street Map into the YAML file with a detailed structure (if types of these stressors are considered):

In [23]:
vector = config_impedance.get('vector', {}) # access the vector section in YAML

for osm_stressor in osm_stressors_names: # Loop through each stressor in osm_stressors_names
    print(f"Processing {osm_stressor}...")
    # create or update the key for each osm_stressor in 'vector'
    if osm_stressor not in vector:
        vector[osm_stressor] = {}  # ensure the key exists as a dictionary   
    # define the 'types' key for each osm_stressor as an empty dictionary (will be updated later)
    vector[osm_stressor]['types'] = {}  # initialize 'types' as an empty dictionary

    # get the corresponding types variable (e.g., roadsTypes, railwaysTypes)
    osm_stressor_types_var = globals().get(f'{osm_stressor}Types', None) # if no types detected assign with None

    # check if the types variable exists and contains more than one object
    if osm_stressor_types_var is not None and len(osm_stressor_types_var) > 1:
        # update the types in the vector for the current osm_stressor
        vector[osm_stressor]['types'] = True # Update types with True
        # loop through each type in the dynamic variable
        for osm_stressor_type in osm_stressor_types_var:
            # write params_placeholder to vector for each type
            vector[osm_stressor][osm_stressor_type] = copy.deepcopy(params_placeholder)
    else:
        # no types are defined: write params_placeholder directly for the current osm_stressor ('types' stays None)
        vector[osm_stressor] = copy.deepcopy(params_placeholder)
        
# update the 'vector' section back into the main config_impedance
config_impedance['vector'] = vector

print("Intermediate YAML structure:")
print(yaml.dump(config_impedance, default_flow_style=False)) # debug

Processing roads...
Processing railways...
Intermediate YAML structure:
initial_lulc:
  enabled: true
  stressor_lulc_20:
    decline_type: exp_decline
    exp_decline:
      lambda_decay: 500
    prop_decline:
      k_value: 500
    types: null
  stressor_lulc_21:
    decline_type: exp_decline
    exp_decline:
      lambda_decay: 500
    prop_decline:
      k_value: 500
    types: null
vector:
  enabled: true
  railways:
    decline_type: exp_decline
    exp_decline:
      lambda_decay: 500
    prop_decline:
      k_value: 500
    types: null
  roads:
    motorway:
      decline_type: exp_decline
      exp_decline:
        lambda_decay: 500
      prop_decline:
        k_value: 500
      types: null
    primary:
      decline_type: exp_decline
      exp_decline:
        lambda_decay: 500
      prop_decline:
        k_value: 500
      types: null
    secondary:
      decline_type: exp_decline
      exp_decline:
        lambda_decay: 500
      prop_decline:
        k_value: 500
      typ
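
Given the nested structure printed above, the parameters of one stressor can be looked up as sketched below. The helper name `resolve_stressor_params` is hypothetical, and the example dictionary is a shortened copy of the structure shown in the output.

```python
def resolve_stressor_params(vector_cfg, stressor, subtype=None):
    """Return (decline_type, parameter name, value) for a stressor or its subtype."""
    entry = vector_cfg[stressor]
    if entry.get('types') is True:
        # parameters are stored one level deeper, per subtype
        if subtype is None:
            raise KeyError(f"'{stressor}' has subtypes, please specify one.")
        entry = entry[subtype]
    decline_type = entry['decline_type']
    # each decline block holds exactly one parameter in the template
    param_name, value = next(iter(entry[decline_type].items()))
    return decline_type, param_name, value

block = {'decline_type': 'exp_decline',
         'exp_decline': {'lambda_decay': 500},
         'prop_decline': {'k_value': 500},
         'types': None}
vector_example = {'railways': dict(block),
                  'roads': {'types': True, 'motorway': dict(block)}}

railway_params = resolve_stressor_params(vector_example, 'railways')
motorway_params = resolve_stressor_params(vector_example, 'roads', 'motorway')
```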

**TODO - to revisit, probably already done in the previous Notebook**
Now, let's extend the list of stressor rasters with other features, including non-classified features (for example, railways) and features classified by types (for example, roads). The names of types should derive from the previous Notebook. To achieve this, we use regex pattern matching to extract the parts of the filenames in two cases: if types of features are specified and if not.
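
The filename handling can be sketched as a small standalone helper. The name `typed_raster_name` is hypothetical; the pattern is the one used in this Notebook, anchored to the end of the filename.

```python
import re

# part before the year, the year itself and the extension
PATTERN_TYPES = re.compile(r'(.*)(_\d{4})(\.tif)$')

def typed_raster_name(filename, feature_type=None):
    """Insert a feature type between the base name and the year of a filename."""
    match = PATTERN_TYPES.match(filename)
    if match is None:
        raise ValueError(f"Unexpected filename: {filename}")
    basename, year_part, extension = match.groups()
    if feature_type is None:
        return filename  # non-classified features keep their name
    return f"{basename}_{feature_type}{year_part}{extension}"

typed = typed_raster_name('roads_2018.tif', 'trunk')
untyped = typed_raster_name('railways_2018.tif')
```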

In [24]:
pattern_types = r'(.*)(_\d{4})(\.tif)' # use regex to capture the part before the year and the year itself
if roadsTypes is not None: # if there are any types of roads defined
    for roadsType in roadsTypes:
        match = re.match(pattern_types, os.path.basename(roads))  # match only the filename part
        if match:  # ensure the regex pattern matched dataset
            # extract parts of filenames
            basename = match.group(1) # 'roads', for example
            year_part = match.group(2) # '_2012', for example
            extension = match.group(3) # '.tif'
            # construct the new file path for each type
            roadsFile = f"{basename}_{roadsType}{year_part}{extension}"
            roadsType_path = os.path.join(parent_dir,output_dir,roadsFile)
            print(f"Filepath to OSM features: {roadsType_path}") # debug
            stressor_rasters.append(roadsType_path)
            yaml_stressors.append(roadsType) # collect the names of stressors
            stressor_dict[roadsType_path] = roadsType  # mapping stressor raster path to YAML alias
else: # if no types specified
    match = re.match(pattern_types, os.path.basename(roads))  # match only the filename part
    if match:  # ensure the regex pattern matched dataset
        basename = match.group(1) # 'roads', for example
        # without roadType
        stressor_rasters.append(roads)
        yaml_stressors.append(basename) # 'roads'
        stressor_dict[roads] = basename  # mapping stressor raster path to YAML alias
    print(f"Filepaths to OSM features: {roads}") # debug

Filepath to OSM features: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\roads_trunk_2018.tif
Filepath to OSM features: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\roads_motorway_2018.tif
Filepath to OSM features: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\roads_primary_2018.tif
Filepath to OSM features: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\roads_secondary_2018.tif
Filepath to OSM features: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\roads_tertiary_2018.tif


Do the same for railways:

In [25]:
pattern_types = r'(.*)(_\d{4})(\.tif)' # use regex to capture the part before the year and the year itself
railwaysType_paths = [] # initialise list of filenames with types
if railwaysTypes is not None: # if there are any types
    for railwaysType in railwaysTypes:
        match = re.match(pattern_types, os.path.basename(railways))  # match only the filename part
        if match:  # ensure the regex pattern matched dataset
            # extract parts of filenames
            basename = match.group(1) # 'railways', for example
            year_part = match.group(2) # '_2012', for example
            extension = match.group(3) # '.tif'
            # construct the new file path for each type
            railwaysFile = f"{basename}_{railwaysType}{year_part}{extension}"
            railwaysType_path = os.path.join(parent_dir,output_dir,railwaysFile)
            railwaysType_paths.append(railwaysType_path)
            stressor_rasters.append(railwaysType_path) # same as for roads, otherwise typed rasters are not processed
            yaml_stressors.append(railwaysType) # collect the names of stressors
            stressor_dict[railwaysType_path] = railwaysType  # mapping stressor raster path to YAML alias
    print(f"Filepaths to OSM features: {railwaysType_paths}") # debug

else: # if no types specified
    match = re.match(pattern_types, os.path.basename(railways))  # match only the filename part
    if match:  # ensure the regex pattern matched dataset
        basename = match.group(1) # 'railways', for example
        stressor_rasters.append(railways)
        yaml_stressors.append(basename)
        stressor_dict[railways] = basename  # mapping stressor raster path to YAML alias
    print(f"Filepaths to OSM features: {railways}") # debug

Filepaths to OSM features: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\railways_2018.tif


Let's verify the list of stressor rasters and their YAML aliases:

In [26]:
# remove duplicates by converting to sets and back to lists
stressor_rasters = list(set(stressor_rasters))
yaml_stressors = list(set(yaml_stressors))

# remove duplicate pairs in dictionary
unique_pairs = set(stressor_dict.items())  # convert to a set of items (tuples)
stressor_dict = dict(unique_pairs)  # convert back to a dictionary
# sort by alphabetical order
stressor_rasters.sort()
yaml_stressors.sort()

print(f"Unique stressor rasters: {stressor_rasters}") # debug
print(f"Unique aliases of stressors for YAML: {yaml_stressors}") # debug
print("Mapping dictionary of stressors' paths and their aliases is:") # debug
print(stressor_dict) # debug

Unique stressor rasters: ['C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\railways_2018.tif', 'C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\roads_motorway_2018.tif', 'C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\roads_primary_2018.tif', 'C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\roads_secondary_2018.tif', 'C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\roads_tertiary_2018.tif', 'C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\roads_trunk_2018.tif', 'C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\stressor_lulc_20.tif', 'C:\\Users\\kriukovv\\Documents\\pilot_2\\preprocessing\\data\\output\\stressor_lulc_21.tif']
Unique aliases of stressors for YAML: ['motorway', 'primary', 'railways', 'secondary', 'stressor_lulc_20', 'stressor_lulc_21', 'tertiary', 'trunk']
Mapping dictionary of stressors' paths and their aliases is:
{'C

In [27]:
print("-" * 40) # to separate sections of Jupyter Notebook

----------------------------------------


#### Recalculation of impedance

In [28]:
# progress
print("Accessing stressors from vector datasets...")

Accessing stressors from vector datasets...


In [29]:
# REDUNDANT BLOCK TO UPDATE YAML FILE
'''
# iterate over each item in osm_stressors_names (e.g., 'roads', 'railways')
for osm_stressor in osm_stressors_names:
    # check if the key for the current osm_stressor exists and 'types' is enabled
    if vector.get(osm_stressor, {}).get('types') is True:
        print(f"Types are enabled for {osm_stressor}.")
        # check if the list with the types (e.g., roadsTypes, railwaysTypes) is not None
        # apply the params_placeholder to each type for the current osm_stressor
        for osm_stressor_type in osm_stresTypes_Name:
            vector[osm_stressor][osm_stressor_type] = copy.deepcopy(params_placeholder)
    else:
        vector[osm_stressor] = copy.deepcopy(params_placeholder)  # apply a deep copy if no types are defined
else:
    print(f"Types are disabled for {osm_stressor}.")
    vector[osm_stressor] = copy.deepcopy(params_placeholder)  # apply a deep copy if types are disabled

# Add the updated 'vector' structure back to the config_impedance
config_impedance['vector'] = vector

print(yaml.dump(config_impedance, default_flow_style=False)) # debug
'''
''' PREVIOUS VERSION - CHECKING TYPES
vector = config_impedance.get('vector', {}) # access the vector in YAML
# check if 'roads' key exists and get 'types'
if vector.get('roads', {}).get('types') is True:
    print("Types are enabled for roads.")
    # check if list with types is None
    if roadsTypes is not None:
        # apply the params_placeholder to each roadsType
        for roadsType in roadsTypes:
            # apply a deep copy of parameters to each roadsType, otherwise YAML creates placeholders like &id001 and calls them like *id001
            vector['roads'][roadsType] = copy.deepcopy(params_placeholder)
    else:
        vector['roads'] = copy.deepcopy(params_placeholder) # apply a deep copy

else:
    print("Types are disabled for roads.")
    vector['roads'] = copy.deepcopy(params_placeholder) # apply a deep copy

config_impedance['vector'] = vector  # adding the vector structure to config_impedance
'''


In [30]:
'''
# apply the same to railways (no types in case study, but these lines added for flexibility)
if vector.get('railways', {}).get('types') is True:
    print("Types are enabled for railways.")

    # check if list with types is None
    if railwaysTypes is not None:
        # apply the params_placeholder to each type
        for railwaysType in railwaysTypes:
            # apply a deep copy of parameters to each railwaysType
            vector['railways'][railwaysType] = copy.deepcopy(params_placeholder)
    else:
        vector['railways'] = copy.deepcopy(params_placeholder) # apply a deep copy

else:
    print("Types are disabled for railways.")
    vector['railways'] = copy.deepcopy(params_placeholder) # apply a deep copy

config_impedance['vector'] = vector  # adding the vector structure to config_impedance
'''


Then, we are writing updates to the YAML file:
**TODO - to implement placeholders for roads and railways to be chosen from list of OSM features (depends on previous Notebooks)**

In [31]:
with open('config_impedance.yaml', 'w') as file:
    yaml.dump(config_impedance, file, default_flow_style=False)

**debug** Now we print the YAML structure to verify it. A plain dump is hard to read, so we render it as a tree with `Node` and `RenderTree` (as provided by the `anytree` library):

In [32]:
# recursive function to convert dictionary to a tree structure (avoiding intermediate repetitions)
def dict_to_tree(d, parent=None):
    for key, value in d.items():
        node = Node(key, parent=parent)
        if isinstance(value, dict):
            dict_to_tree(value, parent=node)  # recursively add child nodes if the value is a dictionary
        else:
            node.name = f"{key}: {value}"  # attach the value to the key without creating a separate node

# create the root node for the tree
root_node = Node("root")
# convert the YAML config dictionary into a tree structure
dict_to_tree(config_impedance, root_node)

# print the tree structure
for pre, _, node in RenderTree(root_node):
    print(f"{pre}{node.name}")

# TODO - to exclude types: True at deeper levels

root
├── initial_lulc
│   ├── enabled: True
│   ├── stressor_lulc_20
│   │   ├── types: None
│   │   ├── decline_type: exp_decline
│   │   ├── exp_decline
│   │   │   └── lambda_decay: 500
│   │   └── prop_decline
│   │       └── k_value: 500
│   └── stressor_lulc_21
│       ├── types: None
│       ├── decline_type: exp_decline
│       ├── exp_decline
│       │   └── lambda_decay: 500
│       └── prop_decline
│           └── k_value: 500
└── vector
    ├── enabled: True
    ├── types: True
    ├── roads
    │   ├── types: True
    │   ├── trunk
    │   │   ├── types: None
    │   │   ├── decline_type: exp_decline
    │   │   ├── exp_decline
    │   │   │   └── lambda_decay: 500
    │   │   └── prop_decline
    │   │       └── k_value: 500
    │   ├── motorway
    │   │   ├── types: None
    │   │   ├── decline_type: exp_decline
    │   │   ├── exp_decline
    │   │   │   └── lambda_decay: 500
    │   │   └── prop_decline
    │   │       └── k_value: 500
    │   ├── primary
    │   │   

In [33]:
# debug
'''
print("Current structure of YAML configuration file:")
print(yaml.dump(config_impedance, default_flow_style=False))
'''


##### Updates of configuration by user
Users should check the type and parameters of decline for each stressor. If the default values are not suitable, they can be changed in the YAML file before running the next steps. <br>
**ATTENTION!** Do not change the names of keys! To keep the configuration file consistent, only the values of the following three keys can be changed:
1. ***'decline_type'*** ('exp_decline' or 'prop_decline')
2. ***'lambda_decay'*** (float number)
3. ***'k_value'*** (float number)
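
As an illustration of this rule, the sketch below updates only the three editable values of a single stressor entry. It is a hedged example rather than part of the workflow: `set_decline` is a hypothetical helper, and the dictionary mirrors one stressor entry of the tree printed above as it would look after loading the YAML file.

```python
ALLOWED_DECLINE_TYPES = {"exp_decline", "prop_decline"}

def set_decline(params, decline_type=None, lambda_decay=None, k_value=None):
    """Update only the three editable values of one stressor's parameter block."""
    if decline_type is not None:
        if decline_type not in ALLOWED_DECLINE_TYPES:
            raise ValueError(f"decline_type must be one of {sorted(ALLOWED_DECLINE_TYPES)}")
        params["decline_type"] = decline_type
    if lambda_decay is not None:
        params["exp_decline"]["lambda_decay"] = float(lambda_decay)
    if k_value is not None:
        params["prop_decline"]["k_value"] = float(k_value)
    return params

# one stressor entry with the default values (key names are left untouched)
trunk = {
    "types": None,
    "decline_type": "exp_decline",
    "exp_decline": {"lambda_decay": 500},
    "prop_decline": {"k_value": 500},
}

# switch to proportional decline with an illustrative k_value
set_decline(trunk, decline_type="prop_decline", k_value=2.5)
print(trunk)
```

Editing the YAML file by hand achieves the same result; the point is that the key names and the nesting stay exactly as generated.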

Now, let's check if data types of updated YAML objects are correct:

In [47]:
# check lambda_decay
if lambda_decay is None:
    warnings.warn("lambda_decay is None. Please provide a numeric value (integer or float).", UserWarning)
elif not isinstance(lambda_decay, (int, float)):
    raise TypeError(f"Invalid data type for lambda_decay: expected int or float, got {type(lambda_decay).__name__}.")

# Check if k_value is None
if k_value is None:
    warnings.warn("k_value is None. Please provide a numeric value (integer or float).", UserWarning)
elif not isinstance(k_value, (int, float)):
    raise TypeError(f"Invalid data type for k_value: expected int or float, got {type(k_value).__name__}.")

Once all parameters are set, we initialise variables for the maximum of the edge effects and for the cumulative sum of the effects from all stressors. Currently, only the maximum is implemented, because the cumulative edge effect is not applicable to the case study.
**TODO - implement cumulative of the edge effects (in future)**
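
The difference between the two ways of combining stressors can be sketched on a single row of pixels. This is an assumption-laden illustration: the function names are hypothetical, the pixel values are invented, the cap of 1000 stands in for `impedance_max`, and plain lists are used instead of the NumPy arrays the notebook works with.

```python
def combine_max(effects):
    """Per-pixel maximum across stressors (the approach implemented in this notebook)."""
    return [max(values) for values in zip(*effects)]

def combine_cumulative(effects, cap):
    """Per-pixel sum across stressors, capped at `cap` (a possible future approach)."""
    return [min(sum(values), cap) for values in zip(*effects)]

# invented edge-effect values for four pixels and two stressors
roads = [1000, 600, 350, 100]
railways = [200, 700, 300, 0]

print(combine_max([roads, railways]))                   # [1000, 700, 350, 100]
print(combine_cumulative([roads, railways], cap=1000))  # [1000, 1000, 650, 100]
```

With the maximum, a pixel affected by several stressors takes the strongest single effect; with the cumulative sum, overlapping effects add up until they reach the cap.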

In [34]:
# initialise variables with outputs of the effects from all rasters
max_result = None
cumul_result = None

We need to specify GDAL drivers to store these intermediate outputs in memory and output dataset as GeoTIFF.

In [35]:
driver = gdal.GetDriverByName('GTiff') # has already been defined above
mem_driver = gdal.GetDriverByName('MEM')

We need to define a function that searches for a key (stressor name) and returns the requested parameter (decline type, lambda_decay or k_value):

In [36]:
def find_param(config, in_key, out_param):
    """Recursively search for in_key in the nested dictionary (e.g., railways) and return the value of out_param (decline type, lambda_decay or k_value)."""
    if isinstance(config, dict):
        # Check if in_key is present at the current level
        if in_key in config:
            # Check for specific nested parameters
            if out_param in config[in_key]:
                return config[in_key][out_param]
            # Handle cases where out_param may be nested under another dictionary
            for key in config[in_key]:
                if isinstance(config[in_key][key], dict) and out_param in config[in_key][key]:
                    return config[in_key][key][out_param]

        # Recurse through each key-value pair in the dictionary
        for key, value in config.items():
            param_value = find_param(value, in_key, out_param)
            if param_value is not None:
                return param_value  # Return the first found value
    return None  # return None if input key is not found
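
A related lookup that fetches all three parameters of a stressor in one pass could look like the sketch below. `collect_params` is a hypothetical alternative and not the function used in this notebook; the sample configuration is an assumption that mirrors part of the tree printed earlier.

```python
def collect_params(config, stressor):
    """Find the stressor's block anywhere in the nested config and return its three parameters.

    Hypothetical helper: returns None if the stressor key is not found.
    """
    if not isinstance(config, dict):
        return None
    block = config.get(stressor)
    if isinstance(block, dict):
        return {
            "decline_type": block.get("decline_type"),
            "lambda_decay": (block.get("exp_decline") or {}).get("lambda_decay"),
            "k_value": (block.get("prop_decline") or {}).get("k_value"),
        }
    for value in config.values():
        found = collect_params(value, stressor)
        if found is not None:
            return found  # first match wins, as in find_param
    return None

# sample configuration resembling the structure of config_impedance.yaml
sample_config = {
    "initial_lulc": {"enabled": True},
    "vector": {
        "enabled": True,
        "roads": {
            "types": True,
            "trunk": {
                "types": None,
                "decline_type": "exp_decline",
                "exp_decline": {"lambda_decay": 500},
                "prop_decline": {"k_value": 500},
            },
        },
    },
}

print(collect_params(sample_config, "trunk"))
print(collect_params(sample_config, "canals"))  # key is absent
```

As with `find_param`, the first matching key is returned, so stressor aliases need to be unique across the whole configuration.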

Reload the configuration file again to reflect recent changes if made:

In [37]:
with open('config_impedance.yaml', 'r') as file:
    config_impedance = yaml.safe_load(file)

Now, we can loop over all items in the dictionary of stressors and process each of them in the following order:
1. open the stressor raster dataset and extract its NoData value, the count of NoData pixels, and the geotransform and projection
2. calculate raster proximity (the distance from each pixel to the nearest stressor pixel)
3. calculate the edge effect from the stressor
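
Step 3 relies on the two decline formulas used in the loop. The sketch below evaluates them for single distances instead of whole rasters. It assumes `impedance_max = 1000` (the urban value quoted in the introduction), distances in map units, and an illustrative `k_value` of 2; `edge_effect` is a hypothetical helper name.

```python
import math

def edge_effect(distance, impedance_max, decline_type, lambda_decay=None, k_value=None):
    """Overhead impedance at a given distance from a stressor (scalar form of the per-pixel formula)."""
    if decline_type == "exp_decline":
        # exponential decline: impedance_max * exp(-d / lambda_decay)
        return impedance_max * math.exp(-distance / lambda_decay)
    if decline_type == "prop_decline":
        # proportional decline: max(impedance_max - k_value * d, 0)
        return max(impedance_max - k_value * distance, 0)
    raise ValueError(f"Unknown decline type: {decline_type!r}")

for d in (0, 250, 500, 1000):
    exp_val = edge_effect(d, 1000, "exp_decline", lambda_decay=500)
    prop_val = edge_effect(d, 1000, "prop_decline", k_value=2)
    print(f"{d:>5}: exp={exp_val:7.1f}  prop={prop_val:7.1f}")
```

With exponential decline, the effect falls to about 37% of `impedance_max` at a distance equal to `lambda_decay`. With proportional decline, it reaches zero at `impedance_max / k_value`; under the same assumption of `impedance_max = 1000`, the default `k_value` of 500 would remove the effect within 2 map units, so this parameter deserves particular attention.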

In [52]:
# print(config_impedance)
for stressor_raster, yaml_stressor in stressor_dict.items():
    print(f"Processing: {stressor_raster}") # debug
    print(f"Corresponding key in YAML configuration: {yaml_stressor}") # debug
    # open the input raster dataset
    ds = gdal.Open(stressor_raster)
    if ds is None:
        print(f"Failed to open {stressor_raster}, skipping...")
        continue
    try:
        ## 1. Preparation of stressors' datasets
        # handle NoData values
        input_band = ds.GetRasterBand(1)
        nodata_value = input_band.GetNoDataValue()
        print(f"Original no data value for input dataset is {nodata_value}") # debug
        if nodata_value is None:
            nodata_value = -9999  
            input_band.SetNoDataValue(nodata_value)
        print(f"No data value for input dataset is {nodata_value}") # debug

        data = input_band.ReadAsArray()
        # debug
        min_value = np.min(data)
        max_value = np.max(data)
        print(f"Range of values in the data: {min_value} to {max_value}")

        no_data_count = np.sum(data == nodata_value) # supposed to be non-zero
        print(f"No data count: {no_data_count}")

        # get the geo-transform (affine transformation parameters)
        geotransform = ds.GetGeoTransform()
        projection = ds.GetProjection()
        
        ## 2. COMPUTE PROXIMITY/DISTANCE (THROUGH GDAL METHOD)
        output_ds = mem_driver.Create('', impedance_ds.RasterXSize, impedance_ds.RasterYSize, 1, gdal.GDT_Int32) # Int64 might not support .SetNoDataValue()
        # note: it is not possible to specify no data value directly in gdal_create

        # set geotransform parameters from input file
        if geotransform:
            output_ds.SetGeoTransform(geotransform)
        if projection:
            output_ds.SetProjection(projection)

        output_band = output_ds.GetRasterBand(1)
        
        try:
            gdal.ComputeProximity(input_band, output_band, ['DISTUNITS=GEO', f'NODATA={nodata_value}']) 
        except RuntimeError as e:
            print(f"Error computing proximity for {stressor_raster}: {str(e)}")
        # use exceptions because GDAL can fail silently 

        # read proximity data as a NumPy array for validation/debugging
        proximity_data = output_band.ReadAsArray()
        output_nodata_value = output_band.GetNoDataValue()
        print(f"NoData value of output raster is {output_nodata_value}")
        print(proximity_data) # debug: 0 for all pixels of last raster

        output_nodata_count = np.sum(proximity_data == output_nodata_value)
        print(f"Output no data count is {output_nodata_count}") # supposed to be 0
        print(f"No data value for output dataset is {output_nodata_value}") # debug

        # warn if no data values are detected
        if output_nodata_count > 0:
            warnings.warn(f"No data values have been detected in the proximity raster for {stressor_raster}. Check the validity of the input vector dataset.")

        # create a VRT file as a reference to the proximity raster in memory
        vrt_output_path = os.path.join(parent_dir, output_dir, f'{os.path.basename(stressor_raster).replace(".tif", "")}_dist.vrt')
        vrt_options = gdal.BuildVRTOptions(resampleAlg='nearest')

        # build VRT from the in-memory proximity dataset
        vrt_ds = gdal.BuildVRT(vrt_output_path, [output_ds], options=vrt_options)

        # debug: export proximity raster to GeoTIFF
        dist_tiff_output = os.path.join(parent_dir, output_dir, f'{os.path.basename(stressor_raster).replace(".tif", "")}_dist.tif')
        print(f"Distance path: {dist_tiff_output}") # debug
        gdal.Translate(dist_tiff_output, vrt_ds, format="GTiff", outputType=gdal.GDT_Int32, creationOptions=["COMPRESS=LZW"])
        # debug
        if os.path.exists(dist_tiff_output):
            print(f"File successfully created: {dist_tiff_output}")
        else:
            print(f"Error: File not created at: {dist_tiff_output}")

        ## 3. CALCULATION OF EDGE EFFECT
        # note: decay might vary across classes of stressors. For example, primary and tertiary roads have different negative impacts on natural habitats: the impact of a primary road is likely to extend further than that of a tertiary road.
        # therefore, we allow different decay parameters for each type within a vector dataset

        # set decay output path
        edgeEff_output_path = os.path.join(parent_dir, output_dir, f'{os.path.basename(stressor_raster).replace(".tif", "")}_edge.tif')
        print(f"Path to output raster dataset with calculated edge effect: {edgeEff_output_path}") # debug

        # call the function defined above to find the corresponding parameters for each stressor
        decline_type = find_param(config_impedance, yaml_stressor, 'decline_type')
        lambda_decay = find_param(config_impedance, yaml_stressor, 'lambda_decay')
        k_value = find_param(config_impedance, yaml_stressor, 'k_value')

        """ # debug
        command = f"find_param({config_impedance}, '{yaml_stressor}', 'decline_type')"
        print(f"Command executed {command}")
        """
        # debug
        print(f"Fetched parameters for the stressor: {decline_type} (type of decline), {lambda_decay} (lambda decay parameter), {k_value} (k-value of proportional decline)")

        # debug - check if data types of updated YAML objects are correct:
        # for lambda_decay
        if lambda_decay is None:
            warnings.warn("lambda_decay is None. Please provide a numeric value.", UserWarning)
        elif not isinstance(lambda_decay, (int, float)):
            raise TypeError(f"Invalid data type for lambda_decay: expected int or float, got {type(lambda_decay).__name__}.")                 
        # k_value
        if k_value is None:
            warnings.warn("k_value is None. Please provide a numeric value.", UserWarning)
        elif not isinstance(k_value, (int, float)):
            raise TypeError(f"Invalid data type for k_value: expected int or float, got {type(k_value).__name__}.")
        
        # calculate impedance now
        if decline_type == 'exp_decline':
            result = impedance_max * np.exp(-proximity_data / lambda_decay) # impedance_max value has already been extracted through a separate function
            print(f"Decline type is {decline_type}. Expression to calculate edge effect: {impedance_max} * exp(- proximity_data / {lambda_decay})") # debug
        elif decline_type == 'prop_decline':  # proportional decay
            result = np.maximum(impedance_max - k_value * proximity_data, 0)
            print(f"Decline type is {decline_type}. Expression to calculate edge effect: max({impedance_max} - {k_value} * proximity_data, 0)") # debug
        elif decline_type is None:
            raise ValueError("The type of decay has not been specified. Please set 'exp_decline' or 'prop_decline' in the configuration file.")
        else:
            raise ValueError(f"Unknown value in the type of decay: {decline_type}")

        # set values <= 0 to no data value
        result[result <= 0] = nodata_value
        result = np.ma.masked_equal(result, nodata_value)

        # combine the results: keep the maximum value for each pixel through iterations (keep the larger impedance)
        if max_result is None:
            max_result = result.copy()  # initialize with the first raster's result
        else:
            max_result = np.maximum(max_result, result)  # take max of previous and current
        
        # FOR CUMULATIVE FUNCTION OF DIFFERENT STRESSORS 
        '''
        # combine the results from each raster by summing
        if cumul_result is None:
            cumul_result = result.copy()  # initialize with a copy of the first raster
        else:
            cumul_result += result  # increment cumulative result
        '''
        # define edge effect result for export
        out_result = driver.Create(edgeEff_output_path, impedance_ds.RasterXSize, impedance_ds.RasterYSize, 1, gdal.GDT_Int32, ['COMPRESS=LZW']) # compress
        # set geotransform and projection before exporting
        out_result.SetGeoTransform(geotransform)
        out_result.SetProjection(projection)
        
        # write the masked result to the output raster's first band
        out_band = out_result.GetRasterBand(1)

        # set the nodata value in the band
        if nodata_value is not None:
            out_band.SetNoDataValue(nodata_value)  # define nodata 

        # write array to the band of output dataset (export)
        out_band.WriteArray(result)
        
        # flush the cache
        output_band.FlushCache()
        vrt_ds.FlushCache()
        output_ds.FlushCache()

        output_band = None
        vrt_ds = None
        output_ds = None
        
        # clean up intermediate objects
        del input_band, output_band, vrt_ds, output_ds, proximity_data, no_data_count, output_nodata_count
        '''# del result, del out_result'''
        
        print(f"Finished processing: {stressor_raster}")
        print("-" * 40)
    except Exception as e:
        print(f"Error processing {stressor_raster}: {str(e)}")
        print("-" * 40)
        continue  # skip to the next raster
    finally:
        ds = None
        vrt_ds = None
        # close the input dataset
        input_band = None
        output_band = None
        output_ds = None
        proximity_data = None

Processing: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\roads_trunk_2018.tif
Corresponding key in YAML configuration: trunk
Original no data value for input dataset is 0.0
No data value for input dataset is 0.0
Range of values in the data: 0 to 20
No data count: 32735609
Error processing C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\roads_trunk_2018.tif: 'NoneType' object has no attribute 'RasterXSize'
----------------------------------------
Processing: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\railways_2018.tif
Corresponding key in YAML configuration: railways
Original no data value for input dataset is 0.0
No data value for input dataset is 0.0
Range of values in the data: 0 to 21
No data count: 32886513
Error processing C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\railways_2018.tif: 'NoneType' object has no attribute 'RasterXSize'
----------------------------------------
Processing: C:\Users\kriukovv\Documents\p

#### Update impedance with decayed effect
First, we read the impedance raster dataset as a NumPy array.

In [39]:
impedance_band = impedance_ds.GetRasterBand(1)
impedance_array = impedance_band.ReadAsArray()

For each pixel, let's take the maximum of the initial impedance and the edge effect calculated above:

In [40]:
max_result = np.maximum(max_result, impedance_array)

Then, apply the maximum value of initial impedance dataset as a cap to the maximum result (impedance can't be higher than in the initial impedance dataset):

In [41]:
max_result[max_result > impedance_max] = impedance_max

For debugging: ensure the size of the final result matches initial impedance dataset. In theory, they should be identical, but rasterised OSM datasets can have larger spatial extent than input LULC or impedance datasets.

In [42]:
impedance_array_shape = impedance_array.shape # shape of input impedance dataset
max_result_shape = max_result.shape # shape of output dataset with maximum values of edge effect
if impedance_array_shape != max_result_shape:
    warnings.warn("The impedance raster dimensions do not match the dimensions of the maximum edge effect raster.")
    print(f"Initial impedance shape is {impedance_array_shape} and maximum result shape is {max_result_shape}.")
else:
    print(f"The shape of maximum result is the same as the shape of initial impedance array shape: {impedance_array_shape} and {max_result_shape}.") # debug

The shape of maximum result is the same as the shape of initial impedance array shape: (7861, 4203) and (7861, 4203).


Now, we should export the result, using the GeoTIFF driver and the geotransform and projection taken from the input datasets. Outputs are compressed to save disk space:

In [43]:
max_output_path = os.path.join(parent_dir, output_dir, 'max_result.tif') # TODO - to cast filename to config.yaml: 'impedance_lulc_ukceh_25m_{year}_upd.tif' 
max_out_result = driver.Create(max_output_path, impedance_ds.RasterXSize, impedance_ds.RasterYSize, 1, gdal.GDT_Int32, ['COMPRESS=LZW'])
# set geotransform and projection for export
max_out_result.SetGeoTransform(geotransform)
max_out_result.SetProjection(projection)

0

Then, export the maximum result to the output raster. No data value should be explicitly specified to avoid any potential issues:

In [44]:
try:
    max_out_band = max_out_result.GetRasterBand(1)
    max_out_band.WriteArray(max_result)
    print(f"The updated impedance raster dataset has been exported to: {max_output_path}")
except Exception as e:
    raise ValueError(f"The updated impedance raster dataset has not been exported: {e}") from e

if nodata_value is not None: # set nodata value for maximum result
    max_out_band.SetNoDataValue(nodata_value)

The updated impedance raster dataset has been exported to: C:\Users\kriukovv\Documents\pilot_2\preprocessing\data\output\max_result.tif


Finally, let's release the dataset handles so that pending changes are flushed to disk and resources are cleaned up:
**TODO - to remove all intermediate GeoTIFFs**

In [45]:
# set nodata value for maximum result
if nodata_value is not None:
    max_out_band.SetNoDataValue(nodata_value)

# flush cache to ensure data is written to disk
max_out_band.FlushCache()
# close the max result output
max_out_band = None
max_out_result = None

# free the impedance dataset
impedance_ds = None
vrt_ds = None  # close the VRT dataset
output_ds = None  # close the proximity raster in memory

In [46]:
# SAVE FOR FUTURE (EXPORTING CUMULATIVE RESULT OF IMPEDANCE)
"""
# ensure the size of impedance_data matches cumulative_result
if impedance_array.shape != cumul_result.shape:
    raise ValueError("The impedance raster dimensions do not match the cumulative decay raster dimensions.")

# sum the impedance data with the cumulative result
cumul_result += impedance_array

# apply impedance_max cap to the cumulative result (max impedance value can't be higher than initial value)
cumul_result[cumul_result > impedance_max] = impedance_max
# free the impedance dataset
impedance_ds = None

# after the loop: apply impedance_max cap to the cumulative result (impedance can't be > 1000)
cumul_result[cumul_result > impedance_max] = impedance_max

# DEBUG: save the final combined result as a GeoTIFF
output_path = os.path.join(parent_dir, output_dir, 'combined_decay_output.tif')
driver = gdal.GetDriverByName('GTiff')
# create output raster dataset
out_ds = driver.Create(output_path, cumul_result.shape[1], cumul_result.shape[0], 1, gdal.GDT_Int32, ['COMPRESS=LZW'])
out_ds.SetGeoTransform(geotransform)
out_ds.SetProjection(projection)
# write the combined result to the output raster
out_band = out_ds.GetRasterBand(1)
out_band.WriteArray(cumul_result)
# set NoData value
nodata_value = 0
out_band.SetNoDataValue(nodata_value)

# flush cache to ensure data is written to disk
out_band.FlushCache()
# clean up
out_ds = None
del cumul_result, out_band
print(f"Final combined decay raster saved to: {output_path}")

vrt_ds = None  # close the VRT dataset
output_ds = None  # close the proximity raster in memory
"""
