# PROYECTO ETL EN DATABRICKS - COSMETSAC
Autor: Brayan R. Neciosup Bolaños

Importante:
Como no se tiene desplegada una BD relacional en la nube, usaremos Unity Catalog 
y todas sus características para simular una BD relacional en Databricks. 

El Modelo Entidad Relación esta elaborado en MSSM, puedes visualizarlo en la imagen denominada: ModeloER-SQL.png
O también puedes revisar el script: ScriptBDCosmetSAC.sql

### LIBRERÍAS UTILIZADAS

In [0]:
# Librerias a utilizar
from pyspark.sql import SparkSession # Puerta de acceso a todas las funcionalidades de apache spark
from pyspark.sql.functions import * # Funciones SQL
from pyspark.sql.types import * # Funciones de tipos de datos


### CONFIGURACIONES Y FUNCIONES UTILIZADAS PARA LA CARGA DE INFORMACIÓN EN UNITY CATALOG

In [0]:
#### Configuración de Unity Catalog para simular BD relacional
# A). Creación del Catálago (Nivel más alto en la jerarquía de Unity Catalog) 
# [Es como crear la BD en cualquier gestor de BD]
spark.sql("CREATE CATALOG IF NOT EXISTS cosmetsac")
print("Catálago creado exitosamente")

# B). Creación del Esquema (Segundo nivel en la jerarquía de Unity Catalog) 
# [Son como los esquemas dentro de cualquier gestor de BD] 
# [En caso no creamos conveniente crearlo, podemos usar el esquema "default"]
spark.sql("CREATE SCHEMA IF NOT EXISTS cosmetsac.ventas") # Permite administrar mejor cada entidad
print("Esquema ventas creado exitosamente")

# Importante: No crearemos los volumenes, porque las tablas donde se almacenarán los datos
#             serán entidades gobernada por Unity Catalog y permitirá usar el lenguaje SQL.

In [0]:
# Función encargada de cargar la información a la tabla de clientes 
def cargar_informacion_clientes_cosmetsac(dataframe_clientes):
    # dataframe_clientes.show()
    # Almacenaremos el dataframe en una delta table que será gobernado por Unity Catalog
    dataframe_clientes.write.format("delta").mode("overwrite").saveAsTable("cosmetsac.ventas.clientes")
    print("Datos de clientes registrados")
    # write: Modo de escritura
    # format("delta"): Es la forma nativa de Databricks para almacenar información (Optimizada y recomendada)
    # mode("overwrite"): Sobreescribe toda la información existente sin perder nada de datos.
    # saveAsTable("cosmetsac.ventas.clientes"): Permite guardarse como delta table optimizada

In [0]:
# Función encargada de cargar la información de tablas externas (Marcas,Categorias,FormasPagos)
# La similitud de estas 3 tablas, se debe a su estructura en sus campos.
def cargar_datos_tablas_externas(nombre_tabla,dataframe_tabla_externa):
    # dataframe_tabla_externa.show()
    # Almacenaremos el dataframe en una delta table que será gobernada por Unity Catalog
    jerarquia_unity_catalog_tabla = f"cosmetsac.ventas.{nombre_tabla}" # Formateamos la jerarquia en base al nombre de la tabla
    dataframe_tabla_externa.write.format("delta").mode("overwrite").saveAsTable(jerarquia_unity_catalog_tabla)
    print(f"Datos de {nombre_tabla} registrados")
    # write: Modo de escritura
    # format("delta"): Es la forma nativa de Databricks para almacenar información (Optimizada y recomendada)
    # mode("overwrite"): Sobreescribe toda la información existente sin perder nada de datos.
    # saveAsTable(jerarquia_unity_catalog_tabla): Permite guardarse como delta table optimizada

In [0]:
# Función encargada de cargar la información a la tabla productos
def cargar_datos_productos(dataframe_productos):
    # dataframe_productos.show()
    # Almacenaremos el dataframe en una delta table que será gorbernada por Unity Catalog
    dataframe_productos.write.format("delta").mode("overwrite").saveAsTable("cosmetsac.ventas.productos")
    print("Datos de productos registrados")
    # write: Modo de escritura
    # format("delta"): Es la forma nativa de Databricks para almacenar información (Optimizada y recomendada)
    # mode("overwrite"): Sobreescribe toda la información existente sin perder nada de datos.
    # saveAsTable("cosmetsac.ventas.productos"): Permite guardarse como delta table optimizada

### PROCESO ETL (EXTRAER - TRANSFORMAR Y CARGAR)

In [0]:
# Archivo CLIENTES-EMPRESA-COSMETSAC.xlsx cargada previamente a Unity Catalog en formato delta table(clientes_empresa_cosmetsac)

#### EXTRAER
clientes_cosmetsac = spark.sql("SELECT * FROM workspace.exercises.clientes_empresa_cosmetsac")
# clientes_cosmetsac.show() # Leemos las 5 primeras filas de la tabla

