# Network analysis on segment level

Split the segments into groups by segment type and solar exposure class. The resulting files are then used to find critical and favorable segments in the road network.

In [1]:
import geopandas as gpd
import numpy as np
import pandas as pd
import warnings
from itertools import islice
from pathlib import Path

# Ignore FutureWarnings from geopandas
# NOTE(review): this filter is not limited to geopandas; it silences every
# FutureWarning raised in the kernel, including deprecation notices of other libraries.
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.filterwarnings(action='ignore', message='.*initial implementation of Parquet.*')
# Ignore SettingWithCopyWarning from (geo-)pandas
pd.options.mode.chained_assignment = None  # default='warn'


# Config
# Times of day to process. Each label is used verbatim in the input and output
# file names and in the name of the `ranking_<label>` column.
tod_list = ['10am', '1pm', '4pm', '7pm']

# Both values only select the export directory below.
# presumably `day` is the day of the year of the simulation run (TODO confirm)
day = 170
sensitivity_factor = 1.0

# Input folder holding the `counts_*.feather` files read further down
in_dir = Path(f'../../export/{day}/{sensitivity_factor}/exportdata/aggregation')
# Output goes two levels above the input folder, into `analysis/segment_level`.
# mkdir is called without parents=True, so `export/<day>/<factor>/` must already exist.
out_dir = in_dir.parent.parent / 'analysis'
out_dir.mkdir(exist_ok=True)
out_segment_level = out_dir / 'segment_level'
out_segment_level.mkdir(exist_ok=True)

# Route type used as the baseline: part of the default input file name and
# the prefix of the ranking labels matched in DefaultSegmentRanking
default_type = 'shortest'
# NOTE(review): not referenced by any of the cells visible here; confirm whether it is still needed
count_threshold = 0.01

# Percent values interpolated into the ranking labels that separate the
# low / medium / high solar exposure classes
high_sol_expo = 90
low_sol_expo = 50

# Coordinate reference system the segment data is reprojected to before filtering and overlay
crs = 'EPSG:4326'

In [2]:
# Helper functions
def subfolder(out_dir, time_of_day):
    """Return the sub-folder of ``out_dir`` named after ``time_of_day``, creating it if needed.

    Args:
        out_dir: Base directory as a ``pathlib.Path``.
        time_of_day: Label that becomes the folder name (e.g. an entry of ``tod_list``).

    Returns:
        Path of the sub-folder; the folder exists when the function returns.
    """
    out_folder = out_dir / f'{time_of_day}'
    # parents=True: also works when out_dir itself has not been created yet,
    # instead of failing with FileNotFoundError
    out_folder.mkdir(parents=True, exist_ok=True)
    return out_folder

In [3]:
# Filters that extract segments by their ranking label
class EqualSegmentRanking:
    """Splits the "equal" segment data set by the label in its ranking column.

    The label is read from the column ``ranking_<time_of_day>`` and compared
    against strings built from ``low_sol_expo`` / ``high_sol_expo``.
    """

    def __init__(self, data, time_of_day):
        # Reproject once; every subset returned below is taken from this frame
        self.data = data.to_crs(crs)
        self.time_of_day = time_of_day

    def _rows_labelled(self, label):
        """Return the rows whose ranking for this time of day equals ``label``."""
        matches = self.data[f'ranking_{self.time_of_day}'] == label
        return self.data[matches]

    @property
    def lse_not_avoided(self):
        """Equal segments carrying the low solar exposure label."""
        return self._rows_labelled(f'equal, solar exposure < {low_sol_expo} %')

    @property
    def hse_not_avoided(self):
        """Equal segments carrying the high solar exposure label."""
        return self._rows_labelled(f'equal, {high_sol_expo} % >= solar exposure')

    @property
    def mse_not_avoided(self):
        """Equal segments carrying the medium solar exposure label."""
        return self._rows_labelled(f'equal, {low_sol_expo} % <= solar exposure < {high_sol_expo} %')

    
