In [None]:
%%local
import os
import pickle
import math
import pandas as pd
import numpy as np
pd.set_option("display.max_columns", 50)

import matplotlib.pyplot as plt
%matplotlib inline

import plotly.express as px
import plotly.graph_objects as go


In [None]:
%%local
from IPython import get_ipython
ipython = get_ipython()

root_data = "../data/"
username = os.environ['JUPYTERHUB_USER']

ipython.run_cell_magic('configure','-f','{{ "name":"final-{0}" }}'.format(username))
print("Runnuing as user: ", username)

In [None]:
spark

In [None]:
import math
import pyspark.sql.functions as SFunc
import pyspark.sql.functions as F
from pyspark.sql import Window

#### Note that you can skip part I if you do not whish to rederive all the data from scratch!

## I) Data Import & Wrangling:

In [None]:
%%local
hiveaddr = os.environ['HIVE_SERVER_2']
print("Operating as: {0}".format(username))
print("Operating on hiveaddr: {0}".format(hiveaddr))

In [None]:
%%local
from pyhive import hive

# create connection
conn = hive.connect(host=hiveaddr, 
                    port=10000,
                    username=username) 
# create cursor
cur = conn.cursor()

#### A. First of all, if you have never done so, prepare the required orc tables in your personnal HDFS by running the notebook `PrepareTablesHDFS.ipynb`

#### B. Processing the geostops

We need the geostops both to pre-process the SBB istdaten and the timetables. As the geostops is quite small, we will simply process it once in a pandas dataframe for SBB istdaten usage and once in a spark Dataframe to use on the cluster.  
Note that alternatively we could store it on HDFS to prevent processing the dataset twice but given it's size and low complexity we decided not to. 

#######TODO: Ok or cleaner to change it again and create a HDFS table ?

##### Import all the geostops from the previously created orc table in a local pandas dataframe:

In [None]:
%%local
query = """
    select STATIONID as id, REMARK as name, LATITUDE as lat, LONGITUDE as lon
    from {0}.sbb_geostops
""".format(username)
geostops_df = pd.read_sql(query, conn)

In [None]:
%%local
geostops_df.head(5)

In [None]:
%%local
data_types_dict = {'id': str, 'name': str, 'lat': float, 'lon': float}
geostops_df = geostops_df.astype(data_types_dict)

geostops_df.info(memory_usage="deep")

##### Now we will filter this dataframe to only keep the stops that are within the studied 15km around ZurichHB area: 

We deal here with a short distance (15km) and our accuracy doesn't have to be exact to the centimeter, so we can treat the surface of the earth as flat.
So to perform our check we can just make a conversion from degrees to kilometers at the latitude of the center point, then Pythagore's theorem to get the distance.

We could also use methods offered by libraries such as geopy / geo-py but this adds unnecessary complexity and additional library to the project.

In [None]:
%%local
# Some constants to determine points within 15km from Zürich HB based on their (lat,lon) coordinates
earth_radius = 6378.0
zurich_avg_altitude = 0.430
earth_circumference = 40075.0

def distance(lat1, lon1, lat2, lon2, earth_circumference=earth_circumference):
    """
    Computes the euclidean distance between two given points given their latitude and longitude coordinates
    Code inspiration: https://stackoverflow.com/questions/24680247/check-if-a-latitude-and-longitude-is-within-a-circle-google-maps
    """
    km_per_degree_lat = earth_circumference / 360.0
    km_per_degree_lon = math.cos(math.pi * lat2 / 180.0) * km_per_degree_lat
    dx = abs(lon2 - lon1) * km_per_degree_lon
    dy = abs(lat2 - lat1) * km_per_degree_lat
    return math.sqrt(dx*dx + dy*dy)

def dist_from_center(lat_lon_row, central_lat=47.378177, central_lon=8.540192,earth_circumference=earth_circumference):
    """
    Returns wether the distance of the given point (lat, long) from the central point (ZurichHB)
    """
    return distance(lat_lon_row.lat, lat_lon_row.lon, central_lat, central_lon,earth_circumference)

In [None]:
%%local
max_dist=15.0
geostops_df['center_dist'] = geostops_df.apply(dist_from_center, axis=1)
zurich_geostops_df = geostops_df[geostops_df.center_dist <= max_dist]

In [None]:
%%local
zurich_geostops_df.head()

In [None]:
%%local
zurich_geostops_df.info(memory_usage="deep")

We can see that by considering only stops within Zürich area, we keep 1947 stops over the total 39026.

In [None]:
%%local
unique_stop_ids = len(set(zurich_geostops_df.id.tolist()))
unique_stop_names = len(set(zurich_geostops_df.name.tolist()))

