## Spark stuff

**In peak hours the sesson might not be able to start** and throw:
```
The code failed because of a fatal error:
	Session xxxx did not start up in 60 seconds..

Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.
```
Solution is to keep retrying ;) good luck for that!

In [1]:
%%local
import os
import json
from IPython import get_ipython

username = os.environ['RENKU_USERNAME']

configuration = dict(
    name = f"{username}-final-project",
    executorMemory = "4G",
    executorCores = 4,
    numExecutors = 10,
    conf = {
        
        "spark.jars.repositories": "https://repos.spark-packages.org",
        "spark.jars.packages": "graphframes:graphframes:0.8.2-spark2.4-s_2.11"
    }
)


get_ipython().run_cell_magic('configure', line="-f", 
                             cell=json.dumps(configuration))

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3127,application_1680948035106_2893,pyspark,idle,Link,Link,,
3270,application_1680948035106_3027,pyspark,idle,Link,Link,,
3291,application_1680948035106_3036,pyspark,idle,Link,Link,,
3300,application_1680948035106_3045,pyspark,idle,Link,Link,,
3301,application_1680948035106_3046,pyspark,idle,Link,Link,,
3303,application_1680948035106_3047,pyspark,idle,Link,Link,,
3304,application_1680948035106_3049,pyspark,idle,Link,Link,,
3311,application_1680948035106_3056,pyspark,idle,Link,Link,,
3320,application_1680948035106_3064,pyspark,idle,Link,Link,,
3322,application_1680948035106_3066,pyspark,idle,Link,Link,,


In [2]:
%spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3323,application_1680948035106_3067,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%%send_to_spark -i username -t str -n username

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'username' as 'username' to Spark kernel

## Loading data

### data reading

