In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

### Question 1

In [2]:
# connect to cluster
spark = SparkSession.builder \
    .master("spark://de-zoomcamp.europe-west1-b.c.stoked-mode-375206.internal:7077") \
    .appName('test') \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/27 21:20:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# spark version
spark.version

'3.2.3'

### Question 2

In [4]:
# Download the HVFHV data for february 2021:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz -O data/raw/fhvhv_tripdata_2021-06.csv.gz

--2023-02-27 21:20:12--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230227%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230227T212012Z&X-Amz-Expires=300&X-Amz-Signature=3b86edecf2e8c6a0059e89852f26e4bc60c28e8e2707ba1a442446bcc658c128&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-02-27 21:20:12--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [5]:
!ls data/raw -lh

total 1.1G
-rw-rw-r-- 1 froukje froukje    0 Feb 27 19:57 fhvhv_tripdata_2021-02.csv
-rw-rw-r-- 1 froukje froukje 878M Dec 20 00:13 fhvhv_tripdata_2021-06.csv
-rw-rw-r-- 1 froukje froukje 168M Dec 20 00:13 fhvhv_tripdata_2021-06.csv.gz


In [6]:
!gunzip data/raw/fhvhv_tripdata_2021-06.csv.gz

gzip: data/raw/fhvhv_tripdata_2021-06.csv already exists; do you wish to overwrite (y or n)? ^C


In [11]:
!ls data/raw -lh

total 1.1G
-rw-rw-r-- 1 froukje froukje    0 Feb 27 19:57 fhvhv_tripdata_2021-02.csv
-rw-rw-r-- 1 froukje froukje 878M Dec 20 00:13 fhvhv_tripdata_2021-06.csv
-rw-rw-r-- 1 froukje froukje 168M Dec 20 00:13 fhvhv_tripdata_2021-06.csv.gz


In [12]:
!pwd

/home/froukje/de-zoomcamp/week_5_batch_processing/homework


In [13]:
df = spark.read\
    .option("header", "true") \
    .csv('data/raw/fhvhv_tripdata_2021-06.csv')

df.head(5)

[Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:02:41', dropoff_datetime='2021-06-01 00:07:46', PULocationID='174', DOLocationID='18', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:16:16', dropoff_datetime='2021-06-01 00:21:14', PULocationID='32', DOLocationID='254', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:27:01', dropoff_datetime='2021-06-01 00:42:11', PULocationID='240', DOLocationID='127', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:46:08', dropoff_datetime='2021-06-01 00:53:45', PULocationID='127', DOLocationID='235', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02510', pickup_datetime='2021-06-01 00:45:42', dropoff_datetime='2021-06-01 01:03:33', PULocationID='144', DOLocationID='146', SR_Flag='N', Affiliated_base_number=Non

In [14]:
df_pandas = pd.read_csv("data/raw/fhvhv_tripdata_2021-06.csv")
df_pandas.dtypes

dispatching_base_num      object
pickup_datetime           object
dropoff_datetime          object
PULocationID               int64
DOLocationID               int64
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [15]:
# schema from the videos
schema = types.StructType([
         types.StructField('dispatching_base_num',types.StringType(),True),
         types.StructField('pickup_datetime',types.TimestampType(),True),
         types.StructField('dropoff_datetime',types.TimestampType(),True),
         types.StructField('PULocationID',types.IntegerType(),True),
         types.StructField('DOLocationID',types.IntegerType(),True),
         types.StructField('SR_Flag',types.StringType(),True)]
)

In [16]:
df = spark.read\
    .option("header", "true") \
    .schema(schema) \
    .csv('data/raw/fhvhv_tripdata_2021-06.csv')

df.head(5)

[Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag='N'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 16, 16), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 21, 14), PULocationID=32, DOLocationID=254, SR_Flag='N'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 27, 1), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 42, 11), PULocationID=240, DOLocationID=127, SR_Flag='N'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 46, 8), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 53, 45), PULocationID=127, DOLocationID=235, SR_Flag='N'),
 Row(dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 45, 42), dropoff_datetime=datetime.datetime(2021, 6, 1, 1, 3, 33), PULocationID=144, DOLocationID=146, SR_Fla

In [17]:
df.repartition(12)

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string]

In [18]:
df.write.parquet('data/fhvhv/2021/06', mode="overwrite")

                                                                                

In [19]:
!ls -lh data/fhvhv/2021/06

total 184M
-rw-r--r-- 1 froukje froukje   0 Feb 27 21:27 _SUCCESS
-rw-r--r-- 1 froukje froukje 28M Feb 27 21:25 part-00000-17d97a28-30ec-4dd7-ab52-3c5d131c38da-c000.snappy.parquet
-rw-r--r-- 1 froukje froukje 27M Feb 27 21:25 part-00001-17d97a28-30ec-4dd7-ab52-3c5d131c38da-c000.snappy.parquet
-rw-r--r-- 1 froukje froukje 27M Feb 27 21:25 part-00002-17d97a28-30ec-4dd7-ab52-3c5d131c38da-c000.snappy.parquet
-rw-r--r-- 1 froukje froukje 28M Feb 27 21:25 part-00003-17d97a28-30ec-4dd7-ab52-3c5d131c38da-c000.snappy.parquet
-rw-r--r-- 1 froukje froukje 27M Feb 27 21:27 part-00004-17d97a28-30ec-4dd7-ab52-3c5d131c38da-c000.snappy.parquet
-rw-r--r-- 1 froukje froukje 27M Feb 27 21:27 part-00005-17d97a28-30ec-4dd7-ab52-3c5d131c38da-c000.snappy.parquet
-rw-r--r-- 1 froukje froukje 23M Feb 27 21:27 part-00006-17d97a28-30ec-4dd7-ab52-3c5d131c38da-c000.snappy.parquet


