# Get "ideal" connections timetable using files under HDFS

In this notebook, we select the trips most relevant to our project and convert them into connections between stops, giving us a transportation schedule for an "ideal" working day. We also add information to the connections that makes them easier to map to the historical SBB data. The notebook is structured as follows:

*   **[Start Spark](#spark)** 
*   **[Get relevant trips](#relevant)**  
*   **[Transform trips to connections](#raw_connections)** 
*   **[Add useful mapping information](#id_connections)**  




<a id = 'spark'></a>
### 1. Start Spark


We use a Spark session to run the transformations and actions on the raw dataframes.

In [None]:
%%configure -f
{
    "conf": {
        "spark.app.name": "datavirus_connections"
    }
}

In [2]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8421,application_1589299642358_2953,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.



<pyspark.sql.session.SparkSession object at 0x7fa6ab43e290>

In [4]:
import pyspark.sql.functions as f
from pyspark.sql import Window


<a id = 'relevant'></a>
### 2. Get relevant trips

Load `trips`, `calendar` and `routes` data.

In [None]:
trips = spark.read.orc("/data/sbb/timetables/orc/trips/")
calendar = spark.read.orc("/data/sbb/timetables/orc/calendar/")
routes = spark.read.orc("/data/sbb/timetables/orc/routes/")

In [None]:
# select all services that are scheduled on Mondays
monday_service_id = (
    calendar
    .where(calendar.monday == True)
    .select('service_id')
    .dropDuplicates()
)

In [None]:
# keep only trips that are scheduled on Mondays
monday_trips = (
    trips
    .join(f.broadcast(monday_service_id), 'service_id')
)

We join `monday_trips` with `routes` in order to obtain useful mapping information (used later) and we select only the relevant routes (where the transport type is InterCity, InterRegio, Regionalzug, S-Bahn, Bus, Tram).

In [None]:
# add route information to trips
route_trips = (
    monday_trips
    .join(routes, 'route_id')
    
    # drop all transports that are not of type:
    # InterCity, InterRegio, Regionalzug, S-Bahn, Bus, Tram
    .where(routes.route_type.isin(['102', '103', '106', '400', '700', '900']))
    
    # Create id that is used to merge with "ist data".
    # The id has the following format: 85:<agency_id>:<route_number>
    # The route number is just the "number" an agency assigns to
    # a certain route. For busses and trams this number corresponds
    # to the route_short_name. For trains it is stored in the
    # trip_short_name.
    
    .withColumn(
        'id',
        f.when(routes.route_type.isin(['700', '900']),
            # route is either a bus or a tram
            f.concat_ws(':', f.lit('85'), routes.agency_id.cast('int').cast('string'), routes.route_short_name.cast('int').cast('string'))
        ).otherwise(
            # route is a train
            f.concat_ws(':', f.lit('85'), routes.agency_id.cast('int').cast('string'), trips.trip_short_name.cast('int').cast('string'))
        )
    )
)
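
The id construction above can be illustrated without Spark. The following is a minimal plain-Python sketch, not the notebook's implementation: `int_string` and `build_id` are hypothetical helpers, and the agency and route values are made up. It assumes Spark's default (non-ANSI) behavior, where casting a non-numeric string to `int` yields null and `concat_ws` skips null inputs, and it only approximates the cast for plain digit strings.

```python
import re

def int_string(value):
    """Approximate cast('int').cast('string'): '011' -> '11', non-numeric -> None."""
    if value is None or not re.fullmatch(r'\s*[+-]?\d+\s*', value):
        return None
    return str(int(value))

def build_id(route_type, agency_id, route_short_name, trip_short_name):
    """Build '85:<agency_id>:<route_number>' the way the Spark expression does."""
    # buses (700) and trams (900) take the number from route_short_name,
    # trains take it from trip_short_name
    number = route_short_name if route_type in {'700', '900'} else trip_short_name
    parts = ['85', int_string(agency_id), int_string(number)]
    # like concat_ws, skip missing parts instead of failing
    return ':'.join(part for part in parts if part is not None)

print(build_id('900', '3849', '011', None))   # 85:3849:11
print(build_id('102', '11', 'IC', '00708'))   # 85:11:708
print(build_id('700', '801', 'N1', None))     # 85:801
```

The last call shows a case worth checking in the real data: under the assumptions above, a non-numeric route name such as `N1` drops out of the id, so several routes of one agency could share the same id.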

In [12]:
route_trips.write.format('orc').save('/user/datavirus/route_trips.orc', mode='overwrite')
#route_trips = spark.read.orc('/user/datavirus/route_trips.orc')


<a id = 'raw_connections'></a>
### 3. Transform trips to connections

The `stop_times` file contains the timing information for every stop of every trip, but we have to reshape the dataframe into connections (links between two consecutive stops). We only use stops whose arrival and departure times fall between 05:00 and 21:00.

In [78]:
# read timetable stop times and filter values we are interested in

stop_time = spark.read.orc("/data/sbb/timetables/orc/stop_times/")

stop_time = (
    stop_time
    .select(['trip_id', 'arrival_time', 'departure_time', 'stop_id', 'stop_sequence'])
    .filter(
        (stop_time.departure_time >= '05:00:00')
        & (stop_time.departure_time <= '21:00:00')
        & (stop_time.arrival_time >= '05:00:00')
        & (stop_time.arrival_time <= '21:00:00') 
    )
    .withColumn('station_id', stop_time.stop_id.substr(1, 7))  # substr is 1-based: first 7 characters
    .drop('stop_id')
    
)
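
The time filter above compares strings, not timestamps. This works as long as the times are zero-padded `HH:MM:SS` values, because lexicographic order then matches chronological order. The plain-Python sketch below illustrates this; `in_window` and `station_of` are hypothetical helpers and the sample values are made up.

```python
def in_window(time_str, start='05:00:00', end='21:00:00'):
    """String comparison is chronological for zero-padded HH:MM:SS values."""
    return start <= time_str <= end

def station_of(stop_id):
    """Keep the first 7 characters of a stop id, dropping any platform suffix."""
    return stop_id[:7]

print(in_window('05:00:00'))        # True  (bounds are inclusive)
print(in_window('21:00:01'))        # False
print(in_window('25:10:00'))        # False (GTFS times may run past 24:00:00)
print(in_window('9:30:00'))         # False (an unpadded hour breaks the ordering)
print(station_of('8503000:0:41'))   # 8503000
```

If the source ever contained unpadded hours, they would be dropped silently by such a comparison, so it can be worth checking the length of the time strings before relying on it.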


We keep only stops at stations within 15 km of Zurich, then drop the trips that are left with a single stop: such trips only enter or leave the area and cannot form a connection inside it.

In [79]:
# load stations around zurich
stations = spark.read.csv("../data/zurich_stations_ids.csv")
stations = (
    stations
    .select(stations._c0.alias('station_id'))
)

# remove all stops that are not close to zurich
stop_filtered = (
    stop_time
    .join(f.broadcast(stations), 'station_id')
)

# because we removed some stops there are now trips with a single stop
# we only want to keep trips with multiple stops

stop_keep = (
    stop_filtered
    .groupBy('trip_id')
    .count()
    .filter('count > 1')
    .select('trip_id')
)
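
As a small illustration of the `groupBy`/`count`/`filter` step, here is a plain-Python sketch on made-up data. The trip and station ids are hypothetical, and `trips_to_keep` is a helper introduced only for this example.

```python
from collections import Counter

def trips_to_keep(stop_rows):
    """Return the trip ids that still have more than one stop after filtering."""
    counts = Counter(trip_id for trip_id, _station_id in stop_rows)
    return {trip_id for trip_id, n in counts.items() if n > 1}

# (trip_id, station_id) pairs that survived the Zurich-area filter
stop_rows = [
    ('trip_a', '8503000'),
    ('trip_a', '8503006'),
    ('trip_b', '8503000'),   # only one stop left: cannot form a connection
]

print(trips_to_keep(stop_rows))   # {'trip_a'}
```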


To transform stop times (one row per stop of a trip) into connections (links between two stops), we join the dataframe with a copy of itself shifted by one stop, so that each departure station is matched with the next arrival station of the same trip.

In [83]:
# because we removed stops there are now gaps in the stop_sequence
# we recreate the stop_sequence with a rank function
trip_window = Window.partitionBy('trip_id').orderBy(f.asc('stop_sequence'))
stop_sequence = f.rank().over(trip_window)

stop_resequenced = (
    stop_filtered
    .join(stop_keep, 'trip_id')
    .withColumn('stop_sequence', stop_sequence)
)

stop_resequenced.write.format('orc').save('/user/datavirus/stop_resequenced.orc', mode='overwrite')

# create two dataframes, one representing departure stations
# and the other representing arrival stations
departure = spark.read.orc('/user/datavirus/stop_resequenced.orc').alias('departure').repartition(100, 'trip_id')
arrival = spark.read.orc('/user/datavirus/stop_resequenced.orc').alias('arrival').repartition(100, 'trip_id')
arrival = arrival.withColumn('stop_sequence', arrival.stop_sequence - 1)


In [6]:
# join the dataframes, this gives us all connections

raw_connections = (
    departure
    .join(arrival, ['trip_id', 'stop_sequence'])
    .select(
        departure.stop_sequence.alias('stop_sequence'),
        departure.trip_id.alias('trip_id'),
        departure.station_id.alias('start_id'),
        departure.departure_time.alias('start_time'),
        arrival.arrival_time.alias('stop_time'),
        arrival.station_id.alias('stop_id')
    )
)

raw_connections.write.format('orc').save('/user/datavirus/raw_connections.orc', mode='overwrite')
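
The resequencing and the shifted self-join can be hard to picture, so here is a minimal plain-Python sketch of the same idea on made-up rows. It is not the Spark code: `to_connections` is a hypothetical helper that sorts the stops of each trip and pairs every stop with the next one, which is what joining on `(trip_id, stop_sequence)` against the shifted copy achieves.

```python
from collections import defaultdict

def to_connections(stop_rows):
    """Pair each stop of a trip with the following stop of the same trip."""
    by_trip = defaultdict(list)
    for row in stop_rows:
        by_trip[row['trip_id']].append(row)

    connections = []
    for trip_id, rows in by_trip.items():
        # sorting closes the gaps left by removed stops (the rank step)
        rows.sort(key=lambda r: r['stop_sequence'])
        # zip(rows, rows[1:]) plays the role of the join with the shifted copy
        for seq, (dep, arr) in enumerate(zip(rows, rows[1:]), start=1):
            connections.append({
                'stop_sequence': seq,
                'trip_id': trip_id,
                'start_id': dep['station_id'],
                'start_time': dep['departure_time'],
                'stop_time': arr['arrival_time'],
                'stop_id': arr['station_id'],
            })
    return connections

# one trip whose original stop_sequence values 2, 5, 6 have gaps
rows = [
    {'trip_id': 't1', 'stop_sequence': 5, 'station_id': 'B',
     'arrival_time': '07:05:00', 'departure_time': '07:06:00'},
    {'trip_id': 't1', 'stop_sequence': 2, 'station_id': 'A',
     'arrival_time': '07:00:00', 'departure_time': '07:00:00'},
    {'trip_id': 't1', 'stop_sequence': 6, 'station_id': 'C',
     'arrival_time': '07:10:00', 'departure_time': '07:10:00'},
]

for connection in to_connections(rows):
    print(connection['stop_sequence'], connection['start_id'], '->', connection['stop_id'])
```

Note that two stops paired this way are consecutive among the remaining stops, not necessarily in the original trip: if a trip leaves the Zurich area and returns, the resulting connection spans the removed stops.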


<a id = 'id_connections'></a>
### 4. Add useful mapping information

Now that we have the timetable that we were looking for, we will complete it with relevant information about the trips and routes so that the mapping with the probability data can be done more easily.

In [13]:
id_connections = (
    raw_connections
    .join(route_trips, 'trip_id')
    # substr is 1-based: 'HH:MM' is the first 5 characters, 'HH' the first 2
    .withColumn('arrival_time_minute', raw_connections.stop_time.substr(1, 5))
    .withColumn('arrival_time_hour', raw_connections.stop_time.substr(1, 2))
    .withColumn('station_id', raw_connections.stop_id)
    .withColumn(
        'produkt_id', 
        f.when(
            route_trips.route_type == '700',
            f.lit('bus')            
        ).otherwise(
            f.when(
                route_trips.route_type == '900',
                f.lit('tram')
            ).otherwise(f.lit('zug'))
        )
    )
)
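
For a single connection, the derived columns look as follows. This is a plain-Python sketch with a hypothetical helper `mapping_fields` and a made-up arrival time; it assumes arrival times are zero-padded `HH:MM:SS` strings.

```python
# route types that are not trains; every other kept type is treated as 'zug'
PRODUCT_BY_ROUTE_TYPE = {'700': 'bus', '900': 'tram'}

def mapping_fields(stop_time, route_type):
    """Derive the columns used later to map connections to other data."""
    return {
        'arrival_time_minute': stop_time[:5],   # 'HH:MM'
        'arrival_time_hour': stop_time[:2],     # 'HH'
        'produkt_id': PRODUCT_BY_ROUTE_TYPE.get(route_type, 'zug'),
    }

print(mapping_fields('07:45:00', '900'))
# {'arrival_time_minute': '07:45', 'arrival_time_hour': '07', 'produkt_id': 'tram'}
print(mapping_fields('07:45:00', '106')['produkt_id'])   # zug
```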


In [14]:
id_connections.write.format('orc').save('/user/datavirus/id_connections.orc', mode='overwrite')
