### Importing libraries

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col,to_date,hour,asc,desc,isnan,when,count
from pyspark.sql import functions as F

import warnings
if not sys.warnoptions:
    warnings.simplefilter("ignore")
warnings.filterwarnings("ignore", category=DeprecationWarning) 

### Initiating Spark

In [3]:
spark = SparkSession.builder.appName("bds").getOrCreate()

23/09/07 19:44:10 WARN Utils: Your hostname, Aayushi resolves to a loopback address: 127.0.1.1; using 172.18.33.221 instead (on interface eth0)
23/09/07 19:44:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/07 19:44:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/07 19:44:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Verifying the source files in the HDFS folder

In [1]:
!hdfs dfs -ls /BDS_G106/

Found 2 items
-rw-r--r--   1 hadoopuser supergroup      10724 2023-09-01 21:09 /BDS_G106/taxi+_zone_lookup.csv
-rw-r--r--   1 hadoopuser supergroup   45371782 2023-09-01 21:10 /BDS_G106/yellow_tripdata_2020-06.xlsx


## 1. Read the data in yellow_tripdata_2020-06.csv file into a dataframe created in spark.

##### Reading the yellow_tripdata_2020-06.xlsx file from hdfs using pandas read_excel function

In [4]:
# Reading the yellow_tripdata_2020-06.xlsx file from hdfs using pandas read_excel function

df_yellow_tripdata = pd.read_excel("hdfs://localhost:9000/BDS_G106/yellow_tripdata_2020-06.xlsx")

2023-09-07 19:44:14,508 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
df_yellow_tripdata.iteritems = df_yellow_tripdata.items
df_spark_yellow_tripdata = spark.createDataFrame(df_yellow_tripdata)
df_spark_yellow_tripdata.show(5, truncate=False)

23/09/07 19:46:18 WARN TaskSetManager: Stage 0 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|2020-06-01 00:31:23 |2020-06-01 00:49:58  |1.0            |3.6          |140         |68          |1.0         |15.5       |3.0  |0.5    |4.0       |0.0         |0.3                  |23.3        |
|2020-06-01 00:42:50 |2020-06-01 01:04:33  |1.0            |5.6          |79          |226         |1.0         |19.5       |3.0  |0.5    |2.0       |0.0         |0.3                  |25.3        |
+----

                                                                                

In [None]:
df_yellow_tripdata.dtypes

##### Extracting 3 new columns from tpep_pickup_datetime and tpep_dropoff_datetime:
1. pick_date
2. pick_date_hour
3. pick_hour
4. drop_date
5. drop_date_hour
6. drop_hour

In [6]:
# Adding 2 new columns pick_date and drop_date by converting the columns tpep_pickup_datetime and tpep_dropoff_datetime into timestamp format

df_spark_taxi_trip = df_spark_yellow_tripdata.withColumn('pick_date',F.to_timestamp('tpep_pickup_datetime','yyyy-MM-dd HH:mm:ss')).withColumn('drop_date',F.to_timestamp('tpep_dropoff_datetime','yyyy-MM-dd HH:mm:ss'))


In [7]:
# Adding 4 new columns pick_date_hour, pick_hour, drop_date_hour and drop_hour. 
# pick_date_hour and drop_date_hour contains the date and the hour removing the minutes and seconds
# pick_hour and drop_hour contains the hour part only

df_spark_taxi_trip_data = df_spark_taxi_trip.withColumn('pick_date_hour', F.date_trunc('hour', F.to_timestamp('pick_date', "yyyy-MM-dd HH:mm:ss 'UTC'"))).withColumn('pick_hour',hour(col('pick_date_hour'))).withColumn('drop_date_hour', F.date_trunc('hour', F.to_timestamp('drop_date', "yyyy-MM-dd HH:mm:ss 'UTC'"))).withColumn('drop_hour',hour(col('drop_date_hour'))).select('*')


In [8]:
df_spark_taxi_trip_data.show(2, truncate=False)

