In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

spark =  SparkSession.builder \
    .master('local[*]')\
    .appName('test') \
    .getOrCreate()

24/03/06 18:51:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Home Work Solution 1

Install Spark and PySpark

- Install Spark
- Run PySpark
- Create a local spark session
- Execute spark.version.
- What's the output?

In [3]:
spark.version

'3.0.3'

Home work solution 2

Data: FHV October 2019

- Read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons.
- Repartition the Dataframe to 6 partitions and save it to parquet.
- What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [4]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [5]:
!wc -l fhv_tripdata_2019-10.csv.gz

62958 fhv_tripdata_2019-10.csv.gz


In [33]:
df_fhv = spark.read\
    .option('header','true') \
    .csv('fhv_tripdata_2019-10.csv.gz')

In [34]:
df_fhv.schema

StructType(List(StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropOff_datetime,StringType,true),StructField(PUlocationID,StringType,true),StructField(DOlocationID,StringType,true),StructField(SR_Flag,StringType,true),StructField(Affiliated_base_number,StringType,true)))

In [35]:
df_fhv.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
+--------------------+------------------

In [9]:
# %%timeit
import pandas as pd
df_pandas = pd.read_csv('fhv_tripdata_2019-10.csv.gz')

In [10]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [11]:
df_pandas['Affiliated_base_number'] = df_pandas['Affiliated_base_number'].fillna("NAN Values")

In [12]:
spark.createDataFrame(df_pandas).schema

StructType(List(StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropOff_datetime,StringType,true),StructField(PUlocationID,DoubleType,true),StructField(DOlocationID,DoubleType,true),StructField(SR_Flag,DoubleType,true),StructField(Affiliated_base_number,StringType,true)))

Define the new shema and read the data again by pasisng in the data types

In [36]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField('dispatching_base_num',types.StringType(),True),
    types.StructField('pickup_datetime',types.TimestampType(),True),
    types.StructField('dropOff_datetime',types.TimestampType(),True),
    types.StructField('PUlocationID',types.IntegerType(),True),
    types.StructField('DOlocationID',types.IntegerType(),True),
    types.StructField('SR_Flag',types.StringType(),True),
    types.StructField('Affiliated_base_number',types.StringType(),True)
])

df_fhv = spark.read \
    .option('header', 'true') \
    .schema(schema)\
    .csv('fhv_tripdata_2019-10.csv.gz')

In [37]:
df_fhv.schema

StructType(List(StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropOff_datetime,TimestampType,true),StructField(PUlocationID,IntegerType,true),StructField(DOlocationID,IntegerType,true),StructField(SR_Flag,StringType,true),StructField(Affiliated_base_number,StringType,true)))

In [38]:
df_fhv = df_fhv.repartition(6)

In [39]:
df_fhv.write.parquet('fhvhv/2019/01', mode='overwrite')

                                                                                

In [17]:
!ls -lhr fhvhv/2019/01

total 38M
-rw-r--r-- 1 sammygis sammygis 6.3M Mar  6 18:53 part-00005-179f9f23-2d95-4cc1-a9f9-5ee3c3f2f13e-c000.snappy.parquet
-rw-r--r-- 1 sammygis sammygis 6.4M Mar  6 18:53 part-00004-179f9f23-2d95-4cc1-a9f9-5ee3c3f2f13e-c000.snappy.parquet
-rw-r--r-- 1 sammygis sammygis 6.4M Mar  6 18:53 part-00003-179f9f23-2d95-4cc1-a9f9-5ee3c3f2f13e-c000.snappy.parquet
-rw-r--r-- 1 sammygis sammygis 6.4M Mar  6 18:53 part-00002-179f9f23-2d95-4cc1-a9f9-5ee3c3f2f13e-c000.snappy.parquet
-rw-r--r-- 1 sammygis sammygis 6.4M Mar  6 18:53 part-00001-179f9f23-2d95-4cc1-a9f9-5ee3c3f2f13e-c000.snappy.parquet
-rw-r--r-- 1 sammygis sammygis 6.4M Mar  6 18:53 part-00000-179f9f23-2d95-4cc1-a9f9-5ee3c3f2f13e-c000.snappy.parquet
-rw-r--r-- 1 sammygis sammygis    0 Mar  6 18:53 _SUCCESS


