# MapReduce using Pyspark

Dataset:  NYC Taxi and Limousine Commission (TLC)

Big Data Framework: Spark using Pyspark

Link to access the data: S. Mohanasundaram. “Newyork Yellow Taxi Trip Data.”, 2022. URL https://www.kaggle.com/datasets/microize/newyork-yellow-taxi-trip-data-2020-2019?select=yellow_tripdata_2020-06.csv.

In [1]:
!pip install pyspark
!pip install plotly
!pip install networkx
!pip install pyvis

from pyvis.network import Network
import networkx as nx
import pandas as pd
import plotly.express as px
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
import glob
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, date_format, concat
from pyspark.sql.functions import year, month, dayofmonth, hour, dayofweek
import plotly.subplots as sp
import plotly.graph_objects as go


^C


## Get Partitioned Data

In [None]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = (
  SparkSession.builder
  .appName("ReadAllFiles")
  .getOrCreate()
)

In [None]:
#This code gets all the files
def get_data(folder_path="path_to_files"):
  """Load every CSV file of a folder into one Spark DataFrame.

  Uses the module-level ``spark`` session.

  Args:
    folder_path: Folder that holds the CSV files (the files can be
      downloaded and saved to e.g. Google Drive).

  Returns:
    A single DataFrame containing the rows of all CSV files in the folder.

  Raises:
    FileNotFoundError: If the folder contains no ``*.csv`` file.
  """
  from functools import reduce

  # Sorted so the result does not depend on the directory listing order.
  file_paths = sorted(glob.glob(folder_path + "/*.csv"))
  if not file_paths:
    # Fail with a clear message instead of an IndexError further down.
    raise FileNotFoundError(f"No CSV files found in {folder_path!r}")

  # One DataFrame per file, each with its own header and inferred schema.
  dataframes = [
    spark.read.csv(file_path, header=True, inferSchema=True)
    for file_path in file_paths
  ]

  # Align columns by name rather than by position, so a file with a different
  # column order cannot silently put values into the wrong columns.
  return reduce(lambda left, right: left.unionByName(right), dataframes)


# Load all trip files and preview the first three rows.
combined_df = get_data()
combined_df.show(n=3)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2019-01-01 00:46:40|  2019-01-01 00:53:20|              1|          1.5|         1|                 N|         151|         239|           1|        7.0|  0.5|    0.5|      1.65|         0.0|                  0.3