print("Also remark that in those stops even so all %s stops have distinct Id, only %s have distinct names." %(unique_stop_ids, unique_stop_names))

In [None]:
%%local
# convert and save the dataframe to pickle
pickle.dump(zurich_geostops_df, open(root_data+"zurich_geostops_df.pickle", "wb"))

##### Similar procedure to create Spark Dataframe for timetable processing usage:

In [None]:
earth_circumference = 40075.0

@SFunc.udf
def distance(lat1, lon1, lat2=47.378177, lon2=8.540192, earth_circumference=earth_circumference):
    """
    Computes the euclidean distance between two given points given their latitude and longitude coordinates
    Code inspiration: https://stackoverflow.com/questions/24680247/check-if-a-latitude-and-longitude-is-within-a-circle-google-maps
    """
    km_per_degree_lat = earth_circumference / 360.0
    km_per_degree_lon = math.cos(math.pi * lat2 / 180.0) * km_per_degree_lat
    dx = abs(lon2 - lon1) * km_per_degree_lon
    dy = abs(lat2 - lat1) * km_per_degree_lat
    return math.sqrt(dx*dx + dy*dy)

In [None]:
max_dist=15.0

geostops = spark.read.orc("/data/sbb/orc/geostops")
geostops = geostops.withColumn('distance', distance(geostops['stop_lat'], geostops['stop_lon'])).filter(SFunc.col('distance') <= max_dist)
geostops = geostops.drop('location_type', 'parent_station')
geostops.show(5)

#### C. Processing the timetables data:

We will create a dataframe corresponding only to the schedules on May 13-17, 2019. As this is a typical week schedule, we will use it as our base timetable:

In [None]:
timetable = spark.read.csv("/data/sbb/csv/timetable/stop_times/2019/05/07/stop_times.csv", header=True, encoding='utf8')
timetable.show(5)

Fist of all we noted that most of those trips have duration under 1min (which makes sense for all bus stops close in location):

In [None]:
trips_count = timetable.count()
long_trips_count = timetable[timetable.departure_time != timetable.arrival_time].count()
print("Over the {0} trips, only {1} have duration higher than a minute.".format(trips_count, long_trips_count))

##### Let's now only keep the trips that were made within our area of interest:

In [None]:
#list of all stop_id that are in Zurich
zurich_stops = set([str(stop.stop_id) for stop in geostops.select('stop_id').collect()])


#filter the timetable to only contains Stops that are in Zurich
zurich_timetable=timetable.filter(F.col('stop_id').isin(zurich_stops))
zurich_timetable.show(5)

In [None]:
from pyspark.sql import Window

import pyspark.sql.functions as F

In [None]:
#partition by trip_id order by arrival_time and get the next stop_id and next arrival_time

rolling_pair_window=Window.partitionBy("trip_id").orderBy("arrival_time")

next_arrival=F.lead("arrival_time").over(rolling_pair_window).alias("arr_time")
next_stop_id=F.lead("stop_id").over(rolling_pair_window).alias("arr_stop")

connections=zurich_timetable.select("trip_id","departure_time","stop_id",next_arrival,next_stop_id)\
                            .na.drop("any")\
                            .withColumnRenamed("stop_id","dep_stop")\
                            .withColumnRenamed("departure_time","dep_time")


connections.show(5)

In [None]:
max_foot_distance=0.5 # in km
walking_speed_kmPerMin=0.05 # in km/min

footpaths=geostops.alias('l').join(geostops.alias('r'))\
                .where('abs(r.distance- l.distance)<{0} and l.stop_id<>r.stop_id'.format(max_foot_distance))\
                .select(F.col('l.stop_id').alias('dep_stop'),
                        F.col('r.stop_id').alias('arr_stop'),
                        distance(F.col('l.stop_lat'),F.col('l.stop_lon'),F.col('r.stop_lat'),F.col('r.stop_lon')).alias('distance'))\
                .where('distance<{0}'.format(max_foot_distance))\
                .select('dep_stop',
                        'arr_stop',
                        (F.col('distance')/walking_speed_kmPerMin).alias('dur'))

#duration is in minute
footpaths.show(5)

###############################   
This is to discuss but we could imagine further filtering the kind of trips we want to keep (ex: only keep trips within 2am and 11.30pm / ...)
###############################  

##### Create all the connections in our dataframe

In [None]:
#TODO: use Spark Windows ? 

##### Order them by departing time for CSA

#### D. Processing the required istDaten SBB data:

In order to train our model, we will first make an external table only containing all the journeys that:
- Are between two stations within 15km of Zurich main train station ('Zürich HB (8503000)', lat=47.378177, lon=8.540192)
- AN_PROGNOSE_STATUS and AB_PROGNOSE_STATUS equal to REAL or GESCHAETZT
- Standard date of trip format
- Non empty product id

