# 🌍 Evidencia de Aprendizaje 2 (EA2) — Procesamiento de datos en una infraestructura cloud
**Autores:**  
- Indira Hamdam (Medellín, Antioquia, Colombia)
- Cristian Vicioso (Santa Marta, Magdalena, Colombia)

**Materia:** Big Data  
**Institución:** I. U. Digital de Antioquia 
**Docente:** Andres Felipe Callejas Jaramillo  
**Plataforma:** Databricks FRee Edition  
**Dataset:** [Air Quality Data Set — Kaggle](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set)

## 🎯 Objetivo
Desplegar y procesar un conjunto de datos reales en una infraestructura cloud (Databricks Free Edition), diseñando el esquema de almacenamiento, configurando el entorno, cargando datos desde Kaggle y validando consultas con Spark y SQL, bajo un enfoque Big Data aplicado al análisis de la calidad del aire en Colombia.

## 📦 Dataset
**Nombre:** Air Quality Data Set  
**Autor:** Federico Soriano  
**Fuente:** [https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set)

## 🧩 Diseño del esquema de almacenamiento — Dataset *Air Quality Data Set*

El dataset seleccionado proviene de [Kaggle: Air Quality Data Set](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set), que contiene mediciones de calidad del aire registradas por sensores en diversas ciudades.  
Este conjunto de datos es ideal para proyectos de **Big Data**, ya que permite analizar correlaciones entre contaminantes atmosféricos (PM2.5, NO₂, CO, etc.) y condiciones ambientales (temperatura, humedad).

### 🎯 Objetivo del esquema
Diseñar una estructura analítica que permita:
- Almacenar mediciones de calidad del aire de forma estructurada.
- Permitir consultas y análisis en Databricks utilizando Spark SQL.
- Facilitar migraciones a arquitecturas más escalables de almacenamiento.

---

### 🧱 Entidad principal: `AirQuality`
| Campo | Tipo de dato | Nulabilidad | Descripción |
|:------|:--------------|:-------------|:-------------|
| `Date` | Date | No nulo | Fecha de la medición. |
| `Time` | String | No nulo | Hora en formato HH:MM:SS. |
| `CO_GT` | Double | Nulo | Concentración de monóxido de carbono (mg/m³). |
| `NMHC_GT` | Double | Nulo | Hidrocarburos no metánicos (µg/m³). |
| `C6H6_GT` | Double | Nulo | Nivel de benceno (µg/m³). |
| `NOx_GT` | Double | Nulo | Concentración de óxidos de nitrógeno (ppb). |
| `NO2_GT` | Double | Nulo | Concentración de dióxido de nitrógeno (µg/m³). |
| `T` | Double | Nulo | Temperatura ambiente (°C). |
| `RH` | Double | Nulo | Humedad relativa (%). |
| `AH` | Double | Nulo | Humedad absoluta (g/m³). |

**Clave primaria:** `(Date, Time)`  
**Entidad complementaria sugerida:** `City` (para proyectos futuros que integren múltiples zonas o estaciones).

---