+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------------------+-------------------+-------------------+---------+-------------------+---------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pick_date          |drop_date          |pick_date_hour     |pick_hour|drop_date_hour     |drop_hour|
+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------------------+-------------------+-------------------+---------+-------------------+---------+
|2020-06-01 00:31:23 |2020-06-01 00:49:58  |1.0            |3.6          |140         |68          |1.0

23/09/07 19:46:19 WARN TaskSetManager: Stage 1 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.


## Create a table view of the data frame created above.

In [9]:
# Creating a temporary view from the dataframe

df_spark_taxi_trip_data.createOrReplaceTempView("table_spark_taxi_trip")


### Data preparation and Exploratory Data Analysis

#### Scenario 1 - Cancellation trip data

#### Observation: 
We found few trips with same PULocationID, DOLocationID, trip_distance, pick_date. Also, they have same fare amount and total amount but one trip has positive value and other has negative value. Which clearly reflects these are cancellation trips. 

#### Assumption: 
We are considering the trips as cancelled if they have same PULocationID, DOLocationID, trip_distance, pick_date and also fare amount and total amounts are same but one trip having them in postive values and other in negative values. We are and removing them from the data.

In [10]:
# Storing all the cancellation data in dataframe using spark query

df_trip_cancellation = spark.sql('''
SELECT DISTINCT tbl_a.* FROM 
table_spark_taxi_trip as tbl_a
INNER JOIN table_spark_taxi_trip as tbl_b
ON tbl_a.total_amount = tbl_b.total_amount * -1
AND tbl_a.PULocationID = tbl_b.PULocationID
AND tbl_a.DOLocationID = tbl_b.DOLocationID
AND tbl_a.trip_distance = tbl_b.trip_distance
AND tbl_a.fare_amount = tbl_b.fare_amount * -1
AND tbl_a.pick_date = tbl_b.pick_date
WHERE tbl_a.total_amount<>0
''')


In [11]:
df_trip_cancellation.createOrReplaceTempView("table_spark_cancellation_trip")

In [12]:
# Spark query to check the count of negative total_amount values who have same corresponding positive values and are more than single trips 

spark.sql('''
SELECT total_amount,COUNT(*) AS CC 
FROM table_spark_cancellation_trip
GROUP BY total_amount
-- HAVING CC>2
ORDER BY ABS(total_amount) DESC
''').show(10,truncate=False)

23/09/07 19:46:21 WARN TaskSetManager: Stage 2 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:46:24 WARN TaskSetManager: Stage 3 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+------------+---+
|total_amount|CC |
+------------+---+
|59.8        |9  |
|-59.8       |9  |
|57.3        |6  |
|-57.3       |6  |
|55.3        |38 |
|-55.3       |38 |
|-52.8       |24 |
|52.8        |24 |
|50.8        |3  |
|-50.8       |3  |
+------------+---+
only showing top 10 rows



                                                                                

In [13]:
# Showing the count of all trips vs count of cancellation trips

print(df_spark_taxi_trip_data.distinct().count())
print(df_trip_cancellation.distinct().count())

23/09/07 19:46:33 WARN TaskSetManager: Stage 7 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

549759


23/09/07 19:46:37 WARN TaskSetManager: Stage 10 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:46:38 WARN TaskSetManager: Stage 11 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

4416


                                                                                

In [14]:
# Substracting the cancelled trips as part of cleaning of data 
# as the cancelled trips wont contribute to effective data analytics

df_spark_clean_cancellation = df_spark_taxi_trip_data.distinct().subtract(df_trip_cancellation)


In [15]:
df_spark_clean_cancellation.count()

23/09/07 19:46:44 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/09/07 19:46:44 WARN TaskSetManager: Stage 15 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:46:46 WARN TaskSetManager: Stage 16 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:46:47 WARN TaskSetManager: Stage 19 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

545343

In [16]:
df_spark_clean_cancellation.createOrReplaceTempView("table_spark_clean_cancellation_trip")

