In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col

class Transformer:
    """Base interface for a transformation step.

    Subclasses override :meth:`transform`; this base version does nothing.
    """

    def transform(self, inputDFs):
        """No-op placeholder meant to be overridden.

        Args:
            inputDFs: Input handed to the step; ignored here.

        Returns:
            None.
        """
        return None

class FirstTransformer(Transformer):
    """Finds customers whose purchase right after an iPhone was AirPods."""

    def transform(self, inputDFs):
        """Return customer details for iPhone-then-AirPods buyers.

        Args:
            inputDFs: Dict-like object read with ``.get``; it must provide
                ``"transaction_input_df"`` and ``"customer_input_df"``
                (presumably Spark DataFrames -- confirm against the caller).

        Returns:
            A DataFrame with the columns ``customer_id``, ``customer_name``,
            ``join_date`` and ``location``, or ``None`` (after printing an
            error) when either input is missing.
        """
        transaction_input_df = inputDFs.get("transaction_input_df")
        customer_input_df = inputDFs.get("customer_input_df")

        if transaction_input_df is None or customer_input_df is None:
            print("Error: Required DataFrames not found in inputDFs")
            return None

        print("Displaying transaction_input_df:")
        transaction_input_df.show()

        print("Displaying customer_input_df:")
        customer_input_df.show()

        # Cast the transaction-side key to string so it lines up with the
        # customer-side key (assumes customer_input_df stores it as a string).
        transaction_input_df = transaction_input_df.withColumn(
            "customer_id", col("customer_id").cast("string")
        )

        # Per customer, in transaction-date order, look one row ahead.
        windowSpec = Window.partitionBy("customer_id").orderBy("transaction_date")
        transformedDF = transaction_input_df.withColumn(
            "next_product_name", lead("product_name").over(windowSpec)
        )

        print("Filtering: Customers who bought an iPhone and then AirPods")
        filteredDF = transformedDF.filter(
            (transformedDF.product_name == "iPhone")
            & (transformedDF.next_product_name == "AirPods")
        )

        # Attach customer attributes and keep only the reporting columns.
        finalDF = filteredDF.join(customer_input_df, "customer_id", "inner").select(
            "customer_id", "customer_name", "join_date", "location"
        )
        finalDF.show()

        # finalDF is already joined to the customer data. Joining it with
        # customer_input_df a second time only duplicated customer_name,
        # join_date and location, leaving the result with ambiguous column
        # names, so the single-join frame is returned instead.
        return finalDF


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead

class Transformer:
    """Common parent for transformation steps; defines the ``transform`` hook."""

    def transform(self, inputDFs):
        """Do nothing; concrete steps supply the real implementation.

        Args:
            inputDFs: Input for the step; not used by this base version.

        Returns:
            None.
        """
        return None

class FirstTransformer(Transformer):
    """Finds customers whose purchase right after an iPhone was AirPods."""

    def transform(self, inputDFs):
        """Return customer details for iPhone-then-AirPods buyers.

        Args:
            inputDFs: Dict-like object read with ``.get``; it must provide
                ``"transaction_input_df"`` and ``"customer_input_df"``
                (presumably Spark DataFrames -- confirm against the caller).

        Returns:
            A DataFrame with the columns ``customer_id``, ``customer_name``,
            ``join_date`` and ``location``, or ``None`` (after printing an
            error) when either input is missing.
        """
        transaction_input_df = inputDFs.get("transaction_input_df")
        customer_input_df = inputDFs.get("customer_input_df")

        if transaction_input_df is None or customer_input_df is None:
            print("Error: Required DataFrames not found in inputDFs")
            return None

        print("Displaying transaction_input_df:")
        transaction_input_df.show()

        print("Displaying customer_input_df:")
        customer_input_df.show()

        # Per customer, in transaction-date order, look one row ahead.
        windowSpec = Window.partitionBy("customer_id").orderBy("transaction_date")
        transformedDF = transaction_input_df.withColumn(
            "next_product_name", lead("product_name").over(windowSpec)
        )

        print("Filtering: Customers who bought an iPhone and then AirPods")
        filteredDF = transformedDF.filter(
            (transformedDF.product_name == "iPhone")
            & (transformedDF.next_product_name == "AirPods")
        )

        # Attach customer attributes and keep only the reporting columns.
        finalDF = filteredDF.join(customer_input_df, "customer_id", "inner").select(
            "customer_id", "customer_name", "join_date", "location"
        )
        finalDF.show()

        # The earlier version joined finalDF with customer_input_df once more.
        # That added a second copy of customer_name, join_date and location,
        # so the returned frame had ambiguous column names. The frame from
        # the single join is the complete result.
        return finalDF


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead

