### Comienzo capa Silver (Raw Bronce - Silver)

In [0]:
from pyspark.sql.functions import regexp_replace, split, col

# Load the raw file with Spark's CSV reader (no options passed) and preview it.
# NOTE(review): only column `_c0` is consumed below; with the reader's default
# comma delimiter any text after an unquoted comma would land in `_c1`, `_c2`, ...
# and be ignored here -- confirm the raw rows never contain such commas.
df_test = spark.read.csv("/mnt/datalake/globalsalesmarketmedallon/global_sales_raw/global_superstore_data.csv")
df_test.show(5, truncate=False)

# Cleaning, step 1: strip every double quote from the raw line.
# Cleaning, step 2: turn each literal backslash-t sequence into a real tab character.
df_clean = (
    df_test
    .withColumn("cleaned", regexp_replace("_c0", '"', ''))
    .withColumn("cleaned", regexp_replace("cleaned", "\\\\t", "\t"))
)

# Split the cleaned line on tabs into an array column named "columns".
df_split = df_clean.withColumn("columns", split(col("cleaned"), "\t"))

# Sanity check: print how many fields each row produced.
df_split.selectExpr("size(columns) as num_cols").show()

In [0]:
# Target names for the tab-separated fields, in positional order (27 entries).
column_names = [
    "Category",
    "City",
    "Country",
    "Customer_ID",
    "Customer_Name",
    "Discount",
    "Market",
    # Written as ONE literal on purpose: the previous layout produced this same
    # value through implicit adjacent-string concatenation ("Aasdas" "564mg54o"),
    # which reads like a missing comma. A later cell drops the column by this
    # exact name.
    "Aasdas564mg54o",
    "Order_Date",
    "Order_ID",
    "Order_Priority",
    "Product_ID",
    "Product_Name",
    "Profit",
    "Quantity",
    "Region",
    "Row_ID",
    "Sales",
    "Segment",
    "Ship_Date",
    "Ship_Mode",
    "Shipping_Cost",
    "State",
    "Sub_Category",
    "Year",
    "Market2",
    "Week_Num",
]

# Expand the "columns" array into individual columns: the element at position
# `idx` of the array becomes a column named `column_names[idx]`.
df_final = df_split.select(
    *(col("columns")[idx].alias(label) for idx, label in enumerate(column_names))
)

# Preview the result, both as plain text and through the notebook's display().
df_final.show(5, truncate=False)

display(df_final)

In [0]:

# Persist the split data under the `global_sales_bronce` folder, once as CSV
# and once as a Delta table. Each write is coalesced to a single partition and
# overwrites whatever is already at the target path.
# (The previous comment said "save to silver", but both paths below point at
# the bronce folder.)
df_final.coalesce(1).write.save(
    "/mnt/datalake/globalsalesmarketmedallon/global_sales_bronce/csv",
    format="csv",
    mode="overwrite",
)

df_final.coalesce(1).write.save(
    "/mnt/datalake/globalsalesmarketmedallon/global_sales_bronce/delta_tables",
    format="delta",
    mode="overwrite",
)


In [0]:

# Read back the Delta table written under the bronce folder; it is the input
# of the silver ("plata") transformations that follow.
df_plata = spark.read.load(
    "/mnt/datalake/globalsalesmarketmedallon/global_sales_bronce/delta_tables",
    format="delta",
)
display(df_plata)


In [0]:

# Cleaning and transformation for the silver ("plata") layer.
#
# An earlier version of this cell also (1) collected the first row to rebuild a
# header list, (2) rebuilt the frame through rdd.zipWithIndex()...toDF(columns)
# and (3) stripped ';' from Week_Num directly on df_plata. Every one of those
# results was overwritten by the filter below before being read, so they only
# cost extra Spark jobs and could raise (empty table, null value in the first
# row). They have been removed; the resulting df_clean is unchanged.

# Drop the duplicated header row: it is the row whose Category value is the
# literal text "Category". Filtering on that single column removes the whole row.
df_clean = df_plata.filter(df_plata["Category"] != "Category")