class DetourSegmentRanking:
    """Splits the "detour" segment data set by the label in its ranking column.

    The label is read from the column ``ranking_<time_of_day>`` and compared
    against strings built from ``low_sol_expo`` / ``high_sol_expo``.
    """

    def __init__(self, data, time_of_day):
        # Reproject once; every subset returned below is taken from this frame
        self.data = data.to_crs(crs)
        self.time_of_day = time_of_day

    def _rows_labelled(self, label):
        """Return the rows whose ranking for this time of day equals ``label``."""
        matches = self.data[f'ranking_{self.time_of_day}'] == label
        return self.data[matches]

    @property
    def lse_detours(self):
        """Detour segments carrying the low solar exposure label."""
        return self._rows_labelled(f'detour, solar exposure < {low_sol_expo} %')

    @property
    def hse_detours(self):
        """Detour segments carrying the high solar exposure label."""
        return self._rows_labelled(f'detour, {high_sol_expo} % >= solar exposure')

    @property
    def mse_detours(self):
        """Detour segments carrying the medium solar exposure label."""
        return self._rows_labelled(f'detour, {low_sol_expo} % <= solar exposure < {high_sol_expo} %')

    
class DefaultSegmentRanking:
    """Splits the avoided part of the default segment data set by ranking label.

    "Avoided" is what remains of ``data`` after the geometric difference with
    ``equal_segments``; the result is computed once in the constructor.
    Unlike the equal/detour filters, no reprojection happens here, so both
    inputs are used in whatever CRS they arrive in.
    """

    def __init__(self, data, equal_segments, time_of_day):
        self.time_of_day = time_of_day
        # Default segments minus the equal segments
        self.difference = gpd.overlay(data, equal_segments, how='difference')

    def _rows_labelled(self, label):
        """Return the avoided rows whose ranking for this time of day equals ``label``."""
        matches = self.difference[f'ranking_{self.time_of_day}'] == label
        return self.difference[matches]

    @property
    def lse_defaults(self):
        """Avoided default segments carrying the low solar exposure label."""
        return self._rows_labelled(f'{default_type}, solar exposure < {low_sol_expo} %')

    @property
    def hse_defaults(self):
        """Avoided default segments carrying the high solar exposure label."""
        return self._rows_labelled(f'{default_type}, {high_sol_expo} % >= solar exposure')

    @property
    def mse_defaults(self):
        """Avoided default segments carrying the medium solar exposure label."""
        return self._rows_labelled(f'{default_type}, {low_sol_expo} % <= solar exposure < {high_sol_expo} %')

In [None]:
# Load the segment counts of the default routes, reprojected to the configured CRS
default_file = gpd.read_feather(in_dir / f'counts_{default_type}_alltimes.feather').to_crs(crs)

