In [17]:
import os
os.environ['HADOOP_HOME'] = 'C:/tools/hadoop-3.2.0'
os.environ['JAVA_HOME'] = 'C:/tools/jdk-11.0.21'
os.environ['SPARK_HOME'] = 'C:/tools/spark-3.3.2-bin-hadoop3'

In [18]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [29]:
#set up your configurations before building your SparkSession.

google_credentials = "C:/Users/LENOVO/Desktop/Data-Engineering-Zoomcamp/Module5-Batch_Processing_Spark\HW/module5-spark-2534760391ac.json"

conf = SparkConf() \
    .setMaster('spark://localhost:7077') \
    .setAppName('test') \
    .set("spark.jars", "C:/tools/spark-3.3.2-bin-hadoop3/jars/gcs-connector-hadoop3-latest.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", google_credentials)
print(conf)

<pyspark.conf.SparkConf object at 0x00000298F57A2ED0>


In [33]:
sc = SparkContext('spark://localhost:7077', "Spark_connect_gcs_App")

# set google configurations inside SparkSession
hadoop_config = sc._jsc.hadoopConfiguration()

hadoop_config.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_config.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_config.set("fs.gs.auth.service.account.json.keyfile", google_credentials)
hadoop_config.set("fs.gs.auth.service.account.enable", "true")

print(sc)

<SparkContext master=spark://localhost:7077 appName=Spark_connect_gcs_App>


In [34]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

sc.stop()

Read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons.

Repartition the Dataframe to 6 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [36]:
#1. Define schema
from pyspark.sql import types
fhv_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropOff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.DoubleType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

In [37]:
#read csv file as spark dataframe & set schema
fhv_df = spark.read \
    .option("header", "true") \
    .schema(fhv_schema)\
    .csv("gs://spark-output-bi-reports/dataset/fhv_tripdata_2019-10.csv")

fhv_df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [38]:
#2. partition data to 6 partitions
fhv_partitioned = fhv_df.repartition(6)

In [39]:
fhv_partitioned.write.mode('overwrite').parquet("C:/Users/LENOVO/Desktop/Data-Engineering-Zoomcamp/Module5-Batch_Processing_Spark/HW/output_partitioned/")

Question 3:
Count records

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

In [40]:
fhv_partitioned.createOrReplaceTempView('fhv_oct_trips')

In [41]:
count_trips_middle_oct = spark.sql("""
select 
    count(*)
from 
    fhv_oct_trips
where 
    extract(DAY from pickup_datetime) = 15
""")
count_trips_middle_oct.show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



Question 4:
Longest trip for each day

What is the length of the longest trip in the dataset in hours?

In [42]:
longest_trip = spark.sql("""
select 
    pickup_datetime, dropOff_datetime, DATEDIFF(hour, pickup_datetime, dropOff_datetime) as longest_trip_length
from 
    fhv_oct_trips
order by 3 desc
limit 1
""")
longest_trip.show()

+-------------------+-------------------+-------------------+
|    pickup_datetime|   dropOff_datetime|longest_trip_length|
+-------------------+-------------------+-------------------+
|2019-10-28 09:00:00|2091-10-28 09:30:00|             631152|
+-------------------+-------------------+-------------------+



Question 6:
Least frequent pickup location zone

Load the zone lookup data into a temp view in Spark
Zone Data

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

In [44]:
#Define zones schema
zones_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True)
])

In [45]:
#read csv file as spark dataframe & set schema
zones_df = spark.read \
    .option("header", "true") \
    .schema(zones_schema)\
    .csv("dataset/taxi+_zone_lookup.csv")

zones_df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [46]:
zones_df.createOrReplaceTempView("zones_table")

In [51]:
joined_data = spark.sql("""
select 
    zones.Zone, count(*) as least_zone_pickup
from 
    fhv_oct_trips as fhv_trips
join 
    zones_table as zones
on 
    fhv_trips.PULocationID = zones.LocationID
group by 
    1
order by 
    2 asc
limit 
    1
""")

joined_data.show()

+-----------+-----------------+
|       Zone|least_zone_pickup|
+-----------+-----------------+
|Jamaica Bay|                1|
+-----------+-----------------+

