# Data Wrangling Notebook 1

In [2]:
# Importing relevant libraries

import boto3
import sys
import os
import pandas as pd
import csv

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import split, slice, count, when, expr, isnan, isnull
from pyspark.sql.functions import date_format, to_timestamp, concat, unix_timestamp, substring, lit
from pyspark.sql.functions import col, month, quarter, dayofweek, year
from pyspark.sql import functions as f
from pyspark.sql.types import StringType,BooleanType,DateType,IntegerType
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.sql.window import Window
import configparser
import findspark
import lxml
from datetime import timedelta
from pandas.tseries.offsets import BDay
import itertools
import warnings
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline

# Reading From S3 Bucket using boto3

In [None]:
############# This code block is used for pulling data from S3 using boto3 library
############# Note: Not necessary for this project

#getting relevant information for the S3 bucket

# AWS_S3_BUCKET='w210-bucket'
# AWS_S3_REGION='us-east-1'
# AWS_PROFILE_NAME='default'
# session = boto3.Session(profile_name=AWS_PROFILE_NAME)
# s3 = session.resource('s3')
# s3_client = session.client('s3',region_name=AWS_S3_REGION)
# my_bucket = s3.Bucket(AWS_S3_BUCKET)

# #printing all the files in ridership directory 
# for objects in my_bucket.objects.filter(Prefix="ridership/"):
#     print(objects.key)

# #printing all the files in weather directory 
# for objects in my_bucket.objects.filter(Prefix="weather/"):
#     print(objects.key)

# #reading one file only as pandas df
# obj = s3_client.get_object(Bucket= AWS_S3_BUCKET, Key= "ridership/date-hour-soo-dest-2011.csv") 
# # get object and file (key) from bucket
# initial_df = pd.read_csv(obj['Body'], header=None) 
# initial_df

# Reading From S3 Bucket using PySpark

In [4]:
### Starting Pyspark Session

# spark = SparkSession.builder\
#                     .config('spark.master','local[*]')\
#                     .config('spark.add.name','S3app')\
#                     .config('spark.jars.packages','org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4')\
#                     .getOrCreate()

spark = SparkSession.builder\
                    .config('spark.master','local[*]')\
                    .config('spark.add.name','S3app')\
                    .config("spark.driver.memory", "8g") \
                    .config('spark.jars.packages','org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hadoop:hadoop-common:3.3.4')\
                    .getOrCreate()

In [5]:
spark

In [6]:
### Configuring Pyspark to read data from S3 Bucket. 

findspark.init()
config = configparser.ConfigParser()
# AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
# AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

aws_profile = 'default'
config.read(os.path.expanduser("~/.aws/credentials"))
access_id = config.get(aws_profile, "aws_access_key_id") 
access_key = config.get(aws_profile, "aws_secret_access_key")
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)
# spark.conf.set("spark.sql.broadcastTimeout", 3000)

# spark.conf.set("spark.driver.maxResultSize",8)




sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.committer.name","magic")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.awsAccessKeyId", access_id)
hadoop_conf.set("fs.s3a.awsSecretAccessKey", access_key)
hadoop_conf.set('spark.sql.files.maxPartitionBytes','134217728')

In [108]:
# sc.getConf().getAll()

# Ridership Data

### In this section of Notebook, we will read all the ridership data and do data wrangling

In [109]:
#reading all the ridership data
ridership_2011 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2011.csv")
ridership_2012 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2012.csv")
ridership_2013 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2013.csv")
ridership_2014 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2014.csv")
ridership_2015 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2015.csv")
ridership_2016 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2016.csv")
ridership_2017 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2017.csv")
ridership_2018 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2018.csv")
ridership_2019 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2019.csv")
ridership_2020 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2020.csv")
ridership_2021 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2021.csv")
ridership_2022 = spark.read.option('header',False).csv("s3a://w210-bucket/ridership/date-hour-soo-dest-2022.csv")

In [110]:
# Concatenate all the ridership data from 2011 to 2022
df_ridership_all = ridership_2011.union(ridership_2012).union(ridership_2013).union(ridership_2014)\
                             .union(ridership_2015).union(ridership_2016).union(ridership_2017)\
                             .union(ridership_2018).union(ridership_2019).union(ridership_2020)\
                             .union(ridership_2021).union(ridership_2022)

#Rename columns and creating a new column called origin-destination pair 
df_ridership_all =df_ridership_all.withColumnRenamed('_c0',"date") \
        .withColumnRenamed('_c1','hour') \
        .withColumnRenamed('_c2','origin') \
        .withColumnRenamed('_c3','destination') \
        .withColumnRenamed('_c4','ridership_number') \
        .withColumn('origin-des', concat(col('origin'), lit('-'), col('destination')))