#### Transformar
#-- Limpiar la columna CLIENTE de guiones que existen
clientes_cosmetsac = clientes_cosmetsac.withColumns({
    "CLIENTE": regexp_replace(col("CLIENTE"),r'-',' '),
    "TELEFONO": cast(StringType(),col("TELEFONO"))
})
clientes_cosmetsac = clientes_cosmetsac.withColumns({
    "CLIENTE":upper(col("CLIENTE")),
    "TELEFONO":cast(IntegerType(),regexp_replace(col("TELEFONO"),r'^51',''))
})
clientes_cosmetsac = clientes_cosmetsac.withColumns({
    "Primer Nombre Cliente":split(col("CLIENTE")," ").getItem(0), # .split() Permite separar la información
    "Segundo Nombre Cliente":split(col("CLIENTE")," ").getItem(1),# basandose en un delimitador, para poder convertirlo a un array
    "Primer Apellido Cliente":split(col("CLIENTE")," ").getItem(2), # mismo array, al cu+al accedemos a cada elemtno respectivamente
    "Segundo Apellido Cliente":split(col("CLIENTE")," ").getItem(3) # con .getItem(IndiceElemento)
})

clientes_cosmetsac = clientes_cosmetsac.withColumns({
    "Apellidos":concat_ws(', ',col("Primer Apellido Cliente"),col("Segundo Apellido Cliente")),
    "Nombres":concat_ws(', ',col("Primer Nombre Cliente"),col("Segundo Nombre Cliente"))
})

clientes_cosmetsac = clientes_cosmetsac.select(
    col("Apellidos"),col("Nombres"),col("DNI"),col("CORREO ELECTRONICO"),col("TELEFONO")
)

# clientes_cosmetsac.show()
# Preparamos dataframe para una correcta coherencia con la estructura definida de la tabla:
clientes_bd = clientes_cosmetsac.withColumn(
    "ClientesID",                  # Agrego una columna para el ID del cliente
    monotonically_increasing_id()+1
)
clientes_bd = clientes_bd.select(
    col("ClientesID"),col("Apellidos").alias("ClientesApellidos"),
    col("Nombres").alias("ClientesNombres"),col("DNI").alias("ClientesDNI"),
    col("CORREO ELECTRONICO").alias("ClientesCorreoElectronico"),col("TELEFONO").alias("ClientesTelefono")
)
# clientes_bd.show()

#### CARGAR
# FUNCIÓN PARA CONVERTIR ESTE DATAFRAME A UN DELTA TABLE EN UNITY CATALOG
cargar_informacion_clientes_cosmetsac(clientes_bd)

In [0]:
# Archivo PRODUCTOS-EMPRESA-COSMETSAC.xlsx cargada previamente a Unity Catalog en formato delta table(productos_empresa_cosmetsac)

#### EXTRAER

productos_cosmetsac = spark.sql("SELECT * FROM workspace.exercises.productos_empresa_cosmetsac")
# productos_cosmetsac.show() 

#### TRANSFORMAR

productos_cosmetsac = productos_cosmetsac.select(
    col("Producto"),col("Precio regular (S/)"),col("Stock Actualizado"),
    col("Marca"),col("Categoría")
)
productos_cosmetsac = productos_cosmetsac.withColumnsRenamed({
    "Precio regular (S/)":"Precio Compra",
    "Stock Actualizado":"Stock"
})

# EXTRAEMOS LAS MARCAS
marcas_unicas = productos_cosmetsac.select(col("Marca")).dropDuplicates() # Eliminamos duplicados
diccionario_marcas = {
    "Marca ID":[i for i in range(1,marcas_unicas.count()+1)],
    "Marca":[i[0] for i in marcas_unicas.select(col("Marca")).collect()]
}
df_marcas = spark.createDataFrame(list(zip(*diccionario_marcas.values())),["Marca ID","Marca"])
# Preparamos dataframe para una correcta coherencia con la estructura definida de la tabla:
marcas_bd = df_marcas.select(col("Marca ID").alias("MarcasID"),col("Marca").alias("MarcasDescripcion"))
# df_marcas.show()

# EXTRAEMOS LAS CATEGORIAS

categorias_unicas = productos_cosmetsac.select(col("Categoría")).dropDuplicates()
diccionario_categorias = {
    "Categoria ID":[i for i in range(1,categorias_unicas.count()+1)],
    "Categoría": [i[0] for i in categorias_unicas.select(col("Categoría")).collect()]
}
df_categorias = spark.createDataFrame(list(zip(*diccionario_categorias.values())),["Categoria ID","Categoría"])
# Preparamos dataframe para una correcta coherencia con la estructura definida de la tabla:
categorias_bd = df_categorias.select(col("Categoria ID").alias("CategoriasID"),col("Categoría").alias("CategoriasDescripcion"))
# df_categorias.show()

