In [6]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Path of the imported pyspark package, to confirm which Spark installation the kernel picked up.
pyspark.__file__

'/home/chinwee/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/__init__.py'

In [7]:
# Create (or reuse) the Spark session for this notebook: local master, app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

22/02/26 07:24:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Download file and read using Spark

In [4]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-26 07:18:18--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.78.116
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.78.116|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-26 07:19:11 (13.5 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [8]:
# First pass over the CSV: only the header option is set and no schema is supplied.
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-02.csv')
)

In [9]:
# Inspect the schema Spark assigned when none was supplied.
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

### Generate 1000 rows to check schema using pandas

In [12]:
# Copy the header line plus the first 1000 data rows into a small sample file.
!head -n 1001 fhvhv_tripdata_2021-02.csv > head.csv

In [13]:
import pandas as pd
# Load the small sample with pandas so its inferred column types can be inspected.
df_pandas = pd.read_csv('head.csv')

In [14]:
# Column dtypes pandas inferred from the sample rows.
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [16]:
from pyspark.sql import types

In [17]:
# Explicit schema for the trip CSV, listed as (column name, Spark type) pairs in column order.
schema_columns = [
    ('hvfhs_license_num', types.StringType()),
    ('dispatching_base_num', types.StringType()),
    ('pickup_datetime', types.TimestampType()),
    ('dropoff_datetime', types.TimestampType()),
    ('PULocationID', types.IntegerType()),
    ('DOLocationID', types.IntegerType()),
    ('SR_Flag', types.StringType()),
]

# Every column is declared nullable (third StructField argument is True).
schema = types.StructType(
    [types.StructField(col_name, col_type, True) for col_name, col_type in schema_columns]
)

### Read data using Spark again, this time specifying schema

In [19]:
# Second pass over the CSV, this time applying the explicit schema defined earlier.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-02.csv')
)

In [20]:
# Redistribute the rows into 24 partitions; `df` is rebound to the repartitioned DataFrame.
# NOTE(review): presumably done so the Parquet write is split into 24 part files — confirm.
df = df.repartition(24)

In [21]:
# Persist the DataFrame as Parquet under fhvhv/2021/02/.
# mode='overwrite' replaces output left by an earlier run; with the default save mode the
# write raises when the path already exists, which breaks re-running the notebook from the top.
df.write.parquet('fhvhv/2021/02/', mode='overwrite')

                                                                                

In [23]:
# Re-load the data from the Parquet output; from here on `df` refers to the Parquet-backed DataFrame.
df = spark.read.parquet('fhvhv/2021/02/')
# Print the column names and types read back from the Parquet files.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



### Question 2: Size of HVFHW February 2021 

In [None]:
# !ls -lh fhvhv/2021/02
# Answer: about 208 MB

### Use Spark SQL

In [24]:
### Register dataframe as a temp view so we can run sql on it
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView is the
# supported replacement and replaces any existing view with the same name, so the cell
# can be re-run safely.
df.createOrReplaceTempView('fhvhv_2021_02')

In [26]:
# Peek at ten rows of the registered table to sanity-check its contents.
preview_query = """
SELECT *
FROM fhvhv_2021_02
LIMIT 10
"""
spark.sql(preview_query).show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02887|2021-02-06 01:18:35|2021-02-06 01:40:34|         163|         235|   null|
|           HV0005|              B02510|2021-02-05 07:13:06|2021-02-05 07:31:56|         225|         181|   null|
|           HV0003|              B02869|2021-02-04 16:56:52|2021-02-04 17:21:36|         260|          95|   null|
|           HV0003|              B02871|2021-02-03 18:34:17|2021-02-03 18:57:12|         235|          60|   null|
|           HV0003|              B02869|2021-02-04 07:25:09|2021-02-04 07:30:34|          55|          55|   null|
|           HV0003|              B02836|2021-02-04 23:15:27|2021-02-04 23:34:29|

### Question 3. Count records

In [34]:
# NOTE(review): the query below is parsed by Spark SQL and does not use this Python import;
# it is kept in case other cells rely on the name.
from pyspark.sql.functions import dayofmonth

# Count the trips picked up on 15 February 2021.
# The filter pins the full calendar date with a half-open timestamp range instead of
# matching only the day of month, so pickups on the 15th of any other month or year
# that may be present in the table are not counted.
spark.sql("""
SELECT COUNT(*)
FROM fhvhv_2021_02
WHERE pickup_datetime >= TIMESTAMP '2021-02-15 00:00:00'
  AND pickup_datetime <  TIMESTAMP '2021-02-16 00:00:00'
""").show()

