In [4]:
%%html
<style>
    table {float:left}
    leafmap.map {
        display: inline-block;
    }
</style>

### **AUTOMATION OF PROCESSING GPX TRACK RECORDS FOR DESIGNING INTENSITY MAPS**

The Map Matching algorithm runs on probabilistic model handling states with missing obseravation i.e., non-emitting states. 

For more information about the MM see [Leuven.MapMatching documentation](https://leuvenmapmatching.readthedocs.io/en/latest/index.html).

In [5]:
import ipywidgets as widget
#from IPython.display import display
from os.path import exists, join, basename
from os import listdir, remove
from urllib.request import urlretrieve
import osmnx as ox
from shapely.geometry import Polygon, Point, LineString, box
from leuvenmapmatching.map.inmem import InMemMap
import numpy as np
import pandas as pd
import geopandas as gpd
from gpx_converter import Converter
from leuvenmapmatching.matcher.distance import DistanceMatcher
import leafmap.foliumap as leafmap

**Set Parameters**

In [33]:
DATA_UPLOAD = widget.FileUpload(
    accept='.gpx',
    multiple=True,
    description='Upload files'
)
PLACE_NAME = widget.Text(
    value='',
    placeholder='type your location',
    description='Study Area:'
)
BUFFER_DIST = widget.FloatSlider(
    value=0.005,
    min=0,
    max=0.01,
    step=0.001,
    description='Buffer',
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.3f',
)
TOLERANCE = widget.FloatSlider(
    value=0.00004,
    min=0,
    max=0.0002,
    step=0.00002,
    description='Tolerance',
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.5f',
)
MAX_DIST = widget.IntSlider(
    value=100,
    min=20,
    max=500,
    step=20,
    description='Max. Distance',
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
OBS_NOISE = widget.IntSlider(
    value=20,
    min=2,
    max=100,
    step=2,
    description='Obs. Noise',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
OBS_NOISE_NE = widget.IntSlider(
    value=60,
    min=5,
    max=200,
    step=5,
    description='Obs.Noise NE',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
DIST_NOISE = widget.IntSlider(
    value=5,
    min=1,
    max=25,
    step=1,
    description='Dist. Noise',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
DIST_NOISE_NE = widget.IntSlider(
    value=15,
    min=5,
    max=50,
    step=5,
    description='Dist.Noise NE',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
MAX_LATTICE_WIDTH = widget.IntSlider(
    value=7,
    min=1,
    max=15,
    step=1,
    description='Max Lattice',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
INCREASE_MAX_LATTICE_WIDTH = widget.IntSlider(
    value=2,
    min=1,
    max=5,
    step=1,
    description='Increase Latt.',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
MIN_PROB_NORM = widget.FloatSlider(
    value=0.005,
    min=0,
    max=1,
    step=0.001,
    description='Min Proba.',
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.3f',
)

In [34]:
items = [DATA_UPLOAD, PLACE_NAME]
widget.HBox(items)

HBox(children=(FileUpload(value={}, accept='.gpx', description='Upload files', multiple=True), Text(value='', …

|Parameter |Description |
|--------- |----------- |
|Buffer         |*expand the study area to minimise potential edge effect on the result; distance in degrees (WGS84)*|
|Tolerance      |*GPX track simplification threshold; distance in degrees (WGS84)*|
|Max. Distance  |*break for zero match probability; in meters*|
|Min. Proba.    |*stop matching below normalized probability*|
|Obs. Noise     |*standard deviation of measuremnet noise, in meter*|
|Obs. Noise NE  |*standard deviation of measuremnet noise for non-emitting states, in meter (the value should be larger than Obs. Noise)*|
|Dist. Noise    |*difference between distance between matched route and distance between tracks, in meter*|
|Dist. Noise Ne |*difference between the distances for non-emitting states, in meter (the value should be larger than Dist. Noise)*|
|Max Lattice    |*search the route with the number of possible paths at every step*|
|Increase Latt. |*if no solution is found, increase the lattice by the value*|

In [8]:
items = [widget.HBox([BUFFER_DIST, TOLERANCE]),
         widget.HBox([MAX_DIST, MIN_PROB_NORM]),
         widget.HBox([OBS_NOISE, OBS_NOISE_NE]),
         widget.HBox([DIST_NOISE, DIST_NOISE_NE]),
         widget.HBox([MAX_LATTICE_WIDTH, INCREASE_MAX_LATTICE_WIDTH])]
accordion = widget.Accordion(children=items)
accordion.set_title(0,"Environment Parameters")
accordion.set_title(1,"Thresholding of Matching")
accordion.set_title(2,"Observation Noise")
accordion.set_title(3,"Distance Noise")
accordion.set_title(4,"Probabilistic Lattice")
accordion

Accordion(children=(HBox(children=(FloatSlider(value=0.005, continuous_update=False, description='Buffer', max…

In [9]:
def upload():
    for filename in listdir("data_upload"):
        if not filename.endswith(".gpx"):
            continue
        remove(join("data_upload", filename))
               
    for elem in DATA_UPLOAD.value.items():
        name, file_info = elem
        data_path = join("data_upload", name)
        with open (data_path, 'wb') as file:
            file.write(file_info['content'])

In [41]:
def create_graph(PLACE_NAME, BUFFER_DIST):
    study_area = ox.geocode_to_gdf(PLACE_NAME)
    boundary_sa = gpd.GeoDataFrame(geometry=gpd.GeoSeries(study_area.boundary)) # for visualization purposes
    gpx_concat = pd.DataFrame(columns=['latitude', 'longitude'])

    for filename in listdir("data_upload"):
        if not filename.endswith(".gpx"):
            continue
        gpx_df = Converter(input_file = "data_upload/"+ filename).gpx_to_dataframe()
        gpx_concat = pd.concat([gpx_concat, gpx_df])
        
    gpx_point = gpd.GeoDataFrame(gpx_concat, geometry=gpd.points_from_xy(gpx_concat.longitude, gpx_concat.latitude)).set_crs('epsg:4326')
    gpx_clip = gpd.clip(gpx_point, study_area, keep_geom_type=True)
    area_polygon = gpx_clip.buffer(BUFFER_DIST).set_crs(4326)
    if area_polygon.empty:
        with output:
            raise ValueError("A file outside the study area has been uploaded.")
            
    area_bounds = box(*area_polygon.total_bounds)
    graph = ox.graph_from_polygon(area_bounds, network_type='all', simplify=False)
    with output:
        print(graph) # number of nodes and edges
    
    map_con = InMemMap("road_network", 
                       use_latlon=True, 
                       use_rtree=True, 
                       index_edges=True)
    for node in graph.nodes:
        lat = graph.nodes[node]['y']
        lon = graph.nodes[node]['x']
        map_con.add_node(node, (lat, lon))

    for edge in graph.edges:
        node_a, node_b = edge[0], edge[1]
        map_con.add_edge(node_a, node_b)
        map_con.add_edge(node_b, node_a)
    
    map_con.purge()
    return study_area, boundary_sa, area_polygon, graph, map_con

In [11]:
def map_matching(area_polygon, graph, map_con, MAX_DIST, MIN_PROB_NORM, MAX_LATTICE_WIDTH, INCREASE_MAX_LATTICE_WIDTH, OBS_NOISE, OBS_NOISE_NE, DIST_NOISE, DIST_NOISE_NE):
    track_df = pd.DataFrame(columns=['Latitude', 'Longitude','id'])
    route_df = pd.DataFrame(columns=['Latitude', 'Longitude','id'])
    id = 1
    
    for filename in listdir("data_upload"):
        if not filename.endswith(".gpx"):
            continue
        
        with output:
            print("Map matching of " + filename + " started...")
        gpx_df = Converter(input_file = "data_upload/"+ filename).gpx_to_dataframe()
        gpx_point = gpd.GeoDataFrame(gpx_df, geometry=gpd.points_from_xy(gpx_df.longitude, gpx_df.latitude)).set_crs('epsg:4326')
        gpx_point['id'] = 1
        gpx_clip = gpd.clip(gpx_point, area_polygon, keep_geom_type=True)
        if gpx_clip.empty:
            with output:
                print("The GPX file is outside of the extent of study area. Map matching finished with no result.")
                print("---")
            continue
        
        gpx_clip = gpx_clip.sort_index()
        gpx_line = gpx_clip.groupby(['id']) ['geometry'].apply(lambda x: LineString(x.tolist()))
        line_gdf = gpd.GeoDataFrame(gpx_line, geometry='geometry').set_crs('epsg:4326')
        line_gdf['geometry'] = line_gdf['geometry'].simplify(TOLERANCE.value) # reducing line vertices inside the tolerance
        gpx_coords = line_gdf.apply(lambda row: list((row.geometry).coords), axis=1)
        for row in gpx_coords.items():
            passage = list(row[1:])
            passage = passage[0]
            track = []
            path = []
    
        for i in range(len(passage)):
            lat = passage[i][1]
            lon = passage[i][0]
            path.append((lat, lon))
            track.append([lat, lon])
        track = np.array(track)
        df = pd.DataFrame(track, columns=['Latitude', 'Longitude'])
        df['id'] = id # id for grouping into one line
        track_df = pd.concat([track_df, df])

        matcher = DistanceMatcher(map_con,
                                  max_dist = MAX_DIST,
                                  min_prob_norm = MIN_PROB_NORM,
                                  max_lattice_width = MAX_LATTICE_WIDTH,
                                  increase_max_lattice_width = INCREASE_MAX_LATTICE_WIDTH,
                                  obs_noise = OBS_NOISE, 
                                  obs_noise_ne = OBS_NOISE_NE,
                                  dist_noise = DIST_NOISE,
                                  dist_noise_ne = DIST_NOISE_NE,
                                  non_emitting_edgeid=False,
                                  restrained_ne = False)
    
        matcher.match(path, unique=False) # retain only unique nodes in the sequence (avoid repetitions)
        if matcher.early_stop_idx is not None:
            with output:
                print("Parts of the path were omitted from matching due to the road mismatch.")
            from_matches = matcher.best_last_matches(k=MAX_LATTICE_WIDTH)
            matcher.continue_with_distance(from_matches = from_matches, max_dist=MAX_DIST)
            matcher.match(path, expand=True)
  
        node_id = matcher.path_pred_onlynodes_withjumps # retrieve the node_ids the route passes through
        id_route = 1
        for i in range(len(node_id)-1):
            route_node = []
            lat = graph.nodes[node_id[i]]['y']
            lon = graph.nodes[node_id[i]]['x']
            latlon = [lat, lon]
            route_node.append(latlon)
            lat2 = graph.nodes[node_id[i+1]]['y']
            lon2 = graph.nodes[node_id[i+1]]['x']
            latlon2 = [lat2, lon2]
            route_node.append(latlon2)
            route_node = np.array(route_node)
            df = pd.DataFrame(route_node, columns=['Latitude', 'Longitude'])
            df['id'] = id_route # the same id for one line (street)
            route_df = pd.concat([route_df, df])
            id_route += 1
        id += 1
        with output:
            print("Matching of " + filename + " finished successfully.")
            print("---")
    print("---")
    print("Map Matching is done.")
    print("---")
    print("---")
    return track_df, route_df

In [37]:
def post_process(track_df, route_df, graph, study_area):
    track_point = gpd.GeoDataFrame(track_df, geometry=gpd.points_from_xy(track_df.Longitude, track_df.Latitude))
    track_lines = track_point.groupby(['id']) ['geometry'].apply(lambda x: LineString(x.tolist()))
    tracks_gdf = gpd.GeoDataFrame(track_lines, geometry='geometry').set_crs('epsg:4326')
    #print(tracks_gdf[:10])
    route_point = gpd.GeoDataFrame(route_df, geometry=gpd.points_from_xy(route_df.Longitude, route_df.Latitude))
    route_line = route_point.groupby(['id']) ['geometry'].apply(lambda x: LineString(x.tolist()))
    route_gdf = gpd.GeoDataFrame(route_line, geometry='geometry').set_crs('epsg:4326')
    #print(route_gdf[:10])
    street_lines = ox.graph_to_gdfs(graph, nodes = False)
    street_lines = street_lines.drop(columns=['oneway', 'lanes', 'highway', 'maxspeed',
                                              'reversed', 'bridge', 'name', 'junction',
                                              'service','access', 'tunnel', 'width', 'ref'])
    with output:
        print("Calculating passage frequences on streets...")
    street_freq = street_lines.overlay(route_gdf, how='intersection') # drop geometries not part of the routes
    frequency = []
    for _, row in street_freq.iterrows():
        series = route_gdf.covers(row["geometry"])
        frequency.append(series.values.sum())
    street_freq["frequency"] = frequency
    
    lines_frequency = gpd.clip(street_freq, study_area, keep_geom_type=True)
    with output:
        print("===")
        print("The length of matched roads is", round(lines_frequency["length"].sum()), "meters.")
    
    return tracks_gdf, lines_frequency

In [13]:
def map_vis(tracks_gdf, lines_frequency, boundary_sa):
    m_light = leafmap.Map(width="640", 
                          height="480",
                          draw_control=False,
                          attribution_control=True,
                          tiles="CartoDB positron")
    m_light.add_gdf(tracks_gdf,
                    layer_name='tracks',
                    info_mode=None, 
                    style={'color':'blue', 'weight':0.5, 'opacity': 0.5})
    m_light.add_data(lines_frequency,
                     "frequency",
                     cmap = "Wistia",
                     scheme='Quantiles',
                     k=5,
                     add_legend=True,
                     legend_title="Number of passages",
                     legend_position="bottomright",
                     layer_name="passages",
                     style_function = lambda feat: {"color": feat["properties"]["color"], 
                                                    "weight": 4, 
                                                    'opacity': 0.9})
    m_light.add_gdf(boundary_sa, 
                    layer_name='study area', 
                    style={'color':'black', 'weight':3, 'opacity': 0.5})
    m_light.zoom_to_gdf(lines_frequency)

    m_dark = leafmap.Map(width="640", 
                         height="480",
                         draw_control=False,
                         attribution_control=True,
                         tiles="Cartodbdark_matter")
    m_dark.add_gdf(tracks_gdf,
                   layer_name='tracks',
                   info_mode=None, 
                   style={'color':'red', 'weight':0.5, 'opacity': 0.5})
    m_dark.add_data(lines_frequency,
                    "frequency",
                    cmap = "YlOrBr_r",
                    scheme='Quantiles',
                    k=5,
                    add_legend=True,
                    legend_title="Number of passages",
                    legend_position="bottomright",
                    layer_name="passages",
                    style_function = lambda feat:{"color": feat["properties"]["color"], 
                                                  "weight": 4, 
                                                  'opacity': 0.8})
    m_dark.add_gdf(boundary_sa, 
                   layer_name='study area', 
                   style={'color':'grey', 'weight':3, 'opacity': 0.5})
    m_dark.zoom_to_gdf(lines_frequency)
    
    return m_light, m_dark

In [14]:
def click(b):
    output.clear_output()
    upload()
    with output:
        print("(1/3) OSM street network download for the study area.")
    study_area, boundary_sa, area_polygon, graph, map_con = create_graph(PLACE_NAME.value, BUFFER_DIST.value)
    with output:
        print("===")
        print("(2/3) Matching selected GPX files to the netwrok.")
    track_df, route_df = map_matching(area_polygon, graph, map_con,
                                      MAX_DIST.value, MIN_PROB_NORM.value, MAX_LATTICE_WIDTH.value, INCREASE_MAX_LATTICE_WIDTH.value, OBS_NOISE.value, OBS_NOISE_NE.value, DIST_NOISE.value, DIST_NOISE_NE.value)
    output.clear_output()
    with output:
        print("===")
        print("(3/3) Post-processing of the results.")
    tracks_gdf, lines_frequency = post_process(track_df, route_df, graph, study_area)
    m_light, m_dark = map_vis(tracks_gdf, lines_frequency, boundary_sa)
    with output:
        print("")
        display(m_light, m_dark)
        display(widget.HBox([widget.Label(value="Download HTML Web Map: "), light_button, dark_button]))
        display(widget.HBox([widget.Label(value="Downolad the final linear data: "), JSON_button, GPKG_button]))

In [42]:
output = widget.Output()
RUN_BUTTON = widget.Button(description='Run the Tool', button_style='primary')
RUN_BUTTON.on_click(click)
RUN_BUTTON

Button(button_style='primary', description='Run the Tool', style=ButtonStyle())

In [16]:
light_button = widget.Button(description='Light-mode Map')
dark_button = widget.Button(description='Dark-mode Map',
                            style=dict(button_color='DarkGray'))
JSON_button = widget.Button(description='GeoJSON', button_style='primary')
GPKG_button = widget.Button(description='GeoPackage',button_style='primary')

In [20]:
# save the final linear layer with frequences to GeoJSON and GeoPackage file
#lines_frequency.to_file(OUTPUT_FOLDER+"/lines_freq.json", driver="GeoJSON")
#lines_frequency.to_file(OUTPUT_FOLDER+"/lines_freq.gpkg", driver="GPKG")

In [21]:
# save the interactive map to html
#m_light.to_html(OUTPUT_FOLDER+"/freq_map.html")

In [40]:
output

Output()

---

<p style="text-align: center;">The tool is an attachment to the Master Thesis.</p>

<p style="text-align: center;">Benjamín Šramo, 2023</p>

[![License: CC BY 4.0](https://img.shields.io/badge/License-CC_BY_4.0-lightgrey.svg)](https://creativecommons.org/licenses/by/4.0/)