class Transformer:
    """Root class for transformation steps."""

    def transform(self, inputDFs):
        """Placeholder hook: accepts the input and yields nothing.

        Args:
            inputDFs: Input for the step; untouched here.

        Returns:
            None.
        """
        return None

class FirstTransformer(Transformer):
    """Annotates each transaction with the customer's following product."""

    def transform(self, inputDFs):
        """Add ``next_product_name`` to the transactions and display matches.

        Args:
            inputDFs: Dict-like object read with ``.get``; it must provide
                ``"transaction_input_df"``.

        Returns:
            The transactions with an added ``next_product_name`` column, or
            ``None`` (after printing an error) when the input is missing.
        """
        purchases = inputDFs.get("transaction_input_df")
        if purchases is None:
            print("Error: 'transaction_input_df' not found in inputDFs")
            return None

        print("Displaying transaction_input_df in transform:")
        purchases.show()

        # One window per customer, rows ordered by transaction date.
        by_customer = Window.partitionBy("customer_id")
        by_customer_in_time = by_customer.orderBy("transaction_date")

        # Value of product_name on the customer's following row.
        following_product = lead("product_name").over(by_customer_in_time)
        annotated = purchases.withColumn("next_product_name", following_product)

        print("AirPods after buying iPhone")
        annotated.orderBy("customer_id", "transaction_date", "product_name").show()

        print("Filtering: Customers who bought an iPhone and then AirPods")

        # Rows where an iPhone is immediately followed by AirPods.
        bought_iphone = annotated["product_name"] == "iPhone"
        then_airpods = annotated["next_product_name"] == "AirPods"
        matches = annotated.filter(bought_iphone & then_airpods)
        matches.show()

        # NOTE(review): the filtered rows are only displayed; the unfiltered
        # annotated frame is what gets returned -- confirm this is intended.
        return annotated
        


    


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead

class Transformer:
    """Parent type that concrete transformers extend."""

    def transform(self, inputDFs):
        """Default implementation with no effect.

        Args:
            inputDFs: Input for the step; ignored by the default.

        Returns:
            None.
        """
        return None

class FirstTransformer(Transformer):
    """Finds customers whose purchase right after an iPhone was AirPods."""

    def transform(self, inputDFs):
        """Return customer details for iPhone-then-AirPods buyers.

        Args:
            inputDFs: Dict-like object read with ``.get``; it must provide
                ``"transaction_input_df"`` and the customer data under
                ``"customer_df"`` (``"customer_input_df"`` is accepted as a
                fallback key).

        Returns:
            A DataFrame with the columns ``customer_id``, ``customer_name``,
            ``join_date`` and ``location``, or ``None`` (after printing an
            error) when either input is missing.
        """
        transaction_input_df = inputDFs.get("transaction_input_df")

        # Prefer the original key, but also accept "customer_input_df" so a
        # caller using that name is not rejected for a naming mismatch.
        customer_df = inputDFs.get("customer_df")
        if customer_df is None:
            customer_df = inputDFs.get("customer_input_df")

        if transaction_input_df is None or customer_df is None:
            print("Error: Required DataFrames not found in inputDFs")
            return None

        print("Displaying transaction_input_df:")
        transaction_input_df.show()

        print("Displaying customer_df:")
        customer_df.show()

        # Per customer, in transaction-date order, look one row ahead.
        windowSpec = Window.partitionBy("customer_id").orderBy("transaction_date")
        transformedDF = transaction_input_df.withColumn(
            "next_product_name", lead("product_name").over(windowSpec)
        )

        print("Filtering: Customers who bought an iPhone and then AirPods")
        filteredDF = transformedDF.filter(
            (transformedDF.product_name == "iPhone")
            & (transformedDF.next_product_name == "AirPods")
        )

        # Attach customer attributes and keep only the reporting columns.
        finalDF = filteredDF.join(customer_df, "customer_id", "inner").select(
            "customer_id", "customer_name", "join_date", "location"
        )
        finalDF.show()

        # The statements that used to follow this return could never run
        # (and would have self-joined the customer frame); they were removed.
        return finalDF



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead

