**Import libraries that will be used.** 

In [1]:
import pandas as pd
import numpy as np
import re
from scipy import stats
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType
import geopy.distance
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

**Create a SparkSession.**
* _**SparkSession is an entry point to program in Spark.**_

In [2]:
spark = SparkSession.builder.appName('BART').getOrCreate()

**Load the 2016 and 2017 BART ridership data.**

In [3]:
df2016 = spark.read.csv('date-hour-soo-dest-2016.csv', inferSchema = True, header = True)
df2017 = spark.read.csv('date-hour-soo-dest-2017.csv', inferSchema = True, header = True)

**Merge the two dataframes into one.**

In [4]:
merged_df = df2016.union(df2017)
merged_df.show(5)

+------+-----------+----------+-------------------+
|Origin|Destination|Throughput|           DateTime|
+------+-----------+----------+-------------------+
|  12TH|       12TH|         1|2016-01-01 00:00:00|
|  12TH|       16TH|         1|2016-01-01 00:00:00|
|  12TH|       24TH|         4|2016-01-01 00:00:00|
|  12TH|       ASHB|         4|2016-01-01 00:00:00|
|  12TH|       BALB|         2|2016-01-01 00:00:00|
+------+-----------+----------+-------------------+
only showing top 5 rows



**Confirm the dataframes have been merged correctly.**

In [5]:
df2016.count() + df2017.count() == merged_df.count()

True

**Load the station data and read first five rows to ensure the dataset has been loaded correctly.**

In [6]:
station = spark.read.csv('station_info.csv', header = True, escape = '\"')
station.show(5, truncate = False, vertical = True)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Abbreviation | 12TH                                                                                                                                                                                                                                                                                     
 Description  | 1245 Broadway, Oakland CA 94612<br />12th St. Oakland City Center Station is in the heart of Downtown Oakland, near historic Old Oakland and Oakland's Chinatown.                                                                                                                        
 Location     | -122.271450,37.803768,0                                                                   

**Extract the address from the "Description" column.**

In [7]:
def get_Address(text):
    address = text.split('<br')[0]
    return address

In [8]:
udf = F.udf(get_Address)
station = station.withColumn('Address', udf(station.Description))
station.select('Address').show(5, truncate = False)

+-------------------------------------------+
|Address                                    |
+-------------------------------------------+
|1245 Broadway, Oakland CA 94612            |
|2000 Mission Street, San Francisco CA 94110|
|1900 Broadway, Oakland CA 94612            |
|2800 Mission Street, San Francisco CA 94110|
|3100 Adeline Street, Berkeley CA 94703     |
+-------------------------------------------+
only showing top 5 rows



**Using regular expressions, remove the abbreviated text from the "Name" column for each row.**

In [9]:
def remove_text(text):
    clean_text = re.sub('\(\w*\)', '', text)
    final_text = clean_text.rstrip()
    return final_text

In [10]:
udf = F.udf(remove_text)
cleaned_df = station.withColumn('Name', udf(station.Name))
cleaned_df.select(F.col('Name')).show(5, truncate = False)

+----------------------------+
|Name                        |
+----------------------------+
|12th St. Oakland City Center|
|16th St. Mission            |
|19th St. Oakland            |
|24th St. Mission            |
|Ashby                       |
+----------------------------+
only showing top 5 rows



**Create latitude and longitude columns by extracting the values from the "Location" column.**

In [11]:
def get_lat(location):
    split_list = location.split(',')
    latitude = split_list[1]
    return latitude

def get_lng(location):
    split_list = location.split(',')
    longitude = split_list[0]
    return longitude

In [12]:
udf_lat = F.udf(get_lat)
split_df = cleaned_df.withColumn('Latitude', udf_lat(cleaned_df.Location).cast('float').alias('Latitude'))

udf_lng = F.udf(get_lng)
split_df = split_df.withColumn('Longitude', udf_lng(cleaned_df.Location).cast('float').alias('Longitude'))

split_df.select('Latitude', 'Longitude').show(5)

+---------+-----------+
| Latitude|  Longitude|
+---------+-----------+
| 37.80377| -122.27145|
| 37.76506| -122.41969|
| 37.80835|  -122.2686|
| 37.75247|-122.418144|
|37.852802|-122.270065|
+---------+-----------+
only showing top 5 rows



**Retrieve the important columns and read the first five rows.**

In [13]:
imp_df = split_df.select(['Abbreviation', 'Name', 'Address', 'Latitude', 'Longitude'])
imp_df.show(5, truncate = False)

