In [None]:
import os
import json
import fiona
import networkx as nx
import geopandas as gpd
import pandas as pd
import matplotlib.pyplot as plt
from shapely.geometry import Polygon
from shapely import wkt
from uoapi import api_client
from gnn_package import data_utils
from gnn_package.src.preprocessing.import_graph import get_street_network_gdfs
from private_uoapi import APIClient, APIConfig, APIAuth

In [None]:
# Base directory for Urban Observatory data; the cached sensor shapefiles below
# are derived from it so the three paths cannot drift apart.
urban_observatory_data_dir = "../gnn_package/data/urban_observatory/"
private_sensors_file = os.path.join(
    urban_observatory_data_dir, "private_sensors/private_sensors_gdf.shp"
)
public_sensors_file = os.path.join(
    urban_observatory_data_dir, "public_sensors/public_sensors_gdf.shp"
)


def check_files_exist(file_path):
    """Return True when `file_path` names an existing filesystem entry.

    Thin wrapper around `os.path.exists`, so directories count as existing too.
    """
    found = os.path.exists(file_path)
    return found


def read_or_create_public_sensors_nodes_traffic():
    """Fetch the public "Vehicles"-theme sensors and return them as a GeoDataFrame.

    Despite the name, this function never reads from or writes to disk: every
    call queries the API.

    Returns:
        GeoDataFrame with a "location" column (the sensor name) and a
        "geometry" column parsed from the sensor's WKT, reprojected from
        EPSG:4326 to EPSG:27700.
    """
    uo_client = api_client.APIClient()
    print("Getting public sensors from API")
    response = uo_client.get_sensors(theme="Vehicles")

    # Keyed by sensor name, so a repeated name keeps only its last WKT value.
    wkt_by_name = {}
    for entry in response["sensors"]:
        wkt_by_name[entry["Sensor Name"]] = entry["Location (WKT)"]

    frame = pd.DataFrame(wkt_by_name.items(), columns=["location", "geometry"])
    # Turn the WKT strings into shapely geometries before building the GeoDataFrame.
    frame["geometry"] = frame["geometry"].apply(wkt.loads)
    sensors_wgs84 = gpd.GeoDataFrame(frame, geometry="geometry", crs="EPSG:4326")
    return sensors_wgs84.to_crs("EPSG:27700")


def read_or_create_public_sensors_nodes(public_sensors_file):
    """Load the public "People"-theme sensors, using a file cache when possible.

    Args:
        public_sensors_file: Path of the cached sensor file. May be falsy
            (``None`` or ``""``) to skip caching entirely.

    Returns:
        GeoDataFrame of sensors. When fetched from the API it has a "location"
        column (the sensor name) and a "geometry" column parsed from WKT,
        reprojected from EPSG:4326 to EPSG:27700. When read from the cache it
        is whatever `gpd.read_file` returns for that file.
    """
    if public_sensors_file and check_files_exist(public_sensors_file):
        print("Reading public sensors from file")
        return gpd.read_file(public_sensors_file)

    client = api_client.APIClient()
    print("Getting public sensors from API")
    public_sensors = client.get_sensors(theme="People")
    # Keyed by sensor name, so a repeated name keeps only its last WKT value.
    sensor_geometry = {
        sensor["Sensor Name"]: sensor["Location (WKT)"]
        for sensor in public_sensors["sensors"]
    }
    sensor_df = pd.DataFrame(
        sensor_geometry.items(), columns=["location", "geometry"]
    )
    sensor_df["geometry"] = sensor_df["geometry"].apply(wkt.loads)
    public_sensors_gdf = gpd.GeoDataFrame(
        sensor_df, geometry="geometry", crs="EPSG:4326"
    ).to_crs("EPSG:27700")

    # Only cache when a path was supplied: the guard above accepts a falsy
    # path, so writing unconditionally would fail after the API call succeeded.
    if public_sensors_file:
        cache_dir = os.path.dirname(public_sensors_file)
        if cache_dir:
            # Make sure the target directory exists before writing the file.
            os.makedirs(cache_dir, exist_ok=True)
        public_sensors_gdf.to_file(public_sensors_file)
    return public_sensors_gdf


def read_or_create_private_sensor_nodes(private_sensors_file):
    """Load the private sensor locations, using a file cache when possible.

    Args:
        private_sensors_file: Path of the cached sensor file. May be falsy
            (``None`` or ``""``) to skip caching entirely.

    Returns:
        GeoDataFrame of sensors. When fetched from the private API it is built
        from the "location", "lon" and "lat" fields of the response and
        reprojected from EPSG:4326 to EPSG:27700. When read from the cache it
        is whatever `gpd.read_file` returns for that file.
    """
    if private_sensors_file and check_files_exist(private_sensors_file):
        print("Reading private sensors from file")
        return gpd.read_file(private_sensors_file)

    config = APIConfig()
    auth = APIAuth(config)
    client = APIClient(config, auth)
    locations = client.get_sensor_locations()
    private_sensors_gdf = gpd.GeoDataFrame(
        locations["location"],
        geometry=gpd.points_from_xy(locations["lon"], locations["lat"]),
        crs="EPSG:4326",
    ).to_crs("EPSG:27700")

    # Only cache when a path was supplied: the guard above accepts a falsy
    # path, so writing unconditionally would fail after the API call succeeded.
    if private_sensors_file:
        cache_dir = os.path.dirname(private_sensors_file)
        if cache_dir:
            # Make sure the target directory exists before writing the file.
            os.makedirs(cache_dir, exist_ok=True)
        private_sensors_gdf.to_file(private_sensors_file)
    return private_sensors_gdf


