In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) a Spark session running in local mode under the app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [5]:
spark.version

'3.3.2'

In [7]:
# Location of the gzipped June 2021 FHVHV trip-data CSV (DataTalksClub release asset).
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz"

In [10]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

--2023-03-06 06:14:18--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 20.207.73.82
Connecting to github.com (github.com)|20.207.73.82|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T061418Z&X-Amz-Expires=300&X-Amz-Signature=34a5dabe914d0cbd9061cde21efbfa3cf13b4ca5345dd798d70c2d80b9318677&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-06 06:14:18--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [13]:
!gunzip fhvhv_tripdata_2021-06.csv.gz

In [15]:
# First pass: read the CSV using its header row for column names and no
# explicit schema, so every column is loaded as a string.
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [16]:
df.head(2)

[Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:02:41', dropoff_datetime='2021-06-01 00:07:46', PULocationID='174', DOLocationID='18', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:16:16', dropoff_datetime='2021-06-01 00:21:14', PULocationID='32', DOLocationID='254', SR_Flag='N', Affiliated_base_number='B02764')]

In [17]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [18]:
# Copy the header line plus the first 1000 data rows into head.csv, a small
# sample that pandas can load quickly.
!head -n 1001 fhvhv_tripdata_2021-06.csv > head.csv

In [19]:
import pandas as pd

In [20]:
# Load the 1000-row sample with pandas so its inferred dtypes can guide the Spark schema.
df_pandas = pd.read_csv('head.csv')

In [21]:
df_pandas.shape

(1000, 7)

In [22]:
df_pandas.head(2)

Unnamed: 0,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag,Affiliated_base_number
0,B02764,2021-06-01 00:02:41,2021-06-01 00:07:46,174,18,N,B02764
1,B02764,2021-06-01 00:16:16,2021-06-01 00:21:14,32,254,N,B02764


In [24]:
df_pandas.dtypes

dispatching_base_num      object
pickup_datetime           object
dropoff_datetime          object
PULocationID               int64
DOLocationID               int64
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [25]:
from pyspark.sql import types