In [None]:
# Read and split up segment data of the optimized routes.
# For every time of day, nine class files (equal / detour / avoided x low / high / medium
# solar exposure) plus one combined "avoided" file are written to its own sub-folder.
for time_of_day in tod_list:
    print(f'Processing {time_of_day}...')
    # Resolve (and create) the output folder once per time of day instead of before every write
    tod_folder = subfolder(out_segment_level, time_of_day)

    print('...equals...')
    equal_segments = EqualSegmentRanking(gpd.read_feather(in_dir / f'counts_{time_of_day}_equal.feather'), time_of_day)

    # Equal + low solar exposure
    lse_not_avoided = equal_segments.lse_not_avoided
    lse_not_avoided.to_feather(tod_folder / f'{time_of_day}_lse_not_avoided.feather')

    # Equal + high solar exposure
    hse_not_avoided = equal_segments.hse_not_avoided
    hse_not_avoided.to_feather(tod_folder / f'{time_of_day}_hse_not_avoided.feather')

    # Equal + medium solar exposure
    mse_not_avoided = equal_segments.mse_not_avoided
    mse_not_avoided.to_feather(tod_folder / f'{time_of_day}_mse_not_avoided.feather')

    print('...detours...')
    detour_segments = DetourSegmentRanking(gpd.read_feather(in_dir / f'counts_{time_of_day}_detour.feather'), time_of_day)

    # Detour + low solar exposure
    lse_detours = detour_segments.lse_detours
    lse_detours.to_feather(tod_folder / f'{time_of_day}_lse_detours.feather')

    # Detour + high solar exposure
    hse_detours = detour_segments.hse_detours
    hse_detours.to_feather(tod_folder / f'{time_of_day}_hse_detours.feather')

    # Detour + medium solar exposure
    mse_detours = detour_segments.mse_detours
    mse_detours.to_feather(tod_folder / f'{time_of_day}_mse_detours.feather')

    print('...avoided segments...')
    # Avoided = default segments minus the (already reprojected) equal segments
    default_segments = DefaultSegmentRanking(default_file, equal_segments.data, time_of_day)

    # Avoided + low solar exposure
    lse_avoided = default_segments.lse_defaults
    lse_avoided.to_feather(tod_folder / f'{time_of_day}_lse_avoided.feather')
    # Avoided + high solar exposure
    hse_avoided = default_segments.hse_defaults
    hse_avoided.to_feather(tod_folder / f'{time_of_day}_hse_avoided.feather')
    # Avoided + medium solar exposure
    mse_avoided = default_segments.mse_defaults
    mse_avoided.to_feather(tod_folder / f'{time_of_day}_mse_avoided.feather')

    # All avoided segments of this time of day in one file, with a fresh index
    avoided = pd.concat([lse_avoided, hse_avoided, mse_avoided], ignore_index=True)
    avoided.to_feather(tod_folder / f'{time_of_day}_avoided.feather')

print('Done!')

## Special Case Analysis

Find segments that have high or low solar exposure, regardless of whether they are shortest or detour segments. This helps to identify critical or favorable segments in the road network.

In [None]:
def merge_segments(gdfs_dict):
    """Intersect all frames of ``gdfs_dict`` and average their relative counts.

    The frames are intersected one after another with ``gpd.overlay``, so the
    result only contains geometry that is present in every frame. The
    ``count_relative`` values of all frames are summed up along the way and
    divided by the number of frames.

    Args:
        gdfs_dict: Mapping of any key to a GeoDataFrame with a ``count_relative``
            column. Only the values are used; the input frames are not modified.

    Returns:
        A new frame with a fresh 0..n-1 index and a ``count_relative_avg`` column.

    Raises:
        ValueError: If ``gdfs_dict`` is empty.
    """
    if not gdfs_dict:
        raise ValueError('merge_segments needs at least one GeoDataFrame')

    count_col = 'count_relative'
    sum_col = f'{count_col}_sum'
    segdata = None

    for gdf in gdfs_dict.values():
        if segdata is None:
            # Copy, so the helper columns are not written into the caller's frame
            segdata = gdf.copy()
            segdata[sum_col] = segdata[count_col]
            continue

        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="`keep_geom_type=True`")
            segdata = gpd.overlay(segdata, gdf, how='intersection')
        # The count of the frame just merged arrives as `<col>_2` when both sides had the
        # column, and under its plain name when the left side had already lost it
        segdata[sum_col] = segdata[sum_col] + segdata.get(f'{count_col}_2', segdata.get(count_col, 0))
        # Drop the suffixed duplicates that the overlay produced
        segdata = segdata.loc[:, ~segdata.columns.str.endswith('_1')]
        segdata = segdata.loc[:, ~segdata.columns.str.endswith('_2')]

    segdata[f'{count_col}_avg'] = segdata[sum_col] / len(gdfs_dict)
    return segdata.drop(columns=sum_col).reset_index(drop=True)

def merge_rest(dict):
    """Stack all frames of the mapping into one frame, keeping each frame's row index.

    Only the values of the mapping are used. The parameter shadows the
    built-in ``dict`` inside this function.
    """
    frames = list(dict.values())
    return pd.concat(frames)

### Hot Corridors

These are shortest or detour segments with high solar exposure, critical to the road network and potential candidates for adaptation measures.

