In [15]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("Delta Spark 4.0.0") \
    .master("local[*]") \
    .enableHiveSupport() \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [16]:
# Cargando datos en dataframes
df_customer = spark.read.format("delta").load("../sql/dw_bronze/customer")
df_product = spark.read.format("delta").load("../sql/dw_bronze/product")
df_so_detail = spark.read.format("delta").load("../sql/dw_bronze/so_detail")
df_so_header = spark.read.format("delta").load("../sql/dw_bronze/so_header.write")
df_s_territory =spark.read.format("delta").load("../sql/dw_bronze/s_terrotory.write")

In [None]:
# Contando cantidad de nulos por campo en cada tabla
from pyspark.sql.functions import col, count, when

def porcentaje_nulos_por_columna(df_customer):
    total = df_customer.count()  # Cuenta cuantas filas tiene el dataframe

    return df_customer.select([
        # Para cada columna, contamos los valores que son nulos y calculamos el %
        (count(when(col(c).isNull(), c)) / total * 100).alias(c)
        for c in df_customer.columns  # Recorremos todas las columnas del DataFrame
    ])
porcentaje_nulos_por_columna(df_customer).show() #StoreID con 93.26% de nulos


def porcentaje_nulos_por_columna(df_product):
    total = df_product.count()  # Cuenta cuantas filas tiene el dataframe

    return df_product.select([
        # Para cada columna, contamos los valores que son nulos y calculamos el %
        (count(when(col(c).isNull(), c)) / total * 100).alias(c)
        for c in df_product.columns  # Recorremos todas las columnas del DataFrame
    ])
porcentaje_nulos_por_columna(df_product).show() #StoreID con 93.26% de nulos


def porcentaje_nulos_por_columna(df_so_detail):
    total = df_so_detail.count()  # Cuenta cuantas filas tiene el dataframe

    return df_so_detail.select([
        # Para cada columna, contamos los valores que son nulos y calculamos el %
        (count(when(col(c).isNull(), c)) / total * 100).alias(c)
        for c in df_so_detail.columns  # Recorremos todas las columnas del DataFrame
    ])
porcentaje_nulos_por_columna(df_so_detail).show() #StoreID con 93.26% de nulos


def porcentaje_nulos_por_columna(df_so_header):
    total = df_so_header.count()  # Cuenta cuantas filas tiene el dataframe

    return df_so_header.select([
        # Para cada columna, contamos los valores que son nulos y calculamos el %
        (count(when(col(c).isNull(), c)) / total * 100).alias(c)
        for c in df_so_header.columns  # Recorremos todas las columnas del DataFrame
    ])
porcentaje_nulos_por_columna(df_so_header).show() #StoreID con 93.26% de nulos


def porcentaje_nulos_por_columna(df_s_territory):
    total = df_s_territory.count()  # Cuenta cuantas filas tiene el dataframe

    return df_s_territory.select([
        # Para cada columna, contamos los valores que son nulos y calculamos el %
        (count(when(col(c).isNull(), c)) / total * 100).alias(c)
        for c in df_s_territory.columns  # Recorremos todas las columnas del DataFrame
    ])
porcentaje_nulos_por_columna(df_s_territory).show() #StoreID con 93.26% de nulos

In [17]:
#Eliminando duplicados y nulos de los ID, dropeando campos de auditoria
#df_customer.show() # Eliminar nulos y duplicados (CustomerID) - Eliminar campos (fecha_carga, archivo_origen)
#df_product.show() # Eliminar nulos y duplicados (ProductID) - Eliminar campos (fecha_carga, archivo_origen)
#df_so_detail.show() # Eliminar nulos y duplicados (SalesOrderID) - Eliminar campos (fecha_carga, archivo_origen)
#df_so_header.show() # Eliminar nulos y duplicados (SalesOrderID) - Eliminar campos (fecha_carga, archivo_origen)
#df_s_territory.show() # Eliminar nulos y duplicados (TerritoryID) - Eliminar campos (fecha_carga, archivo_origen)

df_customer_limpio = df_customer.drop_duplicates(["CustomerID"]).filter("CustomerID IS NOT NULL").drop("fecha_carga", "archivo_origen")
df_product_limpio = df_product.dropDuplicates(["ProductID"]).filter("ProductID IS NOT NULL").drop("fecha_carga", "archivo_origen")
df_so_detail_limpio = df_so_detail.dropDuplicates(["SalesOrderID"]).filter("SalesOrderID IS NOT NULL").drop("fecha_carga", "archivo_origen")
df_so_header_limpio = df_so_header.dropDuplicates(["SalesOrderID"]).filter("SalesOrderID IS NOT NULL").drop("fecha_carga", "archivo_origen")
df_s_territory_limpio = df_s_territory.dropDuplicates(["TerritoryID"]).filter("TerritoryID IS NOT NULL").drop("fecha_carga", "archivo_origen")


