In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('test') \
    .getOrCreate()

In [10]:
# !curl -O https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet

In [11]:
df = spark.read \
    .parquet("fhvhv_tripdata_2021-01.parquet", header=True, inferSchema=True)

In [12]:
df.show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [13]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampType(), True), StructField('on_scene_datetime', TimestampType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag',

In [14]:
import pandas as pd

In [15]:
df_pandas = pd.read_parquet('fhvhv_tripdata_2021-01.parquet')
df_pandas_subset = df_pandas[['hvfhs_license_num','dispatching_base_num','pickup_datetime','dropoff_datetime','PULocationID','DOLocationID']].head(1001)

In [16]:
df_pandas_subset.dtypes

hvfhs_license_num               object
dispatching_base_num            object
pickup_datetime         datetime64[us]
dropoff_datetime        datetime64[us]
PULocationID                     int64
DOLocationID                     int64
dtype: object

In [17]:
pd.DataFrame.iteritems = pd.DataFrame.items

In [18]:
spark.createDataFrame(df_pandas_subset).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True)])

In [19]:
from pyspark.sql import types
from pyspark.sql.types import *

In [20]:
schema = StructType([
    StructField('hvfhs_license_num', StringType(), True), 
    StructField('dispatching_base_num', StringType(), True), 
    StructField('pickup_datetime', TimestampType(), True), 
    StructField('dropoff_datetime', TimestampType(), True), 
    StructField('PULocationID', LongType(), True), 
    StructField('DOLocationID', LongType(), True)
])

In [21]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .parquet("fhvhv_tripdata_2021-01.parquet")

In [22]:
df.head(10)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 7, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 7, 49, 7), PULocationID=230, DOLocationID=166),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 7, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 8, 18, 21), PULocationID=152, DOLocationID=167),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 7, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 7, 38, 5), PULocationID=233, DOLocationID=142),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 7, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 7, 45, 50), PULocationID=142, DOLocationID=143),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 7, 48, 14), dropoff_

In [23]:
df = df.repartition(24)

In [24]:
df.write.parquet("fhvhv/2021/01/")

AnalysisException: path file:/c:/Users/USER/Code/data-engineering-zoomcamp/05-batch/code/fhvhv/2021/01 already exists.

In [25]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)



In [29]:
from pyspark.sql import functions as F

In [33]:
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

In [34]:
crazy_stuff('B02884')

's/b44'

In [35]:
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [37]:
df \
     .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
     .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
     .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
     .select('base_id','pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
     .show()

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/b35| 2021-01-22|  2021-01-22|          71|          72|
|  e/b47| 2021-01-10|  2021-01-10|         231|         231|
|  e/b47| 2021-01-18|  2021-01-18|          42|         161|
|  e/b42| 2021-02-01|  2021-02-01|          37|         225|
|  e/b3f| 2021-01-18|  2021-01-18|         181|          25|
|  e/b38| 2021-01-01|  2021-01-01|         106|          13|
|  a/b31| 2021-01-07|  2021-01-07|          23|          23|
|  a/a7a| 2021-01-21|  2021-01-21|          78|         163|
|  e/a39| 2021-01-15|  2021-01-15|         220|          78|
|  e/9ce| 2021-01-06|  2021-01-06|         126|         126|
|  e/acc| 2021-01-30|  2021-01-30|          85|          89|
|  e/9ce| 2021-01-20|  2021-01-20|          85|          39|
|  s/b44| 2021-01-29|  2021-01-29|         205|         215|
|  e/b3b| 2021-01-29|  2

In [28]:
df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \
    .filter(df.hvfhs_license_num == 'HV0003') \
    .show()

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-28 00:30:23|2021-01-28 00:42:55|          53|          83|
|2021-01-16 18:12:29|2021-01-16 18:37:14|          85|          76|
|2021-01-12 02:15:18|2021-01-12 02:56:52|         161|          35|
|2021-01-28 23:09:05|2021-01-28 23:24:18|          36|         255|
|2021-01-27 15:35:52|2021-01-27 15:48:09|         231|          87|
|2021-01-22 07:13:49|2021-01-22 07:33:42|           3|          42|
|2021-01-25 19:34:03|2021-01-25 19:42:45|         230|          50|
|2021-01-09 17:59:20|2021-01-09 18:11:44|         234|         246|
|2021-01-11 19:58:13|2021-01-11 20:02:19|           3|         242|
|2021-01-07 18:15:37|2021-01-07 18:36:20|         226|         161|
|2021-01-30 19:58:58|2021-01-30 20:12:22|          62|         225|
|2021-01-22 02:36:36|2021-01-22 02:45:41|       