Homework Solution 3:

Count records
- How many taxi trips were there on the 15th of October?
- Consider only trips that started on the 15th of October.

In [40]:
df_fhv.registerTempTable("fhvhv")
result = spark.sql("""
    SELECT COUNT(*)
    FROM fhvhv
    WHERE to_date(pickup_datetime) = '2019-10-15'
    """)

result.show()

[Stage 23:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

Homework Solution 4

Longest trip for each day
- What is the length of the longest trip in the dataset in hours?

In [19]:
# Calculate the longest trip duration per day using Apache Spark SQL
trip_per_day = spark.sql("""
    SELECT 
        TO_DATE(pickup_datetime) AS pickup_date,
        pickup_datetime,
        dropOff_datetime,
        MAX(
            (HOUR(dropOff_datetime) + MINUTE(dropOff_datetime) / 60.0 + SECOND(dropOff_datetime) / 3600.0) -
            (HOUR(pickup_datetime) - MINUTE(pickup_datetime) / 60.0 + SECOND(pickup_datetime) / 3600.0)
        ) AS longest_trip_duration_in_hours
    FROM fhvhv
    -- WHERE TO_DATE(pickup_datetime) = TO_DATE(dropOff_datetime) 
    GROUP BY 
        pickup_date, pickup_datetime, dropOff_datetime  -- Group by pickup date, pickup time, and drop-off time
    ORDER BY longest_trip_duration_in_hours DESC  -- Order by longest trip duration in descending order
    LIMIT 10  -- Limit the output to 10 records
""")

trip_per_day.show()




+-----------+-------------------+-------------------+------------------------------+
|pickup_date|    pickup_datetime|   dropOff_datetime|longest_trip_duration_in_hours|
+-----------+-------------------+-------------------+------------------------------+
| 2019-10-02|2019-10-02 00:37:00|2019-10-02 23:49:00|                     24.433334|
| 2019-10-17|2019-10-17 00:47:00|2019-10-17 23:36:00|                     24.383333|
| 2019-10-15|2019-10-15 00:49:00|2019-10-15 23:17:00|                     24.100000|
| 2019-10-16|2019-10-16 00:45:00|2019-10-16 23:13:00|                     23.966667|
| 2019-10-16|2019-10-16 00:21:00|2019-10-16 22:57:00|                     23.300000|
| 2019-10-25|2019-10-25 00:21:00|2019-10-25 22:37:00|                     22.966667|
| 2019-10-26|2019-10-26 00:45:00|2019-10-26 22:05:00|                     22.833333|
| 2019-10-18|2019-10-18 00:49:00|2019-10-18 21:36:00|                     22.416667|
| 2019-10-02|2019-10-02 00:54:00|2019-10-02 20:53:00|            

                                                                                

In [20]:
# Calculate the longest trip duration per day using Apache Spark SQL
trip_per_day = spark.sql( """
                SELECT 
                    TO_DATE(pickup_datetime) AS pickup_date,
                    SUM(
                        HOUR(dropOff_datetime) + MINUTE(dropOff_datetime) / 60.0 + SECOND(dropOff_datetime) / 3600.0 -
                        HOUR(pickup_datetime) + MINUTE(pickup_datetime) / 60.0 + SECOND(pickup_datetime) / 3600.0
                    ) AS longest_trip_duration_in_hours_decimal
                FROM fhvhv
                GROUP BY pickup_date
                ORDER BY longest_trip_duration_in_hours_decimal DESC
                LIMIT 1;
            """)

trip_per_day.show()



+-----------+--------------------------------------+
|pickup_date|longest_trip_duration_in_hours_decimal|
+-----------+--------------------------------------+
| 2019-10-03|                          90829.463043|
+-----------+--------------------------------------+



                                                                                

Homework Solution 5

User Interface

- Spark’s User Interface which shows the application's dashboard runs on which local port?

localhost://4040

Homework Solution 6

Least frequent pickup location zone

- Load the zone lookup data into a temp view in Spark: Zone Data

- Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

In [23]:
# !wget https://s3.amazonaws.com/nyc-tlc/misc/'taxi+_zone_lookup.csv'

--2024-03-06 18:57:08--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.233.120, 52.217.43.158, 52.216.59.120, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.233.120|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2024-03-06 18:57:08 (187 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [24]:
df = spark.read\
    .option("header",'true') \
    .csv('taxi+_zone_lookup.csv')

In [25]:
df.write.parquet('zones')

In [26]:
!ls

01_test.ipynb		 10_spark_gcs_schema_local.ipynb  lib
02_partitioning.ipynb	 fhv_tripdata_2019-10.csv.gz	  spark-warehouse
03_sparkDataFrame.ipynb  fhvhv				  taxi+_zone_lookup.csv
05_spark_sql.ipynb	 green_taxi			  yellow_taxi
09_connecting_gcs.ipynb  homework.ipynb			  zones


In [30]:
# load the alrady downloaded zone data
df_zones = spark.read.parquet('zones/')

In [31]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [43]:
df_fhv.head(5)

                                                                                

[Row(dispatching_base_num='B02784', pickup_datetime=datetime.datetime(2019, 10, 1, 9, 55, 38), dropOff_datetime=datetime.datetime(2019, 10, 1, 10, 5, 43), PUlocationID=89, DOlocationID=85, SR_Flag=None, Affiliated_base_number=None),
 Row(dispatching_base_num='B02429', pickup_datetime=datetime.datetime(2019, 10, 21, 4, 15, 47), dropOff_datetime=datetime.datetime(2019, 10, 21, 4, 36, 4), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B02429'),
 Row(dispatching_base_num='B01482', pickup_datetime=datetime.datetime(2019, 10, 19, 12, 0), dropOff_datetime=datetime.datetime(2019, 10, 19, 12, 20), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B01482'),
 Row(dispatching_base_num='B03015', pickup_datetime=datetime.datetime(2019, 10, 11, 14, 28), dropOff_datetime=datetime.datetime(2019, 10, 11, 14, 32, 44), PUlocationID=264, DOlocationID=216, SR_Flag=None, Affiliated_base_number='B03015'),
 Row(dispatching_base_num='B01529', pickup_datetime=da

In [46]:
#join the main data to the zone data
df_join = df_fhv.join(df_zones,
                      df_fhv.PUlocationID == df_zones.LocationID)

In [47]:
df_join.drop('PUlocationID')

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropOff_datetime: timestamp, DOlocationID: int, SR_Flag: string, Affiliated_base_number: string, LocationID: string, Borough: string, Zone: string, service_zone: string]

In [48]:
df_join.head()

                                                                                

Row(dispatching_base_num='B02784', pickup_datetime=datetime.datetime(2019, 10, 1, 9, 55, 38), dropOff_datetime=datetime.datetime(2019, 10, 1, 10, 5, 43), PUlocationID=89, DOlocationID=85, SR_Flag=None, Affiliated_base_number=None, LocationID='89', Borough='Brooklyn', Zone='Flatbush/Ditmas Park', service_zone='Boro Zone')

In [49]:
df_join.registerTempTable("joined_table")

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

In [50]:
least_pipkup_lctn = spark.sql("""
                 SELECT LocationID,
                        Zone,
                 COUNT(1) AS pickup_count
                 FROM
                     joined_Table
                 GROUP BY
                     1,2
                ORDER BY
                        pickup_count
                """)

In [51]:
least_pipkup_lctn.show()



+----------+--------------------+------------+
|LocationID|                Zone|pickup_count|
+----------+--------------------+------------+
|         2|         Jamaica Bay|           1|
|       105|Governor's Island...|           2|
|       111| Green-Wood Cemetery|           5|
|        30|       Broad Channel|           8|
|       120|     Highbridge Park|          14|
|        12|        Battery Park|          15|
|       207|Saint Michaels Ce...|          23|
|        27|Breezy Point/Fort...|          25|
|       154|Marine Park/Floyd...|          26|
|         8|        Astoria Park|          29|
|       128|    Inwood Hill Park|          39|
|       253|       Willets Point|          47|
|        96|Forest Park/Highl...|          53|
|        34|  Brooklyn Navy Yard|          57|
|        59|        Crotona Park|          62|
|        58|        Country Club|          77|
|        99|     Freshkills Park|          89|
|       190|       Prospect Park|          98|
|        54| 

                                                                                