In [5]:
import pyspark
from pyspark.sql import SparkSession

In [6]:
pyspark.__file__

'/home/spark/spark-3.3.2-bin-hadoop3/python/pyspark/__init__.py'

In [7]:
pyspark.__version__

'3.3.2'

In [8]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

24/03/01 16:41:26 WARN Utils: Your hostname, ubuntu-s-4vcpu-8gb-blr1-01 resolves to a loopback address: 127.0.1.1; using 10.47.0.5 instead (on interface eth0)
24/03/01 16:41:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/01 16:41:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [45]:
# Question 1: Install Spark and PySpark - What's the output?

print(spark.version)

3.3.2


In [15]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-01 16:54:31--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 20.207.73.82
Connecting to github.com (github.com)|20.207.73.82|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240301%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240301T155432Z&X-Amz-Expires=300&X-Amz-Signature=cd584bd59e8e002ddcc0137fd4e1fbcdd43a79597ec542a98e7dd3507a2bb203&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-01 16:54:32--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [17]:
# Try inferring the Schema
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('fhv_tripdata_2019-10.csv.gz')

                                                                                

In [18]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [21]:
# !mkdir pq

In [35]:
output_path = "pq"

df\
    .repartition(6) \
    .write.parquet(output_path, mode='overwrite')

                                                                                

In [37]:
!tree pq

[01;34mpq[00m
├── _SUCCESS
├── part-00000-5c301ca2-a996-4a0b-8dad-475fe0f1cced-c000.snappy.parquet
├── part-00001-5c301ca2-a996-4a0b-8dad-475fe0f1cced-c000.snappy.parquet
├── part-00002-5c301ca2-a996-4a0b-8dad-475fe0f1cced-c000.snappy.parquet
├── part-00003-5c301ca2-a996-4a0b-8dad-475fe0f1cced-c000.snappy.parquet
├── part-00004-5c301ca2-a996-4a0b-8dad-475fe0f1cced-c000.snappy.parquet
└── part-00005-5c301ca2-a996-4a0b-8dad-475fe0f1cced-c000.snappy.parquet

0 directories, 7 files


In [43]:
# Question 2 - What is the average size of the Parquet?
!du -h pq

39M	pq


In [79]:
# Question 3 - How many taxi trips were there on the 15th of October?
df.registerTempTable('trips_data')

spark.sql("""
SELECT
    count(1) AS num_rows
FROM
    trips_data
WHERE
    pickup_datetime BETWEEN '2019-10-15 00:00:00' AND '2019-10-15 23:59:59'
""").show()

[Stage 35:>                                                         (0 + 1) / 1]

+--------+
|num_rows|
+--------+
|   62610|
+--------+



                                                                                

In [59]:
# Question 4 - Longest trip for each day. What is the length of the longest trip in the dataset in hours?

spark.sql("""
SELECT 
    CAST(pickup_datetime AS DATE) AS day,
    --TIMESTAMPDIFF(HOUR, dropOff_datetime, pickup_datetime) AS duration_hours
    (UNIX_TIMESTAMP(dropOff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600 AS duration_hours

FROM trips_data

GROUP BY 
day, duration_hours

ORDER BY duration_hours DESC

LIMIT 5


""").show()

[Stage 15:>                                                         (0 + 1) / 1]

+----------+-----------------+
|       day|   duration_hours|
+----------+-----------------+
|2019-10-11|         631152.5|
|2019-10-28|         631152.5|
|2019-10-31|87672.44083333333|
|2019-10-01|70128.02805555555|
|2019-10-17|           8794.0|
+----------+-----------------+



                                                                                

In [77]:
# Question 6 - Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

# Load the zone lookup data into a temp view in Spark

zones_df = spark.read.csv('taxi_zone_lookup.csv', header=True, inferSchema=True)

In [80]:
# Create TempView
zones_df.registerTempTable('zones_data')



In [82]:
zones_df.printSchema()
df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [89]:
spark.sql("""
    SELECT 
        COUNT(l.pickup_datetime) AS num_trips, r.Zone AS Zone_Name
    FROM trips_data AS l
    JOIN zones_data AS r 
    ON 
        l.PUlocationID = r.LocationID
    GROUP BY Zone_Name
    ORDER BY num_trips ASC
    LIMIT 2
""").show()

[Stage 54:>                                                         (0 + 1) / 1]

+---------+--------------------+
|num_trips|           Zone_Name|
+---------+--------------------+
|        1|         Jamaica Bay|
|        2|Governor's Island...|
+---------+--------------------+



                                                                                