**Install Spark and PySpark**

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.

What's the output?

- 3.3.2
- 2.1.4
- 1.2.3
- 5.4

The version installed and running on my machine is 3.3.3, so `spark.version` returns '3.3.3'. That exact value is not among the options; the closest one is 3.3.2.

```python
import pyspark
from pyspark.sql import SparkSession
```

```python
spark = SparkSession.builder \
    .master("local[*]") \
    .getOrCreate()
```

```
23/10/30 08:16:44 WARN Utils: Your hostname, dawson-vm resolves to a loopback address: 127.0.1.1; using 192.168.192.134 instead (on interface ens33)
23/10/30 08:16:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/30 08:16:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```

```python
spark
```

```python
spark.version
```

```
'3.3.3'
```

**HVFHV June 2021**

Read it with Spark using the same schema as we did in the lessons. \
We will use this dataset for all the remaining questions.

Repartition it to 12 partitions and save it to parquet.

What is the average size of the Parquet files (ending with the .parquet extension) that were created, in MB? Select the answer that most closely matches.

- 2MB
- 24MB
- 100MB
- 250MB

The answer is 24 MB.

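```python
# inferSchema=True makes Spark infer the column types from the data (an extra pass over the file)
df_fhvhv = spark.read \
    .option("header", "true") \
    .csv("data/raw/fhvhv_tripdata_2021-06.csv.gz", inferSchema=True)
```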

```python
# mode="ignore" silently skips the write if the output directory already exists
df_fhvhv.repartition(12) \
    .write.parquet("data/pq/fhvhv/2021/06/", mode="ignore")
```

```
! ls -lh './data/pq/fhvhv/2021/06'
```

```
total 284M
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00000-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00001-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00002-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00003-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00004-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00005-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00006-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00007-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c000.snappy.parquet
-rw-r--r-- 1 dawson dawson 24M Oct 30 08:37 part-00008-e9a57d8e-5ea5-46c2-9777-2043bc5f761a-c
```
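
`ls -lh` reports a directory total of roughly 284M spread over 12 part files, which works out to about 284 / 12 ≈ 23.7 MB per file, hence the 24 MB answer. The listing above is cut off after the ninth file, so a small script is a more reliable way to get the average. This is a minimal sketch using only the Python standard library; the function name `average_parquet_size_mb` is hypothetical, and it assumes the part files sit directly in the output directory written above.

```python
from pathlib import Path


def average_parquet_size_mb(directory: str) -> float:
    """Return the mean size of the *.parquet files directly inside `directory`.

    Sizes are in MiB (1024 * 1024 bytes), the same unit `ls -h` uses.
    The glob skips Spark's `_SUCCESS` marker and `.crc` checksum files,
    because neither ends with `.parquet`.
    """
    files = list(Path(directory).glob("*.parquet"))
    if not files:
        raise FileNotFoundError(f"no .parquet files in {directory}")
    total_bytes = sum(f.stat().st_size for f in files)
    return total_bytes / len(files) / (1024 * 1024)
```

For the output written above, call `average_parquet_size_mb("data/pq/fhvhv/2021/06")`.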

**Count records**

How many taxi trips were there on June 15?

Consider only trips that started on June 15.

- 308,164
- 12,856
- 452,470
- 50,982

The answer is 452,470.

**For this question and the next one, we use the `pyspark.sql.functions` module, imported as `F`, for its helper functions.**

```python
from pyspark.sql import functions as F
```

```python
df_fhvhv
```

```
DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string, Affiliated_base_number: string]
```

```python
df_fhvhv.filter(F.to_date(df_fhvhv.pickup_datetime) == "2021-06-15").count()
```

```
452470
```
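
The filter keeps a trip when the calendar date of `pickup_datetime` is 2021-06-15, regardless of when the trip ended, which is exactly what "trips that started on June 15" asks for. The same rule in plain Python, as a sketch on a few hypothetical timestamps (not rows from the dataset); `count_trips_started_on` is an illustrative name.

```python
from datetime import date, datetime

# Hypothetical (pickup, dropoff) pairs, for illustration only.
trips = [
    (datetime(2021, 6, 14, 23, 50), datetime(2021, 6, 15, 0, 20)),  # starts June 14: excluded
    (datetime(2021, 6, 15, 0, 0), datetime(2021, 6, 15, 0, 30)),
    (datetime(2021, 6, 15, 23, 59), datetime(2021, 6, 16, 0, 15)),  # ends June 16: still counted
    (datetime(2021, 6, 16, 8, 0), datetime(2021, 6, 16, 8, 45)),
]


def count_trips_started_on(trips, day: date) -> int:
    """Count trips whose pickup falls on `day`, mirroring F.to_date(pickup_datetime) == day."""
    return sum(1 for pickup, _dropoff in trips if pickup.date() == day)


print(count_trips_started_on(trips, date(2021, 6, 15)))  # 2
```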

**Longest trip for each day**

Now calculate the duration for each trip.

How long was the longest trip in Hours?

- 66.87 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

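The answer is 66.87 hours. The queries below print 66.88 because they round the duration (about 66.879 hours) to two decimals; 66.87 is the same duration truncated, so it is the closest option.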

The code below finds the answer using PySpark DataFrame methods.

```python
df_fhvhv.select("dropoff_datetime", "pickup_datetime") \
    .withColumn("duration", F.round((F.unix_timestamp(df_fhvhv.dropoff_datetime) - F.unix_timestamp(df_fhvhv.pickup_datetime)) / 3600.0, 2)) \
    .orderBy("duration", ascending=False) \
    .limit(1) \
    .show()
```

```
+-------------------+-------------------+--------+
|   dropoff_datetime|    pickup_datetime|duration|
+-------------------+-------------------+--------+
|2021-06-28 08:48:25|2021-06-25 13:55:41|   66.88|
+-------------------+-------------------+--------+
```
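
The top row can be checked by hand with the standard library. This sketch simply recomputes the duration from the two timestamps printed above and shows both the rounded value (what `F.round` produces) and the truncated value listed among the options.

```python
import math
from datetime import datetime

# Timestamps of the longest trip, copied from the output above.
pickup = datetime(2021, 6, 25, 13, 55, 41)
dropoff = datetime(2021, 6, 28, 8, 48, 25)

seconds = (dropoff - pickup).total_seconds()
hours = seconds / 3600

print(seconds)                        # 240764.0
print(round(hours, 2))                # 66.88, rounded like F.round / ROUND
print(math.floor(hours * 100) / 100)  # 66.87, truncated to two decimals
```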

The code below finds the answer using Spark SQL. \
First, the DataFrame has to be registered as a temporary view so that SQL queries can refer to it by name.

```python
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
df_fhvhv.createOrReplaceTempView("fhvhv")
```

```python
timediff_sql = spark.sql("""
SELECT  dropoff_datetime,
        pickup_datetime,
        ROUND((UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600.0, 2) AS duration
FROM fhvhv
ORDER BY 3 DESC
LIMIT 1;
""")
```

```python
timediff_sql.show()
```

```
+-------------------+-------------------+--------+
|   dropoff_datetime|    pickup_datetime|duration|
+-------------------+-------------------+--------+
|2021-06-28 08:48:25|2021-06-25 13:55:41|   66.88|
+-------------------+-------------------+--------+
```
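
`ORDER BY 3 DESC` sorts by the third column of the SELECT list (`duration`), and `LIMIT 1` keeps only the top row. The same query shape can be tried without Spark using Python's built-in `sqlite3` module. This swaps in a different engine, not Spark SQL: SQLite has no `UNIX_TIMESTAMP`, so the sketch uses `strftime('%s', ...)` to get epoch seconds, and the table holds the longest trip from the output above plus two hypothetical rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fhvhv (pickup_datetime TEXT, dropoff_datetime TEXT)")
conn.executemany(
    "INSERT INTO fhvhv VALUES (?, ?)",
    [
        ("2021-06-25 13:55:41", "2021-06-28 08:48:25"),  # longest trip from the output above
        ("2021-06-01 10:00:00", "2021-06-01 10:45:00"),  # hypothetical
        ("2021-06-02 22:10:00", "2021-06-03 01:40:00"),  # hypothetical
    ],
)

# strftime('%s', ...) stands in for Spark's UNIX_TIMESTAMP; ORDER BY 3 refers to `duration`.
row = conn.execute(
    """
    SELECT dropoff_datetime,
           pickup_datetime,
           ROUND((strftime('%s', dropoff_datetime) - strftime('%s', pickup_datetime)) / 3600.0, 2) AS duration
    FROM fhvhv
    ORDER BY 3 DESC
    LIMIT 1
    """
).fetchone()
print(row)  # ('2021-06-28 08:48:25', '2021-06-25 13:55:41', 66.88)
```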

**User Interface**

Spark's user interface, which shows the application's dashboard, runs on which local port?

- 80
- 443
- 4040
- 8080


The answer is port 4040.

![interface](./pyspark-jobs-interface.png)
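
Spark serves this UI on port 4040 by default (the `spark.ui.port` setting). If 4040 is already taken, for example by a second notebook with its own session, Spark retries on the following ports (4041, 4042, ...), so the UI is not always on 4040; from PySpark, `spark.sparkContext.uiWebUrl` returns the URL actually in use. Without a session handle, a quick standard-library probe shows which of those ports accept connections. This is a sketch: the helper name `open_ports` is hypothetical, the 4040 to 4056 range assumes the default 16 retries of `spark.port.maxRetries`, and any other service listening there would also show up.

```python
import socket


def open_ports(host="localhost", ports=range(4040, 4057), timeout=0.2):
    """Return the ports in `ports` that accept a TCP connection on `host`.

    Assumption: Spark's UI starts at 4040 and moves to the next port when one
    is busy, so a listener in this range is likely, not guaranteed, to be a Spark UI.
    """
    found = []
    for port in ports:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success instead of raising an exception
            if sock.connect_ex((host, port)) == 0:
                found.append(port)
    return found


# Example: print(open_ports()) while a local Spark session is running.
```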

**Most frequent pickup location zone**

Load the zone lookup data into a temp view in Spark \
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)

Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

- East Chelsea
- Astoria
- Union Sq
- Crown Heights North

The answer is Crown Heights North.

```
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv -O taxi_zone_lookup.csv
```

```
--2023-10-30 09:59:24--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20231030%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20231030T095849Z&X-Amz-Expires=300&X-Amz-Signature=e7f4549e7f56130de8f7cc6f205724acbe87630bd744e5a4135faac5bc8beef9&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-10-30 09:59:24--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62
```

```python
zone_lookup = spark.read \
    .option("header", "true") \
    .csv("taxi_zone_lookup.csv", inferSchema=True)
```

```python
print("zone_lookup")
zone_lookup.printSchema()

print("df_fhvhv")
df_fhvhv.printSchema()
```

```
zone_lookup
root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

df_fhvhv
root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
```



This section uses Spark DataFrame methods to find the answer.

```python
zone_lookup_pu = zone_lookup.withColumnRenamed("Zone", "PUZone")
```

```python
df_fhvhv.join(zone_lookup_pu, on=df_fhvhv.PULocationID == zone_lookup_pu.LocationID, how="inner") \
    .groupBy("PUZone") \
    .agg(F.count("PUZone").alias("count_pickup")) \
    .orderBy("count_pickup", ascending=False) \
    .limit(1) \
    .show()
```

```
+-------------------+------------+
|             PUZone|count_pickup|
+-------------------+------------+
|Crown Heights North|      231279|
+-------------------+------------+
```
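
The join-then-count logic can be seen on a tiny scale in plain Python: a dictionary from `LocationID` to zone name plays the role of `zone_lookup_pu`, and `collections.Counter` plays the role of `groupBy(...).agg(F.count(...))`. This is a sketch on hypothetical data; the IDs below are made up and are not the real `LocationID` values from the lookup file, and `most_frequent_pickup_zone` is an illustrative name.

```python
from collections import Counter

# Hypothetical lookup: LocationID -> Zone (stand-in for zone_lookup_pu).
zone_by_id = {1: "Astoria", 2: "Crown Heights North", 3: "Union Sq"}

# Hypothetical PULocationID values, one per trip (stand-in for df_fhvhv).
pickup_ids = [2, 1, 2, 3, 2, 1, 99]  # 99 has no match in the lookup


def most_frequent_pickup_zone(pickup_ids, zone_by_id):
    """Inner-join pickups to zone names, count per zone, return the top (zone, count)."""
    # Inner join: IDs missing from the lookup are dropped, as with how="inner".
    zones = (zone_by_id[pid] for pid in pickup_ids if pid in zone_by_id)
    return Counter(zones).most_common(1)[0]


print(most_frequent_pickup_zone(pickup_ids, zone_by_id))  # ('Crown Heights North', 3)
```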

This section uses Spark SQL to find the answer.

```python
zone_lookup_pu.createOrReplaceTempView("zone_lookup_pu")
```

```python
spark.sql("""
    SELECT z.PUZone, COUNT(1) AS count_pickup
    FROM fhvhv f
    INNER JOIN zone_lookup_pu z ON z.LocationID = f.PULocationID
    GROUP BY z.PUZone
    ORDER BY 2 DESC
    LIMIT 1;
    """) \
    .show()
```

```
+-------------------+------------+
|             PUZone|count_pickup|
+-------------------+------------+
|Crown Heights North|      231279|
+-------------------+------------+
```
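
Both versions return 231,279, but they do not count in exactly the same way: `F.count("PUZone")`, like SQL `COUNT(PUZone)`, skips rows whose zone is NULL, while `COUNT(1)` counts every joined row. The top zone has a name, so the two agree here; they would diverge only for a NULL zone group. A sketch of the difference in SQLite (swapped in only because it ships with Python, not Spark SQL), with hypothetical tables and rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical stand-ins for the fhvhv and zone_lookup_pu views.
conn.execute("CREATE TABLE fhvhv (PULocationID INTEGER)")
conn.execute("CREATE TABLE zone_lookup_pu (LocationID INTEGER, PUZone TEXT)")
conn.executemany("INSERT INTO fhvhv VALUES (?)", [(1,), (2,), (2,), (3,), (3,), (3,)])
conn.executemany(
    "INSERT INTO zone_lookup_pu VALUES (?, ?)",
    [(1, "Astoria"), (2, "Union Sq"), (3, None)],  # location 3 has no zone name (NULL)
)

rows = conn.execute(
    """
    SELECT z.PUZone, COUNT(1) AS all_rows, COUNT(z.PUZone) AS non_null_zones
    FROM fhvhv f
    INNER JOIN zone_lookup_pu z ON z.LocationID = f.PULocationID
    GROUP BY z.PUZone
    ORDER BY 2 DESC
    """
).fetchall()
for row in rows:
    print(row)
# (None, 3, 0)
# ('Union Sq', 2, 2)
# ('Astoria', 1, 1)
```

With `COUNT(1)` the NULL group would top the list, which is why it can be worth filtering out NULL zones before picking the most frequent one.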