In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DecimalType

# Creating a SparkSession
spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

## Read in Data

In [2]:
# read in data
df = spark.read.parquet('rail_data_cleaned_20230728132300.parquet')
df.limit(5).show()

+---------------+---------+------------------+-------------+--------------+-----------------+--------------------+----------------------+--------------------+----------------------+----+--------+------------+-------------------+---------------------+------------------+--------------------+-----------------+
|       route_id|unique_id|service_start_date|update_origin|train_platform|working_time_pass|working_time_arrival|working_time_departure|planned_time_arrival|planned_time_departure|pass|platform|train_length|actual_arrival_time|actual_departure_time|is_delayed_arrival|is_delayed_departure|__index_level_0__|
+---------------+---------+------------------+-------------+--------------+-----------------+--------------------+----------------------+--------------------+----------------------+----+--------+------------+-------------------+---------------------+------------------+--------------------+-----------------+
|202307277679020|   L79020|        2023-07-27|       Darwin|          STF

In [3]:
# print rows with respective datatypes
df.printSchema()

root
 |-- route_id: string (nullable = true)
 |-- unique_id: string (nullable = true)
 |-- service_start_date: string (nullable = true)
 |-- update_origin: string (nullable = true)
 |-- train_platform: string (nullable = true)
 |-- working_time_pass: timestamp_ntz (nullable = true)
 |-- working_time_arrival: timestamp_ntz (nullable = true)
 |-- working_time_departure: timestamp_ntz (nullable = true)
 |-- planned_time_arrival: timestamp_ntz (nullable = true)
 |-- planned_time_departure: timestamp_ntz (nullable = true)
 |-- pass: integer (nullable = true)
 |-- platform: string (nullable = true)
 |-- train_length: string (nullable = true)
 |-- actual_arrival_time: timestamp_ntz (nullable = true)
 |-- actual_departure_time: timestamp_ntz (nullable = true)
 |-- is_delayed_arrival: boolean (nullable = true)
 |-- is_delayed_departure: boolean (nullable = true)
 |-- __index_level_0__: long (nullable = true)



## Check for Dups/ Nulls

In [4]:
# find duplicate rows
df.groupBy(df.columns)\
    .count()\
    .where(F.col('count') > 1)\
    .select(F.sum('count'))\
    .show()

+----------+
|sum(count)|
+----------+
|      NULL|
+----------+



In [5]:
# get a null count for all columns
Dict_Null = {col:df.filter(df[col].isNull()).count() for col in df.columns}
Dict_Null

{'route_id': 0,
 'unique_id': 0,
 'service_start_date': 0,
 'update_origin': 585,
 'train_platform': 0,
 'working_time_pass': 37100,
 'working_time_arrival': 0,
 'working_time_departure': 0,
 'planned_time_arrival': 3245,
 'planned_time_departure': 3520,
 'pass': 37100,
 'platform': 2986,
 'train_length': 26505,
 'actual_arrival_time': 5331,
 'actual_departure_time': 2716,
 'is_delayed_arrival': 0,
 'is_delayed_departure': 0,
 '__index_level_0__': 0}

In [6]:
# remove cols with too many null values
df.drop('working_time_pass')
df.drop('pass')
df.drop('train_length')

DataFrame[route_id: string, unique_id: string, service_start_date: string, update_origin: string, train_platform: string, working_time_pass: timestamp_ntz, working_time_arrival: timestamp_ntz, working_time_departure: timestamp_ntz, planned_time_arrival: timestamp_ntz, planned_time_departure: timestamp_ntz, pass: int, platform: string, actual_arrival_time: timestamp_ntz, actual_departure_time: timestamp_ntz, is_delayed_arrival: boolean, is_delayed_departure: boolean, __index_level_0__: bigint]

In [7]:
# drop rows with null values from the dataset
df = df.dropna(subset='planned_time_departure')
df = df.dropna(subset='update_origin')
df.count()

33004

In [8]:
# pyspark value_counts
# get list of categorical var cols
categorical_columns = [c for c in df.columns if df.select(c).dtypes[0][1] == 'string']

# do a value counts for each categorical var
for col_name in categorical_columns:
    print(f"Unique values in {col_name}:")
    df.groupBy(col_name).count().show()

Unique values in route_id:
+---------------+-----+
|       route_id|count|
+---------------+-----+
|202307278024592|   40|
|202307278023407|    2|
|202307287142024|    8|
|202307277684886|    4|
|202307277623421|    3|
|202307277159156|    1|
|202307278931917|    2|
|202307278932995|   10|
|202307277679446|    4|
|202307288076176|    8|
|202307288075928|   36|
|202307278027471|    5|
|202307287195442|    1|
|202307278070469|    5|
|202307277683900|    1|
|202307278929733|    1|
|202307277683740|    2|
|202307287159171|    1|
|202307277194734|    1|
|202307277674757|    1|
+---------------+-----+
only showing top 20 rows

Unique values in unique_id:
+---------+-----+
|unique_id|count|
+---------+-----+
|   L79822|   11|
|   G59266|    1|
|   P25443|    2|
|   P24503|    9|
|   C18264|   31|
|   C16587|    1|
|   P71715|    3|
|   L34352|    2|
|   L23434|    4|
|   G95492|   10|
|   L23565|    1|
|   Y31409|   21|
|   Y30069|    1|
|   P27235|    5|
|   G67331|    1|
|   Y55093|    1|
|

## Merge Data with Rail References

In [9]:
# import rail references and merge with api data
rail_ref = spark.read.csv('RailReferences.csv', header=True, inferSchema=True)
rail_ref.printSchema()

