### Dream Team:  Xingce Bao, Sohyeong Kim, Guilio Masinelli, Silvio Zanoli


# PART II. Data Processing

In Part II, we compute the mean and variance of the arrival and departure times of all trips with the same journey number, separately for each day of the week (Monday, Tuesday, ..., Sunday). Because the traffic volume differs from one weekday to another, the statistics are computed per weekday. The computation is done with Spark, and since **it takes more than 2 hours for each day**, we saved the results in CSV format so that they can simply be imported and used in the next part.
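The records used here are already split by weekday during preprocessing, which this part does not show. As a minimal sketch of such a split, assuming the `datetime` field is a `dd.mm.yyyy` string as in the sample records of this notebook, the weekday can be derived with the standard library. `weekday_of` and `split_by_weekday` are hypothetical helper names, not the preprocessing code that was actually used.

```python
from datetime import datetime

# Day names in the order returned by date.weekday() (Monday == 0).
DAY_NAMES = ["monday", "tuesday", "wednesday", "thursday",
             "friday", "saturday", "sunday"]

def weekday_of(date_str):
    """Map a 'dd.mm.yyyy' date string (the `datetime` field) to a day name."""
    return DAY_NAMES[datetime.strptime(date_str, "%d.%m.%Y").weekday()]

def split_by_weekday(rows):
    """Group tab-split records into one list per weekday, using their first field."""
    buckets = {name: [] for name in DAY_NAMES}
    for row in rows:
        buckets[weekday_of(row[0])].append(row)
    return buckets

print(weekday_of("02.04.2018"))  # monday
```

Using `weekday()` with an explicit list of names avoids depending on the locale, which `strftime("%A")` would.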

We also build a list of stations where a direct transfer is possible: for example, if more than one bus or tram line stops at station A, a direct transfer is possible there.
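A minimal sketch of the transfer-station list, assuming the input is a collection of `(station_id, line_id)` pairs and that a station qualifies when at least two distinct lines stop there. `transfer_stations` is a hypothetical helper and the station and line values are toy data.

```python
from collections import defaultdict

def transfer_stations(records):
    """Return the ids of stations served by at least two distinct lines.

    `records` is an iterable of (station_id, line_id) pairs, e.g. one pair
    per timetable row; repeated pairs are counted only once.
    """
    lines_at = defaultdict(set)
    for station_id, line_id in records:
        lines_at[station_id].add(line_id)
    return {station for station, lines in lines_at.items() if len(lines) >= 2}

# Toy data: station "A" is served by a bus and a tram line, station "B" by one bus line.
records = [
    ("A", "Bus:31"), ("A", "Tram:4"),
    ("B", "Bus:31"), ("B", "Bus:31"),
]
print(transfer_stations(records))  # {'A'}
```

Storing the lines in a set per station makes repeated timetable rows for the same line harmless.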

#### WARNING: This part takes a long time to run, so please do not run it unless necessary.

In [2]:
import getpass
import pyspark
from pyspark.sql import SparkSession
import os
import pandas as pd
import numpy as np
from datetime import timedelta

username = getpass.getuser()

spark = (SparkSession
        .builder
        .appName('streaming-{0}'.format(username))
        .master('local[8]') # this number must be greater than the number of sources
        .config('spark.executor.memory', '12g')
        .config('spark.executor.cores', '6')
        .getOrCreate())



sc = spark.sparkContext
conf = sc.getConf()

spark

## Calculate mean and variance of the trip

We first read the preprocessed data, which is already separated into 7 days. Here we only show the output for 'monday', but in practice this part must be **run 7 times in total, once for each day**. Each record has 13 tab-separated fields:
1. `datetime`: date of the trip (e.g. 15.06.2017)
2. `unique_id`: concatenation of `transport_id` and `station_id`
3. `line_id`: concatenation of `transport_type`, `train_number` and `train_type`
4. `transport_id`: journey identifier, unique for each journey
5. `transport_type`: Bus, Tram or Zug (train)
6. `train_number`: train number (e.g. 1513) for Zug, operation number (e.g. 85:872:3) for the others
7. `train_type`: train type (e.g. IR, IC) for trains, line number (e.g. 3, 700) for the others
8. `station_name`: name of the station
9. `station_id`: BPUIC number of the station
10. `arriving_time_scheduled`: scheduled arrival time
11. `arriving_time_actual`: actual arrival time
12. `departing_time_scheduled`: scheduled departure time
13. `departing_time_actual`: actual departure time
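The positional indices used in the code of this part (for example `x[9]` to `x[12]` for the four time fields) follow this field order. A small sketch that labels one tab-separated record with these names, using the first Monday record shown in this notebook; `FIELDS` and `parse_record` are illustrative names, not part of the original pipeline.

```python
# Field names in the order they appear in each tab-separated record.
FIELDS = [
    "datetime", "unique_id", "line_id", "transport_id", "transport_type",
    "train_number", "train_type", "station_name", "station_id",
    "arriving_time_scheduled", "arriving_time_actual",
    "departing_time_scheduled", "departing_time_actual",
]

def parse_record(line):
    """Split one tab-separated line and label each value with its field name."""
    values = line.split("\t")
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    return dict(zip(FIELDS, values))

line = "\t".join(["02.04.2018", "85:838:226851-05961-1:8587965", "Bus:85:838:961:961",
                  "85:838:226851-05961-1", "Bus", "85:838:961", "961",
                  "Erlenbach ZH, Bahnhof", "8587965", "", "",
                  "02.04.2018 07:33", "02.04.2018 07:33:00"])
record = parse_record(line)
print(record["station_name"], record["departing_time_scheduled"])
```

Empty fields (here the arrival times at the first stop) survive `split("\t")` as empty strings, which is what the filters below test for.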

In [4]:
# First we read the data we already processed
data_rdd_monday = sc.textFile("./data/monday")
data_rdd_monday = data_rdd_monday.map(lambda x : x.split("\t"))

#data_rdd_tuesday = sc.textFile("./data/tuesday")
#data_rdd_tuesday = data_rdd_tuesday.map(lambda x : x.split("\t"))

#data_rdd_wednesday = sc.textFile("./data/wednesday")
#data_rdd_wednesday = data_rdd_wednesday.map(lambda x : x.split("\t"))

#data_rdd_thursday = sc.textFile("./data/thursday")
#data_rdd_thursday = data_rdd_thursday.map(lambda x : x.split("\t"))

#data_rdd_friday = sc.textFile("./data/friday")
#data_rdd_friday = data_rdd_friday.map(lambda x : x.split("\t"))

#data_rdd_saturday = sc.textFile("./data/saturday")
#data_rdd_saturday = data_rdd_saturday.map(lambda x : x.split("\t"))

#data_rdd_sunday = sc.textFile("./data/sunday")
#data_rdd_sunday = data_rdd_sunday.map(lambda x : x.split("\t"))

In [5]:
# See the first item in RDD
data_rdd_monday.first()
# data_rdd_tuesday.first()
# data_rdd_wednesday.first()
# data_rdd_thursday.first()
# data_rdd_friday.first()
# data_rdd_saturday.first()
# data_rdd_sunday.first()

['02.04.2018',
 '85:838:226851-05961-1:8587965',
 'Bus:85:838:961:961',
 '85:838:226851-05961-1',
 'Bus',
 '85:838:961',
 '961',
 'Erlenbach ZH, Bahnhof',
 '8587965',
 '',
 '',
 '02.04.2018 07:33',
 '02.04.2018 07:33:00']

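We noticed that some records have a scheduled arrival/departure time but no actual one. If kept, a missing actual time would be encoded as -1 (see the offsets below) while valid times are tens of thousands of seconds, so these records make the variances explode. Therefore, we filter them out of our dataset.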
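A minimal sketch of the filter condition and of why these records matter, in plain Python. The index convention is the one used by the filter in this notebook; the offsets are toy numbers, and -1 stands for a missing time as in the offset encoding used later in this part.

```python
import statistics

def has_missing_actual_time(x):
    """True when a scheduled time exists but the matching actual time is missing.

    Indices follow the record layout: 9/10 = arrival scheduled/actual,
    11/12 = departure scheduled/actual.
    """
    missing_arrival = x[10] == "" and x[9] != ""
    missing_departure = x[12] == "" and x[11] != ""
    return missing_arrival or missing_departure

# Toy departure offsets (seconds since midnight) of one journey at one station;
# appending a single -1 (missing time) makes the sample variance explode.
clean = [27180, 27240, 27210]
polluted = clean + [-1]
print(statistics.variance(clean))     # 900
print(statistics.variance(polluted))  # about 1.85e8
```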

In [10]:
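# Filter out records that have a scheduled time but no actual time
# (x[9]/x[10]: arrival scheduled/actual, x[11]/x[12]: departure scheduled/actual)
data_rdd_monday = data_rdd_monday.filter(lambda x: not ((x[12] == "" and x[11] != "") or (x[10] == "" and x[9] != "")))
data_rdd_monday.first()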

#data_rdd_tuesday = data_rdd_tuesday.filter(lambda x: not((x[12] == "" and x[11] != "") | (x[10] == "" and x[9] != "" )))
#data_rdd_tuesday.first()

#data_rdd_wednesday = data_rdd_wednesday.filter(lambda x: not((x[12] == "" and x[11] != "") | (x[10] == "" and x[9] != "" )))
#data_rdd_wednesday.first()

#data_rdd_thursday = data_rdd_thursday.filter(lambda x: not((x[12] == "" and x[11] != "") | (x[10] == "" and x[9] != "" )))
#data_rdd_thursday.first()

#data_rdd_friday = data_rdd_friday.filter(lambda x: not((x[12] == "" and x[11] != "") | (x[10] == "" and x[9] != "" )))
#data_rdd_friday.first()

#data_rdd_saturday = data_rdd_saturday.filter(lambda x: not((x[12] == "" and x[11] != "") | (x[10] == "" and x[9] != "" )))
#data_rdd_saturday.first()

#data_rdd_sunday = data_rdd_sunday.filter(lambda x: not((x[12] == "" and x[11] != "") | (x[10] == "" and x[9] != "" )))
#data_rdd_sunday.first()

['02.04.2018',
 '85:838:226851-05961-1:8587965',
 'Bus:85:838:961:961',
 '85:838:226851-05961-1',
 'Bus',
 '85:838:961',
 '961',
 'Erlenbach ZH, Bahnhof',
 '8587965',
 '',
 '',
 '02.04.2018 07:33',
 '02.04.2018 07:33:00']

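To simplify computation, we add columns giving the arrival/departure times in seconds since midnight. Midnight ('00:00') of each day is second zero, and we count the seconds up to the given time; for example, '02.04.2018 07:33' becomes $7 \times 3600 + 33 \times 60 = 27180$ seconds. We call these times in seconds **offsets**. When an arrival or departure time does not exist (at the first or last stop of a journey), the offset is set to -1. If the actual time falls on the day after the scheduled one (a delay past midnight), 86400 seconds are added so that it stays comparable with the scheduled offset; if it falls on an earlier day, the scheduled offset is used instead.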
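A pure-Python sketch of the offset computation, using `datetime.strptime` instead of `pd.to_datetime` so that the arithmetic is explicit. It assumes timestamps in the `dd.mm.yyyy HH:MM` or `dd.mm.yyyy HH:MM:SS` formats seen in the data and compares full calendar dates to detect delays past midnight; `offset` and `actual_offset` are hypothetical names.

```python
from datetime import datetime

SECONDS_PER_DAY = 24 * 3600

def parse_time(s):
    """Parse 'dd.mm.yyyy HH:MM' or 'dd.mm.yyyy HH:MM:SS'."""
    fmt = "%d.%m.%Y %H:%M:%S" if s.count(":") == 2 else "%d.%m.%Y %H:%M"
    return datetime.strptime(s, fmt)

def offset(s):
    """Seconds since midnight, or -1 when the time is missing."""
    if s == "":
        return -1
    t = parse_time(s)
    return t.hour * 3600 + t.minute * 60 + t.second

def actual_offset(actual, scheduled):
    """Offset of an actual time, adding one day when a delay crosses midnight."""
    if actual == "":
        return -1
    a, s = parse_time(actual), parse_time(scheduled)
    if a.date() > s.date():
        return offset(actual) + SECONDS_PER_DAY
    if a.date() < s.date():
        return offset(scheduled)  # actual on an earlier day: use the scheduled offset
    return offset(actual)

print(offset("02.04.2018 07:33"))                                # 27180
print(actual_offset("03.04.2018 00:02:00", "02.04.2018 23:58"))  # 86520
```

Comparing `date()` objects rather than the day of the month also handles delays that cross a month boundary (e.g. 31.03 to 01.04).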

In [15]:
# Add columns of time as a seconds of a day

# Arriving time scheduled in seconds
data_rdd_monday = data_rdd_monday.map(lambda x: x + [-1 if x[9]=="" else int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))])
# Arriving time actual in seconds
data_rdd_monday = data_rdd_monday.map(lambda x: x + [-1 if x[10]==""\
                        else\
                        (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[10],dayfirst=True).day==pd.to_datetime(x[9],dayfirst=True).day
                            else\
                            (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[10],dayfirst=True).day>pd.to_datetime(x[9],dayfirst=True).day
                                else\
                                (int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))))
                        )])
# Departing time scheduled in seconds
data_rdd_monday = data_rdd_monday.map(lambda x: x + [-1 if x[11]=="" else int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))])
# Departing time actual in seconds
data_rdd_monday = data_rdd_monday.map(lambda x: x + [-1 if x[12]==""\
                        else\
                        (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[12],dayfirst=True).day==pd.to_datetime(x[11],dayfirst=True).day
                            else\
                            (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[12],dayfirst=True).day>pd.to_datetime(x[11],dayfirst=True).day
                                else\
                                (int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))))
                        )])

# See the first item in RDD
data_rdd_monday.first()

['02.04.2018',
 '85:838:226851-05961-1:8587965',
 'Bus:85:838:961:961',
 '85:838:226851-05961-1',
 'Bus',
 '85:838:961',
 '961',
 'Erlenbach ZH, Bahnhof',
 '8587965',
 '',
 '',
 '02.04.2018 07:33',
 '02.04.2018 07:33:00',
 -1,
 -1,
 27180,
 27180]

In [None]:
# Do the same for the other days
'''
data_rdd_tuesday = data_rdd_tuesday.map(lambda x: x + [-1 if x[9]=="" else int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))])
data_rdd_tuesday = data_rdd_tuesday.map(lambda x: x + [-1 if x[10]==""\
                        else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[10],dayfirst=True).day==pd.to_datetime(x[9],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[10],dayfirst=True).day>pd.to_datetime(x[9],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_tuesday = data_rdd_tuesday.map(lambda x: x + [-1 if x[11]=="" else int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))])
data_rdd_tuesday = data_rdd_tuesday.map(lambda x: x + [-1 if x[12]==""\
                        else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[12],dayfirst=True).day==pd.to_datetime(x[11],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[12],dayfirst=True).day>pd.to_datetime(x[11],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_tuesday.first()


data_rdd_wednesday = data_rdd_wednesday.map(lambda x: x + [-1 if x[9]=="" else int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))])
data_rdd_wednesday = data_rdd_wednesday.map(lambda x: x + [-1 if x[10]==""\
                        else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[10],dayfirst=True).day==pd.to_datetime(x[9],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[10],dayfirst=True).day>pd.to_datetime(x[9],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_wednesday = data_rdd_wednesday.map(lambda x: x + [-1 if x[11]=="" else int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))])
data_rdd_wednesday = data_rdd_wednesday.map(lambda x: x + [-1 if x[12]==""\
                       else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[12],dayfirst=True).day==pd.to_datetime(x[11],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[12],dayfirst=True).day>pd.to_datetime(x[11],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_wednesday.first()


data_rdd_thursday = data_rdd_thursday.map(lambda x: x + [-1 if x[9]=="" else int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))])
data_rdd_thursday = data_rdd_thursday.map(lambda x: x + [-1 if x[10]==""\
                        else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[10],dayfirst=True).day==pd.to_datetime(x[9],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[10],dayfirst=True).day>pd.to_datetime(x[9],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_thursday = data_rdd_thursday.map(lambda x: x + [-1 if x[11]=="" else int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))])
data_rdd_thursday = data_rdd_thursday.map(lambda x: x + [-1 if x[12]==""\
                       else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[12],dayfirst=True).day==pd.to_datetime(x[11],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[12],dayfirst=True).day>pd.to_datetime(x[11],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_thursday.first()

data_rdd_friday = data_rdd_friday.map(lambda x: x + [-1 if x[9]=="" else int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))])
data_rdd_friday = data_rdd_friday.map(lambda x: x + [-1 if x[10]==""\
                        else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[10],dayfirst=True).day==pd.to_datetime(x[9],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[10],dayfirst=True).day>pd.to_datetime(x[9],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_friday = data_rdd_friday.map(lambda x: x + [-1 if x[11]=="" else int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))])
data_rdd_friday = data_rdd_friday.map(lambda x: x + [-1 if x[12]==""\
                       else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[12],dayfirst=True).day==pd.to_datetime(x[11],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[12],dayfirst=True).day>pd.to_datetime(x[11],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_friday.first()

data_rdd_saturday = data_rdd_saturday.map(lambda x: x + [-1 if x[9]=="" else int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))])
data_rdd_saturday = data_rdd_saturday.map(lambda x: x + [-1 if x[10]==""\
                        else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[10],dayfirst=True).day==pd.to_datetime(x[9],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[10],dayfirst=True).day>pd.to_datetime(x[9],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_saturday = data_rdd_saturday.map(lambda x: x + [-1 if x[11]=="" else int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))])
data_rdd_saturday = data_rdd_saturday.map(lambda x: x + [-1 if x[12]==""\
                       else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[12],dayfirst=True).day==pd.to_datetime(x[11],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[12],dayfirst=True).day>pd.to_datetime(x[11],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_saturday.first()

data_rdd_sunday = data_rdd_sunday.map(lambda x: x + [-1 if x[9]=="" else int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600))])
data_rdd_sunday = data_rdd_sunday.map(lambda x: x + [-1 if x[10]==""\
                        else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[10],dayfirst=True).day==pd.to_datetime(x[9],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[10],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[10],dayfirst=True).day>pd.to_datetime(x[9],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[9],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_sunday = data_rdd_sunday.map(lambda x: x + [-1 if x[11]=="" else int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600))])
data_rdd_sunday = data_rdd_sunday.map(lambda x: x + [-1 if x[12]==""\
                       else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) if pd.to_datetime(x[12],dayfirst=True).day==pd.to_datetime(x[11],dayfirst=True).day
                            else\ (int(pd.to_datetime(x[12],dayfirst=True).timestamp()%(24*3600)) + 86400 if pd.to_datetime(x[12],dayfirst=True).day>pd.to_datetime(x[11],dayfirst=True).day
                                else\ (int(pd.to_datetime(x[11],dayfirst=True).timestamp()%(24*3600)))))])
data_rdd_sunday.first()
'''

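Now we organize our data as a DataFrame for the next steps. Note that the schema renames two fields: `transport_id` is stored as `train_number`, and the original `train_number` as `original_train_number`. The four offset columns are appended at the end.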

In [20]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType

# Define a schema for our data structure
schema = StructType([
    StructField("datetime", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("line_id", StringType(), True),
    StructField("train_number", StringType(), True),
    StructField("transport_type", StringType(), True),  
    StructField("original_train_number", StringType(), True),
    StructField("train_type", StringType(), True),
    StructField("station_name", StringType(), True),
    StructField("station_id", StringType(), True), 
    StructField("arriving_time_scheduled", StringType(), True),
    StructField("arriving_time_actual", StringType(), True),
    StructField("departing_time_scheduled", StringType(), True),
    StructField("departing_time_actual", StringType(), True),
    StructField("arrivaltimeoffsetschedule", IntegerType(), True),
    StructField("arrivaltimeoffset", IntegerType(), True),
    StructField("departuretimeoffsetschedule", IntegerType(), True),
    StructField("departuretimeoffset", IntegerType(), True)])

In [21]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

data_df_monday = sqlContext.createDataFrame(data_rdd_monday, schema)
#data_df_tuesday = sqlContext.createDataFrame(data_rdd_tuesday, schema)
#data_df_wednesday = sqlContext.createDataFrame(data_rdd_wednesday, schema)
#data_df_thursday = sqlContext.createDataFrame(data_rdd_thursday, schema)
#data_df_friday = sqlContext.createDataFrame(data_rdd_friday, schema)
#data_df_saturday = sqlContext.createDataFrame(data_rdd_saturday, schema)
#data_df_sunday = sqlContext.createDataFrame(data_rdd_sunday, schema)

In [23]:
# Show the first 5 rows as a pandas DataFrame
data_df_monday.limit(5).toPandas()
#data_df_tuesday.limit(5).toPandas()
#data_df_wednesday.limit(5).toPandas()
#data_df_thursday.limit(5).toPandas()
#data_df_friday.limit(5).toPandas()
#data_df_saturday.limit(5).toPandas()
#data_df_sunday.limit(5).toPandas()

| | datetime | unique_id | line_id | train_number | transport_type | original_train_number | train_type | station_name | station_id | arriving_time_scheduled | arriving_time_actual | departing_time_scheduled | departing_time_actual | arrivaltimeoffsetschedule | arrivaltimeoffset | departuretimeoffsetschedule | departuretimeoffset |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 02.04.2018 | 85:838:226851-05961-1:8587965 | Bus:85:838:961:961 | 85:838:226851-05961-1 | Bus | 85:838:961 | 961 | Erlenbach ZH, Bahnhof | 8587965 | | | 02.04.2018 07:33 | 02.04.2018 07:33:00 | -1 | -1 | 27180 | 27180 |
| 1 | 02.04.2018 | 85:838:226851-05961-1:8587971 | Bus:85:838:961:961 | 85:838:226851-05961-1 | Bus | 85:838:961 | 961 | Erlenbach ZH, Im Loo | 8587971 | 02.04.2018 07:34 | 02.04.2018 07:34:36 | 02.04.2018 07:34 | 02.04.2018 07:34:36 | 27240 | 27276 | 27240 | 27276 |
| 2 | 02.04.2018 | 85:838:226851-05961-1:8587964 | Bus:85:838:961:961 | 85:838:226851-05961-1 | Bus | 85:838:961 | 961 | Erlenbach ZH, Alterswohnheim | 8587964 | 02.04.2018 07:34 | 02.04.2018 07:35:18 | 02.04.2018 07:34 | 02.04.2018 07:35:24 | 27240 | 27318 | 27240 | 27324 |
| 3 | 02.04.2018 | 85:838:226851-05961-1:8587973 | Bus:85:838:961:961 | 85:838:226851-05961-1 | Bus | 85:838:961 | 961 | Erlenbach ZH, Im Vogelsang | 8587973 | 02.04.2018 07:35 | 02.04.2018 07:36:12 | 02.04.2018 07:35 | 02.04.2018 07:36:12 | 27300 | 27372 | 27300 | 27372 |
| 4 | 02.04.2018 | 85:838:226851-05961-1:8587976 | Bus:85:838:961:961 | 85:838:226851-05961-1 | Bus | 85:838:961 | 961 | Erlenbach ZH, Rankstrasse | 8587976 | 02.04.2018 07:36 | 02.04.2018 07:37:00 | 02.04.2018 07:36 | 02.04.2018 07:37:06 | 27360 | 27420 | 27360 | 27426 |


Here we compute the mean and variance of the actual arrival time and the actual departure time of each journey at each station, per weekday; these statistics describe the distribution of the timetable data. We assume that the public transport times follow a **gamma distribution** and also compute its shape parameter k and scale parameter θ.
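For a gamma distribution with shape $k$ and scale $\theta$, the mean is $\mu = k\theta$ and the variance is $\sigma^2 = k\theta^2$. Solving for the parameters (the method of moments) gives $\theta = \sigma^2/\mu$ and $k = \mu/\theta = \mu^2/\sigma^2$, which is how the `*_theta` and `*_k` columns are computed later in this part. A minimal sketch, checked against the departure values of the first row of the Monday result; `gamma_moments_fit` is a hypothetical helper that returns `None` where the notebook uses a `-1` flag.

```python
import math

def gamma_moments_fit(mean, var):
    """Method-of-moments gamma fit.

    Returns (k, theta) with k * theta == mean and k * theta**2 == var,
    or None when the fit is undefined (NaN, or non-positive mean or variance).
    """
    if math.isnan(mean) or math.isnan(var) or mean <= 0 or var <= 0:
        return None
    theta = var / mean  # scale
    k = mean / theta    # shape, equivalently mean**2 / var
    return k, theta

# Departure offset statistics of the first row in the Monday table.
k, theta = gamma_moments_fit(11576.0, 6672.0)
print(round(theta, 6), round(k, 1))  # 0.576365 20084.5
```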

Some rows have a huge variance, above 16 million s² (a standard deviation of 4000 seconds $\simeq$ 66.7 minutes). A journey whose times vary this much would never be chosen when searching for a route, so we simply filter out the rows whose departure-time variance is 16 million or more. Rows with a NaN variance are dropped by the same filter, since Spark treats NaN as larger than any other numeric value.
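The threshold corresponds to a standard deviation of 4000 s, since $4000^2 = 16 \times 10^6$. A minimal plain-Python sketch of the same filter rule, assuming one departure variance per row; `keep_row` is a hypothetical helper, the variances are toy values, and NaN is dropped explicitly to mirror the Spark `where` clause.

```python
import math

MAX_STD_SECONDS = 4000
MAX_VARIANCE = MAX_STD_SECONDS ** 2  # 16,000,000 s^2, i.e. the 16e6 threshold

def keep_row(departure_var):
    """Keep a row only if its departure variance is a number below the threshold."""
    if math.isnan(departure_var):
        return False  # undefined variance is dropped, as in the Spark filter
    return departure_var < MAX_VARIANCE

print(MAX_STD_SECONDS / 60)  # standard deviation in minutes, about 66.7
variances = [497.0, float("nan"), 2.5e7, 19.0]
print([v for v in variances if keep_row(v)])  # [497.0, 19.0]
```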

In [29]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col

In [30]:
# Compute the mean and sample variance of the offsets for each journey at each station (one row per group)
data_mean_variance_monday = data_df_monday.groupBy('train_number','station_id','line_id').agg(        
    F.round(F.mean("arrivaltimeoffset"),0).alias("avg(arrivaltimeoffset)"),
    F.round(F.mean("departuretimeoffset"),0).alias("avg(departuretimeoffset)"),
    F.round(F.variance("arrivaltimeoffset"),0).alias("var(arrivaltimeoffset)"), 
    F.round(F.variance("departuretimeoffset"),0).alias("var(departuretimeoffset)")
).cache()

# Do the same thing for the other days
'''
data_mean_variance_tuesday = data_df_tuesday.groupBy('train_number','station_id','line_id').agg(        
    F.round(F.mean("arrivaltimeoffset"),0).alias("avg(arrivaltimeoffset)"),
    F.round(F.mean("departuretimeoffset"),0).alias("avg(departuretimeoffset)"),
    F.round(F.variance("arrivaltimeoffset"),0).alias("var(arrivaltimeoffset)"), 
    F.round(F.variance("departuretimeoffset"),0).alias("var(departuretimeoffset)")
).cache()

data_mean_variance_wednesday = data_df_wednesday.groupBy('train_number','station_id','line_id').agg(        
    F.round(F.mean("arrivaltimeoffset"),0).alias("avg(arrivaltimeoffset)"),
    F.round(F.mean("departuretimeoffset"),0).alias("avg(departuretimeoffset)"),
    F.round(F.variance("arrivaltimeoffset"),0).alias("var(arrivaltimeoffset)"), 
    F.round(F.variance("departuretimeoffset"),0).alias("var(departuretimeoffset)")
).cache()

data_mean_variance_thursday = data_df_thursday.groupBy('train_number','station_id','line_id').agg(        
    F.round(F.mean("arrivaltimeoffset"),0).alias("avg(arrivaltimeoffset)"),
    F.round(F.mean("departuretimeoffset"),0).alias("avg(departuretimeoffset)"),
    F.round(F.variance("arrivaltimeoffset"),0).alias("var(arrivaltimeoffset)"), 
    F.round(F.variance("departuretimeoffset"),0).alias("var(departuretimeoffset)")
).cache()

data_mean_variance_friday = data_df_friday.groupBy('train_number','station_id','line_id').agg(        
    F.round(F.mean("arrivaltimeoffset"),0).alias("avg(arrivaltimeoffset)"),
    F.round(F.mean("departuretimeoffset"),0).alias("avg(departuretimeoffset)"),
    F.round(F.variance("arrivaltimeoffset"),0).alias("var(arrivaltimeoffset)"), 
    F.round(F.variance("departuretimeoffset"),0).alias("var(departuretimeoffset)")
).cache()

data_mean_variance_saturday = data_df_saturday.groupBy('train_number','station_id','line_id').agg(        
    F.round(F.mean("arrivaltimeoffset"),0).alias("avg(arrivaltimeoffset)"),
    F.round(F.mean("departuretimeoffset"),0).alias("avg(departuretimeoffset)"),
    F.round(F.variance("arrivaltimeoffset"),0).alias("var(arrivaltimeoffset)"), 
    F.round(F.variance("departuretimeoffset"),0).alias("var(departuretimeoffset)")
).cache()

data_mean_variance_sunday = data_df_sunday.groupBy('train_number','station_id','line_id').agg(        
    F.round(F.mean("arrivaltimeoffset"),0).alias("avg(arrivaltimeoffset)"),
    F.round(F.mean("departuretimeoffset"),0).alias("avg(departuretimeoffset)"),
    F.round(F.variance("arrivaltimeoffset"),0).alias("var(arrivaltimeoffset)"), 
    F.round(F.variance("departuretimeoffset"),0).alias("var(departuretimeoffset)")
).cache()

'''
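The `groupBy(...).agg(...)` above computes, for every (journey, station, line) key, the mean and the sample variance of the offsets; Spark's `F.variance` is an alias of `var_samp`, which divides by $n-1$. A plain-Python sketch of the same aggregation, without the rounding applied in the notebook; the keys and offsets are toy values and `mean_variance_by_key` is a hypothetical name.

```python
import math
from collections import defaultdict
from statistics import mean, variance

def mean_variance_by_key(rows):
    """Group (key, offset) pairs and return {key: (mean, sample variance)}.

    The sample variance divides by n - 1; a group with a single observation
    has no sample variance and gets NaN, as in the Spark output shown here.
    """
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    return {
        key: (mean(values), variance(values) if len(values) > 1 else math.nan)
        for key, values in groups.items()
    }

# Toy rows: key = (journey, station, line), value = departure offset in seconds.
rows = [
    (("J1", "S1", "L1"), 27180),
    (("J1", "S1", "L1"), 27240),
    (("J1", "S2", "L1"), 27300),
]
for key, (avg, var) in mean_variance_by_key(rows).items():
    print(key, avg, var)
```

Journeys seen on a single day therefore end up with an undefined variance, which is why NaN values appear in the table below.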

In [35]:
# Check the first 5 items in the table
data_mean_variance_monday.show(5)
#data_mean_variance_tuesday.show(5)
#data_mean_variance_wednesday.show(5)
#data_mean_variance_thursday.show(5)
#data_mean_variance_friday.show(5)
#data_mean_variance_saturday.show(5)
#data_mean_variance_sunday.show(5)

+--------------------+----------+------------------+----------------------+------------------------+----------------------+------------------------+
|        train_number|station_id|           line_id|avg(arrivaltimeoffset)|avg(departuretimeoffset)|var(arrivaltimeoffset)|var(departuretimeoffset)|
+--------------------+----------+------------------+----------------------+------------------------+----------------------+------------------------+
|85:838:226864-059...|   8587964|Bus:85:838:961:961|               42631.0|                 42631.0|                 497.0|                   497.0|
|85:849:206591-370...|   8591367| Bus:85:849:031:31|               76097.0|                 76115.0|                   NaN|                     NaN|
|85:849:329913-370...|   8591186| Bus:85:849:031:31|                  -1.0|                 17292.0|                   NaN|                     NaN|
|85:849:448172-370...|   8591261| Bus:85:849:031:31|               22174.0|                 22186.0|      

In [40]:
# Filter out the rows with an extremely large departure-time variance
data_mean_variance_monday = data_mean_variance_monday.where(col("var(departuretimeoffset)")<16e6)
data_mean_variance_monday.show(5)

#data_mean_variance_tuesday = data_mean_variance_tuesday.where(col("var(departuretimeoffset)")<16e6)
#data_mean_variance_tuesday.show(5)

#data_mean_variance_wednesday = data_mean_variance_wednesday.where(col("var(departuretimeoffset)")<16e6)
#data_mean_variance_wednesday.show(5)

#data_mean_variance_thursday = data_mean_variance_thursday.where(col("var(departuretimeoffset)")<16e6)
#data_mean_variance_thursday.show(5)

#data_mean_variance_friday = data_mean_variance_friday.where(col("var(departuretimeoffset)")<16e6)
#data_mean_variance_friday.show(5)

#data_mean_variance_saturday = data_mean_variance_saturday.where(col("var(departuretimeoffset)")<16e6)
#data_mean_variance_saturday.show(5)

#data_mean_variance_sunday = data_mean_variance_sunday.where(col("var(departuretimeoffset)")<16e6)
#data_mean_variance_sunday.show(5)

+--------------------+----------+------------------+----------------------+------------------------+----------------------+------------------------+
|        train_number|station_id|           line_id|avg(arrivaltimeoffset)|avg(departuretimeoffset)|var(arrivaltimeoffset)|var(departuretimeoffset)|
+--------------------+----------+------------------+----------------------+------------------------+----------------------+------------------------+
|85:838:226864-059...|   8587964|Bus:85:838:961:961|               42631.0|                 42631.0|                 497.0|                   497.0|
|85:849:144645-050...|   8591307| Bus:85:849:032:32|               61681.0|                 61693.0|                  19.0|                    19.0|
|85:849:144669-050...|   8591291| Bus:85:849:032:32|               46964.0|                 46976.0|                 800.0|                   800.0|
|85:849:144669-050...|   8591086| Bus:85:849:032:32|               47403.0|                 47415.0|      

In [45]:
# Convert the Spark result to pandas and add the gamma scale (theta) and shape (k)
# for both the arrival and the departure time distributions (-1 flags undefined values).
data_mean_variance_monday = data_mean_variance_monday.toPandas()

def gamma_params(mean, var):
    """Method of moments: theta = var / mean, k = mean / theta (vectorised, no positional x[0]/x[1])."""
    theta = np.where((mean == 0) | np.isnan(var), -1, var / mean)
    k = np.where((theta == 0) | np.isnan(mean), -1, mean / theta)
    return theta, k

mv = data_mean_variance_monday
arr_theta, arr_k = gamma_params(mv['avg(arrivaltimeoffset)'], mv['var(arrivaltimeoffset)'])
dep_theta, dep_k = gamma_params(mv['avg(departuretimeoffset)'], mv['var(departuretimeoffset)'])
mv['arrival_theta'], mv['departure_theta'] = arr_theta, dep_theta
mv['arrival_k'], mv['departure_k'] = arr_k, dep_k

data_mean_variance_monday.head(5)

# Do the same for other days 
'''
data_mean_variance_tuesday = data_mean_variance_tuesday.toPandas()
data_mean_variance_tuesday['arrival_theta'] = data_mean_variance_tuesday[['var(arrivaltimeoffset)','avg(arrivaltimeoffset)']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_tuesday['departure_theta'] = data_mean_variance_tuesday[['var(departuretimeoffset)','avg(departuretimeoffset)']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_tuesday['arrival_k'] = data_mean_variance_tuesday[['avg(arrivaltimeoffset)','arrival_theta']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_tuesday['departure_k'] = data_mean_variance_tuesday[['avg(departuretimeoffset)','departure_theta']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_tuesday.head(5)

data_mean_variance_wednesday = data_mean_variance_wednesday.toPandas()
data_mean_variance_wednesday['arrival_theta'] = data_mean_variance_wednesday[['var(arrivaltimeoffset)','avg(arrivaltimeoffset)']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_wednesday['departure_theta'] = data_mean_variance_wednesday[['var(departuretimeoffset)','avg(departuretimeoffset)']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_wednesday['arrival_k'] = data_mean_variance_wednesday[['avg(arrivaltimeoffset)','arrival_theta']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_wednesday['departure_k'] = data_mean_variance_wednesday[['avg(departuretimeoffset)','departure_theta']].apply(lambda x: -1 if x[1]==0 or np.isnan(x[0]) else x[0]/x[1], axis=1)
data_mean_variance_wednesday.head(5)

# theta = var/mean and k = mean/theta; -1 marks undefined values.
# .iloc makes the positional row access explicit (plain x[0] on a string-labelled Series is deprecated in recent pandas)
data_mean_variance_thursday = data_mean_variance_thursday.toPandas()
data_mean_variance_thursday['arrival_theta'] = data_mean_variance_thursday[['var(arrivaltimeoffset)','avg(arrivaltimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_thursday['departure_theta'] = data_mean_variance_thursday[['var(departuretimeoffset)','avg(departuretimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_thursday['arrival_k'] = data_mean_variance_thursday[['avg(arrivaltimeoffset)','arrival_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_thursday['departure_k'] = data_mean_variance_thursday[['avg(departuretimeoffset)','departure_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_thursday.head(5)

data_mean_variance_friday = data_mean_variance_friday.toPandas()
data_mean_variance_friday['arrival_theta'] = data_mean_variance_friday[['var(arrivaltimeoffset)','avg(arrivaltimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_friday['departure_theta'] = data_mean_variance_friday[['var(departuretimeoffset)','avg(departuretimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_friday['arrival_k'] = data_mean_variance_friday[['avg(arrivaltimeoffset)','arrival_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_friday['departure_k'] = data_mean_variance_friday[['avg(departuretimeoffset)','departure_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_friday.head(5)

data_mean_variance_saturday = data_mean_variance_saturday.toPandas()
data_mean_variance_saturday['arrival_theta'] = data_mean_variance_saturday[['var(arrivaltimeoffset)','avg(arrivaltimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_saturday['departure_theta'] = data_mean_variance_saturday[['var(departuretimeoffset)','avg(departuretimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_saturday['arrival_k'] = data_mean_variance_saturday[['avg(arrivaltimeoffset)','arrival_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_saturday['departure_k'] = data_mean_variance_saturday[['avg(departuretimeoffset)','departure_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_saturday.head(5)

data_mean_variance_sunday = data_mean_variance_sunday.toPandas()
data_mean_variance_sunday['arrival_theta'] = data_mean_variance_sunday[['var(arrivaltimeoffset)','avg(arrivaltimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_sunday['departure_theta'] = data_mean_variance_sunday[['var(departuretimeoffset)','avg(departuretimeoffset)']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_sunday['arrival_k'] = data_mean_variance_sunday[['avg(arrivaltimeoffset)','arrival_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_sunday['departure_k'] = data_mean_variance_sunday[['avg(departuretimeoffset)','departure_theta']].apply(lambda x: -1 if x.iloc[1]==0 or np.isnan(x.iloc[0]) else x.iloc[0]/x.iloc[1], axis=1)
data_mean_variance_sunday.head(5)

'''

| | train_number | station_id | line_id | avg(arrivaltimeoffset) | avg(departuretimeoffset) | var(arrivaltimeoffset) | var(departuretimeoffset) | arrival_theta | departure_theta | arrival_k | departure_k |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 85:11:13794:001 | 8503020 | Zug:13794:SN9 | 11504.0 | 11576.0 | 6272.0 | 6672.0 | 0.545202 | 0.576365 | 21100.45 | 20084.5 |
| 1 | 85:11:18:004 | 8503000 | Zug:18:EC | 60738.0 | -1.0 | 6407.0 | 0.0 | 0.105486 | -0.0 | 575792.8 | -1.0 |
| 2 | 85:11:18257:001 | 8503000 | Zug:18257:S2 | 54977.0 | 55082.0 | 3447.0 | 2507.0 | 0.062699 | 0.045514 | 876840.9 | 1210222.0 |
| 3 | 85:11:18272:002 | 8503202 | Zug:18272:S2 | 68281.0 | 68386.0 | 278.0 | 139.0 | 0.004071 | 0.002033 | 16770850.0 | 33644930.0 |
| 4 | 85:11:18319:001 | 8503000 | Zug:18319:S3 | -1.0 | 20037.0 | 0.0 | 1183.0 | -0.0 | 0.059041 | -1.0 | 339375.6 |
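The `*_theta` and `*_k` columns correspond to method-of-moments estimates of a gamma distribution's scale and shape: θ = variance / mean and k = mean / θ = mean² / variance. A value of -1 marks an undefined result. The gamma interpretation is our reading of the column names; it is not stated in this part. The sketch below is a vectorized alternative to the row-wise `apply` calls and assumes the same column names. The helper `add_gamma_params` is hypothetical. It also differs from the lambdas in one respect: it sets k to -1 whenever θ is itself the -1 sentinel, where the lambdas would instead produce -mean.

```python
import numpy as np
import pandas as pd


def add_gamma_params(df, kind):
    """Add '<kind>_theta' and '<kind>_k' columns (kind: 'arrival' or 'departure').

    theta = var / mean and k = mean / theta; -1 marks values that are undefined.
    """
    mean = df['avg({}timeoffset)'.format(kind)].to_numpy(dtype=float)
    var = df['var({}timeoffset)'.format(kind)].to_numpy(dtype=float)
    # np.where evaluates both branches, so silence the harmless divide warnings
    with np.errstate(divide='ignore', invalid='ignore'):
        theta_ok = (mean != 0) & ~np.isnan(var)
        theta = np.where(theta_ok, var / mean, -1.0)
        # k needs a valid, non-zero theta and a defined mean
        k_ok = theta_ok & (theta != 0) & ~np.isnan(mean)
        k = np.where(k_ok, mean / theta, -1.0)
    df[kind + '_theta'] = theta
    df[kind + '_k'] = k
    return df


# Toy rows: the first reuses row 0's arrival values, the others hit the sentinels
sample = pd.DataFrame({
    'avg(arrivaltimeoffset)': [11504.0, -1.0, 0.0],
    'var(arrivaltimeoffset)': [6272.0, 0.0, 5.0],
})
add_gamma_params(sample, 'arrival')
print(sample)
```

For the first toy row this gives θ ≈ 0.545202 and k ≈ 21100.45, the same values as row 0 of the table. Whole-column arithmetic avoids calling a Python lambda once per row, which matters when the per-day tables are large.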


Finally, we save the processed data in CSV format so that we do not have to recompute it when building the routing algorithm.

In [47]:
# Save the processed data into csv format (REAL TIME DATA)
data_mean_variance_monday.to_csv('monday_processed.csv', index=False)
#data_mean_variance_tuesday.to_csv('tuesday_processed.csv', index=False)
#data_mean_variance_wednesday.to_csv('wednesday_processed.csv', index=False)
#data_mean_variance_thursday.to_csv('thursday_processed.csv', index=False)
#data_mean_variance_friday.to_csv('friday_processed.csv', index=False)
#data_mean_variance_saturday.to_csv('saturday_processed.csv', index=False)
#data_mean_variance_sunday.to_csv('sunday_processed.csv', index=False)
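The seven separate `to_csv` calls can also be written as a loop over the days. The sketch below assumes the per-day pandas frames are collected in a dict keyed by day name. The helpers `save_processed` and `load_processed` are hypothetical; the second one reads the files back the way the routing part could. Writing without the index matters here. A CSV written with its index and read back without `index_col` gains an extra `Unnamed: 0` column.

```python
import io
import os
import tempfile

import pandas as pd

DAYS = ['monday', 'tuesday', 'wednesday', 'thursday',
        'friday', 'saturday', 'sunday']


def save_processed(frames, out_dir):
    """Write '<day>_processed.csv' for every day present in `frames`."""
    for day, df in frames.items():
        # index=False keeps the pandas index out of the file
        df.to_csv(os.path.join(out_dir, '{}_processed.csv'.format(day)), index=False)


def load_processed(out_dir, days=DAYS):
    """Read back whichever '<day>_processed.csv' files exist in `out_dir`."""
    loaded = {}
    for day in days:
        path = os.path.join(out_dir, '{}_processed.csv'.format(day))
        if os.path.exists(path):
            loaded[day] = pd.read_csv(path)
    return loaded


# Toy frame: two columns copied from the first two rows of the table above
toy = pd.DataFrame({'station_id': [8503020, 8503000],
                    'avg(arrivaltimeoffset)': [11504.0, 60738.0]})

with tempfile.TemporaryDirectory() as out_dir:
    save_processed({'monday': toy}, out_dir)
    reloaded = load_processed(out_dir)

# Writing the index (the to_csv default) adds an 'Unnamed: 0' column on reload
with_index = pd.read_csv(io.StringIO(toy.to_csv()))
print(list(reloaded['monday'].columns), list(with_index.columns))
```

`load_processed` skips days whose file is missing. This lets the routing part start from whichever days have already been processed.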

We also save the schedule of each trip (its timetabled arrival and departure times) in separate files.

In [50]:
# Select train_number, station_id, line_id, arrivaltimeoffsetschedule and departuretimeoffsetschedule, keeping each distinct row once
data_schedule_monday = data_df_monday.select('train_number','station_id','line_id','arrivaltimeoffsetschedule','departuretimeoffsetschedule').distinct().cache()

#data_schedule_tuesday = data_df_tuesday.select('train_number','station_id','line_id','arrivaltimeoffsetschedule','departuretimeoffsetschedule').distinct().cache()
#data_schedule_wednesday = data_df_wednesday.select('train_number','station_id','line_id','arrivaltimeoffsetschedule','departuretimeoffsetschedule').distinct().cache()
#data_schedule_thursday = data_df_thursday.select('train_number','station_id','line_id','arrivaltimeoffsetschedule','departuretimeoffsetschedule').distinct().cache()
#data_schedule_friday = data_df_friday.select('train_number','station_id','line_id','arrivaltimeoffsetschedule','departuretimeoffsetschedule').distinct().cache()
#data_schedule_saturday = data_df_saturday.select('train_number','station_id','line_id','arrivaltimeoffsetschedule','departuretimeoffsetschedule').distinct().cache()
#data_schedule_sunday = data_df_sunday.select('train_number','station_id','line_id','arrivaltimeoffsetschedule','departuretimeoffsetschedule').distinct().cache()
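Assume the Monday data span several dates. Then the same scheduled stop (same train, station, line and timetable times) appears once per date, and `distinct()` collapses those repeats into a single timetable row. The pandas sketch below shows the same idea with `drop_duplicates`. It uses hypothetical toy rows; the dates are made up, although the identifiers and times are copied from the schedule output.

```python
import pandas as pd

SCHEDULE_COLS = ['train_number', 'station_id', 'line_id',
                 'arrivaltimeoffsetschedule', 'departuretimeoffsetschedule']

# Hypothetical observations: the same scheduled stop seen on two Mondays
observations = pd.DataFrame({
    'datetime': ['05.06.2017', '12.06.2017', '12.06.2017'],
    'train_number': ['85:11:18540:001', '85:11:18540:001', '85:11:19454:001'],
    'station_id': [8502222, 8502222, 8502229],
    'line_id': ['Zug:18540:S5', 'Zug:18540:S5', 'Zug:19454:S14'],
    'arrivaltimeoffsetschedule': [41340, 41340, 54000],
    'departuretimeoffsetschedule': [41340, 41340, 54000],
})

# pandas analogue of .select(*SCHEDULE_COLS).distinct()
schedule = observations[SCHEDULE_COLS].drop_duplicates().reset_index(drop=True)
print(schedule)
```

Dropping `datetime` before deduplicating is what makes the repeats identical. If it were kept, every date would produce its own row.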

In [55]:
# Show the first rows of the schedule data
data_schedule_monday.show(5)
#data_schedule_tuesday.show(5)
#data_schedule_wednesday.show(5)
#data_schedule_thursday.show(5)
#data_schedule_friday.show(5)
#data_schedule_saturday.show(5)
#data_schedule_sunday.show(5)

+--------------------+----------+-------------------+-------------------------+---------------------------+
|        train_number|station_id|            line_id|arrivaltimeoffsetschedule|departuretimeoffsetschedule|
+--------------------+----------+-------------------+-------------------------+---------------------------+
|     85:11:18540:001|   8502222|       Zug:18540:S5|                    41340|                      41340|
|     85:11:19454:001|   8502229|      Zug:19454:S14|                    54000|                      54000|
|85:849:66828-05067-1|   8502572|  Bus:85:849:067:67|                    28260|                      28260|
|85:3849:84063-020...|   8502572|Tram:85:3849:014:14|                    30120|                      30180|
|85:3849:83959-020...|   8502572|Tram:85:3849:014:14|                    33720|                      33720|
+--------------------+----------+-------------------+-------------------------+---------------------------+
only showing top 5 rows
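The `*offsetschedule` values look like seconds after midnight; for example, 54000 s is exactly 15:00. That unit is our assumption and is not stated in this part. Under that assumption, `datetime.timedelta` turns the values into readable clock times. The helper `offset_to_clock` is hypothetical.

```python
from datetime import timedelta


def offset_to_clock(seconds):
    """Format an offset in seconds after midnight (assumed unit) as H:MM:SS."""
    return str(timedelta(seconds=int(seconds)))


# Scheduled offsets taken from the rows shown above
for offset in (41340, 54000, 28260, 30180):
    print(offset, offset_to_clock(offset))
```

Offsets of 86400 or more would print with a day prefix. For instance, 87000 prints as `1 day, 0:10:00`. Trips running past midnight, if the data contain them, may therefore need separate handling.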



In [60]:
# Save the schedule data into csv-format (SCHEDULE TIME DATA)
data_schedule_monday.toPandas().to_csv('monday_schedule.csv', index=False)
#data_schedule_tuesday.toPandas().to_csv('tuesday_schedule.csv', index=False)
#data_schedule_wednesday.toPandas().to_csv('wednesday_schedule.csv', index=False)
#data_schedule_thursday.toPandas().to_csv('thursday_schedule.csv', index=False)
#data_schedule_friday.toPandas().to_csv('friday_schedule.csv', index=False)
#data_schedule_saturday.toPandas().to_csv('saturday_schedule.csv', index=False)
#data_schedule_sunday.toPandas().to_csv('sunday_schedule.csv', index=False)

### The mean and variance computation is complete at this point.
**(If you do not have the processed data yet, uncomment the lines for all days and run this part again. Processing all seven days takes around 16 hours in total.)**

## Find stations where a direct transfer is possible

Using the `transfer_stations` data created in Part I, we select the stations where a direct transfer is possible. This amounts to finding stations served by at least two lines, i.e. stations whose `transfer_count` is greater than one. The resulting list of station IDs is then saved as a pickle.

In [63]:
import pickle

# First we read the data we already processed
transfer_stations = sc.textFile("./data/transfer_stations")
transfer_stations = transfer_stations.map(lambda x : x.split("\t"))

# Keep only the stations served by more than one line
transfer_stations = transfer_stations.filter(lambda x : int(x[1]) > 1)
transfer_station_list = [int(i[0]) for i in transfer_stations.collect()]

# Save the list as a pickle; the with-block closes the file handle
with open("transfer.p", "wb") as f:
    pickle.dump(transfer_station_list, f)
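Without a Spark context, the same filter takes only a few lines of plain Python. The sketch assumes each line of `./data/transfer_stations` holds a station ID and its line count separated by a tab, as the `split("\t")` above implies. The helper name and the sample lines are hypothetical. The sketch also round-trips the list through `pickle`, the same serialization used for `transfer.p`.

```python
import pickle


def direct_transfer_stations(lines):
    """Return station IDs whose line count (second tab-separated field) exceeds one."""
    stations = []
    for line in lines:
        fields = line.rstrip('\n').split('\t')
        if int(fields[1]) > 1:  # at least two lines serve this station
            stations.append(int(fields[0]))
    return stations


# Hypothetical input lines: "<station_id>\t<line_count>"
sample_lines = ['1001\t3\n', '1002\t1\n', '1003\t2\n']
transfer_list = direct_transfer_stations(sample_lines)

# Serialize and restore, as happens when transfer.p is written and later loaded
restored = pickle.loads(pickle.dumps(transfer_list))
print(transfer_list, restored)
```

The routing code may check membership in this list many times. If so, converting the loaded list to a `set` makes each lookup constant time on average instead of a linear scan.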