# change data schema
df_ridership_all = df_ridership_all.withColumn("date",col("date").cast(DateType())) \
    .withColumn("hour",col("hour").cast(IntegerType())) \
    .withColumn("ridership_number",col("ridership_number").cast(IntegerType()))

# print the schema and the first 5 rows
df_ridership_all.printSchema()
df_ridership_all.show(10)
# df_ridership_all.count()

root
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- ridership_number: integer (nullable = true)
 |-- origin-des: string (nullable = true)

+----------+----+------+-----------+----------------+----------+
|      date|hour|origin|destination|ridership_number|origin-des|
+----------+----+------+-----------+----------------+----------+
|2011-01-01|   0|  12TH|       12TH|               1| 12TH-12TH|
|2011-01-01|   0|  12TH|       16TH|               1| 12TH-16TH|
|2011-01-01|   0|  12TH|       24TH|               3| 12TH-24TH|
|2011-01-01|   0|  12TH|       ASHB|               2| 12TH-ASHB|
|2011-01-01|   0|  12TH|       BAYF|               5| 12TH-BAYF|
|2011-01-01|   0|  12TH|       CIVC|               3| 12TH-CIVC|
|2011-01-01|   0|  12TH|       COLS|               1| 12TH-COLS|
|2011-01-01|   0|  12TH|       CONC|               2| 12TH-CONC|
|2011-01-01|   0|  12TH|       DALY|

In [111]:
#### read the station data to get station abbreviations, Longitude, and Latitude

df_station = spark.read.option("header",True) \
           .csv("s3a://w210-bucket/ridership/station_bart.csv")

#rename columns
df_station = df_station.withColumnRenamed('Abbreviation','abbreviation') \
                        .withColumnRenamed('Location','location')\
                        .withColumnRenamed('Name','station_name').drop('description').drop('name')

#split longitude and latitude into separate columns
df_station = df_station.withColumn("longitude", split(col("location"), ",").getItem(0)) \
                         .withColumn("latitude", split(col("location"), ",").getItem(1)).drop('location')

# print schema and the first 5 rows
df_station.printSchema()
df_station.show(10)
# df_station.count()

root
 |-- abbreviation: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)

+------------+--------------------+-----------+---------+
|abbreviation|        station_name|  longitude| latitude|
+------------+--------------------+-----------+---------+
|        12TH|12th St. Oakland ...|-122.271450|37.803768|
|        16TH|16th St. Mission ...|-122.419694|37.765062|
|        19TH|19th St. Oakland ...|-122.268602|37.808350|
|        24TH|24th St. Mission ...|-122.418143|37.752470|
|        ASHB|        Ashby (ASHB)|-122.270062|37.852803|
|        BALB|  Balboa Park (BALB)|-122.447506|37.721585|
|        BAYF|     Bay Fair (BAYF)|-122.126514|37.696924|
|        CAST|Castro Valley (CAST)|-122.075602|37.690746|
|        CIVC|Civic Center/UN P...|-122.414123|37.779732|
|        COLS|Coliseum/Oakland ...|-122.196869|37.753661|
+------------+--------------------+-----------+---------+
only showing 

In [112]:
#checking null values in df_station data
missing_counts = df_station.select([count(when(col(c).isNull(), c)).alias(c) for c in df_station.columns]).toPandas()
missing_counts

Unnamed: 0,abbreviation,station_name,longitude,latitude
0,0,0,0,0


In [19]:
# looking at unique bart stations in df_station and ridership data
print(f"Unique number of bart stations in df_station: {df_station.select('abbreviation').distinct().count()}")
print(f"Unique number of bart stations in df_ridership_all: {df_ridership_all.select('origin').distinct().count()}")

Unique number of bart stations in df_station: 46




Unique number of bart stations in df_ridership_all: 50


                                                                                

In [113]:
# finding out which bart stations don't exists
stations_df_stations = df_station.select('abbreviation').distinct().toPandas()
stations_df_ridership_all = df_ridership_all.select('origin').distinct().toPandas()


                                                                                

In [114]:
stations_df_stations_list = stations_df_stations['abbreviation'].tolist()
stations_df_ridership_all_list = stations_df_ridership_all['origin'].tolist()

missing_bart_stations = [x for x in stations_df_ridership_all_list if x not in  stations_df_stations_list]
missing_bart_stations

['ANTC', 'PCTR', 'BERY', 'MLPT']

In [115]:
# adding the missing station data to the df_station data
columns = ['abbreviation', 'station_name', 'longitude', 'latitude']
data = [\
        ('ANTC','Antioch Station', '-121.780320', '37.996012'),\
        ('PCTR','Pittsburg Center Station', '-121.888538', '38.018200'),\
        ('BERY','Berryessa Bart Station', '-121.874689', '37.368572'),\
        ('MLPT','Milpitas Bart Station', '-121.890621', '37.409878')]

