In [11]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import functions as F
import warnings
from os.path import exists

import os.path

from pyspark.sql.functions import udf, max, desc
# from pyspark.sql.types import DoubleType



In [2]:
# Silence Python-level warnings so the notebook output stays readable.
warnings.filterwarnings('ignore')

# Create (or reuse) the session for this homework, asking for four executor instances.
spark = (
    SparkSession.builder
    .appName("Week5")
    .config('spark.executor.instances', 4)
    .getOrCreate()
)

# Optional tuning knobs, currently disabled:
# spark.conf.set("spark.executor.memory", "10g")
# spark.conf.set("spark.executor.cores", "4")

# Limit Spark's own logging to WARN and above.
spark.sparkContext.setLogLevel("WARN")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/26 14:55:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Question 1:
Install Spark and PySpark

Install Spark
Run PySpark
Create a local spark session
Execute spark.version.
What's the output?

In [3]:
# Answer to Question 1: display the version string reported by the active Spark session.
spark.version

'3.3.2'

_____

## Question 2
HVFHV June 2021

Read it with Spark using the same schema as we did in the lessons.
We will use this dataset for all the remaining questions.
Repartition it to 12 partitions and save it to parquet.
What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [None]:
# Fetch the June 2021 FHVHV trip data and keep a local CSV copy.
# Source: https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

month = 6
year = 2021
dataset_file = f'fhvhv_tripdata_{year}-{month:02}'

# Skip the download when the CSV is already on disk.
file_exists = os.path.exists(f'{dataset_file}.csv')

if file_exists:
    print('file already exists')
else:
    dataset_url = f'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/{dataset_file}.csv.gz'
    # pandas reads the gzipped CSV straight from the URL. to_csv keeps its
    # default of writing the DataFrame index as an unnamed leading column,
    # which is the extra `_c0` field the Spark schema accounts for later.
    df_pandas = pd.read_csv(dataset_url, compression='gzip')
    df_pandas.to_csv(f'{dataset_file}.csv')

In [None]:
# Read the local CSV with an explicit schema instead of letting Spark infer one.
# Every field is nullable. `_c0` is the unnamed leading column of the CSV and is
# dropped right after the read.
q2_fields = [
    ('_c0', StringType()),
    ('dispatching_base_num', StringType()),
    ('pickup_datetime', TimestampType()),
    ('dropOff_datetime', TimestampType()),
    ('PUlocationID', FloatType()),
    ('DOlocationID', FloatType()),
    ('SR_Flag', StringType()),
    ('Affiliated_base_number', StringType()),
]
q2_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in q2_fields]
)

# The location IDs are parsed as floats and then cast to integers
# (presumably the CSV stores them with a decimal part -- TODO confirm).
df_q2 = (
    spark.read
    .csv('fhvhv_tripdata_2021-06.csv', header=True, schema=q2_schema)
    .drop("_c0")
    .withColumn("PUlocationID", F.col("PUlocationID").cast("int"))
    .withColumn("DOlocationID", F.col("DOlocationID").cast("int"))
)

In [None]:
# Print the column names and types of df_q2 to verify the schema and the integer casts.
df_q2.printSchema()

In [None]:
# Repartition the data into 12 partitions; the Parquet write itself happens in a later cell.
df_q2 = df_q2.repartition(12)

In [None]:
# Row count of the repartitioned data. count() is an action, so this is
# where Spark actually reads the CSV and performs the repartition.
df_q2.count()

In [None]:
# Save the partitions as Parquet files, replacing any previous output in data/fhvhv/.
df_q2.write.mode('overwrite').parquet('data/fhvhv/')

In [None]:
# What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)?
# Answer: about 23.5 MB

____

## Question 3:
Count records

How many taxi trips were there on June 15?

Consider only trips that started on June 15.

In [None]:
# Total number of trips in the month, for comparison with the single-day count computed next.
df_q2.count()

In [None]:
# Derive calendar dates from both timestamps, then keep only the trips
# whose pickup falls on 15 June 2021.
df_q3 = (
    df_q2
    .withColumn("pickup_date", F.to_date(F.col("pickup_datetime")))
    .withColumn("dropOff_date", F.to_date(F.col("dropOff_datetime")))
    .where("pickup_date = '2021-06-15'")
)

# Number of trips that started on that day.
df_q3.count()

________

## Question 4:
Longest trip for each day

Now calculate the duration for each trip.
How long was the longest trip in Hours?

In [None]:
# Load the trips back from the Parquet output written for Question 2.
df_q4 = spark.read.parquet('data/fhvhv/')

