# Setup


In [None]:
%reload_ext autoreload
%autoreload 2

from pathlib import Path

import contextily as cx
import cv2
import geopandas as gpd
import matplotlib.pyplot as plt
import pandas as pd
import rasterio
from geopandas import GeoDataFrame
from shapely.geometry import Point, box

from analyser import distance_transform_scipy, road_shapefile_to_opencv_mat
from plotting import (
    COMP_DOMAIN,
    VIS_DOMAIN,
    add_vis_base_map,
    crop_gdf_by_vis_domain,
    export_fig,
    get_map_axes,
    get_traffic_count_axes,
    get_vis_base_plot,
    plot_route,
    plot_with_scale,
    plot_contour_with_map
)
from project import PRESENTATION_MEDIA_DIR
from spatial import (
    get_traffic_data_by_site_id,
    get_traffic_data_from_df,
    get_traffic_df_from_gdf,
    interpolate_traffic_data,
    get_traffic_count_score,
    get_traffic_counts_by_time
)
from timeseries import get_corrected_co2_lat_long

# Process Locations


In [None]:
dir_path = Path("data/locations")
locations_file_path = dir_path / "locations.csv"
df = pd.read_csv(locations_file_path)

df["geometry"] = df.apply(lambda row: Point(row["Longitude"], row["Latitude"]), axis=1)
gdf = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")  # Setting CRS to WGS84
gdf.to_file(dir_path / "locations.shp")

gdf.plot()

offset = 0.02
offset_x = offset * (gdf.Longitude.max() - gdf.Longitude.min())
offset_y = offset * (gdf.Latitude.max() - gdf.Latitude.min())

for x, y, label in zip(
    gdf.geometry.centroid.x, gdf.geometry.centroid.y, gdf["Location"]
):
    plt.gca().text(x + offset_x, y + offset_y, label, fontsize=8, ha="center")

# Parameters


## Basemap


In [None]:
proj_name = "RoadSectionLine"
proj_path = Path("gis/RoadSectionLine_Jul2024")
shp_file_path = proj_path / f"{proj_name}.shp"

gdf: GeoDataFrame = gpd.read_file(shp_file_path)
ax = gdf.plot()
plt.show()
x_lim = ax.get_xlim()
y_lim = ax.get_ylim()

fig, ax = plt.subplots()
ax.set_xlim(x_lim)
ax.set_ylim(y_lim)
ax = get_map_axes(ax)
cx.add_basemap(ax, crs=gdf.crs.to_string(), source=cx.providers.OpenStreetMap.Mapnik)
export_fig(fig, PRESENTATION_MEDIA_DIR / "base_map.svg")

bbox = box(
    COMP_DOMAIN.min_lon, COMP_DOMAIN.min_lat, COMP_DOMAIN.max_lon, COMP_DOMAIN.max_lat
)
bbox_gdf = gpd.GeoDataFrame([[1]], geometry=[bbox], crs="EPSG:4326")
bbox_gdf.to_crs(gdf.crs).plot(ax=ax, fc="none", edgecolor="black")

cx.add_basemap(ax, crs=gdf.crs.to_string(), source=cx.providers.OpenStreetMap.Mapnik)
export_fig(fig, PRESENTATION_MEDIA_DIR / "base_map_with_bbox.svg")

## Computation Domain


In [None]:
gdf: GeoDataFrame = gpd.read_file(
    Path("gis/RoadSectionLine_Jul2024/RoadSectionLine.shp")
)
transformed_gdf = gdf.to_crs(crs="EPSG:4326")
points_gdf: GeoDataFrame = gpd.read_file(Path("data/locations/locations.shp"))

bbox = box(
    COMP_DOMAIN.min_lon, COMP_DOMAIN.min_lat, COMP_DOMAIN.max_lon, COMP_DOMAIN.max_lat
)
bbox_gdf = gpd.GeoDataFrame([[1]], geometry=[bbox], crs="EPSG:4326")
cropped_gdf = gpd.overlay(transformed_gdf, bbox_gdf, keep_geom_type=True)

fig, ax = plt.subplots()
ax = plot_with_scale(ax, [cropped_gdf], [{"zorder": -1}])
export_fig(fig, PRESENTATION_MEDIA_DIR / "compute_domain.svg")

fig, ax = plt.subplots()
ax = plot_with_scale(
    ax, [points_gdf, cropped_gdf], [{"color": "red", "zorder": 3}, {"zorder": -1}]
)
export_fig(fig, PRESENTATION_MEDIA_DIR / "points.svg")

In [None]:
fig, ax = get_vis_base_plot()
export_fig(fig, PRESENTATION_MEDIA_DIR / "vis_domain.svg")

