# Setup

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
!pip install apache-sedona geopandas

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import os
import pwd
import sys

from pyspark.sql import SparkSession, DataFrame, Window
from sedona.spark import SedonaContext
from random import randrange
import pyspark.sql.functions as F

# Login name of the user running the kernel; reused for the per-user namespace
# and the per-user HDFS locations.
username = pwd.getpwuid(os.getuid()).pw_name
# Root URI of the Hadoop file system; None when HADOOP_FS is not exported.
hadoopFS = os.getenv('HADOOP_FS')
groupName = 'A1'
namespace = 'iceberg.' + username
sharedNS = 'iceberg.com490_iceberg'

# Echo the resolved settings so a misconfigured kernel is visible right away.
print(os.getenv('SPARK_HOME'))
print(f"hadoopFS={hadoopFS}")
print(f"username={username}")
print(f"group={groupName}")

/opt/spark
hadoopFSs=hdfs://iccluster059.iccluster.epfl.ch:9000
username=jsiffert
group=A1


In [4]:
# hadoopFS is interpolated into the JAR and warehouse locations below. If it is
# missing, those settings would silently become "None/..." paths, so fail early
# with an explicit message instead.
if not hadoopFS:
    raise RuntimeError(
        "HADOOP_FS environment variable is not set; "
        "cannot locate the Iceberg JAR and warehouse directories"
    )

spark = (
    SparkSession.builder
    .appName(username)
    # Random UI port in 4040..4435 (step 5), presumably to avoid clashes
    # between sessions started on the same host.
    .config('spark.ui.port', randrange(4040, 4440, 5))
    .config("spark.executorEnv.PYTHONPATH", ":".join(sys.path))
    # --- JARs: Iceberg runtime from HDFS, Sedona + GeoTools resolved via Ivy ---
    .config('spark.jars', f"{hadoopFS}/data/com-490/jars/iceberg-spark-runtime-3.5_2.13-1.6.1.jar")
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-3.5_2.13:1.7.1,"
        "org.datasyslab:geotools-wrapper:1.7.1-28.5",
    )
    .config(
        "spark.jars.repositories",
        "https://artifacts.unidata.ucar.edu/repository/unidata-all",
    )
    # --- Iceberg: shared `iceberg` catalog and the per-user session catalog ---
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.catalog.iceberg', 'org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.iceberg.type', 'hadoop')
    .config('spark.sql.catalog.iceberg.warehouse', f'{hadoopFS}/data/com-490/iceberg/')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
    .config('spark.sql.catalog.spark_catalog.type', 'hadoop')
    .config('spark.sql.catalog.spark_catalog.warehouse', f'{hadoopFS}/user/{username}/assignment-3/warehouse')
    .config("spark.sql.warehouse.dir", f'{hadoopFS}/user/{username}/assignment-3/spark/warehouse')
    .config('spark.eventLog.gcMetrics.youngGenerationGarbageCollectors', 'G1 Young Generation')
    # --- Executor sizing ---
    .config("spark.executor.memory", "6g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.instances", "4")
    .master('yarn')
    .getOrCreate()
)

# Wrap the session with Sedona so its spatial SQL functions are registered.
sedona = SedonaContext.create(spark)

https://artifacts.unidata.ucar.edu/repository/unidata-all added as a remote repository with the name: repo-1
Ivy Default Cache set to: /home/jsiffert/.ivy2/cache
The jars for the packages stored in: /home/jsiffert/.ivy2/jars
org.apache.sedona#sedona-spark-3.5_2.13 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1be1fb6a-3d9f-4515-85b9-9375b374752b;1.0
	confs: [default]


:: loading settings :: url = jar:file:/opt/spark-3.5.2-bin-hadoop3-scala2.13/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.sedona#sedona-spark-3.5_2.13;1.7.1 in central
	found org.apache.sedona#sedona-common;1.7.1 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found org.locationtech.jts#jts-core;1.20.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found org.locationtech.spatial4j#spatial4j;0.8 in central
	found com.google.geometry#s2-geometry;2.0.0 in central
	found com.google.guava#guava;25.1-jre in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found org.checkerframework#checker-qual;2.0.0 in central
	found com.google.errorprone#error_prone_annotations;2.1.3 in central
	found com.google.j2objc#j2objc-annotations;1.1 in central
	found org.codehaus.mojo#animal-sniffer-annotations;1.14 in central
	found com.uber#h3;4.1.1 in central
	found net.sf.geographiclib#GeographicLib-Java;1.52 in central
	found com.github.ben-manes.caffeine#caffeine;2.9.2 in central
	found org.checkerframework#checker-qual;3.10.0 in central
	found com.google.error

In [5]:
# Work inside a per-user schema of the session catalog: create it if it does
# not exist yet, then make it the current schema.
user_schema = f'spark_catalog.{username}'
spark.sql(f'CREATE SCHEMA IF NOT EXISTS {user_schema}')
spark.sql(f'USE {user_schema}')

DataFrame[]

# Robust Journey Planning

In [17]:
from graph.graph import MultiDiGraph
from graph.path import Path
from config import *
from graph.algorithms import dijkstra, yens_k_shortest_paths
from utilities import string_to_switzerland_time
from transportation_network.network import build_timetable, build_network
from predictive_modeling.predictive_model import PredictiveModel

