##Import library needed

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

#Create Spark Session

In [13]:
#Create a spark session
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [14]:
df = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv("fhv_tripdata_2019-01.csv")
print("Here is our inferred schema:")
df.printSchema()



Here is our inferred schema:
root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



                                                                                

In [15]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00001|2019-01-01 00:30:00|2019-01-01 02:51:55|        null|        null|   null|                B00001|
|              B00001|2019-01-01 00:45:00|2019-01-01 00:54:49|        null|        null|   null|                B00001|
|              B00001|2019-01-01 00:15:00|2019-01-01 00:54:52|        null|        null|   null|                B00001|
|              B00008|2019-01-01 00:19:00|2019-01-01 00:39:00|        null|        null|   null|                B00008|
|              B00008|2019-01-01 00:27:00|2019-01-01 00:37:00|        null|        null|   null|                B00008|
|              B00008|2019-01-01 00:48:0

In [16]:
df.createOrReplaceTempView("tripData")

In [17]:
sqlDF = spark.sql("SELECT DISTINCT dispatching_base_num,  pickup_datetime, dropoff_datetime FROM \
tripData where PULocationID is not null and DOLocationID ='264' and dispatching_base_num= 'B02182' \
GROUP BY dispatching_base_num,  pickup_datetime, dropoff_datetime ORDER BY pickup_datetime ASC")
sqlDF.show()



+--------------------+---------------+----------------+
|dispatching_base_num|pickup_datetime|dropoff_datetime|
+--------------------+---------------+----------------+
+--------------------+---------------+----------------+



In [18]:
df_distinct = spark.sql("SELECT DISTINCT SR_Flag from tripData where PULocationID IS NOT NULL and DOLocationID is NOT NULL ")
df_distinct.show()



+-------+
|SR_Flag|
+-------+
|     26|
|     27|
|     12|
|     22|
|   null|
|      1|
|     13|
|      6|
|     16|
|      3|
|     20|
|      5|
|     19|
|     15|
|      9|
|     17|
|      4|
|      8|
|     23|
|      7|
+-------+
only showing top 20 rows



                                                                                

In [19]:
sqlDF.write.parquet("spark_write_parquet.parquet")

                                                                                

In [20]:
df_parquet = spark.read.parquet("spark_write_parquet.parquet")
df_parquet.show()

+--------------------+---------------+----------------+
|dispatching_base_num|pickup_datetime|dropoff_datetime|
+--------------------+---------------+----------------+
+--------------------+---------------+----------------+



In [21]:
sqlTripData = spark.sql("SELECT dispatching_base_num, PUlocationID, DOlocationID, Affiliated_base_number \
FROM tripData WHERE PULocationID IS NOT NULL and DOLocationID is NOT NULL AND pickup_datetime >= '2019-01-01' \
AND pickup_datetime <='2019-01-10' GROUP BY dispatching_base_num, PUlocationID, DOlocationID, Affiliated_base_number \
ORDER BY dispatching_base_num")
sqlTripData.show()

[Stage 20:>                                                         (0 + 4) / 4]

+--------------------+------------+------------+----------------------+
|dispatching_base_num|PUlocationID|DOlocationID|Affiliated_base_number|
+--------------------+------------+------------+----------------------+
|     B00021         |          56|          70|       B00021         |
|     B00021         |           7|         202|       B00021         |
|     B00021         |          92|          92|       B00021         |
|     B00021         |         223|         146|       B00021         |
|     B00021         |          56|          56|       B00021         |
|     B00021         |           7|         140|       B00021         |
|     B00021         |         129|         260|       B00021         |
|     B00021         |          83|         129|       B00021         |
|     B00021         |         196|          56|       B00021         |
|     B00021         |         173|          83|       B00021         |
|     B00021         |          82|          93|       B00021   

                                                                                