## Import Library

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

## Create spark session

In [2]:
#Create a spark session
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

## Extract csv data to data frame

In [3]:
df = spark.read.option("header","true").option("interSchema","true")\
    .csv("fhv_tripdata_2019-01.csv")
print("Here is our inferred schema:")
df.printSchema()

Here is our inferred schema:
root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropOff_datetime: string (nullable = true)
 |-- PUlocationID: string (nullable = true)
 |-- DOlocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [4]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00001|2019-01-01 00:30:00|2019-01-01 02:51:55|        null|        null|   null|                B00001|
|              B00001|2019-01-01 00:45:00|2019-01-01 00:54:49|        null|        null|   null|                B00001|
|              B00001|2019-01-01 00:15:00|2019-01-01 00:54:52|        null|        null|   null|                B00001|
|              B00008|2019-01-01 00:19:00|2019-01-01 00:39:00|        null|        null|   null|                B00008|
|              B00008|2019-01-01 00:27:00|2019-01-01 00:37:00|        null|        null|   null|                B00008|
|              B00008|2019-01-01 00:48:0

## Create Temporary Table in order can run SQL in spark 

In [5]:
df.createOrReplaceTempView("tripData")

## Filter data dengan pickup_datetime mulai dari 1 Jan 2019 sampai 10 Jan 2019

In [6]:
trip_sql = spark.sql("SELECT * FROM tripData WHERE PUlocationID is not null AND DOlocationID is not null\
    AND pickup_datetime >= '2019-01-01 00:00:00' AND pickup_datetime <= '2019-10-01 23:59:00'")
trip_sql.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00254|2019-01-01 00:33:03|2019-01-01 01:37:24|         140|          52|   null|                B02356|
|              B00254|2019-01-01 00:03:00|2019-01-01 00:34:25|         141|         237|   null|                B00254|
|              B00254|2019-01-01 00:45:48|2019-01-01 01:26:01|         237|         236|   null|                B00254|
|              B00254|2019-01-01 00:37:39|2019-01-01 01:44:59|         162|          85|   null|                B00254|
|              B00254|2019-01-01 00:35:06|2019-01-01 01:30:21|         237|         246|   null|                B00254|
|              B00254|2019-01-01 00:55:2

In [None]:
trip_pandas = trip_sql.toPandas()



## Export Data to parquet and json

In [12]:
trip_sql.write.parquet("trip_parquet.parquet")

In [10]:
trip_sql.write.json("trip_json.json")

## READ 

In [14]:
read_parquet = spark.read.parquet("trip_parquet.parquet")
read_parquet.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02872|2019-01-10 20:37:03|2019-01-10 20:41:47|         121|         135|   null|                B02872|
|              B02872|2019-01-10 20:49:30|2019-01-10 21:00:32|         135|          98|      1|                B02872|
|              B02872|2019-01-10 20:54:42|2019-01-10 21:05:30|         192|         175|      2|                B02872|
|              B02872|2019-01-10 20:01:53|2019-01-10 20:47:28|          96|          85|      3|                B02872|
|              B02872|2019-01-10 20:11:17|2019-01-10 20:38:44|          63|         188|      4|                B02872|
|              B02872|2019-01-10 20:24:5