### spark method

In [None]:
# Trip duration from epoch seconds: casting a timestamp to long yields whole seconds.
pickup_seconds = F.col('pickup_datetime').cast("long")
dropoff_seconds = F.col("dropOff_datetime").cast("long")

# Add the duration in seconds, minutes and hours, then keep only the
# timestamp and duration columns.
df_q4 = (
    df_q4
    .withColumn('DiffInSeconds', dropoff_seconds - pickup_seconds)
    .withColumn('DiffInMinutes', F.col('DiffInSeconds') / 60)
    .withColumn('DiffInHours', F.col('DiffInSeconds') / 3600)
    .select('pickup_datetime', 'dropOff_datetime', 'DiffInHours', 'DiffInSeconds', 'DiffInMinutes')
)

In [None]:
# Collapse the data to a single row holding the longest duration in hours,
# then pull that value out of the row.
longest_trip_row = df_q4.selectExpr('max(DiffInHours) as max_value').first()
res = longest_trip_row['max_value']

In [None]:
# Report the longest trip duration (in hours) computed in the previous cell.
print("Maximum hours_between: ", res)

### sparksql method

In [None]:
# Expose the trip data to Spark SQL under the view name `fhvhv_data`.
df_q4.createOrReplaceTempView('fhvhv_data')

# Longest trip in fractional hours.
# The duration is derived from epoch seconds (timestamp cast to LONG), the
# same way the DataFrame method computes it. DATEDIFF(hour, start, end) only
# counts whole hours, so it would truncate the answer, and the three-argument
# form is not available on Spark versions before 3.3.
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime,
    (CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600 AS duration_hours
FROM
    fhvhv_data
ORDER BY
    duration_hours DESC
""").show(1)



______

## Question 5:
User Interface

On which local port does Spark's User Interface, which shows the application's dashboard, run?

In [None]:
# The Spark UI (application dashboard) listens on port 4040 by default:
#   http://localhost:4040/jobs/
# A bare URL is not valid Python and would stop "Run All" with a SyntaxError,
# so ask the running context for the address it actually bound to.
spark.sparkContext.uiWebUrl

__________

## Question 6:
Most frequent pickup location zone

Load the zone lookup data into a temp view in Spark
Zone Data

Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

In [8]:
# Load the taxi zone lookup table; the first CSV line supplies the column names.
df_zones = spark.read.csv('zone_lookup.csv', header=True)

# List the available columns to find the join key.
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [7]:
# Reload the trip data from the Parquet output of Question 2.
df_q6 = spark.read.format('parquet').load('data/fhvhv/')

# List the trip columns to find the pickup-location key.
df_q6.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [10]:
# Attach zone details to every trip by matching the pickup location to the zone ID.
# Inner join: trips without a matching zone row are left out.
pickup_matches_zone = df_q6.PUlocationID == df_zones.LocationID
df_join = df_q6.join(df_zones, on=pickup_matches_zone, how='inner')

# Columns of the joined result (trip columns followed by zone columns).
df_join.columns


['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number',
 'LocationID',
 'Borough',
 'Zone',
 'service_zone']

### Spark method

In [12]:

# Count the trips per pickup zone, then take the zone name from the row with the highest count.
zone_counts = df_join.groupBy('Zone').count()
most_frequent_zone = zone_counts.sort(F.col('count').desc()).first()['Zone']


                                                                                

In [13]:
# Display the name of the most frequent pickup zone found by the DataFrame method.
most_frequent_zone

'Crown Heights North'

### SparkSQL method

In [14]:
# Register both DataFrames as temporary views so they can be queried with Spark SQL.
# createOrReplaceTempView replaces the deprecated registerTempTable; it is the
# call this notebook already uses for `fhvhv_data`, and it makes the cell safe to re-run.
df_zones.createOrReplaceTempView('zones')
df_q6.createOrReplaceTempView('fhvhv')


In [16]:
# Same question answered in SQL: join the trips to the zones on the pickup
# location, count the records per zone, and keep only the zone with the most records.
top_zone_query = """
SELECT 
    z.zone AS zone,
    COUNT(1) AS number_records
FROM fhvhv as tr
INNER JOIN zones as z on z.LocationID = tr.PUlocationID
GROUP BY 1
ORDER BY COUNT(1) DESC
LIMIT 1 
"""
spark.sql(top_zone_query).show()



+-------------------+--------------+
|               zone|number_records|
+-------------------+--------------+
|Crown Heights North|        231279|
+-------------------+--------------+



                                                                                