The input data was mapmatched, so for every order we have an average speed on the traversed segments. Additionally, in this notebook we do the interpolation step and filtering start and end of the ride.


# Loading libraries and data


In [None]:
# We use pyrosm to get the road network graph atm
# We use networkx for interpolation (and could be used for shortest path finding)
%pip install pyrosm tqdm contextily

In [None]:
from functools import cache
import glob
import logging
import math
from pathlib import Path

import boto3
import contextily as ctx
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyrosm
from tqdm import tqdm

In [None]:
WEEKDAYS = range(7)
HOURS = range(24)
MINUTES = [0, 15, 30, 45]
FREQ = "15min"
MAX_SPEED = 110
MIN_SPEED = 5
# Bin count determines how many edges will be processed during model training step at the same time
# Bins are made by doing df[bin_feature] % BIN_COUNT, where bin feature is e.g. last_node
BIN_COUNT = 20
# How many splits of parquet inputs to consider at once. Higher = less memory & more processing time
BATCH_COUNT = 20
BIN_FEATURE = "end_node"
CITY_ID = 1_000_000
CITY_NAME = "bucharest"
MAP_FILE = f"{CITY_ID}-latest.osm.pbf"
S3_BUCKET = "some_bucket"
S3_SUBDIR = f"subdir_path"
S3_DATA = "data_path"

s3 = boto3.client('s3')
logger = logging.getLogger("logger_nb")
logger.setLevel(logging.INFO)

In [None]:
s3.download_file(S3_BUCKET, f"{S3_SUBDIR}/{CITY_ID}-latest.osm.pbf", "bucharest.pbf")

# Download matched data

In [None]:
# Timezone are ml_timestamp is localised to timezone already
s3.download_file(S3_BUCKET, f"{S3_SUBDIR}/bucharest_traffic.tar.gz", "bucharest_traffic.tar.gz")

In [None]:
!tar -xf bucharest_traffic.tar.gz

