## Creating a Delta Table in Gold Zone with the Following Details:

### 1. Fetching Statistics by Month and Year:
   - Fetching the driver with the most rides in each month.
   - Identifying the customers with the most trips and the highest spending in each month of each year.

### 2. Identifying the Top-Rated Driver:
   - Fetching the top-rated driver for the entire year.

### 3. Analyzing Customer Behavior:
   - Determining the highest-spending customer.
   - Identifying the customer who traveled the greatest total distance.


---

In [None]:
from pyspark.sql.functions import broadcast
import pyspark.sql.functions as F

In [None]:
# Load the silver-zone trip fact table and its dimension tables.
# `load` uses the session's default data source (presumably Delta on this cluster — TODO confirm).
df1 = spark.read.load(path="/mnt/Deltalake/silver_Zone/Trip_Transactions_Fact")  # trip fact
df2 = spark.read.load(path="/mnt/Deltalake/silver_Zone/Customer_Dimension")  # customers
df3 = spark.read.load(path="/mnt/Deltalake/silver_Zone/driver_Dimension")  # drivers
df4 = spark.read.load(path="/mnt/Deltalake/silver_Zone/Date_Dimension")  # calendar dates

# Attach customer attributes to every trip. The customer dimension carries a
# broadcast hint so the fact table does not need to be shuffled for this join.
trip_customer_cond = df1["customer_id"] == df2["Customer_id"]
df5 = df1.join(broadcast(df2), on=trip_customer_cond)

# Report how many partitions the joined DataFrame has (rendered as the cell output).
df5.rdd.getNumPartitions()


Out[7]: 3

In [None]:
# Keep only the trip/customer columns needed downstream.
# `driver_id` is listed once here. The previous projection listed it twice,
# which gave df5 two identically named `driver_id` columns.
df5 = df5.select(
    "trip_id",
    "Customer_Name",
    "customer_age",
    "customer_gender",
    "Trip_Date",
    "driver_id",
    "total_distance",
    "total_fare",
)

# Attach driver attributes to each trip; the driver dimension carries a broadcast hint.
df6 = df5.join(broadcast(df3), df3.driver_id == df5.driver_id)

# Drop the join keys and keep the trip, customer and driver attributes.
df6 = df6.select(
    "trip_id",
    "Customer_Name",
    "customer_age",
    "customer_gender",
    "Trip_Date",
    "total_distance",
    "total_fare",
    "driver_name",
    "driver_age",
    "driver_gender",
)

# Attach calendar attributes by matching the trip date to the date dimension.
# The month/year columns that the SQL cells below group on come from df4.
df7 = df6.join(df4, df4.date == df6.Trip_Date)

# Register df7 as a temp view so the SQL cells below can query it as `df7`.
df7.createOrReplaceTempView("df7")


Highest Spending, Longest Total Distance & Most Trips by Customer

In [None]:
# For each (month, year), find the customers who rank first on total distance,
# total fare or trip count.
# - RANK() keeps ties, so several customers can be returned for one month and metric.
# - A customer appears once per month even if they top more than one metric.
#
# Rows are ordered numerically by year, then month. The previous
# ORDER BY CONCAT(month, year) sorted a string lexicographically, so "112023"
# (November) came before "12023" (January) and different years interleaved.
# NOTE(review): month_year keeps the original CONCAT(month, year) value
# (e.g. "12023") for compatibility with the saved table; it is not zero-padded.
df_customer_spent_distance = spark.sql("""
    SELECT
        customer_name,
        rank_total_distance,
        rank_total_fare,
        total_distance,
        total_fare,
        rank_trips_count,
        CONCAT(month, year) AS month_year,
        trips_count
    FROM (
        SELECT
            customer_name,
            RANK() OVER (PARTITION BY month, year ORDER BY total_distance DESC) AS rank_total_distance,
            RANK() OVER (PARTITION BY month, year ORDER BY total_fare DESC) AS rank_total_fare,
            total_distance,
            total_fare,
            month,
            year,
            RANK() OVER (PARTITION BY month, year ORDER BY trips_count DESC) AS rank_trips_count,
            trips_count
        FROM (
            SELECT
                customer_name,
                SUM(total_fare) AS total_fare,
                month,
                year,
                SUM(total_distance) AS total_distance,
                COUNT(trip_id) AS trips_count
            FROM df7
            GROUP BY customer_name, month, year
        )
    )
    WHERE rank_total_distance = 1 OR rank_total_fare = 1 OR rank_trips_count = 1
    ORDER BY year, month
""")


Highest Rating & Most Trips by Driver

