In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Local Spark session on all local cores; getOrCreate() returns the already
# running session if this cell is re-executed in the same kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

# Downloads the HVFHV (fhvhv) CSV. The Spark reads below use the FHV (fhv)
# file fetched in the next cell instead.
# NOTE(review): looks like a leftover — only the head.csv cell references
# fhvhv_tripdata_2021-01.csv. The nyc-tlc S3 bucket may also no longer serve
# these CSVs; confirm this URL works before relying on it.
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-01.csv

In [3]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2021-01.csv.gz

--2023-03-01 14:10:35--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2021-01.csv.gz
Resolving github.com (github.com)... ::ffff:20.248.137.48, 20.248.137.48
Connecting to github.com (github.com)|::ffff:20.248.137.48|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/e896902d-d6b3-4b1a-967d-f76edcd6da52?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230301%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230301T141036Z&X-Amz-Expires=300&X-Amz-Signature=ed4f34fcfa4e652c163ef660854d990f28d94aafbf7b5cd918df6b78bfe78e77&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2021-01.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-01 14:10:36--  https://objects.githubusercontent.com/github-production-release-asset-

In [4]:
!wc -l fhv_tripdata_2021-01.csv.gz

39664 fhv_tripdata_2021-01.csv.gz


In [5]:
# First pass: read the gzipped CSV with only the header option set, so Spark
# gives back every column as a string (see the schema shown next).
df = spark.read.csv('fhv_tripdata_2021-01.csv.gz', header=True)

In [6]:
# Inspect the schema from the header-only read: every column comes back as StringType.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [7]:
import pandas as pd

# Read the first 1000 data rows of the gzipped CSV (pandas infers gzip
# compression from the .gz suffix) to see which dtypes pandas infers.
df_pandas = pd.read_csv('fhv_tripdata_2021-01.csv.gz', nrows=1000)

In [8]:
# dtypes pandas inferred for the sample; columns that are empty in the sample show up as float64.
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropoff_datetime           object
PULocationID              float64
DOLocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

# Let Spark derive a schema from the pandas sample, as a starting point for
# the explicit schema defined below.
spark.createDataFrame(df_pandas).schema

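Spark `IntegerType` takes 4 bytes per value and `LongType` takes 8 bytes, so the location-ID columns in the explicit schema below are declared as `IntegerType`.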

In [10]:
from pyspark.sql import types

In [11]:
# Explicit schema for the FHV CSV: timestamps for the two datetime columns,
# integers for the location IDs, strings for the rest. Every field is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [13]:
# Re-read the CSV with the explicit schema instead of all-string columns.
df = spark.read.csv(
    'fhv_tripdata_2021-01.csv.gz',
    schema=schema,
    header=True,
)

In [15]:
# Confirm the explicit schema was applied (timestamp / int columns).
df.dtypes

[('dispatching_base_num', 'string'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('SR_Flag', 'string'),
 ('Affiliated_base_number', 'string')]

In [14]:
# Shuffle the data into 24 partitions before writing it out.
# NOTE(review): 24 looks arbitrary — presumably chosen to control how many
# parquet files the write produces; confirm.
df = df.repartition(24)

In [16]:
# Write the repartitioned frame from the previous cell as-is (roughly one
# parquet file per partition). A coalesce(1) here would push the whole write
# through a single task and throw away that repartition.
# mode='overwrite' lets the cell be re-run without a "path already exists" error.
df.write.parquet('fhv/2021/01/', mode='overwrite')

In [17]:
# Load the data back from the parquet directory written above.
df = spark.read.format('parquet').load('fhv/2021/01/')

In [18]:
# Check that the explicit column types survived the parquet round trip.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



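`SELECT * FROM df WHERE hvfhs_license_num = 'HV0003'` — note that `hvfhs_license_num` is not a column of the FHV schema printed above (it presumably comes from the HVFHV data), so this query does not apply to this DataFrame; the `.filter()` cell further down filters on `dispatching_base_num` instead.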

In [19]:
from pyspark.sql import functions as F

In [20]:
# Preview the first rows of the parquet-backed DataFrame.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02550|2021-01-13 04:01:48|2021-01-13 04:14:48|        null|         181|   null|                B02550|
|              B01437|2021-01-24 13:58:49|2021-01-24 14:06:38|        null|          28|   null|                B02875|
|              B00900|2021-01-01 05:04:34|2021-01-01 06:07:00|        null|          42|   null|                B00900|
|              B00350|2021-01-08 11:03:38|2021-01-08 11:09:52|        null|         129|   null|                B03085|
|              B01239|2021-01-26 10:56:28|2021-01-26 11:51:03|        null|         119|   null|                B02765|
|              B02794|2021-01-16 11:11:1

In [21]:
def crazy_stuff(base_num):
    """Map a dispatching base number such as ``'B02884'`` to an obfuscated id.

    The first character is dropped and the rest is parsed as an integer
    ``num``. The result is ``'<prefix>/<hex>'``, where ``<hex>`` is ``num`` in
    lowercase hex zero-padded to at least three digits. ``<prefix>`` is ``'s'``
    when ``num`` is divisible by 7, otherwise ``'a'`` when divisible by 3,
    otherwise ``'e'``.

    Args:
        base_num: base number string, or None (Spark passes SQL nulls to
            Python UDFs as None).

    Returns:
        The obfuscated id string, or None when ``base_num`` is None. The Spark
        UDF built from this function then yields null instead of failing.

    Raises:
        ValueError: if the characters after the first do not form an integer.
    """
    if base_num is None:
        return None

    num = int(base_num[1:])
    # Divisibility by 7 takes precedence over divisibility by 3.
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

In [22]:
# Sanity-check the mapping on one base number before wrapping it in a UDF.
crazy_stuff('B02884')

's/b44'

In [23]:
# Wrap the plain-Python mapping as a Spark UDF that returns strings.
crazy_stuff_udf = F.udf(crazy_stuff, types.StringType())

In [24]:
# Add date-only pickup/dropoff columns and the obfuscated base id, then show
# a projection of the result.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  a/9f6| 2021-01-13|  2021-01-13|        null|         181|
|  a/59d| 2021-01-24|  2021-01-24|        null|          28|
|  a/384| 2021-01-01|  2021-01-01|        null|          42|
|  s/15e| 2021-01-08|  2021-01-08|        null|         129|
|  s/4d7| 2021-01-26|  2021-01-26|        null|         119|
|  e/aea| 2021-01-16|  2021-01-16|        null|         188|
|  e/100| 2021-01-31|  2021-01-31|        null|        null|
|  e/13f| 2021-01-07|  2021-01-07|          29|          21|
|  s/c08| 2021-01-19|  2021-01-19|        null|          65|
|  e/287| 2021-01-01|  2021-01-01|        null|          78|
|  a/bf4| 2021-01-10|  2021-01-10|        null|         149|
|  e/adf| 2021-01-27|  2021-01-27|        null|         242|
|  e/a6f| 2021-01-06|  2021-01-06|        null|          36|
|  e/437| 2021-01-29|  2

In [28]:
# Trips dispatched by base B03080, limited to time and location columns.
(
    df
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID', 'dispatching_base_num')
    .where(df['dispatching_base_num'] == 'B03080')
    .show()
)


+-------------------+-------------------+------------+------------+--------------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|dispatching_base_num|
+-------------------+-------------------+------------+------------+--------------------+
|2021-01-19 05:30:54|2021-01-19 05:45:58|        null|          65|              B03080|
|2021-01-16 13:05:30|2021-01-16 13:25:08|        null|          68|              B03080|
|2021-01-02 05:00:49|2021-01-02 05:15:53|        null|          35|              B03080|
|2021-01-22 10:12:42|2021-01-22 10:15:40|        null|         159|              B03080|
|2021-01-14 12:20:03|2021-01-14 12:52:11|        null|          14|              B03080|
|2021-01-12 08:50:22|2021-01-12 09:08:12|        null|         208|              B03080|
|2021-01-22 09:20:01|2021-01-22 09:50:55|        null|          77|              B03080|
|2021-01-25 10:30:01|2021-01-25 10:45:06|        null|         167|              B03080|
|2021-01-07 15:06:00|

In [None]:
# Peek at the first 10 lines of the sample written to head.csv earlier in the notebook.
!head -n 10 head.csv