### Borrar y crear widgets

In [0]:
# Reset the notebook widgets, declare the text widget that selects the target
# catalog (default "bakehouse_dev"), and read its current value.
# NOTE(review): Databricks documents that a widget created in the same cell as
# a remove command may not behave as expected — confirm this works when the
# notebook is run as a job, or move removeAll() to its own cell.
CATALOG_WIDGET = "catalog"
dbutils.widgets.removeAll()
dbutils.widgets.text(CATALOG_WIDGET, "bakehouse_dev")
catalogName = dbutils.widgets.get(CATALOG_WIDGET)

# Development flag: set to True to enable the diagnostic prints/displays
# guarded by `if showPrint:` throughout the notebook.
showPrint = False

### Insertar Silver - Customers

In [0]:
import re
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Read the bronze customers table from the selected catalog and add an
# "updatedate" metadata column holding the current timestamp.
bronze_table = f"{catalogName}.bronze.customers_bronze"
df_bronze = spark.table(bronze_table).withColumn("updatedate", F.current_timestamp())

# Development-only diagnostics: table dimensions and a 50-row sample.
if showPrint:
    row_count = df_bronze.count()
    column_count = len(df_bronze.columns)
    print(f"Row count: {row_count}")
    print(f"Column count: {column_count}")
    display(df_bronze.limit(50))

In [0]:
# Handle nulls in customerID: rows without an ID are removed.
if showPrint:
    # Show how many rows have a null customerID and print three of them.
    null_id_rows = df_bronze.filter(F.col("customerID").isNull())
    display(null_id_rows.count())
    null_id_rows.show(3)

# Keep only the rows whose customerID is populated.
df_bronze = df_bronze.na.drop(subset=["customerID"])

# Report the remaining row count in development mode.
if showPrint:
    row_count = df_bronze.count()
    print(f"Row count after droping null values: {row_count}")

In [0]:
# De-duplicate customers: keep exactly one row per customerID.
if showPrint:
    # Debug only: customerIDs that occur more than once.
    df_tmp = (
        df_bronze.groupBy("customerID")
        .agg(F.count("*").alias("count"))
        .filter(F.col("count") > 1)
    )
    print(f"Filas totales: {df_bronze.count()}")
    print(f"Filas con duplicadas: {df_tmp.count()}")

# dropDuplicates(["customerID"]) would keep one row per key without regard to
# any ordering, so the rows are ranked inside a window instead: within each
# customerID they are ordered by email_address, last_name (ascending) and
# first_name (descending), and only the first one is kept.
# NOTE(review): ascending order sorts nulls first by default, so a row with a
# null email_address is preferred over one with a value — confirm that this is
# the intended survivor. Rows that tie on all three columns are still picked
# arbitrarily.
window_spec = Window.partitionBy(["customerID"]).orderBy(
    F.col("email_address"),
    F.col("last_name"),
    F.col("first_name").desc(),
)

# Number the rows of each partition, keep row 1, then drop the helper column
# (once — a second drop of the same column would be a no-op).
df_bronze = (
    df_bronze.withColumn("row_number", F.row_number().over(window_spec))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
)

if showPrint:
    print(f"Filas despues de remover duplicados: {df_bronze.count()}")
    display(df_bronze.orderBy("customerID").limit(10))

In [0]:
# Standardize phone_number to the pattern (XXX)-XXX-XXX-XXXXX, built from at
# most 14 digits.
def standardize_phone(phone):
    """Return *phone* reformatted as ``(XXX)-XXX-XXX-XXXXX``.

    Every non-digit character is stripped and only the first 14 digits are
    kept; they are then split into groups of 3, 3, 3 and up to 5 digits.
    When fewer than 14 digits are available the trailing groups are shorter
    (or empty).

    Args:
        phone: Raw phone number string, or None.

    Returns:
        The formatted phone number, or None when *phone* is None or contains
        no digits at all (instead of the meaningless ``"()---"``).
    """
    if phone is None:
        return None
    # Strip everything that is not a digit, then cap the length at 14 digits.
    digits = re.sub(r'\D', '', phone)[:14]
    if not digits:
        # Nothing usable to format: report the value as missing.
        return None
    return f"({digits[:3]})-{digits[3:6]}-{digits[6:9]}-{digits[9:]}"