[Stage 12:>                                                         (0 + 4) / 4]

+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

### Question 4. Longest trip for each day

In [47]:
# Duration in seconds of every trip that has both timestamps, longest first.
# show() prints only the leading rows of the sorted result.
trip_duration_query = """
SELECT 
  pickup_datetime, 
  dropoff_datetime, 
  unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime) AS duration_seconds
FROM fhvhv_2021_02
WHERE pickup_datetime IS NOT NULL
  AND dropoff_datetime IS NOT NULL
ORDER BY duration_seconds DESC
"""
spark.sql(trip_duration_query).show()

[Stage 23:>                                                         (0 + 4) / 4]

+-------------------+-------------------+----------------+
|    pickup_datetime|   dropoff_datetime|duration_seconds|
+-------------------+-------------------+----------------+
|2021-02-11 13:40:44|2021-02-12 10:39:44|           75540|
|2021-02-17 15:54:53|2021-02-18 07:48:34|           57221|
|2021-02-20 12:08:15|2021-02-21 00:22:14|           44039|
|2021-02-03 20:24:25|2021-02-04 07:41:58|           40653|
|2021-02-19 23:17:44|2021-02-20 09:44:01|           37577|
|2021-02-25 17:13:35|2021-02-26 02:57:05|           35010|
|2021-02-20 01:36:13|2021-02-20 11:16:19|           34806|
|2021-02-18 15:24:19|2021-02-19 01:01:11|           34612|
|2021-02-18 01:31:20|2021-02-18 11:07:15|           34555|
|2021-02-10 20:51:39|2021-02-11 06:21:08|           34169|
|2021-02-10 01:56:17|2021-02-10 10:57:33|           32476|
|2021-02-25 09:18:18|2021-02-25 18:18:57|           32439|
|2021-02-21 19:59:13|2021-02-22 04:56:16|           32223|
|2021-02-09 18:36:13|2021-02-10 03:31:00|           3208



### Question 5. Most frequent dispatching_base_num

In [49]:
# Number of trips per dispatching base, most frequent base first.
base_num_count_query = """
SELECT 
  dispatching_base_num,
  COUNT(*) AS count
FROM fhvhv_2021_02
GROUP BY dispatching_base_num
ORDER BY count DESC
"""
spark.sql(base_num_count_query).show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



                                                                                

### Question 6. Most common locations pair

In [50]:
# Load the taxi zone lookup table from a CSV in the working directory.
# No schema is supplied here, only the header option.
zones = (
    spark.read
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

In [51]:
### Register dataframe as a temp view so we can run sql on it
# registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView is the
# supported replacement and replaces any existing view with the same name, so the cell
# can be re-run safely.
zones.createOrReplaceTempView('zones')

In [65]:
# Count trips per "pickup zone / drop-off zone" pair, most common pair first.
# The zones table is joined twice (once per location ID); a location ID with no match
# in the lookup is labelled 'Unknown' by the COALESCE calls.
locations_pair_query = """
SELECT  
  CONCAT(COALESCE(pu_zones.Zone, 'Unknown'), ' / ', COALESCE(do_zones.Zone, 'Unknown')) AS locations_pair,
  COUNT(*) AS count
FROM fhvhv_2021_02
LEFT JOIN zones AS pu_zones
ON PULocationID = pu_zones.LocationID
LEFT JOIN zones AS do_zones
ON DOLocationID = do_zones.LocationID
GROUP BY CONCAT(COALESCE(pu_zones.Zone, 'Unknown'), ' / ', COALESCE(do_zones.Zone, 'Unknown'))
ORDER BY count DESC
"""
# Show the top 20 pairs without truncating the long zone names.
spark.sql(locations_pair_query).show(20, truncate=False)



+-----------------------------------------------------+-----+
|locations_pair                                       |count|
+-----------------------------------------------------+-----+
|East New York / East New York                        |45041|
|Borough Park / Borough Park                          |37329|
|Canarsie / Canarsie                                  |28026|
|Crown Heights North / Crown Heights North            |25976|
|Bay Ridge / Bay Ridge                                |17934|
|Jackson Heights / Jackson Heights                    |14688|
|Astoria / Astoria                                    |14688|
|Central Harlem North / Central Harlem North          |14481|
|Bushwick South / Bushwick South                      |14424|
|Flatbush/Ditmas Park / Flatbush/Ditmas Park          |13976|
|South Ozone Park / South Ozone Park                  |13716|
|Brownsville / Brownsville                            |12829|
|JFK Airport / NA                                     |12542|
|Prospec

                                                                                