## 11.5. GTFS Realtime Analysis of the City of Riga

In [None]:
pip install gtfs-realtime-bindings requests pandas plotly pyarrow 

In [None]:
pip install pyarrow==10.0.1

In [None]:
#pip install pyarrow 
#Not working for Mac M1

In [18]:
import requests
import pandas as pd
import time
from google.transit import gtfs_realtime_pb2

In [19]:
import pyarrow as pa
import pyarrow.parquet as pq

# 11.4. GTFS-Realtime URL for Riga

# Section 11.4.1. Part 1 - Ingesting GTFS Realtime Feed 

In [20]:
# 11.4. GTFS-Realtime URL for Riga
# Section 11.4.1. - Ingesting GTFS Realtime Feed - Part 1

GTFS_REALTIME_URL = "https://saraksti.rigassatiksme.lv/gtfs_realtime.pb"

# Function to fetch GTFS-Realtime data
def fetch_gtfs_realtime_data(url, timeout=30, verify=False):
    """
    Fetches and parses the GTFS-Realtime feed at the given URL.

    SSL certificate verification is disabled by default (``verify=False``);
    pass ``verify=True`` to enable it.

    Parameters:
    - url: The GTFS-Realtime endpoint URL
    - timeout: Seconds to wait for the server before giving up (default 30).
      Without a timeout ``requests`` can block indefinitely on a stalled server.
    - verify: Forwarded to ``requests.get``; ``False`` disables certificate checks.

    Returns:
    - feed: Parsed GTFS-Realtime protobuf FeedMessage, or None if the request
      failed (network/HTTP error) or the payload could not be decoded.
    """
    # protobuf is installed alongside gtfs-realtime-bindings.
    from google.protobuf.message import DecodeError

    try:
        response = requests.get(url, timeout=timeout, verify=verify)
        response.raise_for_status()  # Turn HTTP 4xx/5xx into an exception
    except requests.exceptions.RequestException as e:
        print(f"Error fetching GTFS-Realtime data: {e}")
        return None

    feed = gtfs_realtime_pb2.FeedMessage()
    try:
        # A truncated or non-protobuf body raises DecodeError rather than RequestException.
        feed.ParseFromString(response.content)
    except DecodeError as e:
        print(f"Error decoding GTFS-Realtime data: {e}")
        return None

    return feed

# Section 11.4.1. Part 2 - Function to extract vehicle positions 

In [21]:
# Section 11.4.1. - Part 2 - Function to extract vehicle positions 

def extract_vehicle_positions(feed):
    """
    Flattens the vehicle-position entities of a GTFS-Realtime feed into records.

    Entities that carry no ``vehicle`` field are skipped. The reported speed is
    multiplied by 3.6, i.e. converted from metres per second to km/h.

    Parameters:
    - feed: Parsed GTFS-Realtime protobuf FeedMessage

    Returns:
    - list of dicts, one per vehicle entity, in feed order
    """
    records = []
    for entity in feed.entity:
        if not entity.HasField('vehicle'):
            continue

        vp = entity.vehicle
        trip, pos, descriptor = vp.trip, vp.position, vp.vehicle
        records.append({
            "id": entity.id,
            "trip_id": trip.trip_id,
            "schedule_relationship": trip.schedule_relationship,
            "latitude": pos.latitude,
            "longitude": pos.longitude,
            "bearing": pos.bearing,
            "speed": pos.speed * 3.6,  # m/s -> km/h
            "current_status": vp.current_status,
            "timestamp": vp.timestamp,
            "stop_id": vp.stop_id,
            "vehicle_id": descriptor.id,
            "vehicle_label": descriptor.label,
            "vehicle_license_plate": descriptor.license_plate,
        })
    return records

# 11.4.2. Parquet Files and DuckDB for Mobility Data 

In [None]:
# Section 11.4.2. Parquet Files and DuckDB for Mobility Data
# Function to collect vehicle positions for a specific duration and save into a Parquet File

