Final Spark Creation of segments

In [2]:
# Load the 2016 and 2017 leave-time extracts. Both are semicolon-delimited text
# files with a header row; no schema is supplied or inferred here.
df_16 = (
    spark.read
    .option("delimiter", ";")
    .csv("rt_leavetimes_2016_I_DB.txt", header=True)
)
df_17 = (
    spark.read
    .option("delimiter", ";")
    .csv("rt_leavetimes_2017_I_DB.txt", header=True)
)

# This function folds any number of DataFrames into one by unioning them pairwise.
# The union itself is a lazy transformation, so the cost is only paid when an action
# later runs on the result; that cost depends on memory and the size of the dfs.
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
    """Union any number of Spark DataFrames into a single DataFrame.

    The frames are combined left to right with ``DataFrame.union``, which
    matches columns by position (not by name) and keeps duplicate rows, so
    every input must list its columns in the same order.

    Args:
        *dfs: One or more DataFrames to combine.

    Returns:
        The union of all inputs; a single input is returned unchanged.

    Raises:
        TypeError: If called with no DataFrames.
    """
    if not dfs:
        # reduce() would raise a TypeError here anyway; raise the same type
        # with a message that names the actual problem.
        raise TypeError("unionAll() requires at least one DataFrame")
    # ``unionAll`` is an alias of ``union`` that Spark 2.x flags as deprecated.
    return reduce(lambda merged, frame: merged.union(frame), dfs)

In [3]:
# Stack the 2016 and 2017 leave-time frames into one DataFrame.
df = unionAll(df_16,df_17)

In [4]:
# Load the 2016 and 2017 trip extracts (semicolon-delimited, header row).
df_16_trips = (
    spark.read
    .option("delimiter", ";")
    .csv("rt_trips_2016_I_DB.txt", header=True)
)
df_17_trips = (
    spark.read
    .option("delimiter", ";")
    .csv("rt_trips_2017_I_DB.txt", header=True)
)

# Stack both years into one frame.
df_trips_raw = unionAll(df_16_trips, df_17_trips)

# Keep only the trip-level columns used downstream. Columns whose names also
# occur in the leave-times table get a "Trips_" prefix to keep them apart.
df_trips_tidy = df_trips_raw.selectExpr(
    "dayofservice as Trips_dayofservice",
    "tripid as Trips_tripid",
    "lineid as lineid",
    "routeid as routeid",
    "direction as direction",
    "plannedtime_arr as Trips_plannedtime_arr",
    "plannedtime_dep as Trips_plannedtime_dep",
    "actualtime_arr as Trips_actualtime_arr",
    "actualtime_dep as Trips_actualtime_dep",
)

### Making Unique Segments from the Routes table

In [6]:
# Show the column names and types of the combined leave-time DataFrame.
df.printSchema()

root
 |-- datasource: string (nullable = true)
 |-- dayofservice: string (nullable = true)
 |-- tripid: string (nullable = true)
 |-- progrnumber: string (nullable = true)
 |-- stoppointid: string (nullable = true)
 |-- plannedtime_arr: string (nullable = true)
 |-- plannedtime_dep: string (nullable = true)
 |-- actualtime_arr: string (nullable = true)
 |-- actualtime_dep: string (nullable = true)
 |-- vehicleid: string (nullable = true)
 |-- passengers: string (nullable = true)
 |-- passengersin: string (nullable = true)
 |-- passengersout: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- suppressed: string (nullable = true)
 |-- justificationid: string (nullable = true)
 |-- lastupdate: string (nullable = true)
 |-- note: string (nullable = true)



In [8]:
# Discard the leave-time columns that segment construction does not use. Going by
# the schema printed above, this leaves dayofservice, tripid, progrnumber,
# stoppointid and plannedtime_arr.
_unused_leavetime_columns = [
    'datasource',
    'plannedtime_dep',
    'actualtime_arr',
    'actualtime_dep',
    'vehicleid',
    'passengers',
    'passengersin',
    'passengersout',
    'distance',
    'suppressed',
    'justificationid',
    'lastupdate',
    'note',
]
sdf_stops_ordered = df.drop(*_unused_leavetime_columns)

In [10]:
# Order the stop records by trip and then by planned arrival time.
# plannedtime_arr was read from CSV as a string, so sorting on it directly is
# lexicographic (e.g. "9000" sorts after "10000"). Sort on its numeric value
# instead; the column itself stays a string.
sdf_stops_ordered = sdf_stops_ordered.orderBy(
    'tripid',
    sdf_stops_ordered['plannedtime_arr'].cast('long'),
)

In [11]:
from pyspark.sql.types import * # We need this since we want to provide the schema for our new dataframe
# Schema of the per-segment table that the segment loop appends to: five nullable
# string columns followed by a long "traveltime" column.
fields = [
    StructField(column_name, StringType(), True)
    for column_name in ("dayofservice", "tripid", "time_at_1", "time_at_2", "segmentid")
]
fields.append(StructField("traveltime", LongType()))
schema = StructType(fields)

# Start from an empty DataFrame with that schema.
sdf_stops_model = sqlContext.createDataFrame(sc.emptyRDD(), schema)

In [12]:
from sqlalchemy import create_engine
import pandas as pd
# Build the PostgreSQL connection from environment variables so that no
# credentials are stored in the notebook. Only the password is mandatory; the
# other settings default to the values this notebook previously hard-coded.
import os
from urllib.parse import quote_plus

_db_password = os.environ.get("JETA_DB_PASSWORD")
if _db_password is None:
    raise RuntimeError(
        "Set the JETA_DB_PASSWORD environment variable before creating the engine"
    )