### Question 3

How many taxi trips were there on June 15?

Consider only trips that started on June 15.

In [20]:
# register dataframe as a table to use sql
df.registerTempTable('trips_data')



In [21]:
spark.sql("""
SELECT
    pickup_datetime
FROM
    trips_data
WHERE
    pickup_datetime >= '2021-06-15 00:00:00' AND
    pickup_datetime <= '2021-06-15 23:59:59'
""").show()



+-------------------+
|    pickup_datetime|
+-------------------+
|2021-06-15 00:31:16|
|2021-06-15 00:52:12|
|2021-06-15 00:44:22|
|2021-06-15 00:18:17|
|2021-06-15 00:04:26|
|2021-06-15 00:18:06|
|2021-06-15 00:51:52|
|2021-06-15 00:08:58|
|2021-06-15 00:39:28|
|2021-06-15 00:06:47|
|2021-06-15 00:44:32|
|2021-06-15 00:02:09|
|2021-06-15 00:09:11|
|2021-06-15 00:26:00|
|2021-06-15 00:38:53|
|2021-06-15 00:30:50|
|2021-06-15 00:51:46|
|2021-06-15 00:57:46|
|2021-06-15 00:00:36|
|2021-06-15 00:16:55|
+-------------------+
only showing top 20 rows





In [22]:
spark.sql("""
SELECT
    count(*)
FROM
    trips_data
WHERE
    pickup_datetime >= '2021-06-15 00:00:00' AND
    pickup_datetime <= '2021-06-15 23:59:59'
""").show()



+--------+
|count(1)|
+--------+
|  452470|
+--------+



                                                                                

### Question 4

**Longest trip for each day**

Now calculate the duration for each trip.
How long was the longest trip in Hours?

In [30]:
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime,
    (unix_timestamp(dropoff_datetime)-unix_timestamp(pickup_datetime))/(3600) AS trip_duration
FROM
    trips_data
ORDER BY 
    trip_duration DESC
""").show()



+-------------------+-------------------+------------------+
|    pickup_datetime|   dropoff_datetime|     trip_duration|
+-------------------+-------------------+------------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|  66.8788888888889|
|2021-06-22 12:09:45|2021-06-23 13:42:44|25.549722222222222|
|2021-06-27 10:32:29|2021-06-28 06:31:20|19.980833333333333|
|2021-06-26 22:37:11|2021-06-27 16:49:01|18.197222222222223|
|2021-06-23 20:40:43|2021-06-24 13:08:44|16.466944444444444|
|2021-06-23 22:03:31|2021-06-24 12:19:39|14.268888888888888|
|2021-06-24 23:11:00|2021-06-25 13:05:35|13.909722222222221|
|2021-06-04 20:56:02|2021-06-05 08:36:14|             11.67|
|2021-06-27 07:45:19|2021-06-27 19:07:16|11.365833333333333|
|2021-06-20 17:05:12|2021-06-21 04:04:16|10.984444444444444|
|2021-06-01 12:25:29|2021-06-01 22:41:32|           10.2675|
|2021-06-01 12:01:46|2021-06-01 21:59:45| 9.966388888888888|
|2021-06-28 13:13:59|2021-06-28 23:11:58| 9.966388888888888|
|2021-06-27 03:52:14|202

                                                                                

### Question 6

**Most frequent pickup location zone**

Load the zone lookup data into a temp view in Spark
Zone Data

Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

In [42]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv -O data/raw/taxi_zone_lookup.csv

--2023-02-27 21:54:16--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230227%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230227T215416Z&X-Amz-Expires=300&X-Amz-Signature=45ca2c3b60572203aab3770ffcf565026e5ffca246251025f2655347e33f778b&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-02-27 21:54:16--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [46]:
!ls data/raw/

fhvhv_tripdata_2021-02.csv  fhvhv_tripdata_2021-06.csv.gz
fhvhv_tripdata_2021-06.csv  taxi_zone_lookup.csv


In [47]:
df_lookup = spark.read\
    .option("header", "true") \
    .csv('data/raw/taxi_zone_lookup.csv')

df_lookup.head(5)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [44]:
df_lookup.registerTempTable('lookup_data')

In [50]:
spark.sql("""
SELECT
    *
FROM
    lookup_data
""").show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [66]:
spark.sql("""
SELECT
    Zone, COUNT(*)
FROM
    lookup_data
WHERE Zone LIKE '%East Chelsea%'
    OR Zone LIKE '%Astoria%'
    OR Zone LIKE '%East Chelsea%'
    OR Zone LIKE '%Crown Heights North'
GROUP BY 
    Zone
""").show()

+-------------------+--------+
|               Zone|count(1)|
+-------------------+--------+
|        Old Astoria|       1|
|Crown Heights North|       1|
|            Astoria|       1|
|       Astoria Park|       1|
|       East Chelsea|       1|
+-------------------+--------+