#### Scenario 2 - Trip distance and Total amount is 0 

#### Observation: 
We found few trips where both trip_distance and total_amount having 0 values. 

#### Assumption: 
We are considering the trips didn't happen if both trip_distance and total_amount have 0 values and removing them from the data.

In [17]:
# Storing all the no-trips data into a dataframe

df_spark_clean_dist_amt = spark.sql('''
SELECT * 
FROM table_spark_clean_cancellation_trip
WHERE trip_distance = 0 AND total_amount = 0
''')


In [18]:
df_spark_clean_dist_amt.count()

23/09/07 19:47:02 WARN TaskSetManager: Stage 26 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:03 WARN TaskSetManager: Stage 23 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:05 WARN TaskSetManager: Stage 24 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

192

In [19]:
# Substracting all the no trips data as part of cleaning of data as these trip data dont add value to the analysis

df_spark_clean_cancellation_dist_amt = df_spark_clean_cancellation.distinct().subtract(df_spark_clean_dist_amt)

In [20]:
df_spark_clean_cancellation_dist_amt.count()

23/09/07 19:47:15 WARN TaskSetManager: Stage 32 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:16 WARN TaskSetManager: Stage 31 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:16 WARN TaskSetManager: Stage 33 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:17 WARN TaskSetManager: Stage 37 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

545151

In [21]:
df_spark_clean_cancellation_dist_amt.createOrReplaceTempView("table_spark_clean_dist_amt")

In [None]:
trip_distance and total_amount having 0 values.

#### Scenario 3 - Trip distance is greater than 10,000 miles and corresponding Total amount is very less in comparison

#### Observation: 
We found few trips where trip_distance is greater than 10,000 miles but its corresponding fare_amount and total_amount is very less and seems incorrect entries.

#### Assumption: 
We are considering the trips as outliers and removing them from the data.

In [22]:
# Extracting all the trip records where trip_distance < 10000 as valid records

df_spark_clean_trip_dist_outliers = spark.sql('''
SELECT * 
FROM table_spark_clean_dist_amt
WHERE trip_distance < 10000
''')

In [23]:
df_spark_clean_trip_dist_outliers.count()

23/09/07 19:47:38 WARN TaskSetManager: Stage 43 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:39 WARN TaskSetManager: Stage 42 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:39 WARN TaskSetManager: Stage 44 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:47:40 WARN TaskSetManager: Stage 48 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

545146

In [24]:
df_spark_clean_trip_dist_outliers.createOrReplaceTempView("table_spark_clean_outliers")

#### Scenario 4 - Trip distance is 0 

#### Observation: 
We found few trips where trip_distance is 0 but its corresponding fare_amount and total_amount is of non-zero value which seems not feasible scenario and seems incorrect entries.

#### Assumption: 
We are considering the trips as invalid and outliers and removing them from the data.

In [25]:
# Extracting all the trip records as valid records where trip_distance is non-zero 

df_spark_clean_valid_trip = spark.sql('''
SELECT *
FROM table_spark_clean_outliers
WHERE trip_distance <> 0
''')

In [26]:
df_spark_clean_valid_trip.createOrReplaceTempView("table_spark_clean_valid_trip")

In [27]:
df_spark_clean_valid_trip.count()

23/09/07 19:47:59 WARN TaskSetManager: Stage 53 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:48:00 WARN TaskSetManager: Stage 54 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:48:01 WARN TaskSetManager: Stage 56 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 19:48:02 WARN TaskSetManager: Stage 60 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

528419

#### Scenario 5 - Null values for Passenger count and Payment type

#### Observation: 
We found few trips where both passenger_count and payment_type have null values.

#### Assumption: 
We imputed these data by replacing the null values in the passenger_count with 0 and replacing the null values in the payment_type with 5 which indicates unknown payment type.

In [49]:
# Spark query to replace null values in the passenger_count with 0 and replacing the null values in the payment_type with 5