+------------+----------------------------+-------------------------------------------+---------+-----------+
|Abbreviation|Name                        |Address                                    |Latitude |Longitude  |
+------------+----------------------------+-------------------------------------------+---------+-----------+
|12TH        |12th St. Oakland City Center|1245 Broadway, Oakland CA 94612            |37.80377 |-122.27145 |
|16TH        |16th St. Mission            |2000 Mission Street, San Francisco CA 94110|37.76506 |-122.41969 |
|19TH        |19th St. Oakland            |1900 Broadway, Oakland CA 94612            |37.80835 |-122.2686  |
|24TH        |24th St. Mission            |2800 Mission Street, San Francisco CA 94110|37.75247 |-122.418144|
|ASHB        |Ashby                       |3100 Adeline Street, Berkeley CA 94703     |37.852802|-122.270065|
+------------+----------------------------+-------------------------------------------+---------+-----------+
only showi

**The next objective is to join the 2016 and 2017 merged dataframe with the station dataframe using the column with the abbreviated station names from each dataframe.**

**Before the two dataframes are joined, the unique values from each column needs to be checked for inconsistencies.**

In [14]:
def compare_lists(list1,list2):
    print('Checking if names from first list is in second list..')
    for name in list1:
        if name not in list2:
            print('"{}" is not a common value.'.format(name))
            print('\n')
        else:
            pass
    
    print('Checking if names from second list is in first list..')
    for name in list2:
        if name not in list1:
            print('"{}" is not a common value.'.format(name))
            print('\n')
        else: 
            pass

In [15]:
station_names = imp_df.select('Abbreviation').distinct().rdd.flatMap(lambda x: x).collect()
origin_names = merged_df.select('Origin').distinct().rdd.flatMap(lambda x: x).collect()
dest_names = merged_df.select('Destination').distinct().rdd.flatMap(lambda x: x).collect()

In [16]:
compare_lists(station_names,origin_names)

Checking if names from first list is in second list..
"WARM" is not a common value.


Checking if names from second list is in first list..
"WSPR" is not a common value.




In [17]:
compare_lists(station_names,dest_names)

Checking if names from first list is in second list..
"WARM" is not a common value.


Checking if names from second list is in first list..
"WSPR" is not a common value.




In [18]:
if compare_lists(origin_names,dest_names) == None:
    print('All names are common in both columns.')

Checking if names from first list is in second list..
Checking if names from second list is in first list..
All names are common in both columns.


**The abbreviated station name "WARM" is missing in the "Origin" and "Destination" columns whereas "WSPR" is missing in the "Abbreviation" column.**

In [19]:
imp_df.where(F.col('Abbreviation') == 'WARM').show(truncate = False)

+------------+--------------------------+------------------------------------------+--------+-----------+
|Abbreviation|Name                      |Address                                   |Latitude|Longitude  |
+------------+--------------------------+------------------------------------------+--------+-----------+
|WARM        |Warm Springs/South Fremont|45193 Warm Springs Blvd, Fremont, CA 94539|37.50217|-121.939316|
+------------+--------------------------+------------------------------------------+--------+-----------+



In [20]:
merged_df.where((F.col('Origin') == 'WSPR') & (F.col('Destination') == 'WSPR')).show()

+------+-----------+----------+-------------------+
|Origin|Destination|Throughput|           DateTime|
+------+-----------+----------+-------------------+
|  WSPR|       WSPR|         1|2016-08-11 12:00:00|
|  WSPR|       WSPR|         1|2016-08-17 11:00:00|
|  WSPR|       WSPR|         1|2016-08-18 10:00:00|
|  WSPR|       WSPR|         2|2016-08-24 10:00:00|
|  WSPR|       WSPR|         1|2016-08-26 12:00:00|
|  WSPR|       WSPR|         1|2016-09-06 12:00:00|
|  WSPR|       WSPR|         1|2016-09-12 15:00:00|
|  WSPR|       WSPR|         4|2016-09-13 09:00:00|
|  WSPR|       WSPR|         1|2016-09-19 14:00:00|
|  WSPR|       WSPR|         1|2016-09-21 11:00:00|
|  WSPR|       WSPR|         4|2016-09-21 12:00:00|
|  WSPR|       WSPR|        12|2016-09-22 12:00:00|
|  WSPR|       WSPR|         1|2016-09-27 18:00:00|
|  WSPR|       WSPR|         1|2016-09-28 13:00:00|
|  WSPR|       WSPR|        13|2016-09-29 11:00:00|
|  WSPR|       WSPR|         5|2016-09-29 12:00:00|
|  WSPR|    

