In [1]:
import pyspark
from pyspark.sql import SparkSession


In [2]:
# Create a Spark session running locally on all available cores (local[*]),
# or reuse the active one if it already exists (getOrCreate).
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('FirstLook')
    .getOrCreate()
)

24/03/02 16:55:29 WARN Utils: Your hostname, coy-pc resolves to a loopback address: 127.0.1.1; using 172.19.138.153 instead (on interface eth0)
24/03/02 16:55:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/02 16:55:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the parquet data stored under the fhvhv/2021/01/ directory.
df = spark.read.format('parquet').load('fhvhv/2021/01/')

In [6]:
# Print the column names, types and nullability Spark inferred from the parquet files.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [None]:
# Selecting columns and filtering rows of a Spark DataFrame

In [10]:
# Show the time and location columns of the first 5 trips whose
# hvfhs_license_num is 'HV0003'.
(
    df
    # Filter first so the filtered column is visibly still part of the frame.
    .filter(df.hvfhs_license_num == 'HV0003')
    # 'DOLocationID' matches the schema's exact casing. A differently-cased name
    # only resolves while spark.sql.caseSensitive is false (Spark's default).
    .select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')
    .show(n=5)
)

+-------------------+-------------------+------------+------------+
|    pickup_datetime|   dropoff_datetime|PULocationID|DoLocationID|
+-------------------+-------------------+------------+------------+
|2021-01-01 11:42:02|2021-01-01 12:00:51|          85|          11|
|2021-01-01 03:31:07|2021-01-01 03:38:10|         129|          82|
|2021-01-01 17:59:21|2021-01-01 18:17:29|          14|          26|
|2021-01-01 21:18:30|2021-01-01 21:27:55|          39|          39|
|2021-01-01 21:58:38|2021-01-01 22:05:07|          20|          20|
+-------------------+-------------------+------------+------------+
only showing top 5 rows



In [33]:
from pyspark.sql import functions as F
from pyspark.sql import types

In [18]:
# DataFrame.withColumn(name, col) returns a new DataFrame. It adds a column called
# `name` if none exists; otherwise it replaces the existing column of that name.
# The DataFrame it is called on is not modified.

# Preview df with calendar-date columns derived from the pickup/dropoff
# timestamps. The result is only displayed, not assigned, so df keeps its schema.
(
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
    .show(n=5)
)


+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|dropoff_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+------------+
|           HV0003|              B02765|2021-01-01 11:42:02|2021-01-01 12:00:51|          85|          11|   null| 2021-01-01|  2021-01-01|
|           HV0003|              B02869|2021-01-01 03:31:07|2021-01-01 03:38:10|         129|          82|   null| 2021-01-01|  2021-01-01|
|           HV0003|              B02872|2021-01-01 17:59:21|2021-01-01 18:17:29|          14|          26|   null| 2021-01-01|  2021-01-01|
|           HV0003|              B02869|2021-01-01 21:18:30|2021-01-01 21:27:55|          39|          39|   null| 2021-01-01|  2021-01-01|
|           HV0003| 

In [19]:
# Show the first 5 'HV0003' trips with all of df's columns. No projection is
# needed: `.select('*')` would be a no-op.
# NOTE(review): the saved output of this cell lists a pickup_date column that is
# not in df's printed schema. That means df was reassigned in a cell no longer in
# this notebook. On a fresh kernel this should show only the seven schema columns.
(
    df
    .filter(df.hvfhs_license_num == 'HV0003')
    .show(n=5)
)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|           HV0003|              B02765|2021-01-01 11:42:02|2021-01-01 12:00:51|          85|          11|   null| 2021-01-01|
|           HV0003|              B02869|2021-01-01 03:31:07|2021-01-01 03:38:10|         129|          82|   null| 2021-01-01|
|           HV0003|              B02872|2021-01-01 17:59:21|2021-01-01 18:17:29|          14|          26|   null| 2021-01-01|
|           HV0003|              B02869|2021-01-01 21:18:30|2021-01-01 21:27:55|          39|          39|   null| 2021-01-01|
|           HV0003|              B02879|2021-01-01 21:58:38|2021-01-01 22:05:07|          20|          20|   nu

In [None]:
# Using PySpark user-defined functions (UDFs)

In [36]:
def crazy_stuff(base_num):
    """Map a dispatching base number such as 'B02884' to a short tagged hex id.

    The first character is dropped and the remaining characters are parsed as
    an integer. That integer is rendered as (at least) three lowercase hex
    digits, prefixed by a tag chosen from its divisibility:

    * ``'s/'`` if divisible by 7,
    * ``'a/'`` if divisible by 3 but not by 7,
    * ``'e/'`` otherwise.

    Args:
        base_num: base number string, e.g. ``'B02884'``. May be ``None``,
            because the ``dispatching_base_num`` column is nullable.

    Returns:
        The tagged id, e.g. ``'s/b44'`` for ``'B02884'``, or ``None`` when
        ``base_num`` is ``None`` (a Spark UDF stores this as null).

    Raises:
        ValueError: if the characters after the first are not an integer.
    """
    # The source column is nullable. Without this guard a single null row
    # raises TypeError inside the Python UDF and fails the Spark task.
    if base_num is None:
        return None
    num = int(base_num[1:])
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'
    return f'{prefix}/{num:03x}'

In [30]:
# Sanity-check the plain Python function on one base number before using it as a UDF.
crazy_stuff('B02884')

's/b44'

In [37]:
# Wrap crazy_stuff as a Spark UDF whose results are typed as strings.
crazy_stuff_udf = F.udf(crazy_stuff, types.StringType())

In [38]:
# Derive calendar dates and the UDF-computed base_id, then display the
# projected columns (show() with no argument uses its default row count).
(
    df
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  s/acd| 2021-01-01|  2021-01-01|          85|          11|
|  e/b35| 2021-01-01|  2021-01-01|         129|          82|
|  e/b38| 2021-01-01|  2021-01-01|          14|          26|
|  e/b35| 2021-01-01|  2021-01-01|          39|          39|
|  e/b3f| 2021-01-01|  2021-01-01|          20|          20|
|  s/b3d| 2021-01-02|  2021-01-02|         236|         220|
|  s/acd| 2021-01-01|  2021-01-01|          31|          32|
|  a/b37| 2021-01-01|  2021-01-01|          36|          36|
|  e/b47| 2021-01-01|  2021-01-01|         260|           7|
|  e/9ce| 2021-01-02|  2021-01-02|          92|         171|
|  e/acc| 2021-01-02|  2021-01-02|         158|         125|
|  e/9ce| 2021-01-02|  2021-01-02|         114|          14|
|  e/b35| 2021-01-01|  2021-01-01|          32|          42|
|  e/9ce| 2021-01-02|  2