# Wrap the Python helper as a Spark UDF that returns a string, then overwrite
# the phone_number column with its standardized form.
standardize_phone_udf = F.udf(standardize_phone, StringType())
standardized_phone = standardize_phone_udf(F.col("phone_number"))
df_bronze = df_bronze.withColumn("phone_number", standardized_phone)

# Development-only sample of the reformatted numbers.
if showPrint:
    display(df_bronze.select("phone_number").limit(10))

In [0]:
# Recode the gender column as a boolean: "male" -> True, "female" -> False.
# The comparison is an exact (case-sensitive) match; every other value,
# including null, becomes null.
gender_col = F.col("gender")
gender_as_bool = (
    F.when(gender_col == "male", F.lit(True))
    .when(gender_col == "female", F.lit(False))
    .otherwise(F.lit(None))
)
df_bronze = df_bronze.withColumn("gender", gender_as_bool)

# Rows whose gender ended up null; only inspected in development mode.
df_tmp = df_bronze.filter(F.col("gender").isNull())
if showPrint:
    print(f"Cantidad de filas con gender = None: {df_tmp.count()}")
    display(df_bronze.select("customerID", "gender").limit(10))

In [0]:
# Convert the continent column to upper case.
continent_upper = F.upper(F.col("continent"))
df_bronze = df_bronze.withColumn("continent", continent_upper)

# Development-only sample of the result.
if showPrint:
    display(df_bronze.select("customerID", "continent").limit(10))

In [0]:
# Replace two-letter US state codes in the state column with the full state
# name. Values that are not a known code are left untouched.
us_state_map = {
    "AL": "Alabama", "AK": "Alaska", "AZ": "Arizona", "AR": "Arkansas", "CA": "California",
    "CO": "Colorado", "CT": "Connecticut", "DE": "Delaware", "FL": "Florida", "GA": "Georgia",
    "HI": "Hawaii", "ID": "Idaho", "IL": "Illinois", "IN": "Indiana", "IA": "Iowa",
    "KS": "Kansas", "KY": "Kentucky", "LA": "Louisiana", "ME": "Maine", "MD": "Maryland",
    "MA": "Massachusetts", "MI": "Michigan", "MN": "Minnesota", "MS": "Mississippi", "MO": "Missouri",
    "MT": "Montana", "NE": "Nebraska", "NV": "Nevada", "NH": "New Hampshire", "NJ": "New Jersey",
    "NM": "New Mexico", "NY": "New York", "NC": "North Carolina", "ND": "North Dakota", "OH": "Ohio",
    "OK": "Oklahoma", "OR": "Oregon", "PA": "Pennsylvania", "RI": "Rhode Island", "SC": "South Carolina",
    "SD": "South Dakota", "TN": "Tennessee", "TX": "Texas", "UT": "Utah", "VT": "Vermont",
    "VA": "Virginia", "WA": "Washington", "WV": "West Virginia", "WI": "Wisconsin", "WY": "Wyoming"
}

# create_map expects a flat key1, value1, key2, value2, ... sequence of
# columns, so the dict items are flattened into alternating literals.
state_name_by_code = F.create_map(
    [F.lit(x) for pair in us_state_map.items() for x in pair]
)

df_bronze = df_bronze.withColumn(
    "state",
    F.when(
        F.length(F.col("state")) == 2,
        # A lookup that misses (a two-character value that is not one of the
        # codes above, e.g. a non-US or lowercase code) yields null; fall back
        # to the original value so the data is not silently erased.
        F.coalesce(state_name_by_code[F.col("state")], F.col("state")),
    ).otherwise(F.col("state"))
)

# Development-only: list the distinct state values after the replacement.
if showPrint:
    display(df_bronze.select("state").distinct().orderBy("state"))

In [0]:
# Select the columns of the silver table, in their final order.
silver_columns = [
    "customerID",
    "first_name",
    "last_name",
    "gender",
    "email_address",
    "phone_number",
    "address",
    "continent",
    "country",
    "state",
    "city",
    "postal_zip_code",
    "updatedate",
]
df_silver = df_bronze.select(*silver_columns)

# Development-only: print the resulting schema.
if showPrint:
    df_silver.printSchema()

In [0]:
# Save the silver DataFrame to the customers_silver Delta table in the
# selected catalog, using overwrite mode with the mergeSchema option enabled.
silver_table = f"{catalogName}.silver.customers_silver"
(
    df_silver.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(silver_table)
)