### SETUP WORK ENVIRONMENT

In [1]:
import base64 as b64
import json
import time
import re
import os
import warnings

warnings.simplefilter(action='ignore', category=UserWarning)
warnings.filterwarnings("ignore", category=UserWarning, message="pandas only supports SQLAlchemy connectable .*")

import pwd
import numpy as np
import sys
import pandas as pd

from pyspark.sql import SparkSession
from random import randrange
import pyspark.sql.functions as F
#np.bool = np.bool_

import trino
from contextlib import closing
from urllib.parse import urlparse
from trino.dbapi import connect
from trino.auth import BasicAuthentication, JWTAuthentication


from pyspark.sql import SparkSession
from random import randrange
import pyspark.sql.functions as F

# Group identifier. A later cell assigns the same value again and checks it
# against the pattern '[A-Z][0-9]' before it is printed.
groupName='U1'

In [2]:
# PySpark core functions and types
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, lit, udf, expr, when, explode,
    from_unixtime, to_timestamp, unix_timestamp,
    year, month, dayofmonth, dayofweek, hour, minute,
    radians, sin, cos, atan2, sqrt, pow, exp, abs as abs_,
    substring, lower, trim, regexp_replace,
    countDistinct, coalesce, lead, lag,
    sum as spark_sum, max as spark_max, avg, variance, stddev,
    least, greatest, first
)
from pyspark.sql.types import StringType, DoubleType, ArrayType
from pyspark.sql.window import Window

# Datetime and timezone utilities
from datetime import datetime, timedelta
import pytz

# Geospatial and distance calculation
import geopandas as gpd
from shapely.geometry import Point
import shapely.wkb
from geopy.distance import geodesic
from math import radians, sin, cos, sqrt, atan2

# Data science utilities
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt

# Graph utilities
import networkx as nx
from collections import defaultdict
from heapq import heappush, heappop

# Widgets and display
import ipywidgets as widgets
import folium
from IPython.display import display, clear_output
from datetime import datetime, timedelta

In [3]:
def getUsername():
    """Return the user name and remaining validity encoded in the hub token.

    The JWT stored in the ``EPFL_COM490_TOKEN`` environment variable is split
    on ``.`` and its second segment (the payload) is base64url-decoded and
    parsed as JSON. Only the ``sub`` and ``exp`` claims are read; the token
    signature is NOT verified here.

    Returns:
        tuple: ``(sub, hours_left)`` where ``sub`` is the token's ``sub``
        claim and ``hours_left`` is the whole number of hours until ``exp``.

    Raises:
        Exception: if the environment variable is missing or is not a
            decodable JWT, or if the token expires in less than one hour.
    """
    token = os.environ.get('EPFL_COM490_TOKEN')
    if not token:
        # Without this guard the failure would be an opaque
        # "'NoneType' object has no attribute 'split'".
        raise Exception('EPFL_COM490_TOKEN is not set; cannot determine your username.')

    segments = token.split('.')
    if len(segments) < 2:
        raise Exception('EPFL_COM490_TOKEN is not a valid JWT (expected header.payload.signature).')

    # JWT segments are base64url without padding; restore exactly the number
    # of '=' needed (0 when the length is already a multiple of 4).
    payload = segments[1]
    payload += '=' * (-len(payload) % 4)

    try:
        obj = json.loads(b64.urlsafe_b64decode(payload))
        expires_at = int(obj['exp'])
    except (ValueError, KeyError, TypeError) as err:
        # ValueError covers base64, UTF-8 and JSON decoding errors.
        raise Exception('EPFL_COM490_TOKEN has an unreadable payload or no "exp" claim.') from err

    now = time.time()
    # Treat a token with less than one hour left as already expired.
    if now > expires_at - 3600:
        raise Exception('Your credentials have expired, please restart your Jupyter Hub server:'
                        'File>Hub Control Panel, Stop My Server, Start My Server.')
    time_left = int((expires_at - now) / 3600)
    return obj.get('sub'), time_left