df_spark_clean = spark.sql('''
SELECT 
tpep_pickup_datetime,
tpep_dropoff_datetime,
COALESCE(CAST(passenger_count AS int),0) AS passenger_count,
trip_distance,
PULocationID,
DOLocationID,
COALESCE(CAST(payment_type AS int),5) AS payment_type,
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
improvement_surcharge,
total_amount,
pick_date,
pick_date_hour,
pick_hour,
drop_date,
drop_date_hour,
drop_hour
FROM table_spark_clean_valid_trip
''')


In [51]:
df_spark_clean_pd = df_spark_clean.toPandas()

23/09/07 20:01:08 WARN TaskSetManager: Stage 120 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:01:09 WARN TaskSetManager: Stage 121 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:01:10 WARN TaskSetManager: Stage 123 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:01:10 WARN TaskSetManager: Stage 126 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [52]:
# Verifying if there exists any null values in the data

df_spark_clean_pd.isnull().sum()

tpep_pickup_datetime     0
tpep_dropoff_datetime    0
passenger_count          0
trip_distance            0
PULocationID             0
DOLocationID             0
payment_type             0
fare_amount              0
extra                    0
mta_tax                  0
tip_amount               0
tolls_amount             0
improvement_surcharge    0
total_amount             0
pick_date                0
pick_date_hour           0
pick_hour                0
drop_date                0
drop_date_hour           0
drop_hour                0
dtype: int64

In [53]:
df_spark_clean.show(5,truncate=False)

23/09/07 20:02:28 WARN TaskSetManager: Stage 130 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:02:29 WARN TaskSetManager: Stage 131 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:02:30 WARN TaskSetManager: Stage 134 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:02:31 WARN TaskSetManager: Stage 137 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------------------+-------------------+---------+-------------------+-------------------+---------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|pick_date          |pick_date_hour     |pick_hour|drop_date          |drop_date_hour     |drop_hour|
+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-------------------+-------------------+---------+-------------------+-------------------+---------+
|2020-06-04 16:01:00 |2020-06-04 16:41:00  |0              |7.53         |65          |39          |0  

                                                                                

#### Storing the clean data in a spark temporary view

In [50]:
df_spark_clean.createOrReplaceTempView("table_spark_clean")

### 2. Count the number of taxi trips for each hour 

Running query in dataframe operations as we are asked to write Spark programs either in pySpark or Scala to do the following.

#### Assumption:
In the question it is mentioned to count the number of taxi trips for each hour.<br> 
We are assuming - <br> 
1. The hour as 24 hours of the day.<br>
2. Also, a trip gets started from the pickup hour so we are considering the column pick_hour for our calculations for this question.<br>

In [54]:
df_spark_clean_trip_hour = df_spark_clean.groupby('pick_hour').count().select('pick_hour', col('count').alias('count_taxi_trip'))

In [55]:
df_spark_clean_trip_hour.sort(asc('pick_hour')).show(25,truncate=False)

23/09/07 20:03:02 WARN TaskSetManager: Stage 140 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:03:02 WARN TaskSetManager: Stage 141 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:03:03 WARN TaskSetManager: Stage 143 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:03:05 WARN TaskSetManager: Stage 147 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+---------+---------------+
|pick_hour|count_taxi_trip|
+---------+---------------+
|0        |7633           |
|1        |6146           |
|2        |4797           |
|3        |4791           |
|4        |6617           |
|5        |6684           |
|6        |14548          |
|7        |19454          |
|8        |24073          |
|9        |27546          |
|10       |30789          |
|11       |33632          |
|12       |36255          |
|13       |37648          |
|14       |38820          |
|15       |39306          |
|16       |37237          |
|17       |37034          |
|18       |33179          |
|19       |25610          |
|20       |17866          |
|21       |14427          |
|22       |12749          |
|23       |11578          |
+---------+---------------+



                                                                                

### 3. Average fare amount collected by hour of the day