In [5]:
# Demonstration using one file: read it with a header row and inferred column
# types, then preview the first five rows.
single_file_path = "path_to_file"
combined_df = spark.read.csv(single_file_path, header=True, inferSchema=True)
combined_df.show(n=5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2019-01-01 00:46:40|  2019-01-01 00:53:20|              1|          1.5|         1|                 N|         151|         239|           1|        7.0|  0.5|    0.5|      1.65|         0.0|                  0.3

In [6]:
# Get date columns
from pyspark.sql.functions import split, col, substring, substring_index, lit

def get_date_cols(df):
  """Add the date/time columns that are later used as grouping keys.

  Args:
    df: Trip DataFrame with ``tpep_pickup_datetime`` and
      ``tpep_dropoff_datetime`` columns.

  Returns:
    ``df`` with these extra columns, all derived from the pickup time
    except ``tpep_dropoff_date``:
      tpep_pickup_date, tpep_dropoff_date (``yyyy-MM-dd`` strings), year,
      month, day_of_week_name, year-week, year-month, hour, year-hour.

  Note:
    Despite its name, ``year-week`` holds "<year>-<weekday name>".
  """
  pickup_ts = col("tpep_pickup_datetime")
  dropoff_ts = col("tpep_dropoff_datetime")

  df = df.withColumn("tpep_pickup_date", date_format(pickup_ts, "yyyy-MM-dd"))
  df = df.withColumn("tpep_dropoff_date", date_format(dropoff_ts, "yyyy-MM-dd"))

  df = df.withColumn("year", year(col("tpep_pickup_date")))
  df = df.withColumn("month", month(col("tpep_pickup_date")))
  df = df.withColumn("day_of_week_name", date_format("tpep_pickup_date", "EEEE"))

  # Two-digit, zero-padded hour of day taken from the timestamp itself rather
  # than from fixed character positions of its string representation.
  df = df.withColumn("hour", date_format(pickup_ts, "HH"))

  # Composite "<year>-<part>" keys.
  df = df.withColumn("year-week", concat(col("year"), lit("-"), col("day_of_week_name")))
  df = df.withColumn("year-month", concat(col("year"), lit("-"), col("month")))
  df = df.withColumn("year-hour", concat(col("year"), lit("-"), col("hour")))

  # Keep the column order that the rest of the notebook was written against.
  leading_columns = [
    name for name in df.columns
    if name not in ("year-week", "year-month", "hour", "year-hour")
  ]
  return df.select(*leading_columns, "year-week", "year-month", "hour", "year-hour")

# Derive the date/time grouping columns and preview the result.
combined_df_1 = get_date_cols(combined_df)
combined_df_1.show(n=3)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------------+-----------------+----+-----+----------------+------------+----------+----+---------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|tpep_pickup_date|tpep_dropoff_date|year|month|day_of_week_name|   year-week|year-month|hour|year-hour|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------------+---

## Lookup Tables

In [7]:
# Get the lookup tables (locations, rate codes, vendors, payment types)

def get_lookup_tables():
  """Read the lookup CSV files and return them as Spark DataFrames.

  Uses the module-level ``spark`` session and reads the files
  "taxi+_zone_lookup.csv", "RateCode.csv", "Vendors.csv" and "Payments.csv"
  from the working directory.

  Returns:
    Tuple ``(pu_locations, do_locations, ratecode, vendor, payments)``.
    The two location tables hold the same zone data, keyed as
    ``PULocationID``/``PU_Zone`` and ``DOLocationID``/``DO_Zone``; both keep
    the ``Borough`` column.
  """
  def _semicolon_csv_to_spark(csv_path):
    # These three lookup files are read with ';' as the field delimiter.
    return spark.createDataFrame(pd.read_csv(csv_path, delimiter=';'))

  # Zone lookup, reduced to the three columns used further on.
  zones = pd.read_csv("taxi+_zone_lookup.csv")[["LocationID", "Borough", 'Zone']]
  zones_spark = spark.createDataFrame(zones)

  # Same table under pickup-specific and drop-off-specific column names.
  pickup_zones = (
    zones_spark
    .withColumnRenamed("LocationID", "PULocationID")
    .withColumnRenamed("Zone", "PU_Zone")
  )
  dropoff_zones = (
    zones_spark
    .withColumnRenamed("LocationID", "DOLocationID")
    .withColumnRenamed("Zone", "DO_Zone")
  )

  return (
    pickup_zones,
    dropoff_zones,
    _semicolon_csv_to_spark("RateCode.csv"),
    _semicolon_csv_to_spark("Vendors.csv"),
    _semicolon_csv_to_spark("Payments.csv"),
  )



In [8]:
# Join the trip data with all lookup tables (locations, rate codes, vendors, payments)

def join_lookup_tables(df, pu_locations_spark, do_locations_spark, ratecode_spark, vendor_spark, payments_spark):
  """Left-join the trip data with every lookup table and drop the raw codes.

  Args:
    df: Trip DataFrame (with the columns listed as join keys below).
    pu_locations_spark: Pickup zones, joined on ``PULocationID``.
    do_locations_spark: Drop-off zones, joined on ``DOLocationID``.
    ratecode_spark: Rate-code lookup, joined on ``RateCodeID``.
    vendor_spark: Vendor lookup, joined on ``VendorID``.
    payments_spark: Payment lookup, joined on ``payment_type``.

  Returns:
    The joined DataFrame with ``Borough`` renamed to ``Pick_Up_Town`` /
    ``Drop_Off_Town``, an added ``source_destination_town`` column
    ("<pickup town>-<drop-off town>"), and without the join-key and raw
    datetime columns.
  """
  # Imported here so the function does not depend on imports made in another
  # notebook cell.
  from pyspark.sql.functions import broadcast, lit

  # Broadcast hint: the lookup tables are expected to be tiny reference tables,
  # so shipping them to every executor avoids shuffling the large trip data.
  # NOTE(review): confirm the lookup tables comfortably fit in memory.
  joined = df.join(broadcast(pu_locations_spark), on="PULocationID", how="left")
  # Rename right away: the drop-off join below brings a second "Borough".
  joined = joined.withColumnRenamed("Borough", "Pick_Up_Town")

  joined = joined.join(broadcast(do_locations_spark), on="DOLocationID", how="left")
  joined = joined.withColumnRenamed("Borough", "Drop_Off_Town")

  joined = joined.join(broadcast(ratecode_spark), on="RateCodeID", how="left")
  joined = joined.join(broadcast(vendor_spark), on="VendorID", how="left")
  joined = joined.join(broadcast(payments_spark), on="payment_type", how="left")

  joined = joined.withColumn(
    "source_destination_town",
    concat(col("Pick_Up_Town"), lit("-"), col("Drop_Off_Town")),
  )

  # Keep only the necessary columns: the codes are now replaced by their
  # descriptions and the datetimes by the derived date columns.
  columns_to_drop = ["payment_type", "VendorID", "RatecodeID", "DOLocationID", "PULocationID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]

  return joined.drop(*columns_to_drop)

In [9]:
# Build the lookup tables once, then attach them to the trip data.
lookup_tables = get_lookup_tables()
pu_locations_spark, do_locations_spark, ratecode_spark, vendor_spark, payments_spark = lookup_tables
processed_df = join_lookup_tables(combined_df_1, *lookup_tables)


In [10]:
# Preview the first three rows of the joined data.
processed_df.show(n=3)

+---------------+-------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------------+-----------------+----+-----+----------------+-----------+----------+----+---------+------------+-------------+-------------+--------------------+-------------------+--------------------+------------------+-----------------------+
|passenger_count|trip_distance|store_and_fwd_flag|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|tpep_pickup_date|tpep_dropoff_date|year|month|day_of_week_name|  year-week|year-month|hour|year-hour|Pick_Up_Town|      PU_Zone|Drop_Off_Town|             DO_Zone|RateCodeDescription|   VendorDescription|PaymentDescription|source_destination_town|
+---------------+-------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------------+-----------------+-

### Convert to RDD

In [11]:
def _single_column(column_name):
  """Return a one-column DataFrame holding `column_name` of processed_df."""
  return processed_df.select(column_name)


# Descriptive attributes
Pick_Up_Town = _single_column('Pick_Up_Town')
Drop_Off_Town = _single_column('Drop_Off_Town')
RateCodeDescription = _single_column('RateCodeDescription')
VendorDescription = _single_column('VendorDescription')
PaymentDescription = _single_column('PaymentDescription')
store_and_fwd_flag = _single_column('store_and_fwd_flag')

# Route and time-based keys
Source_Destination = _single_column('source_destination_town')
year_week = _single_column('year-week')
year_month = _single_column('year-month')
year_hour = _single_column('year-hour')

## Mapper

In [36]:
def _emit_one(record):
  """Map step: pair a record (a one-field Row) with a count of 1."""
  return (record, 1)


RateCodeDescription_mapper = RateCodeDescription.rdd.map(_emit_one)
VendorDescription_mapper = VendorDescription.rdd.map(_emit_one)
PaymentDescription_mapper = PaymentDescription.rdd.map(_emit_one)
store_and_fwd_flag_mapper = store_and_fwd_flag.rdd.map(_emit_one)
Source_Destination_mapper = Source_Destination.rdd.map(_emit_one)

In [14]:
# Map step only: emit a (key, 1) pair per trip. The counts are summed by
# reduceByKey in the Reducer section, so reducing here as well would only add
# a redundant shuffle.
year_week_mapper = year_week.rdd.map(lambda x: (x, 1))
year_month_mapper = year_month.rdd.map(lambda x: (x, 1))
year_hour_mapper = year_hour.rdd.map(lambda x: (x, 1))

## Reducer

In [15]:
def _sum_counts(left, right):
  """Reduce step: add two partial counts that share a key."""
  return left + right


RateCodeDescription_reducer = RateCodeDescription_mapper.reduceByKey(_sum_counts)
VendorDescription_reducer = VendorDescription_mapper.reduceByKey(_sum_counts)
PaymentDescription_reducer = PaymentDescription_mapper.reduceByKey(_sum_counts)
Source_Destination_reducer = Source_Destination_mapper.reduceByKey(_sum_counts)
store_and_fwd_flag_reducer = store_and_fwd_flag_mapper.reduceByKey(_sum_counts)

In [16]:
def _merge_counts(count_a, count_b):
  """Reduce step for the time-based keys: add two counts of the same key."""
  return count_a + count_b


year_week_reducer = year_week_mapper.reduceByKey(_merge_counts)
year_month_reducer = year_month_mapper.reduceByKey(_merge_counts)
year_hour_reducer = year_hour_mapper.reduceByKey(_merge_counts)

In [37]:
# A notebook cell only displays its final expression, so the results of
# earlier bare collect() calls would be computed and then thrown away.
# Print them explicitly; the last result stays the cell's displayed value.
print("RateCodeDescription:", RateCodeDescription_reducer.collect())

print("VendorDescription:", VendorDescription_reducer.collect())

print("PaymentDescription:", PaymentDescription_reducer.collect())

print("source_destination_town:", Source_Destination_reducer.collect())

store_and_fwd_flag_reducer.collect()

[(Row(store_and_fwd_flag='N'), 7630142), (Row(store_and_fwd_flag='Y'), 37650)]

In [18]:
# Only the final expression of a cell is displayed, so print the first two
# results instead of computing and discarding them.
print("year-week:", year_week_reducer.collect())
print("year-month:", year_month_reducer.collect())
year_hour_reducer.collect()

[(Row(year-hour='2019-05'), 74697),
 (Row(year-hour='2019-11'), 373930),
 (Row(year-hour='2019-12'), 399329),
 (Row(year-hour='2019-13'), 402444),
 (Row(year-hour='2019-15'), 450406),
 (Row(year-hour='2019-19'), 474337),
 (Row(year-hour='2019-22'), 368428),
 (Row(year-hour='2019-04'), 61122),
 (Row(year-hour='2018-21'), 28),
 (Row(year-hour='2018-20'), 28),
 (Row(year-hour='2018-19'), 34),
 (Row(year-hour='2009-02'), 5),
 (Row(year-hour='2009-08'), 1),
 (Row(year-hour='2009-04'), 1),
 (Row(year-hour='2019-03'), 78039),
 (Row(year-hour='2019-09'), 364353),
 (Row(year-hour='2019-14'), 431096),
 (Row(year-hour='2019-16'), 418572),
 (Row(year-hour='2019-20'), 422418),
 (Row(year-hour='2019-21'), 409206),
 (Row(year-hour='2019-00'), 207619),
 (Row(year-hour='2019-02'), 109379),
 (Row(year-hour='2018-23'), 107),
 (Row(year-hour='2018-13'), 4),
 (Row(year-hour='2018-14'), 23),
 (Row(year-hour='2018-16'), 36),
 (Row(year-hour='2008-23'), 22),
 (Row(year-hour='2003-00'), 2),
 (Row(year-hour='20

In [19]:
# Convert each reduced RDD of (key, count) pairs to a two-column Spark DataFrame
def _counts_to_df(reduced_rdd, key_column):
  """Name the key column `key_column` and the count column 'Number'."""
  return reduced_rdd.toDF([key_column, 'Number'])


rate = _counts_to_df(RateCodeDescription_reducer, "RateCodeDescription")
vendor = _counts_to_df(VendorDescription_reducer, "VendorDescription")
payment = _counts_to_df(PaymentDescription_reducer, "PaymentDescription")
src_dest = _counts_to_df(Source_Destination_reducer, "source_destination_town")
flag = _counts_to_df(store_and_fwd_flag_reducer, "store_and_fwd_flag")

In [20]:
# Spark DataFrame -> pandas DataFrame, converted in the order listed
rate_pd, vendor_pd, payment_pd, src_dest_pd, flag_pd = (
  spark_frame.toPandas()
  for spark_frame in (rate, vendor, payment, src_dest, flag)
)

In [38]:
# Sort values for visualisation: largest count first
sort_options = dict(by='Number', ascending=False)

rate_pd = rate_pd.sort_values(**sort_options)
vendor_pd = vendor_pd.sort_values(**sort_options)
payment_pd = payment_pd.sort_values(**sort_options)
src_dest_pd = src_dest_pd.sort_values(**sort_options)
flag_pd = flag_pd.sort_values(**sort_options)

In [29]:
# Output folder and file names for the reduced results, one file per attribute.
path = 'visuals_data_1/'
file_names = [
  'rate_pd.csv',
  'vendor_pd.csv',
  'payment_pd.csv',
  'src_dest_pd.csv',
  'flag_pd.csv',
]

In [28]:
# Write the reduced results to csv, to be used for the visuals
import os

dfs = [rate_pd, vendor_pd, payment_pd, src_dest_pd, flag_pd]

path = 'visuals_data_1/'

# to_csv does not create missing folders, so make sure the target exists.
os.makedirs(path, exist_ok=True)

# Plain loop: to_csv is called for its side effect only, so there is no list
# worth building. dfs and file_names are paired by position.
for frame, file_name in zip(dfs, file_names):
  frame.to_csv(path + file_name, index=False)


[None, None, None, None, None]

# Graph Visuals - Plotly

In [None]:
# Get the reduced results from the folder and remove unnecessary characters

In [30]:
def _load_counts(file_name, key_column):
  """Read one reduced-result CSV and clean up its key column.

  The keys were written in the form Row(<name>='<value>'); only the first
  single-quoted text of each key is kept.
  """
  frame = pd.read_csv(path + file_name)
  frame[key_column] = frame[key_column].str.extract(r"'(.*?)'")
  return frame


rate_vis = _load_counts(file_names[0], 'RateCodeDescription')
vendor_vis = _load_counts(file_names[1], 'VendorDescription')
payment_vis = _load_counts(file_names[2], 'PaymentDescription')
src_dest_pd = _load_counts(file_names[3], 'source_destination_town')
flag_pd = _load_counts(file_names[4], 'store_and_fwd_flag')

In [35]:
# Visualization 2: donut-style pie chart of the store-and-forward flag counts
fig2 = px.pie(
  flag_pd,
  names='store_and_fwd_flag',
  values='Number',
  title='In-vehicle memory before sending to Vendor (Store & Forward Flag) ',
  hole=0.5,
)
fig2.show()




In [34]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Three bar charts on a 2x2 grid: two on the first row and one spanning the
# whole second row, so no empty subplot with a stray title is drawn.
# Titles are given in row-major order of the non-empty cells.
fig = make_subplots(
  rows=2,
  cols=2,
  specs=[[{}, {}], [{"colspan": 2}, None]],
  subplot_titles=('Rate Type', 'Payment Types Count', 'VendorTypes Count'),
)

# Add bar plots to the subplots
fig.add_trace(go.Bar(x=rate_vis['RateCodeDescription'], y=rate_vis['Number']), row=1, col=1)
fig.add_trace(go.Bar(x=payment_vis['PaymentDescription'], y=payment_vis['Number']), row=1, col=2)
fig.add_trace(go.Bar(x=vendor_vis['VendorDescription'], y=vendor_vis['Number']), row=2, col=1)

# Update the layout of the subplots
fig.update_layout(title='Subplots of Data', showlegend=False)  # You can customize the layout as needed

# Show the figure with subplots
fig.show()



In [None]:
# spark.stop()