# Tube Twin: Passenger count forecasting/general tube analysis 

© Explore Data Science Academy

## Introduction 

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="../Assets/LU_Baker-street.jpg" 
     alt="London Underground"
     style="float: center; padding-bottom=0.5em"
     width=100%/>
     <p><em>Figure 1. The London Tube</em></p> 
</div>

**Client**: Transport for London (TfL) 

Transport for London runs the London Underground (aka “The Tube”), a network of train stations that connects the city of London.

**Team**: 

This is Team 6, a combination of data scientists and data engineers assigned to the Tube Twin project. This notebook executes various aspects of the project workflow.

## Context

This project's objective is to create a digital twin of the London Tube that can be used for passenger count and traffic analysis and forecasting.


## Basic initialisation
To get started, let's import some basic Python libraries as well as Spark modules and functions.

In [1]:
import findspark
findspark.init()
findspark.find()

import glob 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import col, isnan, when, count 

> ℹ️ **Objective** ℹ️
>
>Initialise a new **Spark Context** and **Session**.

In [2]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc) 

## Reading and merging tube yearly/Qhr dataset 
At this point, the yearly entry/exit tube data are in separate Excel files. We will merge them into a single CSV file for easier study and analysis.

> ℹ️ **Objective** ℹ️
>
>Merge the individual Excel files into one large CSV file.

In [3]:
# Specifying the path to the Excel files
path = "../Data/QhrEntryExit-TubeData/"
 
# Pointing to all of the required Excel files in the path 
file_list = glob.glob(path + "*.xlsx") 
 
# Empty list to hold one DataFrame per Excel file.
# pd.read_excel(file_path) reads the Excel data into a pandas DataFrame.
excl_list = []
 
for file in file_list:
    excl_list.append(pd.read_excel(file, sheet_name='ByQhr', header=6)) 
 
# Concatenate all DataFrames in the list into a single DataFrame, returns new DataFrame.
excl_merged = pd.concat(excl_list, ignore_index=True) 
 
# Exports the dataframe into a new CSV file with specified name.
excl_merged.to_csv('../Data/historical_tube_data.csv', index=False) 

# Success confirmation
print('Files merged and saved into specified location') 

Files merged and saved into specified location
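
When several yearly workbooks are concatenated, it can help to record which file each row came from. The following is a minimal sketch of that idea, using small in-memory DataFrames in place of the real Excel files. The file names, the `source_file` column and the sample values are assumptions made for illustration only and are not part of the original workflow.

```python
import pandas as pd

# Hypothetical stand-ins for the DataFrames returned by pd.read_excel().
frames = {
    "tube_2019.xlsx": pd.DataFrame({"Station": ["Acton Town", "Aldgate"], "year": [2019, 2019]}),
    "tube_2020.xlsx": pd.DataFrame({"Station": ["Acton Town"], "year": [2020]}),
}

# Tag each frame with its (assumed) source file name before concatenating.
# Sorting the names makes the row order independent of the listing order.
tagged = [frame.assign(source_file=name) for name, frame in sorted(frames.items())]

# ignore_index=True rebuilds a clean 0..n-1 index, as in the merge above.
merged = pd.concat(tagged, ignore_index=True)

print(merged)
```

Keeping the source file as a column makes it easier to trace an odd value back to the workbook it came from.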


## Examining the merged data
Let's take a look at what the CSV file we just merged looks like. 

> ℹ️ **Objective** ℹ️
>
> Read in the CSV file and inspect the DataFrame for better understanding. 

In [4]:
# Reading the historical_tube_data CSV file
df = pd.read_csv('../Data/historical_tube_data.csv') 

# Setting column display to show all columns and limiting decimals to 2. 
pd.set_option("display.max_columns", None, "display.precision", 2)

# Viewing the top five rows. 
df.head(5) 

Unnamed: 0,Mode,NLC,ASC,Station,Coverage,year,day,dir,Total,Early,AM Peak,Inter Peak,PM Peak,Evening,Late,0500-0515,0515-0530,0530-0545,0545-0600,0600-0615,0615-0630,0630-0645,0645-0700,0700-0715,0715-0730,0730-0745,0745-0800,0800-0815,0815-0830,0830-0845,0845-0900,0900-0915,0915-0930,0930-0945,0945-1000,1000-1015,1015-1030,1030-1045,1045-1100,1100-1115,1115-1130,1130-1145,1145-1200,1200-1215,1215-1230,1230-1245,1245-1300,1300-1315,1315-1330,1330-1345,1345-1400,1400-1415,1415-1430,1430-1445,1445-1500,1500-1515,1515-1530,1530-1545,1545-1600,1600-1615,1615-1630,1630-1645,1645-1700,1700-1715,1715-1730,1730-1745,1745-1800,1800-1815,1815-1830,1830-1845,1845-1900,1900-1915,1915-1930,1930-1945,1945-2000,2000-2015,2015-2030,2030-2045,2045-2100,2100-2115,2115-2130,2130-2145,2145-2200,2200-2215,2215-2230,2230-2245,2245-2300,2300-2315,2315-2330,2330-2345,2345-0000,0000-0015,0015-0030,0030-0045,0045-0100,0100-0115,0115-0130,0130-0145,0145-0200,0200-0215,0215-0230,0230-0245,0245-0300,0300-0315,0315-0330,0330-0345,0345-0400,0400-0415,0415-0430,0430-0445,0445-0500
0,LU,500,ACTu,Acton Town,Station entry / exit,2020,MTT,IN,3701.9,288.2,1101.2,1185.7,824.2,213.2,89.4,8.6,13.4,15.3,22.4,37.5,49.3,68.0,73.7,102.9,110.8,116.2,120.3,140.6,118.9,106.0,79.1,64.3,56.1,44.5,41.5,43.0,43.0,35.0,30.9,40.0,39.0,34.7,31.1,38.5,33.3,36.2,32.8,37.8,42.3,42.1,36.5,56.7,55.1,56.1,57.4,66.7,77.8,121.3,98.4,97.3,90.4,83.0,72.6,77.7,98.0,69.8,59.6,55.9,42.1,43.9,33.9,27.6,24.2,24.8,17.2,35.2,17.0,15.0,12.0,10.7,10.0,9.6,9.9,11.5,10.1,10.5,6.1,11.5,6.6,5.5,3.7,2.2,0.6,1.8,1.5,1.8,0.1,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0.6,15.3
1,LU,502,ALDu,Aldgate,Station entry / exit,2020,MTT,IN,2489.42,172.58,520.17,622.0,920.0,208.42,46.25,2.42,3.75,2.67,4.92,22.67,30.67,53.33,52.17,81.67,61.75,60.58,49.5,46.58,48.0,43.75,28.67,31.67,27.42,21.33,19.25,17.92,20.42,17.58,17.83,19.75,16.25,16.33,19.42,18.58,23.25,23.42,23.5,22.17,19.58,23.42,22.83,28.08,23.25,27.08,33.08,38.67,48.5,45.75,55.33,82.75,69.5,82.5,75.92,109.75,80.42,98.17,89.17,87.42,59.75,46.5,38.17,43.33,30.58,24.42,19.67,22.25,13.5,10.0,12.08,8.42,6.83,9.33,8.0,6.67,4.92,5.75,6.58,6.0,4.0,4.25,4.42,1.5,1.08,1.08,0.0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0.0,0.0
2,LU,503,ALEu,Aldgate East,Station entry / exit,2020,MTT,IN,3198.31,103.0,472.69,1063.92,1126.46,333.23,99.0,0.54,4.46,9.08,10.0,11.46,15.54,21.54,30.38,43.54,35.62,39.77,44.0,40.69,52.54,45.85,39.0,36.69,35.0,33.54,26.46,31.0,32.08,26.77,28.31,29.08,31.62,31.54,30.15,36.08,37.46,38.31,37.0,49.0,48.62,46.54,45.31,46.54,43.08,53.92,54.77,69.23,63.77,76.0,77.77,95.0,91.38,96.08,90.85,127.15,103.62,122.0,104.62,102.62,78.31,62.31,52.54,48.54,40.38,34.23,31.92,34.15,26.08,22.0,22.85,25.85,16.15,15.85,15.23,14.31,13.0,13.0,10.0,11.69,9.23,7.15,4.77,6.31,3.85,2.54,2.23,0.23,0.08,0.08,0.0,0,0,0,0,0,0,0,0,0,0,0.0,0.54
3,LU,505,ALPu,Alperton,Station entry / exit,2020,MTT,IN,2072.54,360.23,667.85,505.69,408.54,99.38,30.85,6.54,14.23,21.69,29.15,56.54,80.31,64.15,87.62,103.62,72.08,69.23,74.15,72.0,69.92,57.62,38.69,36.23,27.31,27.46,19.54,23.62,20.15,21.08,17.0,19.54,17.62,18.38,18.15,17.08,18.69,19.31,20.31,22.23,20.62,20.23,17.77,22.23,30.15,25.38,23.54,22.62,21.15,22.92,25.92,33.38,29.92,35.23,35.0,42.69,40.69,35.0,38.38,47.15,33.54,21.46,16.08,14.46,14.62,12.77,9.31,9.54,9.08,5.23,4.69,5.77,5.85,4.31,3.77,5.85,5.08,5.54,5.92,1.62,2.85,1.38,0.38,0.31,0.31,0.62,0.54,0.15,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0.23,0.08
4,LU,506,AMEu,Amersham,Station entry / exit,2020,MTT,IN,980.47,148.4,424.67,256.6,124.13,21.07,5.6,2.53,6.0,21.4,5.87,35.6,26.2,22.27,28.53,32.67,47.53,62.27,133.07,49.87,30.47,14.93,14.07,9.07,10.33,14.47,5.93,7.67,6.33,9.27,4.67,5.8,4.8,8.53,4.53,7.4,3.67,9.07,11.93,6.4,3.93,6.07,7.4,8.47,5.33,12.2,5.53,8.67,14.53,77.47,16.93,14.6,9.67,16.27,15.4,21.33,8.93,9.4,6.93,8.93,4.4,5.8,2.47,3.07,2.0,1.93,1.0,3.2,1.53,1.73,1.07,2.2,0.33,1.47,1.53,1.67,0.0,0.4,0.8,0.47,0.2,0.2,0.07,0.47,0.13,0.07,0.2,0.47,0.13,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0.13,0.2


In [11]:
# Viewing the last five rows. 

df.tail(5) 

Unnamed: 0,Mode,NLC,ASC,Station,Coverage,year,day,dir,Total,Early,AM Peak,Inter Peak,PM Peak,Evening,Late,0500-0515,0515-0530,0530-0545,0545-0600,0600-0615,0615-0630,0630-0645,0645-0700,0700-0715,0715-0730,0730-0745,0745-0800,0800-0815,0815-0830,0830-0845,0845-0900,0900-0915,0915-0930,0930-0945,0945-1000,1000-1015,1015-1030,1030-1045,1045-1100,1100-1115,1115-1130,1130-1145,1145-1200,1200-1215,1215-1230,1230-1245,1245-1300,1300-1315,1315-1330,1330-1345,1345-1400,1400-1415,1415-1430,1430-1445,1445-1500,1500-1515,1515-1530,1530-1545,1545-1600,1600-1615,1615-1630,1630-1645,1645-1700,1700-1715,1715-1730,1730-1745,1745-1800,1800-1815,1815-1830,1830-1845,1845-1900,1900-1915,1915-1930,1930-1945,1945-2000,2000-2015,2015-2030,2030-2045,2045-2100,2100-2115,2115-2130,2130-2145,2145-2200,2200-2215,2215-2230,2230-2245,2245-2300,2300-2315,2315-2330,2330-2345,2345-0000,0000-0015,0015-0030,0030-0045,0045-0100,0100-0115,0115-0130,0130-0145,0145-0200,0200-0215,0215-0230,0230-0245,0245-0300,0300-0315,0315-0330,0330-0345,0345-0400,0400-0415,0415-0430,0430-0445,0445-0500
17453,TfLRail,3191,HANr,Hanwell,Station entry / exit,2019,SUN,OUT,14.0,0.0,1.0,12.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,5.0,0.0,1.0,0.0,0.0,1.0,0.0,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0.0,0.0
17454,TfLRail,3187,STLr,Southall,TfL Rail boarding / alighting,2019,SUN,OUT,1885.0,5.0,82.0,821.0,530.0,351.0,96.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,4.0,2.0,5.0,4.0,6.0,0.0,10.0,3.0,13.0,8.0,10.0,6.0,15.0,11.0,17.0,7.0,20.0,17.0,30.0,12.0,43.0,26.0,57.0,33.0,54.0,23.0,59.0,25.0,58.0,19.0,56.0,35.0,47.0,29.0,51.0,31.0,61.0,23.0,54.0,28.0,52.0,23.0,57.0,27.0,67.0,28.0,77.0,22.0,72.0,37.0,49.0,36.0,46.0,24.0,41.0,24.0,33.0,4.0,30.0,5.0,22.0,6.0,23.0,8.0,15.0,9.0,14.0,5.0,1.0,11.0,1.0,0.0,2.0,0.0,0.0,0.0,1.0,0,0,0,0,0,0,0,0,0,0,0.0,0.0
17455,TfLRail,3186,HAYr,Hayes & Harlington,TfL Rail boarding / alighting,2019,SUN,OUT,1466.0,4.0,83.0,449.0,474.0,345.0,111.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,2.0,5.0,5.0,3.0,10.0,3.0,6.0,11.0,5.0,8.0,5.0,13.0,9.0,6.0,9.0,4.0,11.0,6.0,16.0,11.0,19.0,13.0,17.0,12.0,24.0,15.0,22.0,18.0,30.0,13.0,36.0,21.0,33.0,25.0,30.0,34.0,24.0,38.0,31.0,42.0,35.0,40.0,37.0,42.0,44.0,36.0,38.0,46.0,45.0,40.0,53.0,32.0,35.0,29.0,37.0,22.0,35.0,17.0,9.0,26.0,10.0,22.0,13.0,14.0,9.0,16.0,3.0,17.0,0.0,9.0,3.0,0.0,1.0,3.0,0.0,0.0,1.0,0,0,0,0,0,0,0,0,0,0,0.0,0.0
17456,TfLRail,7090,HXXr,Heathrow Terminals 2 & 3 EL,TfL Rail boarding / alighting,2019,SUN,OUT,2153.0,53.0,375.0,880.0,577.0,222.0,46.0,2.0,5.0,4.0,4.0,3.0,6.0,8.0,21.0,27.0,23.0,28.0,29.0,43.0,20.0,48.0,22.0,38.0,32.0,40.0,25.0,34.0,26.0,39.0,28.0,38.0,35.0,47.0,31.0,37.0,32.0,48.0,25.0,55.0,26.0,54.0,26.0,36.0,30.0,49.0,32.0,39.0,30.0,53.0,30.0,66.0,39.0,41.0,37.0,60.0,38.0,60.0,55.0,67.0,34.0,53.0,27.0,36.0,23.0,35.0,20.0,19.0,16.0,15.0,12.0,12.0,9.0,13.0,12.0,7.0,11.0,5.0,5.0,5.0,5.0,4.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,2.0,1.0
17457,TfLRail,7091,HAFr,Heathrow Terminal 4 EL,TfL Rail boarding / alighting,2019,SUN,OUT,1834.0,113.0,292.0,676.0,445.0,248.0,60.0,0.0,8.0,24.0,8.0,16.0,11.0,11.0,35.0,29.0,25.0,20.0,19.0,27.0,23.0,20.0,20.0,22.0,32.0,34.0,21.0,23.0,17.0,34.0,19.0,19.0,20.0,31.0,28.0,36.0,25.0,45.0,20.0,33.0,19.0,30.0,24.0,38.0,20.0,31.0,20.0,37.0,41.0,41.0,25.0,44.0,33.0,57.0,20.0,40.0,23.0,41.0,45.0,53.0,28.0,38.0,23.0,37.0,28.0,37.0,18.0,23.0,15.0,14.0,10.0,16.0,12.0,24.0,14.0,16.0,5.0,16.0,5.0,7.0,5.0,5.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0.0,1.0


In [12]:
#Viewing data type
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17458 entries, 0 to 17457
Columns: 111 entries, Mode to 0445-0500
dtypes: float64(93), int64(12), object(6)
memory usage: 14.8+ MB


In [13]:
# Let's see a summary of the data.
# Note that describe() only summarises the numeric columns by default: it won't try to calculate
# a mean or a standard deviation for the object columns, since they mostly contain text strings.
df.describe() 

Unnamed: 0,NLC,year,Total,Early,AM Peak,Inter Peak,PM Peak,Evening,Late,0500-0515,0515-0530,0530-0545,0545-0600,0600-0615,0615-0630,0630-0645,0645-0700,0700-0715,0715-0730,0730-0745,0745-0800,0800-0815,0815-0830,0830-0845,0845-0900,0900-0915,0915-0930,0930-0945,0945-1000,1000-1015,1015-1030,1030-1045,1045-1100,1100-1115,1115-1130,1130-1145,1145-1200,1200-1215,1215-1230,1230-1245,1245-1300,1300-1315,1315-1330,1330-1345,1345-1400,1400-1415,1415-1430,1430-1445,1445-1500,1500-1515,1515-1530,1530-1545,1545-1600,1600-1615,1615-1630,1630-1645,1645-1700,1700-1715,1715-1730,1730-1745,1745-1800,1800-1815,1815-1830,1830-1845,1845-1900,1900-1915,1915-1930,1930-1945,1945-2000,2000-2015,2015-2030,2030-2045,2045-2100,2100-2115,2115-2130,2130-2145,2145-2200,2200-2215,2215-2230,2230-2245,2245-2300,2300-2315,2315-2330,2330-2345,2345-0000,0000-0015,0015-0030,0030-0045,0045-0100,0100-0115,0115-0130,0130-0145,0145-0200,0200-0215,0215-0230,0230-0245,0245-0300,0300-0315,0315-0330,0330-0345,0345-0400,0400-0415,0415-0430,0430-0445,0445-0500
count,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0,17458.0
mean,1852.83,2019.16,7775.08,233.72,1409.91,2562.41,2006.96,1020.89,541.18,3.95,7.9,12.11,19.41,28.79,40.68,52.84,68.04,81.28,94.91,108.04,119.36,128.24,138.52,142.18,140.47,128.2,118.09,110.04,100.56,90.86,90.67,91.77,91.69,90.15,93.6,97.05,98.5,100.6,103.7,106.73,108.24,110.32,111.98,112.66,111.86,111.49,111.92,113.26,114.8,118.34,122.43,127.38,132.41,141.32,146.89,153.89,158.79,174.8,180.15,187.9,186.05,185.87,177.51,164.8,148.98,136.11,120.5,106.46,95.63,88.49,81.0,75.04,69.17,66.43,63.33,60.34,58.38,60.27,61.56,60.34,57.58,55.02,50.1,44.04,36.85,28.75,22.17,15.62,10.62,6.85,4.98,4.09,3.36,2.38,2.12,1.94,1.77,1.73,1.58,1.47,1.28,1.24,1.02,1.08,1.36
std,2424.45,1.38,13769.65,468.23,3259.32,4657.45,3892.3,1973.02,1265.21,12.03,21.25,27.41,48.34,61.95,84.46,106.9,138.15,163.6,190.24,226.21,264.06,300.61,340.11,371.51,384.43,353.14,306.7,259.5,220.78,184.76,181.44,184.15,181.78,174.89,180.45,190.52,193.54,194.6,197.78,203.2,203.79,204.08,205.25,207.46,204.9,203.1,202.52,204.49,207.34,213.96,219.48,223.94,232.47,254.35,267.42,283.7,298.48,346.13,365.35,386.51,384.38,380.2,361.89,329.66,291.11,264.65,231.37,201.33,181.72,172.0,158.2,146.53,137.16,135.16,131.09,124.05,124.17,137.6,149.5,141.31,129.56,122.86,112.51,98.71,83.88,69.85,57.83,43.26,33.73,28.13,24.4,21.08,17.38,15.61,14.03,13.11,12.11,12.21,11.52,10.23,8.4,7.86,6.26,5.72,5.67
min,500.0,2017.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-31.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-2.0,0.0,-1.5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-4.0,0.0,0.0,-9.0,-19.0,-11.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,608.0,2018.0,1611.19,23.97,200.58,549.03,370.0,166.25,54.0,0.0,0.0,0.0,0.58,1.59,3.0,5.0,7.79,10.0,12.0,14.0,15.0,15.0,15.0,16.0,16.04,16.0,16.0,16.0,16.0,15.77,15.88,16.0,16.38,16.8,17.0,18.0,18.25,19.0,20.0,21.0,21.69,22.25,22.67,23.5,23.03,24.0,23.62,24.0,24.81,26.0,27.0,28.55,29.1,30.33,31.0,31.0,31.46,33.0,32.0,31.33,30.0,29.52,27.73,26.0,23.73,21.67,19.0,17.0,15.0,14.22,13.0,11.46,10.41,10.0,9.0,8.33,8.0,8.0,7.13,6.62,6.0,5.4,4.67,3.38,2.0,1.0,0.25,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,719.0,2019.0,3491.0,80.62,508.0,1117.94,832.18,407.0,161.0,0.11,1.0,3.0,5.33,9.0,13.09,18.0,23.78,29.66,35.0,39.77,42.0,42.33,44.0,44.0,42.03,41.0,39.0,40.0,38.52,37.0,36.71,37.0,37.5,38.0,38.0,39.4,40.0,42.0,43.0,45.0,46.0,48.0,48.0,48.0,48.54,49.0,48.81,50.0,51.0,53.0,55.71,59.0,61.0,64.0,66.0,67.86,68.67,72.5,73.15,73.13,72.0,72.0,68.57,63.91,58.0,54.0,48.0,43.0,38.68,35.71,32.61,29.41,27.0,26.0,24.0,22.36,21.0,21.6,20.67,19.0,18.0,18.0,15.75,13.0,10.0,6.0,3.5,1.56,0.53,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,1083.0,2020.0,7759.1,231.0,1325.19,2521.48,1931.0,1016.0,482.0,2.67,7.0,12.0,19.0,28.62,39.69,52.0,67.0,82.0,98.67,111.79,120.08,121.0,125.0,121.49,115.88,106.0,103.0,100.0,93.08,89.04,90.0,89.79,89.0,90.17,93.0,96.0,96.0,99.0,102.0,104.0,106.0,110.0,111.0,111.0,109.88,109.0,108.87,111.0,111.0,115.0,118.9,125.55,131.04,138.0,143.07,150.0,155.0,165.0,171.11,175.0,176.0,175.0,171.16,159.0,146.0,134.0,119.0,107.73,96.0,88.0,80.0,74.84,68.0,65.0,60.3,58.64,55.0,56.0,55.0,54.0,53.0,52.0,47.25,42.0,35.0,26.0,19.0,12.0,6.0,1.95,0.32,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.09
max,9846.0,2021.0,157088.0,7104.0,63134.0,68897.0,60425.0,29266.0,25260.0,363.0,727.0,571.0,2358.0,1082.0,1244.0,1591.0,2275.0,2914.0,4018.0,5111.0,6186.0,7748.0,7468.0,7939.0,8060.0,7524.0,5957.0,5009.0,4260.0,3167.0,2615.0,2605.0,2725.0,2502.0,2701.0,2939.0,3205.0,3192.0,3140.0,3200.0,3333.0,3157.0,3081.0,3109.0,3299.0,3027.0,3002.0,3022.0,2980.0,3312.0,3062.0,2924.0,2814.0,3197.0,3220.0,3461.0,4242.0,5656.0,5927.0,6448.0,6701.0,6555.0,6223.0,5479.0,4655.0,3921.0,3290.0,2806.0,2618.0,2592.0,2513.0,2247.0,2175.0,2119.0,1970.0,1767.0,1940.0,3158.0,2649.0,2297.0,1957.0,1790.0,1860.0,1543.0,1363.0,1203.0,1101.0,877.0,690.0,650.0,609.0,545.0,483.0,460.0,419.0,406.0,393.0,427.0,447.0,369.0,291.0,242.0,189.0,129.0,108.0


In [14]:
# Now let's show a summary for the remaining non-numeric (object) columns.
df.describe(include=object) 

Unnamed: 0,Mode,ASC,Station,Coverage,day,dir
count,17458,17458,17458,17426,17458,17458
unique,6,698,467,6,5,2
top,LU,SFDu,Stratford,Station entry / exit,SUN,IN
freq,10200,144,152,14806,4584,8729


In [16]:
# Checking for missing values: count() returns the number of non-null entries in each column,
# so a value below the row count signals missing data.
df.count(axis=0) 

Mode         17458
NLC          17458
ASC          17458
Station      17458
Coverage     17426
             ...  
0345-0400    17458
0400-0415    17458
0415-0430    17458
0430-0445    17458
0445-0500    17458
Length: 111, dtype: int64
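
`count()` reports non-null entries, so a column with missing data shows a smaller number than the row count (here, `Coverage` has 17426 of 17458). As a small sketch, the number of missing values per column can also be read off directly with `isna().sum()`. The toy DataFrame below is made up for illustration and is not taken from the tube data.

```python
import pandas as pd

# Toy data (assumed for illustration): one missing Coverage value.
toy = pd.DataFrame({
    "Station": ["Acton Town", "Aldgate", "Alperton"],
    "Coverage": ["Station entry / exit", None, "Station entry / exit"],
})

non_null = toy.count(axis=0)   # non-null entries per column
missing = toy.isna().sum()     # missing entries per column

# Keep only the columns that actually contain missing values.
print(missing[missing > 0])
```

Filtering to `missing > 0` is useful on a wide DataFrame like this one, where pandas would otherwise abbreviate the listing.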

## Reading CSV file and working with Spark

We will now begin to use Spark to analyze and check the data further. 

> ℹ️ **Objective** ℹ️
>
>Re-read the merged CSV data into the notebook, this time with Spark. Because no schema is supplied or inferred, Spark will read all data as strings, so we will need to convert to the appropriate data types at a later stage.

In [5]:
# Pointing to, and reading the merged CSV 
test_df = spark.read.csv("../Data/historical_tube_data.csv", header=True) 

# Printing schema and data information
test_df.printSchema()

print('test_df size:')
print((test_df.count(), len(test_df.columns))) 

root
 |-- Mode: string (nullable = true)
 |-- NLC: string (nullable = true)
 |-- ASC: string (nullable = true)
 |-- Station: string (nullable = true)
 |-- Coverage: string (nullable = true)
 |-- year: string (nullable = true)
 |--  day: string (nullable = true)
 |--  dir: string (nullable = true)
 |-- Total: string (nullable = true)
 |-- Early     : string (nullable = true)
 |-- AM Peak: string (nullable = true)
 |-- Inter Peak: string (nullable = true)
 |-- PM Peak: string (nullable = true)
 |-- Evening   : string (nullable = true)
 |-- Late      : string (nullable = true)
 |-- 0500-0515: string (nullable = true)
 |-- 0515-0530: string (nullable = true)
 |-- 0530-0545: string (nullable = true)
 |-- 0545-0600: string (nullable = true)
 |-- 0600-0615: string (nullable = true)
 |-- 0615-0630: string (nullable = true)
 |-- 0630-0645: string (nullable = true)
 |-- 0645-0700: string (nullable = true)
 |-- 0700-0715: string (nullable = true)
 |-- 0715-0730: string (nullable = true)
 |-- 0730

## Update column names
To make the data easier to work with, we will need to make a few changes:
1. Column headers should all be in lowercase; and
2. Whitespaces should be replaced with underscores.


> ℹ️ **Objective** ℹ️
>
>Ensure that the column headers are all in lowercase and that any whitespace is replaced with underscores.

In [6]:
# Updating the column names 
for column in test_df.columns:
    test_df = test_df.withColumnRenamed(column, '_'.join(column.split()).lower()) 

test_df.show(1, False) 

+----+---+----+----------+--------------------+----+---+---+-----------------+-----+-------+------------------+-----------------+-------+-----------------+-----------------+------------------+---------+---------+-----------------+---------+-----------------+---------+---------+---------+---------+------------------+------------------+---------+---------+-----------------+---------+---------+---------+-----------------+---------+---------+-----------------+---------+---------+-----------------+---------+---------+---------+---------+-----------------+---------+---------+---------+---------+---------+---------+---------+---------+-----------------+-----------------+---------+------------------+---------+-----------------+---------+-----------------+---------+-----------------+---------+-----------------+---------+-----------------+---------+-----------------+---------+---------+-----------------+---------+---------+---------+------------------+------------------+------------------+------
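
The renaming rule used above can be checked in plain Python, without Spark. This sketch applies the same expression to a few header strings that appear in the schema printed earlier, including ones with leading or trailing spaces. The helper name `normalise` is an assumption introduced here for illustration.

```python
def normalise(name: str) -> str:
    """Lowercase a header and replace runs of whitespace with one underscore."""
    # str.split() with no arguments drops leading/trailing whitespace and
    # splits on any run of whitespace, so " day" and "Early     " are handled.
    return "_".join(name.split()).lower()


headers = ["Mode", " day", "Early     ", "AM Peak", "Inter Peak", "0500-0515"]
renamed = [normalise(h) for h in headers]
print(renamed)

# Guard against two headers collapsing to the same name after renaming.
assert len(set(renamed)) == len(renamed)
```

The uniqueness check matters because two distinct headers that differ only in case or spacing would otherwise end up with the same column name.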

## Null Values
We will check for null values, which often represent missing pieces of data. It is always good to know where they lie so we can quickly identify and remedy any issues stemming from them.

> ℹ️ **Objective** ℹ️
>
>This code will count the number of null values found in each column. 

In [42]:
NullCount = test_df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull(), c 
                           )).alias(c)
                    for c in test_df.columns]) 

NullCount.show() 

+----+---+---+-------+--------+----+---+---+-----+-----+-------+----------+-------+-------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----

## Data type conversion - The final data schema

Now that we have identified the number of missing values in the data set, we'll move on to convert our data schema to the required data types. 

> ℹ️ **Objective** ℹ️
>
>We will use typecasting to convert the string data types in our current data schema to more appropriate data types. 

In [7]:
# Converting the first few columns that have different data types but are easily accessible 
test_df = test_df.withColumn("mode",col("mode").cast(StringType())) \
    .withColumn("nlc",col("nlc").cast(IntegerType()))\
    .withColumn("asc",col("asc").cast(StringType()))\
    .withColumn("station",col("station").cast(StringType()))\
    .withColumn("coverage",col("coverage").cast(StringType()))\
    .withColumn('year', F.to_date(F.col('year'), 'yyyy'))\
    .withColumn("day",col("day").cast(StringType()))\
    .withColumn("dir",col("dir").cast(StringType()))\
    .withColumn("total",col("total").cast(FloatType()))\
    .withColumn("early",col("early").cast(FloatType()))\
    .withColumn("am_peak",col("am_peak").cast(FloatType()))\
    .withColumn("inter_peak",col("inter_peak").cast(FloatType()))\
    .withColumn("pm_peak",col("pm_peak").cast(FloatType()))\
    .withColumn("evening",col("evening").cast(FloatType()))\
    .withColumn("late",col("late").cast(FloatType()))

# Our DataFrame has too many columns to cast one by one, so we use a for loop to apply the
# conversion to the time-frame columns, since they all share the same data type.
time_columns = test_df.columns[15:] # A list of columns from the first timeframe (0500-0515) to the last one (0445-0500). 

for item in time_columns:
    test_df = test_df.withColumn(item,col(item).cast(FloatType())) 

test_df.printSchema() 


root
 |-- mode: string (nullable = true)
 |-- nlc: integer (nullable = true)
 |-- asc: string (nullable = true)
 |-- station: string (nullable = true)
 |-- coverage: string (nullable = true)
 |-- year: date (nullable = true)
 |-- day: string (nullable = true)
 |-- dir: string (nullable = true)
 |-- total: float (nullable = true)
 |-- early: float (nullable = true)
 |-- am_peak: float (nullable = true)
 |-- inter_peak: float (nullable = true)
 |-- pm_peak: float (nullable = true)
 |-- evening: float (nullable = true)
 |-- late: float (nullable = true)
 |-- 0500-0515: float (nullable = true)
 |-- 0515-0530: float (nullable = true)
 |-- 0530-0545: float (nullable = true)
 |-- 0545-0600: float (nullable = true)
 |-- 0600-0615: float (nullable = true)
 |-- 0615-0630: float (nullable = true)
 |-- 0630-0645: float (nullable = true)
 |-- 0645-0700: float (nullable = true)
 |-- 0700-0715: float (nullable = true)
 |-- 0715-0730: float (nullable = true)
 |-- 0730-0745: float (nullable = true)
 |-
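
Selecting the time-frame columns by position (`columns[15:]`) relies on the column order never changing. As a hedged alternative, and not the approach used in this notebook, the same columns could be picked out by their `HHMM-HHMM` name pattern. The helper name and the sample column list below are assumptions for illustration.

```python
import re

# Matches names such as "0500-0515": four digits, a hyphen, four digits.
TIME_COLUMN = re.compile(r"\d{4}-\d{4}")


def select_time_columns(columns):
    """Return the columns whose names look like quarter-hour time frames."""
    return [c for c in columns if TIME_COLUMN.fullmatch(c)]


# A shortened, illustrative column list (the real DataFrame has 111 columns).
sample_columns = ["mode", "nlc", "year", "total", "am_peak", "0500-0515", "0515-0530", "0445-0500"]
print(select_time_columns(sample_columns))
```

With a Spark DataFrame, the same function could be called as `select_time_columns(test_df.columns)`, since `columns` is a plain list of strings.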

## Consolidate missing values
We have to check whether the data type conversion above was done correctly.
When a cast fails, a null value is inserted into the DataFrame. Since we know how many null values we had before the conversion, we can verify the casting by checking that the count is unchanged. Any additional nulls in the resulting DataFrame must come from the data type conversion.


> ℹ️ **Objective** ℹ️
>
>For the post-conversion count, we will again count the number of invalid entries (nulls) in the DataFrame, then compare the pre-conversion results (from the earlier cell) with the post-conversion results.


In [44]:
NullCount = test_df.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull(), c 
                           )).alias(c)
                    for c in test_df.columns]) 

NullCount.show() 

+----+---+---+-------+--------+----+---+---+-----+-----+-------+----------+-------+-------+----+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----

Fortunately, the casting introduced no issues.
We won't attempt to correct any missing values at this point; that is left to later sections of the data cleaning process.
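
The pre- and post-conversion comparison described above can be made explicit rather than read off two wide tables. The sketch below compares two per-column null-count dictionaries in plain Python. The counts shown are hypothetical; in Spark, such a dictionary could be obtained from the one-row result with `NullCount.first().asDict()`.

```python
def new_nulls(before: dict, after: dict) -> dict:
    """Return, per column, how many nulls were added by the conversion."""
    return {
        column: after[column] - before[column]
        for column in before
        if after[column] != before[column]
    }


# Hypothetical null counts per column, before and after casting.
before = {"coverage": 32, "total": 0, "0500-0515": 0}
after_ok = {"coverage": 32, "total": 0, "0500-0515": 0}
after_bad = {"coverage": 32, "total": 3, "0500-0515": 0}

print(new_nulls(before, after_ok))   # empty dict: casting added no nulls
print(new_nulls(before, after_bad))  # columns where casting introduced nulls
```

An empty result means every column kept its original null count, which is the condition the check above is looking for.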

## Generate parquet files
When writing in Spark, we typically use the Parquet format. This format allows parallel writing using Spark's optimisation while preserving useful metadata such as the schema.

When writing, it is good to make sure that the data is sufficiently partitioned. 

As a rule of thumb, data should be split into one partition for every 200 MB, but the right number also depends on the size of our cluster and executors.


In [8]:
# Importing the serializers needed to estimate the size of the DataFrame before partitioning.
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer 
# Convert the pickled Python RDD into a Java RDD so that Spark's SizeEstimator can be applied to it.
rdd = test_df.rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
obj = rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)
size = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(obj)
size_MB = size/1000000
# One partition per 200 MB of data, with a minimum of two partitions.
partitions = max(int(size_MB/200), 2) 
print(f'The dataframe is {size_MB} MB') 

The dataframe is 32.29856 MB
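
The partition arithmetic in the cell above can be followed by hand. This sketch reproduces the same formula in plain Python. The function name and its parameter names are assumptions for illustration; the 200 MB default mirrors the rule of thumb stated earlier.

```python
def partition_count(size_mb: float, target_mb: float = 200, minimum: int = 2) -> int:
    """One partition per `target_mb` of data, but never fewer than `minimum`."""
    return max(int(size_mb / target_mb), minimum)


# The size reported above: int(32.29856 / 200) is 0, so the minimum of 2 applies.
print(partition_count(32.29856))

# A hypothetical 1000 MB DataFrame: int(1000 / 200) is 5.
print(partition_count(1000))
```

For a DataFrame of this size the minimum dominates, so the 200 MB rule only starts to matter once the data grows past roughly 600 MB.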


### Writing parquet files to the local directory
> ℹ️ **Objective** ℹ️
>
> We will use the **coalesce** function and the number of **partitions** derived above to write parquet files to our local directory 


In [9]:
test_df.coalesce(partitions).write.format("parquet").mode("append").save("../Data/data_ingestion.parquet") 