class Transformer:
    """Skeleton transformer exposing a ``transform`` method to override."""

    def transform(self, inputDFs):
        """Accept the input and return nothing.

        Args:
            inputDFs: Input for the step; unused in this skeleton.

        Returns:
            None.
        """
        return None

class FirstTransformer(Transformer):
    """Finds customers whose purchase right after an iPhone was AirPods."""

    def transform(self, inputDFs):
        """Return customer details for iPhone-then-AirPods buyers.

        Args:
            inputDFs: Dict-like object read with ``.get``; it must provide
                ``"transaction_input_df"`` and ``"customer_input_df"``.

        Returns:
            The frame produced by :meth:`joinDF`, or ``None`` (after
            printing an error) when either input is missing.
        """
        purchases = inputDFs.get("transaction_input_df")
        customers = inputDFs.get("customer_input_df")

        if any(frame is None for frame in (purchases, customers)):
            print("❌ Error: Required DataFrames not found in inputDFs")
            return None

        print("📊 Displaying transaction_input_df:")
        purchases.show()

        print("📊 Displaying customer_input_df:")
        customers.show()

        # One window per customer, rows ordered by transaction date.
        by_customer_in_time = Window.partitionBy("customer_id").orderBy(
            "transaction_date"
        )

        # Value of product_name on the customer's following row.
        following_product = lead("product_name").over(by_customer_in_time)
        annotated = purchases.withColumn("next_product_name", following_product)

        print("🔍 Filtering: Customers who bought an iPhone and then AirPods")
        bought_iphone = annotated["product_name"] == "iPhone"
        then_airpods = annotated["next_product_name"] == "AirPods"
        matches = annotated.filter(bought_iphone & then_airpods)

        # Delegate the customer join to the helper, then display its output.
        result = self.joinDF(matches, customers)
        result.show()
        return result

    def joinDF(self, filteredDF, customer_input_df):
        """Inner-join the filtered rows to the customer frame on ``customer_id``.

        Args:
            filteredDF: Rows that passed the iPhone/AirPods filter.
            customer_input_df: Customer frame to join against.

        Returns:
            A DataFrame with the columns ``customer_id``, ``customer_name``,
            ``join_date`` and ``location``.
        """
        print("🔗 Performing join operation...")
        output_columns = ("customer_id", "customer_name", "join_date", "location")
        combined = filteredDF.join(customer_input_df, "customer_id", "inner")
        projected = combined.select(*output_columns)

        print("✅ Join operation completed. Displaying joinDF:")
        projected.show()

        return projected



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col

class Transformer:
    """Minimal base class for transformation steps."""

    def transform(self, inputDFs):
        """Stub that performs no work.

        Args:
            inputDFs: Input for the step; not inspected here.

        Returns:
            None.
        """
        return None