#### Assumption:
In the question it is mentioned to calculate average fare amount collected by hour of the day.<br>
We are assuming - <br> 
1. The hour of the day as 24 hours of the day.<br>
2. Also, a trip gets completed at the dropoff hour so we are considering the column drop_hour for our calculations for this question.<br>

In [56]:
spark.sql('''SELECT
  drop_hour,
  ROUND(AVG(fare_amount),3) AS avg_fare_amount
FROM table_spark_clean
GROUP BY 1
ORDER BY 1;
''').show(25, truncate=False)

23/09/07 20:05:59 WARN TaskSetManager: Stage 151 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:06:00 WARN TaskSetManager: Stage 152 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:06:01 WARN TaskSetManager: Stage 154 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:06:01 WARN TaskSetManager: Stage 157 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+---------+---------------+
|drop_hour|avg_fare_amount|
+---------+---------------+
|0        |19.342         |
|1        |25.488         |
|2        |30.32          |
|3        |34.45          |
|4        |38.999         |
|5        |29.448         |
|6        |12.455         |
|7        |10.927         |
|8        |11.031         |
|9        |10.824         |
|10       |11.171         |
|11       |11.535         |
|12       |11.52          |
|13       |11.488         |
|14       |11.422         |
|15       |12.078         |
|16       |12.952         |
|17       |13.081         |
|18       |13.24          |
|19       |12.794         |
|20       |13.299         |
|21       |15.157         |
|22       |15.846         |
|23       |17.913         |
+---------+---------------+



                                                                                

### 4. Average fare amount compared to the average trip distance.

#### Assumption:
In the question it is mentioned to calculate Average fare amount compared to the average trip distance.<br>
We are assuming - <br> 
1. Showing the comparision with respect to the dropoff hour of the day.<br>
2. The hour as 24 hours of the day.<br>
3. Also, a trip gets completed at the dropoff hour so we are considering the column drop_hour for our calculations for this question.<br>

In [65]:
spark.sql('''
SELECT drop_hour,
  ROUND(AVG(fare_amount),3) AS avg_fare_amount,
  ROUND(AVG(trip_distance),3) AS avg_trip_distance
FROM
  table_spark_clean
GROUP BY
  drop_hour
ORDER BY
  drop_hour;
''').show(10,truncate=False)

23/09/07 20:21:16 WARN TaskSetManager: Stage 195 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:21:17 WARN TaskSetManager: Stage 196 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:21:18 WARN TaskSetManager: Stage 199 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:21:18 WARN TaskSetManager: Stage 201 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+---------------+-----------------+
|avg_fare_amount|avg_trip_distance|
+---------------+-----------------+
|19.342         |5.999            |
|25.488         |7.128            |
|30.32          |8.152            |
|34.45          |9.316            |
|38.999         |10.665           |
|29.448         |8.931            |
|12.455         |3.519            |
|10.927         |2.756            |
|11.031         |2.677            |
|10.824         |2.525            |
|11.171         |2.653            |
|11.535         |2.788            |
|11.52          |2.78             |
|11.488         |2.762            |
|11.422         |2.709            |
|12.078         |2.914            |
|12.952         |3.206            |
|13.081         |3.278            |
|13.24          |3.453            |
|12.794         |3.383            |
|13.299         |3.614            |
|15.157         |4.296            |
|15.846         |4.571            |
|17.913         |5.442            |
+---------------+-----------

                                                                                

### 5. Average fare amount and average trip distance by day of the week.

#### Assumption:
In the question it is mentioned to calculate average fare amount and average trip distance by day of the week.<br>
We are assuming - <br> 
1. The day of the week as 7 days of the week starting from Sunday (Day-1).<br>
2. Also, fare is paid on a trip completion which gets completed at the dropoff hour for the date so we are considering the column drop_date for our calculations for this question.<br>

