## Validar que el archivo cliente.csv se encuentre en raw_data

In [0]:
# Listar archivos en la carpeta Bronze
display(dbutils.fs.ls("dbfs:/FileStore/Delta_Lake/deltalake/bronze/raw_data/"))


path,name,size,modificationTime
dbfs:/FileStore/Delta_Lake/deltalake/bronze/raw_data/clientes.csv,clientes.csv,3573,1742508749000


##Cargar el Archivo en un DataFrame en **Databricks** (Bronze)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Definir el esquema 
schema = StructType([
    StructField("id_cliente", IntegerType(), True),
    StructField("nombre", StringType(), True),
    StructField("pais", StringType(), True),  
    StructField("edad", IntegerType(), True),
    StructField("genero", StringType(), True)  
])

# Leer el archivo CSV con el esquema 
df_bronze = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("dbfs:/FileStore/Delta_Lake/deltalake/bronze/raw_data/clientes.csv")

# Guardar en la capa Bronze con la opción de sobrescribir esquema
df_bronze.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("dbfs:/FileStore/Delta_Lake/deltalake/bronze/clientes")  


## Verificar que los Datos se Guardaron Correctamente

In [0]:
# Leer los datos desde la capa Bronze
df_bronze = spark.read.format("delta").load("dbfs:/FileStore/Delta_Lake/deltalake/bronze/clientes")

# Mostrar el esquema de los datos
df_bronze.printSchema()

# Mostrar las primeras filas de los datos
df_bronze.show(10)


root
 |-- id_cliente: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- genero: string (nullable = true)

+----------+---------------+--------------------+----+------+
|id_cliente|         nombre|                pais|edad|genero|
+----------+---------------+--------------------+----+------+
|         1|    James Hayes|             Bolivia|  33|     F|
|         2|   Lucas Morgan|Saint Vincent and...|  39|     F|
|         3|Jennifer Garcia|Saint Vincent and...|  51|     M|
|         4|  Edward Watson|          Luxembourg|  18|     F|
|         5|   Haley Morris|        South Africa|  23|     F|
|         6|    Carl Foster|               Italy|  20|     M|
|         7|    Donna Lucas|           Indonesia|  61|     F|
|         8|     Kyle Mccoy|             Vanuatu|  26|     M|
|         9|     Marie Diaz|         Philippines|  55|     F|
|        10|   Angela Jones|             Namibia|  66|    

## Transformación para y guardarlos en la Capa Silver
- Eliminar registros con valores vacíos en cualquier columna.
- Reemplazar 'F' por 'Femenino' y 'M' por 'Masculino' en la columna genero

In [0]:
from pyspark.sql.functions import col, when

# Cargar los datos desde la capa Bronze
df_silver = spark.read.format("delta").load("dbfs:/FileStore/Delta_Lake/deltalake/bronze/clientes")

# Eliminar registros con valores vacíos en cualquier columna
df_silver = df_silver.dropna()

# Reemplazar 'F' por 'Femenino' y 'M' por 'Masculino'
df_silver = df_silver.withColumn("genero",
                  when(col("genero") == "F", "Femenino")
                  .when(col("genero") == "M", "Masculino")
                  .otherwise(col("genero")))

# Guardar los datos transformados en la capa Silver
df_silver.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("dbfs:/FileStore/Delta_Lake/deltalake/silver/clientes")


### Verificar los Datos en Silver

In [0]:
# Leer los datos desde la capa Silver
df_silver = spark.read.format("delta").load("dbfs:/FileStore/Delta_Lake/deltalake/silver/clientes")

# Mostrar el esquema y los primeros registros
df_silver.printSchema()
df_silver.show(5)


root
 |-- id_cliente: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- genero: string (nullable = true)

+----------+---------------+--------------------+----+---------+
|id_cliente|         nombre|                pais|edad|   genero|
+----------+---------------+--------------------+----+---------+
|         1|    James Hayes|             Bolivia|  33| Femenino|
|         2|   Lucas Morgan|Saint Vincent and...|  39| Femenino|
|         3|Jennifer Garcia|Saint Vincent and...|  51|Masculino|
|         4|  Edward Watson|          Luxembourg|  18| Femenino|
|         5|   Haley Morris|        South Africa|  23| Femenino|
+----------+---------------+--------------------+----+---------+
only showing top 5 rows



## Definir las Transformaciones en la Capa Gold

In [0]:
from pyspark.sql.functions import avg, count

# Cargar los datos desde la capa Silver
df_gold = spark.read.format("delta").load("dbfs:/FileStore/Delta_Lake/deltalake/silver/clientes")

# 1️.Calcular el promedio de edad por país
df_promedio_edad = df_gold.groupBy("pais").agg(avg("edad").alias("edad_promedio"))

# 2️. Calcular la cantidad de clientes por género
df_conteo_genero = df_gold.groupBy("genero").agg(count("id_cliente").alias("total_clientes"))

# Guardar los datos transformados en la capa Gold
df_promedio_edad.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("dbfs:/FileStore/Delta_Lake/deltalake/gold/promedio_edad")

df_conteo_genero.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("dbfs:/FileStore/Delta_Lake/deltalake/gold/conteo_genero")


### Verificación de datos en la Capa Gold

### Ver Promedio de Edad por País

In [0]:
df_promedio_edad = spark.read.format("delta").load("dbfs:/FileStore/Delta_Lake/deltalake/gold/promedio_edad")
df_promedio_edad.show()


+--------------------+------------------+
|                pais|     edad_promedio|
+--------------------+------------------+
|            Kiribati|              65.0|
|French Southern T...|              59.0|
|         Philippines|              55.0|
|              Jersey|              57.0|
|           Singapore|              43.0|
|                Fiji|24.666666666666668|
|Northern Mariana ...|              29.0|
|            Maldives|              28.0|
|               Sudan|              49.0|
|               Palau|              51.0|
|              France|              57.0|
|              Greece|              21.0|
|           Sri Lanka|              24.0|
|             Algeria|              44.0|
|             Reunion|              53.0|
|           Argentina|              29.5|
|             Belgium|              29.0|
|          San Marino|              40.0|
|             Ecuador|              29.0|
|               Qatar|              40.0|
+--------------------+------------

### Ver Conteo de Clientes por Género

In [0]:
df_conteo_genero = spark.read.format("delta").load("dbfs:/FileStore/Delta_Lake/deltalake/gold/conteo_genero")
df_conteo_genero.show()


+---------+--------------+
|   genero|total_clientes|
+---------+--------------+
| Femenino|            46|
|Masculino|            54|
+---------+--------------+

