# Proyecto Final: DataLake en Databricks
### Alumna: Yhomira Alexandra Yupayccana Lopa

Este proyecto tiene como objetivo implementar un DataLake utilizando la arquitectura **Medallón**. 
La arquitectura medallón organiza los datos en tres capas:
- **Bronze**: Guardar los archivos en el mismo formato de Origen.
- **Silver**: Crear tablas deltas a partir de los archivos guardados en la capa Bronze.
- **Golden**:Implementar los reportes finales en tablas Delta..

### Tecnologías Utilizadas
- **Databricks**: Para el procesamiento y almacenamiento de datos.
- **PySpark**: Para transformar y analizar los datos.

### Estructura del Notebook
1. Configuración inicial del entorno.
2. Transformaciones Capa Bronce.
3. Transformaciones Capa Silver.
4. Transformaciones Capa Golden.


A continuación, se detallan los pasos realizados.


## 1. Configuración inicial del entorno.

In [0]:
dbutils.fs.ls("dbfs:/")

Out[2]: [FileInfo(path='dbfs:/DATALAKE-DSRP/', name='DATALAKE-DSRP/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/DATALAKE-PROYECTOFINAL/', name='DATALAKE-PROYECTOFINAL/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/', name='FileStore/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/LAKE-DSRP/', name='LAKE-DSRP/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/', name='databricks-datasets/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-results/', name='databricks-results/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/', name='user/', size=0, modificationTime=0)]

In [0]:
dbutils.fs.mkdirs("dbfs:/DATALAKE-PROYECTOFINAL")

Out[3]: True

In [0]:
# Lista de nombres de carpetas SEGUN LA ARQUITECTURA MEDALLON
carpetas = ["dbfs:/DATALAKE-PROYECTOFINAL/bronze", "dbfs:/DATALAKE-PROYECTOFINAL/silver", "dbfs:/DATALAKE-PROYECTOFINAL/golden"]
# Crear las carpetas usando un bucle
for carpeta in carpetas:
    dbutils.fs.mkdirs(carpeta)

In [0]:
dbutils.fs.mkdirs("dbfs:/FileStore/temp")

Out[5]: True

## 2. Transformaciones Capa Bronce

En esta sección, se cargan los datos iniciales y se almacenan en la capa **Bronze** del DataLake.
Los datos se encuentran en formato CSV .


In [0]:
#Copiar los archivos de /temp a bronze:
dbutils.fs.cp("dbfs:/FileStore/temp/clientes.csv", "dbfs:/DATALAKE-PROYECTOFINAL/bronze")
dbutils.fs.cp("dbfs:/FileStore/temp/productos.csv", "dbfs:/DATALAKE-PROYECTOFINAL/bronze")
dbutils.fs.cp("dbfs:/FileStore/temp/ventas.csv", "dbfs:/DATALAKE-PROYECTOFINAL/bronze")

Out[6]: True

In [0]:
#Lectura de los csv en dataframes
clientes_df = spark.read.csv("dbfs:/DATALAKE-PROYECTOFINAL/bronze/clientes.csv", header=True)
productos_df = spark.read.csv("dbfs:/DATALAKE-PROYECTOFINAL/bronze/productos.csv", header=True)
ventas_df = spark.read.csv("dbfs:/DATALAKE-PROYECTOFINAL/bronze/ventas.csv", header=True)

In [0]:
clientes_df.show()

+----------+-----------------+--------------------+----------------+---------+
|id_cliente|           nombre|              correo|          ciudad| telefono|
+----------+-----------------+--------------------+----------------+---------+
|         1|     Carlos Perez|carlos.perez@gmai...|            Lima|987654321|
|         2|         Ana Ruiz|ana.ruiz@hotmail.com|        Huancayo|987654322|
|         3|       Juan Gomez|juan.gomez@yahoo.com|        Trujillo|987654323|
|         4|    Lucia Sanchez|lucia.sanchez@gma...|        Chiclayo|987654324|
|         5|     Pedro Torres|pedro.torres@hotm...|        Arequipa|987654325|
|         6|    Sofia Jimenez|sofia.jimenez@gma...|           Piura|987654326|
|         7|      Martin Diaz|martin.diaz@yahoo...|             Ica|987654327|
|         8|      Carla Rojas|carla.rojas@gmail...|           Cusco|987654328|
|         9|   Fernando Silva|fernando.silva@ho...|            Lima|987654329|
|        10|     Laura Chavez|laura.chavez@gmai...| 

In [0]:
productos_df.show()