missing_stations = spark.createDataFrame(data, columns)

df_station_full = df_station.union(missing_stations)

In [116]:
### join ridership data to station data based on origin

df_station_ridership = df_ridership_all.join(df_station_full,df_ridership_all.origin ==  df_station_full.abbreviation,"left").drop('abbreviation')

#extract month, quarter, and day_of_week from the date column
df_station_ridership = df_station_ridership.withColumn('year', year(df_station_ridership.date)) \
                                            .withColumn('month', month(df_station_ridership.date)) \
                                            .withColumn('quarter', quarter(df_station_ridership.date)) \
                                            .withColumn('day_of_week', dayofweek(df_station_ridership.date)) 

#set timeparser policy to legacy 
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

#create timestamp
df_station_ridership = df_station_ridership.withColumn("ts",f.to_timestamp(f.concat("date","hour"),"yyyy-MM-ddHH"))

#change data schema and print first 5 rows
df_station_ridership = df_station_ridership.withColumn("date",col("date").cast(DateType())) \
    .withColumn("hour",col("hour").cast('string')) \
    .withColumn("year",col("year").cast('string')) \
    .withColumn("month",col("month").cast('string')) \
    .withColumn("quarter",col("quarter").cast('string')) \
    .withColumn("day_of_week",col("day_of_week").cast('string'))
    # .withColumn('ts_long', col('ts').cast('long'))

df_station_ridership.printSchema()
df_station_ridership.show(10)
# df_station_ridership.count()
# df_station_ridership.select('ts_long').show()

root
 |-- date: date (nullable = true)
 |-- hour: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- ridership_number: integer (nullable = true)
 |-- origin-des: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- quarter: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- ts: timestamp (nullable = true)





+----------+----+------+-----------+----------------+----------+--------------------+-----------+---------+----+-----+-------+-----------+-------------------+
|      date|hour|origin|destination|ridership_number|origin-des|        station_name|  longitude| latitude|year|month|quarter|day_of_week|                 ts|
+----------+----+------+-----------+----------------+----------+--------------------+-----------+---------+----+-----+-------+-----------+-------------------+
|2011-01-01|   0|  12TH|       12TH|               1| 12TH-12TH|12th St. Oakland ...|-122.271450|37.803768|2011|    1|      1|          7|2011-01-01 00:00:00|
|2011-01-01|   0|  12TH|       16TH|               1| 12TH-16TH|12th St. Oakland ...|-122.271450|37.803768|2011|    1|      1|          7|2011-01-01 00:00:00|
|2011-01-01|   0|  12TH|       24TH|               3| 12TH-24TH|12th St. Oakland ...|-122.271450|37.803768|2011|    1|      1|          7|2011-01-01 00:00:00|
|2011-01-01|   0|  12TH|       ASHB|          

                                                                                

In [117]:
# checking for nulls in df_station_ridership data
missing_counts = df_station_ridership.select([count(when(col(c).isNull(), c)).alias(c) for c in df_station_ridership.columns]).toPandas()
missing_counts

                                                                                

Unnamed: 0,date,hour,origin,destination,ridership_number,origin-des,station_name,longitude,latitude,year,month,quarter,day_of_week,ts
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


# Weather Data

### In this section, we will read all the weather data and do some data wrangling

In [69]:
### Custome Functions

def weather_read(df):
    """
    Input = file path in S3 Bucket
    Ouput = PySpark DataFrame
    """
    path = df
    df = spark.read.option('header',True).csv(path)
    return df

def weather_cleanup(df):
    """
    Input: Weather station PySpark DataFrame
    Operation: selects columns, gets rid of minutes and zeroes out the hours,
               creates timestamp, and drops duplicate rows for timestamps
               
    Output: Pyspark DataFrame
    """
    df = df.withColumn('date1',concat(substring('DATE',1,10), lit(' '), substring('DATE', 12,2), lit(':00:00'))) \
           .withColumn('ts', f.to_timestamp('date1',"yyyy-MM-dd HH:mm:ss")).drop('date1').dropDuplicates(['ts'])\
           .select('ts','STATION','DATE','SOURCE','LATITUDE','LONGITUDE','ELEVATION','NAME',
                   'REPORT_TYPE','CALL_SIGN','QUALITY_CONTROL','WND','CIG','VIS','TMP','DEW','SLP','AA1','AT1','AU1')
           # .withColumn('ts_long', col('ts').cast('long'))
    return df
 