In [None]:
fig, ax = get_vis_base_plot()
ax = plot_route(points_gdf, ax, label="Both Pis", color="C2")
ax.legend()
export_fig(fig, PRESENTATION_MEDIA_DIR / "2024-09-26_route.svg")

In [None]:
def plot_two_pi_routes(day: str):
    df = pd.read_csv(Path(f"data/routes/{day}/route.csv"))
    fig, ax = get_vis_base_plot()
    route_1 = points_gdf.iloc[df["Location"][df["Pi"] == 3] - 1]
    ax = plot_route(route_1, ax, color="C0", label="Pi 3")
    route_2 = points_gdf.iloc[df["Location"][df["Pi"] == 4] - 1]
    ax = plot_route(route_2, ax, color="C1", label="Pi 4")
    ax.legend()
    return fig, ax

In [None]:
day = "2024-10-18"
fig, ax = plot_two_pi_routes(day)
export_fig(fig, PRESENTATION_MEDIA_DIR / f"{day}_route.svg")

In [None]:
day = "2024-10-22"
fig, ax = plot_two_pi_routes(day)
export_fig(fig, PRESENTATION_MEDIA_DIR / f"{day}_route.svg")

In [None]:
day = "2024-10-24"
fig, ax = plot_two_pi_routes(day)
export_fig(fig, PRESENTATION_MEDIA_DIR / f"{day}_route.svg")

## Traffic Count Data


### Visualize Points


In [None]:
gdf: GeoDataFrame = gpd.read_file(
    Path("gis/Traffic_Count_Data/Traffic_Count_Data.gdb"), layer="Traffic_Count_Data"
)
transformed_gdf = gdf.to_crs(crs="EPSG:4326")
# gdf.plot()

bbox = box(
    COMP_DOMAIN.min_lon, COMP_DOMAIN.min_lat, COMP_DOMAIN.max_lon, COMP_DOMAIN.max_lat
)
bbox_gdf = gpd.GeoDataFrame([[1]], geometry=[bbox], crs="EPSG:4326")
cropped_gdf = gpd.overlay(transformed_gdf, bbox_gdf, keep_geom_type=True)
cropped_gdf = cropped_gdf.drop(
    cropped_gdf[cropped_gdf["SiteID"] == "J524"].index
)  # No data for this site

point_gdf_1 = cropped_gdf.copy()
point_gdf_1 = point_gdf_1[point_gdf_1["SiteID"].isin(["NS9"])]

point_gdf_2 = cropped_gdf.copy()
point_gdf_2 = point_gdf_2[point_gdf_2["SiteID"].isin(["J970"])]

point_gdf_3 = cropped_gdf.copy()
point_gdf_3 = point_gdf_3[point_gdf_3["SiteID"].isin(["116"])]

fig, ax = plt.subplots()
ax = plot_with_scale(
    ax,
    [cropped_gdf, point_gdf_1, point_gdf_2, point_gdf_3],
    [
        {"color": "k", "zorder": 3, "markersize": 20},
        {"color": "C0", "zorder": 4, "markersize": 40},
        {"color": "C1", "zorder": 4, "markersize": 40},
        {"color": "C2", "zorder": 4, "markersize": 40},
    ],
)
# ax = plot(cropped_gdf, ax, label="Both Pis", color="C2")
export_fig(fig, PRESENTATION_MEDIA_DIR / "traffic_data.svg")

### Check Parsing of Data


In [None]:
traffic_gdf: GeoDataFrame = gpd.read_file(
    Path("gis/Traffic_Count_Data/Traffic_Count_Data.gdb"),
    layer="Traffic_Count_Data__ATTACH",
)

site_id = "125"

for site_id in cropped_gdf["SiteID"]:
    df = get_traffic_df_from_gdf(traffic_gdf, site_id)
    time, traffic = get_traffic_data_from_df(df)

### Plot data


In [None]:
fig, ax = plt.subplots()

site_id = "NS9"
time, traffic = get_traffic_data_by_site_id(traffic_gdf, site_id)
ax.plot(time, traffic)

site_id = "J970"
time, traffic = get_traffic_data_by_site_id(traffic_gdf, site_id)
ax.plot(time, traffic)

site_id = "116"
time, traffic = get_traffic_data_by_site_id(traffic_gdf, site_id)
ax.plot(time, traffic)

ax = get_traffic_count_axes(ax)
export_fig(plt.gcf(), PRESENTATION_MEDIA_DIR / "traffic_data_plot.svg")

### Interpolate Missing Time Points


In [None]:
site_id = "J970"
time, traffic = get_traffic_data_by_site_id(traffic_gdf, site_id)

fig, ax = plt.subplots()
ax = get_traffic_count_axes(ax)
ax.plot(time, traffic, color="C1")
export_fig(fig, PRESENTATION_MEDIA_DIR / "before_interpolation_plot.svg")
plt.show()

