In [0]:
%pip install openpyxl

In [0]:
dbutils.library.restartPython()

In [0]:
from pyspark.sql.functions import col,lit, current_date, split
from pyspark.sql.types import *
import pandas as pd
import re

In [0]:
# %fs ls "dbfs:/FileStore/shared_uploads/revathy.s@diggibyte.com/tmp/"

In [0]:
excel_file_path = "/dbfs/FileStore/shared_uploads/revathy.s@diggibyte.com/tmp/sample_data_one_translated_bkp.xlsx"

df = pd.read_excel(excel_file_path, header = [0,1,2])
                                      
for col in df.columns:
    if df[col].dtype == "object":
        df[col] = df[col].astype(str)
    if df[col].dtype == "float":
        df[col] = df[col].astype(float)       

spark_df = spark.createDataFrame(df)

def clean_column_name(col_name):
    return re.sub(r"[(')]", "", col_name).replace(",", "").strip()

df_cleaned = spark_df.toDF(*[clean_column_name(col) for col in spark_df.columns])

df_renamed = df_cleaned.toDF(*[col.replace("Model", "").lstrip() for col in df_cleaned.columns])

df_new = df_renamed.withColumnRenamed("Unnamed: 0_level_1 Unnamed: 0_level_2", "Model")

df_selected =df_new.drop("Total turnover Unnamed: 14_level_1 TOTAL",\
                            "4627500 Unnamed: 15_level_1 unit price", \
                            "4627500 Unnamed: 16_level_1 The total amount",\
                            "4627500 Unnamed: 17_level_1 Inventory at the end of the month\nCompany-wide",\
                            "Unnamed: 18_level_1 Unnamed: 18_level_2")\

df_modified = df_selected.toDF(*[col.replace(" ", "_") for col in df_selected.columns])
                            
# display(df_modified)

In [0]:
final_df = df_modified.replace("nan", None)\
            .filter(col("Model").isNotNull())\
            .select(*[col(c).cast(DoubleType()) if c != "Model" else col(c) for c in df_modified.columns])

In [0]:
unpivoted = final_df.selectExpr(
    "Model",
    "stack(13, " +
    "'Shinko_Mitsukoshi_Taichung', `Shinko_Mitsukoshi_Taichung`, " +
    "'Shinko_Mitsukoshi_North_Station', `Shinko_Mitsukoshi_North_Station`, " +
    "'Shinko_Mitsukoshi_Peach_Station', `Shinko_Mitsukoshi_Peach_Station`, " +
    "'Shinko_Mitsukoshi_Chiayi', `Shinko_Mitsukoshi_Chiayi`, " +
    "'Shinko_Mitsukoshi_Simon', `Shinko_Mitsukoshi_Simon`, " +
    "'Shinko_Mitsukoshi_Left_Battalion', `Shinko_Mitsukoshi_Left_Battalion`, " +
    "'Shinko_Mitsukoshi_A8', `Shinko_Mitsukoshi_A8`, " +
    "'Shinko_Mitsukoshi_Heavenly_Mother', `Shinko_Mitsukoshi_Heavenly_Mother`, " +
    "'SOGO_Revival', `SOGO_Revival`, " +
    "'SOGO_Hsinchu', `SOGO_Hsinchu`, " +
    "'Taipei_Three_creations', `Taipei_Three_creations`, " +
    "'outlet_Tainan', `outlet_Tainan`, " +
    "'outlet_Kaohsiung', `outlet_Kaohsiung`" +
    ") as (store_name, sales_qty)"
)

df1 = unpivoted.withColumn("Country", lit("Taiwan"))\
    .withColumn("Key_account", lit("Gaston"))\
    .withColumn("load_date", current_date())\
    .withColumnRenamed("Model", "Product_code")

df_mapped = df1.withColumn("load_date", col("load_date").cast("date"))\
    .withColumn("year", split(col("load_date").cast("string"), "-").getItem(0))\
    .withColumn("month", split(col("load_date").cast("string"), "-").getItem(1))\
    .withColumn("day", split(col("load_date").cast("string"), "-").getItem(2))\
    .drop("load_date")

df1_mapped = df_mapped.filter(col("sales_qty").isNotNull())