In [0]:
# Read the tripdetail.json file with the schema having the below mentioned fields
import requests

url = "https://raw.githubusercontent.com/Pavithrabhaskaran/Test/main/tripdetail_json.json"
data = requests.get(url).json()

df = spark.read.json(sc.parallelize([data]))
df.show()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|          43|         142|         1|       1|                 2.5|  3.0|        8.0|                  0.3|    0.5|              1|           2|                 N|       0.0|         0.0|        11.8|  2021-01-01 00:36:12|

In [0]:
data = spark.read.option("multiline","true").json("/FileStore/tables/tripdetail_json.json")
data.show()
data.display()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+
|          43|         142|         1|       1|                 2.5|  3.0|        8.0|                  0.3|    0.5|              1|           2|                 N|       0.0|         0.0|        11.8|  2021-01-01 00:36:12|

DOLocationID,PULocationID,RatecodeID,VendorID,congestion_surcharge,extra,fare_amount,improvement_surcharge,mta_tax,passenger_count,payment_type,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance
43,142.0,1,1,2.5,3.0,8.0,0.3,0.5,1,2.0,N,0.0,0.0,11.8,2021-01-01 00:36:12,2021-01-01 00:30:10,2.1
151,238.0,1,1,0.0,0.5,3.0,0.3,0.5,1,2.0,N,0.0,0.0,4.3,2021-01-01 00:52:19,2021-01-01 00:51:20,0.2
165,132.0,1,1,0.0,0.5,42.0,0.3,0.5,1,1.0,N,8.65,0.0,51.95,2021-01-01 01:11:06,2021-01-01 00:43:30,14.7
132,138.0,1,1,0.0,0.5,29.0,0.3,0.5,0,1.0,N,6.05,0.0,36.35,2021-01-01 00:31:01,2021-01-01 00:15:48,10.6
33,68.0,1,2,2.5,0.5,16.5,0.3,0.5,1,1.0,N,4.06,0.0,24.36,2021-01-01 00:48:21,2021-01-01 00:31:49,4.94
68,224.0,1,1,2.5,3.0,8.0,0.3,0.5,1,1.0,N,2.35,0.0,14.15,2021-01-01 00:24:30,2021-01-01 00:16:29,1.6
157,95.0,1,1,0.0,0.5,16.0,0.3,0.5,1,2.0,N,0.0,0.0,17.3,2021-01-01 00:17:28,2021-01-01 00:00:28,4.1
40,90.0,1,1,2.5,3.0,18.0,0.3,0.5,1,2.0,N,0.0,0.0,21.8,2021-01-01 00:30:34,2021-01-01 00:12:29,5.7
129,97.0,1,1,0.0,0.5,27.5,0.3,0.5,1,4.0,N,0.0,,28.8,2021-01-01 01:00:13,2021-01-01 00:39:16,9.1
142,263.0,1,1,2.5,3.0,12.0,0.3,0.5,2,1.0,N,3.15,0.0,18.95,2021-01-01 00:39:46,2021-01-01 00:26:12,2.7


In [0]:
# Build a temp view on top of the data frame
df.createOrReplaceTempView("Temp_tab")

In [0]:
# Write spark sql query to aggregate fare_amount based on Vendor Id. [In SQL]
spark.sql("select VendorID, sum(fare_amount) from Temp_tab group by VendorID").show()

+--------+------------------+
|VendorID|  sum(fare_amount)|
+--------+------------------+
|       1|2230.1800000000003|
|       2|            4128.0|
+--------+------------------+



In [0]:
# Write spark sql query to aggregate fare_amount based on Vendor Id. [In Pyspark]
from pyspark.sql.functions import sum
df.groupBy(col("VendorID")).agg(sum(col("fare_amount"))).show()

+--------+------------------+
|VendorID|  sum(fare_amount)|
+--------+------------------+
|       1|2230.1800000000003|
|       2|            4128.0|
+--------+------------------+



In [0]:
# Convert the tpep_pickup_datetime, tpep_dropoff_datetime both columns into IST from PST and add as a seperate column with _IST suffix.
from pyspark.sql.functions import *
df2 = df.withColumn("tpep_pickup_datetime_IST", df.tpep_pickup_datetime + expr("interval 12 hours 30 minutes"))
df2 = df2.withColumn("tpep_dropoff_datetime_IST", df.tpep_dropoff_datetime + expr("interval 12 hours 30 minutes"))

In [0]:
df2.show()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+------------------------+-------------------------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|tpep_pickup_datetime_IST|tpep_dropoff_datetime_IST|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+------------------------+-------------------------+
|          43|         142|         1|       1|                 2.5|  

