# Taxi Exercise

## Prerequisites

Install Java and Spark in the VM

In [20]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark 3.5.0
!wget -q https://apache.osuosl.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

In [21]:
# unzip it
!tar xf spark-3.5.0-bin-hadoop3.tgz

In [22]:
!pip install -q findspark

In [23]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

Start Spark Session 

---

In [24]:
import findspark
findspark.init("spark-3.5.0-bin-hadoop3")# SPARK_HOME

from pyspark.sql import SparkSession

# create the session
spark = SparkSession \
        .builder \
        .appName("Joins") \
        .master("local[*]") \
        .config("spark.ui.port", "4500") \
        .getOrCreate()

spark.version

'3.3.1'

In [25]:
spark

In [26]:
# Import sql functions
from pyspark.sql.functions import *

In [27]:
!mkdir -p /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/taxi_data.csv -P /dataset
!wget -q https://raw.githubusercontent.com/paponsro/spark_edem_2324/master/dataset/taxi_zones.csv -P /dataset
!ls /dataset

Load the datasets

In [28]:
taxiDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/dataset/taxi_data.csv")

taxiDF.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [29]:
taxiDF.show(2)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2018-01-24 23:02:56|  2018-01-24 23:10:58|              1|         2.02|         1|                 N|          48|         107|           2|        8.5|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.8|
|       2| 2018-01-24 23:57:13|  2018-01-25 00:2

In [30]:
taxiZonesDF = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/dataset/taxi_zones.csv")

taxiZonesDF.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [31]:
taxiZonesDF.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



## Exercise

In this exercise we will work with two DFs. The first one, taxiDF, holds info about taxi rides in 2018. The second one, taxiZonesDF, has info about the zones. Please load the DFs and print their schemas and two (or more) rows for more detail.

The aim of the exercise is to answer the questions listed below.

**Questions:**

 1. Which zones have the most pickups/dropoffs overall? Note that there can be many PULocationIDs per zone.
 2. What are the peak hours for taxi trips?
 3. How are the trips distributed by length? Show stats like mean, max, min, etc. 
    Then get the total trips shorter/longer than 30 km. Why are people taking the cab: for long or short trips?
    You can also try the same with different distances. What threshold would give roughly the same number of long and short trips?
 4. What are the peak hours for long/short trips?
 5. What are the top 3 pickup/dropoff zones for long/short trips?
 6. How are people paying for the ride on long/short trips? Hint: the RatecodeID column holds the information about how good the payment is.
 7. How does the payment type (RatecodeID) evolve over time (in days)? Hint: use the column with the pickup time info.
    Then get the same info, but with the average rate code and total trips per day.

### Question 1

In [32]:
"""
Question 1: Which zones have the most pickups/dropoffs overall? Note that there can be many PULocationIDs per zone.
"""

"""
We can read the question in two ways: get the most popular zones for pickups (PU) and dropoffs (DO) separately,
or get the most popular zones for PU and DO together. In the first case we need two separate .groupBy() + .agg(count()) calls;
in the second case, just one. In this exercise we only use the first approach.
"""

# First approach
# First we group by PULocationID and count the trips (totalTrips). Then we join with taxiZonesDF to get the zone info.
# We join on PULocationID == LocationID because these columns hold the same information in the two DFs.

pickupsByTaxiZonePUDF = taxiDF.groupBy("PULocationID") \
    .agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("PULocationID") == col("LocationID")) \
    .drop("LocationID", "service_zone") \
    .orderBy(col("totalTrips").desc())

pickupsByTaxiZonePUDF.show(3)

ratesByTaxiZonePUDF = taxiDF.groupBy("PULocationID") \
    .agg(count("RatecodeID").alias("ratecodeTotal"))

ratesByTaxiZonePUDF.show(3) 

# 1b - Now we can group by borough, because one borough has several zones and the borough info is more relevant (we think)
pickupsByBoroughPUDF = pickupsByTaxiZonePUDF.groupBy(col("Borough")) \
    .agg(sum(col("totalTrips")).alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

pickupsByBoroughPUDF.show(2)

# Now we have to do the same for DOLocationIDs:
pickupsByTaxiZoneDODF = taxiDF.groupBy("DOLocationID") \
    .agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("DOLocationID") == col("LocationID")) \
    .drop("LocationID", "service_zone") \
    .orderBy(col("totalTrips").desc())

pickupsByTaxiZoneDODF.show(3)

# 1b - Now we can group by borough.
pickupsByBoroughDODF = pickupsByTaxiZoneDODF.groupBy(col("Borough")) \
    .agg(sum(col("totalTrips")).alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

pickupsByBoroughDODF.show(2)

# Tips: drop and orderBy are only there to make the output more readable. The core logic is groupBy -> agg -> join.
# We use count("*") because we want to count every trip (row), even if some of its columns are null;
# count("someColumn") would skip the rows where that column is null. Depending on what we want, the logic may change at this point.

+------------+----------+---------+--------------------+
|PULocationID|totalTrips|  Borough|                Zone|
+------------+----------+---------+--------------------+
|         237|     15945|Manhattan|Upper East Side S...|
|         161|     15255|Manhattan|      Midtown Center|
|         236|     13767|Manhattan|Upper East Side N...|
+------------+----------+---------+--------------------+
only showing top 3 rows

+------------+-------------+
|PULocationID|ratecodeTotal|
+------------+-------------+
|         148|         2920|
|         243|           42|
|         137|         4103|
+------------+-------------+
only showing top 3 rows

+---------+----------+
|  Borough|totalTrips|
+---------+----------+
|Manhattan|    304266|
|   Queens|     17712|
+---------+----------+
only showing top 2 rows

+------------+----------+---------+--------------------+
|DOLocationID|totalTrips|  Borough|                Zone|
+------------+----------+---------+--------------------+
|         161|

### Question 2

In [33]:
"""
Question 2: What are the peak hours for taxi?
"""

"""
This is similar to the previous exercise. The only difference will be we have to group by hour this time. As we don't have the hour info directly in our DF, we have
to process the tpep_pickup_datetime column first. The rest of the logic will be the same.
"""

pickupsByHourDF = taxiDF \
    .withColumn("hour_of_day", hour(col("tpep_pickup_datetime"))) \
    .groupBy("hour_of_day") \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

pickupsByHourDF.show(3)

+-----------+----------+
|hour_of_day|totalTrips|
+-----------+----------+
|         16|     22121|
|         17|     21598|
|         19|     20884|
+-----------+----------+
only showing top 3 rows



### Question 3

In [34]:
"""
Question 3: How are the trips distributed by length? Show stats like mean, max, min, etc.
Then get the total trips shorter/longer than 30 km. Why are people taking the cab: for long or short trips?
You can also try the same with different distances. What threshold would give roughly the same number of long and short trips?
"""

"""
In this case we want to obtain data about one of the attributes of the df, the distance (trip_distance). 
So we will make a select of that particular column and then apply the corresponding aggregations (count, average, etc).
"""

# Select stats for taxiDF
tripDistanceDF = taxiDF.select(col("trip_distance").alias("distance"))
tripDistanceStatsDF = tripDistanceDF.select(
    count("*").alias("count"),
    lit(30).alias("threshold"),
    mean("distance").alias("mean"),
    stddev("distance").alias("stddev"),
    min("distance").alias("min"),
    max("distance").alias("max"))
tripDistanceStatsDF.show(3)

"""
In the second part we want to group by long vs. short trips. But we don't have this data in the DF, so we will first add a flag, 
to determine if it is long/short. And then we can group by that flag.
"""
# We will add an isLong column (flag) that is true for long rides and false for short ones
tripsWithLengthDF = taxiDF.withColumn("isLong", col("trip_distance") >= 30)
tripsWithLengthDF.show(2)

# To know how many long/short trips there are, we groupBy isLong and count()
tripsByLengthDF = tripsWithLengthDF.groupBy("isLong").count()
tripsByLengthDF.show()

+------+---------+------------------+------------------+---+----+
| count|threshold|              mean|            stddev|min| max|
+------+---------+------------------+------------------+---+----+
|331893|       30|2.7179894423805155|3.4851522248851214|0.0|66.0|
+------+---------+------------------+------------------+---+----+

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|isLong|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+-

### Question 4

In [35]:
"""
Question 4: What are the peak hours for long/short trips?
"""

"""
The logic here is a mix of the two previous exercises. On the one hand we want to group by long/short trips (the previous flag), 
and on the other hand by hour. So we take the DF with the flag created in exercise 3, derive the hour column as in exercise 2, 
and then group by (hour, long/short). Note that in this case the groupBy uses both columns.
"""

# We will use the DF created before with the isLong flag. 
# Then we group by both the isLong and hour columns and count.

pickupsByHourByLengthDF = tripsWithLengthDF \
    .withColumn("hour_of_day", hour(col("tpep_pickup_datetime"))) \
    .groupBy("hour_of_day", "isLong") \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

pickupsByHourByLengthDF.filter(col("isLong") == True).show()
pickupsByHourByLengthDF.filter(col("isLong") == False).show()

+-----------+------+----------+
|hour_of_day|isLong|totalTrips|
+-----------+------+----------+
|         19|  true|        10|
|         17|  true|         9|
|          7|  true|         6|
|          6|  true|         5|
|         12|  true|         5|
|         14|  true|         5|
|         11|  true|         5|
|         21|  true|         5|
|          5|  true|         4|
|         18|  true|         4|
|         10|  true|         3|
|         23|  true|         3|
|         13|  true|         3|
|         20|  true|         3|
|          9|  true|         3|
|         15|  true|         2|
|          8|  true|         2|
|         16|  true|         2|
|          0|  true|         2|
|          1|  true|         1|
+-----------+------+----------+
only showing top 20 rows

+-----------+------+----------+
|hour_of_day|isLong|totalTrips|
+-----------+------+----------+
|         16| false|     22119|
|         17| false|     21589|
|         19| false|     20874|
|         18| 

### Question 5

In [36]:
"""
Question 5: What are the top 3 pickup/dropoff zones for long/short trips?
"""

"""
Again, we are going to use the DF we created with the long/short flag. But in this case we also want to group by zones. 
As in exercise 1, we can treat the PU/DO zones separately or together.
"""

"""
Let's see the first approach (separately).
"""
# We will use the DF with the isLong info again.
# If we read the question as PU and DO popularity separately, we will have two DFs, one for each.

PUPopularDF = tripsWithLengthDF.groupBy("PULocationID").agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("PULocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Pickup_Zone") \
    .drop("LocationID", "Borough", "service_zone", "PULocationID") \
    .orderBy(col("totalTrips").desc())

PUPopularDF.show(10)

DOPopularDF = tripsWithLengthDF.groupBy("DOLocationID").agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("DOLocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Dropoff_Zone") \
    .drop("LocationID", "Borough", "service_zone") \
    .drop("DOLocationID") \
    .orderBy(col("totalTrips").desc())

DOPopularDF.show(10)

# If we only want the first three rows, we can use .show(3), or .limit(3) to keep them as a new DataFrame

"""
For the second approach, we have to do the groupBy for the two columns and two joins, one for PU and one for DO.
"""

PUandDOPopularDF = tripsWithLengthDF.groupBy("PULocationID", "DOLocationID").agg(count("*").alias("totalTrips")) \
    .join(taxiZonesDF, col("PULocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Pickup_Zone") \
    .drop("LocationID", "Borough", "service_zone") \
    .join(taxiZonesDF, col("DOLocationID") == col("LocationID")) \
    .withColumnRenamed("Zone", "Dropoff_Zone") \
    .drop("LocationID", "Borough", "service_zone") \
    .drop("PULocationID", "DOLocationID") \
    .orderBy(col("totalTrips").desc())

PUandDOPopularDF.show(10)

+----------+--------------------+
|totalTrips|         Pickup_Zone|
+----------+--------------------+
|     15945|Upper East Side S...|
|     15255|      Midtown Center|
|     13767|Upper East Side N...|
|     13715|        Midtown East|
|     11702|         Murray Hill|
|     11488|            Union Sq|
|     11455|Times Sq/Theatre ...|
|     10319|Penn Station/Madi...|
|     10091|        Clinton East|
|      9845|       Midtown North|
+----------+--------------------+
only showing top 10 rows

+----------+--------------------+
|totalTrips|        Dropoff_Zone|
+----------+--------------------+
|     15099|      Midtown Center|
|     14261|Upper East Side N...|
|     13754|Upper East Side S...|
|     11239|         Murray Hill|
|     11090|        Midtown East|
|     10054|Times Sq/Theatre ...|
|      9929|            Union Sq|
|      8666| Lincoln Square East|
|      8594|       Midtown North|
|      8258|        Clinton East|
+----------+--------------------+
only showing top 10 ro

### Question 6

In [37]:
"""
Question 6: How are people paying for the ride on long/short trips? Hint: the RatecodeID column holds the information about how good the payment is.
"""

"""
We can first inspect the RatecodeID column to see what information it provides. Then we simply do a .groupBy() + .agg(count()) as usual.
"""

# The RatecodeID column has the info about how good the payment is.
taxiDF.select("RatecodeID").distinct().show()

ratecodeDistributionDF = taxiDF \
    .groupBy(col("RatecodeID")).agg(count("*").alias("totalTrips")) \
    .orderBy(col("totalTrips").desc())

ratecodeDistributionDF.show()

+----------+
|RatecodeID|
+----------+
|         1|
|         6|
|         3|
|         5|
|         4|
|         2|
|        99|
+----------+

+----------+----------+
|RatecodeID|totalTrips|
+----------+----------+
|         1|    324387|
|         2|      5878|
|         5|       895|
|         3|       530|
|         4|       193|
|        99|         7|
|         6|         3|
+----------+----------+



### Question 7

In [38]:
"""
Question 7: How does the payment type (RatecodeID) evolve over time (in days)? Hint: use the column with the pickup time info.
Then get the same info, but with the average rate code and total trips per day.
"""

"""
For the day we will use tpep_pickup_datetime column. We have to remove the time info from it in order to do the groupBy, 
so we will do a to_date.
"""

# We have to group by pickup time and ratecode this time.
ratecodeEvolution = taxiDF \
    .withColumn("pickup_day", to_date(col("tpep_pickup_datetime"))) \
    .groupBy(col("pickup_day"), col("RatecodeID")) \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("pickup_day"))

ratecodeEvolution.show()

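# Now we can get the avg ratecode per day. Note that ratecodeEvolution has one row per (day, RatecodeID),
# so avg("RatecodeID") here averages the distinct codes seen that day; it is not weighted by the number of trips.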
ratecodeEvolutionAvg = ratecodeEvolution.groupBy("pickup_day").agg(sum("totalTrips").alias("totalTrips"), avg("RatecodeID").alias("avgRate"))

ratecodeEvolutionAvg.show()


"""
We can do the same per hour (just for reference, it is not part of the question). First we extract the hour as in exercise 2, and then we compute the average.
"""
# And the same for hours
ratecodeEvolutionPerHour = taxiDF \
    .withColumn("hour_of_day", hour(col("tpep_pickup_datetime"))) \
    .groupBy(col("hour_of_day").alias("pickup_hour"), col("RatecodeID")) \
    .agg(count("*").alias("totalTrips")) \
    .orderBy(col("pickup_hour"))

ratecodeEvolutionPerHour.show()

# Now we agg in the same way to obtain avg rate and total trips.
ratecodeEvolutionPerHourAvg = ratecodeEvolutionPerHour.groupBy("pickup_hour") \
    .agg(sum("totalTrips").alias("totalTrips"), avg("RatecodeID").alias("avgRate")).orderBy("pickup_hour")

ratecodeEvolutionPerHourAvg.show()

+----------+----------+----------+
|pickup_day|RatecodeID|totalTrips|
+----------+----------+----------+
|2018-01-24|         1|     10760|
|2018-01-24|         2|       174|
|2018-01-24|         5|        80|
|2018-01-24|         3|         4|
|2018-01-24|         4|         9|
|2018-01-24|         6|         1|
|2018-01-25|        99|         7|
|2018-01-25|         3|       526|
|2018-01-25|         5|       815|
|2018-01-25|         2|      5704|
|2018-01-25|         1|    313627|
|2018-01-25|         4|       184|
|2018-01-25|         6|         2|
+----------+----------+----------+

+----------+----------+------------------+
|pickup_day|totalTrips|           avgRate|
+----------+----------+------------------+
|2018-01-25|    320865|17.142857142857142|
|2018-01-24|     11028|               3.5|
+----------+----------+------------------+

+-----------+----------+----------+
|pickup_hour|RatecodeID|totalTrips|
+-----------+----------+----------+
|          0|         1|      2514|
|
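
The `avgRate` values in the per-day output (3.5 and 17.14...) come from averaging one row per distinct rate code, so rare codes such as 99 weigh as much as code 1. The plain-Python arithmetic below reproduces the 3.5 for 2018-01-24 from the counts printed above and compares it with a trip-weighted average. The variable names are illustrative only.

```python
# (RatecodeID, totalTrips) rows for 2018-01-24, copied from the ratecodeEvolution output
rows_2018_01_24 = [(1, 10760), (2, 174), (5, 80), (3, 4), (4, 9), (6, 1)]

# What avg("RatecodeID") yields on the pre-aggregated rows: every distinct code counts once
unweighted = sum(code for code, _ in rows_2018_01_24) / len(rows_2018_01_24)

# Trip-weighted alternative: every trip counts once
total_trips = sum(trips for _, trips in rows_2018_01_24)
weighted = sum(code * trips for code, trips in rows_2018_01_24) / total_trips

print(unweighted)          # 3.5, matches the avgRate shown for 2018-01-24
print(total_trips)         # 11028, matches totalTrips for that day
print(round(weighted, 4))  # 1.0484
```

In Spark, the weighted figure is what grouping `taxiDF` directly by `pickup_day` and calling `avg("RatecodeID")` would produce, because each trip is then its own row.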