In [0]:
"""
This script allows for different transformations while conforming to the factory pattern
"""

'\nThis script allows for different transformations while conforming to the factory pattern\n'

In [0]:
%run "./Extractor"

In [0]:
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
from typing import Dict
import traceback

In [0]:



class Transform:
    """
    Common base for every transformation defined in this notebook.

    Concrete subclasses are expected to override ``transform``; invoking it on
    the base class itself raises ``ValueError("Method not defined")``.
    """

    def __init__(self):
        # The base transformation holds no state of its own.
        return None

    def transform(self, input_df):
        """Abstract method: each child class has to supply its own implementation."""
        error_message = "Method not defined"
        raise ValueError(error_message)




        
        


Bronze to Silver Transformations

In [0]:
class CustTableTransformations(Transform):
    """
    Bronze-to-silver transformations for the 'customer' table.

    Combines 'first_name' and 'last_name' into a single 'full_name' column and
    replaces null values in the 'active' column with 0.
    """

    def transform(self, inputDFs: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
        """
        Transforms the 'customer' DataFrame held in ``inputDFs``:
        - Adds a 'full_name' column made of 'first_name' and 'last_name' separated
          by a single space. ``concat_ws`` skips null parts, so a missing first or
          last name no longer turns the whole full name into null (if both parts
          are null the result is an empty string).
        - Drops the 'first_name' and 'last_name' columns.
        - Replaces null values in the 'active' column with 0.

        Parameters:
        inputDFs (dict): Dictionary of input DataFrames. Expected key:
                         - "customer": DataFrame containing customer data.

        Returns:
        dict: The same ``inputDFs`` dictionary, updated in place so that its
              "customer" entry holds the transformed DataFrame.

        Raises:
        TypeError: If any value in ``inputDFs`` is not a Spark DataFrame.
        Any other error (e.g. a missing "customer" key or column) is printed and
        re-raised instead of being swallowed, so callers never receive ``None``.
        """
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        for name, df in inputDFs.items():
            if not isinstance(df, DataFrame):
                raise TypeError(
                    "All values in the input dictionary must be Spark DataFrames; "
                    f"key {name!r} holds a {type(df).__name__}."
                )

        try:
            customer_df = inputDFs["customer"]

            customer_df = (
                customer_df
                .withColumn("full_name",
                            sf.concat_ws(" ", sf.col("first_name"), sf.col("last_name")))
                .drop("first_name", "last_name")
                # Keep existing values; substitute 0 only where 'active' is null.
                .withColumn("active", sf.coalesce(sf.col("active"), sf.lit(0)))
            )

            # Callers rely on the dictionary being updated in place.
            inputDFs["customer"] = customer_df
            return inputDFs

        except Exception as e:
            # Report, then propagate: silently returning None would only make the
            # next pipeline stage fail with a less helpful error.
            print(f"An unexpected error occurred: {e}")
            raise


In [0]:

class PaymentsTableUnionTransform(Transform):
    """
    Reassembles the partitioned 'payment' table into a single DataFrame.

    Dictionary keys starting with 'payment_' (e.g. 'payment_p2022_01') are
    partitions of one table. They are unioned into the 'payment' key, and the
    individual partition keys are dropped from the returned dictionary.
    """

    def transform(self, inputDFs: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
        """
        Unions every DataFrame whose key starts with 'payment_' and stores the
        result under the key 'payment'.

        Columns are matched by name (``unionByName``), not by position. Partitions
        whose columns happen to be ordered differently are therefore not silently
        misaligned.

        Args:
            inputDFs (Dict[str, DataFrame]): Dictionary of Spark DataFrames with table names as keys.

        Returns:
            Dict[str, DataFrame]: A new dictionary with the same entries except that
            the 'payment_*' partition keys are removed. 'payment' holds the unified
            DataFrame when at least one partition was present. With no partitions,
            the entries are returned unchanged.

            Note: when partitions are present, ``inputDFs["payment"]`` is also
            overwritten in place. The partition keys themselves remain in ``inputDFs``.

        Raises:
            TypeError: If any value in ``inputDFs`` is not a Spark DataFrame.
            Any other error is printed and re-raised instead of returning ``None``.
        """
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        for name, df in inputDFs.items():
            if not isinstance(df, DataFrame):
                raise TypeError(
                    "All values in the input dictionary must be Spark DataFrames; "
                    f"key {name!r} holds a {type(df).__name__}."
                )

        try:
            # startswith (not substring) so unrelated keys that merely contain
            # "payment_" somewhere are not pulled into the union.
            partition_dfs = [df for name, df in inputDFs.items() if name.startswith("payment_")]

            if partition_dfs:
                inputDFs["payment"] = reduce(lambda left, right: left.unionByName(right),
                                             partition_dfs)

            # "payment" itself does not start with "payment_", so it is kept.
            return {name: df for name, df in inputDFs.items() if not name.startswith("payment_")}

        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise
            


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-177671288233407>, line 1[0m
[0;32m----> 1[0m [38;5;28;01mclass[39;00m [38;5;21;01mPaymentsTableUnionTransform[39;00m(Transform):
[1;32m      2[0m [38;5;250m    [39m[38;5;124;03m"""[39;00m
[1;32m      3[0m [38;5;124;03m    A class that transforms a dictionary of Spark DataFrames by unifying all DataFrames[39;00m
[1;32m      4[0m [38;5;124;03m    whose names contain 'payment_' and storing the result in the 'payment' key of the output dictionary.[39;00m
[1;32m      5[0m [38;5;124;03m    The dictionary keys that have 'payment_' are actually partitions of a single table. Hence, we use union to[39;00m
[1;32m      6[0m [38;5;124;03m    create dataframe containing the full data.[39;00m
[1;32m      7[0m [38;5;124;03m    """[39;00m
[1;32m      9[0m     [38;5;28;01mdef


Silver to Gold Layer Transformations

In [0]:
class Top10PercentCustomerTransformation(Transform):
    """
    Silver-to-gold transformation that ranks active customers and keeps the top 10%.

    It produces two independent rankings:
    - by total amount spent (summed from the 'payment' table), and
    - by rental frequency (number of rental_id values in the 'rental' table).

    Each ranking is sorted in descending order and then truncated to
    ``int(number_of_ranked_customers * TOP_FRACTION)`` rows.
    """

    # Share of customers kept in each ranking (0.1 == top 10%). It is a class
    # attribute so a subclass or instance can change it without touching transform().
    TOP_FRACTION = 0.1

    def transform(self, inputDFs: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
        """
        Calculates the top customers by total sales and by rental frequency.

        Args:
            inputDFs (Dict[str, DataFrame]): Dictionary of input DataFrames. Uses the
                keys 'customer', 'payment' and 'rental'. The 'customer' DataFrame must
                already contain 'full_name' (as produced by CustTableTransformations)
                and 'activebool'. Only rows where activebool is True are ranked.

        Returns:
            Dict[str, DataFrame]: A dictionary with two DataFrames:
                'top_customers_by_sales_df': customer_id, full_name, Total_Spent
                    (sum of payment amounts rounded to 2 decimals), highest first.
                'top_customers_by_frequency_df': customer_id, full_name, Total_Frequency
                    (count of rental_id), highest first.
            Ties are broken by ascending customer_id, so the cut-off is deterministic.
            The row count is floored, so fewer than 1 / TOP_FRACTION ranked customers
            yields an empty result.

        Raises:
            TypeError: If any value in ``inputDFs`` is not a Spark DataFrame.
            Any other error (e.g. a missing key or column) is printed and re-raised
            instead of returning ``None``.
        """
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        for name, df in inputDFs.items():
            if not isinstance(df, DataFrame):
                raise TypeError(
                    "All values in the input dictionary must be Spark DataFrames; "
                    f"key {name!r} holds a {type(df).__name__}."
                )

        try:
            customer_df = inputDFs["customer"]
            payment_df = inputDFs["payment"]
            rental_df = inputDFs["rental"]

            # Total amount spent per active customer.
            top_cust_by_sales_df = (
                customer_df
                .join(payment_df, customer_df["customer_id"] == payment_df["customer_id"], "inner")
                .drop(payment_df["customer_id"])
                .filter(sf.col("activebool") == True)
                .groupBy(sf.col("customer_id"))
                .agg(sf.sum(sf.col("amount").cast("double")).alias("Total_Spent"))
            )

            # Number of rentals per active customer.
            top_cust_by_freq_df = (
                customer_df
                .join(rental_df, customer_df["customer_id"] == rental_df["customer_id"], "inner")
                .drop(rental_df["customer_id"])
                .filter(sf.col("activebool") == True)
                .groupBy(sf.col("customer_id"))
                .agg(sf.count(sf.col("rental_id")).alias("Total_Frequency"))
            )

            # Number of rows that make up the top fraction of each ranking.
            limit_value_sales = int(top_cust_by_sales_df.count() * self.TOP_FRACTION)
            limit_value_freq = int(top_cust_by_freq_df.count() * self.TOP_FRACTION)

            # Attach full_name, sort, THEN limit. Limiting before sorting (the previous
            # order) kept an arbitrary subset of customers and only sorted that subset.
            final_result_cust_sale_df = (
                top_cust_by_sales_df
                .join(customer_df, "customer_id")
                .select("customer_id", "full_name",
                        sf.round(sf.col("Total_Spent"), 2).alias("Total_Spent"))
                .orderBy(sf.col("Total_Spent").desc(), sf.col("customer_id").asc())
                .limit(limit_value_sales)
            )

            final_result_cust_freq_df = (
                top_cust_by_freq_df
                .join(customer_df, "customer_id")
                .select("customer_id", "full_name", "Total_Frequency")
                .orderBy(sf.col("Total_Frequency").desc(), sf.col("customer_id").asc())
                .limit(limit_value_freq)
            )

            return {
                "top_customers_by_sales_df": final_result_cust_sale_df,
                "top_customers_by_frequency_df": final_result_cust_freq_df
            }

        except Exception as e:
            # Report, then propagate; the traceback is shown by the raised exception.
            print(f"An unexpected error occurred: {e}")
            raise

In [0]:
class SalesRevenueByGeographyTransformation(Transform):
    """
    Silver-to-gold transformation that totals the payments of active customers
    by geography. The geography used is that of the store each customer is
    attached to (store -> address -> city), aggregated per district and per city.
    """

    def transform(self, inputDFs: Dict[str, DataFrame]) -> Dict[str, DataFrame]:
        """
        Calculates total sales by district and by city.

        The store's address and city are joined to that store's active customers
        (active == 1) and their payments, and the payment amounts are then summed.

        Parameters:
        inputDFs (dict): Dictionary of input DataFrames. Keys expected are:
                         - "store": DataFrame containing store data.
                         - "address": DataFrame containing address data.
                         - "city": DataFrame containing city data.
                         - "customer": DataFrame containing customer data
                           (customer_id, store_id, active).
                         - "payment": DataFrame containing payment data
                           (customer_id, amount).

        Returns:
        dict: A dictionary containing two DataFrames:
              - "sales_by_district": district, total_sales (rounded to 2 decimals),
                highest first, with ties ordered by district.
              - "sales_by_city": city, total_sales (rounded to 2 decimals),
                highest first, with ties ordered by city.

        Raises:
        TypeError: If any value in ``inputDFs`` is not a Spark DataFrame.
        Any other error is printed and re-raised instead of returning ``None``.
        """
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        for name, df in inputDFs.items():
            if not isinstance(df, DataFrame):
                raise TypeError(
                    "All values in the input dictionary must be Spark DataFrames; "
                    f"key {name!r} holds a {type(df).__name__}."
                )

        try:
            store_df = inputDFs["store"]
            address_df = inputDFs["address"]
            city_df = inputDFs["city"]
            customer_df = inputDFs["customer"]
            payment_df = inputDFs["payment"]

            # Location of each store. Only the columns used downstream are kept, so
            # the later joins do not accumulate duplicate, ambiguous column names.
            store_geo_df = (store_df.join(address_df, "address_id")
                                    .join(city_df, "city_id")
                                    .select("store_id", "district", "city"))

            # Active customers only, reduced to the keys needed for the joins.
            # Filtering before the inner join yields the same rows as filtering after it.
            active_customer_df = (customer_df.filter(sf.col("active") == 1)
                                             .select("customer_id", "store_id"))

            joined_df1 = (store_geo_df
                          .join(active_customer_df, "store_id")
                          .join(payment_df.select("customer_id", "amount"), "customer_id"))

            sales_by_district_df = (joined_df1
                                    .groupBy("district")
                                    .agg(sf.round(sf.sum("amount"), 2).alias("total_sales"))
                                    .orderBy(sf.col("total_sales").desc(), sf.col("district").asc()))

            sales_by_city_df = (joined_df1
                                .groupBy("city")
                                .agg(sf.round(sf.sum("amount"), 2).alias("total_sales"))
                                .orderBy(sf.col("total_sales").desc(), sf.col("city").asc()))

            return {"sales_by_district": sales_by_district_df,
                    "sales_by_city": sales_by_city_df}

        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise


In [0]:
#testing the transformations
# DFs = TablesExtractor().extract()

# for key,_ in DFs.items():
#     print(key)


actor
address
category
city
country
customer
film
film_actor
film_category
inventory
language
payment
payment_p2022_01
payment_p2022_02
payment_p2022_03
payment_p2022_04
payment_p2022_05
payment_p2022_06
payment_p2022_07
rental
staff
store


In [0]:
# DFs["film"] = 0

In [0]:
# tran = PaymentsTableUnionTransform()
# transformedDFs = tran.transform(DFs)

# for key,_ in transformedDFs.items():
#     print(key)

In [0]:
# display(DFs["customer"])

In [0]:
# top10 = Top10PercentCustomerTransformation()
# top10.transform(DFs)

# transformer = CustTableTransformations()
# transformer.transform(DFs)


Payment DF Schema:
root
 |-- payment_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- rental_id: integer (nullable = true)
 |-- amount: decimal(38,18) (nullable = true)
 |-- payment_date: timestamp (nullable = true)

Rental DF Schema:
root
 |-- rental_id: integer (nullable = true)
 |-- rental_date: timestamp (nullable = true)
 |-- inventory_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- return_date: timestamp (nullable = true)
 |-- staff_id: integer (nullable = true)
 |-- last_update: timestamp (nullable = true)



{'top_customers_by_sales_df': DataFrame[customer_id: int, first_name: string, last_name: string, Total_Spent: double],
 'top_customers_by_frequency_df': DataFrame[customer_id: int, first_name: string, last_name: string, Total_Frequency: bigint]}

In [0]:
# transformer2 = SalesRevenueByGeographyTransformation()
# transformer2.transform(DFs)