In [None]:
!mkdir bucharest
!mv city_id\={CITY_ID}/* bucharest

In [None]:
ls bucharest


# Preparation code

In here we do the following steps:

1) Interpolation step
2) Cutting first and last 10% of points

As an output, you should now have interpolated data.

In [None]:
class CityGraph:
    """
    Class shared between processes for shortest path finding (interpolation step)
    """
    city_nx_graph = None

@cache
def find_shortest_paths(start_end, k):
    start, end = start_end
    try:
        # Get shortest path
        shortest_path = nx.shortest_path(CityGraph.city_nx_graph, source=start, target=end, weight=None)
        shortest_path_length = len(shortest_path)
    except Exception:
        # Missing pair from graph, return none
        return None, None

    cutoff_from_shortest = len(shortest_path)
    if len(shortest_path) - 2 > k:
        # Too many intermediary nodes, skip.
        return None, None

    if shortest_path_length > 2:
        # If shortest path length is longer than a direct connection (i.e. there is no direct A->B, but
        # we have to go through some intermediate node, A->X->B, then find all alternatives with the same length
        # cutoff_from_shortest-1 (-1) is there because A->X->C is length 2 (number of hops to reach from A to C)
        shortest_paths = list(
            nx.all_simple_paths(
                CityGraph.city_nx_graph,
                source=start,
                target=end,
                cutoff=cutoff_from_shortest - 1,
            )
        )
        return start_end, shortest_paths

    return None, None


def get_replacement_mapping_df(shortest_proxy_mapping: dict) -> pd.DataFrame:
    """
    Turn shortest_proxy_mapping into a dataframe for join
    Output is
    before_edge, after_edge
         (1,3),       (1,2)
         (1,3),       (2,3)
    """
    replacement_mapping = {
        "before_edge": [],
        "after_edge": [],
        "after_start_node": [],
        "after_end_node": [],
    }

    for key, replacements in shortest_proxy_mapping.items():
        for alternative_path in replacements:
            for ind in range(1, len(alternative_path)):
                replacement_mapping["before_edge"].append(key)
                replacement_mapping["after_edge"].append((alternative_path[ind - 1], alternative_path[ind]))
                replacement_mapping["after_start_node"].append(alternative_path[ind - 1])
                replacement_mapping["after_end_node"].append(alternative_path[ind])

    repl_mapping_df = pd.DataFrame.from_dict(replacement_mapping)
    return repl_mapping_df


def create_shortest_proxy_mapping(start_end_pairs, k):
    """
    Find all shortest paths between each two input nodes and return following mapping
    shortest_proxy_mapping = {
        (1, 5): [[1,2,3,5],[1,2,4,5]]
        ...
    }
    where each key is start and end node (what we try to interpolate) and values are
    list of lists of all shortest paths of minimum length.

    The step considers graph as unweighted i.e. if one alternative is in meters longer than other, but
    there is same amount of intermediate nodes, then both are still included.

    k parameter sets how many intermediate nodes are allowed, e.g. if k = 1, we can get alternatives
    (1,3)

    @param start_end_pairs: For which nodes all shortest paths are found
    @param k: number of intermediate nodes considered
    """

    logger.debug("Starting shortest path finding")

    shortest_proxy_mapping_list = Parallel(n_jobs=-1, backend="threading", verbose=0)(
        delayed(find_shortest_paths)(start_end_pair, k) for start_end_pair in start_end_pairs
    )

    # Unpack results for parallel job into shortest_proxy_mapping
    shortest_proxy_mapping = {}
    for start_end_pair, shortest_paths in shortest_proxy_mapping_list:
        if start_end_pair is not None:
            start, end = start_end_pair
            shortest_proxy_mapping[(start, end)] = shortest_paths

    replacements_count = len(shortest_proxy_mapping.keys())
    logger.debug(f"Number of replacements found: {replacements_count}")

    return shortest_proxy_mapping


def interpolate_missing_edges(df: pd.DataFrame, k: int = 1, retain_all: bool = False) -> pd.DataFrame:
    """
    TODO: Refactor this to simplify.
    k: int - maximum number of intermediary nodes
    retain_all: bool - are old edges kept as well
    Interpolate matching data to inbetween nodes when it is missing.
    There are many matched edges where we have the following situation:
    We have matched two nodes, A->C that are not directly connected in a city's graph.
    This function finds intermediary nodes, e.g. B, A->X1->X2->...->Xk->C, such that this order
    will end up being consecutive.
    To do this we perform a shortest path search on unweighted OSM graph and use the shortest
    found path as a filler.
    In cases there are multiple shortest paths, e.g. the following situation:
    A ------ Y
    |.       |
    |        |
    X ------ C
    Then we add both pairs A-Y, Y-C and A-X, X-C, i.e. every possible shortest path.
    Before:
    start_node, end_node, km/h
    1           3          25
    After, if 1-3 are not directly connected, but connected through 2 :
    start_node, end_node, km/h
    1           2          25
    2           3          25

    """
    logger.debug("Starting interpolation")

    logger.debug(f"Initial df shape: {df.shape}")
    # Get unique start and end node pairs. We need to only calculate once for each edge
    unique_start_end = df[["start_node", "end_node"]].value_counts().index

    shortest_proxy_mapping = create_shortest_proxy_mapping(unique_start_end, k)

    logger.debug("Turn replacements into mapping")
    replacement_mapping_df = get_replacement_mapping_df(shortest_proxy_mapping)
    del shortest_proxy_mapping

    if replacement_mapping_df.empty:
        # If there are no replacements, then return start
        logger.info("No replacements found, return initial dataframe.")
        return df.reset_index(drop=True)

    # Keep only unique pairs in replacement_mapping_df, otherwise,
    # we might have multiple speeds on one segment, e.g. situations like
    # the following example. In that case we would have duplicate added speeds to B-C, which we would not want.
    #     X1
    #    /  \
    #   A    B - C
    #    \  /
    #     X2
    logger.debug(f"Replacement mapping shape before unique check: {replacement_mapping_df.shape}")
    replacement_mapping_df = replacement_mapping_df.drop_duplicates()
    logger.debug(f"Replacement mapping shape after unique check: {replacement_mapping_df.shape}")

    # Get new edges with inner join.
    new_edges_df = df.merge(replacement_mapping_df, left_on="edge", right_on="before_edge", how="inner")
    new_edges_df["edge"] = new_edges_df["after_edge"]
    new_edges_df["start_node"] = new_edges_df["after_start_node"]
    new_edges_df["end_node"] = new_edges_df["after_end_node"]
    new_edges_df = new_edges_df.drop(
        ["before_edge", "after_edge", "after_start_node", "after_end_node"], axis=1
    ).reset_index(drop=True)

    # Do we drop rows which were replaced? If it is replaced, then it exists in shortest_proxy_mapping
    if not retain_all:
        logger.debug("Dropping old edges")
        before_drop = len(df)
        df = df[~df.edge.isin(replacement_mapping_df["before_edge"].unique())]
        logger.debug(f"Dropped {before_drop - len(df)} rows by not retaining")

    logger.debug(f"Number of new rows generated: {len(new_edges_df)}")
    logger.debug(f'Number of unique edges being replaced: {len(replacement_mapping_df["before_edge"].unique())}')

    # Concat old row not dropped with new rows
    output_df = pd.concat([df.reset_index(drop=True), new_edges_df], axis=0)
    logger.debug(f"Final dataframe shape {output_df.shape}")

    return output_df


def load_all_edges_in_graph(map_path: Path, city_id: int):
    """
    Get a list of all edges that exist in osm.pbf
    """
    # Load road network. Use osmium-filtered smaller map.
    map_filepath = Path(map_path, f"{city_id}-latest.osm.pbf")
    osm = pyrosm.OSM(str(map_filepath))
    nodes, edges = osm.get_network(nodes=True, network_type="driving+service")
    logger.info(f"Converting {map_filepath} to NetworkX graph")

    # Save graph in a Global class that is shared between local processes.
    CityGraph.city_nx_graph = osm.to_graph(nodes, edges, graph_type="networkx")
    edges["edge"] = list(zip(edges["u"], edges["v"]))
    all_edges = set()
    for edge, oneway in zip(edges["edge"], edges["oneway"]):
        # if oneway == "-1":
        #    # Only add reversed
        #    all_edges.add(tuple(reversed(edge)))

        all_edges.add(edge)
        if oneway != "yes":
            # And if need, then also reversed
            all_edges.add(tuple(reversed(edge)))

    return all_edges


def remove_start_and_end_of_ride(df: pd.DataFrame):
    """
    Throw away first 10% and last 10% of matchpoints, as they are not representative of real traffic
    Args:
        df: Sorted (by id, ml_timestamp) DataFrame
    """

    logger.info("Removing pickup/dropoff matchpoints")
    logger.info(f"DF length before cutting: {len(df)}")

    cumcount = df.groupby("id")["ml_timestamp"].transform("cumcount")
    totcount = df.groupby("id")["ml_timestamp"].transform("count")
    df["count_pct"] = cumcount / totcount
    df = df[df["count_pct"].between(0.1, 0.9)]
    del df["count_pct"]

    print(f"DF length after cutting: {len(df)}")
    return df

    
def batch(iterable, n=1):
    len_iterable = len(iterable)
    for ndx in range(0, len_iterable, n):
        yield iterable[ndx : min(ndx + n, len_iterable)]


def append_to_parquet_table(dataframe, filepath=None, writer=None):
    """Method writes/append dataframes in parquet format.
    This method is used to write pandas DataFrame as pyarrow Table in parquet format. If the methods is invoked
    with writer, it appends dataframe to the already written pyarrow table.
    :param dataframe: pd.DataFrame to be written in parquet format.
    :param filepath: target file location for parquet file.
    :param writer: ParquetWriter object to write pyarrow tables in parquet format.
    :return: ParquetWriter object. This can be passed in the subsequenct method calls to append DataFrame
        in the pyarrow Table
    """
    table = pa.Table.from_pandas(dataframe)
    if writer is None:
        writer = pq.ParquetWriter(filepath, table.schema)
    writer.write_table(table=table)
    return writer
    

def batch_interpolate_and_bin(
    matched_data_path,
    city_map_path,
    binned_data_path,
    city_id,
    bin_feature,
    bins,
    batch_count,
    delete_input_data,
):
    """
    Reads in matched data in (batch_count) different batches.
    For each batch bin interpolated match data into (bins) bins, depending on
    bin_feature.
    Bin data into several bins. Takes mod of the bin_feature column for binning.
    The higher the bins number, the less memory it takes, but more processing time.
    batch_count: how many batches of parquet files will be processed.
    E.g. we have values
    11
    29
    32
    We want to split to 10 bins, then we have
    11 % 10 -> 1
    29 % 10 -> 9
    32 % 10 -> 2
    @param city_id: city_id
    @param city_map_path: where .osm.pbf file for the city is read from
    @param matched_data_path: Where matched data is read from
    @param bin_feature: which integer feature to bin data by. By default it is end_edge
    @param binned_data_path: where to put binned data
    @param bins: how many bins to bin data into. Increase this reduce memory use, increase processing time.
    @param batch_count: how many files are read in at once. Increase this to reduce peak memory use.
    @param delete_input_data: if matching data is deleted after processing. Saves disk space.
    @return:
    """
    assert 1 < bins < 1000
    data_files = glob.glob(str(matched_data_path) + "/*.parquet")
    if len(data_files) == 0:
        # Skip all, if no matched data available
        logger.info(f"No matched data available for city_id {city_id} from {matched_data_path}, skipping binning")
        return

    logger.info(f"Performing interpolate and bin on {len(data_files)} data files from folder {matched_data_path}")
    # Read graph
    logger.info(f"Loading graph for city_id {city_id}")
    all_edges = load_all_edges_in_graph(city_map_path, city_id=city_id)

    # Collect open parquet writers
    pq_writers = {}
    logger.info(f"Binning data into {bins} bins")

    # If batch count is larger than count of files, then set batch count equal to files.
    # E.g. if we have only 4 files, then only make 4 batches.
    batch_count = batch_count if len(data_files) >= batch_count else len(data_files)
    # Determine batch size
    batch_size = math.ceil(len(data_files) / batch_count)

    logger.info(f"Processing {batch_count} batches with max {batch_size} files in each.")

    for i, data_batch in enumerate(batch(data_files, batch_size)):
        logger.info(f"Starting to process batch {i} with {len(data_batch)} files.")
        if len(data_batch) == 0:
            continue
        sub_df = pd.concat(
            [
                pd.read_parquet(
                    f,
                    columns=[
                        "id",
                        "start_node",
                        "end_node",
                        "ml_timestamp",
                        "speed_kmh",
                    ],
                )
                for f in data_batch
            ]
        )

        sub_df["edge"] = list(zip(sub_df.start_node, sub_df.end_node))
 
        # Perform interpolation step
        sub_df = interpolate_missing_edges(df=sub_df, k=2)
        # Keep only edges in graph
        sub_df = sub_df[sub_df.edge.isin(all_edges)]
        sub_df = sub_df.drop("edge", axis=1)

        # TODO: We could now add this back. Removing start and end of the ride.
        # sub_df = sub_df.sort_values(["id", "ml_timestamp"])
        sub_df = sub_df.sort_values("ml_timestamp")
        sub_df = remove_start_and_end_of_ride(sub_df)

        # Bin data by bin_feature into files
        sub_df["bin_group"] = sub_df[bin_feature] % bins
        grouped_df = sub_df.groupby("bin_group")

        for key, group in grouped_df:
            if group.empty:
                # If empty df, then ignore
                continue
            # If file already exists, then append without header
            writer = pq_writers.get(key, None)
            group.drop(["bin_group", "index"], axis=1, inplace=True)

            pq_writers[key] = append_to_parquet_table(
                group,
                filepath=f"{binned_data_path}/bin_{key}.parquet",
                writer=writer,
            )

        # Delete input file from disk
        if delete_input_data:
            for filename in data_batch:
                Path(filename).unlink(missing_ok=True)

    for key, writer in pq_writers.items():
        writer.close()

    # Invalidate lru cache as it wont be used for the city anymore
    logger.info(f"{find_shortest_paths.cache_info()}")
    find_shortest_paths.cache_clear()

# Reading data and preprocessing

In [None]:
batch_interpolate_and_bin(
    "bucharest/matched_data/",
    "bucharest/maps",
    "bucharest",
    CITY_ID,
    BIN_FEATURE,
    BIN_COUNT,
    BATCH_COUNT,
    False,
)

In [None]:
ls bucharest

In [None]:
pd.read_parquet("bucharest/bin_0.parquet", columns=["ml_timestamp"]).ml_timestamp.agg(["min", "max"])

# Save binned data to S3

In [None]:
for i in range(BIN_COUNT):
    s3.upload_file(f"{CITY_NAME}/bin_{i}.parquet", S3_BUCKET, f"save/path/bin_{i}.parquet")

# Download binned data from s3

In [None]:
for i in range(BIN_COUNT):
    s3.download_file(S3_BUCKET, f"{S3_SUBDIR}/save/path/bin_{i}.parquet", f"bin_{i}.parquet")

In [None]:
fig, ax = plt.subplots()
for i in range(BIN_COUNT):
    pd.read_parquet(f"bin_{i}.parquet", columns=["speed_kmh"]).hist(histtype="step", ax=ax)

# Filtering the dataset based on geo bounding box

## Load city edges

In [None]:
osm = pyrosm.OSM("bucharest.pbf")
nodes, edges = osm.get_network(nodes=True, network_type="driving+service")
edges["edge"] = list(zip(edges["u"], edges["v"]))

In [None]:
print("Total number of nodes in the map", nodes.shape[0])
print("Total number of edges in the map", edges.shape[0])

## Filtering edges

In [None]:
df = pd.read_parquet("bucharest/bin_0.parquet")

In [None]:
df.shape

In [None]:
df.head()

In [None]:
df["edge"] = list(zip(df.start_node, df.end_node))

In [None]:
df.edge.nunique()

In [None]:
edges.columns

In [None]:
edges["centroid_lon"] = [g.centroid.x for g in edges["geometry"]]
edges["centroid_lat"] = [g.centroid.y for g in edges["geometry"]]

In [None]:
# bbox_north, bbox_south, bbox_east, bbox_west = 44.541396, 44.334247, 26.225577, 25.966674
bbox_north, bbox_south, bbox_east, bbox_west = 44.45, 44.41, 26.14, 26.06

filtered_edges = edges[
    (edges.centroid_lat >= bbox_south) & (edges.centroid_lat <= bbox_north) &
    (edges.centroid_lon <= bbox_east) & (edges.centroid_lon >= bbox_west)
    ]

In [None]:
filtered_edges.shape

In [None]:
ax = filtered_edges.plot()
ctx.add_basemap(ax, crs=filtered_edges.crs)
# plt.axis("off")

In [None]:
mkdir -p bbox_filtered

In [None]:
def filter_by_edges(data_path, filtered_edges):
    for prq in glob.glob(data_path):
        df = pd.read_parquet(prq)
        df["edge"] = list(zip(df.start_node, df.end_node))
        print(f"Number of edges before: {df.edge.nunique()}")
        print(f"Number of samples before: {df.shape[0]}")
        df = df[df.edge.isin(filtered_edges.edge)]
        print(f"Number of edges after bbox filtering: {df.edge.nunique()}")
        print(f"Number of samples after bbox filtering: {df.shape[0]}")
        df.drop("edge", axis=1, inplace=True)
        df.to_parquet(f"bbox_filtered/{prq.split('/')[-1]}")

In [None]:
filter_by_edges("bucharest/*.parquet", filtered_edges)

In [None]:
ls bbox_filtered

In [None]:
for prq in glob.glob(f"bbox_filtered/*.parquet"):
    s3.upload_file(prq, S3_BUCKET, f"{S3_SUBDIR}/{S3_DATA}/{prq}")

## Review filtering results

### Download sampled dataset
Run this if you didn't do the previous steps

In [None]:
mkdir bbox_filtered

In [None]:
for i in range(BIN_COUNT):
    s3.download_file(S3_BUCKET, f"{S3_SUBDIR}/{S3_DATA}/bbox_filtered/bbox_filtered/bin_{i}.parquet", f"bbox_filtered/bin_{i}.parquet")

In [None]:
fig, ax = plt.subplots()
for i in range(BIN_COUNT):
    pd.read_parquet(f"bbox_filtered/bin_{i}.parquet", columns=["speed_kmh"]).hist(histtype="step", ax=ax)

# Aggregating the dataset by edge and time bucket

In [None]:
ls bbox_filtered

In [None]:
writer = None
for prq in tqdm(glob.glob(f"bbox_filtered/*.parquet")):
    df = pd.read_parquet(prq)
    df["minute_bucket"] = df.ml_timestamp.dt.floor(FREQ)
    df["speed_kmh"] = df.speed_kmh.apply(lambda x: max(int(x), MIN_SPEED))
    print(f"Number of samples before: {df.shape[0]}")
    df = df.groupby(["minute_bucket", "start_node", "end_node"])[["speed_kmh"]].median().reset_index()
    print(f"Number of samples after aggregation: {df.shape[0]}")
    df.speed_kmh.hist()
    # writer = append_to_parquet_table(df, filepath="edge_time_aggregated.parquet", writer=writer)
# writer.close()

In [None]:
!du -h bbox_filtered/
!du -h edge_time_aggregated.parquet

In [None]:
df = pd.read_parquet("edge_time_aggregated.parquet")

In [None]:
df.shape

In [None]:
df["edge"] = list(zip(df.start_node, df.end_node))
df.edge.nunique()

In [None]:
df.speed_kmh.hist()

In [None]:
s3.upload_file("edge_time_aggregated.parquet", S3_BUCKET, f"{S3_SUBDIR}/{S3_DATA}/bbox_filtered/edge_time_aggregated.parquet")

# Examine preprocessed data

In [None]:
s3.download_file(S3_BUCKET, f"{S3_SUBDIR}/{S3_DATA}/bbox_filtered/edge_time_aggregated.parquet", "edge_time_aggregated.parquet")

In [None]:
df = pd.read_parquet("edge_time_aggregated.parquet")

In [None]:
count_df = df.groupby(["start_node", "end_node", "minute_bucket"]).count()
duplicates = count_df[count_df.speed_kmh > 1]
duplicates

In [None]:
df.head()

In [None]:
df.minute_bucket.agg(["min", "max"])

In [None]:
df.speed_kmh.hist()

# Extract historical speed features

In [None]:
def get_historical_stats(df, lags, unit='m'):
    for lag in lags:
        df[f"{lag}_{unit}_lag"] = pd.to_datetime(df['minute_bucket']) - pd.to_timedelta(lag, unit=unit)
        df = pd.merge(df, df[["start_node", "end_node", "speed_kmh", "minute_bucket"]],
                left_on=["start_node", "end_node", f"{lag}_{unit}_lag"],
                right_on=["start_node", "end_node", "minute_bucket"], how="left", suffixes=('', f"_lag_{lag}_{unit}"))
        df = df.drop(f"minute_bucket_lag_{lag}_{unit}", axis=1)
        df = df.drop(f"{lag}_{unit}_lag", axis=1)
    return df

In [None]:
df = get_historical_stats(df, [15, 30, 45, 60], 'm') # minutes
# full_df = get_historical_stats(full_df, [1, 2, 3], 'W') # weeks

In [None]:
df.sample(n=10)

In [None]:
# full_df["edge"] = list(zip(full_df.start_node, full_df.end_node))

In [None]:
# full_df = full_df.groupby("edge").ffill()

In [None]:
# full_df.sample(n=10)

In [None]:
df.speed_kmh.agg(["min", "max"])

In [None]:
file_name = "edge_time_aggregated_4_lags.parquet"
df.to_parquet(file_name)
s3.upload_file(file_name, S3_BUCKET, f"{S3_SUBDIR}/{S3_DATA}/bbox_filtered/{file_name}")

In [None]:
import time
time.sleep(3600)