In [None]:
df_customer_limpio.show(1) 
df_product_limpio.show(1) 
df_so_detail_limpio.show(1) 
df_so_header_limpio.show(1) 
df_s_territory_limpio.show(1)

In [18]:
#Modificando tipos de datos
#df_customer_limpio.printSchema() # Campos con tipos de datos correctos
#df_product_limpio.printSchema() # ModifiedDate, SellStartDate (Pasar a timestamp)
#df_so_detail_limpio.printSchema() # Campos con tipos de datos correctoscustomer
#df_so_header_limpio.printSchema() # Campos con tipos de datos correctos
#df_s_territory_limpio.printSchema() # Campos con tipos de datos correctos
from pyspark.sql.functions import col
df_customer_limpio_tipo = df_customer_limpio
df_product_limpio_tipo = df_product_limpio\
    .withColumn("ModifiedDate",col("ModifiedDate").cast("timestamp"))\
    .withColumn("SellStartDate",col("SellStartDate").cast("timestamp"))
df_so_detail_limpio_tipo = df_so_detail_limpio
df_so_header_limpio_tipo = df_so_header_limpio
df_s_territory_limpio_tipo = df_s_territory_limpio

In [19]:
#Renombrando campos a español
df_customer_rename = df_customer_limpio_tipo\
    .withColumnRenamed("CustomerID","ClienteID")\
    .withColumnRenamed("PersonID","PersonaID")\
    .withColumnRenamed("StoreID","TiendaID")\
    .withColumnRenamed("TerritoryID","TerritorioID")\
    .withColumnRenamed("AccountNumber","Numero_cuenta")\
    .withColumnRenamed("rowguid","guia_filas")\
    .withColumnRenamed("ModifiedDate","Fecha_modificacion")

df_product_rename = df_product_limpio_tipo\
    .withColumnRenamed("ProductID","ProductoID")\
    .withColumnRenamed("Name","Nombre")\
    .withColumnRenamed("ProductNumber","Numero_producto")\
    .withColumnRenamed("MakeFlag","Flag")\
    .withColumnRenamed("FinishedGoodsFlag","Flag_terminados")\
    .withColumnRenamed("SafetyStockLevel","Seguridad_stock")\
    .withColumnRenamed("ReorderPoint","Punto_reorden")\
    .withColumnRenamed("StandardCost","Costo_estandar")\
    .withColumnRenamed("ListPrice","Precio_lista")\
    .withColumnRenamed("Size","Tamaño")\
    .withColumnRenamed("SizeUnitMeasureCode","Codigo_medida_unidad")\
    .withColumnRenamed("WeightUnitMeasureCode","Codigo_medida_peso")\
    .withColumnRenamed("Weight","Peso")\
    .withColumnRenamed("DaysToManufacture","Dias_fabricar")\
    .withColumnRenamed("ProductLine","Linea_Producto")\
    .withColumnRenamed("Class","Clase")\
    .withColumnRenamed("Style","Estilo")\
    .withColumnRenamed("ProductSubcategoryID","SubcategoriaID_producto")\
    .withColumnRenamed("ProductModelID","ModeloID_producto")\
    .withColumnRenamed("SellStartDate","Fecha_inicio_venta")\
    .withColumnRenamed("SellEndDate","Fecha_finalizacion_venta")\
    .withColumnRenamed("DiscontinuedDate","Fecha_descontinuacion")\
    .withColumnRenamed("rowguid","guia_filas")\
    .withColumnRenamed("ModifiedDate","Fecha_modificacion")

df_so_detail_rename = df_so_detail_limpio_tipo\
    .withColumnRenamed("SalesOrderID","PedidoID")\
    .withColumnRenamed("SalesOrderDetailID","DetalleID")\
    .withColumnRenamed("CarrierTrackingNumber","Numero_seguimiento")\
    .withColumnRenamed("OrderQty","Cantidad_pedido")\
    .withColumnRenamed("ProductID","ProductoID")\
    .withColumnRenamed("SpecialOfferID","OfertaEsp_ID")\
    .withColumnRenamed("UnitPrice","Precio_Unitario")\
    .withColumnRenamed("UnitPriceDiscount","Desc_precio_uni")\
    .withColumnRenamed("LineTotal","Linea_total")\
    .withColumnRenamed("rowguid","guia_filas")\
    .withColumnRenamed("ModifiedDate","Fecha_modificacion")