In [70]:
# San Franciso station
sf_2011 = weather_read("s3a://w210-bucket/weather/weather_sf_2011.csv")
sf_2011 = weather_cleanup(sf_2011)
sf_2012 = weather_read("s3a://w210-bucket/weather/weather_sf_2012.csv")
sf_2012 = weather_cleanup(sf_2012)
sf_2013 = weather_read("s3a://w210-bucket/weather/weather_sf_2013.csv")
sf_2013 = weather_cleanup(sf_2013)
sf_2014 = weather_read("s3a://w210-bucket/weather/weather_sf_2014.csv")
sf_2014 = weather_cleanup(sf_2014)
sf_2015 = weather_read("s3a://w210-bucket/weather/weather_sf_2015.csv")
sf_2015 = weather_cleanup(sf_2015)
sf_2016 = weather_read("s3a://w210-bucket/weather/weather_sf_2016.csv")
sf_2016 = weather_cleanup(sf_2016)
sf_2017 = weather_read("s3a://w210-bucket/weather/weather_sf_2017.csv")
sf_2017 = weather_cleanup(sf_2017)
sf_2018 = weather_read("s3a://w210-bucket/weather/weather_sf_2018.csv")
sf_2018 = weather_cleanup(sf_2018)
sf_2019 = weather_read("s3a://w210-bucket/weather/weather_sf_2019.csv")
sf_2019 = weather_cleanup(sf_2019)
sf_2020 = weather_read("s3a://w210-bucket/weather/weather_sf_2020.csv")
sf_2020 = weather_cleanup(sf_2020)
sf_2021 = weather_read("s3a://w210-bucket/weather/weather_sf_2021.csv")
sf_2021 = weather_cleanup(sf_2021)
sf_2022 = weather_read("s3a://w210-bucket/weather/weather_sf_2022.csv")
sf_2022 = weather_cleanup(sf_2022)

# Concord Station
concord_2011 = weather_read("s3a://w210-bucket/weather/weather_concord_2011.csv")
concord_2011 = weather_cleanup(concord_2011)
concord_2012 = weather_read("s3a://w210-bucket/weather/weather_concord_2012.csv")
concord_2012 = weather_cleanup(concord_2012)
concord_2013 = weather_read("s3a://w210-bucket/weather/weather_concord_2013.csv")
concord_2013 = weather_cleanup(concord_2013)
concord_2014 = weather_read("s3a://w210-bucket/weather/weather_concord_2014.csv")
concord_2014 = weather_cleanup(concord_2014)
concord_2015 = weather_read("s3a://w210-bucket/weather/weather_concord_2015.csv")
concord_2015 = weather_cleanup(concord_2015)
concord_2016 = weather_read("s3a://w210-bucket/weather/weather_concord_2016.csv")
concord_2016 = weather_cleanup(concord_2016)
concord_2017 = weather_read("s3a://w210-bucket/weather/weather_concord_2017.csv")
concord_2017 = weather_cleanup(concord_2017)
concord_2018 = weather_read("s3a://w210-bucket/weather/weather_concord_2018.csv")
concord_2018 = weather_cleanup(concord_2018)
concord_2019 = weather_read("s3a://w210-bucket/weather/weather_concord_2019.csv")
concord_2019 = weather_cleanup(concord_2019)
concord_2020 = weather_read("s3a://w210-bucket/weather/weather_concord_2020.csv")
concord_2020 = weather_cleanup(concord_2020)
concord_2021 = weather_read("s3a://w210-bucket/weather/weather_concord_2021.csv")
concord_2021 = weather_cleanup(concord_2021)
concord_2022 = weather_read("s3a://w210-bucket/weather/weather_concord_2022.csv")
concord_2022 = weather_cleanup(concord_2022)


# livermore Station
livermore_2011 = weather_read("s3a://w210-bucket/weather/weather_livermore_2011.csv")
livermore_2011 = weather_cleanup(livermore_2011)
livermore_2012 = weather_read("s3a://w210-bucket/weather/weather_livermore_2012.csv")
livermore_2012 = weather_cleanup(livermore_2012)
livermore_2013 = weather_read("s3a://w210-bucket/weather/weather_livermore_2013.csv")
livermore_2013 = weather_cleanup(livermore_2013)
livermore_2014 = weather_read("s3a://w210-bucket/weather/weather_livermore_2014.csv")
livermore_2014 = weather_cleanup(livermore_2014)
livermore_2015 = weather_read("s3a://w210-bucket/weather/weather_livermore_2015.csv")
livermore_2015 = weather_cleanup(livermore_2015)
livermore_2016 = weather_read("s3a://w210-bucket/weather/weather_livermore_2016.csv")
livermore_2016 = weather_cleanup(livermore_2016)
livermore_2017 = weather_read("s3a://w210-bucket/weather/weather_livermore_2017.csv")
livermore_2017 = weather_cleanup(livermore_2017)
livermore_2018 = weather_read("s3a://w210-bucket/weather/weather_livermore_2018.csv")
livermore_2018 = weather_cleanup(livermore_2018)
livermore_2019 = weather_read("s3a://w210-bucket/weather/weather_livermore_2019.csv")
livermore_2019 = weather_cleanup(livermore_2019)
livermore_2020 = weather_read("s3a://w210-bucket/weather/weather_livermore_2020.csv")
livermore_2020 = weather_cleanup(livermore_2020)
livermore_2021 = weather_read("s3a://w210-bucket/weather/weather_livermore_2021.csv")
livermore_2021 = weather_cleanup(livermore_2021)
livermore_2022 = weather_read("s3a://w210-bucket/weather/weather_livermore_2022.csv")
livermore_2022 = weather_cleanup(livermore_2022)