In [35]:
def process_hot_corridors(tod_list, start, end=None):
    """Find the hot corridors for a slice of ``tod_list`` and write them to a feather file.

    For every selected time of day the equal and detour segments with the high
    solar exposure label are combined. ``merge_segments`` intersects these
    per-time frames, and every part that overlaps a segment of any other class
    (avoided, low or medium exposure) at any selected time is removed.

    Args:
        tod_list: Sequence of time-of-day labels, as used in the file names.
        start: Index of the first time of day to include.
        end: Index one past the last time of day to include; ``None`` means
            up to the end of ``tod_list``.

    The result is written to ``out_segment_level``: ``hot_corridors.feather``
    when the whole list is processed (``start == 0`` and ``end is None``),
    otherwise ``hot_corridors_<first>-<last>.feather``.
    """
    hot_parts = ('hse_not_avoided', 'hse_detours')
    rest_parts = ('avoided', 'lse_not_avoided', 'lse_detours', 'mse_not_avoided', 'mse_detours')

    hcs_dicts = {}
    rest_dict = {}

    end_index = end - 1 if end is not None else -1
    print(f'Finding hot corridors for times of day {tod_list[start]} to {tod_list[end_index]}...')

    for tod in islice(tod_list, start, end):
        tod_dir = out_segment_level / tod
        # pd.concat instead of DataFrame.append, which no longer exists in pandas 2.x
        hcs_dicts[f'{tod}_hse'] = pd.concat(
            [gpd.read_feather(tod_dir / f'{tod}_{part}.feather') for part in hot_parts]
        )
        rest_dict[f'{tod}_rest'] = pd.concat(
            [gpd.read_feather(tod_dir / f'{tod}_{part}.feather') for part in rest_parts]
        )

    hc = merge_segments(hcs_dicts)
    rest = merge_rest(rest_dict)

    hot_corridors = hc.overlay(rest, how='difference')

    # Only a run over the complete list gets the plain name; any subset carries its
    # range, so a call with start > 0 cannot overwrite the all-times file
    if end is None and start == 0:
        outname = 'hot_corridors'
    else:
        outname = f'hot_corridors_{tod_list[start]}-{tod_list[end_index]}'
    hot_corridors.to_feather(out_segment_level / f'{outname}.feather')

In [36]:
# Hot corridors across every time of day in tod_list
process_hot_corridors(tod_list, start=0)
# Hot corridors restricted to 1 PM and 4 PM (indices 1 and 2; the end index is exclusive)
process_hot_corridors(tod_list, start=1, end=3)

Finding hot corridors for times of day 10am to 7pm...
Finding hot corridors for times of day 1pm to 4pm...


### Cool Corridors

These are shortest segments with low solar exposure, the best case for shade-optimized routing.

In [37]:
def process_cool_corridors(tod_list, start, end=None):
    """Find the cool corridors for a slice of ``tod_list`` and write them to a feather file.

    For every selected time of day the equal segments with the low solar
    exposure label are taken. ``merge_segments`` intersects these per-time
    frames, and every part that overlaps a segment of any other class
    (avoided, high or medium exposure, low-exposure detours) at any selected
    time is removed.

    Args:
        tod_list: Sequence of time-of-day labels, as used in the file names.
        start: Index of the first time of day to include.
        end: Index one past the last time of day to include; ``None`` means
            up to the end of ``tod_list``.

    The result is written to ``out_segment_level``: ``cool_corridors.feather``
    when the whole list is processed (``start == 0`` and ``end is None``),
    otherwise ``cool_corridors_<first>-<last>.feather``.
    """
    rest_parts = (
        'avoided', 'hse_not_avoided', 'hse_detours',
        'mse_not_avoided', 'mse_detours', 'lse_detours',
    )

    ccs_dict = {}
    rest_dict = {}

    end_index = end - 1 if end is not None else -1
    print(f'Finding cool corridors for times of day {tod_list[start]} to {tod_list[end_index]}...')

    for tod in islice(tod_list, start, end):
        tod_dir = out_segment_level / tod
        ccs_dict[f'{tod}'] = gpd.read_feather(tod_dir / f'{tod}_lse_not_avoided.feather')
        # pd.concat instead of DataFrame.append, which no longer exists in pandas 2.x
        rest_dict[f'{tod}'] = pd.concat(
            [gpd.read_feather(tod_dir / f'{tod}_{part}.feather') for part in rest_parts]
        )

    cc = merge_segments(ccs_dict)
    rest = merge_rest(rest_dict)

    cool_corridors = cc.overlay(rest, how='difference')

    # Only a run over the complete list gets the plain name; any subset carries its
    # range, so a call with start > 0 cannot overwrite the all-times file
    if end is None and start == 0:
        outname = 'cool_corridors'
    else:
        outname = f'cool_corridors_{tod_list[start]}-{tod_list[end_index]}'
    cool_corridors.to_feather(out_segment_level / f'{outname}.feather')