fig, ax = plt.subplots()
ax = get_traffic_count_axes(ax)
ax.scatter(time, traffic, color="C1", s=10)
export_fig(fig, PRESENTATION_MEDIA_DIR / "before_interpolation_points.svg")

df = interpolate_traffic_data(time, traffic)
ax.plot(df.index, df["traffic"], linestyle="--", color="C1")
ax = get_traffic_count_axes(ax)
export_fig(fig, PRESENTATION_MEDIA_DIR / "interpolated.svg")
plt.show()

In [None]:
data_dict: dict[str, pd.Series] = {}

for site_id in cropped_gdf["SiteID"]:
    df = get_traffic_df_from_gdf(traffic_gdf, site_id)
    time, traffic = get_traffic_data_from_df(df)
    interpolated_df = interpolate_traffic_data(time, traffic)
    data_dict[site_id] = interpolated_df

### Plot Heat Map


In [None]:
# Add a new column to traffic_gdf with interpolated traffic data
cropped_gdf["interpolated_traffic"] = cropped_gdf["SiteID"].apply(
    lambda site_id: data_dict.get(site_id, pd.DataFrame())
    .get("traffic", pd.Series())
    .mean()
)

# Plot the heatmap spatially
from matplotlib.colors import Normalize

norm = Normalize(vmin=0, vmax=5000)

fig, ax = plt.subplots(1, 1, figsize=(10, 10))
cropped_gdf_plot_kwargs = {
    "zorder": 3,
    "column": "interpolated_traffic",
    "cmap": "hot",
    "markersize": 50,
    "norm": norm,
}

ax = plot_with_scale(ax, [cropped_gdf], [cropped_gdf_plot_kwargs])
# Format colorbar
cbar = plt.colorbar(ax.get_children()[0], ax=ax, shrink=0.8)
export_fig(fig, PRESENTATION_MEDIA_DIR / "traffic_heatmap_points.svg")

ax.set_xlim(VIS_DOMAIN.x_bounds_32619)
ax.set_ylim(VIS_DOMAIN.y_bounds_32619)
add_vis_base_map(ax)
export_fig(fig, PRESENTATION_MEDIA_DIR / "traffic_heatmap_points_vis.svg")

In [None]:
fig, ax = plot_contour_with_map(cropped_gdf, "interpolated_traffic")
export_fig(fig, PRESENTATION_MEDIA_DIR / "traffic_heatmap_interpolation_linear.svg")
plt.show()

fig, ax = plot_contour_with_map(
    cropped_gdf,
    "interpolated_traffic",
    interpolation_method="cubic",
    map_true_range=True,
)
export_fig(fig, PRESENTATION_MEDIA_DIR / "traffic_heatmap_interpolation_cubic.svg")
plt.show()

In [None]:
transformed_gdf = cropped_gdf.to_crs("EPSG:32619")
gdf_plot_kwargs = {
    "zorder": 3,
    "column": "interpolated_traffic",
    "cmap": "hot",
    "markersize": 50,
}


def plot_power_law(n: float):
    fig, ax = plt.subplots(figsize=(10, 10))
    ax = plot_with_scale(ax, [transformed_gdf], [gdf_plot_kwargs])
    xi, yi, traffic_score = get_traffic_count_score(
        "0800", transformed_gdf, data_dict, n=n
    )
    contour = ax.contourf(
        xi,
        yi,
        traffic_score.reshape(100, 100),
        levels=14,
        cmap="hot",
        zorder=4,
        alpha=0.5,
    )
    fig.colorbar(contour, ax=ax, shrink=0.8)
    export_fig(fig, PRESENTATION_MEDIA_DIR / f"traffic_heatmap_score_n={n}.svg")
    plt.show()

In [None]:
plot_power_law(1.0)
plot_power_law(1.5)
plot_power_law(2.0)

In [None]:
time = "1000"
timestamp = pd.to_datetime(time, format="%H%M")

# Add a new column to traffic_gdf with interpolated traffic data
cropped_gdf["interpolated_traffic"] = cropped_gdf["SiteID"].apply(
    lambda site_id: data_dict.get(site_id, pd.DataFrame())
    .get("traffic", pd.Series())
    .get(timestamp, 0)
)
cropped_gdf["interpolated_traffic"]

In [None]:
import numpy as np

time_lin = pd.date_range("1900-01-01 06:00", "1900-01-01 20:45", freq="15min").time
# Format as HHMM
hhmms = [time.strftime("%H%M") for time in time_lin]

traffic_scores = []
for hhmm in hhmms:
    _, _, traffic_score = get_traffic_count_score(hhmm, transformed_gdf, data_dict)
    traffic_scores.append(traffic_score)

traffic_scores = np.array(traffic_scores)
idx_with_largest_range = traffic_scores.max(axis=1).argmax()
traffic_scores.max()

In [None]:
import matplotlib.animation as animation