**Looking at the rows, it can be inferred "WSPR" is an abbreviation for Warm Springs. "WARM" and "WSPR" should actually have the same abbreviated name.**

**Change WARM to WSPR.**

In [21]:
def changed_df(df,colmn,word_to_replace,new_word):
    dataframe = df.withColumn(colmn, F.when(F.col(colmn) == word_to_replace, new_word).otherwise(F.col(colmn)))
    return dataframe

In [22]:
imp_df = changed_df(imp_df,'Abbreviation','WARM', 'WSPR')
imp_df.where(F.col('Abbreviation') == 'WSPR').show(truncate = False)

+------------+--------------------------+------------------------------------------+--------+-----------+
|Abbreviation|Name                      |Address                                   |Latitude|Longitude  |
+------------+--------------------------+------------------------------------------+--------+-----------+
|WSPR        |Warm Springs/South Fremont|45193 Warm Springs Blvd, Fremont, CA 94539|37.50217|-121.939316|
+------------+--------------------------+------------------------------------------+--------+-----------+



**Join the merged and station dataframe, creating columns to give more information about the origin station.**  
**Change the column names to better represent the columns.**  
**Drop the "Abbreviation" column since the column is a duplicate.**


In [23]:
cond1 = [merged_df.Origin == imp_df.Abbreviation]
join_df1 = merged_df.join(imp_df, on = cond1, how = 'left')
join_df1 = join_df1.withColumnRenamed('Name', 'Origin Name')
join_df1 = join_df1.withColumnRenamed('Address', 'Origin Address')
join_df1 = join_df1.withColumnRenamed('Latitude', 'Ori_lat')
join_df1 = join_df1.withColumnRenamed('Longitude', 'Ori_lng')
join_df1 = join_df1.drop(F.col('Abbreviation'))

**Use the same approach to add more columns for the destination station.**

In [24]:
cond2 = [merged_df.Destination == imp_df.Abbreviation]
join_df2 = join_df1.join(imp_df, on = cond2, how = 'left')
join_df2 = join_df2.withColumnRenamed('Name', 'Destination Name')
join_df2 = join_df2.withColumnRenamed('Address', 'Destination Address')
join_df2 = join_df2.withColumnRenamed('Latitude', 'Dest_lat')
join_df2 = join_df2.withColumnRenamed('Longitude', 'Dest_lng')
join_df2 = join_df2.drop(F.col('Abbreviation'))

**Save the column names into a list in the preferred order the new dataframe will be viewed.**  
**The ordered dataframe will be saved into the variable "spark_df."**

In [25]:
col_order_list = ['DateTime',
                  'Origin',
                  'Origin Name',
                  'Origin Address',
                  'Ori_lat',
                  'Ori_lng',
                  'Destination',
                  'Destination Name',
                  'Destination Address',
                  'Dest_lat',
                  'Dest_lng',
                  'Throughput']

spark_df = join_df2.select(col_order_list)
spark_df.show(2, truncate = False, vertical = True)

-RECORD 0----------------------------------------------------------
 DateTime            | 2016-01-01 00:00:00                         
 Origin              | 12TH                                        
 Origin Name         | 12th St. Oakland City Center                
 Origin Address      | 1245 Broadway, Oakland CA 94612             
 Ori_lat             | 37.80377                                    
 Ori_lng             | -122.27145                                  
 Destination         | 12TH                                        
 Destination Name    | 12th St. Oakland City Center                
 Destination Address | 1245 Broadway, Oakland CA 94612             
 Dest_lat            | 37.80377                                    
 Dest_lng            | -122.27145                                  
 Throughput          | 1                                           
-RECORD 1----------------------------------------------------------
 DateTime            | 2016-01-01 00:00:00      

**Check for null values.**

In [26]:
def nulls(df):
    null_df = df.select([F.count(F.when(F.isnan(col), col)).alias(col) for col in df.columns])
    return null_df.show(vertical = True)

In [27]:
nulls(spark_df)

-RECORD 0------------------
 DateTime            | 0   
 Origin              | 0   
 Origin Name         | 0   
 Origin Address      | 0   
 Ori_lat             | 0   
 Ori_lng             | 0   
 Destination         | 0   
 Destination Name    | 0   
 Destination Address | 0   
 Dest_lat            | 0   
 Dest_lng            | 0   
 Throughput          | 0   



_**Imputing null values won't be necessary for this project.**_

**Check to see if the data type for each column is correct.**

In [28]:
spark_df.printSchema()