In [8]:
# --- Default journey query ---------------------------------------------------
DEFAULT_START_STOP_ID = "8501214"  # Ecublens VD, EPFL
DEFAULT_END_STOP_ID = "8579239"  # Epalinges, Croisettes
DEFAULT_ARRIVAL_TIME = string_to_switzerland_time("2024-04-04 10:30:00")

# --- Planner settings --------------------------------------------------------
DEFAULT_MAXIMUM_WALKING_DISTANCE = 1000  # presumably metres — TODO confirm against build_network
DEFAULT_CONFIDENCE_THRESHOLD = 0.8
DEFAULT_PATH_COUNT = 3  # passed as K to yens_k_shortest_paths

# --- Conditions passed to the predictive model -------------------------------
DEFAULT_RAIN = False
DEFAULT_TEMPERATURE = "medium"

# Stop ids kept at hand for trying other start/end stops:
#   "8501214"  Ecublens VD, EPFL
#   "8592226"  St-Sulpice VD, Russel
#   "8590442"  Lausanne, Riponne-M. Béjart
#   "8588157"  St-Sulpice VD, Bochet
#   "8579239"  Epalinges, Croisettes
#   "8501210"  Lausanne, Bourdonnette

In [9]:
# Build the three timetable DataFrames (stop times, stops, transfers) for the
# default arrival time. The region and hour bounds are not defined in this
# notebook — presumably they come from the star import of `config`.
timetable_options = dict(
    arrival_datetime=DEFAULT_ARRIVAL_TIME,
    region_uuids=REGION_UUIDS,
    min_departure_hour=MIN_DEPARTURE_HOUR,
    max_arrival_hour=MAX_ARRIVAL_HOUR,
)
stop_times_df, stops_df, transfer_df = build_timetable(spark, **timetable_options)

                                                                                

In [10]:
# Combine the stops, stop times and transfers into a single network DataFrame,
# using the default walking-distance limit and the configured walking speed.
network_df = build_network(
    stops_df, stop_times_df, transfer_df,
    walking_speed=WALKING_SPEED,
    maximum_walking_distance=DEFAULT_MAXIMUM_WALKING_DISTANCE,
)

In [11]:
# Mark the network DataFrame for caching. Spark caches lazily, so nothing is
# materialised until the first action that reads the DataFrame.
network_df = network_df.cache()

In [12]:
# Build the MultiDiGraph consumed by yens_k_shortest_paths from the network
# DataFrame, for the default arrival time.
graph = MultiDiGraph.from_spark_dataframe(
    network_df,
    arrival_datetime=DEFAULT_ARRIVAL_TIME,
)

                                                                                

In [13]:
# Search for the K candidate paths. The search runs in reverse: the journey's
# end stop is the search source and its start stop is the search target.
yen_paths = yens_k_shortest_paths(
    graph,
    source_id=DEFAULT_END_STOP_ID,    # journey destination
    target_id=DEFAULT_START_STOP_ID,  # journey origin
    K=DEFAULT_PATH_COUNT,
    arrival_datetime=DEFAULT_ARRIVAL_TIME,
    walking_speed=WALKING_SPEED,
    maximum_walking_distance=DEFAULT_MAXIMUM_WALKING_DISTANCE,
    default_transfer_time=DEFAULT_TRANSFER_TIME,
)

In [14]:
# Wrap each raw result of the path search in a Path object, passing the same
# arrival time and walking settings that were used for the search.
paths = [
    Path.from_dijkstra_output(
        yen_path,
        arrival_datetime=DEFAULT_ARRIVAL_TIME,
        walking_speed=WALKING_SPEED,
        maximum_walking_distance=DEFAULT_MAXIMUM_WALKING_DISTANCE,
    )
    for yen_path in yen_paths
]

In [15]:
# Replace every path by its compressed form.
# NOTE(review): this rebinds `paths`, so re-running the cell compresses the
# already-compressed paths again — confirm Path.compress() is safe to apply twice.
paths = [candidate.compress() for candidate in paths]

In [18]:
# Location of the per-user parquet data handed to the predictive model. The
# path carries no file-system prefix, unlike the warehouse paths of the session.
model_path = f'/user/{username}/assignment-3/{PM_TABLE_NAME}.parquet'
predictive_model = PredictiveModel(spark, model_path)

                                                                                

In [19]:
# Confidence of the first path in `paths` under the default rain/temperature
# settings. As the last expression of the cell, the value is displayed below.
predictive_model.get_path_confidence(paths[0], DEFAULT_RAIN, DEFAULT_TEMPERATURE)

Computing path confidence | Ecublens VD, EPFL -> Epalinges, Croisettes
[297.TA.91-m1-j24-1.3.H] -> (Lausanne-Flon, pl. de l'Europe) -> [984.TA.91-m2-j24-1.2.H] | P(d<180.0)=0.8583333333333333 (<built-in method count of Row object at 0x7fd1ff331a10> samples)
Key ('8592128', 10, False, 'medium') don't match any distributions, fallback to exponential distribution.
[984.TA.91-m2-j24-1.2.H] -> (Lausanne, Vennes) = Arrival | P(d<107.0)=0.99 (-1 samples)


0.8497499999999999

In [20]:
# Uncomment and run to stop the Spark session once the notebook is finished.
# spark.stop()