# San Jose Station
sanjose_2011 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2011.csv")
sanjose_2011 = weather_cleanup(sanjose_2011)
sanjose_2012 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2012.csv")
sanjose_2012 = weather_cleanup(sanjose_2012)
sanjose_2013 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2013.csv")
sanjose_2013 = weather_cleanup(sanjose_2013)
sanjose_2014 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2014.csv")
sanjose_2014 = weather_cleanup(sanjose_2014)
sanjose_2015 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2015.csv")
sanjose_2015 = weather_cleanup(sanjose_2015)
sanjose_2016 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2016.csv")
sanjose_2016 = weather_cleanup(sanjose_2016)
sanjose_2017 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2017.csv")
sanjose_2017 = weather_cleanup(sanjose_2017)
sanjose_2018 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2018.csv")
sanjose_2018 = weather_cleanup(sanjose_2018)
sanjose_2019 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2019.csv")
sanjose_2019 = weather_cleanup(sanjose_2019)
sanjose_2020 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2020.csv")
sanjose_2020 = weather_cleanup(sanjose_2020)
sanjose_2021 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2021.csv")
sanjose_2021 = weather_cleanup(sanjose_2021)
sanjose_2022 = weather_read("s3a://w210-bucket/weather/weather_sanjose_2022.csv")
sanjose_2022 = weather_cleanup(sanjose_2022)

In [71]:
### concatenate weather station data per year

# San Francisco
sf = sf_2011.union(sf_2012).union(sf_2013).union(sf_2014).union(sf_2015)\
            .union(sf_2016).union(sf_2017).union(sf_2018).union(sf_2019)\
            .union(sf_2020).union(sf_2021).union(sf_2022)

# Concord
concord = concord_2011.union(concord_2012).union(concord_2013).union(concord_2014).union(concord_2015)\
            .union(concord_2016).union(concord_2017).union(concord_2018).union(concord_2019)\
            .union(concord_2020).union(concord_2021).union(concord_2022)
        
# Livermore
livemore = livermore_2011.union(livermore_2012).union(livermore_2013).union(livermore_2014).union(livermore_2015)\
            .union(livermore_2016).union(livermore_2017).union(livermore_2018).union(livermore_2019)\
            .union(livermore_2020).union(livermore_2021).union(livermore_2022)
        
# San Jose
sanjose = sanjose_2011.union(sanjose_2012).union(sanjose_2013).union(sanjose_2014).union(sanjose_2015)\
            .union(sanjose_2016).union(sanjose_2017).union(sanjose_2018).union(sanjose_2019)\
            .union(sanjose_2020).union(sanjose_2021).union(sanjose_2022)
        
### concatenate all weather stations 
weather = sf.union(concord).union(livemore).union(sanjose)

In [72]:
# print(f'San Francisco Count: {sf.count()}')
# print(f'Concord Count: {concord.count()}')
# print(f'Livermore Count: {livemore.count()}')
# print(f'San Jose Count: {sanjose.count()}')
# print(f'Total Count: {weather.count()}')

In [73]:
weather.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- SOURCE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- ELEVATION: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- REPORT_TYPE: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- QUALITY_CONTROL: string (nullable = true)
 |-- WND: string (nullable = true)
 |-- CIG: string (nullable = true)
 |-- VIS: string (nullable = true)
 |-- TMP: string (nullable = true)
 |-- DEW: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- AA1: string (nullable = true)
 |-- AT1: string (nullable = true)
 |-- AU1: string (nullable = true)



