# Week 5

## Working with Spark

### Run Pyspark

In [1]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local[*]") \
        .appName("test") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/04 13:58:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark.version

'3.4.2'

### Loading FHV 2019-10 data

In [3]:
df = pd.read_csv("data/raw/fhv/2019/10/fhv_tripdata_2019_10.csv.gz", nrows=100)

In [4]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 7 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   dispatching_base_num    100 non-null    object 
 1   pickup_datetime         100 non-null    object 
 2   dropOff_datetime        100 non-null    object 
 3   PUlocationID            100 non-null    int64  
 4   DOlocationID            100 non-null    int64  
 5   SR_Flag                 0 non-null      float64
 6   Affiliated_base_number  99 non-null     object 
dtypes: float64(1), int64(2), object(4)
memory usage: 5.6+ KB


In [5]:
spark.createDataFrame(df).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', LongType(), True), StructField('DOlocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [3]:
from pyspark.sql import types

In [4]:
fhv_schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PUlocationID', types.IntegerType(), True), 
    types.StructField('DOlocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.DoubleType(), True), 
    types.StructField('Affiliated_base_number', types.StringType(), True)
    ]
)

In [5]:
print("Processing fhv data for October 2019...")

input_path = 'data/raw/fhv/2019/10/'
output_path = 'data/pq/fhv/2019/10/'

df_fhv = spark.read \
    .option("header", "true") \
    .schema(fhv_schema) \
    .csv(input_path)

df_fhv \
    .repartition(6) \
    .write.parquet(output_path, mode='overwrite')

print("...Successfully saved fhv data for October 2019 to parquet.")

Processing fhv data for October 2019...
...Successfully saved fhv data for October 2019 to parquet.


In [6]:
!ls -lh data/pq/fhv/2019/10

total 39M
-rw-r--r-- 1 abhijit abhijit    0 Mar  2 12:37 _SUCCESS
-rw-r--r-- 1 abhijit abhijit 6.4M Mar  2 12:37 part-00000-5b14061e-581a-4f33-b0d0-f40dcd8fcec0-c000.snappy.parquet
-rw-r--r-- 1 abhijit abhijit 6.4M Mar  2 12:37 part-00001-5b14061e-581a-4f33-b0d0-f40dcd8fcec0-c000.snappy.parquet
-rw-r--r-- 1 abhijit abhijit 6.4M Mar  2 12:37 part-00002-5b14061e-581a-4f33-b0d0-f40dcd8fcec0-c000.snappy.parquet
-rw-r--r-- 1 abhijit abhijit 6.4M Mar  2 12:37 part-00003-5b14061e-581a-4f33-b0d0-f40dcd8fcec0-c000.snappy.parquet
-rw-r--r-- 1 abhijit abhijit 6.4M Mar  2 12:37 part-00004-5b14061e-581a-4f33-b0d0-f40dcd8fcec0-c000.snappy.parquet
-rw-r--r-- 1 abhijit abhijit 6.4M Mar  2 12:37 part-00005-5b14061e-581a-4f33-b0d0-f40dcd8fcec0-c000.snappy.parquet


The average size of parquet files is 6.4 MB

In [17]:
df_fhv.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
df_fhv.registerTempTable('trips')



In [8]:
spark.sql(
"""
SELECT 
    COUNT(1) AS October_15_trips    
FROM
    trips
WHERE 
    CAST(pickup_datetime AS DATE) = "2019-10-15"
"""
).show()

[Stage 0:>                                                          (0 + 1) / 1]

+----------------+
|October_15_trips|
+----------------+
|           62610|
+----------------+



                                                                                

In [9]:
spark.sql("""
SELECT 
    MAX(DATEDIFF(hour, pickup_datetime, dropOff_datetime)) as max_trip_duration_hours
FROM 
    trips
""").show()

[Stage 3:>                                                          (0 + 1) / 1]

+-----------------------+
|max_trip_duration_hours|
+-----------------------+
|                 631152|
+-----------------------+



                                                                                

### Zones Lookup

In [10]:
zones_csv = "data/misc/taxi_zone_lookup.csv"

In [11]:
df_zone = pd.read_csv(zones_csv)

In [12]:
spark.createDataFrame(df_zone).schema

StructType([StructField('LocationID', LongType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [13]:
zone_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True), 
    types.StructField('Borough', types.StringType(), True), 
    types.StructField('Zone', types.StringType(), True), 
    types.StructField('service_zone', types.StringType(), True)
    ]
)

In [14]:
df_zone = spark.read \
        .option("header", "true") \
        .schema(zone_schema) \
        .csv(zones_csv)

In [15]:
df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [16]:
df_zone.registerTempTable('zones')



In [17]:
df_fhv.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropOff_datetime',
 'PUlocationID',
 'DOlocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [20]:
df_fhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [25]:
spark.sql("""
SELECT 
    z.Zone, 
    COUNT(1) as trips_count
FROM 
    trips as t
JOIN 
    zones as z
ON 
    t.PUlocationID = z.LocationID
GROUP BY 
    z.Zone
ORDER BY 
    trips_count ASC
LIMIT 1
""").show()

[Stage 23:>                                                         (0 + 1) / 1]

+-----------+-----------+
|       Zone|trips_count|
+-----------+-----------+
|Jamaica Bay|          1|
+-----------+-----------+



                                                                                