In [4]:
orc_file_path = "/data/sbb/part_orc/timetables"
stop_times = spark.read.orc(orc_file_path + "/stop_times")
calendar = spark.read.orc(orc_file_path + "/calendar")
routes = spark.read.orc(orc_file_path + "/routes")
trips = spark.read.orc(orc_file_path + "/trips")
csv_file_path = "/data/sbb/part_csv/timetables"
stops_csv = spark.read.csv(csv_file_path + "/stops", header = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### looking at data & pre-pre processing (haolong)

In [5]:
stops_csv.show(5)
stops_number = stops_csv.count()
print("There are " + str(stops_number) + " records when reading data")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+----------------+----------------+-------------+--------------+----+-----+---+
|stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|year|month|day|
+-------+--------------------+----------------+----------------+-------------+--------------+----+-----+---+
|1100008|Zell (Wiesental),...|47.7100842702352|7.85964788274668|         null|          null|2022|   12|  7|
|1100009|Zell (Wiesental),...|47.7131911044794|7.86290876722849|         null|          null|2022|   12|  7|
|1100010|           Atzenbach|47.7146175266411| 7.8723500608659|         null|          null|2022|   12|  7|
|1100011|     Mambach, Brücke|47.7282088873189| 7.8774704579861|         null|          null|2022|   12|  7|
|1100012|  Mambach, Mühlschau|47.7340818684375| 7.8813871126254|         null|          null|2022|   12|  7|
+-------+--------------------+----------------+----------------+-------------+--------------+----+-----+---+
only showing top 5 

For one stop, there exist diffenent dates, they are useless, however pumps up our compuatation load.
I'll just remove the time attributes and only leave one stop for each `stop_id`

In [6]:
# dropping the time attributes
# leaving only one entry for each `stop_id`
stops_csv = stops_csv.drop('year', 'month', 'day')
stops_csv = stops_csv.dropDuplicates(['stop_id'])
print("There are " + str(stops_csv.count()) + " records after removing records that only has different times")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

There are 52277 records after removing records that only has different times

In [7]:
stop_times.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+--------------+-----------+-------------+-----------+-------------+----+-----+---+
|             trip_id|arrival_time|departure_time|    stop_id|stop_sequence|pickup_type|drop_off_type|year|month|day|
+--------------------+------------+--------------+-----------+-------------+-----------+-------------+----+-----+---+
|231.TA.91-9-F-j22...|    06:58:00|      06:58:00|8506206:0:4|            7|          0|            0|2022|   12|  7|
|7041.TA.91-10-A-j...|    19:26:00|      19:26:00|    8500994|           20|          0|            0|2022|   12|  7|
|232.TA.91-9-F-j22...|    08:35:00|      08:35:00|8506200:0:5|            1|          0|            0|2022|   12|  7|
|7041.TA.91-10-A-j...|    19:27:00|      19:27:00|    8500079|           21|          0|            0|2022|   12|  7|
|232.TA.91-9-F-j22...|    08:37:00|      08:37:00|8506201:0:1|            2|          0|            0|2022|   12|  7|
+--------------------+------------+--------------+------

In [8]:
trips.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+----------+--------------------+--------------------+---------------+------------+----+-----+---+
|     route_id|service_id|             trip_id|       trip_headsign|trip_short_name|direction_id|year|month|day|
+-------------+----------+--------------------+--------------------+---------------+------------+----+-----+---+
|91-10-A-j22-1|  TA+8i000|1.TA.91-10-A-j22-...|Oberwil BL, Hüsli...|          51125|           0|2022|   12|  7|
|91-10-A-j22-1|     TA+6V|10.TA.91-10-A-j22...|Oberwil BL, Hüsli...|          20605|           0|2022|   12|  7|
|91-10-A-j22-1|  TA+f6n00|100.TA.91-10-A-j2...|Oberwil BL, Hüsli...|          22991|           0|2022|   12|  7|
|91-10-A-j22-1|     TA+rV|1000.TA.91-10-A-j...|    Dornach, Bahnhof|          51153|           0|2022|   12|  7|
|91-10-A-j22-1|     TA+rV|1001.TA.91-10-A-j...|    Dornach, Bahnhof|          51061|           0|2022|   12|  7|
+-------------+----------+--------------------+--------------------+---------------+------------

In [9]:
calendar.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+-------+---------+--------+------+--------+------+----------+--------+----+-----+---+
|service_id|monday|tuesday|wednesday|thursday|friday|saturday|sunday|start_date|end_date|year|month|day|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+----+-----+---+
|        TA|  TRUE|   TRUE|     TRUE|   FALSE| FALSE|   FALSE|  TRUE|  20221211|20221214|2022|   12|  7|
|      TA#1|  TRUE|   TRUE|     TRUE|    TRUE|  TRUE|    TRUE|  TRUE|  20211212|20221210|2022|   12|  7|
|  TA+00000| FALSE|  FALSE|    FALSE|   FALSE| FALSE|    TRUE| FALSE|  20211212|20221210|2022|   12|  7|
|  TA+00010|  TRUE|   TRUE|     TRUE|   FALSE| FALSE|   FALSE|  TRUE|  20211212|20221210|2022|   12|  7|
|  TA+001c0|  TRUE|   TRUE|     TRUE|    TRUE|  TRUE|   FALSE| FALSE|  20211212|20221210|2022|   12|  7|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+----+-----+---+
only showing top 5 rows

In [10]:
routes.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+---------+----------------+---------------+----------+----------+----+-----+---+
|      route_id|agency_id|route_short_name|route_long_name|route_desc|route_type|year|month|day|
+--------------+---------+----------------+---------------+----------+----------+----+-----+---+
|92-A00-5-j23-1|   80_BOD|               1|               |         B|       700|2022|   12|  7|
|92-A00-5-j22-1|   80_BOD|               1|               |         B|       700|2022|   12|  7|
|92-A00-4-j23-1|   80_BOD|               4|               |         B|       700|2022|   12|  7|
|92-A00-4-j22-1|      839|               1|               |       EXB|       702|2022|   12|  7|
|92-A00-3-j23-1|   80_BOD|               2|               |         B|       700|2022|   12|  7|
+--------------+---------+----------------+---------------+----------+----------+----+-----+---+
only showing top 5 rows

In [11]:
actual_temp = spark.read.load("/data/sbb/part_orc/istdaten", format="orc", sep=";", inferSchema="true", header="true")
#Rename the columns, from German to English
actual_condition = actual_temp.withColumnRenamed("betriebstag", "Date_of_trip")\
                .withColumnRenamed("fahrt_bezeichner", "Trip_id")\
                .withColumnRenamed("betreiber_id", "Operator_id")\
                .withColumnRenamed("betreiber_abk", "Operator_abk")\
                .withColumnRenamed("betreiber_name", "Operator_name")\
                .withColumnRenamed("produkt_id", "Transport_type")\
                .withColumnRenamed("linien_id", "Train_number(train)")\
                .withColumnRenamed("linien_text", "Service type(train)")\
                .withColumnRenamed("umlauf_id", "Circulation_id")\
                .withColumnRenamed("verkehrsmittel_text", "Means_of_transport_text")\
                .withColumnRenamed("zusatzfahrt_tf", "If_additional")\
                .withColumnRenamed("faellt_aus_tf", "If_failed")\
                .withColumnRenamed("bpuic", "Stop_id")\
                .withColumnRenamed("haltestellen_name", "Stop_name")\
                .withColumnRenamed("ankunftszeit", "Arrival_time")\
                .withColumnRenamed("an_prognose", "Actual_arrival_time")\
                .withColumnRenamed("abfahrtszeit", "Departure_time")\
                .withColumnRenamed("ab_prognose", "Actual_departure_time")\
                .withColumnRenamed("durchfahrt_tf", "Not_stop")
actual_condition.printSchema()
actual_condition.show(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Date_of_trip: string (nullable = true)
 |-- Trip_id: string (nullable = true)
 |-- Operator_id: string (nullable = true)
 |-- Operator_abk: string (nullable = true)
 |-- Operator_name: string (nullable = true)
 |-- Transport_type: string (nullable = true)
 |-- Train_number(train): string (nullable = true)
 |-- Service type(train): string (nullable = true)
 |-- Circulation_id: string (nullable = true)
 |-- Means_of_transport_text: string (nullable = true)
 |-- If_additional: string (nullable = true)
 |-- If_failed: string (nullable = true)
 |-- Stop_id: string (nullable = true)
 |-- Stop_name: string (nullable = true)
 |-- Arrival_time: string (nullable = true)
 |-- Actual_arrival_time: string (nullable = true)
 |-- an_prognose_status: string (nullable = true)
 |-- Departure_time: string (nullable = true)
 |-- Actual_departure_time: string (nullable = true)
 |-- ab_prognose_status: string (nullable = true)
 |-- Not_stop: string (nullable = true)
 |-- year: integer (nullable = 

## Data proprocessing

We filter out all stations that are 15kms away from the given Zurich location.

For distance calculation, refer to [Haversine formula](https://en.wikipedia.org/wiki/Haversine_formula)

In [12]:
from math import sin, cos, sqrt, atan2, radians
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

@F.udf(returnType=FloatType())

#Implement the Haversine formula. 
def distance_calculation(latitude_1, longitude_1, latitude_2, longitude_2):
    #Use Haversine formula. 
    #The Haversine formula calculates the distance between two points on a sphere 
    #(such as the Earth) based on their latitude and longitude.
    radius_of_Earth = 6371.0 #Earth radius, just refer to the actual data 
    
    #First, Convert latitude and longitude from degrees to radians
    latitude_1 = radians(float(latitude_1))
    latitude_2 = radians(float(latitude_2))
    longitude_1 = radians(float(longitude_1))
    longitude_2 = radians(float(longitude_2))

    ## Haversine formula implementation
    delta_latitude = latitude_2 - latitude_1
    delta_longitude = longitude_2 - longitude_1

    a = cos(latitude_1)*cos(latitude_2)*sin(delta_longitude/2)**2+sin(delta_latitude/2)**2
    c = 2*atan2(sqrt(a), sqrt(1-a))

    distance = radius_of_Earth * c 
    return distance

print("We have " + str(stops_csv.distinct().count()) + " stops before")

stops_in_15 = stops_csv.where(distance_calculation(F.lit(47.378177), F.lit(8.540192), F.col("stop_lat"), F.col("stop_lon")) <=15)

print("There are " + str(stops_in_15.distinct().count()) + " stops that are in 15km radius of Zurich HB")

stops_in_15.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We have 52277 stops before
There are 2264 stops that are in 15km radius of Zurich HB
+--------+--------------------+----------------+----------------+-------------+--------------+
| stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|
+--------+--------------------+----------------+----------------+-------------+--------------+
| 8502471|  Kloten, Waldeggweg|47.4411280309022|8.57578482734645|         null|          null|
| 8502508|Spreitenbach, Rai...|47.4163939893986|8.37617917118731|         null|          null|
| 8503078|            Waldburg|47.3454699490061| 8.5930234976511|         null|          null|
| 8503088|       Zürich HB SZU|47.3774340729037|8.53916949636064|         null|      8503088P|
|8503101P|         Küsnacht ZH| 47.319156370147|8.58063572988137|            1|          null|
+--------+--------------------+----------------+----------------+-------------+--------------+
only showing top 5 rows

With the within-15km stops, we want to preprocess the walking time, for this there are two steps:
- Filter out stations that are too far away for walking (>500m)
- Calculate walking time

In [13]:
walking_df = stops_in_15.select(F.col("stop_id").alias("stop_id_1"), F.col("stop_name").alias("stop_name_1"), F.col("stop_lat").alias("stop_lat_1"), F.col("stop_lon").alias("stop_lon_1")) \
    .crossJoin(stops_in_15.select(F.col("stop_id").alias("stop_id_2"), F.col("stop_name").alias("stop_name_2"), F.col("stop_lat").alias("stop_lat_2"),F.col("stop_lon").alias("stop_lon_2"))) \
    .withColumn("distance", distance_calculation(F.col("stop_lat_1"), F.col("stop_lon_1"), F.col("stop_lat_2"), F.col("stop_lon_2"))) \
    .select(F.col("stop_id_1"), F.col("stop_name_1"), F.col("stop_id_2"), F.col("stop_name_2"), F.col("distance")) \
    .filter("distance<=0.5 and distance>0.0")

walking_df = walking_df.withColumn("used_time", walking_df.distance*1200).select("stop_id_1","stop_name_1","stop_id_2","stop_name_2","used_time")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

sanity check:

**Note: DONT TRY TO RUN `walking_df.count()`, IT KILLS THE SESSION**

In [14]:
walking_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+---------+--------------------+---------+
|  stop_id_1|         stop_name_1|stop_id_2|         stop_name_2|used_time|
+-----------+--------------------+---------+--------------------+---------+
|    8573729|Bonstetten, Isenbach|  8573729|Bonstetten, Isenbach| 83.76533|
|8503306:0:2|           Dietlikon|  8590541|Dietlikon, Dornen...|528.27905|
|    8502471|  Kloten, Waldeggweg|  8580432|      Kloten, Bramen|441.21054|
|    8506895|      Lufingen, Dorf|  8573228| Lufingen, Unterdorf|401.51526|
|    8589111|Horgen, Gumelenst...| 8502208P|     Horgen Oberdorf|350.84717|
+-----------+--------------------+---------+--------------------+---------+
only showing top 5 rows

According to the requirement of the task, we select only the weekdays.

The weekdays filtering can be done in calendar and then we use service_id to join other dataframes to get the transportation methods in weekdays.

In [25]:
weekdays_only = calendar.where("monday = 1 and tuesday = 1 and wednesday = 1 and thursday = 1 and friday = 1").select('service_id').distinct()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

We then implement all the joins using primary keys

In [26]:
weekday_trips = trips.join(weekdays_only, "service_id")
weekday_trips = weekday_trips.join(routes.select("route_id", "route_desc"), "route_id")

weekday_trips.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling o500.showString.
: java.lang.OutOfMemoryError: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:122)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withExecutionId$1.apply(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.exchange.Broadc

In [22]:
weekday_stop_times = stop_times.join(weekday_trips, "trip_id")

weekday_stop_times = weekday_stop_times.drop("pickup_type", "drop_off_type")

weekday_stop_times.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
An error occurred while calling o453.showString.
: java.lang.OutOfMemoryError: GC overhead limit exceeded

Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 381, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/cloudera/parcels/CDH-7.1.8-1.cdh7.1.8.p0.30990532/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o453.showString.
:

In [23]:
weekday_all_info = weekday_stop_times.join(stops_in_15, "stop_id")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…