In [4]:
# Credentials (JWT) and warehouse endpoint are both taken from the environment.
trinoAuth = JWTAuthentication(os.getenv('EPFL_COM490_TOKEN'))
trinoUrl = urlparse(os.getenv('TRINO_URL'))

# Empty list bound to `Query`; nothing in this cell fills it.
Query = list()

print(f"Warehouse URL: {trinoUrl.scheme}://{trinoUrl.hostname}:{trinoUrl.port}/")

# Connection settings gathered in one place, then handed to the DB-API
# `connect` call; host, port and scheme all come from the parsed TRINO_URL.
_trino_settings = dict(
    host=trinoUrl.hostname,
    port=trinoUrl.port,
    auth=trinoAuth,
    http_scheme=trinoUrl.scheme,
    verify=True,
)
conn = connect(**_trino_settings)

print('Connected!')

Warehouse URL: https://iccluster028.iccluster.epfl.ch:8443/
Connected!


In [5]:
# Name of the OS account that owns the kernel process.
username = pwd.getpwuid(os.getuid()).pw_name
# HDFS base URI; stays None when HADOOP_FS is not defined.
hadoopFS = os.environ.get('HADOOP_FS')
groupName = 'U1'

# One line per value, in the same order as before.
print(
    os.environ.get('SPARK_HOME'),
    f"hadoopFSs={hadoopFS}",
    f"username={username}",
    f"group={groupName}",
    sep="\n",
)

/opt/spark
hadoopFSs=hdfs://iccluster059.iccluster.epfl.ch:9000
username=spasov
group=U1


In [6]:
# Identity and remaining token validity come from the hub JWT; from here on
# `username` holds the token's `sub` claim.
username, validity_h = getUsername()
hadoopFS = os.environ.get('HADOOP_FS')
namespace = 'iceberg.' + username
sharedNS = 'iceberg.com490_iceberg'

# The group name must contain an uppercase letter followed by a digit.
# `re.search` matches anywhere in the string, so surrounding characters are
# tolerated.
if not re.search(r'[A-Z][0-9]', groupName):
    # f-string so the offending value appears in the message; the previous
    # plain string printed the literal text "{groupName}".
    raise Exception(f'Invalid group name {groupName}')

print(f"you are: {username}")
print(f"credentials validity: {validity_h} hours left.")
print(f"shared namespace is: {sharedNS}")
print(f"your namespace is: {namespace}")
print(f"your group is: {groupName}")

you are: spasov
credentials validity: 43 hours left.
shared namespace is: iceberg.com490_iceberg
your namespace is: iceberg.spasov
your group is: U1


In [7]:
# The application is named after the OS account running the kernel.
_builder = SparkSession.builder.appName(pwd.getpwuid(os.getuid()).pw_name)

# Session options, applied to the builder in the order listed.
_spark_options = {
    # UI port drawn from 4040, 4045, ..., 4435.
    'spark.ui.port': randrange(4040, 4440, 5),
    # Executors get the driver's module search path.
    "spark.executorEnv.PYTHONPATH": ":".join(sys.path),
    # Iceberg runtime jar and SQL extensions.
    'spark.jars': f'{hadoopFS}/data/com-490/jars/iceberg-spark-runtime-3.5_2.13-1.6.1.jar',
    'spark.sql.extensions': 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
    # Catalog `iceberg`: Hadoop-type catalog under the course data directory.
    'spark.sql.catalog.iceberg': 'org.apache.iceberg.spark.SparkCatalog',
    'spark.sql.catalog.iceberg.type': 'hadoop',
    'spark.sql.catalog.iceberg.warehouse': f'{hadoopFS}/data/com-490/iceberg/',
    # Catalog `spark_catalog`: Hadoop-type catalog under the user's HDFS home.
    'spark.sql.catalog.spark_catalog': 'org.apache.iceberg.spark.SparkSessionCatalog',
    'spark.sql.catalog.spark_catalog.type': 'hadoop',
    'spark.sql.catalog.spark_catalog.warehouse': f'{hadoopFS}/user/{username}/assignment-3/warehouse',
    "spark.sql.warehouse.dir": f'{hadoopFS}/user/{username}/assignment-3/spark/warehouse',
    'spark.eventLog.gcMetrics.youngGenerationGarbageCollectors': 'G1 Young Generation',
    # Executor sizing: 4 executors, 4 cores and 6g of memory each.
    "spark.executor.memory": "6g",
    "spark.executor.cores": "4",
    "spark.executor.instances": "4",
}
for _option, _setting in _spark_options.items():
    _builder = _builder.config(_option, _setting)