df_so_header_rename = df_so_header_limpio_tipo\
    .withColumnRenamed("SalesOrderID", "PedidoID")\
    .withColumnRenamed("RevisionNumber", "NumeroRevision")\
    .withColumnRenamed("OrderDate", "FechaPedido")\
    .withColumnRenamed("DueDate", "FechaVencimiento")\
    .withColumnRenamed("ShipDate", "FechaEnvio")\
    .withColumnRenamed("Status", "Estado")\
    .withColumnRenamed("OnlineOrderFlag", "PedidoOnline")\
    .withColumnRenamed("SalesOrderNumber", "NumeroPedido")\
    .withColumnRenamed("PurchaseOrderNumber", "NumeroOrdenCompra")\
    .withColumnRenamed("AccountNumber", "NumeroCuenta")\
    .withColumnRenamed("CustomerID", "ClienteID")\
    .withColumnRenamed("SalesPersonID", "VendedorID")\
    .withColumnRenamed("TerritoryID", "TerritorioID")\
    .withColumnRenamed("BillToAddressID", "DireccionFacturaID")\
    .withColumnRenamed("ShipToAddressID", "DireccionEnvioID")\
    .withColumnRenamed("ShipMethodID", "MetodoEnvioID")\
    .withColumnRenamed("CreditCardID", "TarjetaCreditoID")\
    .withColumnRenamed("CreditCardApprovalCode", "CodigoAprobacionTC")\
    .withColumnRenamed("CurrencyRateID", "TasaCambioID")\
    .withColumnRenamed("SubTotal", "Subtotal")\
    .withColumnRenamed("TaxAmt", "MontoImpuesto")\
    .withColumnRenamed("Freight", "CostoEnvio")\
    .withColumnRenamed("TotalDue", "TotalAPagar")\
    .withColumnRenamed("Comment", "Comentario")\
    .withColumnRenamed("rowguid", "guia_filas")\
    .withColumnRenamed("ModifiedDate", "FechaModificacion")

df_s_territory_rename = df_s_territory_limpio_tipo\
    .withColumnRenamed("TerritoryID", "TerritorioID")\
    .withColumnRenamed("Name", "Nombre")\
    .withColumnRenamed("CountryRegionCode", "CodigoPaisRegion")\
    .withColumnRenamed("Group", "Continente")\
    .withColumnRenamed("SalesYTD", "VentasAcumuladasAno")\
    .withColumnRenamed("SalesLastYear", "VentasAnoAnterior")\
    .withColumnRenamed("CostYTD", "CostoAcumuladoAno")\
    .withColumnRenamed("CostLastYear", "CostoAnoAnterior")\
    .withColumnRenamed("rowguid", "guia_filas")\
    .withColumnRenamed("ModifiedDate", "FechaModificacion")


In [21]:
#Haciendo JOINS con la finalidad de crear tablas dimensionales
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, sequence, explode, to_date, year, month, quarter, dayofweek, date_format, when

#CREANDO DF_VENTAS
df_ventas = df_so_header_rename.join(
    df_so_detail_rename,
    on="PedidoID",
    how = "inner"
).select(
    "PedidoID",
    "ClienteID",
    "TerritorioID",
    "FechaPedido",
    "ProductoID",
    "Cantidad_pedido",
    "Precio_Unitario",
    "Linea_total"   
)
#df_ventas.show()

#CREANDO DF_PRODUCTOS
df_productos = df_product_rename\
    .select(
        "ProductoID",
        "Nombre",
        "Numero_producto",
        "Precio_lista"
    )
#df_productos.show()

#CREANDO DF_CLIENTES
df_clientes = df_customer_rename\
    .select(
        "ClienteID",
        "TerritorioID"
    )
#df_clientes.show()

df_territorio = df_s_territory_rename\
    .select(
        "TerritorioID",
        "Nombre",
        "CodigoPaisRegion",
        "Continente"
    )
#df_territorio.show()

# Creando rango de fechas directamente con Spark
df_rango = spark.sql("""
SELECT explode(sequence(to_date('2015-01-01'), to_date('2025-12-31'), interval 1 day)) as Fecha
""")

# Agregando columnas necesarias
df_fecha = df_rango.withColumn("FechaID", date_format("Fecha", "yyyyMMdd").cast("int"))\
    .withColumn("Año", year("Fecha"))\
    .withColumn("Mes", month("Fecha"))\
    .withColumn("Trimestre", quarter("Fecha"))\
    .withColumn("DiaSemana", dayofweek("Fecha"))\
    .withColumn("NombreMes", date_format("Fecha", "MMMM"))\
    .withColumn("NombreDia", date_format("Fecha", "EEEE"))\
    .withColumn("EsFinDeSemana", when(dayofweek("Fecha").isin(1, 7), 1).otherwise(0))\
    .select("FechaID", "Fecha", "Año", "Mes", "Trimestre", "NombreMes", "NombreDia", "DiaSemana", "EsFinDeSemana")


In [None]:

#ALMACENANDO DATAFRAMES EN AMBIENTE
df_ventas.write.format("delta").mode("overwrite").save("../sql/dw_silver/dim_ventas")
df_productos.write.format("delta").mode("overwrite").save("../sql/dw_silver/dim_productos")
df_clientes.write.format("delta").mode("overwrite").save("../sql/dw_silver/dim_clientes")
df_territorio.write.format("delta").mode("overwrite").save("../sql/dw_silver/dim_territorio")
df_fecha.write.format("delta").mode("overwrite").save("../sql/dw_silver/dim_fecha")