fig, ax = plt.subplots()
fig.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=None, hspace=None)
slide_bg = np.array([255 / 255, 242 / 255, 204 / 255, 1])
fig.patch.set_facecolor(slide_bg)

contour_norm = Normalize(vmin=0, vmax=150)
points_norm = Normalize(vmin=0, vmax=5000)
gdf_plot_kwargs = {
    "zorder": 3,
    "column": "interpolated_traffic",
    "cmap": "hot",
    "markersize": 50,
    "norm": points_norm,
}
# Plot for the item with the largest range first
transformed_gdf = get_traffic_counts_by_time(
    hhmms[idx_with_largest_range], transformed_gdf, data_dict
)
ax = plot_with_scale(ax, [transformed_gdf], [gdf_plot_kwargs])
xi, yi, traffic_score = get_traffic_count_score(
    hhmms[idx_with_largest_range], transformed_gdf, data_dict
)
contour = ax.contourf(
    xi,
    yi,
    traffic_score.reshape(100, 100),
    levels=14,
    cmap="hot",
    zorder=4,
    alpha=0.5,
    norm=contour_norm,
)
fig.colorbar(contour, ax=ax, shrink=0.8)

# Add text at top right corner indicating the time
time_text = plt.text(
    0.99,
    0.99,
    hhmms[idx_with_largest_range],
    horizontalalignment="right",
    verticalalignment="top",
    transform=ax.transAxes,
    fontsize=14,
    color="black",
    zorder=5,
)
time_text.set_bbox(dict(facecolor="white", edgecolor="none"))


# Function to update the heatmap
def update(frame):
    global transformed_gdf
    ax.clear()

    transformed_gdf = get_traffic_counts_by_time(
        hhmms[frame], transformed_gdf, data_dict
    )
    plot_with_scale(ax, [transformed_gdf], [gdf_plot_kwargs])
    xi, yi, traffic_score = get_traffic_count_score(
        hhmms[frame], transformed_gdf, data_dict
    )

    contour = ax.contourf(
        xi,
        yi,
        traffic_score.reshape(100, 100),
        levels=14,
        cmap="hot",
        zorder=4,
        alpha=0.5,
        norm=contour_norm,
    )

    time_text = plt.text(
        0.99,
        0.99,
        hhmms[frame],
        horizontalalignment="right",
        verticalalignment="top",
        transform=ax.transAxes,
        fontsize=14,
        color="black",
        zorder=5,
    )
    time_text.set_bbox(dict(facecolor="white", edgecolor="none"))

    return [contour, time_text]


# Create the animation
ani = animation.FuncAnimation(fig, update, frames=len(traffic), blit=True, repeat=False)

# Save the animation
# ani.save(
#     PRESENTATION_MEDIA_DIR / "traffic_heatmap_animation.gif",
#     writer="pillow",
#     savefig_kwargs={"pad_inches": None},
# )

# Write to mp4 format
ani.save(
    PRESENTATION_MEDIA_DIR / "traffic_heatmap_animation.mp4",
    writer="ffmpeg",
    savefig_kwargs={"pad_inches": None},
)
plt.show()

In [None]:
# Add text at top right corner indicating the time
time_text = ax.text(
    0.95 * xi.max(),
    0.95 * yi.max(),
    hhmms[idx_with_largest_range],
    horizontalalignment="right",
    verticalalignment="top",
    # transform=ax.transAxes,
    fontsize=0.95 * xi.max(),
    color="black",
    zorder=5,
)
ax.add_artist(time_text)

### Compare with measurements


In [None]:
co2_df = get_corrected_co2_lat_long()
# co2_means = co2_df["co2_mean"]
# co2_stds = co2_df["co2_std"]
# dates = co2_df["date"]
# times = co2_df["time"]
co2_df

## Distance Transform


In [None]:
road_image = road_shapefile_to_opencv_mat(geo_data=cropped_gdf, img_width=3200)
dist = cv2.distanceTransform(cv2.bitwise_not(road_image), cv2.DIST_L2, 0)

fig, ax = plt.subplots()
ax.imshow(dist[:, :], origin="lower")
ax = get_map_axes(ax)

In [None]:
transform, distance_transform = distance_transform_scipy(
    gdf=cropped_gdf, resolution=1e-5
)

processed_dir = proj_path / "processed"
processed_dir.mkdir(exist_ok=True)

with rasterio.open(
    proj_path / "processed" / "distance_transform.tif",
    "w",
    driver="GTiff",
    height=distance_transform.shape[0],
    width=distance_transform.shape[1],
    count=1,
    dtype=distance_transform.dtype,
    crs=cropped_gdf.crs,
    transform=transform,
) as dst:
    dst.write(distance_transform, 1)

plt.imshow(distance_transform)
plt.colorbar()
plt.show()