In [None]:
# For each (month, year), find the driver(s) with the most trips.
# RANK() keeps ties, so several drivers can be returned for the same month.
#
# Rows are ordered numerically by year, then month. The previous
# ORDER BY CONCAT(month, year) sorted a string lexicographically, so "112023"
# (November) came before "12023" (January) and different years interleaved.
# NOTE(review): despite its name, year_Month holds CONCAT(month, year)
# (month first, e.g. "12023"). It is kept unchanged for compatibility.
df_driver_trips = spark.sql("""
    SELECT
        driver_name,
        CONCAT(month, year) AS year_Month,
        count_trips
    FROM (
        SELECT
            driver_name,
            RANK() OVER (PARTITION BY month, year ORDER BY count_trips DESC) AS rank_count_trips,
            month,
            year,
            count_trips
        FROM (
            SELECT
                driver_name,
                month,
                year,
                COUNT(trip_id) AS count_trips
            FROM df7
            GROUP BY driver_name, month, year
        )
    )
    WHERE rank_count_trips = 1
    ORDER BY year, month
""")


In [None]:
# Load the bronze-zone rewards data, which is joined to trips on trip_id below.
df8 = spark.read.load("/mnt/Deltalake/Bronze/Rewards_Points")

# Attach the trip context (customer, driver, month, year) to each rewards row.
# NOTE(review): both inputs carry trip_id, so df9 has two trip_id columns;
# neither is used below.
df9 = df8.join(df7, df7.trip_id == df8.trip_id)

# Sum customer_rating per customer per month/year (presumably a column of Rewards_Points).
df10 = df9.groupBy("Customer_Name", "Month", "year").sum("customer_rating")
df11 = df10.withColumnRenamed("sum(customer_rating)", "Customer_Rating")

# Sum driver_rating per driver per month/year.
df12 = df9.groupBy("driver_name", "Month", "year").sum("driver_rating")
df13 = df12.withColumnRenamed("sum(driver_rating)", "driver_Rating")

# Rank drivers by total rating within each (year, month); ties share a rank.
# Partitioning on the two columns directly replaces concat(month, Year). That
# string key is not unique in general (month 11/year 2023 and month 1/year
# 12023 both become "112023") and it forces a string conversion.
# NOTE(review): this ranks by the SUM of ratings, so drivers with more rated
# trips score higher. If "top-rated" should mean average rating, use avg — TODO confirm.
df14 = df13.withColumn(
    "rank_driver_rating",
    F.expr("rank() over(partition by year, month order by driver_rating desc)"),
)

# Keep only the top-ranked driver(s) for each month.
df15 = df14.filter(df14.rank_driver_rating == 1)


In [None]:
# Render the top-ranked driver(s) per month computed in the cell above.
df15.display()

driver_name,Month,year,driver_Rating,rank_driver_rating
Kapil,1,2023,124,1
Aaditya,2,2023,87,1
Kumar,3,2023,121,1
Nikshit,4,2023,115,1
Sam,5,2023,122,1


In [None]:
# df11 already holds one row per (Customer_Name, Month, year) with the summed
# rating, so grouping and summing it again was effectively a no-op that cost an
# extra shuffle. Project it instead, renaming to the lower-case column names the
# re-aggregation produced (customer_name, month, year, customer_rating).
df12 = df11.select(
    F.col("Customer_Name").alias("customer_name"),
    F.col("Month").alias("month"),
    F.col("year"),
    F.col("Customer_Rating").alias("customer_rating"),
)

# Rank customers by total rating within each (year, month); ties share a rank.
# Partition on the columns directly rather than on a concatenated string key.
df13 = df12.withColumn(
    "rank_customer_rating",
    F.expr("rank() over(partition by year, month order by customer_rating desc)"),
)

# Keep only the top-ranked customer(s) for each month (this rebinds df15).
df15 = df13.filter(df13.rank_customer_rating == 1)


In [None]:
# Render the top-ranked customer(s) per month; df15 was rebound to the customer result in the cell above.
df15.display()

customer_name,month,year,customer_rating,rank_customer_rating
ABC2,1,2023,124,1
ABC6,2,2023,89,1
ABC32,3,2023,120,1
ABC3,4,2023,116,1
ABC0,5,2023,120,1


In [None]:
# Persist the customer spending/distance/trip metrics as a table.
# mode("overwrite") makes the cell re-runnable. The default save mode
# ("errorifexists") fails on every run after the first because the table exists.
# NOTE(review): only the customer metrics are persisted here. The driver results
# (df_driver_trips and the driver rating ranking) are not saved — TODO confirm
# whether they should be, and confirm this table's database/location is the
# intended gold zone.
df_customer_spent_distance.write.mode("overwrite").saveAsTable("df_customer_spent_distance")