# Run on YARN; reuse an existing session if there is one.
spark = _builder.master('yarn').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/29 22:23:35 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [8]:
# Bare expression: the notebook shows the session's SparkContext as the cell result.
spark.sparkContext

### IMPORT LIBRARIES 

In [9]:
# Load the project's helper script ./scripts/pathfinder.py as a module object
# bound to the name `pathfinder`, without requiring it to be on sys.path.

import importlib.util
import os

# Absolute path of the script, resolved against the current working directory
file_path = os.path.abspath('./scripts/pathfinder.py')
# Build a module spec for that file, create the module from it, then execute the file's code in it.
# NOTE(review): the spec is created under the module name "utils" although the
# file is pathfinder.py, and the module is not added to sys.modules in this
# cell - confirm both are intentional.
spec = importlib.util.spec_from_file_location("utils", file_path)
pathfinder = importlib.util.module_from_spec(spec)
spec.loader.exec_module(pathfinder)
# Smoke test: print whatever the module's own test() helper returns.
print(pathfinder.test())

Imported successfully!


In [10]:
# !pip install geopy
# !pip install folium

### IMPORT FUNCTIONS

### PREPARE SPARK DATAFRAMES 

In [11]:
# Alternative source for `model`, kept for reference:
# model = spark.read.parquet(f"{hadoopFS}/user/com-490/group/{groupName}/multiclass_model.parquet")

# Edge table read from the group's HDFS directory. The directory is derived
# from `groupName` rather than repeating the literal 'U1', so the notebook
# follows the group configured in the setup cells. `route_id` is dropped
# right after loading.
master = (
    spark.read
    .parquet(f"{hadoopFS}/user/com-490/group/{groupName}/edges")
    .drop("route_id")
)

# Confidence table built by the pathfinder helper module from the Spark session.
model = pathfinder.get_confidence_df(spark)

                                                                                

In [12]:
# master.filter(col("from_stop") == '8595939') \
#     .filter(col("to_stop") == '8595937') \
#     .filter(col("dow") == 3) \
#     .filter(col("t_arrival") >= '13:00:00') \
#     .orderBy("t_arrival") \
#     .show(30, truncate=False)

### USER TEST 

In [13]:
# The helper returns a sequence; its second element is used as the stops
# table (it must expose `stop_id` and `stop_name` columns).
existing_stops = pathfinder.get_all_close_stop_pairs(spark, radius_meters=500)[1]

# Bring the (stop_id, stop_name) pairs to the driver and index names by id.
_stop_rows = existing_stops.select("stop_id", "stop_name").collect()
stop_name_dict = dict()
for _stop_row in _stop_rows:
    stop_name_dict[_stop_row["stop_id"]] = _stop_row["stop_name"]

# Build the interactive selector from the helper module.
submit_button = pathfinder.create_stop_selector_widget(spark, master, model, stop_name_dict)

                                                                                

