In [23]:
import pyspark
from pyspark.sql import types
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Question 2

In [9]:
# Start (or reuse) a local Spark session; "local[*]" runs Spark in-process
# with one worker thread per available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('homework')
    .getOrCreate()
)

In [10]:
import pandas as pd

# Preview the raw CSV. nrows=5 makes pandas parse only the rows that are
# displayed instead of loading the whole compressed file into memory first.
pd.read_csv('fhv_tripdata_2019-10.csv.gz', compression='gzip', nrows=5)

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [27]:
# Explicit schema for the FHV trip CSV so Spark does not have to infer the
# column types. Every column is declared nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PUlocationID', types.IntegerType(), True)
    .add('DOlocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [36]:
# Load the gzipped FHV CSV with the explicit schema, treating the first
# line of the file as the header row.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv.gz')
)

In [16]:
# Repartition into 6 partitions and write them out as Parquet.
# mode('overwrite') replaces the output of a previous run; without it the
# default save mode raises "path already exists" when the cell is re-executed.
(
    df
    .repartition(6)
    .write
    .mode('overwrite')
    .parquet('homework/fhv_tripdata_2019_10')
)

In [19]:
import os

def average_parquet_file_size(folder_path):
    """Return the mean size, in bytes, of the ``.parquet`` entries in a folder.

    Args:
        folder_path: Directory to scan (non-recursive). Only entries whose
            name ends with ``.parquet`` are taken into account.

    Returns:
        The average size in bytes as a float, or ``0`` when the folder
        contains no matching entries.
    """
    sizes = [
        os.path.getsize(os.path.join(folder_path, entry))
        for entry in os.listdir(folder_path)
        if entry.endswith('.parquet')
    ]

    # Guard against dividing by zero when nothing matched.
    if not sizes:
        return 0
    return sum(sizes) / len(sizes)

# Average size of the Parquet part files in the output folder, converted
# from bytes by dividing by 1024 * 1024.
folder_path = 'homework/fhv_tripdata_2019_10'
average_size = average_parquet_file_size(folder_path)
print(
    "Average size of .parquet files in the folder:",
    average_size / (1024 * 1024),
    "Mbytes",
)

Average size of .parquet files in the folder: 6.219527244567871 Mbytes


## Question 3

In [29]:
# Display the schema attached to df to confirm the column types.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [22]:
# Peek at the first pickup_datetime value (returned as a single Row).
df.select('pickup_datetime').head()

Row(pickup_datetime=datetime.datetime(2019, 10, 1, 0, 35))

In [30]:
# Count the trips whose pickup timestamp falls on 15 October 2019.
(
    df
    .where(F.to_date(F.col('pickup_datetime')) == F.lit('2019-10-15'))
    .count()
)

62610

## Question 4

In [32]:
# Longest trip: cast both timestamps to long (epoch seconds), subtract to get
# the duration in seconds, then take the maximum over all trips.
(
    df
    .select(
        (
            F.col('dropOff_datetime').cast('long')
            - F.col('pickup_datetime').cast('long')
        ).alias('duration')
    )
    .agg({"duration": "max"})
    .collect()[0]
)

Row(max(duration)=2272149000)

In [34]:
# Convert the maximum duration from seconds to hours. The literal is copied
# from the max(duration) value shown in the previous cell's output, so it
# must be updated by hand if that result changes.
2272149000 / 3600

631152.5

## Question 6

In [43]:
# Explicit schema for the taxi zone lookup CSV; every column is nullable.
zones_schema = (
    types.StructType()
    .add('LocationID', types.IntegerType(), True)
    .add('Borough', types.StringType(), True)
    .add('Zone', types.StringType(), True)
    .add('service_zone', types.StringType(), True)
)

In [44]:
# Load the zone lookup table with its explicit schema; the first line of the
# file is the header row.
df_zones = (
    spark.read
    .schema(zones_schema)
    .option("header", True)
    .csv('taxi_zone_lookup.csv')
)

In [48]:
# Register the trips DataFrame as a temp view so it can be queried with spark.sql.
df.createOrReplaceTempView('fhv_2019_10')

In [49]:
# Register the zone lookup DataFrame as a temp view so it can be queried with spark.sql.
df_zones.createOrReplaceTempView('zones')

In [54]:
# Five least frequent pickup zones.
# An INNER JOIN keeps only trips whose PUlocationID resolves to a row in the
# zone lookup, so trips with a missing or unknown pickup location are not
# counted as a NULL "zone" that could show up in the ranking. The secondary
# sort key makes the result deterministic when several zones share a count.
spark.sql("""
SELECT
    zones.Zone,
    COUNT(*) AS PickupCount
FROM
    fhv_2019_10 AS fhv
INNER JOIN
    zones ON fhv.PUlocationID = zones.LocationID
GROUP BY
    zones.Zone
ORDER BY
    PickupCount ASC,
    zones.Zone ASC
LIMIT 5
""").show()

+--------------------+-----------+
|                Zone|PickupCount|
+--------------------+-----------+
|         Jamaica Bay|          1|
|Governor's Island...|          2|
| Green-Wood Cemetery|          5|
|       Broad Channel|          8|
|     Highbridge Park|         14|
+--------------------+-----------+