In [74]:
# extracting needed columns from existing columns 
weather2 = weather.withColumn('wind_speed',f.split(weather['WND'], ',').getItem(3).cast('integer')) \
                  .withColumn('air_temp',f.split(weather['TMP'], ',').getItem(0).cast('integer')) \
                  .withColumn('precipitation',f.split(weather['AA1'], ',').getItem(0).cast('integer')) \
                  .withColumn('wth_type',f.split(weather['AU1'], ',').getItem(2).cast('string'))

#putting null for values of 999, 99 and so on based on the documentation
# dropping unnessary columns 

weather2 = weather2.withColumn('air_temp', f.when(weather2['air_temp'] == 9999, None).otherwise(weather2['air_temp'])) \
                   .withColumn('wind_speed', f.when(weather2['wind_speed'] == 9999, None).otherwise(weather2['wind_speed']))\
                   .withColumn('precipitation', f.when(weather2['precipitation'] == 99, None).otherwise(weather2['precipitation'])) \
                   .drop('REPORT_TYPE','CALL_SIGN','QUALITY_CONTROL','ELEVATION','SOURCE','WND',
                         'CIG','VIS','TMP','DEW','SLP','AA1','AT1','AU1','DATE')

# converting PySpark dataframe to Pandas Dataframe
weather3 = weather2.toPandas()

# forward filling for missing values 
for col in ['wind_speed','air_temp','precipitation']:
    weather3[col] = weather3[col].ffill()

weather3['precipitation'] = weather3['precipitation'].bfill()
    
# mapping weather types to values 
d = {'00': 'no_precipitation', '01': 'Drizzle', '02': 'Rain',
     '03': 'Snow', '08': 'hail', '09': None}

weather3['wth_type'] = weather3['wth_type'].map(d).ffill()
weather3['wth_type'] = weather3['wth_type'].bfill()

#converting back to PysSpark Dataframe
weather4 = spark.createDataFrame(weather3)

[Stage 224:>  (0 + 2) / 2][Stage 225:>  (0 + 2) / 2][Stage 226:>  (0 + 2) / 2]2]

23/03/13 04:19:50 WARN DAGScheduler: Broadcasting large task binary with size 1345.5 KiB


                                                                                

In [80]:
weather4.show(5)
weather4.printSchema()
weather4.count()

23/03/13 03:33:26 WARN TaskSetManager: Stage 435 contains a task of very large size (6272 KiB). The maximum recommended task size is 1000 KiB.
+-------------------+-----------+--------+---------+--------------------+----------+--------+-------------+--------+
|                 ts|    STATION|LATITUDE|LONGITUDE|                NAME|wind_speed|air_temp|precipitation|wth_type|
+-------------------+-----------+--------+---------+--------------------+----------+--------+-------------+--------+
|2011-01-01 00:00:00|72494023234| 37.6197|-122.3647|SAN FRANCISCO INT...|       0.0|    89.0|          1.0|    Rain|
|2011-01-01 01:00:00|72494023234| 37.6197|-122.3647|SAN FRANCISCO INT...|       0.0|    89.0|          1.0|    Rain|
|2011-01-01 02:00:00|72494023234| 37.6197|-122.3647|SAN FRANCISCO INT...|       0.0|    89.0|          1.0|    Rain|
|2011-01-01 03:00:00|72494023234| 37.6197|-122.3647|SAN FRANCISCO INT...|      15.0|    89.0|          1.0|    Rain|
|2011-01-01 04:00:00|72494023234| 37.6

420615

# Joining Weather and unique station Datasets

We will first join the weather dataset with station data set to get the distance between each bart station to its closet weather station. Afterwards, we will join this data set to the ridership dataset. 

In [118]:
# renaming columns 
weather4 = weather4.withColumnRenamed("LATITUDE","latitude_wthr").withColumnRenamed("LONGITUDE","longitude_wthr")\
                   .withColumnRenamed("STATION","station").withColumnRenamed('NAME','wthr_station_name')\
                   .withColumn('wthr_station_name', f.when(weather4['wthr_station_name'] == 'SAN JOSE, CA US', 'SAN JOSE INTERNATIONAL AIRPORT, CA US').otherwise(weather4['wthr_station_name']))