def collect_vehicle_positions(duration_minutes, interval_seconds):
    """
    Collects vehicle positions from the GTFS-Realtime endpoint at regular intervals.

    Polls start roughly every ``interval_seconds`` (time spent fetching counts
    towards the interval). The loop never sleeps past the end of the collection
    window. Every poll's positions are appended as-is, so the same vehicle
    report can appear more than once if the feed has not changed between polls.

    Parameters:
    - duration_minutes: Duration of the collection period in minutes
    - interval_seconds: Interval between successive API calls in seconds

    Returns:
    - A DataFrame containing all collected vehicle positions (empty if none)
    """
    collected_data = []
    # monotonic() is unaffected by wall-clock changes (NTP sync, DST) during long runs.
    deadline = time.monotonic() + duration_minutes * 60  # Convert minutes to seconds

    while time.monotonic() < deadline:
        poll_started = time.monotonic()
        feed = fetch_gtfs_realtime_data(GTFS_REALTIME_URL)

        if feed is not None:
            vehicle_positions = extract_vehicle_positions(feed)
            if vehicle_positions:
                collected_data.extend(vehicle_positions)
                print(f"Collected {len(vehicle_positions)} vehicle positions.")
            else:
                print("No vehicle positions found in the feed.")
        else:
            print("Failed to fetch the GTFS-Realtime feed.")

        # Keep a steady cadence: subtract the time spent fetching, and do not
        # sleep beyond the end of the collection window.
        now = time.monotonic()
        pause = min(interval_seconds - (now - poll_started), deadline - now)
        if pause > 0:
            time.sleep(pause)

    return pd.DataFrame(collected_data)

# Function to save DataFrame to a Parquet file
def save_to_parquet(df, file_name):
    """
    Writes the vehicle-position DataFrame to ``file_name`` in Parquet format.

    When the DataFrame is empty nothing is written and a message is printed.

    Parameters:
    - df: pandas DataFrame of collected vehicle positions
    - file_name: Destination path of the Parquet file
    """
    if df.empty:
        print("No data to save.")
        return

    pq.write_table(pa.Table.from_pandas(df), file_name)
    print(f"Data successfully saved to {file_name}")
        
        

# Main script: poll the Riga feed for a fixed window, then persist the results
if __name__ == "__main__":
    DURATION_MINUTES = 60   # length of the collection window (1 hour)
    INTERVAL_SECONDS = 10   # interval between feed requests

    print(f"Starting data collection for {DURATION_MINUTES} minutes, querying every {INTERVAL_SECONDS} seconds...")
    vehicle_positions_df = collect_vehicle_positions(
        duration_minutes=DURATION_MINUTES,
        interval_seconds=INTERVAL_SECONDS,
    )

    output_file = "riga_vehicle_positions_test.parquet"
    save_to_parquet(df=vehicle_positions_df, file_name=output_file)

# 11.4.3. Querying and Visualizing Vehicle Trajectories with Parquet Files and DuckDB 

In [None]:
#Section 11.4.3. Querying and Visualizing Vehicle Trajectories with Parquet Files and DuckDB

import duckdb
import pandas as pd
import plotly.express as px


# Path to the Parquet file
# NOTE(review): absolute, machine-specific path. The collection cell above writes
# "riga_vehicle_positions_test.parquet" to the current working directory; point
# this at whichever file was actually collected.
parquet_file = "/Users/avaisman/tmp/PythonNotebooks/riga_vehicle_positions.parquet"

# Function to load the parquet file and query the trajectory of Tram 1
def query_tram1_trajectory(parquet_file, trip_pattern="%TRAM1-%"):
    """
    Queries the Tram 1 positions from a Parquet file using an in-memory DuckDB.

    Tram 1 is identified by its trip_id matching the SQL LIKE pattern
    ``trip_pattern`` (default ``'%TRAM1-%'``); adjust the pattern to match the
    feed's identifiers or to select another line.

    Parameters:
    - parquet_file: Path (str or os.PathLike) to the Parquet file
    - trip_pattern: LIKE pattern applied to trip_id, sent as a bound parameter

    Returns:
    - DataFrame with all columns of the matching rows, ordered by timestamp
    """
    # parquet_scan() takes its path as a SQL string literal: double any single
    # quotes so a path containing an apostrophe cannot break or alter the query.
    path_literal = str(parquet_file).replace("'", "''")
    query = f"""
    SELECT *
    FROM parquet_scan('{path_literal}')
    WHERE trip_id LIKE ?
    ORDER BY timestamp
    """

    con = duckdb.connect()  # in-memory database
    try:
        return con.execute(query, [trip_pattern]).fetchdf()
    finally:
        # Close the connection even if the file is missing or the query fails.
        con.close()