### 🧮 Representación visual del esquema (Mermaid)
```mermaid
erDiagram
    AirQuality {
        DATE Date
        STRING Time
        DOUBLE CO_GT
        DOUBLE NMHC_GT
        DOUBLE C6H6_GT
        DOUBLE NOx_GT
        DOUBLE NO2_GT
        DOUBLE T
        DOUBLE RH
        DOUBLE AH
    }

In [0]:

## 💻 **Celda de código: Definición del esquema (StructType en PySpark)**

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

schema_air = StructType([
    StructField("Date", DateType(), False),
    StructField("Time", StringType(), False),
    StructField("CO_GT", DoubleType(), True),
    StructField("NMHC_GT", DoubleType(), True),
    StructField("C6H6_GT", DoubleType(), True),
    StructField("NOx_GT", DoubleType(), True),
    StructField("NO2_GT", DoubleType(), True),
    StructField("T", DoubleType(), True),
    StructField("RH", DoubleType(), True),
    StructField("AH", DoubleType(), True)
])

schema_air.simpleString()

'struct<Date:date,Time:string,CO_GT:double,NMHC_GT:double,C6H6_GT:double,NOx_GT:double,NO2_GT:double,T:double,RH:double,AH:double>'

## 🧠 Explicación del esquema

El esquema fue diseñado para capturar de forma estructurada las variables ambientales de cada medición.  
- Se emplearon tipos `DoubleType` para las mediciones numéricas y `DateType` / `StringType` para campos de fecha y hora.  
- La clave primaria compuesta `(Date, Time)` permite identificar cada registro de manera única.  
- Este diseño es compatible con **Spark SQL** y optimizado para análisis y almacenamiento distribuido en **Databricks**.

## ⚙️ Configuración y evidencia de la infraestructura en Databricks CE

Para esta práctica, se utiliza **Databricks Community Edition (Free Edition)** como entorno cloud para el procesamiento de datos con **Apache Spark**.

### 🏗️ Parámetros del clúster
- **Nombre del clúster:** `AirQuality_BigData`
- **Tipo de clúster:** Serverless (Community Edition)
- **Runtime:** Databricks Runtime 14.x o superior (compatible con Spark 3.5+ / Python 3.11)
- **Configuración:** 1 nodo (driver único), sin autoscaling disponible en CE
- **Sistema de archivos:** DBFS (Databricks File System)

El entorno se usa para ejecutar código PySpark y SQL en la nube, garantizando un espacio unificado para análisis, procesamiento y visualización de datos.

---

In [0]:
import platform

# Versión de Spark y Python
print(f"✅ Versión de Spark: {spark.version}")
print(f"🐍 Versión de Python: {platform.python_version()}")

✅ Versión de Spark: 4.0.0
🐍 Versión de Python: 3.11.10


In [0]:
# Versión de Spark y Python (funciona en CE)
import platform

print("✅ Versión de Spark:", spark.version)
print("🐍 Versión de Python:", platform.python_version())

✅ Versión de Spark: 4.0.0
🐍 Versión de Python: 3.11.10


In [0]:
print("⚙️ Configuración resumida del entorno Spark (Free Edition):")

# Spark version
print(f"✅ Versión de Spark: {spark.version}")

# Intentar obtener parámetros de configuración importantes
try:
    print("spark.app.name:", spark.conf.get("spark.app.name"))
    print("spark.master:", spark.conf.get("spark.master"))
    print("spark.sql.catalogImplementation:", spark.conf.get("spark.sql.catalogImplementation"))
except Exception as e:
    print("⚠️ No se pudieron obtener todos los parámetros del clúster:", e)


⚙️ Configuración resumida del entorno Spark (Free Edition):
✅ Versión de Spark: 4.0.0
⚠️ No se pudieron obtener todos los parámetros del clúster: [CONFIG_NOT_AVAILABLE] Configuration spark.app.name is not available. SQLSTATE: 42K0I

JVM stacktrace:
org.apache.spark.sql.AnalysisException
	at com.databricks.sql.connect.SparkConnectConfig$.assertConfigAllowedForRead(SparkConnectConfig.scala:297)
	at org.apache.spark.sql.connect.service.SparkConnectConfigHandler$RuntimeConfigWrapper.get(SparkConnectConfigHandler.scala:113)
	at org.apache.spark.sql.connect.service.SparkConnectConfigHandler.transform(SparkConnectConfigHandler.scala:285)
	at org.apache.spark.sql.connect.service.SparkConnectConfigHandler.$anonfun$handleGet$1(SparkConnectConfigHandler.scala:322)
	at org.apache.spark.sql.connect.service.SparkConnectConfigHandler.$anonfun$handleGet$1$adapted(SparkConnectConfigHandler.scala:321)
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
	at scala.collection.IterableOnceO

### 🧩 Interpretación de la configuración en Databricks Free Edition

- Se confirmó la versión activa de **Spark** y **Python**.
- En Databricks Free Edition, el entorno opera bajo un **catálogo Unity Catalog**, que gestiona el almacenamiento mediante **Volumes** y rutas dentro del Workspace.
- No se permite escribir en el directorio público `/FileStore` (por razones de seguridad).
- Los archivos se administran desde `/Workspace/Users/...` o `/Volumes/workspace/default/`, garantizando control de acceso por usuario.
- Este entorno proporciona un espacio seguro y escalable para análisis con PySpark y SQL.

## 📊 Obtención del dataset y creación de tabla en Databricks

En esta sección se ingiere el dataset **Air Quality Data Set** obtenido de Kaggle:  
🔗 [https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set](https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set)

Se carga el archivo CSV previamente subido al entorno de **Databricks Free Edition**, ubicado en:  
`/Workspace/Users/indira.hamdam@est.iudigital.edu.co/AirQuality.csv`

El objetivo es leer los datos con **Spark**, aplicar el esquema diseñado, y crear una tabla persistente en el catálogo para posteriores consultas SQL.

In [0]:
# Ruta del archivo en el Workspace (ajustada para Databricks Free Edition)
ruta = "/Workspace/Users/indira.hamdam@est.iudigital.edu.co/AirQuality.csv"

# Lectura del CSV con Spark (usando inferSchema para detección automática)
df_air = spark.read.csv(ruta, header=True, inferSchema=True, sep=";")

# Mostrar esquema y primeras filas
df_air.printSchema()
df_air.show(5)

root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- CO(GT): string (nullable = true)
 |-- PT08.S1(CO): integer (nullable = true)
 |-- NMHC(GT): integer (nullable = true)
 |-- C6H6(GT): string (nullable = true)
 |-- PT08.S2(NMHC): integer (nullable = true)
 |-- NOx(GT): integer (nullable = true)
 |-- PT08.S3(NOx): integer (nullable = true)
 |-- NO2(GT): integer (nullable = true)
 |-- PT08.S4(NO2): integer (nullable = true)
 |-- PT08.S5(O3): integer (nullable = true)
 |-- T: string (nullable = true)
 |-- RH: string (nullable = true)
 |-- AH: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)

+----------+--------+------+-----------+--------+--------+-------------+-------+------------+-------+------------+-----------+----+----+------+----+----+
|      Date|    Time|CO(GT)|PT08.S1(CO)|NMHC(GT)|C6H6(GT)|PT08.S2(NMHC)|NOx(GT)|PT08.S3(NOx)|NO2(GT)|PT08.S4(NO2)|PT08.S5(O3)|   T|  RH|    AH|_c15|_c16|
+----------+--------

In [0]:
from pyspark.sql.functions import col

# Limpiar nombres de columnas (reemplazar caracteres no válidos)
df_air_renamed = df_air.toDF(*[
    c.replace("(", "_")
     .replace(")", "")
     .replace(".", "_")
     .replace("-", "_")
     .replace(" ", "_")
     .replace("/", "_")
     .replace("%", "pct")
     for c in df_air.columns
])

# Mostrar los nuevos nombres de columnas
print("✅ Nuevos nombres de columnas:")
print(df_air_renamed.columns)

# Guardar la tabla limpia en el catálogo
df_air_renamed.write.mode("overwrite").saveAsTable("air_quality")

print("✅ Tabla 'air_quality' creada correctamente con nombres válidos.")


✅ Nuevos nombres de columnas:
['Date', 'Time', 'CO_GT', 'PT08_S1_CO', 'NMHC_GT', 'C6H6_GT', 'PT08_S2_NMHC', 'NOx_GT', 'PT08_S3_NOx', 'NO2_GT', 'PT08_S4_NO2', 'PT08_S5_O3', 'T', 'RH', 'AH', '_c15', '_c16']
✅ Tabla 'air_quality' creada correctamente con nombres válidos.


In [0]:
%sql
DESCRIBE TABLE air_quality_data;

col_name,data_type,comment
Date_Time_COGT_PT08_S1CO_NMHCGT_C6H6GT_PT08_S2NMHC_NOxGT_PT08_S3NOx_NO2GT_PT08_S4NO2_PT08_S5O3_T_RH_AH__,string,


Propósito: Verificar la integridad estructural. Confirmamos que el esquema lógico diseñado se haya aplicado correctamente a los datos físicos.

Interpretación del Resultado: Se valida que los nombres de las columnas estén normalizados.

Se confirma que los tipos de datos sean correctos para realizar cálculos y no cadenas de texto, lo cual impediría operaciones matemáticas futuras.

En SQL, se verifica que la tabla esté registrada en el metastore y sea accesible para consultas.

In [0]:
%sql
SELECT COUNT(*) FROM air_quality_data;

COUNT(*)
9471


In [0]:
%sql
SELECT * FROM air_quality LIMIT 10;

Date_Time_COGT_PT08_S1CO_NMHCGT_C6H6GT_PT08_S2NMHC_NOxGT_PT08_S3NOx_NO2GT_PT08_S4NO2_PT08_S5O3_T_RH_AH__
10/03/2004;18.00.00;2
10/03/2004;19.00.00;2;1292;112;9
10/03/2004;20.00.00;2
10/03/2004;21.00.00;2
10/03/2004;22.00.00;1
10/03/2004;23.00.00;1
11/03/2004;00.00.00;1
11/03/2004;01.00.00;1;1136;31;3
11/03/2004;02.00.00;0
11/03/2004;03.00.00;0


Propósito: Validación de volumen y formato visual.

Interpretación del Resultado: 

Count: Se compara contra la fuente original. Si el número coincide, la ingesta fue completa y no hubo pérdida de datos durante la carga.

Limit/Muestra: Permite una inspección ocular rápida. Verifica que las columnas no estén desplazadas y que el parseo del separador decimal haya funcionado correctamente.

### Análisis Comparativo: SQL vs. Spark (PySpark)
En un ecosistema de Big Data como Databricks, ambos lenguajes conviven. La elección depende de quién lo usa (Analista vs. Ingeniero) y para qué lo usa (Reporting vs. Procesamiento Complejo).

1. SQL (Spark SQL)
Es el estándar de la industria para el análisis de datos. Su naturaleza es declarativa: le dices al motor qué datos quieres, y el optimizador (Catalyst en Spark) decide cómo obtenerlos.

Fortalezas: Es imbatible en accesibilidad. Cualquier analista de negocio o usuario de BI puede interactuar con el Data Lake sin saber programar. Es ideal para validaciones rápidas, GROUP BY, y reportes finales.

Debilidades: Se vuelve inmanejable en pipelines ETL muy complejos (cadenas de 500 líneas de código). Carece de bibliotecas nativas robustas para Machine Learning o manejo de estructuras de datos complejas (grafos, recursividad).

2. Spark (PySpark)
Es un enfoque imperativo (aunque con optimizaciones declarativas) que combina la potencia de Spark con la flexibilidad de Python.

Fortalezas: Permite construir pipelines de ingeniería de datos robustos, modulares y testeables. Su integración con MLlib permite entrenar modelos de IA sobre los mismos DataFrames. Las UDFs (User Defined Functions) en Python permiten lógica de negocio muy específica que SQL no puede expresar.

Debilidades: La curva de aprendizaje es mayor; requiere entender conceptos de computación distribuida (particiones, shuffles, lazy evaluation). Además, una mala configuración del código puede llevar a errores de memoria (OOM) más fácilmente que en SQL.

%md
### ⚔️ Cuadro Comparativo: SQL vs. Spark (PySpark)

| Característica | 🏛️ SQL (Spark SQL) | ⚡ Spark (PySpark) |
| :--- | :--- | :--- |
| **Enfoque** | **Declarativo:** Le dices *qué* quieres obtener. | **Imperativo:** Le dices *cómo* procesarlo paso a paso. |
| **Curva de Aprendizaje** | **Baja.** Sintaxis universal, ideal para analistas y usuarios de negocio. | **Media-Alta.** Requiere saber Python y conceptos de computación distribuida. |
| **Casos de Uso Ideales** | Validaciones rápidas, `GROUP BY`, reportes de BI y consultas ad-hoc. | Pipelines ETL complejos, limpieza de datos avanzada y Machine Learning. |
| **Capacidad de ML** | **Limitada.** Depende de funciones básicas o integraciones externas. | **Nativa.** Integración directa con **MLlib** para entrenar modelos a escala. |
| **Extensibilidad** | **Baja.** Crear funciones personalizadas es complejo. | **Alta.** Permite crear UDFs potentes en Python y Pandas. |
| **Depuración (Debug)** | Difícil en cadenas largas de código (un solo bloque). | Fácil de depurar paso a paso al dividir en variables y métodos. |

<h3 style="color:#1b3d6d">🚀 Comparativa Técnica: SQL vs Spark</h3>
<table style="width:100%">
  <tr style="background-color:#f2f2f2">
    <th style="padding:10px">Característica</th>
    <th style="padding:10px">SQL (Spark SQL)</th>
    <th style="padding:10px">Spark (PySpark)</th>
  </tr>
  <tr>
    <td><strong>Facilidad de Uso</strong></td>
    <td>✅ Muy Alta (Estándar ANSI)</td>
    <td>⚠️ Media (Requiere Python)</td>
  </tr>
  <tr>
    <td><strong>Flexibilidad</strong></td>
    <td>Limitada para lógica compleja</td>
    <td>✅ Extrema (APIs ricas + UDFs)</td>
  </tr>
  <tr>
    <td><strong>Integración</strong></td>
    <td>Excelente con herramientas BI (Tableau, PBI)</td>
    <td>Excelente con librerías de IA/ML</td>
  </tr>
  <tr>
    <td><strong>Rendimiento</strong></td>
    <td>Optimizado automáticamente (Catalyst)</td>
    <td>Optimizado, pero requiere buenas prácticas</td>
  </tr>
</table>