Combobox(value='Lausanne, Bellerive', continuous_update=False, description='Origin:', ensure_option=True, layo…

Combobox(value='Ecublens VD, EPFL (bus)', continuous_update=False, description='Target:', ensure_option=True, …

Text(value='15:00:00', description='Arrival Time:', layout=Layout(width='300px'), placeholder='HH:MM:SS')

Dropdown(description='Day of Week:', index=3, layout=Layout(width='300px'), options={'Monday': 0, 'Tuesday': 1…

Button(button_style='success', description='Submit', style=ButtonStyle())

Output()

**Comment:** An obvious inconsistency in the result is that the model predicts very small (or no) confidence in making successful trips. This is attributed to inconsistencies in the timetable data, where the same trip is assigned multiple trip_ids or multiple trips share the same trip_id.

# PATH FINDING ALGORITHM

The goal is to find the shortest and least risky path from a given origin to a target, for a given target arrival time.

Dijkstra's algorithm computes the shortest path from a given origin (A) to ***all*** other points accessible from A within a defined network. A connection between nodes A and B is called an edge.
The method checks and stores the distances to all nodes in the vicinity (the direct neighbours) of the current node, then moves to the closest node that has not been visited yet. This works provided the network is *well-connected*, meaning no isolated stops and no connections leading far away from the city and the stop cluster; this is certainly not going to be a primary issue here. The method explores the distance (edge weight) to all nodes and does so with complexity $\mathcal{O}((V + E) \log V)$, where $V$ is the number of network nodes (stops) and $E$ is the number of edges (connections). This is the worst-case scenario; with luck and a good origin the method completes much faster.  

The standard networkx library offers a well-optimized implementation, with the option to generate the fastest path from the origin to the stop you desire. 
Going further, we use **Yen's algorithm**, which builds on Dijkstra's algorithm: you walk along the Dijkstra path, find the best node to branch off at, and recompute a so-called spur path by the same principles. This gives you two best paths (and in principle *k* best paths, using networkx shortest_distance).

The issue is the edge weights! In our code an edge between A and B, be it walking or by vehicle, carries 3 parameters: **T_nominal**, **t_arrival** @B and **R_risk** @A. T_nominal is the scheduled travel time coming from the trip schedule (*master*), or the geospatial distance divided by the walking speed. R_risk is the expected delay that we get from (*model*); it is keyed by stop ID, day of week and hour of day. Ideally we wanted to join by type of transport as well, but the table that master comes from has a very unusual style of transportation labelling, which made a join impossible.

Now, Dijkstra's algorithm is *static* in the sense that the edge weights don't reflect time propagation. It is therefore an excellent method for path finding based on distance, which is static. 
As an example:
	You want to go to EPFL from Ouchy. You took the 24 bus to Bourdonnette at 12.00 and now want to go further by m1. Dijkstra checked multiple options 12.15, 12.35, 12.55, 13.15 etc. (maybe even 11.35, because why not) and came to the conclusion that, based on the delays R and T_nominal alone, the 11.35 or 12.35 is fastest (has fewer delays or the fastest nominal travel). Static Dijkstra is clueless about waiting, or even about what causality is!

To combat this we compare t_arrival against CLK (the clock). We tried implementing our own modification of the algorithm in which the propagation considers both weights and time. You can run **dynamic_dijkstra** and see the result. At this stage the path is a collection of many small steps (that seem right, but in the grand total don't lead toward the target). Add to this the fact that there are many tripID issues in master, with trips missing/disappearing mid-route (more on that later), and the path becomes a *"Brownian motion"* with some sanity restrictions. To highlight the two main difficulties: (1) finding a different way to trace vehicle edges (with consistent schedules); (2) **directionality** - we walk all over the place without nearing the target.

\*The first problem can be seen by running a simple filter like so:

```python
master.filter(col("from_stop") == '8595939') \
    .filter(col("to_stop") == '8595937') \
    .filter(col("dow") == 3) \
    .filter(col("t_arrival") >= '13:00:00') \
    .orderBy("t_arrival") \
    .show(30, truncate=False)
```

If done for stop pairs that you know follow each other, you can trace how some tripIDs disappear mid-route or simply don't exist at all. In a one-hour window a pair will have 6 different instances (counting two directions), while in the next hour it is only 4, or 0! This inconsistency makes reliable back-tracing impossible, and is the reason you see so many walking edges in the output. 

\*The second problem, the lack of directionality, is **solved** by using static Dijkstra to determine the optimal path based on R and T alone. It navigates a geographical "heatmap" of delays and scheduled travel times. Even though the algorithm still doesn't consider causality and waiting, the stop sequence it deems optimal is likely to be the best version (and most importantly we have a guarantee that both origin and target are found!). 
The example mentioned earlier is actually safe in this approach. You recover the important Ouchy... -> Bourdonnette -> EPFL skeleton, and can then **backtrace**, but with direct directives as to where! 

After the **skeleton** is computed, we can backtrace and select the best tripIDs based on *t_arrival* and *CLK*. All would be great if it weren't for the tripID issue in transit_edges and the sometimes-missing walking edges. Missing walking edges are due to the maximum-distance requirement, but the tripID inconsistencies are an actual plague. Keep in mind that we apply an overall time window, by default 1 hour, so a long journey that has to wait a lot (because of missing trips) will end up going out of the time bounds before reaching the origin in backwards propagation.

Finally, the risk assessment. At this point the risk is more a measure of the likelihood of inconsistency in the transit edges than what it is actually supposed to mean. Indeed, the formula $\frac{(wait - walk) - R\_risk}{std\_dev(R)}$ loses its meaning if either the walking or the transit edge is None, which is what happens in many cases. If something is missing we have to use default values, but the logic of the formula is still lost, and with it the significance of the overall cumulative confidence. 

### ALTERNATIVE TO WIDGET

In [29]:
# ALTERNATIVE TO WIDGET

# # First for widget to get correct options
# existing_stops = get_all_close_stop_pairs(spark, radius_meters=500)[1]
# # Create stop dictionary
# stop_name_dict = {
#     row["stop_id"]: row["stop_name"]
#     for row in existing_stops.select("stop_id", "stop_name").collect()
# }

# # Inputs for test
# # 8595937 UNIL sport
# # 8591989 Lausanne, Bellerive
# # 8501214 Ecublens VD, EPFL
# # 8501210 Lausanne, Bourdonnette
# # 8501209 Lausanne, Malley

# # If you want your own:
# # name_to_id = {v: k for k, v in stop_name_dict.items()}
# # stop_id = name_to_id["Lausanne, Vidy-Port"]
# # print(stop_id)

# origin, target = "8591989", "8595937"  
# arrival = "15:00:00"
# dow = "3"
# time_window_sec=5400

# # Create the edges and nodes
# existing_stops, transit_edges, walking_edges = get_data(master, model, arrival, dow, time_window_sec)

# # transit_edges.show()
# # walking_edges.show()

# # Build the Network
# graph = build_graph_from_edges(transit_edges, walking_edges, risk=True)

# # Run your static method on the network
# # path = dijkstra_path(graph, origin, target)
# path_prim, path_spur = multi_path(graph, origin, target)

# # Visualize the path, static skeleton path.
# print(path_prim)
# print(path_spur)
# plot_stop_network_with_path(graph, existing_stops, path_prim, path_spur)


# path_rendering, danger = dynamic_backtracing(path_prim, arrival, transit_edges, walking_edges)
# path_rendering_df = display_path(path_rendering, stop_name_dict)
# print(f"Cumulative Risk Factor: {math.exp(danger)}")

### TIME-DEPENDENT DIJKSTRA

In [None]:
# origin, target = "8591989", "8595937"
# arrival = "15:00:00"
# dow = "3"
# time_window_sec=3600

# existing_stops, transit_edges, walking_edges = pathfinder.get_data(master, model, arrival, dow, time_window_sec)
# graph = pathfinder.get_network_dict(existing_stops, transit_edges, walking_edges)
# path_rendering, map_rendering, visited = pathfinder.dynamic_dijkstra(graph, origin, target, arrival)

In [126]:
# Stop the Spark session created earlier; run this last, since cells that use `spark` will no longer work afterwards.
spark.stop()