# Function to visualize the trajectory of Tram 1
def visualize_tram1_trajectory(df):
    """
    Plots the Tram 1 positions on an OpenStreetMap base map using Plotly.

    Points are coloured by ``timestamp`` so the direction of travel can be read
    from the colour gradient. The map is centred on fixed Riga coordinates.

    Parameters:
    - df: DataFrame with latitude, longitude, vehicle_id, trip_id, speed and
      timestamp columns (as returned by query_tram1_trajectory)
    """
    if df.empty:
        print("No data available for Tram 1.")
        return

    fig = px.scatter_mapbox(df, lat="latitude", lon="longitude",
                            hover_name="vehicle_id",
                            hover_data=["trip_id", "speed"],
                            color="timestamp",
                            color_continuous_scale="Viridis",
                            title="Trajectory of Tram 1",
                            zoom=12)  # also sets layout.mapbox.zoom

    fig.update_layout(mapbox_style="open-street-map",
                      mapbox_center={"lat": 56.9496, "lon": 24.1052},
                      # A zero top margin leaves no room for the figure title,
                      # which is then clipped; keep some space at the top.
                      margin={"r": 0, "t": 40, "l": 0, "b": 0})

    fig.show()

# Main script: query Tram 1 from the Parquet file and plot it if anything matched
if __name__ == "__main__":
    tram1_df = query_tram1_trajectory(parquet_file)

    if tram1_df.empty:
        print("No data found for Tram 1.")
    else:
        visualize_tram1_trajectory(tram1_df)

# 11.4.4 Querying and Visualizing Vehicle Trajectories with PostgreSQL 

In [None]:
## 11.4.4 Querying and Visualizing Vehicle Trajectories with PostgreSQL

import pandas as pd
import psycopg

# PostgreSQL connection details.
# Each setting can be overridden with the standard libpq environment variable
# (PGHOST, PGPORT, PGUSER, PGPASSWORD, PGDATABASE) so credentials need not be
# hard-coded in the notebook; the defaults are the local development values.
import os

db_host = os.environ.get("PGHOST", "localhost")
db_port = os.environ.get("PGPORT", "5432")
db_user = os.environ.get("PGUSER", "postgres")
db_pass = os.environ.get("PGPASSWORD", "postgres")
db_name = os.environ.get("PGDATABASE", "UrbanMobility")


# Function to connect to the PostgreSQL database
def connect_to_postgres():
    """
    Opens a connection using the module-level db_host / db_port / db_name /
    db_user / db_pass settings.

    Returns:
    - (conn, cur): the psycopg connection and a cursor on it, or
      (None, None) if connecting failed (the error is printed).
    """
    settings = {
        "host": db_host,
        "port": db_port,
        "dbname": db_name,
        "user": db_user,
        "password": db_pass,
    }
    try:
        connection = psycopg.connect(**settings)
        cursor = connection.cursor()
    except Exception as err:
        print(f"Error connecting to PostgreSQL: {err}")
        return None, None
    return connection, cursor

# Function to create the 'vehiclePosition' table in PostgreSQL

def create_vehicle_position_table(cur):
    """
    Ensures the ``vehiclePosition`` table exists (CREATE TABLE IF NOT EXISTS).

    The DDL is only executed on ``cur``; committing is left to the caller.

    Parameters:
    - cur: psycopg cursor used to run the statement
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS vehiclePosition (
        id TEXT,
        trip_id TEXT,
        schedule_relationship TEXT,
        latitude FLOAT8,
        longitude FLOAT8,
        bearing FLOAT8,
        speed FLOAT8,  -- Speed in kilometers per hour (km/h)
        current_status TEXT,
        timestamp BIGINT,
        stop_id TEXT,
        vehicle_id TEXT,
        vehicle_label TEXT,
        vehicle_license_plate TEXT
    );
    """
    cur.execute(ddl)

# Function to load the Parquet file into a Pandas DataFrame
def load_parquet_file(parquet_file):
    """
    Reads a Parquet file into a pandas DataFrame.

    Any read failure is printed and an empty DataFrame is returned instead of
    raising.

    Parameters:
    - parquet_file: Path to the Parquet file

    Returns:
    - df: DataFrame with the file's contents, or an empty DataFrame on error
    """
    try:
        return pd.read_parquet(parquet_file)
    except Exception as err:
        print(f"Error loading Parquet file: {err}")
    return pd.DataFrame()