productos_cosmetsac = productos_cosmetsac.join(df_marcas,"Marca")
productos_cosmetsac = productos_cosmetsac.join(df_categorias,"Categoría")

# Seleccionamos columnas necesarias.
productos_cosmetsac = productos_cosmetsac.select(
    col("Producto"),col("Precio Compra"),col("Stock"),
    col("Marca ID"),col("Categoria ID")
)
# Preparamos dataframe para una correcta coherencia con la estructura definida de la tabla:
productos_bd = productos_cosmetsac.withColumn(
    "ProductosID",
    monotonically_increasing_id()+1
)
productos_bd = productos_bd.select(
    col("ProductosID"),col("Producto").alias("ProductosDescripcion"),
    col("Precio Compra").alias("ProductosPrecioCompra"),col("Stock").alias("ProductosStock"),
    col("Marca ID").alias("ProductosMarcasID"),col("Categoria ID").alias("ProductosCategoriasID")
)
# productos_bd.show()

#### CARGAR

# Funciones para cargar información ....
cargar_datos_tablas_externas(nombre_tabla="marcas",dataframe_tabla_externa=marcas_bd) # Tabla marcas
cargar_datos_tablas_externas(nombre_tabla="categorias",dataframe_tabla_externa=categorias_bd) # Tabla categorias
cargar_datos_productos(productos_bd)


In [0]:
# Archivo VENTAS-EMPRESA-COSMETSAC.xlsx cargada previamente a Unity Catalog en formato delta table(ventas_empresa_cosmetsac)

#### EXTRAER

ventas_cosmetsac = spark.sql("SELECT * FROM workspace.exercises.ventas_empresa_cosmetsac")
# ventas_cosmetsac.show()

#### TRANSFORMAR

# Extraemos las formas de pago
formas_pago_unicas = ventas_cosmetsac.select(col("PagosDescripcion")).dropDuplicates()
diccionario_formas_pago = {
    "FormaPagoID":[i for i in range(1,formas_pago_unicas.count()+1)],
    "PagosDescripcion":[i[0] for i in formas_pago_unicas.select(col("PagosDescripcion")).collect()]
}
df_formas_pago = spark.createDataFrame(data=list(zip(*diccionario_formas_pago.values())),schema=["FormaPagoID","PagosDescripcion"])
# df_formas_pago.show()

# Extraemos las promociones
promociones_unicas = ventas_cosmetsac.select(
    col("Promocion"),col("FechaInicio"),col("FechaFin")
).dropDuplicates()

# Agregamos columna Descuento de la descripción
promociones_unicas = promociones_unicas.withColumns({
    "Promocion ID":
    monotonically_increasing_id()+1,    
    "Descuento":
    (regexp_extract(col("Promocion"),r'\d+',0)/100),
    "Estado":
    lit(0)
})
# promociones_unicas.show()

# Construimos el dataframe ventas final para la fase de Carga
# 1️⃣ Unimos con el dataframe de Formas Pagos
df_ventas_cosmetsac = ventas_cosmetsac.join(df_formas_pago,on="PagosDescripcion",how="inner")
# 2️⃣ Unimos con el dataframe de Promociones
df_ventas_cosmetsac = df_ventas_cosmetsac.join(promociones_unicas,how="inner",on="Promocion")
# 3️⃣ Estandarizamos el ID Cliente e ID Producto
df_ventas_cosmetsac = df_ventas_cosmetsac.withColumns({
    "ID Cliente":
    regexp_replace(col("ID Cliente"),r'^C','').cast(dataType=IntegerType()),
    "ID Producto":
    regexp_replace(col("ID Producto"),r'^P','').cast(dataType=IntegerType())
})
# df_ventas_cosmetsac.show() 

# Seleccionamos columnas necesarias para poblar la tabla Pedidos en la fase de Carga
df_pedidos = df_ventas_cosmetsac.select(col("FechaCompra"),col("ID Cliente"),col("FormaPagoID")).sort(col("FechaCompra").asc())
# df_pedidos.show()

#### CARGAR
# Funciones para cargar las tablas formas de pago, promociones y pedidos .....


In [0]:
# Archivo VENTAS-EMPRESA-COSMETSAC.xlsx cargada previamente a Unity Catalog en formato delta table(ventas_empresa_cosmetsac)
# ETL ➡️ Tabla DetallePedidos

#### EXTRAER Y TRANSFORMAR
# Extraer los IDs de los clientes que tienen un pedido registrado

clientes_ids = df_pedidos.select(col("ID Cliente")).dropDuplicates()
# clientes_ids.show()

#### CARGAR
# Función que permite extraer todos los IDs de Pedidos de los clientes
# ....

# Función que prepara el dataframe de la función anterior, para el poblado de la tabla DetallePedidos
# ....

# Función que permite realizar la carga de información a la tabla DetallePedidos
# .... 



