# Homework week 5: Spark

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [None]:

# Explicit schema for the FHVHV trip CSV: column name -> Spark type.
# Every field is declared nullable.
schema_fhvhv = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])
     

In [None]:
# Explicit schema for the taxi zone lookup CSV; every field is nullable.
# LocationID is read as an integer so it has the same type as the
# PULocationID / DOLocationID columns of the trip schema, which lets the
# zone join compare keys directly instead of relying on an implicit
# string-to-integer cast.
schema_zones = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

In [None]:
# Create the Spark session for this notebook, or reuse one that is already
# running, against a local master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## Question 1: Run the command "spark.version". What version number is output?


In [None]:
# Display the version string of the active Spark session.
spark.version

In [None]:
# Load the gzipped June 2021 FHVHV trip CSV with the explicit schema;
# the first line of the file is treated as the header row.
df_fhvhv = (
    spark.read
    .schema(schema_fhvhv)
    .option('header', 'true')
    .csv('/home/fedrpi/de-zoomcamp-2023/data/fhvhv/fhvhv_tripdata_2021-06.csv.gz')
)

In [None]:
# Print a preview of the first rows of the loaded trip data.
df_fhvhv.show()

## Question 2: Repartition the June 2021 HVFHV Data into 12 partitions and save it to Parquet. What is the average size of the Parquet Files?

In [None]:
# Redistribute the trip data into 12 partitions ahead of the Parquet export.
df_fhvhv = df_fhvhv.repartition(numPartitions=12)

In [None]:
# Save the repartitioned trips as Parquet.
# The default save mode raises an error when the target directory already
# exists, which makes this cell fail on every re-run; 'overwrite' replaces
# the previous output of this cell instead.
df_fhvhv.write.mode('overwrite').parquet('/home/fedrpi/de-zoomcamp-2023/data/pq')

In [None]:
# Read the Parquet dataset written above back into a DataFrame.
dfp = spark.read.format('parquet').load('/home/fedrpi/de-zoomcamp-2023/data/pq/')

In [None]:
# Display the notebook representation of the DataFrame read from Parquet.
dfp

## Question 3: How many taxi trips were started on June 15th?


In [None]:
# Count the trips whose pickup timestamp falls on 2021-06-15.
# Only the pickup date affects the count, so no other derived columns or
# column projection are needed before filtering.
(
    dfp
    .filter(F.to_date(col('pickup_datetime')) == '2021-06-15')
    .count()
)

## Question 4: How long is the longest trip in the dataset? 

In [None]:
# Trip duration in hours = (dropoff - pickup) in epoch seconds / 3600.
# Rows are sorted by duration, longest first, and the top rows are printed.
(
    dfp
    .withColumn(
        'hours_diff',
        (
            F.unix_timestamp(F.col("dropoff_datetime"))
            - F.unix_timestamp(F.col("pickup_datetime"))
        ) / 3600,
    )
    .select(
        'pickup_datetime',
        'dropoff_datetime',
        'hours_diff',
        'PULocationID',
        'DOLocationID',
    )
    .orderBy(F.col('hours_diff').desc())
    .show()
)

In [None]:

# Expose the trips DataFrame to Spark SQL under the name 'fhvhv'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and registers the same session-scoped temporary view.
dfp.createOrReplaceTempView('fhvhv')

In [None]:
# Load the taxi zone lookup CSV with the explicit zones schema;
# the first line of the file is treated as the header row.
df_zones = (
    spark.read
    .schema(schema_zones)
    .option('header', 'true')
    .csv('/home/fedrpi/de-zoomcamp-2023/data/fhvhv/taxi_zone_lookup.csv')
)

In [None]:
# Expose the zone lookup DataFrame to Spark SQL under the name 'zones'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and registers the same session-scoped temporary view.
df_zones.createOrReplaceTempView('zones')

## Question 5: What port does Spark's User Interface Dashboard run on by default?

4040

## Question 6: What is the name of the most frequent pickup location zone?

In [None]:
# Count trips per pickup location and attach the zone name.
# The query reads the 'fhvhv' and 'zones' temporary views; the left join
# keeps trips whose PULocationID has no row in 'zones' (their Zone is NULL).
# "group by 1,2" groups by the first two selected columns (PULocationID,
# Zone), and the result is ordered by trip count, highest first.
gr_df = spark.sql('''
    select 
        f.PULocationID,
        z.Zone,
        count(*) total
    from 
        fhvhv f
    left join zones z
           on f.PULocationID = z.LocationID
    group by 1,2
    order by total desc
'''
)

In [None]:
# Print the top rows of the per-zone trip counts (already sorted by count, descending).
gr_df.show()