root
 |-- AtcoCode: string (nullable = true)
 |-- TiplocCode: string (nullable = true)
 |-- CrsCode: string (nullable = true)
 |-- StationName: string (nullable = true)
 |-- StationNameLang: string (nullable = true)
 |-- GridType: string (nullable = true)
 |-- Easting: integer (nullable = true)
 |-- Northing: integer (nullable = true)
 |-- CreationDateTime: timestamp (nullable = true)
 |-- ModificationDateTime: timestamp (nullable = true)
 |-- RevisionNumber: integer (nullable = true)
 |-- Modification: string (nullable = true)



In [10]:
# Merge both dfs for Station Name
merged_df = df.join(rail_ref, df['train_platform'] == rail_ref['TiplocCode'], how='left')
merged_df.count()

33004

## Creating New Cols

In [13]:
# create separate cols for date, time, hour
from pyspark.sql.functions import *

#planned arrival
merged_df = merged_df.withColumn('planned_arrival_date', to_date(F.col('planned_time_arrival'), 'YYYY-mm-dd'))
merged_df = merged_df.withColumn('planned_arrival_hour', hour(F.col('planned_time_arrival')))

# planned departure
merged_df = merged_df.withColumn('planned_dep_date', to_date(F.col('planned_time_departure'), 'YYYY-mm-dd'))
merged_df = merged_df.withColumn('planned_dep_hour', hour(F.col('planned_time_departure')))

# actual arrival
merged_df = merged_df.withColumn('actual_arrival_date', to_date(F.col('actual_arrival_time'), 'YYYY-mm-dd'))
merged_df = merged_df.withColumn('actual_arrival_hour', hour(F.col('actual_arrival_time')))

# actual departure
merged_df = merged_df.withColumn('actual_dep_date', to_date(F.col('actual_departure_time'), 'YYYY-mm-dd'))
merged_df = merged_df.withColumn('actual_dep_hour', hour(F.col('actual_departure_time')))

# difference between planned and actual arrival time in minutes
merged_df = merged_df.withColumn('arrival_time_delayed_by', (unix_timestamp("actual_arrival_time") - unix_timestamp('planned_time_arrival'))/60)
merged_df = merged_df.withColumn('dep_time_delayed_by', (unix_timestamp("actual_departure_time") - unix_timestamp('planned_time_departure'))/60)


merged_df.select(F.col('arrival_time_delayed_by')).filter(F.col('arrival_time_delayed_by') > 0).orderBy('arrival_time_delayed_by', ascending=False).show()

+-----------------------+
|arrival_time_delayed_by|
+-----------------------+
|                  149.0|
|                  148.0|
|                  148.0|
|                  148.0|
|                  148.0|
|                  113.0|
|                  113.0|
|                  113.0|
|                  113.0|
|                  113.0|
|                  113.0|
|                  112.0|
|                  112.0|
|                  111.0|
|                  111.0|
|                  100.0|
|                  100.0|
|                  100.0|
|                   99.0|
|                   99.0|
+-----------------------+
only showing top 20 rows



## Describe/ Explore Data

In [None]:
# Suggestions
# Merge the Station Rail Name Reference Table with the API data in order to provide more readable station names.
# How would you determine the top 10 most frequently visited stations in a day?
# What information can be explored to understand train delays?
# Are certain stations busier than others at certain times?

## Data Dict
- route_id: The unique identifier for each train route.
- unique_id: The unique identifier for each train
- service_start_date: The date when the train service for the particular route started.
- update_origin: Source or system from which the data was updated or retrieved.
- train_platform: The short name for a station where the train arrives and departs.
- working_time_pass: The planned or scheduled time for the train to pass a certain point or station.
- working_time_arrival: The planned or scheduled time for the train to arrive at a station.
- working_time_departure: The planned or scheduled time for the train to depart from a station.
- planned_time_arrival: The officially planned time for the train to arrive at a station. This could be the public-facing scheduled time.
- planned_time_departure: The officially planned time for the train to depart from a station. This could be the public-facing scheduled time.
- platform: The platform number at the station where the train arrives and departs.
- train_length: The physical length of the train. Number of cars.
- actual_arrival_time: The actual time when the train arrived at a station.
- actual_departure_time: The actual time when the train departed from a station.
- is_delayed_arrival: A boolean flag indicating whether the train arrived late
- is_delayed_departure: A boolean flag indicating whether the train departed late from a station.

In [None]:
# compare planned arrival/ departure with actual
# comapre average delay
# delays by station, count is delayed
# look at timestamps for busiest stations
# look at delays during night vs morning
# col subtracting planned vs actual

In [None]:
merged_df.groupBy('StationName').mean().limit(5).show()

In [None]:
# create new df with route_id, unique_id, StationName, isDelayed
delayed_df = merged_df[['route_id', 'unique_id', 'StationName', 'is_delayed_arrival', 'is_delayed_departure']]
delayed_df.limit(5).show()

### Station with most delays

In [None]:
delayed_df.filter(F.col('is_delayed_arrival') == True).groupBy('StationName', 'is_delayed_arrival').count().orderBy('count', ascending=False).show()
delayed_df.filter(F.col('is_delayed_departure') == True).groupBy('StationName', 'is_delayed_departure').count().orderBy('count', ascending=False).show()

In [None]:
# find station with the greatest time delays
merged_df.groupBy('StationName').sum('dep_time_delayed_by').orderBy('sum(dep_time_delayed_by)', ascending=False).show()

In [None]:
# convert to pandas for additional exploration and visualization
pandas_df = merged_df.toPandas()

In [None]:
pandas_df['StationName'] = pandas_df['StationName'].fillna(pandas_df['train_platform'])