# Remove every run of ';' characters from Week_Num.
df_clean = df_clean.withColumn("Week_Num", regexp_replace(col("Week_Num"), ";+", ""))

display(df_clean)

# df_clean.write.format("delta").mode("overwrite").save("/mnt/datalake/globalsalesmarketmedallon/global_sales_plata/delta_tables")
# df_clean.write.format("csv").mode("overwrite").save("/mnt/datalake/globalsalesmarketmedallon/global_sales_plata/csv")


In [0]:

# Rename Market to Continent, then discard the columns that are not carried
# into the silver output. drop() silently ignores names that are not present
# in the frame, so listing "AasdasRecord_Count" is harmless if it is absent.
df_clean = (
    df_clean
    .withColumnRenamed("Market", "Continent")
    .drop("AasdasRecord_Count", "Market2", "Aasdas564mg54o")
)
display(df_clean)

In [0]:
from pyspark.sql.functions import date_format, col

# Cast columns to their working data types.
#
# Order_Date and Ship_Date are parsed as timestamps and rendered back as
# "yyyy-MM-dd HH:mm:ss" strings; the remaining columns become int / double.
df_clean = (
    df_clean
    # NOTE(review): cast("int") discards any fractional part. If Discount (or
    # Sales, below) can hold fractional values, "double" would be the safer
    # target -- confirm against the data before changing the schema.
    .withColumn("Discount", col("Discount").cast("int"))
    .withColumn("Order_Date", date_format(col("Order_Date").cast("timestamp"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("Profit", col("Profit").cast("double"))
    .withColumn("Quantity", col("Quantity").cast("int"))
    # Ship_Date is formatted from its own values. The previous version read
    # Order_Date here, which overwrote every ship date with the order date.
    .withColumn("Ship_Date", date_format(col("Ship_Date").cast("timestamp"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("Shipping_Cost", col("Shipping_Cost").cast("double"))
    .withColumn("Sales", col("Sales").cast("int"))
    .withColumn("Year", col("Year").cast("int"))
)

display(df_clean)

In [0]:
from pyspark.sql.functions import upper, trim
# Normalise text columns: Order_Priority and Ship_Mode are trimmed and
# upper-cased; Sub_Category is only trimmed.
df_clean = (
    df_clean
    .withColumn("Order_Priority", upper(trim(col("Order_Priority"))))
    .withColumn("Ship_Mode", upper(trim(col("Ship_Mode"))))
    .withColumn("Sub_Category", trim(col("Sub_Category")))
)

display(df_clean)


## Mal manejo de los viajes desde el csv ingestado

In [0]:

from pyspark.sql.functions import year, month, weekofyear, datediff

# Add analysis columns: year, month and week of the order, the number of days
# between order and shipment, and the profit-to-sales ratio.
df_clean = (
    df_clean
    .withColumn("Order_Year", year(col("Order_Date")))
    .withColumn("Order_Month", month(col("Order_Date")))
    .withColumn("Order_Week", weekofyear(col("Order_Date")))
    .withColumn("Shipping_Days", datediff(col("Ship_Date"), col("Order_Date")))
    .withColumn("Profit_Margin", col("Profit") / col("Sales"))
)

# Keep only rows whose Ship_Date is not earlier than Order_Date.
# (Original author's note, translated: the check "failed" because both columns
# carried the same date and time; the Shipping_Days column could be dropped.)
df_clean = df_clean.filter(col("Ship_Date") >= col("Order_Date"))

display(df_clean)

# Persist the silver ("plata") result as a Delta table and as CSV, overwriting
# any previous output at those paths.
df_clean.write.save(
    "/mnt/datalake/globalsalesmarketmedallon/global_sales_plata/delta_tables",
    format="delta",
    mode="overwrite",
)
df_clean.write.save(
    "/mnt/datalake/globalsalesmarketmedallon/global_sales_plata/csv",
    format="csv",
    mode="overwrite",
)

### Fin capa plata (Silver)