In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

In [2]:
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/09 16:13:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

In [4]:
# !gunzip fhvhv_tripdata_2021-01.csv.gz

In [5]:
!wc -l fhvhv_tripdata_2021-01.csv

11908469 fhvhv_tripdata_2021-01.csv


In [6]:
df = spark.read.option("header", "true").csv("fhvhv_tripdata_2021-01.csv")

In [7]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [8]:
df.head(3)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:23:56', dropoff_datetime='2021-01-01 00:38:05', PULocationID='233', DOLocationID='142', SR_Flag=None)]

In [9]:
!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv

In [10]:
df_pandas = pd.read_csv("head.csv")

In [11]:
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [12]:
spark.createDataFrame(df_pandas).printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- SR_Flag: double (nullable = true)



Integer - 4 bytes  
Long - 8 bytes

In [13]:
schema = types.StructType(
    [
        types.StructField("hvfhs_license_num", types.StringType(), True),
        types.StructField("dispatching_base_num", types.StringType(), True),
        types.StructField("pickup_datetime", types.TimestampType(), True),
        types.StructField("dropoff_datetime", types.TimestampType(), True),
        types.StructField("PULocationID", types.IntegerType(), True),
        types.StructField("DOLocationID", types.IntegerType(), True),
        types.StructField("SR_Flag", types.StringType(), True),
    ]
)

In [14]:
df = (
    spark.read.option("header", "true").schema(schema).csv("fhvhv_tripdata_2021-01.csv")
)

In [15]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [16]:
df.head(3)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None)]

In [17]:
df.rdd.getNumPartitions()

6

In [18]:
718 / 128  # file size / block size

5.609375

In [19]:
df = df.repartition(24)

In [20]:
df.write.parquet("fhvhv/2021/01/")

                                                                                

In [21]:
df.rdd.getNumPartitions()



24

In [22]:
!tree fhvhv

[01;34mfhvhv[0m
└── [01;34m2021[0m
    └── [01;34m01[0m
        ├── _SUCCESS
        ├── part-00000-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00001-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00002-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00003-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00004-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00005-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00006-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00007-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00008-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00009-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00010-9a113357-3524-412c-8e12-cdc3b33748bf-c000.snappy.parquet
        ├── part-00011-9a113357-3524-

In [23]:
df = spark.read.parquet("fhvhv/2021/01/")

                                                                                

In [24]:
df.count()

11908468

In [25]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



SELECT * FROM df WHERE hvfhs_license_num =  HV0003

In [26]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2021-01-03 17:17:21|2021-01-03 17:26:18|         255|          34|   null|
|           HV0003|              B02882|2021-01-05 22:14:07|2021-01-05 22:32:28|         189|         107|   null|
|           HV0003|              B02867|2021-01-02 17:59:55|2021-01-02 18:10:39|          88|         137|   null|
|           HV0003|              B02872|2021-01-02 23:57:54|2021-01-03 00:15:48|         238|         224|   null|
|           HV0003|              B02875|2021-01-06 15:53:13|2021-01-06 16:07:07|         169|         208|   null|
|           HV0003|              B02867|2021-01-07 07:35:24|2021-01-07 07:55:49|

In [27]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f"s/{num:03x}"
    elif num % 3 == 0:
        return f"a/{num:03x}"
    else:
        return f"e/{num:03x}"

In [28]:
crazy_stuff("B02884")

's/b44'

In [29]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [30]:
(
    df.withColumn("pickup_date", F.to_date(df.pickup_datetime))
    .withColumn("dropoff_date", F.to_date(df.dropoff_datetime))
    .withColumn("base_id", crazy_stuff_udf(df.dispatching_base_num))
    .select("base_id", "pickup_date", "dropoff_date", "PULocationID", "DOLocationID")
    .show()
)

[Stage 12:>                                                         (0 + 1) / 1]

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/9ce| 2021-01-03|  2021-01-03|         255|          34|
|  e/b42| 2021-01-05|  2021-01-05|         189|         107|
|  e/b33| 2021-01-02|  2021-01-02|          88|         137|
|  e/b38| 2021-01-02|  2021-01-03|         238|         224|
|  e/b3b| 2021-01-06|  2021-01-06|         169|         208|
|  e/b33| 2021-01-07|  2021-01-07|          75|          88|
|  e/acc| 2021-01-07|  2021-01-07|         210|         210|
|  e/acc| 2021-01-02|  2021-01-02|         243|          69|
|  e/b35| 2021-01-04|  2021-01-04|         250|         213|
|  s/b3d| 2021-01-03|  2021-01-03|          87|          79|
|  e/a39| 2021-01-03|  2021-01-03|          68|         181|
|  s/acd| 2021-01-04|  2021-01-04|          95|         236|
|  s/b13| 2021-01-02|  2021-01-02|         262|         236|
|  e/9ce| 2021-01-04|  2

                                                                                

In [31]:
(
    df.select(
        "pickup_datetime",
        "dropoff_datetime",
        "PULocationID",
        "DOLocationID",
        "hvfhs_license_num",
    )
    .filter(df.hvfhs_license_num == "HV0003")
    .show()
)

+-------------------+-------------------+------------+------------+-----------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|hvfhs_license_num|
+-------------------+-------------------+------------+------------+-----------------+
|2021-01-05 22:14:07|2021-01-05 22:32:28|         189|         107|           HV0003|
|2021-01-02 17:59:55|2021-01-02 18:10:39|          88|         137|           HV0003|
|2021-01-02 23:57:54|2021-01-03 00:15:48|         238|         224|           HV0003|
|2021-01-06 15:53:13|2021-01-06 16:07:07|         169|         208|           HV0003|
|2021-01-07 07:35:24|2021-01-07 07:55:49|          75|          88|           HV0003|
|2021-01-07 08:45:12|2021-01-07 08:51:17|         210|         210|           HV0003|
|2021-01-02 15:44:26|2021-01-02 16:10:50|         243|          69|           HV0003|
|2021-01-04 16:50:28|2021-01-04 16:57:43|         250|         213|           HV0003|
|2021-01-03 10:30:34|2021-01-03 10:44:53|          87|