+-----------+--------------------+-----------+-------+--------------+
|id_producto|     nombre_producto|  categoria| precio|cantidad_stock|
+-----------+--------------------+-----------+-------+--------------+
|          1|          Laptop Pro|Electronica|1200.50|            50|
|          2|        Smartphone X|Electronica| 800.00|           150|
|          3|        Televisor HD|Electronica| 550.00|            30|
|          4|   Zapatillas Runner|   Deportes|  75.00|           200|
|          5|   Bicicleta Montana|   Deportes| 450.00|            20|
|          6|         Mouse Gamer|Electronica|  35.00|           300|
|          7|    Teclado Mecanico|Electronica|  85.00|           120|
|          8|       Camara Reflex|Electronica| 900.00|            25|
|          9|Auriculares Bluet...|Electronica| 120.00|           250|
|         10|     Reloj Deportivo|   Deportes|  50.00|           180|
|         11|    Silla de Oficina|    Oficina| 150.00|            40|
|         12| Escrit

In [0]:
ventas_df.show()

+--------+----------+-----------+-----------+----------------+-----------+
|id_venta|id_cliente|id_producto|fecha_venta|cantidad_vendida|total_venta|
+--------+----------+-----------+-----------+----------------+-----------+
|       1|         1|          1| 2023-01-01|               2|    2401.00|
|       2|         1|          2| 2023-01-02|               1|     800.00|
|       3|         2|          3| 2023-01-03|               5|    2750.00|
|       4|         3|          4| 2023-01-04|               3|     225.00|
|       5|         3|          7| 2023-01-05|               1|      85.00|
|       6|         4|          9| 2023-01-06|               2|     240.00|
|       7|         5|          8| 2023-01-07|               4|     360.00|
|       8|         5|          6| 2023-01-08|               1|      35.00|
|       9|         6|         10| 2023-01-09|               3|     255.00|
|      10|         7|         11| 2023-01-10|               5|     750.00|
|      11|         8|    

## 3. Transformaciones Capa Silver
En esta sección, se crea el esquema para guardar las tablas delta tomando los datos de la capa bronce y observamos las respectivas tablas para modificar la estructura si es necesario.

In [0]:
%sql
create schema if not exists proyectofinal;

In [0]:
clientes_df.printSchema()

root
 |-- id_cliente: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- correo: string (nullable = true)
 |-- ciudad: string (nullable = true)
 |-- telefono: string (nullable = true)



In [0]:
clientes_df = clientes_df.withColumnRenamed("teléfono", "telefono")

In [0]:
dbutils.fs.rm("dbfs:/DATALAKE-PROYECTOFINAL/silver/data/CLIENTES_DELTA", recurse=True)

Out[15]: True

In [0]:
%sql
-- Creamos las tablas input
--Clientes
DROP TABLE IF EXISTS proyectofinal.clientes_delta;
CREATE OR REPLACE TABLE proyectofinal.clientes_delta
(
    id_cliente STRING,
    nombre STRING,
    correo STRING,
    ciudad STRING,
    telefono STRING
)
USING DELTA
LOCATION 'dbfs:/DATALAKE-PROYECTOFINAL/silver/data/CLIENTES_DELTA';

In [0]:
productos_df.printSchema()

root
 |-- id_producto: string (nullable = true)
 |-- nombre_producto: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- precio: string (nullable = true)
 |-- cantidad_stock: string (nullable = true)



In [0]:
productos_df = productos_df.withColumnRenamed("categoría", "categoria")

In [0]:
dbutils.fs.rm("dbfs:/DATALAKE-PROYECTOFINAL/silver/data/PRODUCTOS_DELTA", recurse=True)

Out[19]: True

In [0]:
%sql
-- Creamos las tablas input
--Productos
DROP TABLE IF EXISTS proyectofinal.productos_delta;
CREATE OR REPLACE TABLE proyectofinal.productos_delta
(
    id_producto STRING,
    nombre_producto STRING,
    categoria STRING,
    precio STRING,
    cantidad_stock STRING
)
USING DELTA
LOCATION 'dbfs:/DATALAKE-PROYECTOFINAL/silver/data/PRODUCTOS_DELTA';

In [0]:
ventas_df.printSchema()

root
 |-- id_venta: string (nullable = true)
 |-- id_cliente: string (nullable = true)
 |-- id_producto: string (nullable = true)
 |-- fecha_venta: string (nullable = true)
 |-- cantidad_vendida: string (nullable = true)
 |-- total_venta: string (nullable = true)



In [0]:
dbutils.fs.rm("dbfs:/DATALAKE-PROYECTOFINAL/silver/data/VENTAS_DELTA", recurse=True)

Out[22]: True