In [26]:
# Explicit schema for the trip CSV: timestamps for the pickup/dropoff times,
# integers for the two location IDs, strings for the rest. Every field is
# declared nullable.
schema = types.StructType([
    types.StructField(column, dtype, True)
    for column, dtype in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [27]:
# Re-read the CSV, this time applying the explicit schema so the columns get
# their intended types instead of all being strings.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [28]:
# Redistribute the rows into 12 partitions ahead of the parquet write in the next cell.
# NOTE(review): the reason for choosing 12 is not stated in this notebook; confirm.
df = df.repartition(12)

In [29]:
# Write the repartitioned data as parquet. mode='overwrite' replaces any output
# left by a previous run; the default mode raises an error when the path
# already exists, which breaks Restart & Run All.
df.write.parquet('fhvhv/2021/06/', mode='overwrite')

In [30]:
# From here on, work from the parquet copy written in the previous cell rather than the CSV.
df = spark.read.parquet('fhvhv/2021/06/')

In [31]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [32]:
df.count()

14961892

In [37]:
from pyspark.sql import functions as F


In [38]:
# Preview only (nothing is assigned): derive calendar dates from the two
# timestamps and show them next to the pickup/dropoff location IDs.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-06-04|  2021-06-04|         118|         109|
| 2021-06-02|  2021-06-02|         163|          79|
| 2021-06-03|  2021-06-03|         231|          13|
| 2021-06-03|  2021-06-03|           9|          92|
| 2021-06-05|  2021-06-05|          14|         133|
| 2021-06-03|  2021-06-03|         152|          74|
| 2021-06-02|  2021-06-02|         220|         235|
| 2021-06-01|  2021-06-01|         162|         161|
| 2021-06-04|  2021-06-04|         206|         206|
| 2021-06-02|  2021-06-02|          49|         182|
| 2021-06-02|  2021-06-02|         236|          89|
| 2021-06-01|  2021-06-01|         151|          43|
| 2021-06-02|  2021-06-02|         181|         189|
| 2021-06-03|  2021-06-03|         181|          89|
| 2021-06-01|  2021-06-01|          86|          86|
| 2021-06-04|  2021-06-04|         162|       

In [40]:
# Keep the derived date columns on df for the cells below. The chain is wrapped
# in parentheses so it no longer ends on a dangling line continuation.
df = (
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
)

In [42]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617| 2021-06-04|  2021-06-04|
|              B02875|2021-06-02 22:28:45|2021-06-02 22:37:28|         163|          79|      N|                B02875| 2021-06-02|  2021-06-02|
|              B02871|2021-06-03 11:47:48|2021-06-03 11:52:23|         231|          13|      N|                B02871| 2021-06-03|  2021-06-03|
|              B02888|2021-06-03 08:45:25|2021-06-03 09:00:12|           9|          92|      N|                B02888| 2021-06-03

In [43]:
# Keep just the two date columns for trips whose pickup date is 2021-06-15.
new_df = (
    df
    .select('pickup_date', 'dropoff_date')
    .filter(df['pickup_date'] == '2021-06-15')
)


In [44]:
new_df.count()

452470

In [36]:
df.head(1)

[Row(dispatching_base_num='B02617', pickup_datetime=datetime.datetime(2021, 6, 4, 16, 50, 34), dropoff_datetime=datetime.datetime(2021, 6, 4, 17, 1, 18), PULocationID=118, DOLocationID=109, SR_Flag='N', Affiliated_base_number='B02617')]

Longest trip duration (in hours)

In [46]:
df.show(1)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617| 2021-06-04|  2021-06-04|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+
only showing top 1 row



In [62]:
from pyspark.sql.functions import dayofyear,unix_timestamp,max


In [49]:
%config Completer.use_jedi = False



In [59]:
# Trip duration: dropoff minus pickup as Unix seconds, divided by 3600, so the
# stored value is in HOURS. The column keeps the name 'DiffInSeconds' because a
# later cell refers to it by that name. The dangling trailing line continuation
# has been removed.
df = df.withColumn(
    'DiffInSeconds',
    (unix_timestamp("dropoff_datetime") - unix_timestamp('pickup_datetime')) / 3600,
)

In [60]:
df.show(1)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|      DiffInSeconds|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617| 2021-06-04|  2021-06-04|0.17888888888888888|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
only showing top 1 row



In [63]:
# Longest trip in the dataset. DiffInSeconds holds hours (it was divided by
# 3600 when created), so the result is a duration in hours even though the
# alias says "fee_max". F.max is called explicitly so this cell does not depend
# on Python's builtin `max` having been shadowed by a pyspark import.
df.select(F.max(df.DiffInSeconds).alias("fee_max")).show()

+----------------+
|         fee_max|
+----------------+
|66.8788888888889|
+----------------+



In [64]:
zone_data = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"

In [65]:
# Load the taxi-zone lookup table into pandas straight from the URL in zone_data.
zone_df =  pd.read_csv(zone_data)

In [66]:
zone_df.head(2)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone


In [72]:
# Show the first three rows with every column; long cell values are truncated (show's default).
df.show(3)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|      DiffInSeconds|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617| 2021-06-04|  2021-06-04|0.17888888888888888|
|              B02875|2021-06-02 22:28:45|2021-06-02 22:37:28|         163|          79|      N|                B02875| 2021-06-02|  2021-06-02|0.14527777777777778|
|              B02871|2021-06-03 11:47:48|2021-06-03 11:52:23|         231|          13|      N|                B02871| 2021-06-03|  2021-06-03| 0.0763888888888889|
+---------

In [73]:
zone_df.head(1)

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR


In [74]:
# Expose df to Spark SQL under the name "sql_Table". createOrReplaceTempView
# replaces the deprecated registerTempTable and can be re-run safely because it
# replaces an existing view of the same name.
df.createOrReplaceTempView("sql_Table")




In [75]:
# Read every row and column back from the "sql_Table" temp view through Spark SQL.
df2 = spark.sql("select * from sql_Table")
# Sanity check idea: sorted(df.collect()) == sorted(df2.collect()) should be True,
# but collect() pulls all rows to the driver, so only try it on a small sample.
# Cleanup when finished: spark.catalog.dropTempView("sql_Table")

In [99]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number',
 'pickup_date',
 'dropoff_date',
 'DiffInSeconds']

In [101]:
from pyspark.sql.functions import desc


In [102]:
# Count trips per pickup location and order the locations from most to least frequent.
most_frequent_PULocationID = (
    df
    .groupBy('PULocationID')
    .count()
    .orderBy(desc('count'))
)

# Display the ten most frequent pickup locations.
most_frequent_PULocationID.show(10)

+------------+------+
|PULocationID| count|
+------------+------+
|          61|231279|
|          79|221244|
|         132|188867|
|          37|187929|
|          76|186780|
|         231|164344|
|         138|161596|
|         234|158937|
|         249|154698|
|           7|152493|
+------------+------+
only showing top 10 rows



In [77]:
df2.show(2)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|dropoff_date|      DiffInSeconds|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617| 2021-06-04|  2021-06-04|0.17888888888888888|
|              B02875|2021-06-02 22:28:45|2021-06-02 22:37:28|         163|          79|      N|                B02875| 2021-06-02|  2021-06-02|0.14527777777777778|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+------------+-------------------+
only showi

In [103]:
# Look up the zone for LocationID 61, the most frequent PULocationID in the ranking above.
zone_df.loc[zone_df['LocationID'] == 61]

Unnamed: 0,LocationID,Borough,Zone,service_zone
60,61,Brooklyn,crown heights north,Boro Zone