In [38]:
# Cool corridors across every time of day in tod_list
process_cool_corridors(tod_list, start=0)
# Cool corridors restricted to 1 PM and 4 PM (indices 1 and 2; the end index is exclusive)
process_cool_corridors(tod_list, start=1, end=3)

Finding cool corridors for times of day 10am to 7pm...
Finding cool corridors for times of day 1pm to 4pm...


### Favorable LSE segments

These are shortest or detour segments with low solar exposure, favorable when traveling across the city.

In [39]:
def process_lse_favorable(tod_list, start, end=None):
    """Find the favorable LSE segments for a slice of ``tod_list`` and write them to a feather file.

    For every selected time of day the equal and detour segments with the low
    solar exposure label are combined. ``merge_segments`` intersects these
    per-time frames, and every part that overlaps a segment of any other class
    (avoided, high or medium exposure) at any selected time is removed.

    Args:
        tod_list: Sequence of time-of-day labels, as used in the file names.
        start: Index of the first time of day to include.
        end: Index one past the last time of day to include; ``None`` means
            up to the end of ``tod_list``.

    The result is written to ``out_segment_level``: ``lse_favorable.feather``
    when the whole list is processed (``start == 0`` and ``end is None``),
    otherwise ``lse_favorable_<first>-<last>.feather``.
    """
    fav_parts = ('lse_not_avoided', 'lse_detours')
    rest_parts = ('avoided', 'hse_not_avoided', 'hse_detours', 'mse_not_avoided', 'mse_detours')

    lse_favs_dict = {}
    rest_dict = {}

    end_index = end - 1 if end is not None else -1
    print(f'Finding favorable LSE segments for times of day {tod_list[start]} to {tod_list[end_index]}...')

    for tod in islice(tod_list, start, end):
        tod_dir = out_segment_level / tod
        # pd.concat instead of DataFrame.append, which no longer exists in pandas 2.x
        lse_favs_dict[f'{tod}'] = pd.concat(
            [gpd.read_feather(tod_dir / f'{tod}_{part}.feather') for part in fav_parts]
        )
        rest_dict[f'{tod}'] = pd.concat(
            [gpd.read_feather(tod_dir / f'{tod}_{part}.feather') for part in rest_parts]
        )

    lse_fav = merge_segments(lse_favs_dict)
    rest = merge_rest(rest_dict)

    lse_favorable = lse_fav.overlay(rest, how='difference')

    # Only a run over the complete list gets the plain name; any subset carries its
    # range, so a call with start > 0 cannot overwrite the all-times file
    if end is None and start == 0:
        outname = 'lse_favorable'
    else:
        outname = f'lse_favorable_{tod_list[start]}-{tod_list[end_index]}'
    lse_favorable.to_feather(out_segment_level / f'{outname}.feather')

In [40]:
# Favorable LSE segments across every time of day in tod_list
process_lse_favorable(tod_list, start=0)
# Favorable LSE segments restricted to 1 PM and 4 PM (indices 1 and 2; the end index is exclusive)
process_lse_favorable(tod_list, start=1, end=3)

Finding favorable LSE segments for times of day 10am to 7pm...
Finding favorable LSE segments for times of day 1pm to 4pm...
