In [1]:
# install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
# import library
from pyspark.sql import SparkSession
# from pyspark.sql import Row
from pyspark.sql.functions import col

In [3]:
#Create a spark session
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

Extract

In [4]:
# read csv
df = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv("/content/fhv_tripdata_2019-01.csv")
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



Transformation process

In [5]:
# fungsi filter nan berdasarkan nama kolom
def filter_nan (dataframe, col_name:list):
    return dataframe.na.drop(subset=col_name)

# fungsi filter tanggal
def filter_date (dataframe, col_name, start_date, end_date):
    return dataframe.filter((col(col_name)  >= start_date) \
                            & (col(col_name)  <= end_date))


In [6]:
# filter nan pada kolom 'PUlocationID' dan 'DOlocationID'
df = filter_nan(df, ['PUlocationID','DOlocationID'])

# filter tanggal
df = filter_date(df, 'pickup_datetime', '2019-01-01', '2019-01-10')

In [7]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00254|2019-01-01 00:33:03|2019-01-01 01:37:24|         140|          52|   null|                B02356|
|              B00254|2019-01-01 00:03:00|2019-01-01 00:34:25|         141|         237|   null|                B00254|
|              B00254|2019-01-01 00:45:48|2019-01-01 01:26:01|         237|         236|   null|                B00254|
|              B00254|2019-01-01 00:37:39|2019-01-01 01:44:59|         162|          85|   null|                B00254|
|              B00254|2019-01-01 00:35:06|2019-01-01 01:30:21|         237|         246|   null|                B00254|
|              B00254|2019-01-01 00:55:2

In [8]:
df.toPandas()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00254,2019-01-01 00:33:03,2019-01-01 01:37:24,140,52,,B02356
1,B00254,2019-01-01 00:03:00,2019-01-01 00:34:25,141,237,,B00254
2,B00254,2019-01-01 00:45:48,2019-01-01 01:26:01,237,236,,B00254
3,B00254,2019-01-01 00:37:39,2019-01-01 01:44:59,162,85,,B00254
4,B00254,2019-01-01 00:35:06,2019-01-01 01:30:21,237,246,,B00254
...,...,...,...,...,...,...,...
5338271,B02531,2019-01-10 00:00:00,2019-01-10 01:00:00,185,197,,B02531
5338272,B02765,2019-01-10 00:00:00,2019-01-10 00:08:17,145,112,,B02765
5338273,B02765,2019-01-10 00:00:00,2019-01-10 01:26:29,148,265,,B02765
5338274,B02800,2019-01-10 00:00:00,2019-01-10 00:10:04,90,79,,B02870


LOAD

In [11]:
# write data to parquet
df.write.mode("overwrite").parquet("clean_data_parquet.parquet")

# write data to json
df.write.mode("overwrite").json("clean_data_json.json")

Read

In [12]:
spark.read.parquet("clean_data_parquet.parquet").show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02869|2019-01-04 12:36:08|2019-01-04 12:52:16|         173|          82|   null|                B02869|
|              B02869|2019-01-04 12:54:51|2019-01-04 13:01:08|         164|          48|   null|                B02869|
|              B02869|2019-01-04 12:29:34|2019-01-04 12:47:08|         234|         237|   null|                B02869|
|              B02869|2019-01-04 12:57:00|2019-01-04 13:12:23|         237|         100|   null|                B02869|
|              B02869|2019-01-04 12:19:43|2019-01-04 12:44:38|         180|         145|   null|                B02869|
|              B02869|2019-01-04 12:00:1