In [366]:
import os
from itertools import chain

import pyspark
from pyspark.sql import DataFrameWriter
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id, row_number, struct
from pyspark.sql.types import StringType, MapType, IntegerType
from pyspark.sql.window import Window
mapCol = MapType(IntegerType(),StringType(),False)

#spark=SparkSession.builder.appName("dataETL").getOrCreate()
spark = SparkSession.builder \
    .appName("PostgreSQL Connection with PySpark") \
    .getOrCreate()

spark.conf.set('spark.driver.class', 'org.postgresql.Driver')

In [367]:
uberData=spark.read.parquet("C:/Users/hgarg/OneDrive/Desktop/PySpark/yellow_tripdata_2022-01.parquet")

In [368]:

#Adding a new Column as index
uberRideData = uberData.withColumn("UberId", row_number().over(Window.partitionBy("VendorId", "tpep_pickup_datetime").orderBy("VendorId","tpep_pickup_datetime")))
uberRideData.persist()

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double, UberId: int]

In [369]:

#PickUp and DropOff time details
weekDays = {
    1: "Monday",
    2: "Tuesday",
    3: "Wednesday",
    4: "Thursday",
    5: "Friday",
    6: "Saturday",
    7: "Sunday"
}
months = {
    1: "January",
    2: "February",
    3: "March",
    4: "April",
    5: "May",
    6: "June",
    7: "July",
    8: "August",
    9: "September",
    10: "October",
    11: "November",
    12: "December"
}
mapWeek = F.create_map(*[F.lit(x) for x in chain(*weekDays.items())])
mapMonth = F.create_map(*[F.lit(x) for x in chain(*months.items())])
pickDropDetails = uberRideData.select("tpep_pickup_datetime", "tpep_dropoff_datetime").distinct()
pickDropDf = pickDropDetails.withColumn("UberPickUpId", monotonically_increasing_id())\
    .withColumn("pickUpHour", F.hour(col("tpep_pickup_datetime"))).withColumn("pickUpMin", F.minute(col("tpep_pickup_datetime")))\
    .withColumn("pickUpSec", F.second(col("tpep_pickup_datetime"))).withColumn("pickUpDay", mapWeek[F.dayofweek(col("tpep_pickup_datetime"))])\
    .withColumn("pickUpMonth", mapMonth[F.month(col("tpep_pickup_datetime"))]).withColumn("pickUpYear", F.year(col("tpep_pickup_datetime")))\
    .withColumn("dropOffHour", F.hour(col("tpep_dropoff_datetime"))).withColumn("dropOffMin", F.minute(col("tpep_dropoff_datetime")))\
    .withColumn("dropOffSec", F.second(col("tpep_dropoff_datetime"))).withColumn("dropOffDay", mapWeek[F.dayofweek(col("tpep_dropoff_datetime"))])\
    .withColumn("dropOffMonth", mapMonth[F.month(col("tpep_dropoff_datetime"))]).withColumn("dropOffYear", F.year(col("tpep_dropoff_datetime")))


In [370]:
#Passenger Details
passengerDetails = uberRideData.select("passenger_count").distinct().filter(col("passenger_count").isNotNull())
passengerDetailsDF = passengerDetails.withColumn("UberPassengerId", monotonically_increasing_id())\
    .withColumn("passengersCcount", col("passenger_count").cast("integer"))

In [371]:
#Trip Details
tripDetails = uberRideData.select("trip_distance").distinct().filter(col("trip_distance").isNotNull())
tripDetailsDF = tripDetails.withColumn("UberTripId", monotonically_increasing_id())\
    .withColumnRenamed("trip_distance", "TripDistance(km)")

In [372]:
#Creating Rate Card Table
from itertools import chain
rateDic = {
    1: "Standard Rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or WestChester",
    5: "Negotiated Fair",
    6: "Group Ride",
    99: "Premium"}

rateMap = F.create_map(*[F.lit(x) for x in chain(*rateDic.items())])

tripRateDetails = uberRideData.select("RateCodeId").distinct().filter(col("RateCodeId").isNotNull())
tripRateDetailsDf = tripRateDetails.withColumn("UberFairId", monotonically_increasing_id())\
    .withColumn("RateCodeId", col("RateCodeId").cast("integer"))

fairCardDetails = tripRateDetailsDf.withColumn("FairId", rateMap[col("RateCodeId")])

In [373]:
#Payment Mode Storage
from itertools import chain
paymentMode = {
    0: "Credit Card",
    1: "Cash",
    2: "No Charge",
    3: "Dispute",
    4: "Unknown",
    5: "Voided Trip"}

paymentModeMap = F.create_map(*[F.lit(x) for x in chain(*paymentMode.items())])

paymentModeDetails = uberRideData.select("payment_Type").distinct().filter(col("payment_Type").isNotNull())
paymentModeDetailsDf = paymentModeDetails.withColumn("UberPaymentId", monotonically_increasing_id())\
    .withColumn("paymentModeId", col("payment_Type").cast("integer"))

paymentModeDetailsFinalDf = paymentModeDetailsDf.withColumn("paymentMode", paymentModeMap[col("payment_Type")])\
    .select("UberPaymentId", "paymentModeId", "paymentMode")

In [374]:
#Location Table
locationDetails = uberRideData.selectExpr("PULocationID as PickUpLocationId", "DOLocationID as DropOffLocationId")\
    .distinct().filter(col("PULocationID").isNotNull()).distinct().filter(col("DOLocationID").isNotNull())
locationDetailsDf = locationDetails.withColumn( "UberLocationId", monotonically_increasing_id())


In [375]:
#Joining all the dataframes
uberRideDataUpdated = uberRideData.withColumn("tripFair", struct(col("fare_amount"), col("extra"), col("mta_tax"), col("tip_amount"), col("tolls_amount"), col("improvement_surcharge")))\
    .drop("fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge")
finalFactRawData = uberRideDataUpdated.join(pickDropDf, uberRideData["UberId"] == pickDropDf["UberPickUpId"], "left")\
    .join(passengerDetailsDF, uberRideData["UberId"] == passengerDetailsDF["UberPassengerId"], "left")\
    .join(tripDetailsDF, uberRideData["UberId"] == tripDetailsDF["UberTripId"], "left")\
    .join(fairCardDetails, uberRideData["RateCodeId"] == fairCardDetails["RateCodeId"], "left")\
    .join(paymentModeDetailsFinalDf, uberRideData["payment_Type"] == paymentModeDetailsFinalDf["paymentModeId"], "left")\
    .join(locationDetailsDf, uberRideData["UberId"] == locationDetailsDf["UberLocationId"], "left")

finalData = finalFactRawData.select("UberId", "pickUpHour", "pickUpMonth", "pickUpDay", "pickUpYear","dropoffHour", "dropoffMonth",
                     "dropoffDay", "dropoffYear", "passengersCcount", "trip_distance", "FairId", "paymentMode", "PickUpLocationId",
                     "DropOffLocationId", "total_amount", "tripFair")


In [376]:
finalData.write\
    .format("jdbc")\
    .option("url", "jdbc:postgresql://localhost/Airline")\
    .option("dbtable", "UberDataAnalyst")\
    .option("user", "postgres")\
    .option("password", "xxxxxxxx")\
    .save()

Py4JJavaError: An error occurred while calling o6432.save.
: java.sql.SQLException: No suitable driver
	at java.sql.DriverManager.getDriver(DriverManager.java:315)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:107)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:107)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:229)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:233)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.GeneratedMethodAccessor406.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