In [66]:
spark.sql('''SELECT  
  CASE WHEN DAYOFWEEK(drop_date) = 1 THEN 'Sunday'
  CASE WHEN DAYOFWEEK(drop_date) = 2 THEN 'Monday' 
  CASE WHEN DAYOFWEEK(drop_date) = 3 THEN 'Tuesday'
  CASE WHEN DAYOFWEEK(drop_date) = 4 THEN 'Wednesday'
  CASE WHEN DAYOFWEEK(drop_date) = 5 THEN 'Thursday'
  CASE WHEN DAYOFWEEK(drop_date) = 6 THEN 'Friday'
  CASE WHEN DAYOFWEEK(drop_date) = 7 THEN 'Saturday'
  END AS day_of_week,
  ROUND(AVG(fare_amount),3) AS avg_fare_amount,
  ROUND(AVG(trip_distance),3) AS avg_trip_distance
FROM
  table_spark_clean
GROUP BY 1
ORDER BY 1;
''').show(10,truncate=False)

23/09/07 20:26:53 WARN TaskSetManager: Stage 207 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:26:54 WARN TaskSetManager: Stage 206 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:26:55 WARN TaskSetManager: Stage 208 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 20:26:55 WARN TaskSetManager: Stage 212 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+-----------+---------------+-----------------+
|day_of_week|avg_fare_amount|avg_trip_distance|
+-----------+---------------+-----------------+
|1          |14.524         |4.051            |
|2          |13.288         |3.385            |
|3          |13.284         |3.319            |
|4          |13.185         |3.342            |
|5          |13.327         |3.395            |
|6          |13.553         |3.448            |
|7          |13.87          |3.735            |
+-----------+---------------+-----------------+



                                                                                

### 6. In the month of June 2020, find the zone which had maximum number of pick ups.

##### Reading the taxi+_zone_lookup.csv file from hdfs using pandas read function

In [78]:
# Reading the taxi+_zone_lookup.csv file from hdfs using pandas read function

df_taxi_lookup = spark.read.format("csv").option("header","true").load("hdfs://localhost:9000/BDS_G106/taxi+_zone_lookup.csv")

In [79]:
df_taxi_lookup.createOrReplaceTempView("table_taxi_lookup")

In [83]:
# Joining both taxi_zone_lookup and yellow_tripdata data to get the zone details on basis of locationids 
# to get maximum number of pick ups

spark.sql('''SELECT
  tbl_lookup.Zone,
  COUNT(tbl_clean.PULocationID) AS pickup_count
FROM
  table_spark_clean AS tbl_clean
  INNER JOIN table_taxi_lookup AS tbl_lookup
  ON tbl_clean.PULocationID =  CAST(tbl_lookup.LocationID AS bigint)
WHERE
  YEAR(tbl_clean.pick_date) = 2020
  AND MONTH(tbl_clean.pick_date) = 6
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1;
''').show(truncate=False)


23/09/07 21:01:09 WARN TaskSetManager: Stage 273 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:01:10 WARN TaskSetManager: Stage 274 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:01:10 WARN TaskSetManager: Stage 277 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:01:12 WARN TaskSetManager: Stage 279 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+---------------------+------------+
|Zone                 |pickup_count|
+---------------------+------------+
|Upper East Side North|22658       |
+---------------------+------------+



                                                                                

### 7. In the month of June 2020, find the zone which had maximum number of drops.

In [84]:
# Joining both taxi_zone_lookup and yellow_tripdata data to get the zone details on basis of locationids 
# to get maximum number of drops


spark.sql('''SELECT
  tbl_lookup.Zone,
  COUNT(tbl_clean.DOLocationID) AS drop_count
FROM
  table_spark_clean AS tbl_clean
  INNER JOIN table_taxi_lookup AS tbl_lookup
  ON tbl_clean.DOLocationID =  CAST(tbl_lookup.LocationID AS bigint)
WHERE
  YEAR(tbl_clean.drop_date) = 2020
  AND MONTH(tbl_clean.drop_date) = 6
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1;
''').show(truncate=False)