class FirstTransformer(Transformer):
    """Finds customers whose purchase right after an iPhone was AirPods."""

    def transform(self, inputDFs):
        """Return customer details for iPhone-then-AirPods buyers.

        Args:
            inputDFs: Dict-like object read with ``.get``; it must provide
                ``"transaction_input_df"`` and ``"customer_input_df"``.

        Returns:
            A DataFrame with the columns ``customer_id``, ``customer_name``,
            ``join_date`` and ``location``, or ``None`` (after printing an
            error) when either input is missing.
        """
        purchases = inputDFs.get("transaction_input_df")
        customers = inputDFs.get("customer_input_df")

        if any(frame is None for frame in (purchases, customers)):
            print("Error: Required DataFrames not found in inputDFs")
            return None

        print("Displaying transaction_input_df:")
        purchases.show()

        print("Displaying customer_input_df:")
        customers.show()

        # Join key as string on the transaction side (assumes the customer
        # side already stores it as a string -- confirm).
        key_as_string = col("customer_id").cast("string")
        purchases = purchases.withColumn("customer_id", key_as_string)

        # One window per customer, rows ordered by transaction date.
        by_customer_in_time = Window.partitionBy("customer_id").orderBy(
            "transaction_date"
        )

        # Value of product_name on the customer's following row.
        following_product = lead("product_name").over(by_customer_in_time)
        annotated = purchases.withColumn("next_product_name", following_product)

        print("Filtering: Customers who bought an iPhone and then AirPods")
        bought_iphone = annotated["product_name"] == "iPhone"
        then_airpods = annotated["next_product_name"] == "AirPods"
        matches = annotated.filter(bought_iphone & then_airpods)

        # Attach customer attributes and keep only the reporting columns.
        output_columns = ("customer_id", "customer_name", "join_date", "location")
        combined = matches.join(customers, "customer_id", "inner")
        result = combined.select(*output_columns)

        print("Final transformed DataFrame:")
        result.show()

        return result


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col

class Transformer:
    """Abstract-style base: declares ``transform`` without implementing it."""

    def transform(self, inputDFs):
        """Empty default; overriding classes provide the behaviour.

        Args:
            inputDFs: Input for the step; ignored in this default.

        Returns:
            None.
        """
        return None

class FirstTransformer(Transformer):
    """Finds customers whose purchase right after an iPhone was AirPods."""

    def transform(self, inputDFs):
        """Return customer details for iPhone-then-AirPods buyers.

        Args:
            inputDFs: Dict-like object read with ``.get``; it must provide
                ``"transaction_input_df"`` and ``"customer_input_df"``.

        Returns:
            A DataFrame with the columns ``customer_id``, ``customer_name``,
            ``join_date`` and ``location``, or ``None`` (after printing an
            error) when either input is missing.
        """
        purchases = inputDFs.get("transaction_input_df")
        customers = inputDFs.get("customer_input_df")

        if any(frame is None for frame in (purchases, customers)):
            print("Error: Required DataFrames not found in inputDFs")
            return None

        print("Displaying transaction_input_df:")
        purchases.show()

        print("Displaying customer_input_df:")
        customers.show()

        # Give the join key the same (string) type on both sides.
        purchases = purchases.withColumn(
            "customer_id", col("customer_id").cast("string")
        )
        customers = customers.withColumn(
            "customer_id", col("customer_id").cast("string")
        )

        # One window per customer, rows ordered by transaction date.
        by_customer_in_time = Window.partitionBy("customer_id").orderBy(
            "transaction_date"
        )

        # Value of product_name on the customer's following row.
        following_product = lead("product_name").over(by_customer_in_time)
        annotated = purchases.withColumn("next_product_name", following_product)

        print("Filtering: Customers who bought an iPhone and then AirPods")
        bought_iphone = annotated["product_name"] == "iPhone"
        then_airpods = annotated["next_product_name"] == "AirPods"
        matches = annotated.filter(bought_iphone & then_airpods)

        # Attach customer attributes and keep only the reporting columns.
        output_columns = ("customer_id", "customer_name", "join_date", "location")
        combined = matches.join(customers, "customer_id", "inner")
        result = combined.select(*output_columns)

        print("Final Transformed DataFrame:")
        result.show()

        return result