# Function to insert the DataFrame data into the PostgreSQL 'vehiclePosition' table
def insert_data_to_postgres(df, cur, conn):
    """
    Inserts every row of the DataFrame into the PostgreSQL 'vehiclePosition' table
    and commits.

    The rows are sent as one ``executemany`` batch instead of one ``execute`` call
    per row. Only the 13 table columns are read, by name, so extra DataFrame
    columns are ignored and the DataFrame's column order does not matter.

    Parameters:
    - df: The DataFrame containing the vehicle positions data
    - cur: The cursor object for executing SQL queries
    - conn: The PostgreSQL connection object

    Raises:
    - KeyError if one of the required columns is missing from ``df``.
    """
    columns = [
        'id', 'trip_id', 'schedule_relationship', 'latitude', 'longitude',
        'bearing', 'speed', 'current_status', 'timestamp', 'stop_id',
        'vehicle_id', 'vehicle_label', 'vehicle_license_plate',
    ]
    insert_query = """
    INSERT INTO vehiclePosition (id, trip_id, schedule_relationship, latitude, longitude, bearing, speed,
                                 current_status, timestamp, stop_id, vehicle_id, vehicle_label, vehicle_license_plate)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    # itertuples(name=None) yields plain tuples in `columns` order without
    # building a pandas Series per row as iterrows does.
    rows = list(df[columns].itertuples(index=False, name=None))
    if rows:
        cur.executemany(insert_query, rows)

    # Commit the transaction
    conn.commit()

# Main script
if __name__ == "__main__":
    # Step 1: Connect to PostgreSQL
    conn, cur = connect_to_postgres()
    if conn is None or cur is None:
        # Skip the remaining steps rather than calling exit(), which in a
        # Jupyter/IPython session would shut the kernel down.
        print("Failed to connect to the PostgreSQL database.")
    else:
        try:
            # Step 2: Create the 'vehiclePosition' table if it doesn't exist
            print("Creating 'vehiclePosition' table if it doesn't exist...")
            create_vehicle_position_table(cur)
            # Commit the DDL now so the table persists even when there is nothing
            # to insert (closing with an open transaction discards it).
            conn.commit()

            # Step 3: Load the Parquet file into a DataFrame
            print("Loading Parquet file...")
            vehicle_positions_df = load_parquet_file(parquet_file)

            if not vehicle_positions_df.empty:
                print(f"Loaded {len(vehicle_positions_df)} rows from the Parquet file.")

                # Step 4: Insert the data into the PostgreSQL 'vehiclePosition' table
                print("Inserting data into PostgreSQL...")
                insert_data_to_postgres(vehicle_positions_df, cur, conn)
                print("Data insertion complete.")
            else:
                print("No data found in the Parquet file.")
        finally:
            # Step 5: Close the PostgreSQL connection, even if a step above failed
            cur.close()
            conn.close()
            print("PostgreSQL connection closed.")

# 11.4.4 Dash Visualizations 

In [None]:
# Section 11.4.4. Dash Visualizations

import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import pandas as pd
import plotly.express as px
import dash_bootstrap_components as dbc


# Function to connect to the PostgreSQL database
def connect_to_postgres():
    """
    Returns ``(connection, cursor)`` for the configured database.

    On any failure the error is printed and ``(None, None)`` is returned.
    """
    try:
        connection = psycopg.connect(host=db_host, port=db_port, dbname=db_name,
                                     user=db_user, password=db_pass)
        return connection, connection.cursor()
    except Exception as exc:
        print(f"Error connecting to PostgreSQL: {exc}")
        return None, None

# Function to fetch available trip_ids from PostgreSQL
def fetch_trip_ids():
    """
    Returns the distinct trip_ids stored in ``vehiclePosition``, sorted ascending.

    Sorting makes the list deterministic, and with it the dropdown's default
    (its first entry); ``SELECT DISTINCT`` alone guarantees no order.

    Returns:
    - list of trip_ids, or an empty list if the database connection fails
    """
    conn, cur = connect_to_postgres()
    if conn is None or cur is None:
        return []

    try:
        cur.execute("SELECT DISTINCT trip_id FROM vehiclePosition ORDER BY trip_id;")
        return [row[0] for row in cur.fetchall()]
    finally:
        # Release the cursor and connection even if the query fails.
        cur.close()
        conn.close()

# Function to fetch vehicle positions for a specific trip_id
def fetch_vehicle_positions(trip_id):
    """
    Loads the recorded positions of one trip, ordered by timestamp.

    Parameters:
    - trip_id: trip identifier to filter on (sent as a bound query parameter)

    Returns:
    - DataFrame with latitude, longitude, vehicle_id, speed and timestamp
      columns; an empty DataFrame if the database connection fails
    """
    columns = ['latitude', 'longitude', 'vehicle_id', 'speed', 'timestamp']
    conn, cur = connect_to_postgres()
    if conn is None or cur is None:
        return pd.DataFrame()

    query = """
    SELECT latitude, longitude, vehicle_id, speed, timestamp
    FROM vehiclePosition
    WHERE trip_id = %s
    ORDER BY timestamp;
    """

    try:
        cur.execute(query, (trip_id,))
        return pd.DataFrame(cur.fetchall(), columns=columns)
    finally:
        # Release the cursor and connection even if the query fails.
        cur.close()
        conn.close()

# Dash app layout
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# The dropdown is populated once, when this cell runs; its default is the
# first trip_id returned (None when the table has no trips).
trip_ids = fetch_trip_ids()
initial_trip_id = next(iter(trip_ids), None)

app.layout = dbc.Container([
    html.H1("Vehicle Position Visualization"),
    # trip_id selector driving the map callback
    dcc.Dropdown(
        id="trip-dropdown",
        options=[dict(label=tid, value=tid) for tid in trip_ids],
        value=initial_trip_id,
        clearable=False,
        style={"width": "80%"},
    ),
    # Map of the selected trip's positions
    dcc.Graph(id="vehicle-map"),
])

# Callback to update map based on selected trip_id
@app.callback(
    Output("vehicle-map", "figure"),
    Input("trip-dropdown", "value")
)
def update_map(trip_id):
    """
    Redraws the map with the positions recorded for the selected trip_id.

    Parameters:
    - trip_id: value selected in the dropdown (None when nothing is selected)

    Returns:
    - Plotly figure; an empty map when no trip is selected or no rows exist.
    """
    if trip_id is None:
        return px.scatter_mapbox()  # Empty map if no trip_id is selected

    vehicle_positions_df = fetch_vehicle_positions(trip_id)
    if vehicle_positions_df.empty:
        return px.scatter_mapbox()  # Empty map if no data

    fig = px.scatter_mapbox(vehicle_positions_df,
                            lat="latitude", lon="longitude",
                            hover_name="vehicle_id",
                            hover_data=["speed", "timestamp"],
                            title=f"Vehicle Positions for {trip_id}",
                            zoom=12)
    fig.update_traces(marker=dict(size=10, color='red'))  # Larger, red markers

    fig.update_layout(mapbox_style="open-street-map",
                      # Centre the view on the trip's mean position
                      mapbox_center={"lat": vehicle_positions_df["latitude"].mean(),
                                     "lon": vehicle_positions_df["longitude"].mean()},
                      # A zero top margin leaves no room for the per-trip title,
                      # which is then clipped; keep some space at the top.
                      margin={"r": 0, "t": 40, "l": 0, "b": 0})

    return fig

# Main function to run the Dash app
if __name__ == "__main__":
    # Dash 3 removed ``app.run_server`` in favour of ``app.run``; fall back to
    # run_server only on older Dash releases that do not provide ``run``.
    run_app = getattr(app, "run", None) or app.run_server
    run_app(debug=True, port=8050)

# 11.5.2. Speed Analysis Over Network Segments 

In [None]:
## 11.5.2. Speed Analysis Over Network Segments


import geopandas as gpd
import folium as fl
from folium.features import GeoJson, GeoJsonTooltip
import branca.colormap as cm


# SQL query to get the average speed per segment and geometry.
# For each traversal of a segment: speed = s.distance_m / actual travel time in
# seconds * 3.6, then averaged per (geometry, start stop, end stop).
# The 3.6 factor converts m/s to km/h, which presumes distance_m is in metres
# (TODO confirm against the segments table).
# Traversals with a NULL start time, or a NULL/non-positive duration, are
# excluded: the "> 0" test evaluates to NULL when end_time_actual is NULL.
# NOTE: the unquoted alias speedKMH is folded to lower case by PostgreSQL, so the
# resulting column is named "speedkmh" (the name visualize_speed_map reads).
sql_query = """
SELECT
    AVG(s.distance_m / EXTRACT(EPOCH FROM (ts.end_time_actual - ts.start_time_actual)) * 3.6) AS speedKMH,
    s.geometry,
    ts.start_stop_id,
    ts.end_stop_id
    FROM tripSegments ts
