In [0]:
# Display the SparkSession that the notebook environment pre-defines as `spark`,
# as a quick check that the cluster is attached.
spark

In [0]:
from pyspark.sql.functions import to_timestamp, col, when, from_utc_timestamp, date_format, split, concat, lit, regexp_replace
from pyspark.sql.types import TimestampType

In [0]:
# Inspect what is under the bronze mount point before reading from it.
dbutils.fs.ls("/mnt/bronze")

In [0]:
# Every entry under the bronze ecommerce folder is treated as one table; its name
# is everything before the first "/" (directory entries presumably end with "/").
# NOTE(review): plain files at this level would also be picked up as tables.
bronze_entries = dbutils.fs.ls("/mnt/bronze/ecommerce/")
tableNames = [entry.name.split("/")[0] for entry in bronze_entries]
print(tableNames)

In [0]:
# Source columns removed from every table once derived columns replace them (or
# they are not needed downstream). Names match the bronze schema verbatim,
# including its "lenght" spelling; DataFrame.drop ignores names that are absent.
COLUMNS_TO_DROP = [
    "address",
    "first_name",
    "last_name",
    "product_photos_qty",
    "product_name_lenght",
    "product_description_lenght",
    "product_category",
    "payment_type",
]


def clean_bronze_table(df):
    """Apply the bronze -> silver column clean-up to one table and return the result.

    - adds ``full_name`` ("first last") when both name columns exist;
    - renames ``user_name`` -> ``user_id`` and ``seller_address`` -> ``address``;
    - splits ``address`` on "," into ``city``, ``state`` and ``zipcode``;
    - adds ``*_converted`` copies of ``product_category`` / ``payment_type`` with
      "_" replaced by " ";
    - drops the columns listed in ``COLUMNS_TO_DROP``.
    """
    # Spark's concat yields null if either name part is null.
    if "first_name" in df.columns and "last_name" in df.columns:
        df = df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))

    if "user_name" in df.columns:
        df = df.withColumnRenamed("user_name", "user_id")
    if "seller_address" in df.columns:
        df = df.withColumnRenamed("seller_address", "address")

    # Checked against the *current* schema so a just-renamed seller_address is
    # seen, and as an exact match so a column like "email_address" cannot send
    # us looking for a non-existent "address" column.
    if "address" in df.columns:
        # NOTE(review): parts are not trimmed; assumes "city,state,zip" layout — confirm.
        address_parts = split(col("address"), ",")
        df = (
            df.withColumn("city", address_parts.getItem(0))
              .withColumn("state", address_parts.getItem(1))
              .withColumn("zipcode", address_parts.getItem(2))
        )

    if "product_category" in df.columns:
        df = df.withColumn("product_category_converted", regexp_replace("product_category", "_", " "))
    if "payment_type" in df.columns:
        df = df.withColumn("payment_type_converted", regexp_replace("payment_type", "_", " "))

    return df.drop(*COLUMNS_TO_DROP)


# Cleaned DataFrame per table name, read from the bronze parquet file of each table.
dfs = {}
for table_name in tableNames:
    bronze_path = f"/mnt/bronze/ecommerce/{table_name}/{table_name}.parquet"
    dfs[table_name] = clean_bronze_table(spark.read.parquet(bronze_path))

In [0]:
# Input layouts tried, in order, for every date-like column. The first pattern
# that parses wins, so an ambiguous value such as "03-04-2020 10:00" is read
# month-first; "dd-MM-yyyy HH:mm" is only reached when the earlier patterns fail
# (e.g. the first field is > 12).
# NOTE(review): relies on to_timestamp returning null for non-matching input.
# Under ANSI mode or spark.sql.legacy.timeParserPolicy=EXCEPTION a mismatch may
# raise instead — confirm the cluster settings.
DATE_INPUT_PATTERNS = [
    "MM-dd-yyyy HH:mm",
    "MM/dd/yyyy HH:mm",
    "dd-MM-yyyy HH:mm",
    "M/d/yyyy HH:mm",
    "M/d/yyyy H:mm",
]
# Layout of every "<column>_converted" value (a formatted string, not a timestamp).
DATE_OUTPUT_PATTERN = "yyyy-MM-dd HH:mm"


def normalize_datetime(column_name):
    """Return a Column re-formatting ``column_name`` as ``DATE_OUTPUT_PATTERN``.

    Builds ``when(parsed_1 is not null, fmt_1).when(parsed_2 ...)...otherwise(None)``
    over ``DATE_INPUT_PATTERNS``; values matching no pattern become null.
    """
    expr = None
    for pattern in DATE_INPUT_PATTERNS:
        parsed = to_timestamp(col(column_name), pattern)
        formatted = date_format(parsed, DATE_OUTPUT_PATTERN)
        if expr is None:
            expr = when(parsed.isNotNull(), formatted)
        else:
            expr = expr.when(parsed.isNotNull(), formatted)
    return expr.otherwise(None)


def convert_date_columns(df):
    """Replace every column whose name contains "date" with a normalised
    ``<name>_converted`` string column and return the new DataFrame."""
    # Substring match — also catches names like "updated_at"; confirm intended.
    date_columns = [name for name in df.columns if "date" in name]
    for name in date_columns:
        df = df.withColumn(name + "_converted", normalize_datetime(name))
    return df.drop(*date_columns)


# The date columns are collected once per table (previously the conversion was
# rebuilt once per column because it was nested inside the column scan).
converted_dfs = {table_name: convert_date_columns(df) for table_name, df in dfs.items()}

In [0]:
# Persist each converted table as Delta under the silver layer, replacing any
# previous contents. NOTE(review): if a table's schema changes between runs,
# Delta may also need the "overwriteSchema" option for this overwrite.
for table_name, silver_df in converted_dfs.items():
    print(table_name)
    (
        silver_df.write
        .format("delta")
        .mode("overwrite")
        .save(f"/mnt/silver/ecommerce/{table_name}/")
    )