In [0]:
%sql
-- Creamos las tablas input
--Ventas
DROP TABLE IF EXISTS proyectofinal.ventas_delta;
CREATE OR REPLACE TABLE proyectofinal.ventas_delta
(
    id_venta STRING,
    id_cliente STRING,
    id_producto STRING,
    fecha_venta STRING,
    cantidad_vendida STRING,
    total_venta STRING
)
USING DELTA
LOCATION 'dbfs:/DATALAKE-PROYECTOFINAL/silver/data/VENTAS_DELTA';

In [0]:
%sql
--verificamos si se creo correctamente
SHOW TABLES IN proyectofinal;


database,tableName,isTemporary
proyectofinal,clientes_delta,False
proyectofinal,productos_delta,False
proyectofinal,ventas_delta,False


In [0]:
#Escritura en las tablas delta
clientes_df.write.mode("overwrite").saveAsTable("proyectofinal.clientes_delta")
productos_df.write.mode("overwrite").saveAsTable("proyectofinal.productos_delta")
ventas_df.write.mode("overwrite").saveAsTable("proyectofinal.ventas_delta")

In [0]:
df_clientes = spark.table("proyectofinal.clientes_delta")
df_productos = spark.table("proyectofinal.productos_delta")
df_ventas = spark.table("proyectofinal.ventas_delta")

df_clientes.show(5)
df_productos.show(5)
df_ventas.show(5)


+----------+-------------+--------------------+--------+---------+
|id_cliente|       nombre|              correo|  ciudad| telefono|
+----------+-------------+--------------------+--------+---------+
|         1| Carlos Perez|carlos.perez@gmai...|    Lima|987654321|
|         2|     Ana Ruiz|ana.ruiz@hotmail.com|Huancayo|987654322|
|         3|   Juan Gomez|juan.gomez@yahoo.com|Trujillo|987654323|
|         4|Lucia Sanchez|lucia.sanchez@gma...|Chiclayo|987654324|
|         5| Pedro Torres|pedro.torres@hotm...|Arequipa|987654325|
+----------+-------------+--------------------+--------+---------+
only showing top 5 rows

+-----------+-----------------+-----------+-------+--------------+
|id_producto|  nombre_producto|  categoria| precio|cantidad_stock|
+-----------+-----------------+-----------+-------+--------------+
|          1|       Laptop Pro|Electronica|1200.50|            50|
|          2|     Smartphone X|Electronica| 800.00|           150|
|          3|     Televisor HD|Electr

## 3. Transformaciones Capa Golden

Creamos Tablas para llenar con la logica final pedida

1. Ventas Totales por Producto
2. Productos y sus Precios con Ventas Realizadas
3. Clientes con la Mayor Cantidad de Compras
4. Productos con Baja Venta

In [0]:
%sql
--Tabla 1 :Ventas Totales por Producto
CREATE or REPLACE TABLE proyectofinal.ventas_totales_por_producto
(
  nombre_producto STRING,
  categoria STRING,
  total_cantidad_vendida BIGINT
)
USING DELTA
LOCATION 'dbfs:/DATALAKE-PROYECTOFINAL/golden/data/VENTAS_TOTALES_POR_PRODUCTO'


In [0]:
%sql
----Tabla 2: Productos y sus Precios con Ventas Realizadas
CREATE or REPLACE TABLE proyectofinal.productos_precios_ventas
(
  nombre_producto STRING,
  precio_producto decimal(20,2),
  cantidad_vendida BIGINT,
  ingresos_totales decimal(20,2)
)
USING DELTA
LOCATION 'dbfs:/DATALAKE-PROYECTOFINAL/golden/data/VENTAS_PRECIOS_VENTAS'


In [0]:
%sql
-- Tabla 3:  Clientes con la Mayor Cantidad de Compras
CREATE or REPLACE TABLE proyectofinal.clientes_mayor_compras
(
  nombre STRING,
  cantidad_compras BIGINT
)
USING DELTA
LOCATION 'dbfs:/DATALAKE-PROYECTOFINAL/golden/data/CLIENTES_MAYOR_COMPRAS'


In [0]:
%sql
-- Tabla 4:  Productos con Baja Venta
CREATE or REPLACE TABLE proyectofinal.productos_baja_venta
(
  nombre_producto STRING,
  categoria STRING,
  cantidad_vendida BIGINT
)
USING DELTA
LOCATION 'dbfs:/DATALAKE-PROYECTOFINAL/golden/data/PRODUCTOS_BAJA_VENTA'


## 1. Ventas Totales por Producto

### Enunciado:
Se requiere generar un reporte que detalle el total de ventas realizadas para cada producto. El reporte debe incluir los siguientes campos:

- **Nombre del Producto**  
- **Categoría**  
- **Total de Cantidad Vendida**  



In [0]:
from pyspark.sql.functions import  col ,sum ,count