In [None]:
%%local
within_15_stop_stations = tuple(set(zurich_geostops_df.name.tolist()))

query = """
    drop table if exists {0}.zurich_istdaten
""".format(username)
cur.execute(query)

query = """
    create external table {0}.zurich_istdaten
    as
    select FAHRT_BEZEICHNER as trip_id, lower(PRODUKT_ID) as ttype, LINIEN_ID as train_nb, FAELLT_AUS_TF as trip_failed, DURCHFAHRT_TF as no_stop,
    HALTESTELLEN_NAME as stop_name, ZUSATZFAHRT_TF as unplanned_trip, LINIEN_TEXT as linien, VERKEHRSMITTEL_TEXT as verkehrsmittel,
    unix_timestamp(ANKUNFTSZEIT, 'dd.MM.yyyy HH:mm') as expected_ar, unix_timestamp(AN_PROGNOSE,'dd.MM.yyyy hh:mm:ss') as actual_ar,
    unix_timestamp(ABFAHRTSZEIT, 'dd.MM.yyyy HH:mm') as expected_dep, unix_timestamp(AB_PROGNOSE,'dd.MM.yyyy hh:mm:ss') as actual_dep
    from {0}.sbb_orc
    where BETRIEBSTAG like '__.__.____' and PRODUKT_ID is not NULL and PRODUKT_ID <> ''
    and AN_PROGNOSE_STATUS in ('REAL', 'GESCHAETZT')
    and AB_PROGNOSE_STATUS in ('REAL', 'GESCHAETZT')
    and HALTESTELLEN_NAME in {1}
""".format(username, within_15_stop_stations)
cur.execute(query)

In [None]:
%%local
query = """
    select *, floor((actual_ar-expected_ar)/(12))
    from {0}.zurich_istdaten
    where ttype = 'bus' and extract(hour from FROM_UNIXTIME(expected_ar)) = 12 and ((floor(expected_ar/86400) + 4) % 7+1) = 1
    limit 5
""".format(username)
tr_sbb_df = pd.read_sql(query, conn)

In [None]:
%%local
print(tr_sbb_df.columns)
tr_sbb_df.head()

In [None]:
## Can either do some more preprocessing or just work on the delays in the next part (II)

## II) Determine delays

In [None]:
%%local
!git lfs ls-files --all

In [None]:
%%local
# Pull from git lfs our pickles
!git lfs pull

In [None]:
%%local
bucket_size = 12 # Set at 12, every delay (in seconds) will be grouped by bucket of size 12 seconds : [0,11], [12,25], .. 
query="""
    (select S.ttype as ttype,  S.day as day, S.hour as hour, S.delay as delay, count(*) as count 
    FROM (SELECT t.ttype, (floor(t.expected_ar/86400) + 4) % 7+1 as day, extract(hour from FROM_UNIXTIME(t.expected_ar)) as hour, floor((t.actual_ar-t.expected_ar)/{1})*{1}/60 as delay
    FROM {0}.zurich_istdaten T) S
    WHERE s.delay <= 8 and S.delay >= 0
    GROUP BY S.ttype,  S.day, S.delay, S.hour
    ORDER BY S.ttype,  S.day, S.delay, S.hour)
""".format(username,bucket_size)
dis = pd.read_sql(query, conn)

ax = dis[(dis['ttype'] == 'bus') & (dis['day'] == 3) & (dis['hour'] == 8)].plot.bar(x='delay',y='count')
ax.set_ylabel('Number of delays')
ax.set_xlabel('Delay in minutes')
ax.xaxis.set_major_locator(plt.MaxNLocator(9))

In [None]:
%%local

cdf = {}

for ttype in ['bus','zug']:
    cdf_day = {}
    for day in [1,2,3,4,5,6,7]:
        cdf_hour = {}
        for hour in range(6,22):
            hist = dis[(dis['ttype'] == ttype) & (dis['day'] == day) & (dis['hour'] == hour)]['count'].to_numpy()
            while len(hist) < 41 and len(hist) != 0 :
                hist = np.append(hist,0.0)

            cdf_hour[hour] = np.cumsum(hist/hist.sum())
        cdf_day[day] = cdf_hour
    cdf[ttype]=cdf_day

    
# Cumulative Distribution Function of the delay (not arrival time, the *delay*) of a BUS on WEDNESDAY at 10:00
plt.ylabel('P(delay < x)')
plt.xlabel("Delay in Minutes")
plt.plot(np.linspace(0,8,41),cdf['bus'][3][10]) 

## III) Find the best journeys: the Connection Scan Algorithm (CSA)

In [None]:
%%local
range(24)

In [None]:
spark.stop()