In [0]:
%run ./nz_schema

In [0]:
# 2023 NZ sales extract, read with the explicit `schema` (presumably provided by %run ./nz_schema).
df_nz_2023 = spark.read.csv(
    "/FileStore/tables/newzealand_price_mapping.csv",
    schema=schema,
    header="true",
)

In [0]:
# 2024 NZ sales extract, read with the same explicit `schema` as the 2023 file.
df_nz_2024 = spark.read.csv(
    "/FileStore/tables/Clone_newzealand_price_mapping_2024.csv",
    schema=schema,
    header="true",
)

In [0]:
# Stack both yearly extracts (columns matched by name, not position) and convert
# `date` to a DateType column using the dd-MM-yyyy pattern.
df_nz = (
    df_nz_2023
    .unionByName(df_nz_2024)
    .withColumn("date", to_date(col("date"), "dd-MM-yyyy"))
)

In [0]:
# Price/invoice mapping. No explicit schema is supplied here, so column types are inferred.
df_price = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/FileStore/tables/price_invoice_mapping.csv")
)

def rename_columns(df):
    """Normalise column names: lower-case them and turn each space into an underscore.

    Only the literal space character is replaced; every other character is kept as-is.
    Returns the frame produced by ``df.toDF`` with the normalised names.
    """
    normalised = []
    for name in df.columns:
        normalised.append(name.lower().replace(" ", "_"))
    return df.toDF(*normalised)

# Normalise headers, then drop rows without a usable product code ("#" looks like a
# placeholder value — confirm) and exact duplicate rows.
df_price = (
    rename_columns(df_price)
    .filter(col("product_code").isNotNull() & (col("product_code") != "#"))
    .dropDuplicates()
)

In [0]:
# Monthly pricing: NZ sales dated on/before the 2024-07-01 cutoff are matched to a price on
# (product_code, year, month). start_of_month is split as dd-MM-yyyy.
# NOTE(review): the cutoff literal is repeated in the weekly cell — keep the two in sync.
start_of_month_parts = split(col("start_of_month"), "-")

df_price_month = (
    df_price
    .withColumn("year", start_of_month_parts.getItem(2))
    .withColumn("month", start_of_month_parts.getItem(1))
    .withColumnRenamed("gross_price", "sales_amount")
    # Parse the source string directly. Splitting and re-joining with concat_ws (which skips
    # nulls) would hand to_date an empty string for a missing start_of_month, which may be
    # rejected when ANSI mode is enabled.
    .withColumn("price_date", to_date(col("start_of_month"), "dd-MM-yyyy"))
    .select("product_code", "year", "month", "price_date", "sales_amount")
)

# `date` is already a DateType column (converted when df_nz was built), so it is compared
# directly rather than parsed a second time.
df_nz_month = (
    df_nz
    .filter(col("date") <= lit("2024-07-01"))
    .drop("store_id", "store_code", "sales_amount", "total_sales_amount")
)

# NOTE(review): if df_price holds several rows per (product_code, year, month) with different
# prices (e.g. one per week), this left join multiplies NZ rows; dropDuplicates only removes
# exact duplicates. Confirm the month-level price is unique per key.
df_month_joined = (
    df_nz_month
    .join(df_price_month, ["product_code", "year", "month"], "left")
    .dropDuplicates()
)

In [0]:
# Weekly pricing: NZ sales dated after the 2024-07-01 cutoff are matched on
# (product_code, year, month, week). A price row is dated by its start_of_week (yyyy-MM-dd)
# when that parses, otherwise by its start_of_month (dd-MM-yyyy).
# The source strings are parsed directly: splitting and re-joining them with concat_ws turns a
# missing start_of_week into an empty string, which to_date may reject under ANSI mode.
df_price_week_1 = (
    df_price
    .withColumn("price_week", to_date(col("start_of_week"), "yyyy-MM-dd"))
    .withColumn("price_month", to_date(col("start_of_month"), "dd-MM-yyyy"))
)

df_price_week_2 = (
    df_price_week_1
    .withColumnRenamed("gross_price", "sales_amount")
    .withColumn(
        "price_date",
        when(col("price_week").isNotNull(), col("price_week")).otherwise(col("price_month")),
    )
)