In [0]:
ventas_totales_por_producto = df_ventas.join(df_productos,df_ventas["id_producto"]==df_productos["id_producto"])\
                                .groupBy('nombre_producto','categoria')\
                                .agg(sum(col("cantidad_vendida").cast("int")).alias("total_cantidad_vendida"))

In [0]:
display(ventas_totales_por_producto)

nombre_producto,categoria,total_cantidad_vendida
Zapatillas Runner,Deportes,5
Camara Reflex,Electronica,8
Monitor LED,Electronica,1
Tablet 10,Electronica,3
Zapatos Elegantes,Ropa,1
Smartphone X,Electronica,2
Teclado Mecanico,Electronica,2
Impresora Multifuncion,Oficina,5
Laptop Pro,Electronica,5
Sombrero de Paja,Accesorios,3


## 2. Productos y sus Precios con Ventas Realizadas

### Enunciado:
Se requiere un reporte que detalle los productos junto con su precio, la cantidad vendida y los ingresos totales generados por cada producto. El reporte debe incluir los siguientes campos:

- **Nombre del Producto**  
- **Precio del Producto**  
- **Cantidad Vendida**  
- **Total de Ingresos Generados** (cantidad vendida * precio)

In [0]:
productos_precios_ventas = df_productos.join(df_ventas, df_productos["id_producto"] == df_ventas["id_producto"])\
                            .groupBy("Nombre_producto", "precio")\
                            .agg(sum(col("cantidad_vendida").cast("int")).alias("cantidad_vendida"))\
                            .withColumn("total_ingresos", col("precio").cast("decimal(10,2)") * col("cantidad_vendida"))

In [0]:
display(productos_precios_ventas)

Nombre_producto,precio,cantidad_vendida,total_ingresos
Cartera de Cuero,120.0,4,480.0
Lentes de Sol,20.0,10,200.0
Camara de Seguridad,150.0,4,600.0
Zapatillas Runner,75.0,5,375.0
Mousepad XL,20.0,2,40.0
Chaqueta de Invierno,100.0,6,600.0
Silla de Oficina,150.0,7,1050.0
Impresora Multifuncion,300.0,5,1500.0
Cargador Rapido,25.0,6,150.0
Reloj Deportivo,50.0,8,400.0


## 3. Clientes con la Mayor Cantidad de Compras

### Enunciado:
Generar un reporte que identifique a los clientes que han realizado el mayor número de compras. El reporte debe incluir los siguientes campos:

- **Nombre del Cliente**  
- **Cantidad de Compras Realizadas** 

In [0]:
clientes_mayor_compras = df_clientes.join(df_ventas, df_clientes['id_cliente'] == df_ventas['id_cliente'])\
                            .groupBy('nombre')\
                            .agg(count('nombre').alias('cantidad_compras'))

In [0]:
display(clientes_mayor_compras)

nombre,cantidad_compras
Diego Vargas,1
Santiago Reyes,1
Juan Gomez,3
Fernando Silva,2
Felipe Campos,2
Roberto Fernandez,2
Emilio Cortes,1
Paola Gutierrez,1
Jorge Aguilar,1
Oscar Ramos,2


##4. Productos con Baja Venta

### Enunciado:
Se requiere un reporte que identifique los productos con baja venta, es decir, aquellos con una cantidad total de ventas menor a 15. El reporte debe incluir los siguientes campos:

- **Nombre del Producto**  
- **Categoría**  
- **Cantidad Vendida**  

In [0]:
productos_baja_venta = ventas_totales_por_producto.withColumnRenamed("total_cantidad_vendida", "cantidad_vendida")\
    .filter(col("cantidad_vendida") < 15) 


In [0]:
display(productos_baja_venta)

nombre_producto,categoria,cantidad_vendida
Zapatillas Runner,Deportes,5
Camara Reflex,Electronica,8
Monitor LED,Electronica,1
Tablet 10,Electronica,3
Zapatos Elegantes,Ropa,1
Smartphone X,Electronica,2
Teclado Mecanico,Electronica,2
Impresora Multifuncion,Oficina,5
Laptop Pro,Electronica,5
Sombrero de Paja,Accesorios,3


## Escritura de Tablas Resultantes


In [0]:
ventas_totales_por_producto.write.mode("overwrite").saveAsTable("proyectofinal.ventas_totales_por_producto")
productos_precios_ventas.write.mode("overwrite").saveAsTable("proyectofinal.productos_precios_ventaso")
clientes_mayor_compras.write.mode("overwrite").saveAsTable("proyectofinal.clientes_mayor_compras")
productos_baja_venta.write.mode("overwrite").saveAsTable("proyectofinal.productos_baja_venta")

Espero que el proyecto sea de su agrado y cualquier recomendacion sera agradecido. Gracias

**Desarrollado por: Yhomira Alexandra Yupayccana Lopa**  
