In [1]:
# Import libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Start (or reuse) a Spark session in local mode (master "local[*]")
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/02/25 02:06:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Question 1:
Install Spark and PySpark

Install Spark
Run PySpark
Create a local spark session
Execute spark.version.
What's the output?

In [111]:
# Print the Spark build information by shelling out to spark-shell.
# NOTE(review): the question asks for `spark.version` on the session created above.
!spark-shell --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.2
Branch HEAD
Compiled by user liangchi on 2023-02-10T19:57:40Z
Revision 5103e00c4ce5fcc4264ca9c4df12295d42557af6
Url https://github.com/apache/spark
Type --help for more information.


## Question 2:
FHV October 2019

Read the October 2019 FHV data into a Spark DataFrame, using an explicit schema as we did in the lessons.

Repartition the DataFrame into 6 partitions and save it as Parquet.

What is the average size (in MB) of the Parquet files that were created (those ending with the .parquet extension)? Select the answer that most closely matches.

In [23]:
# Explicit schema for the FHV trip CSV; every column is declared nullable
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PUlocationID', types.IntegerType(), True)
    .add('DOlocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [24]:
# Load the gzipped FHV CSV into a DataFrame, treating the first row as a
# header and applying the explicit schema defined above
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('../data/raw/fhv/2019/10/fhv_tripdata_2019_10.csv.gz')
)

In [19]:
# Repartition the FHV data into 6 partitions and write it out as Parquet.
# mode('overwrite') keeps this cell re-runnable: with the default save mode
# the write raises an error as soon as the output directory already exists.
df.repartition(6).write.mode('overwrite').parquet('../data/pq/fhv/2019/10/')

                                                                                

In [107]:
# List the Parquet output directory with human-readable file sizes
!ls -lh ../data/pq/fhv/2019/10

total 39M
-rw-r--r-- 1 proled proled    0 Feb 25 02:11 _SUCCESS
-rw-r--r-- 1 proled proled 6.4M Feb 25 02:11 part-00000-b52fd8c6-133e-4a15-a222-28721d46fa62-c000.snappy.parquet
-rw-r--r-- 1 proled proled 6.4M Feb 25 02:11 part-00001-b52fd8c6-133e-4a15-a222-28721d46fa62-c000.snappy.parquet
-rw-r--r-- 1 proled proled 6.4M Feb 25 02:11 part-00002-b52fd8c6-133e-4a15-a222-28721d46fa62-c000.snappy.parquet
-rw-r--r-- 1 proled proled 6.4M Feb 25 02:11 part-00003-b52fd8c6-133e-4a15-a222-28721d46fa62-c000.snappy.parquet
-rw-r--r-- 1 proled proled 6.4M Feb 25 02:11 part-00004-b52fd8c6-133e-4a15-a222-28721d46fa62-c000.snappy.parquet
-rw-r--r-- 1 proled proled 6.4M Feb 25 02:11 part-00005-b52fd8c6-133e-4a15-a222-28721d46fa62-c000.snappy.parquet


## Question 3:
Count records

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

In [29]:
# Normalise the FHV column names (pickup_datetime and dispatching_base_num
# are left unchanged)
df_rename = (
    df
    .withColumnRenamed('dropOff_datetime', 'dropoff_datetime')
    .withColumnRenamed('PUlocationID', 'pickup_id')
    .withColumnRenamed('DOlocationID', 'dropoff_id')
    .withColumnRenamed('SR_Flag', 'sr_flag')
    .withColumnRenamed('Affiliated_base_number', 'affil_base_num')
)

In [32]:
# Register the renamed FHV DataFrame as the temp view `fhv_data` for use in spark.sql queries
df_rename.createOrReplaceTempView('fhv_data')

In [109]:
# Question 3: count the rows of `fhv_data` whose pickup timestamp, cast to a
# date, equals 2019-10-15 (i.e. trips that started on that day).
df_result = spark.sql("""

SELECT
    COUNT(*)
FROM
    fhv_data
WHERE
    CAST(pickup_datetime AS DATE) = '2019-10-15'
;

""")

In [110]:
# Display the single-row count result
df_result.show(1)

[Stage 65:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

## Question 4:
Longest trip for each day

What is the length of the longest trip in the dataset in hours?

In [79]:
# Question 4: for every trip, compute the pickup/dropoff Unix timestamps and
# the trip length in seconds and in hours, then keep the 10 rows with the
# largest dropoff - pickup difference.
# NOTE: this rebinds `df_result`, replacing the Question 3 result.
# NOTE(review): the ORDER BY uses the raw timestamp difference rather than the
# computed trip_length_seconds column; confirm both orderings agree.
df_result = spark.sql("""

SELECT
    UNIX_TIMESTAMP(pickup_datetime) AS pickup_unix,
    UNIX_TIMESTAMP(dropoff_datetime) AS dropoff_unix,
    UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime) AS trip_length_seconds,
    (UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / (60*60) AS trip_length_hours
FROM
    fhv_data
ORDER BY
    (dropoff_datetime - pickup_datetime) DESC
LIMIT
    10
;

""")

In [81]:
# Show only the first row's length in hours; rows are sorted longest-first by the query
df_result.select('trip_length_hours').show(1)

[Stage 46:>                                                         (0 + 1) / 1]

+-----------------+
|trip_length_hours|
+-----------------+
|         631152.5|
+-----------------+
only showing top 1 row



                                                                                

## Question 6:
Least frequent pickup location zone

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

In [89]:
# Explicit schema for the taxi zone lookup CSV; every column is declared nullable
schema_zones = (
    types.StructType()
    .add('LocationID', types.IntegerType(), True)
    .add('Borough', types.StringType(), True)
    .add('Zone', types.StringType(), True)
    .add('service_zone', types.StringType(), True)
)

In [90]:
# Load the zone lookup CSV into a DataFrame, treating the first row as a
# header and applying the explicit zone schema
df_zones = (
    spark.read
    .option("header", "true")
    .schema(schema_zones)
    .csv('../data/raw/zones/taxi_zone_lookup.csv')
)

In [96]:
# Rename the zone lookup columns to snake_case (service_zone is left unchanged)
df_zones_renamed = (
    df_zones
    .withColumnRenamed('LocationID', 'location_id')
    .withColumnRenamed('Borough', 'borough')
    .withColumnRenamed('Zone', 'zone')
)

In [98]:
# Register the renamed zone lookup DataFrame as the temp view `zones` for use in spark.sql queries
df_zones_renamed.createOrReplaceTempView('zones')

In [101]:
# Question 6: join FHV trips to the zone lookup on pickup location id, count
# trips per zone name, and keep the zone with the smallest count.
# NOTE(review): the INNER JOIN drops zones that have no matching pickups, and
# the ORDER BY has no tie-breaker, so LIMIT 1 picks one of any tied zones.
df_result_3 = spark.sql("""

SELECT
    zones.zone,
    COUNT(*)
FROM
    fhv_data
INNER JOIN
    zones
ON
    fhv_data.pickup_id = zones.location_id
GROUP BY
    zones.zone
ORDER BY
    COUNT(*)
LIMIT
    1
;

""")

In [108]:
# Display the least frequent pickup zone and its trip count
df_result_3.show(1)

[Stage 62:>                                                         (0 + 1) / 1]

+-----------+--------+
|       zone|count(1)|
+-----------+--------+
|Jamaica Bay|       1|
+-----------+--------+
only showing top 1 row



                                                                                