In [119]:
weather4.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- station: string (nullable = true)
 |-- latitude_wthr: string (nullable = true)
 |-- longitude_wthr: string (nullable = true)
 |-- wthr_station_name: string (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- wth_type: string (nullable = true)



In [120]:
# station_weather_joined to the station data
df_station_full1 = df_station_full.withColumn("index", monotonically_increasing_id())

#parititioning by index 
w=Window().partitionBy("index")

#calculating the Euclidean distance between Bart stations and weather stations
#filtering by min distance from each bart station to weather stations 
station_weather_joined = df_station_full1.join(weather4.withColumnRenamed("LATITUDE","latitude_wthr")\
            .withColumnRenamed("LONGITUDE","longitude_wthr")) \
            .withColumn("distance",f.sqrt(f.pow(f.col("latitude")-f.col("latitude_wthr"),2)+\
                                   f.pow(f.col("longitude")-f.col("longitude_wthr"),2)))\
            .withColumn("min_distance", f.min("distance").over(w))\
            .filter('distance=min_distance') \
            .drop('index','distance','min_distance','latitude','station_name','longitude')

                               

In [121]:
# writing to parquet
# station_weather_joined.write.parquet('s3a://w210-bucket/data_wrangling/station_weather_cleanedUp.parquet',mode='overwrite')

23/03/13 04:44:25 WARN TaskSetManager: Stage 359 contains a task of very large size (6273 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [122]:
# reading the station_weather_joined data
station_weather_cleaned = spark.read.parquet("s3a://w210-bucket/data_wrangling/station_weather_cleanedUp.parquet")

#getting rid of duplicate airport name
station_weather_cleaned = station_weather_cleaned.filter(col('wthr_station_name') != 'SAN JOSE, CA US')
#finding the closing the weather station and bart stations
station_weather_closest = station_weather_cleaned.select('abbreviation','station').dropDuplicates()

#joining station_weather_closest to the weather dataset

final_weather = weather4.join(station_weather_closest,['station'],'left')

In [123]:
# writing to parquet
# final_weather.write.parquet('s3a://w210-bucket/data_wrangling/final_weather.parquet',mode='overwrite')

23/03/13 04:44:53 WARN TaskSetManager: Stage 366 contains a task of very large size (6272 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [124]:
# reading the station_weather_joined data
final_weather = spark.read.parquet("s3a://w210-bucket/data_wrangling/final_weather.parquet")

In [125]:
# checking for nulls in station_weather_cleaned data
missing_counts = final_weather.select([count(when(col(c).isNull(), c)).alias(c) for c in final_weather.columns]).toPandas()
missing_counts

                                                                                

Unnamed: 0,station,ts,latitude_wthr,longitude_wthr,wthr_station_name,wind_speed,air_temp,precipitation,wth_type,abbreviation
0,0,0,0,0,0,0,0,0,0,0


In [126]:
final_weather = final_weather.dropDuplicates()

In [284]:
# # dropping duplicates from df_station_ridership and saving to parquet
# df_station_ridership = df_station_ridership.dropDuplicates()
# df_station_ridership.write.parquet('s3a://w210-bucket/data_wrangling/df_station_ridership.parquet',mode='overwrite')

                                                                                

In [127]:
final_weather.select('wthr_station_name').distinct().show()

+--------------------+
|   wthr_station_name|
+--------------------+
|CONCORD BUCHANAN ...|
|LIVERMORE MUNICIP...|
|SAN JOSE INTERNAT...|
|SAN FRANCISCO INT...|
+--------------------+



In [128]:
# reading the df_station_ridership data
df_station_ridership = spark.read.parquet("s3a://w210-bucket/data_wrangling/df_station_ridership.parquet")

In [129]:
final_weather.printSchema()

root
 |-- station: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- latitude_wthr: string (nullable = true)
 |-- longitude_wthr: string (nullable = true)
 |-- wthr_station_name: string (nullable = true)
 |-- wind_speed: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- wth_type: string (nullable = true)
 |-- abbreviation: string (nullable = true)



In [130]:
### joining station_weather dataset with the ridership dataset
# # renaming the column to make it easier to join
final_weather = final_weather.withColumnRenamed('abbreviation','origin')

# df_joined1 = df_station_ridership.join(station_weather_cleaned1,(df_station_ridership['ts'] == station_weather_cleaned1['ts1']) & \
#                                        (df_station_ridership.origin == station_weather_cleaned1.abbreviation),'left').drop('ts1','abbreviation')

df_joined1 = df_station_ridership.join(final_weather,['origin','ts'],'left')

In [131]:
#writing to parquet
# df_joined1.write.parquet('s3a://w210-bucket/data_wrangling/df_joined1.parquet',mode='overwrite')

                                                                                

In [138]:
# reading df_joined1 and counting missing values 
df_joined1 = spark.read.parquet("s3a://w210-bucket/data_wrangling/df_joined1.parquet")

# checking for nulls in df_joined1 data
missing_counts = df_joined1.select([count(when(col(c).isNull(), c)).alias(c) for c in df_joined1.columns]).toPandas()
missing_counts

                                                                                

Unnamed: 0,origin,ts,date,hour,destination,ridership_number,origin-des,station_name,longitude,latitude,...,quarter,day_of_week,station,latitude_wthr,longitude_wthr,wthr_station_name,wind_speed,air_temp,precipitation,wth_type
0,0,0,0,0,0,0,0,0,0,0,...,0,0,261669,261669,261669,261669,261669,261669,261669,261669


In [9]:
# dropping null values. About 0.24% of the data
df_joined1 = spark.read.parquet("s3a://w210-bucket/data_wrangling/df_joined1.parquet")
df_joined1 = df_joined1.na.drop()

### joining weather based on destination

In [10]:
# renaming weather columns for the origin
df_joined1_renamed = df_joined1.withColumnRenamed('station', 'station_origin')\
                       .withColumnRenamed('wthr_station_name', 'wthr_station_origin')\
                       .withColumnRenamed('latitude_wthr', 'latitude_wthr_origin')\
                       .withColumnRenamed('longitude_wthr', 'longitude_wthr_origin')\
                       .withColumnRenamed('wind_speed', 'wind_speed_origin')\
                       .withColumnRenamed('air_temp', 'air_temp_origin')\
                       .withColumnRenamed('precipitation', 'precipitation_origin')\
                       .withColumnRenamed('wth_type', 'wth_type_origin')


In [11]:
df_joined1_renamed.printSchema()

root
 |-- origin: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- ridership_number: integer (nullable = true)
 |-- origin-des: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- quarter: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- station_origin: string (nullable = true)
 |-- latitude_wthr_origin: string (nullable = true)
 |-- longitude_wthr_origin: string (nullable = true)
 |-- wthr_station_origin: string (nullable = true)
 |-- wind_speed_origin: double (nullable = true)
 |-- air_temp_origin: double (nullable = true)
 |-- precipitation_origin: double (nullable = true)
 |-- wth_type_origin: string (nullable = true)



In [12]:
### joining station_weather dataset with the ridership dataset
# # renaming the column to make it easier to join

# reading the final_weather data
final_weather = spark.read.parquet("s3a://w210-bucket/data_wrangling/final_weather.parquet")

#dropping duplicates 
final_weather = final_weather.dropDuplicates()

final_weather = final_weather.withColumnRenamed('abbreviation','destination')

df_joined2 = df_joined1_renamed.join(final_weather,['destination','ts'],'left')

In [13]:
# saving to S3 as a parquet file 
# df_joined2.write.parquet('s3a://w210-bucket/data_wrangling/df_joined2.parquet',mode='overwrite')

23/03/13 05:13:29 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [14]:
# reading the first joined data
df_joined2 = spark.read.parquet("s3a://w210-bucket/data_wrangling/df_joined2.parquet")

In [15]:
df_joined2.printSchema()

root
 |-- destination: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- origin: string (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: string (nullable = true)
 |-- ridership_number: integer (nullable = true)
 |-- origin-des: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- quarter: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- station_origin: string (nullable = true)
 |-- latitude_wthr_origin: string (nullable = true)
 |-- longitude_wthr_origin: string (nullable = true)
 |-- wthr_station_origin: string (nullable = true)
 |-- wind_speed_origin: double (nullable = true)
 |-- air_temp_origin: double (nullable = true)
 |-- precipitation_origin: double (nullable = true)
 |-- wth_type_origin: string (nullable = true)
 |-- station: string (nullable = true)


In [16]:
df_joined2.count()

109442517

In [17]:
# checking for nulls in df_joined1 data
missing_counts = df_joined2.select([count(when(col(c).isNull(), c)).alias(c) for c in df_joined2.columns]).toPandas()
missing_counts

                                                                                

Unnamed: 0,destination,ts,origin,date,hour,ridership_number,origin-des,station_name,longitude,latitude,...,precipitation_origin,wth_type_origin,station,latitude_wthr,longitude_wthr,wthr_station_name,wind_speed,air_temp,precipitation,wth_type
0,0,0,0,0,0,0,0,0,0,0,...,0,0,226936,226936,226936,226936,226936,226936,226936,226936


In [18]:
df_joined2_1 = df_joined2.na.drop()

In [19]:
# checking for nulls in df_joined1 data
missing_counts = df_joined2_1.select([count(when(col(c).isNull(), c)).alias(c) for c in df_joined2_1.columns]).toPandas()
missing_counts

                                                                                

Unnamed: 0,destination,ts,origin,date,hour,ridership_number,origin-des,station_name,longitude,latitude,...,precipitation_origin,wth_type_origin,station,latitude_wthr,longitude_wthr,wthr_station_name,wind_speed,air_temp,precipitation,wth_type
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [20]:
df_joined2_1.count()

                                                                                

109215581

In [21]:
#getting rid of nulls and resaving the file
# df_joined2_1.write.parquet('s3a://w210-bucket/data_wrangling/df_joined2_1.parquet',mode='overwrite')

                                                                                