In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import pandas as pd

In [2]:
# Q1 Creating Spark Session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

23/03/09 14:19:25 WARN Utils: Your hostname, M710-ASUS resolves to a loopback address: 127.0.1.1; using 192.168.1.72 instead (on interface wlp1s0)
23/03/09 14:19:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/09 14:19:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
## Output from spark.version command

print(spark.version)

3.3.2


In [4]:
# Q2 Average size of Parquet Files

## Run wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
## in bash terminal

## Specifying the schema
schema = types.StructType([
    types.StructField('hvfhvs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PUlocationID', types.IntegerType(), True),
    types.StructField('DOlocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

## Reading csv.gz file
df = spark.read.option("header", "true").schema(schema).csv("data/fhvhv_tripdata_2021-06.csv.gz")

In [6]:
## Partition by 12
df = df.repartition(12)

## Write as parquet format
df.write.parquet('data/fhvhv/2021/06', mode = "overwrite")

[Stage 0:>                                                          (0 + 0) / 1][Stage 0:>                                                          (0 + 1) / 1]

23/03/09 14:20:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: dispatching_base_num, pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, SR_Flag, Affiliated_base_number
 Schema: hvfhvs_license_num, dispatching_base_num, pickup_datetime, dropoff_datetime, PUlocationID, DOlocationID, SR_Flag
Expected: hvfhvs_license_num but found: dispatching_base_num
CSV file: file:///home/m710/Escritorio/Data_Engineering_Zoomcamp/homeworks/homeworkWeek5/data/fhvhv_tripdata_2021-06.csv.gz


                                                                                

In [27]:
# Q3 How many taxi trips were there on June 15?
## Reading the parquet file
df = spark.read.parquet('data/fhvhv/2021/06/')
df.withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-06-15'") \
    .count()

                                                                                

450872

In [30]:
# Q4 How long was the longest trip in Hours?

## Creating the SQL table
df.createOrReplaceTempView('fhvhv_2021_06')

## Querying the SQL table
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration
FROM 
    fhvhv_2021_06
GROUP BY pickup_date
ORDER BY duration DESC
LIMIT 10;
""").show()



+-----------+--------+
|pickup_date|duration|
+-----------+--------+
| 2021-06-22|    null|
| 2021-06-04|    null|
| 2021-06-20|    null|
| 2021-06-27|    null|
| 2021-06-28|    null|
| 2021-06-01|    null|
| 2021-06-17|    null|
| 2021-06-13|    null|
| 2021-06-19|    null|
| 2021-07-01|    null|
+-----------+--------+



                                                                                

In [13]:
# Q5 Spark’s User Interface which shows application's dashboard runs on which local port?

Port= 4040

In [16]:
# Q6 Using the zone lookup data and the fhvhv June 2021 data, 
# what is the name of the most frequent pickup location zone?

## Reading csv files

schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zones', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])
df_zones = spark.read.option("header", "true").schema(schema).csv("data/taxi_zone_lookup.csv")

In [18]:
## Creating SQL table
df_zones.createOrReplaceTempView('zones')

In [24]:
## Querying
spark.sql("""
SELECT
    zones.Zones as Zone,
    COUNT(1) as Count
FROM 
    fhvhv_2021_06 fhv LEFT JOIN zones ON fhv.PULocationID = zones.LocationID
GROUP BY Zone
ORDER BY Count DESC
LIMIT 3;
""").show()

23/03/09 14:43:52 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: LocationID, Zone
 Schema: LocationID, Zones
Expected: Zones but found: Zone
CSV file: file:///home/m710/Escritorio/Data_Engineering_Zoomcamp/homeworks/homeworkWeek5/data/taxi_zone_lookup.csv




+-------------------+------+
|               Zone| Count|
+-------------------+------+
|                 NA|548735|
|Crown Heights North|236244|
|        JFK Airport|224571|
+-------------------+------+



                                                                                