In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, to_date
from pyspark.sql.types import StringType

spark = SparkSession.builder.master("local").appName("Avenga").getOrCreate()
df = spark.read.format("csv")\
    .option("header", True)\
    .option("inferSchema", True)\
    .option("delimiter", ",")\
    .option("quotes", "|")\
    .load("taxi_tripdata.csv")
df.printSchema()

[Stage 135:>                                                        (0 + 1) / 1]

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



                                                                                

In [42]:
df_col_up = df.withColumn("VendorID", when(df.VendorID == 1,"Creative Mobile Technologies, LLC").when(df.VendorID == 2, "VeriFone Inc"))
df_col_up = df_col_up.withColumn("payment_type", when(df_col_up.payment_type == 1,"Credit card")
                           .when(df_col_up.payment_type == 2, "Cash")
                           .when(df_col_up.payment_type == 3, "No charge")
                           .when(df_col_up.payment_type == 4, "Dispute")
                           .when(df_col_up.payment_type == 5, "Unknown")
                           .when(df_col_up.payment_type == 6, "Voided trip")
                           .otherwise("Unknown"))
df_col_up = df_col_up.withColumn("day_date",to_date('lpep_pickup_datetime'))

In [43]:
df_col_up.createOrReplaceTempView("tableA")
df_payment_rate = spark.sql("SELECT VendorID, payment_type, CAST((SUM(total_amount)/SUM(passenger_count)) AS DOUBLE) AS payment_rate_type  from tableA WHERE VendorID is not null AND total_amount is not null AND passenger_count is not null AND passenger_count > 0 AND day_date >= '2021-07-01' AND day_date <= '2021-07-31' GROUP BY VendorID, payment_type ORDER BY VendorID, payment_type")
df_payment_rate.show()

+--------------------+------------+------------------+
|            VendorID|payment_type| payment_rate_type|
+--------------------+------------+------------------+
|Creative Mobile T...|        Cash|10.528369994359748|
|Creative Mobile T...| Credit card|17.518158357771675|
|Creative Mobile T...|     Dispute| 7.854000000000002|
|Creative Mobile T...|   No charge|  8.83869346733666|
|Creative Mobile T...|     Unknown|             39.55|
|        VeriFone Inc|        Cash| 9.830360790497812|
|        VeriFone Inc| Credit card| 17.06046748629696|
|        VeriFone Inc|     Dispute|-6.911538461538464|
|        VeriFone Inc|   No charge|-5.786298701298686|
+--------------------+------------+------------------+



In [44]:
df1 = spark.sql("SELECT VendorID, day_date, CAST(AVG(total_amount/passenger_count) AS DOUBLE) AS payment_rate_day  from tableA WHERE VendorID is not null AND total_amount is not null AND passenger_count is not null AND passenger_count > 0 AND day_date >= '2021-07-01' AND day_date <= '2021-07-31' GROUP BY VendorID, day_date ORDER BY VendorID, day_date, payment_rate_day")

In [45]:
df1.createOrReplaceTempView("tableB")
df2 = spark.sql("SELECT VendorID, payment_rate_day, LAG(payment_rate_day, 1) OVER(PARTITION BY VendorID ORDER BY VendorID, payment_rate_day) AS pr_payment_rate_day from tableB")

In [46]:
df2.createOrReplaceTempView("tableC")
df3 = spark.sql("SELECT VendorID, payment_rate_day, pr_payment_rate_day,  (payment_rate_day - pr_payment_rate_day)/pr_payment_rate_day AS growth_rate_day from tableC")

In [47]:
df3.createOrReplaceTempView("tableD")
df4 = spark.sql("SELECT VendorID,  CAST(((AVG(growth_rate_day))*100*30) AS DOUBLE) AS percents_to_next_rate from tableD WHERE growth_rate_day is not null GROUP BY VendorID")

In [48]:
df4.show()

+--------------------+---------------------+
|            VendorID|percents_to_next_rate|
+--------------------+---------------------+
|Creative Mobile T...|   24.289626343868626|
|        VeriFone Inc|   12.223834531495921|
+--------------------+---------------------+



In [49]:
df5 = spark.sql("SELECT VendorID, CAST((max(total_amount/passenger_count)) AS DOUBLE) AS max_payment_rate, CAST((SUM(total_amount)/SUM(passenger_count)) AS DOUBLE) AS avg_payment_rate from tableA WHERE VendorID is not null AND total_amount is not null AND passenger_count is not null AND passenger_count > 0 GROUP BY VendorID ORDER BY VendorID")

In [50]:
df5.show()

+--------------------+----------------+------------------+
|            VendorID|max_payment_rate|  avg_payment_rate|
+--------------------+----------------+------------------+
|Creative Mobile T...|          184.21| 14.50976592392627|
|        VeriFone Inc|          480.31|13.994296617912743|
+--------------------+----------------+------------------+



In [51]:
df5.createOrReplaceTempView("tableF")
df4.createOrReplaceTempView("tableE")
df_payment_rate.createOrReplaceTempView("tableG")
df6 = spark.sql("SELECT tablef.VendorID, tableg.payment_type, tableg.payment_rate_type, CAST(tablef.avg_payment_rate*((tablee.percents_to_next_rate*0.01)+1) AS DOUBLE) AS next_payment_rate, tablef.max_payment_rate, tablee.percents_to_next_rate from tablef "
                "LEFT JOIN tablee USING (VendorId)"
                "LEFT JOIN tableg USING (VendorId)")

In [52]:
df6 = df6.withColumn("percents_to_next_rate", df6['percents_to_next_rate'].cast(StringType()))
df6.write.format("csv").option("header",True).mode("overwrite").save("final")
df6.printSchema()

                                                                                

root
 |-- VendorID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_rate_type: double (nullable = true)
 |-- next_payment_rate: double (nullable = true)
 |-- max_payment_rate: double (nullable = true)
 |-- percents_to_next_rate: string (nullable = true)