JOIN segments s
ON ts.start_stop_id = s.start_stop_id AND ts.end_stop_id = s.end_stop_id
WHERE ts.start_time_actual IS NOT NULL
  AND EXTRACT(EPOCH FROM (ts.end_time_actual - ts.start_time_actual)) > 0
GROUP BY s.geometry, ts.start_stop_id, ts.end_stop_id;
"""

# Function to fetch data from PostgreSQL and return a GeoDataFrame
def fetch_segment_speed_data():
    """
    Runs ``sql_query`` against PostgreSQL and returns the result as a GeoDataFrame.

    Returns:
    - GeoDataFrame with speedkmh, geometry, start_stop_id and end_stop_id columns.
      The folium map treats coordinates as lon/lat, which presumes the stored
      geometries are EPSG:4326 — TODO confirm the SRID of segments.geometry.
    """
    conn = psycopg.connect(host=db_host, port=db_port, dbname=db_name, user=db_user, password=db_pass)
    try:
        # geom_col names the PostGIS geometry column to decode into shapely objects.
        return gpd.GeoDataFrame.from_postgis(sql_query, conn, geom_col='geometry')
    finally:
        # Always release the connection, even if the query or decoding fails.
        conn.close()

# Function to create a color map based on speed
def create_colormap(min_speed, max_speed):
    """
    Creates a linear colormap running from red at ``min_speed`` to white at ``max_speed``.

    Slower segments are therefore drawn more red and faster segments more white.

    Parameters:
    - min_speed: Speed (km/h) at the red end of the gradient.
    - max_speed: Speed (km/h) at the white end of the gradient.

    Returns:
    - A branca LinearColormap; calling it with a speed returns the matching colour.
    """
    return cm.LinearColormap(['red', 'white'], vmin=min_speed, vmax=max_speed)  # slow = red, fast = white

# Function to visualize the speed per segment using folium and GeoJSON
def visualize_speed_map(gdf, cutoff=35):
    """
    Draws every segment on a folium map, coloured by its average speed.

    Colours come from create_colormap (red for slow, white for fast). Speeds above
    ``cutoff`` are clamped to ``cutoff`` for colouring only; the tooltip still shows
    the real average speed.

    Parameters:
    - gdf: GeoDataFrame with speedkmh, geometry, start_stop_id and end_stop_id columns.
    - cutoff: Upper bound (km/h) of the colour scale (default 35).

    Returns:
    - A folium map centred on Riga with one GeoJson layer per segment and a colour
      legend. When ``gdf`` is empty, the bare base map is returned without a legend.
    """
    # Create a folium map centered around the area
    transport_map = fl.Map(location=[56.9496, 24.1052], zoom_start=12)

    if gdf.empty:
        # min()/max() of an empty column are NaN, which would yield a meaningless scale.
        return transport_map

    max_speed = min(gdf['speedkmh'].max(), cutoff)  # Apply cutoff to the maximum speed
    # Keep vmin <= vmax: when every segment is faster than the cutoff, the raw
    # minimum would exceed the capped maximum and invert the colour scale.
    min_speed = min(gdf['speedkmh'].min(), max_speed)
    colormap = create_colormap(min_speed, max_speed)

    # Add each segment to the map
    for _, segment in gdf.iterrows():
        # Colour by the capped speed
        color = colormap(min(segment['speedkmh'], cutoff))

        # Convert the geometry to GeoJSON with associated properties
        geo_json = GeoJson(
            data={
                "type": "Feature",
                "geometry": segment['geometry'].__geo_interface__,
                "properties": {
                    "speedkmh": round(segment['speedkmh'], 2),
                    "from_stop_id": segment['start_stop_id'],
                    "to_stop_id": segment['end_stop_id']
                }
            },
            # Bind this segment's colour as a default argument (avoids late binding).
            style_function=lambda x, color=color: {
                'color': color,
                'weight': 3,
                'opacity': 0.7
            },
            tooltip=GeoJsonTooltip(
                fields=['speedkmh', 'from_stop_id', 'to_stop_id'],
                aliases=['Speed (km/h)', 'From Stop', 'To Stop'],
                localize=True
            )
        )
        geo_json.add_to(transport_map)

    # Add the colormap legend to the map
    colormap.add_to(transport_map)

    return transport_map

# Main script: build the per-segment average-speed map from PostgreSQL and show it
if __name__ == "__main__":
    segment_speed_gdf = fetch_segment_speed_data()
    speed_map = visualize_speed_map(segment_speed_gdf)

    # `display` is not defined in this file; it is provided by IPython/Jupyter.
    display(speed_map)

# 11.5.3 Delay Analysis in Public Transport

In [None]:
# 11.5.3 Delay Analysis in Public Transport

import pandas as pd
import plotly.express as px

# Function to connect to the PostgreSQL database
def connect_to_postgres():
    """
    Connects to PostgreSQL with the module-level db_* settings.

    Returns:
    - (conn, cur) on success; (None, None) after printing the error on failure.
    """
    conn = cur = None
    try:
        conn = psycopg.connect(
            host=db_host, port=db_port, dbname=db_name,
            user=db_user, password=db_pass,
        )
        cur = conn.cursor()
    except Exception as problem:
        print(f"Error connecting to PostgreSQL: {problem}")
        return None, None
    return conn, cur

# Function to fetch the delay data from the database
def fetch_delay_data(trip_pattern="TROL4-%", max_delay_minutes=12):
    """
    Computes per-stop delays (actual minus scheduled time, in minutes) for the
    trips whose actual_trip_id matches ``trip_pattern``.

    Scheduled and actual stop events of each trip are assembled into MobilityDB
    temporal point sequences. Their timestamps are then paired position-by-position
    and differenced.

    Parameters:
    - trip_pattern: SQL LIKE pattern for actual_trip_id (default 'TROL4-%').
    - max_delay_minutes: only rows with delay strictly below this are kept (default 12).

    Returns:
    - DataFrame with actual_trip_id, schedule_trip_id, delay and t columns; an
      empty DataFrame if the database connection fails.
    """
    conn, cur = connect_to_postgres()
    if conn is None or cur is None:
        return pd.DataFrame()

    # Both values are bound as query parameters (%s), so the LIKE pattern needs no
    # manual quoting. There must be no other literal '%' in this query string.
    query = """
    WITH noduptrip AS(
      SELECT distinct(t.*)
      FROM tripStops t),
    tripDelay AS(
        SELECT actual_trip_id, schedule_trip_id,
            tgeompoint_Seq(array_agg(tgeompoint_inst(stop_loc, schedule_time) ORDER BY schedule_time)) AS schedule_trip,
            tgeompoint_Seq(array_agg(tgeompoint_inst(trip_Geom, actual_time) ORDER BY actual_time)) AS actual_trip
        FROM noduptrip
        WHERE actual_trip_id like %s
        GROUP BY actual_trip_id, schedule_trip_id),
    Final AS (SELECT actual_trip_id, schedule_trip_id, extract(EPOCH FROM actual - schedule)/60 as delay, actual AS t
    FROM  (
      SELECT actual_trip_id, schedule_trip_id,
        unnest(timestamps(actual_trip)) AS actual,
        unnest(timestamps(schedule_trip)) AS schedule
      FROM tripDelay
    ) x)
    SELECT *
    FROM Final
    WHERE delay < %s;
    """

    try:
        cur.execute(query, (trip_pattern, max_delay_minutes))
        return pd.DataFrame(cur.fetchall(), columns=['actual_trip_id', 'schedule_trip_id', 'delay', 't'])
    finally:
        # Release the cursor and connection even if the query fails.
        cur.close()
        conn.close()

# Function to plot all trips on the same plot
def plot_all_trips(delay_data):
    """
    Draws one delay-over-time line per trip on a single shared Plotly chart.

    Parameters:
    - delay_data: DataFrame with t, delay and actual_trip_id columns
      (as produced by fetch_delay_data)
    """
    axis_labels = {"t": "Time", "delay": "Delay (minutes)", "actual_trip_id": "Trip ID"}
    fig = px.line(
        delay_data,
        x='t',
        y='delay',
        color='actual_trip_id',  # one coloured line per trip
        title="",
        labels=axis_labels,
        markers=True,
    )

    fig.update_layout(
        xaxis_title="Time",
        yaxis_title="Delay (minutes)",
        xaxis=dict(tickformat="%H:%M:%S"),
        template="plotly_white",
    )
    fig.show()

# Main script: fetch the per-stop delays and plot every trip's delay curve together
if __name__ == "__main__":
    delay_data = fetch_delay_data()

    if not delay_data.empty:
        plot_all_trips(delay_data)
    else:
        print("No delay data found.")