23/09/07 21:13:45 WARN TaskSetManager: Stage 286 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:13:47 WARN TaskSetManager: Stage 287 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:13:48 WARN TaskSetManager: Stage 290 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:13:50 WARN TaskSetManager: Stage 292 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------------------+----------+
|Zone                 |drop_count|
+---------------------+----------+
|Upper East Side North|21836     |
+---------------------+----------+



### 8. Average no of passengers by hour of the day.

#### Assumption:
In the question it is mentioned to count the average no of passengers by hour of the day<br> 
We are assuming - <br> 
1. The hour as 24 hours of the day.<br>
2. Also, a trip gets started from the pickup hour so we are considering the column pick_hour for our calculations for this question.<br>

In [89]:
spark.sql('''SELECT
  pick_hour,
  CEIL(ROUND(AVG(CAST(passenger_count AS int)),3)) AS avg_passenger_count -- CEIL (can remove later)
FROM
  table_spark_clean
GROUP BY 1
ORDER BY 1;
''').show(25,truncate=False)

23/09/07 21:31:59 WARN TaskSetManager: Stage 331 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:31:59 WARN TaskSetManager: Stage 332 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:32:00 WARN TaskSetManager: Stage 335 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:32:02 WARN TaskSetManager: Stage 337 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+---------+-------------------+
|pick_hour|avg_passenger_count|
+---------+-------------------+
|0        |1                  |
|1        |1                  |
|2        |1                  |
|3        |1                  |
|4        |1                  |
|5        |1                  |
|6        |2                  |
|7        |2                  |
|8        |2                  |
|9        |2                  |
|10       |2                  |
|11       |2                  |
|12       |2                  |
|13       |2                  |
|14       |2                  |
|15       |2                  |
|16       |2                  |
|17       |2                  |
|18       |2                  |
|19       |2                  |
|20       |2                  |
|21       |2                  |
|22       |2                  |
|23       |2                  |
+---------+-------------------+



                                                                                

### 9. Total number of payments made by different type for the month.

#### Assumption:
In the question it is mentioned to count the total number of payments made by different type for the month.<br> 
We are assuming - <br> 
1. Showing the comparision with respect to the payment_type and month. So, considering a payment_type is done at different month, it is captured seperately at monthly level.<br>
2. The month as 12 months of the year starting from January (Month-1).<br>
3. Also, payment is done on a trip completion which gets completed at the dropoff hour for the date so we are considering the column drop_date for our calculations for this question.<br>
4. As mentioned in the pre-processing section above we imputed null values in the passenger_count with 0 and the null values in the payment_type with 5 which indicates unknown payment type.<br>

In [91]:
spark.sql('''SELECT
  CASE WHEN payment_type = 1 THEN 'Credit card'
  CASE WHEN payment_type = 2 THEN 'Cash' 
  CASE WHEN payment_type = 3 THEN 'No charge'
  CASE WHEN payment_type = 4 THEN 'Dispute'
  CASE WHEN payment_type = 5 THEN 'Unknown'
  CASE WHEN payment_type = 6 THEN 'Voided trip'
  END AS payment_type,
  MONTH(drop_date) AS Month,
  COUNT(payment_type) AS payment_type
FROM
  table_spark_clean
GROUP BY 1,2
ORDER BY 1,2
''').show()

23/09/07 21:40:27 WARN TaskSetManager: Stage 353 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:40:28 WARN TaskSetManager: Stage 354 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:40:29 WARN TaskSetManager: Stage 357 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.
23/09/07 21:40:30 WARN TaskSetManager: Stage 360 contains a task of very large size (7723 KiB). The maximum recommended task size is 1000 KiB.

+------------+-----+------------+
|payment_type|Month|payment_type|
+------------+-----+------------+
|           0|    6|       49256|
|           0|    7|           9|
|           1|    6|      310678|
|           1|    7|          59|
|           2|    1|           3|
|           2|    6|      163367|
|           2|    7|          36|
|           3|    6|        3847|
|           4|    6|        1152|
|           5|    6|          12|
+------------+-----+------------+



                                                                                