In [0]:
from pyspark.sql.functions import col, to_date, concat_ws, lead
from pyspark.sql.window import Window

class firsttransformer(transformer):
    def transform(self, inputdf):
        """
        Transforming the input data by adding a Date column and computing the Next_Product.
        """
        transactioninputPdf = inputdf.get("transactioninputPdf")
        print("transactioninputPdf in transform")
        transactioninputPdf.show()

        # Step 1: Combine Year, Month, and Day into a Date column
        date_df = (
            transactioninputPdf
            .withColumn(
                "Date",
                to_date(
                    concat_ws("-", col("Year"), col("Month"), col("Day")),
                    "yyyy-MM-dd"
                )
            )
            .drop("Day", "Month", "Year")  # Drop the original columns to avoid confusion
        )

        # Step 2: Create a Window specification and compute the Next_Product
        window_spec = Window.partitionBy("Model_ID").orderBy("Date")

        transformed_df = date_df.withColumn(
            "Next_Product", 
            lead("BranchName").over(window_spec)
        )

        print("Transformed DataFrame:")
        transformed_df.show()

        # Return the final transformed DataFrame
        return transformed_df