def get_bbox_transformed(
    min_lon=-1.65327,
    min_lat=54.93188,
    max_lon=-1.54993,
    max_lat=55.02084,
    target_crs="EPSG:27700",
):
    """Build a rectangular bounding box and reproject it to `target_crs`.

    The defaults reproduce the extent and target CRS that were previously
    hard-coded, so calling the function with no arguments is unchanged.

    Args:
        min_lon: Western edge, in EPSG:4326 degrees.
        min_lat: Southern edge, in EPSG:4326 degrees.
        max_lon: Eastern edge, in EPSG:4326 degrees.
        max_lat: Northern edge, in EPSG:4326 degrees.
        target_crs: CRS the box is reprojected to; should match the CRS of the
            data that will be clipped with it.

    Returns:
        Single-row GeoDataFrame holding the bounding-box polygon in `target_crs`.
    """
    # Corners are listed in the same order as the original literal coordinates.
    polygon_bbox = Polygon(
        [
            [min_lon, min_lat],
            [max_lon, min_lat],
            [max_lon, max_lat],
            [min_lon, max_lat],
        ]
    )

    # The corner coordinates are lon/lat, hence EPSG:4326 for the source frame.
    bbox_gdf = gpd.GeoDataFrame(geometry=[polygon_bbox], crs="EPSG:4326")

    # Reproject so the box can be used against data stored in `target_crs`.
    return bbox_gdf.to_crs(target_crs)

In [None]:
# Show whether the cached private-sensor shapefile is already on disk; if it is
# not, read_or_create_private_sensor_nodes will fetch from the API instead.
check_files_exist(private_sensors_file)

In [None]:
# Area whose street network is loaded for the plots below.
place_name = "Newcastle upon Tyne, UK"  # Replace with your area of interest

# Load the street network for that place via the project's import_graph helper.
network_gdf = get_street_network_gdfs(place_name)

In [None]:
# Load private sensor locations from the cached shapefile, or from the private
# API when the cache is missing.
private_sensors_gdf = read_or_create_private_sensor_nodes(private_sensors_file)

In [None]:
# Bounding box (already reprojected) used to restrict the public sensor sets.
bbox_transformed = get_bbox_transformed()

# Load each public sensor set and keep only the sensors inside the box.
public_sensors_gdf = read_or_create_public_sensors_nodes(public_sensors_file).clip(
    bbox_transformed
)
traffic_sensors_gdf = read_or_create_public_sensors_nodes_traffic().clip(
    bbox_transformed
)

In [None]:
# Count the geometry types of the individual observations in the geometry
# column, instead of dumping the whole frame; the next cell keeps only "Point".
traffic_sensors_gdf.geometry.geom_type.value_counts()

In [None]:
# Keep only the traffic sensors whose geometry is a single Point.
traffic_sensors_gdf = traffic_sensors_gdf.loc[
    traffic_sensors_gdf.geometry.geom_type.eq("Point")
]

In [None]:
# Overlay the three sensor sets on the street network. Each sensor layer is
# labelled so the colours can be told apart from the legend.
fig, ax = plt.subplots(figsize=(10, 10))
network_gdf.plot(ax=ax, color="black", alpha=0.5)
private_sensors_gdf.plot(ax=ax, color="red", markersize=10, label="Private sensors")
public_sensors_gdf.plot(ax=ax, color="blue", markersize=10, label="Public sensors")
traffic_sensors_gdf.plot(
    ax=ax, color="green", markersize=10, label="Traffic sensors"
)
ax.set_title("Street network and sensor locations")
ax.legend()
# Render explicitly so the cell does not end on a bare Axes repr.
plt.show()

In [None]:
# NOTE(review): this cell originally referenced `road_network` and
# `sensor_nodes_gdf`, which no earlier cell defines, so it raised NameError on
# Restart & Run All. It now uses the frames loaded above. `private_sensors_gdf`
# was chosen because it is the only sensor set that is not clipped to the
# bounding box — confirm this is the intended pairing.

# First verify the CRS of both dataframes
print("Roads CRS:", network_gdf.crs)
print("Sensor nodes CRS:", private_sensors_gdf.crs)

# Create the plot with a larger figure size
fig, ax = plt.subplots(figsize=(12, 12))

# Plot highways and paths
network_gdf.plot(ax=ax, color="grey", linewidth=0.5)

# Plot sensor points with different styling
private_sensors_gdf.plot(
    ax=ax,
    color="red",
    markersize=20,
    alpha=0.6,  # Add some transparency
    zorder=2,  # Ensure points are plotted on top
)

# Set the plot bounds based on the highways extent - there are some sensors
# that are a long way from the roads
bounds = network_gdf.total_bounds
ax.set_xlim([bounds[0], bounds[2]])
ax.set_ylim([bounds[1], bounds[3]])

# Add gridlines
ax.grid(True)

# Add title
# NOTE(review): title kept from the original cell; confirm it matches the
# actual source of `network_gdf`.
plt.title("OS Open Roads - Road Links")

# Show the plot
plt.tight_layout()
plt.show()

# Print some diagnostic information
print("\nHighways bounds:", network_gdf.total_bounds)
print("Sensor nodes bounds:", private_sensors_gdf.total_bounds)