In [2]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types, functions as F

In [3]:
# Build a Spark session in local mode under the application name "test";
# getOrCreate() hands back an already-running session if one exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/27 20:48:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Explicit schema for the HVFHV trip records; every field is nullable.
# The three whole-number columns (PULocationID, DOLocationID, trip_time) are
# declared as LongType because the Parquet file reports them as `long` when
# its schema is printed; declaring them as IntegerType would not match the
# stored 64-bit columns if this schema were applied to the read.
schema = types.StructType([
  # Licence and base identifiers
  types.StructField('hvfhs_license_num', types.StringType(), True),
  types.StructField('dispatching_base_num', types.StringType(), True),
  types.StructField('originating_base_num', types.StringType(), True),
  # Trip timestamps (stored without time zone)
  types.StructField('request_datetime', types.TimestampNTZType(), True),
  types.StructField('on_scene_datetime', types.TimestampNTZType(), True),
  types.StructField('pickup_datetime', types.TimestampNTZType(), True),
  types.StructField('dropoff_datetime', types.TimestampNTZType(), True),
  # Pickup / drop-off location ids
  types.StructField('PULocationID', types.LongType(), True),
  types.StructField('DOLocationID', types.LongType(), True),
  # Trip distance and duration
  types.StructField('trip_miles', types.DoubleType(), True),
  types.StructField('trip_time', types.LongType(), True),
  # Fare components
  types.StructField('base_passenger_fare', types.DoubleType(), True),
  types.StructField('tolls', types.DoubleType(), True),
  types.StructField('bcf', types.DoubleType(), True),
  types.StructField('sales_tax', types.DoubleType(), True),
  types.StructField('congestion_surcharge', types.DoubleType(), True),
  types.StructField('airport_fee', types.DoubleType(), True),
  types.StructField('tips', types.DoubleType(), True),
  types.StructField('driver_pay', types.DoubleType(), True),
  # Flags, kept as strings
  types.StructField('shared_request_flag', types.StringType(), True),
  types.StructField('shared_match_flag', types.StringType(), True),
  types.StructField('access_a_ride_flag', types.StringType(), True),
  types.StructField('wav_request_flag', types.StringType(), True),
  types.StructField('wav_match_flag', types.StringType(), True)])

In [5]:
# Load the January 2021 trip file into a DataFrame.
# No reader options or explicit schema are set: Parquet stores column names
# and types in the file itself, and `header` is a CSV-reader option that has
# no meaning for this format.
df = spark.read.parquet('fhvhv_tripdata_2021-01.parquet')

                                                                                

In [6]:
# Print the loaded DataFrame's columns with their types and nullability as a tree.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_f

In [None]:
# Redistribute the rows into 24 partitions. The result is bound back to `df`,
# so every later cell works on the repartitioned frame.
# NOTE(review): 24 is a hard-coded count; the reason for this value is not shown here.
df = df.repartition(24)

In [8]:
# Persist the DataFrame as Parquet under fhvhv/2021/01, replacing whatever
# is already stored at that path.
df.write.mode('overwrite').parquet('fhvhv/2021/01')

25/02/27 20:48:32 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
25/02/27 20:49:17 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/27 20:49:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/27 20:49:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/02/27 20:49:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [9]:
def custom_func(base_num):
  """Map a base number such as ``'B02764'`` to a short tagged hex id.

  The first character is dropped and the remainder is parsed as a decimal
  integer. That integer is rendered as lowercase hexadecimal, zero-padded to
  at least three digits, and prefixed with ``s/`` when it is a multiple of 7
  or ``e/`` otherwise.

  Args:
    base_num: String whose characters after the first form a decimal
      integer, or None.

  Returns:
    The tagged id (for example ``'e/acc'``), or None when ``base_num`` is None.

  Raises:
    ValueError: If the text after the first character is not a valid integer.
  """
  # The function is wrapped as a Spark UDF and fed a nullable column, so a
  # missing value is passed through instead of failing on the slice below.
  if base_num is None:
    return None

  num = int(base_num[1:])
  prefix = 's' if num % 7 == 0 else 'e'
  # '03x' zero-pads to width 3. The earlier '0.3x' spec requested a precision,
  # which Python rejects for integers with ValueError on every call.
  return f'{prefix}/{num:03x}'

In [10]:
# Wrap custom_func as a Spark UDF whose result column is a string.
custom_func_udf = F.udf(custom_func, returnType=types.StringType())

In [None]:
# Derive calendar dates from the pickup/drop-off timestamps, attach a
# `base_id` column produced by the UDF, and show the first rows of the
# date and location columns.
# NOTE(review): `base_id` is added but is not in the select() list, so it
# never appears in the displayed output — confirm whether it should be shown.
(
  df
  .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
  .withColumn('dropoff_date', F.to_date(df['dropoff_datetime']))
  .withColumn('base_id', custom_func_udf(df['dispatching_base_num']))
  .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
  .show()
)

                                                                                

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-26|  2021-01-26|         247|         200|
| 2021-01-29|  2021-01-29|          61|         119|
| 2021-01-27|  2021-01-27|         248|         182|
| 2021-01-04|  2021-01-04|         225|          97|
| 2021-01-25|  2021-01-25|         233|         140|
| 2021-01-15|  2021-01-15|          20|         265|
| 2021-01-23|  2021-01-23|         137|         226|
| 2021-01-20|  2021-01-20|         126|         254|
| 2021-01-22|  2021-01-22|          50|         166|
| 2021-01-01|  2021-01-01|          28|         129|
| 2021-01-23|  2021-01-23|          68|         265|
| 2021-01-28|  2021-01-28|          28|         134|
| 2021-01-15|  2021-01-15|         159|          18|
| 2021-01-05|  2021-01-05|          37|         256|
| 2021-01-04|  2021-01-04|          35|         177|
| 2021-01-17|  2021-01-17|          85|       

In [13]:
# Keep only rows whose licence number is 'HV0003', then project the four
# trip columns. The filter runs first so its predicate refers to a column
# that is still present, rather than one a preceding select() has dropped.
# No action is called here, so the cell only displays the DataFrame's repr.
(
    df
    .filter(df.hvfhs_license_num == 'HV0003')
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
)

DataFrame[pickup_datetime: timestamp_ntz, dropoff_datetime: timestamp_ntz, PULocationID: bigint, DOLocationID: bigint]

In [14]:
# Stop the Spark session created at the top of the notebook.
spark.stop()