In [0]:
df2.select("tpep_pickup_datetime","tpep_pickup_datetime_IST","tpep_dropoff_datetime","tpep_dropoff_datetime_IST").show()

+--------------------+------------------------+---------------------+-------------------------+
|tpep_pickup_datetime|tpep_pickup_datetime_IST|tpep_dropoff_datetime|tpep_dropoff_datetime_IST|
+--------------------+------------------------+---------------------+-------------------------+
| 2021-01-01 00:30:10|     2021-01-01 13:00:10|  2021-01-01 00:36:12|      2021-01-01 13:06:12|
| 2021-01-01 00:51:20|     2021-01-01 13:21:20|  2021-01-01 00:52:19|      2021-01-01 13:22:19|
| 2021-01-01 00:43:30|     2021-01-01 13:13:30|  2021-01-01 01:11:06|      2021-01-01 13:41:06|
| 2021-01-01 00:15:48|     2021-01-01 12:45:48|  2021-01-01 00:31:01|      2021-01-01 13:01:01|
| 2021-01-01 00:31:49|     2021-01-01 13:01:49|  2021-01-01 00:48:21|      2021-01-01 13:18:21|
| 2021-01-01 00:16:29|     2021-01-01 12:46:29|  2021-01-01 00:24:30|      2021-01-01 12:54:30|
| 2021-01-01 00:00:28|     2021-01-01 12:30:28|  2021-01-01 00:17:28|      2021-01-01 12:47:28|
| 2021-01-01 00:12:29|     2021-01-01 12

In [0]:
# Add addition column TravelTime by finding the difference between tpep_pickup_datetime and tpep_dropoff_datetime dates
df3 = df2.withColumn("TravelTime",(to_timestamp(col("tpep_dropoff_datetime")).cast("long"))- to_timestamp(col("tpep_pickup_datetime")).cast("long"))
df3.show()

+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+------------------------+-------------------------+----------+
|DOLocationID|PULocationID|RatecodeID|VendorID|congestion_surcharge|extra|fare_amount|improvement_surcharge|mta_tax|passenger_count|payment_type|store_and_fwd_flag|tip_amount|tolls_amount|total_amount|tpep_dropoff_datetime|tpep_pickup_datetime|trip_distance|tpep_pickup_datetime_IST|tpep_dropoff_datetime_IST|TravelTime|
+------------+------------+----------+--------+--------------------+-----+-----------+---------------------+-------+---------------+------------+------------------+----------+------------+------------+---------------------+--------------------+-------------+------------------------+-------------------------+----------+
|          43|         142|         1

In [0]:
# Write the dataframe into csv files (as 3 partition)
df4 = df3.coalesce(1).write.partitionBy("VendorID").mode("overwrite").format("csv").option("header","true").save("dbfs:/FileStore/mk/VendorID_CSV")
display(df4)

In [0]:
newDF = df3.repartition(3)
display(newDF)
newDF.write.format("csv").option("header","true").save("/FileStore/tables/Data.csv")

DOLocationID,PULocationID,RatecodeID,VendorID,congestion_surcharge,extra,fare_amount,improvement_surcharge,mta_tax,passenger_count,payment_type,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance,tpep_pickup_datetime_IST,tpep_dropoff_datetime_IST,TravelTime
164,164.0,1,2,2.5,0.5,4.0,0.3,0.5,2,2.0,N,0.0,0.0,7.8,2021-01-01 00:15:39,2021-01-01 00:12:07,0.37,2021-01-01 12:42:07,2021-01-01 12:45:39,212
238,239.0,1,2,2.5,0.5,4.0,0.3,0.5,1,1.0,N,1.0,0.0,8.8,2021-01-01 00:53:27,2021-01-01 00:51:21,0.69,2021-01-01 13:21:21,2021-01-01 13:23:27,126
263,90.0,1,2,2.5,0.5,10.5,0.3,0.5,1,1.0,N,3.58,0.0,17.88,2021-01-01 00:36:42,2021-01-01 00:26:59,2.79,2021-01-01 12:56:59,2021-01-01 13:06:42,583
166,238.0,1,2,0.0,0.5,4.5,0.3,0.5,1,2.0,N,0.0,0.0,5.8,2021-01-01 00:38:29,2021-01-01 00:36:08,0.76,2021-01-01 13:06:08,2021-01-01 13:08:29,141
79,230.0,1,2,2.5,0.5,18.0,0.3,0.5,1,1.0,N,4.36,0.0,26.16,2021-01-01 00:53:48,2021-01-01 00:27:12,3.45,2021-01-01 12:57:12,2021-01-01 13:23:48,1596
264,264.0,1,2,0.0,0.5,22.5,0.3,0.5,5,2.0,N,0.0,0.0,23.8,2021-01-01 01:02:57,2021-01-01 00:42:14,7.04,2021-01-01 13:12:14,2021-01-01 13:32:57,1243
41,143.0,1,1,2.5,3.0,11.5,0.3,0.5,1,2.0,Y,0.0,0.0,15.3,2021-01-01 01:10:48,2021-01-01 00:59:35,3.3,2021-01-01 13:29:35,2021-01-01 13:40:48,673
227,164.0,1,1,2.5,3.0,34.0,0.3,0.5,2,4.0,N,0.0,0.0,37.8,2021-01-01 00:45:44,2021-01-01 00:16:58,11.1,2021-01-01 12:46:58,2021-01-01 13:15:44,1726
113,90.0,1,2,2.5,0.5,5.5,0.3,0.5,1,1.0,N,2.79,0.0,12.09,2021-01-01 00:52:02,2021-01-01 00:47:05,0.94,2021-01-01 13:17:05,2021-01-01 13:22:02,297
166,143.0,1,1,2.5,3.0,11.5,0.3,0.5,3,2.0,N,0.0,0.0,15.3,2021-01-01 00:23:46,2021-01-01 00:09:30,2.3,2021-01-01 12:39:30,2021-01-01 12:53:46,856