root
 |-- DateTime: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Origin Name: string (nullable = true)
 |-- Origin Address: string (nullable = true)
 |-- Ori_lat: float (nullable = true)
 |-- Ori_lng: float (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Destination Name: string (nullable = true)
 |-- Destination Address: string (nullable = true)
 |-- Dest_lat: float (nullable = true)
 |-- Dest_lng: float (nullable = true)
 |-- Throughput: integer (nullable = true)



_**Note: The "DateTime" column is incorrectly labeled as a string.**_

**Change the data type of "DateTime" column to "timestamp."  
This step will be critical for gaining insight and for modeling.**

In [29]:
spark_df = spark_df.withColumn('DateTime', F.to_timestamp(F.col('DateTime'),'yyyy-MM-dd HH:mm:ss').alias('DateTime'))
spark_df.printSchema()

root
 |-- DateTime: timestamp (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Origin Name: string (nullable = true)
 |-- Origin Address: string (nullable = true)
 |-- Ori_lat: float (nullable = true)
 |-- Ori_lng: float (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Destination Name: string (nullable = true)
 |-- Destination Address: string (nullable = true)
 |-- Dest_lat: float (nullable = true)
 |-- Dest_lng: float (nullable = true)
 |-- Throughput: integer (nullable = true)



**Check to see if there were minutes and seconds data collected.**

In [30]:
dt_minute = spark_df.select(F.minute('DateTime').alias('minute'))
max_minute = dt_minute.agg({'minute': 'max'}).collect()[0]
print(max_minute)

Row(max(minute)=0)


In [31]:
dt_second = spark_df.select(F.second('DateTime').alias('second'))
max_second = dt_second.agg({'second': 'max'}).collect()[0]
print(max_second)

Row(max(second)=0)


**Since the max amount for minute and second is 0, I will only be focusing on hour for the time.**

**Create month, day, and hour columns.**

In [32]:
month = F.date_format('DateTime','MMM').alias('month')
day = F.date_format('DateTime', 'E').alias('day')
hour = F.hour('DateTime').alias('hour')

spark_df = spark_df.select('DateTime',
                           'Origin',
                           'Origin Name',
                           'Origin Address',
                           'Ori_lat',
                           'Ori_lng',
                           'Destination',
                           'Destination Name',
                           'Destination Address',
                           'Dest_lat',
                           'Dest_lng',
                            month,
                            day,
                            hour,
                           'Throughput')

spark_df.show(2, truncate = False, vertical = True)

-RECORD 0----------------------------------------------------------
 DateTime            | 2016-01-01 00:00:00                         
 Origin              | 12TH                                        
 Origin Name         | 12th St. Oakland City Center                
 Origin Address      | 1245 Broadway, Oakland CA 94612             
 Ori_lat             | 37.80377                                    
 Ori_lng             | -122.27145                                  
 Destination         | 12TH                                        
 Destination Name    | 12th St. Oakland City Center                
 Destination Address | 1245 Broadway, Oakland CA 94612             
 Dest_lat            | 37.80377                                    
 Dest_lng            | -122.27145                                  
 month               | Jan                                         
 day                 | Fri                                         
 hour                | 0                        

**The next step of the project is to analyze the data to answer important questions.**

**Question 1: Which BART station is the busiest?**

**Approach:**
1. _**Group spark the dataframe by the unique values in the "Origin" column and assign the grouped dataframe into a variable. The full station name and the total number of ridership for each origin station will be retained. Change the column names to preferred names.**_
2. _**Use the same approach to create a separate dataframe for "Destination."**_
3. _**Merge the two dataframes vertically.**_
4. _**Group the merged dataframe by the unique values in the "Abbreviation" column. Similar to the previous step, the full station name and the total number of ridership for each station will be retained.**_  
5. _**Sort the newly created dataframe in descending order and output the first row to find out which station is the busiest.**_

In [33]:
origin = spark_df.groupBy('Origin').agg({'Origin Name': 'first', 'Throughput': 'sum'})
origin = origin.toDF(*['Abbreviation','Name','Num_of_passengers'])

dest = spark_df.groupBy('Destination').agg({'Destination Name': 'first', 'Throughput': 'sum'})
dest = dest.toDF(*['Abbreviation','Name','Num_of_passengers'])

con_df = origin.union(dest)
con_df.show(5, truncate = False)

+------------+----------------------------+-----------------+
|Abbreviation|Name                        |Num_of_passengers|
+------------+----------------------------+-----------------+
|NCON        |North Concord/Martinez      |1101650          |
|POWL        |Powell St.                  |14138322         |
|CIVC        |Civic Center/UN Plaza       |10145110         |
|12TH        |12th St. Oakland City Center|5322811          |
|16TH        |16th St. Mission            |5365000          |
+------------+----------------------------+-----------------+
only showing top 5 rows



In [34]:
con_df = con_df.groupBy('Abbreviation').agg({'Name': 'first', 'Num_of_passengers': 'sum'})
con_df = con_df.toDF(*['Abbreviation','Name','Tot_passengers'])
con_df.sort(F.col('Tot_passengers').desc()).show(1)

+------------+-----------+--------------+
|Abbreviation|       Name|Tot_passengers|
+------------+-----------+--------------+
|        EMBR|Embarcadero|      34076644|
+------------+-----------+--------------+
only showing top 1 row



**Answer: Busiest station is Embarcadero.**

**Question 2: What is the least popular BART route?**

**Approach:**
1. _**Group the spark dataframe by each unique origin and destination combination. The full origin and destination station names will be retained. The total number of ridership will be calculated for each unique combination.**_
2. _**Change the column names to preferred names and change the order columns are viewed in the dataframe.**_
3. _**Sort the grouped dataframe in ascending order and output the first row to find out which route is the least popular.**_

In [35]:
grouped_df = spark_df.groupBy(['Origin','Destination']).agg({'Origin Name': 'first','Destination Name': 'first','Throughput': 'sum'})
grouped_df = grouped_df.toDF(*['Origin', 'Destination', 'Origin Name', 'Destination Name', 'Tot_passengers'])
reordered_df = grouped_df.select(['Origin', 'Origin Name', 'Destination', 'Destination Name', 'Tot_passengers'])
least_pop = reordered_df.orderBy('Tot_passengers', ascending = True).show(1, truncate = False)

+------+--------------------------+-----------+----------------+--------------+
|Origin|Origin Name               |Destination|Destination Name|Tot_passengers|
+------+--------------------------+-----------+----------------+--------------+
|WSPR  |Warm Springs/South Fremont|SBRN       |San Bruno       |40            |
+------+--------------------------+-----------+----------------+--------------+
only showing top 1 row



**Answer: The least popular route is from Warm Springs/South Fremont to San Bruno.**

**Question 3: When is the best time to go to SF from Berkeley if you want to find a seat?**

**Approach:**
1. _**Pull the distinct origin addresses located in Berkeley.**_
2. _**Pull the distinct destination addresses located in San Francisco.**_
3. _**Save the abbreviated origin names in a list and the abbreviated destination names in another list.**_
4. _**Retrieve only the rows from the spark dataframe if the "Origin" column involves Berkeley and if the "Destination" column involves "San Francisco."**_
5. _**Group the filtered dataframe by the unique combination of origin name, destination name, and hour of the ridership. Add the total number of ridership for each of these combination and change the column names to the desired names.**_
6. _**Group the grouped dataframe just by the unique combinations of origin and destinations and retrieve the minimum number of passenges for each combination.**_
7. _**Join the first grouped dataframe and the dataframe containing the minimum values. This will give insight to the optimal time to travel from Berkeley to San Francisco for each combination.**_

**Note: Steps 1 and 2 should give an idea of how many different combinations there are.**

In [36]:
Berk = spark_df.select('Origin', 'Origin Name', 'Origin Address').where(F.col('Origin Address').contains('Berkeley'))
Berk.distinct().show(Berk.distinct().count(), truncate = False)

+------+-----------------+-----------------------------------------+
|Origin|Origin Name      |Origin Address                           |
+------+-----------------+-----------------------------------------+
|ASHB  |Ashby            |3100 Adeline Street, Berkeley CA 94703   |
|DBRK  |Downtown Berkeley|2160 Shattuck Avenue, Berkeley CA 94704  |
|NBRK  |North Berkeley   |1750 Sacramento Street, Berkeley CA 94702|
+------+-----------------+-----------------------------------------+



In [37]:
SF = spark_df.select('Destination', 'Destination Name', 'Destination Address').where(F.col('Destination Address').contains('San Francisco'))
SF.distinct().show(SF.distinct().count(), truncate = False)

+-----------+---------------------------+---------------------------------------------------------------------+
|Destination|Destination Name           |Destination Address                                                  |
+-----------+---------------------------+---------------------------------------------------------------------+
|16TH       |16th St. Mission           |2000 Mission Street, San Francisco CA 94110                          |
|GLEN       |Glen Park                  |2901 Diamond Street, San Francisco CA 94131                          |
|MONT       |Montgomery St.             |598 Market Street, San Francisco CA 94104                            |
|24TH       |24th St. Mission           |2800 Mission Street, San Francisco CA 94110                          |
|SFIA       |San Francisco Int'l Airport|International Terminal, Level 3, San Francisco Int'l Airport CA 94128|
|CIVC       |Civic Center/UN Plaza      |1150 Market Street, San Francisco CA 94102                     

In [38]:
berk_list = [row['Origin'] for row in Berk.distinct().collect()]
SF_list = [row['Destination'] for row in SF.distinct().collect()]

In [39]:
filtered_df = spark_df.filter((spark_df['Origin'].isin(berk_list)) & (spark_df['Destination'].isin(SF_list)))
grouped_df = filtered_df.groupBy(['Origin','Destination','hour']).agg({'Throughput': 'sum'})
grouped_df = grouped_df.withColumnRenamed('sum(Throughput)', 'Passengers')
grouped_df.show(5)

+------+-----------+----+----------+
|Origin|Destination|hour|Passengers|
+------+-----------+----+----------+
|  NBRK|       CIVC|   5|      1406|
|  ASHB|       GLEN|  14|       596|
|  DBRK|       MONT|  20|      7887|
|  DBRK|       CIVC|  22|      8316|
|  NBRK|       24TH|  22|      1178|
+------+-----------+----+----------+
only showing top 5 rows



In [40]:
min_df = grouped_df.groupBy(['Origin','Destination']).agg({'Passengers': 'min'})
min_df = min_df.withColumnRenamed('min(Passengers)','Passengers')
min_df.show(5)

+------+-----------+----------+
|Origin|Destination|Passengers|
+------+-----------+----------+
|  DBRK|       POWL|         3|
|  ASHB|       24TH|         5|
|  NBRK|       GLEN|         1|
|  ASHB|       EMBR|         2|
|  DBRK|       CIVC|         8|
+------+-----------+----------+
only showing top 5 rows



In [41]:
best = min_df.join(grouped_df, on = ['Origin','Destination','Passengers'], how = 'left')
best.orderBy('Origin').show(best.count())

+------+-----------+----------+----+
|Origin|Destination|Passengers|hour|
+------+-----------+----------+----+
|  ASHB|       GLEN|         1|   4|
|  ASHB|       EMBR|         2|   3|
|  ASHB|       24TH|         5|   3|
|  ASHB|       16TH|         1|   3|
|  ASHB|       BALB|        51|   1|
|  ASHB|       POWL|         1|   2|
|  ASHB|       SSAN|        14|   1|
|  ASHB|       CIVC|         4|   2|
|  ASHB|       SFIA|         1|   4|
|  ASHB|       SFIA|         1|   2|
|  ASHB|       MONT|         3|   3|
|  DBRK|       CIVC|         8|   2|
|  DBRK|       POWL|         3|   3|
|  DBRK|       BALB|         3|   4|
|  DBRK|       SFIA|         1|   2|
|  DBRK|       MONT|         9|   2|
|  DBRK|       EMBR|         2|   3|
|  DBRK|       GLEN|         2|   2|
|  DBRK|       SFIA|         1|   3|
|  DBRK|       24TH|         4|   3|
|  DBRK|       SSAN|         2|   3|
|  DBRK|       16TH|         1|   3|
|  NBRK|       16TH|         2|   3|
|  NBRK|       24TH|         2|   3|
|

**Answer: The dataframe above represents the best time to travel to each station in San Francisco City from various stations in Berkeley.**

**Question 4: Which day of the week is the busiest?**

**Approach:**
1. _**Group the spark dataframe by the day and sum up the ridership that fall under the same day. Change the column names as desired.**_
2. _**Order the grouped dataframe in descending order.**_
3. _**Output the first row to find out which day is the busiest.**_


In [42]:
grouped_day = spark_df.groupBy('day').agg({'Throughput': 'sum'})
grouped_day = grouped_day.withColumnRenamed('sum(Throughput)','Num_of_passengers')
busy_day = grouped_day.orderBy(F.col('Num_of_passengers').desc())
busy_day.collect()[0]

Row(day='Wed', Num_of_passengers=30677189)

**Answer: Busiest day is Wednesday.**

**Question 5: How many people take the BART late at night?**

**Approach:**
1. _**Retrieve the rows that fall between the start of a late night and the end of a late night.**_
2. _**Add the number of ridership from the retrived rows.**_

In [43]:
#function to count how many people use BART in a given time period
def people_count(df):
    try:
        start_hr = input('Please enter the starting hour: ')
        end_hr = input('Please enter the ending hour: ')
        
        int_start = int(start_hr)
        int_end = int(end_hr)
    
    except:
        print('Please enter an integer number.')
    
    filtered_df = df.where((F.col('hour') >= int_start) | (F.col('hour') <= int_end))
    result = filtered_df.agg({'Throughput': 'sum'}).collect()[0]
    
    print('\n')
    print('Calculating...')
    
    return result['sum(Throughput)']

In [44]:
people_count(spark_df)

Please enter the starting hour: 23
Please enter the ending hour: 5


Calculating...


6939024

**Answer: Assuming late night is from 11:00PM to 5:00AM, the number of people who use BART during this timeframe is 6,939,024.**

**Compute the straight line distance between every station.**

**Calculate distance in miles using the coordinates.**

In [45]:
def get_dist(ori_lat, ori_lng, dest_lat, dest_lng):
    ori_coord = (ori_lat, ori_lng)
    dest_coord = (dest_lat, dest_lng)
    
    return round(geopy.distance.distance(ori_coord, dest_coord).miles, 2)

In [46]:
udf = F.udf(get_dist, DoubleType())
spark_df = spark_df.withColumn('Distance', udf('Ori_lat','Ori_lng','Dest_lat','Dest_lng'))

In [47]:
dist_df = spark_df.groupBy(['Origin','Origin Address','Destination','Destination Address']).agg({'Distance':'first'})
dist_df = dist_df.withColumnRenamed('first(Distance)','Distance')
dist_df.show(5, truncate = False, vertical = True)

-RECORD 0--------------------------------------------------------------
 Origin              | HAYW                                            
 Origin Address      | 699 'B' Street, Hayward CA 94541                
 Destination         | COLM                                            
 Destination Address | 365 D Street, Colma CA 94014                    
 Distance            | 20.81                                           
-RECORD 1--------------------------------------------------------------
 Origin              | PITT                                            
 Origin Address      | 1700 West Leland Road, Pittsburg CA 94565       
 Destination         | BALB                                            
 Destination Address | 401 Geneva Avenue, San Francisco CA 94112       
 Distance            | 34.28                                           
-RECORD 2--------------------------------------------------------------
 Origin              | SSAN                                     

**The next step of the project is to build a model that can predict the number of people commuting to work by Bart between any 2 stations.**

**Before data can be used to build a model, numerical data must be checked for normality.**

In [48]:
def transformation(df, colmn):
    
    before_mean = df.select(F.mean(F.col(colmn)).alias('before_mean')).collect()[0]['before_mean']
    before_median = df.approxQuantile(colmn,[0.5], 0.25)[0]
    before_skew = df.select(F.skewness(F.col(colmn)).alias('before_skew')).collect()[0]['before_skew']
    before_kurtosis = df.select(F.kurtosis(F.col(colmn)).alias('before_kurtosis')).collect()[0]['before_kurtosis']
    
    tf_df = df.select(F.log(F.col(colmn) + 1).alias(colmn))
    
    after_mean = tf_df.select(F.mean(F.col(colmn)).alias('after_mean')).collect()[0]['after_mean']
    after_median = tf_df.approxQuantile(colmn,[0.5], 0.25)[0]
    after_skew = tf_df.select(F.skewness(F.col(colmn)).alias('after_skew')).collect()[0]['after_skew']
    after_kurtosis = tf_df.select(F.kurtosis(F.col(colmn)).alias('after_kurtosis')).collect()[0]['after_kurtosis']
    
    stats_df = spark.createDataFrame(
        [
            ('before', before_mean, before_median, before_skew, before_kurtosis),
            ('after', after_mean, after_median, after_skew, after_kurtosis)
        ],
        ['index','mean', 'median', 'skew', 'kurtosis']
    )
    return stats_df.show()

In [49]:
transformation(spark_df, 'hour')

+------+------------------+------------------+-------------------+-------------------+
| index|              mean|            median|               skew|           kurtosis|
+------+------------------+------------------+-------------------+-------------------+
|before|13.632914338481893|              13.0|-0.2864554716759491|-0.6325473419843961|
| after|2.5615035320970487|2.6390573296152584| -2.381418878422813|  7.430904673149209|
+------+------------------+------------------+-------------------+-------------------+



In [50]:
transformation(spark_df, 'Distance')

+------+------------------+-----------------+-------------------+--------------------+
| index|              mean|           median|               skew|            kurtosis|
+------+------------------+-----------------+-------------------+--------------------+
|before|13.560962073081413|            11.79|0.45672145109303297|-0.48874095132369444|
| after|  2.46183935686871|2.548663615590751|-1.1543890523218658|  1.3531670255355017|
+------+------------------+-----------------+-------------------+--------------------+



**Note: Since the skewness of the hour and distance column is close to 0, a feature transformation will not be necessary.**

**Next, the hour and distance columns must be normalized.**  
**This is so one of the columns won't have a bigger impact on the prediction than the other column.**

In [51]:
cols_to_scale = ['hour','Distance']

assemblers = [VectorAssembler(inputCols = [col], outputCol = col + '_vec') for col in cols_to_scale]
scalers = [MinMaxScaler(inputCol = col + '_vec', outputCol = col + '_scaled') for col in cols_to_scale]
pipeline = Pipeline(stages = assemblers + scalers)
scalerModel = pipeline.fit(spark_df)
scaledDF = scalerModel.transform(spark_df)

**Finally, dummies are created for the catergorical features so they can be used for modeling.**

In [52]:
ori_list = scaledDF.select('Origin').distinct().rdd.flatMap(lambda x: x).collect()
origin_dum = [F.when(F.col('Origin') == ori,1).otherwise(0).alias('Ori_'+ ori) for ori in ori_list]

dest_list = scaledDF.select('Destination').distinct().rdd.flatMap(lambda x: x).collect()
dest_dum = [F.when(F.col('Destination') == dest, 1).otherwise(0).alias('Dest_' + dest) for dest in dest_list]

month_list = scaledDF.select('month').distinct().rdd.flatMap(lambda x: x).collect()
month_dum = [F.when(F.col('month') == m, 1).otherwise(0).alias('month_' + m) for m in month_list]

day_list = scaledDF.select('day').distinct().rdd.flatMap(lambda x: x).collect()
day_dum = [F.when(F.col('day') == d, 1).otherwise(0).alias('day_' + d) for d in day_list]

In [53]:
BART = scaledDF.select(*origin_dum,*dest_dum,*month_dum,*day_dum,'hour_scaled','Distance_scaled','Throughput')

**For Apache Spark, the independent features must be converted into one vector column.**

In [54]:
ind_col_names = BART.columns[0:-1]
featureAssembler = VectorAssembler(inputCols = ind_col_names, outputCol = 'Independent Features')

In [55]:
finalDF = featureAssembler.transform(BART)
finalDF = finalDF.select('Independent Features','Throughput')
finalDF.show(5)

+--------------------+----------+
|Independent Features|Throughput|
+--------------------+----------+
|(113,[3,49,101,10...|         1|
|(113,[3,50,101,10...|         1|
|(113,[3,91,101,10...|         4|
|(113,[3,66,101,10...|         4|
|(113,[3,90,101,10...|         2|
+--------------------+----------+
only showing top 5 rows



**The data is split into training and testing dataframes.**

In [56]:
splits = finalDF.randomSplit([0.7,0.3])
trainingDF = splits[0]
testingDF = splits[1]

**Three algorithms are compared and the model with the least error between the predicted values and the actual values is chosen to be the best model.**

In [57]:
def model_evaluation(features, label, train, test):
    r2_list = ['r2']
    rmse_list = ['RMSE']
    
    lr = LinearRegression(featuresCol = features, labelCol = label)
    rf = RandomForestRegressor(featuresCol = features, labelCol = label)
    gb = GBTRegressor(featuresCol = features, labelCol = label)
    
    list_of_algorithms = [lr, rf, gb]
    
    for ind in range(len(list_of_algorithms)):
        model = list_of_algorithms[ind].fit(train)
        pred = model.transform(test)
        
        rmse_evaluator = RegressionEvaluator(labelCol = label, predictionCol = 'prediction', metricName = 'rmse')
        rmse = round(rmse_evaluator.evaluate(pred, params = {}), 3)
        rmse_list.append(rmse)
        
        r2_evaluator = RegressionEvaluator(labelCol = label, predictionCol = 'prediction', metricName = 'r2')
        r2 = round(r2_evaluator.evaluate(pred, params = {}), 3)
        r2_list.append(r2)
        
    metrics_df = spark.createDataFrame(
        [
            (r2_list),
            (rmse_list)
        ],
        ['Metric','Linear Regression','Random Forest','Gradient Boosting']
    )
    
    return metrics_df.show()

In [58]:
model_evaluation('Independent Features', 'Throughput', trainingDF, testingDF)

+------+-----------------+-------------+-----------------+
|Metric|Linear Regression|Random Forest|Gradient Boosting|
+------+-----------------+-------------+-----------------+
|    r2|            0.166|        0.257|            0.578|
|  RMSE|           30.551|       28.825|           21.721|
+------+-----------------+-------------+-----------------+