# year/month come from the "yyyy-MM-dd" string form of price_date (zero-padded strings).
# NOTE(review): weekofyear uses ISO-8601 weeks, so late-December/early-January dates can get
# week 1 or 52/53 while year/month come from the calendar date — confirm df_nz's `week`
# column follows the same convention.
df_price_week = (
    df_price_week_2
    .withColumn("year", split(col("price_date"), "-").getItem(0))
    .withColumn("month", split(col("price_date"), "-").getItem(1))
    .withColumn("week", weekofyear(col("price_date")))
    .select("product_code", "year", "month", "week", "price_date", "sales_amount")
)

# `date` is already a DateType column (converted when df_nz was built); compare it directly.
df_nz_week = (
    df_nz
    .filter(col("date") > lit("2024-07-01"))
    .drop("store_id", "store_code", "sales_amount", "total_sales_amount")
)

df_week_joined = df_nz_week.join(
    df_price_week, ["product_code", "year", "month", "week"], "left"
)

In [0]:
def process_sales_data(df_joined, tiebreak_col="date"):
    """Forward-fill missing ``sales_amount`` values per product.

    For each row, a null ``sales_amount`` is replaced by the most recent non-null, non-zero
    ``sales_amount`` seen so far for the same ``product_code``. Rows are ordered by ``year``,
    ``month`` and, when that column exists, ``tiebreak_col``. Existing values, including
    explicit zeros, are kept unchanged.

    Args:
        df_joined: DataFrame with at least ``product_code``, ``year``, ``month`` and
            ``sales_amount`` columns.
        tiebreak_col: extra ordering column used to make the fill order deterministic when a
            product has several rows in the same month. When it is ``None``/empty or not a
            column of ``df_joined``, ordering falls back to ``year``/``month`` only.

    Returns:
        DataFrame whose ``sales_amount`` holds the filled value. The ``price_date`` column is
        dropped (a no-op when it is absent).
    """
    order_cols = [col("year"), col("month")]
    # Ordering by year/month alone leaves same-month rows in arbitrary order, so the
    # "most recent" price chosen for a null row could vary between runs. A finer-grained
    # column narrows those ties.
    if tiebreak_col and tiebreak_col in df_joined.columns:
        order_cols.append(col(tiebreak_col))

    window = (
        Window.partitionBy("product_code")
        .orderBy(*order_cols)
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # Zeros are excluded from the running "last known price", so they are never propagated.
    last_known_price = last(
        when(col("sales_amount").isNotNull() & (col("sales_amount") != 0), col("sales_amount")),
        ignorenulls=True,
    ).over(window)

    return (
        df_joined
        .withColumn("recent_sales_amount", last_known_price)
        # Only nulls are filled; zeros and real values pass through unchanged.
        .withColumn(
            "sales_amount_new",
            when(col("sales_amount").isNull(), col("recent_sales_amount"))
            .otherwise(col("sales_amount")),
        )
        .drop("recent_sales_amount", "sales_amount", "price_date")
        .withColumnRenamed("sales_amount_new", "sales_amount")
    )


**Fill missing sales_amount: sales dated on or before 2024-07-01 (monthly prices)**

In [0]:
# Forward-fill missing prices for the period priced by month (on/before the cutoff).
df_1 = process_sales_data(df_joined=df_month_joined)


In [0]:
# Row count as a quick sanity check. It is kept in its own variable so df_1 stays a
# DataFrame for the union further down; the bare expression renders the int as cell output.
df_1_count = df_1.count()
df_1_count

**Fill missing sales_amount: sales dated after 2024-07-01 (weekly prices)**

In [0]:
# Forward-fill missing prices for the period priced by week (after the cutoff).
df_2 = process_sales_data(df_joined=df_week_joined)

In [0]:
# Row count as a quick sanity check. It is kept in its own variable so df_2 stays a
# DataFrame for the union further down; the bare expression renders the int as cell output.
df_2_count = df_2.count()
df_2_count

**Final sales_amount: combine both periods and forward-fill across the cutoff**

In [0]:
# Stack the monthly- and weekly-priced periods; columns are matched by name.
df_union = df_1.unionByName(other=df_2)

In [0]:
# Final pass: forward-fill over both periods together, then keep only rows that ended up
# with a sales_amount.
df = process_sales_data(df_union).filter(col("sales_amount").isNotNull())

In [0]:
# Final row count. It is kept in its own variable so `df` stays the result DataFrame
# (re-running this cell no longer fails on int.count()); the bare expression renders the count.
df_count = df.count()
df_count