# Network modeling
This notebook aims to represent our data as a network in which each node represents a station within 15 km of Zürich HB (the stop filter below uses a slightly wider 20 km radius) and each edge represents a connection from one stop to another.

## Initialize the environment

In [1]:
# Load the sparkmagic extension, which provides the %spark / %%spark magics used throughout
# this notebook to run code in a remote Spark session.
%load_ext sparkmagic.magics

Cleaning up livy sessions on exit is enabled


In [2]:
import os
import json
from IPython import get_ipython

# User name taken from the Renku environment; used to make the application name unique.
username = os.environ['RENKU_USERNAME']
server = "http://iccluster044.iccluster.epfl.ch:8998"

# Resources for the remote Spark application.
# The application name follows the "<your_gaspar_id>-final-project" convention.
# json.dumps produces valid JSON (escaping any special characters in the user name)
# instead of hand-formatting a string with doubled braces.
spark_session_config = {
    "name": f"{username}-final-project",
    "executorMemory": "4G",
    "executorCores": 4,
    "numExecutors": 10,
    "driverMemory": "4G",
}
get_ipython().run_cell_magic(
    'spark',
    line='config',
    cell=json.dumps(spark_session_config)
)

In [3]:
# Create a PySpark session (-l python) on the server defined above (-u), named after the
# "<username>-final-project" convention used for the application name.
get_ipython().run_line_magic(
    "spark", f"""add -s {username}-final-project -l python -u {server} -k"""
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5557,application_1680948035106_5057,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


In [4]:
%%spark
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.types import FloatType, BooleanType

print('We are using Spark %s' % spark.version)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We are using Spark 2.4.8.7.1.8.0-801

## Useful functions and constant definitions

In [5]:
%%spark
from math import radians, sin, asin, cos, sqrt

zurich_lat = 47.378177
zurich_lon = 8.540192
zurich_ID = 8503000

def haversine_distance(lon1, lat1, lon2, lat2):
    """Compute the distance between two locations in kilometers given the latitude and longitude of both location"""
    
    # Convert latitude and longitude to radian (from degree) 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    lon_d = lon2 - lon1 
    lat_d = lat2 - lat1 
    a = sin(lat_d/2)**2 + cos(lat1) * cos(lat2) * sin(lon_d/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Radius of earth in kilometers
    return c * r

@F.udf
def dist_to_zurich(lon, lat):
    return haversine_distance(lon, lat, zurich_lon, zurich_lat)

@F.udf(returnType=FloatType())
def dist_stations(lon1, lat1, lon2, lat2):
    return haversine_distance(lon1, lat1, lon2, lat2)

@F.udf(returnType=FloatType())
def walking_time(distance):
    """Return the walking time (seconds) for a given distance (km)"""
    return 120 + distance * 1200 # we count 2 min + 1min per 50m

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Data loading and processing
The data used are from the week of 15.03.2023 (15 March 2023).

In [6]:
%%spark
# Path to the ORC data
DATA_PATH = '../jdidio/final/data/'
path_istdaten = "/data/sbb/part_orc/istdaten"
path_allstops = "/data/sbb/orc/allstops"

# Loading ORC data into a Spark dataframe
df_istdaten = spark.read.orc(path_istdaten)
df_allstops = spark.read.orc(path_allstops)
df_trips = spark.read.csv(DATA_PATH + 'trips.txt', header=True)
df_routes = spark.read.csv(DATA_PATH + 'routes.txt', header=True)
df_stop_times = spark.read.csv(DATA_PATH + 'stop_times.txt', header=True)
df_calendar = spark.read.csv(DATA_PATH + 'calendar.txt', header=True)



df_istdaten = df_istdaten.withColumnRenamed("betriebstag", "date_of_trip")\
                .withColumnRenamed("fahrt_bezeichner", "trip_id")\
                .withColumnRenamed("betreiber_id", "operator_id")\
                .withColumnRenamed("betreiber_abk", "operator_abk")\
                .withColumnRenamed("betreiber_name", "operator_name")\
                .withColumnRenamed("produkt_id", "transport_type")\
                .withColumnRenamed("linien_id", "train_number(train)")\
                .withColumnRenamed("linien_text", "service type(train)")\
                .withColumnRenamed("umlauf_id", "circulation_id")\
                .withColumnRenamed("verkehrsmittel_text", "means_of_transport_text")\
                .withColumnRenamed("zusatzfahrt_tf", "if_additional")\
                .withColumnRenamed("faellt_aus_tf", "if_failed")\
                .withColumnRenamed("bpuic", "stop_id")\
                .withColumnRenamed("haltestellen_name", "stop_name")\
                .withColumnRenamed("ankunftszeit", "arrival_time")\
                .withColumnRenamed("an_prognose", "actual_arrival_time")\
                .withColumnRenamed("abfahrtszeit", "departure_time")\
                .withColumnRenamed("ab_prognose", "actual_departure_time")\
                .withColumnRenamed("durchfahrt_tf", "not_stop")

print("***** Actual data *****")
df_istdaten.printSchema()

print("***** All stops *****")
df_allstops.printSchema()

print("***** Trips *****")
df_trips.printSchema()

print("***** Routes *****")
df_routes.printSchema()

print("***** Stop Times *****")
df_stop_times.printSchema()

print("***** Calendar *****")
df_calendar.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

***** Actual data *****
root
 |-- date_of_trip: string (nullable = true)
 |-- trip_id: string (nullable = true)
 |-- operator_id: string (nullable = true)
 |-- operator_abk: string (nullable = true)
 |-- operator_name: string (nullable = true)
 |-- transport_type: string (nullable = true)
 |-- train_number(train): string (nullable = true)
 |-- service type(train): string (nullable = true)
 |-- circulation_id: string (nullable = true)
 |-- means_of_transport_text: string (nullable = true)
 |-- if_additional: string (nullable = true)
 |-- if_failed: string (nullable = true)
 |-- stop_id: string (nullable = true)
 |-- stop_name: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- actual_arrival_time: string (nullable = true)
 |-- an_prognose_status: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- actual_departure_time: string (nullable = true)
 |-- ab_prognose_status: string (nullable = true)
 |-- not_stop: string (nullable = true)
 |-- ye

### Trips filtering based on calendar
We keep only trips that operate on Tuesdays.

In [7]:
%%spark
df_tuesday_services = df_calendar.filter(df_calendar.tuesday == '1').select('service_id')
df_trips = df_trips.join(df_tuesday_services, df_trips.service_id == df_tuesday_services.service_id, 'inner')\
                    .select(df_trips.route_id, df_trips.trip_id)

df_trips.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+--------------------+
|     route_id|             trip_id|
+-------------+--------------------+
|91-10-A-j23-1|1.TA.91-10-A-j23-...|
|91-10-A-j23-1|10.TA.91-10-A-j23...|
|91-10-A-j23-1|102.TA.91-10-A-j2...|
|91-10-A-j23-1|105.TA.91-10-A-j2...|
|91-10-A-j23-1|106.TA.91-10-A-j2...|
+-------------+--------------------+
only showing top 5 rows

### Radius filtering

In [8]:
%%spark
stops_in_radius = df_allstops.filter(dist_to_zurich(df_allstops.stop_lon, df_allstops.stop_lat) < 20)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
%%spark
stops_in_radius.describe().show()
stops_in_radius.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+----------------+-------------------+-------------------+-------------+--------------+
|summary|           stop_id|       stop_name|           stop_lat|           stop_lon|location_type|parent_station|
+-------+------------------+----------------+-------------------+-------------------+-------------+--------------+
|  count|              3028|            3028|               3028|               3028|         3028|          3028|
|   mean| 8568073.374598809|            null|  47.37913347644312|  8.543148671133036|          1.0|          null|
| stddev|186157.88503859195|            null|0.07683802222411877|0.11870116215213007|          0.0|          null|
|    min|               176|          Aathal|   47.1985932328675|   8.27702313011239|             |              |
|    max|     Parent8591065|Zürichhorn (See)|   47.5564281823636|   8.80283401543927|            1| Parent8591065|
+-------+------------------+----------------+-------------------+---------------

## Walking table

In [10]:
%%spark
df_walking_stop = stops_in_radius.select(F.col("stop_id").alias("stop_id1"), \
            F.col("stop_name").alias("name1"), \
            F.col("stop_lat").alias("lat1"), \
            F.col("stop_lon").alias("lon1")) \
    .crossJoin(stops_in_radius.select(F.col("stop_id").alias("stop_id2"),\
                                  F.col("stop_name").alias("name2"),\
                                  F.col("stop_lat").alias("lat2"),\
                                  F.col("stop_lon").alias("lon2"))) \
    .withColumn("distance", dist_stations(F.col("lon1"), F.col("lat1"), F.col("lon2"), F.col("lat2")))\
    .select(F.col("stop_id1"), F.col("name1"), F.col("stop_id2"), F.col("name2"), F.col("distance")) \
    .filter("distance<=0.5 and distance>=0.0 and stop_id1 != stop_id2")

df_walking_stop.describe().show()
df_walking_stop.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+----------------+------------------+----------------+-------------------+
|summary|          stop_id1|           name1|          stop_id2|           name2|           distance|
+-------+------------------+----------------+------------------+----------------+-------------------+
|  count|             26328|           26328|             26328|           26328|              26328|
|   mean| 8570280.564485388|            null| 8570280.564485388|            null|0.21297821044136592|
| stddev|204297.91894663416|            null|204297.91894663335|            null|0.17456925516634392|
|    min|               176|          Aathal|               176|          Aathal|                0.0|
|    max|     Parent8591065|Zürichhorn (See)|     Parent8591065|Zürichhorn (See)|          0.4999911|
+-------+------------------+----------------+------------------+----------------+-------------------+

+--------+--------------------+-------------+-------------+----------+
|stop_id1|

In [11]:
%%spark
df_walking_stop = df_walking_stop.withColumn("walking_time", walking_time(df_walking_stop.distance))\
                             .select("stop_id1","name1","stop_id2","name2","walking_time", "distance")

df_walking_stop = df_walking_stop.withColumn("reachable_stops", F.struct('stop_id2', 'name2', 'walking_time', 'distance'))\
                                .select('stop_id1', 'name1', 'reachable_stops')\
                                .groupBy('stop_id1', 'name1').agg(F.collect_list('reachable_stops').alias('reachable_stops'))

df_walking_stop.describe().show(truncate=False)
df_walking_stop.show(5)
df_walking_stop.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+----------------+
|summary|stop_id1          |name1           |
+-------+------------------+----------------+
|count  |2852              |2852            |
|mean   |8569317.333167082 |null            |
|stddev |193816.35158222966|null            |
|min    |176               |Aathal          |
|max    |Parent8591065     |Zürichhorn (See)|
+-------+------------------+----------------+

+--------------+-----------------+--------------------+
|      stop_id1|            name1|     reachable_stops|
+--------------+-----------------+--------------------+
|8503000:0:32CD|        Zürich HB|[[8503000, Zürich...|
|   8503141:0:2|Küsnacht Goldbach|[[8503141:0, Küsn...|
|     8503307:0|      Bassersdorf|[[8503307:0:1, Ba...|
|       8507447|  Hirzel, Wässeri|[[8503752, Hirzel...|
|       8573725|  Islisberg, Dorf|[[8583206, Islisb...|
+--------------+-----------------+--------------------+
only showing top 5 rows

root
 |-- stop_id1: string (nullable = true)
 |-- name1:

## Trips table
- List of triplets (stop ID, departure time, arrival time)
- Route ID
- Trip ID

In [12]:
%%spark
df_stop_times_tuples = df_stop_times.withColumn("trip_tuple",F.struct(df_stop_times.stop_id,df_stop_times.departure_time,df_stop_times.arrival_time))\
                                    .select('trip_id', 'trip_tuple')
df_stop_times_tuples.show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------+-----------------------------+
|trip_id              |trip_tuple                   |
+---------------------+-----------------------------+
|1.TA.91-C25-j23-1.1.H|[8772568, 07:27:00, 07:27:00]|
|1.TA.91-C25-j23-1.1.H|[8772135, 07:40:00, 07:39:00]|
|1.TA.91-C25-j23-1.1.H|[8772133, 07:50:00, 07:48:00]|
|1.TA.91-C25-j23-1.1.H|[8772128, 07:59:00, 07:58:00]|
|1.TA.91-C25-j23-1.1.H|[8772319, 08:17:00, 08:14:00]|
+---------------------+-----------------------------+
only showing top 5 rows

In [13]:
%%spark
df_stop_times_grouped = df_stop_times_tuples.groupby('trip_id').agg(F.collect_list('trip_tuple').alias('trip_tuples'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
%%spark
df_trips_clean = df_trips.join(df_stop_times_grouped, df_trips.trip_id == df_stop_times_grouped.trip_id, "inner")\
                    .select(df_trips.route_id, df_trips.trip_id, df_stop_times_grouped.trip_tuples)

df_trips_clean.show(5)
df_trips_clean.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+--------------------+
|      route_id|             trip_id|         trip_tuples|
+--------------+--------------------+--------------------+
| 91-27-B-j23-1|1.TA.91-27-B-j23-...|[[8503225:0:6, 16...|
|  91-4-A-j23-1|1.TA.91-4-A-j23-1...|[[8503097:0:2, 18...|
| 91-4A-Y-j23-1|1.TA.91-4A-Y-j23-...|[[8505307:0:1, 13...|
|  91-P34-j23-1|1.TA.91-P34-j23-1...|[[8721202, 12:34:...|
|91-P45-A-j23-1|1.TA.91-P45-A-j23...|[[8718500, 18:38:...|
+--------------+--------------------+--------------------+
only showing top 5 rows

696922

In [15]:
%%spark

stop_in_rad = list(stops_in_radius.select('stop_id').toPandas().stop_id)

@F.udf(returnType=BooleanType())
def trip_in_rad(arr):
    outcome = False
    for s in arr:
        if (s[0] in stop_in_rad):
            outcome = True
    return outcome

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
%%spark

df_trips_clean = df_trips_clean.filter(trip_in_rad(df_trips_clean.trip_tuples))
df_trips_clean.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

59829

### Routes table
Should contain:
- List of stop IDs (all stops on the route)
- List of trip IDs (all trips of the route)
- RouteID
- Transport type

In [17]:
%%spark
df_stop_times_tuples = df_stop_times.withColumn("stop_tuple",F.struct(df_stop_times.stop_id,df_stop_times.stop_sequence))\
                                    .select('trip_id', 'stop_tuple')\
                                    .groupBy('trip_id').agg(F.collect_list('stop_tuple').alias('stop_tuples'))

df_routes_stops = df_trips.join(df_stop_times_tuples, df_trips.trip_id == df_stop_times_tuples.trip_id, "inner")\
                    .select(df_trips.route_id, df_stop_times_tuples.trip_id, df_stop_times_tuples.stop_tuples)\
                    .groupBy('route_id').agg(F.max('stop_tuples').alias('stop_tuples'))

df_trips_grouped_route = df_trips_clean.groupby('route_id').agg(F.collect_list('trip_id').alias('list_trip_id'))

df_routes_trips = df_routes.join(df_trips_grouped_route, df_routes.route_id == df_trips_grouped_route.route_id, "inner")\
                    .select(df_routes.route_id, df_trips_grouped_route.list_trip_id, df_routes.route_desc)

df_routes_stops.show(5)
df_routes_trips.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+
|      route_id|         stop_tuples|
+--------------+--------------------+
| 91-10-E-j23-1|[[8591406, 1], [8...|
|  91-3-B-j23-1|[[8500218:0:9, 1]...|
|92-107-F-j23-1|[[8583721:0:U, 1]...|
|92-110-B-j23-1|[[8582007, 1], [8...|
|92-312-D-j23-1|[[1100330, 1], [1...|
+--------------+--------------------+
only showing top 5 rows

+--------------+--------------------+----------+
|      route_id|        list_trip_id|route_desc|
+--------------+--------------------+----------+
|  92-674-j23-1|[1.TA.92-674-j23-...|         B|
| 91-10-E-j23-1|[85.TA.91-10-E-j2...|         T|
|   92-99-j23-1|[34.TA.92-99-j23-...|         B|
|92-305-A-j23-1|[165.TA.92-305-A-...|         B|
|96-186-9-j23-1|[1.TA.96-186-9-j2...|         B|
+--------------+--------------------+----------+
only showing top 5 rows

In [18]:
%%spark
df_routes_clean = df_routes_stops.join(df_routes_trips, df_routes_stops.route_id == df_routes_trips.route_id, 'inner')\
                            .select(df_routes_stops.route_id, df_routes_stops.stop_tuples, df_routes_trips.list_trip_id, df_routes_trips.route_desc)
df_routes_clean.show(5)
df_routes_clean.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+--------------------+----------+
|      route_id|         stop_tuples|        list_trip_id|route_desc|
+--------------+--------------------+--------------------+----------+
| 91-10-E-j23-1|[[8591406, 1], [8...|[750.TA.91-10-E-j...|         T|
|  92-674-j23-1|[[8576297, 1], [8...|[234.TA.92-674-j2...|         B|
| 91-43-Y-j23-1|[[8506302:0:2, 1]...|[23.TA.91-43-Y-j2...|        RE|
| 91-4Q-Y-j23-1|[[8502213:0:2, 1]...|[1.TA.91-4Q-Y-j23...|         S|
|92-305-A-j23-1|[[8587020:0:C, 1]...|[74.TA.92-305-A-j...|         B|
+--------------+--------------------+--------------------+----------+
only showing top 5 rows

408

In [19]:
%%spark

df_routes_clean = df_routes_clean.filter(trip_in_rad(df_routes_clean.stop_tuples))
df_routes_clean.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

394

### Save tables to HDFS

In [20]:
%%spark
OUTPUT_PATH = '../rjaccard/final/data/clean/'
df_walking_stop.write.format("parquet").mode("overwrite").option("header", "true").save(OUTPUT_PATH + 'walking.parquet')
df_routes_clean.write.format("parquet").mode("overwrite").option("header", "true").save(OUTPUT_PATH + 'routes.parquet')
df_trips_clean.write.format("parquet").mode("overwrite").option("header", "true").save(OUTPUT_PATH + 'trips.parquet')
stops_in_radius.write.format("parquet").mode("overwrite").option("header", "true").save(OUTPUT_PATH + 'stops.parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Load tables from HDFS

In [21]:
%%spark
DATA_PATH = '../rjaccard/final/data/clean/'
routes = spark.read.parquet(OUTPUT_PATH + 'routes.parquet')
trips = spark.read.parquet(OUTPUT_PATH + 'trips.parquet')
walking = spark.read.parquet(OUTPUT_PATH + 'walking.parquet')

routes.show(5)
trips.show(5)
walking.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+--------------------+----------+
|      route_id|         stop_tuples|        list_trip_id|route_desc|
+--------------+--------------------+--------------------+----------+
|  92-3-Q-j23-1|[[8590991, 1], [8...|[5360.TA.92-3-Q-j...|         B|
|  92-950-j23-1|[[8576101, 1], [8...|[135.TA.92-950-j2...|         B|
|96-154-9-j23-1|[[8502889:0:A, 1]...|[8.TA.96-154-9-j2...|         B|
|  92-156-j23-1|[[8590824, 2], [8...|[899.TA.92-156-j2...|         B|
| 92-46-D-j23-1|[[8591423, 1], [8...|[226.TA.92-46-D-j...|         B|
+--------------+--------------------+--------------------+----------+
only showing top 5 rows

+--------------+--------------------+--------------------+
|      route_id|             trip_id|         trip_tuples|
+--------------+--------------------+--------------------+
|  92-816-j23-1|1.TA.92-816-j23-1...|[[8573504, 07:27:...|
|96-186-1-j23-1|1.TA.96-186-1-j23...|[[8573703, 07:25:...|
|92-126-A-j23-1|10.TA.92-126-A-j2...|[[8503798:0:C

In [22]:
%%spark
routes.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

394

In [23]:
%%spark
trips.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

59829

In [25]:
# %spark cleanup