engine = create_engine(
    "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(
        # User and password are URL-escaped so special characters cannot break the URL.
        user=quote_plus(os.environ.get("JETA_DB_USER", "postgres")),
        password=quote_plus(_db_password),
        host=os.environ.get("JETA_DB_HOST", "localhost"),
        port=os.environ.get("JETA_DB_PORT", "5433"),
        database=os.environ.get("JETA_DB_NAME", "jetaDb"),
    )
)

In [13]:
# Turn every route's stop list into consecutive stop pairs ("segments").
# Each segment id is "<stop>_<next stop>", and segmentno is its 1-based position
# within the route.
df_routes_for_segment = pd.read_sql_query("select * from main_routes;",engine)
segmentids = []
routeids = []
segmentnos = []
for i, rows in df_routes_for_segment.iterrows():
    stop_sequence = rows['stopids']
    # The last stop has no successor, so stop one short of the end.
    for j in range(len(stop_sequence) - 1):
        segment = str(stop_sequence[j]) + "_" + str(stop_sequence[j + 1])
        segmentids.append(segment)
        segmentnos.append(j + 1)
        routeids.append(rows['routeid'])
df_journey = pd.DataFrame(
    {
        'routeid': routeids,
        'segmentno': segmentnos,
        'segmentid': segmentids,
    }
)

In [14]:
# Keep a single row per segment id. Only the first occurrence survives, so a
# segment shared by several routes stays attached to the first route listing it.
df_journey = df_journey.drop_duplicates(subset='segmentid', keep='first')

In [15]:
# Empty placeholder frames for the "from" and "to" stop lookups. The segment loop
# rebinds both names on every iteration it processes.
fields = [
    StructField(column_name, StringType(), True)
    for column_name in ("tripid", "dayofservice", "stoppointid", "plannedtime_arr")
]
schema = StructType(fields)
from_stop_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)
to_stop_df = sqlContext.createDataFrame(sc.emptyRDD(), schema)

In [16]:
# Renumber the rows 0..n-1, discarding the old index left over from de-duplication.
df_journey = df_journey.reset_index(drop=True)

In [17]:
# Collect the tidy trips table from Spark into an in-memory pandas DataFrame.
pdf_trips = df_trips_tidy.toPandas()

In [18]:
# Distinct (trip id, route id) pairs, used to look up which trips ran on a route.
fin_df = pdf_trips.loc[:, ['Trips_tripid', 'routeid']].drop_duplicates()

In [19]:
# Distinct route ids whose text starts with "39A".
_is_39A_route = fin_df['routeid'].str.startswith('39A')
_39A_routes = fin_df.loc[_is_39A_route, 'routeid'].drop_duplicates().tolist()

In [20]:
# Distinct route ids whose text starts with "46A".
_is_46A_route = fin_df['routeid'].str.startswith('46A')
_46A_routes = fin_df.loc[_is_46A_route, 'routeid'].drop_duplicates().tolist()

In [22]:
# Sort segments by route id. The segment loop relies on this order when it stops
# at the first route id starting with "5".
df_journey = df_journey.sort_values(by='routeid')

In [None]:
# Here we will run our loops:
# First loop will iterate over the journey dataframe
from pyspark.sql.functions import *
import pyspark.sql.functions as sf
from pyspark.sql.types import LongType
# For every segment belonging to a 39A or 46A route, pair each trip's planned
# arrival at the segment's first stop with its planned arrival at the second stop
# and append the result to sdf_stops_model.
# NOTE: sdf_stops_model accumulates, so re-running this cell appends the same rows again.
for i, rows in df_journey.iterrows():
    route_trips_unq = []
    segments = rows['segmentid']
    route = rows['routeid']
    is_target_route = (route in _39A_routes) or (route in _46A_routes)
    if is_target_route:
        # Trip ids that the trips table associates with this route.
        route_trips_unq = fin_df.loc[fin_df['routeid'].isin([route]), 'Trips_tripid'].tolist()
        print(i)
        from_stop, to_stop = segments.split('_')
        # Restrict the stop records to trips of this route.
        on_route = sdf_stops_ordered.tripid.isin(route_trips_unq)
        # Planned arrival at the first stop of the segment, per trip and day.
        from_stop_df = (
            sdf_stops_ordered
            .selectExpr('dayofservice', 'tripid', 'plannedtime_arr as time_at_1', 'stoppointid')
            .where((sdf_stops_ordered.stoppointid == from_stop) & on_route)
            .drop('stoppointid')
        )
        # Planned arrival at the second stop of the segment, per trip and day.
        to_stop_df = (
            sdf_stops_ordered
            .selectExpr('dayofservice', 'tripid', 'plannedtime_arr as time_at_2', 'stoppointid')
            .where((sdf_stops_ordered.stoppointid == to_stop) & on_route)
            .drop('stoppointid')
        )
        # Match the two stops of the same trip on the same service day.
        joined_df = from_stop_df.join(to_stop_df, ['dayofservice', 'tripid'], 'inner')
        joined_df = joined_df.withColumn('segmentid', sf.lit(segments))
        # traveltime is the difference of the two planned arrival values, cast to long.
        planned_delta = joined_df["time_at_2"].cast(LongType()) - joined_df["time_at_1"].cast(LongType())
        joined_df = joined_df.withColumn('traveltime', planned_delta)
        sdf_stops_model = unionAll(sdf_stops_model, joined_df)
    # df_journey is sorted by route id, so stop at the first route id starting with "5".
    if route.startswith('5'):
        break