# Data Engineering Zoomcamp HW5

In [48]:
import os
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types
from dotenv import load_dotenv, find_dotenv


In [49]:
find_dotenv()
load_dotenv()

google_credentials = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')

In [50]:
# Constants

INPUT_PATH = "gs://de-zoomcamp-bq-2/fhv/fhv_tripdata_2019-10.csv"
OUTPUT_PATH = "gs://de-zoomcamp-bq-2/hw5/fhv_tripdata_2019-10/"

## Question 1

In [51]:
pyspark.__version__

'3.3.2'

In [52]:
spark_config = [
    ("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar"),
    ("spark.hadoop.google.cloud.auth.service.account.enable", "true"),
    ("spark.hadoop.google.cloud.auth.service.account.json.keyfile", google_credentials),
]

spark_conf = SparkConf().setAll(spark_config)

In [53]:
spark = SparkSession.builder \
        .appName('spark_dezoomcamp') \
        .master('local[*]') \
        .config(conf=spark_conf) \
        .getOrCreate()

## Question 2

In [54]:
fhv_schema = types.StructType([
    types.StructField('dispatching_base_num', types.IntegerType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.IntegerType(), True)])

In [55]:
df_fhv_2019 = spark.read \
    .option("header", "true") \
    .schema(fhv_schema) \
    .csv(INPUT_PATH)

In [56]:
df_fhv_2019.count()

                                                                                

1897493

In [57]:
df_fhv_2019 \
    .repartition(6) \
    .write \
    .mode('overwrite') \
    .parquet(OUTPUT_PATH)

                                                                                

## Question 3

In [58]:
df_fhv = spark.read\
    .parquet(OUTPUT_PATH)

# Create a temp view for FHV oct data for SQL queries. 
df_fhv.createOrReplaceTempView('fhv_oct')

                                                                                

In [59]:
query = f"""
    SELECT 
        COUNT(1) AS num_taxi_trips
    FROM fhv_oct
    WHERE date_trunc('day', pickup_datetime) = '2019-10-15 00:00:00'
"""

spark.sql(query).show()



+--------------+
|num_taxi_trips|
+--------------+
|         62610|
+--------------+



                                                                                

## Question 4

In [60]:
query = f"""
    SELECT 
        MAX((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) / (60 * 60))AS longest_trip
    FROM fhv_oct
"""

spark.sql(query).show()



+------------+
|longest_trip|
+------------+
|    631152.5|
+------------+



                                                                                

## Question 6

In [61]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('../data/taxi_zone_lookup.csv')
    
df_zones.createOrReplaceTempView('zones')

In [62]:
query = f"""
    SELECT *
    FROM zones
"""

spark.sql(query).show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [63]:
query = f"""
    SELECT *
    FROM fhv_oct
"""

spark.sql(query).show()

[Stage 77:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|                null|2019-10-06 14:15:00|2019-10-06 17:07:00|         264|         264|   null|                  null|
|                null|2019-10-01 17:54:21|2019-10-01 17:59:50|         179|           7|   null|                  null|
|                null|2019-10-01 08:30:52|2019-10-01 08:40:16|         264|          82|   null|                  null|
|                null|2019-10-06 23:00:00|2019-10-06 23:30:58|         264|         264|   null|                  null|
|                null|2019-10-01 19:04:56|2019-10-01 19:26:44|         264|         259|   null|                  null|
|                null|2019-10-08 06:49:0

                                                                                

In [64]:
query = f"""
    SELECT 
        Zone,
        COUNT(1) as pick_up_freq
    FROM 
        fhv_oct
    INNER JOIN 
        zones
    ON 
        fhv_oct.PUlocationID = zones.LocationID
    GROUP BY
        Zone
    ORDER BY 
        pick_up_freq ASC
"""

spark.sql(query).show()



+--------------------+------------+
|                Zone|pick_up_freq|
+--------------------+------------+
|         Jamaica Bay|           1|
|Governor's Island...|           2|
| Green-Wood Cemetery|           5|
|       Broad Channel|           8|
|     Highbridge Park|          14|
|        Battery Park|          15|
|Saint Michaels Ce...|          23|
|Breezy Point/Fort...|          25|
|Marine Park/Floyd...|          26|
|        Astoria Park|          29|
|    Inwood Hill Park|          39|
|       Willets Point|          47|
|Forest Park/Highl...|          53|
|  Brooklyn Navy Yard|          57|
|        Crotona Park|          62|
|        Country Club|          77|
|     Freshkills Park|          89|
|       Prospect Park|          98|
|     Columbia Street|         105|
|  South Williamsburg|         110|
+--------------------+------------+
only showing top 20 rows



                                                                                

# Thank You !