In [0]:
# write the output into single CSV file.
df3.coalesce(1).write.format("csv").option("header","true").save("dbfs:/FileStore/mk/Trip_CSV")

In [0]:
display(df3)

DOLocationID,PULocationID,RatecodeID,VendorID,congestion_surcharge,extra,fare_amount,improvement_surcharge,mta_tax,passenger_count,payment_type,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance,tpep_pickup_datetime_IST,tpep_dropoff_datetime_IST,TravelTime
43,142.0,1,1,2.5,3.0,8.0,0.3,0.5,1,2.0,N,0.0,0.0,11.8,2021-01-01 00:36:12,2021-01-01 00:30:10,2.1,2021-01-01 13:00:10,2021-01-01 13:06:12,362
151,238.0,1,1,0.0,0.5,3.0,0.3,0.5,1,2.0,N,0.0,0.0,4.3,2021-01-01 00:52:19,2021-01-01 00:51:20,0.2,2021-01-01 13:21:20,2021-01-01 13:22:19,59
165,132.0,1,1,0.0,0.5,42.0,0.3,0.5,1,1.0,N,8.65,0.0,51.95,2021-01-01 01:11:06,2021-01-01 00:43:30,14.7,2021-01-01 13:13:30,2021-01-01 13:41:06,1656
132,138.0,1,1,0.0,0.5,29.0,0.3,0.5,0,1.0,N,6.05,0.0,36.35,2021-01-01 00:31:01,2021-01-01 00:15:48,10.6,2021-01-01 12:45:48,2021-01-01 13:01:01,913
33,68.0,1,2,2.5,0.5,16.5,0.3,0.5,1,1.0,N,4.06,0.0,24.36,2021-01-01 00:48:21,2021-01-01 00:31:49,4.94,2021-01-01 13:01:49,2021-01-01 13:18:21,992
68,224.0,1,1,2.5,3.0,8.0,0.3,0.5,1,1.0,N,2.35,0.0,14.15,2021-01-01 00:24:30,2021-01-01 00:16:29,1.6,2021-01-01 12:46:29,2021-01-01 12:54:30,481
157,95.0,1,1,0.0,0.5,16.0,0.3,0.5,1,2.0,N,0.0,0.0,17.3,2021-01-01 00:17:28,2021-01-01 00:00:28,4.1,2021-01-01 12:30:28,2021-01-01 12:47:28,1020
40,90.0,1,1,2.5,3.0,18.0,0.3,0.5,1,2.0,N,0.0,0.0,21.8,2021-01-01 00:30:34,2021-01-01 00:12:29,5.7,2021-01-01 12:42:29,2021-01-01 13:00:34,1085
129,97.0,1,1,0.0,0.5,27.5,0.3,0.5,1,4.0,N,0.0,,28.8,2021-01-01 01:00:13,2021-01-01 00:39:16,9.1,2021-01-01 13:09:16,2021-01-01 13:30:13,1257
142,263.0,1,1,2.5,3.0,12.0,0.3,0.5,2,1.0,N,3.15,0.0,18.95,2021-01-01 00:39:46,2021-01-01 00:26:12,2.7,2021-01-01 12:56:12,2021-01-01 13:09:46,814
