# **Paso 5: Integración de la base de datos**

In [0]:
# Diccionario con las rutas de los archivos Parquet
file_paths = {
    "city_day": "/FileStore/shared_uploads/ismaelarista.28@gmail.com/refined_city_day.parquet",
    "city_hour": "/FileStore/shared_uploads/ismaelarista.28@gmail.com/refined_city_hour.parquet",
    "station_day": "/FileStore/shared_uploads/ismaelarista.28@gmail.com/refined_station_day.parquet",
    "station_hour": "/FileStore/shared_uploads/ismaelarista.28@gmail.com/refined_station_hour.parquet",
    "stations": "/FileStore/shared_uploads/ismaelarista.28@gmail.com/refined_stations.parquet"
}

# Leer los archivos Parquet y registrar tablas temporales
for table_name, file_path in file_paths.items():
    # Leer el archivo Parquet
    df = spark.read.parquet(file_path)
    
    # Mostrar una muestra del DataFrame para verificar
    print(f"Data preview for {table_name}:")
    df.show(5)
    
    # Registrar como tabla temporal
    df.createOrReplaceTempView(table_name)
    print(f"Table '{table_name}' registered successfully.")


Data preview for city_day:
+---------+----------+-----+-----+----+-----+-----+-----+----+-----+------+-------+-------+-----+-----------+
|     City|      Date|PM2.5| PM10|  NO|  NO2|  NOx|  NH3|  CO|  SO2|    O3|Benzene|Toluene|  AQI| AQI_Bucket|
+---------+----------+-----+-----+----+-----+-----+-----+----+-----+------+-------+-------+-----+-----------+
|Ahmedabad|2015-01-01|48.57|95.68|0.92|18.22|17.15|15.85|0.92|27.64|133.36|    0.0|   0.02|118.0|Desconocido|
|Ahmedabad|2015-01-02|48.57|95.68|0.97|15.69|16.46|15.85|0.97|24.55| 34.06|   3.68|    5.5|118.0|Desconocido|
|Ahmedabad|2015-01-03|48.57|95.68|17.4| 19.3| 29.7|15.85|17.4|29.07|  30.7|    6.8|   16.4|118.0|Desconocido|
|Ahmedabad|2015-01-04|48.57|95.68| 1.7|18.48|17.97|15.85| 1.7|18.59| 36.08|   4.43|  10.14|118.0|Desconocido|
|Ahmedabad|2015-01-05|48.57|95.68|22.1|21.42|37.76|15.85|22.1|39.33| 39.31|   7.01|  18.89|118.0|Desconocido|
+---------+----------+-----+-----+----+-----+-----+-----+----+-----+------+-------+-------+--

# **Paso 6: Análisis de datos y elaboración de informes**
**Explicaciones de los reportes**
- 1. Calidad del aire por ciudades
- 2. Calidad del aire por temporadas
- 3. Contaminación por hora del día
- 4. Calidad del aire por estados y ciudades
- 5. Ciudades con peor calidad del aire (filtro AQI > 100)
- 6. Temporadas con mayor contaminación por PM2.5
- 7. Horas críticas con AQI > 150
- 8. Estados con mayor contaminación por PM2.5 y PM10
- 9. Días críticos con AQI extremo (> 200)
- 10. Ciudades con peor calidad del aire por temporada: Identifica ciudades críticas en cada estación del año.
- 11. Estados con niveles de PM2.5 altos por hora del día: Examina cómo cambia la contaminación según la hora.
- 12. Promedio de AQI diario con filtro de contaminantes: Analiza días específicos con alta contaminación.
- 13. Temporadas críticas por estado: Agrupa la calidad del aire en estados por estaciones.
- 14. Días críticos por estado y contaminante: Días con niveles peligrosos de PM2.5 por estado.
- 15. Estados con AQI alto en horas pico: Analiza las horas pico más críticas en los estados.

In [0]:
# ==============================
# 1. Calidad del aire por ciudades
# ==============================
# Muestra el promedio del AQI (Air Quality Index) por ciudad, ordenado de mayor a menor.
spark.sql("""
    SELECT 
        City, 
        AVG(AQI) AS Avg_AQI
    FROM 
        city_day
    GROUP BY 
        City
    ORDER BY 
        Avg_AQI DESC
""").show()

# ==============================
# 2. Calidad del aire por temporadas
# ==============================
# Agrupa las estaciones del año (invierno, primavera, verano, otoño)
# y calcula el promedio del AQI para cada una.
spark.sql("""
    SELECT 
        CASE 
            WHEN MONTH(Date) IN (12, 1, 2) THEN 'Winter'
            WHEN MONTH(Date) IN (3, 4, 5) THEN 'Spring'
            WHEN MONTH(Date) IN (6, 7, 8) THEN 'Summer'
            WHEN MONTH(Date) IN (9, 10, 11) THEN 'Autumn'
        END AS Season,
        AVG(AQI) AS Avg_AQI
    FROM 
        city_day
    GROUP BY 
        Season
    ORDER BY 
        Avg_AQI DESC
""").show()

# ==============================
# 3. Contaminación por hora del día
# ==============================
# Analiza la contaminación promedio de PM2.5 y PM10 por hora del día.
spark.sql("""
    SELECT 
        HOUR(Datetime) AS Hour,
        AVG(`PM2.5`) AS Avg_PM25,
        AVG(PM10) AS Avg_PM10
    FROM 
        city_hour
    GROUP BY 
        Hour
    ORDER BY 
        Avg_PM25 DESC
""").show()

# ==============================
# 4. Calidad del aire por estados y ciudades
# ==============================
# Combina las tablas de estaciones y ciudades para calcular el promedio
# del AQI por estado y ciudad, ordenado de mayor a menor.
spark.sql("""
    SELECT 
        s.State, 
        c.City, 
        AVG(c.AQI) AS Avg_AQI
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    GROUP BY 
        s.State, c.City
    ORDER BY 
        Avg_AQI DESC
""").show()

# ==============================
# 5. Ciudades con peor calidad del aire (filtro AQI > 100)
# ==============================
# Muestra únicamente las ciudades donde el promedio del AQI supera 100.
spark.sql("""
    SELECT 
        City, 
        AVG(AQI) AS Avg_AQI
    FROM 
        city_day
    GROUP BY 
        City
    HAVING 
        Avg_AQI > 100
    ORDER BY 
        Avg_AQI DESC
""").show()

# ==============================
# 6. Temporadas con mayor contaminación por PM2.5
# ==============================
# Calcula el promedio de PM2.5 en cada estación del año.
spark.sql("""
    SELECT 
        CASE 
            WHEN MONTH(Date) IN (12, 1, 2) THEN 'Winter'
            WHEN MONTH(Date) IN (3, 4, 5) THEN 'Spring'
            WHEN MONTH(Date) IN (6, 7, 8) THEN 'Summer'
            WHEN MONTH(Date) IN (9, 10, 11) THEN 'Autumn'
        END AS Season,
        AVG(`PM2.5`) AS Avg_PM25
    FROM 
        city_day
    WHERE 
        `PM2.5` IS NOT NULL
    GROUP BY 
        Season
    ORDER BY 
        Avg_PM25 DESC
""").show()

# ==============================
# 7. Horas críticas con AQI > 150
# ==============================
# Muestra las horas del día en las que el AQI promedio es mayor a 150.
spark.sql("""
    SELECT 
        HOUR(Datetime) AS Hour,
        AVG(AQI) AS Avg_AQI
    FROM 
        city_hour
    WHERE 
        AQI > 150
    GROUP BY 
        Hour
    ORDER BY 
        Avg_AQI DESC
""").show()

# ==============================
# 8. Estados con mayor contaminación por PM2.5 y PM10
# ==============================
# Muestra el promedio de PM2.5 y PM10 por estado, basado en las estaciones.
spark.sql("""
    SELECT 
        s.State, 
        AVG(c.`PM2.5`) AS Avg_PM25, 
        AVG(c.PM10) AS Avg_PM10
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    WHERE 
        c.`PM2.5` IS NOT NULL AND c.PM10 IS NOT NULL
    GROUP BY 
        s.State
    ORDER BY 
        Avg_PM25 DESC, Avg_PM10 DESC
""").show()

# ==============================
# 9. Días críticos con AQI extremo (> 200)
# ==============================
# Identifica los días con los valores más altos de AQI (mayor a 200).
spark.sql("""
    SELECT 
        Date, 
        City, 
        MAX(AQI) AS Max_AQI
    FROM 
        city_day
    GROUP BY 
        Date, City
    HAVING 
        Max_AQI > 200
    ORDER BY 
        Max_AQI DESC
""").show()

# ==============================
# 1. Ciudades con peor calidad del aire por temporada
# ==============================
# Este reporte muestra las ciudades con el AQI más alto (> 100),
# desglosado por las estaciones del año (Winter, Spring, Summer, Autumn).
spark.sql("""
    SELECT 
        City,
        CASE 
            WHEN MONTH(Date) IN (12, 1, 2) THEN 'Winter'
            WHEN MONTH(Date) IN (3, 4, 5) THEN 'Spring'
            WHEN MONTH(Date) IN (6, 7, 8) THEN 'Summer'
            WHEN MONTH(Date) IN (9, 10, 11) THEN 'Autumn'
        END AS Season,
        AVG(AQI) AS Avg_AQI
    FROM 
        city_day
    GROUP BY 
        City, Season
    HAVING 
        Avg_AQI > 100 -- Filtro de AQI alto
    ORDER BY 
        Season, Avg_AQI DESC
""").show()

# ==============================
# 2. Estados con niveles de PM2.5 más altos por hora del día
# ==============================
# Este reporte identifica los estados donde los niveles promedio de PM2.5
# son más altos, agrupados por hora del día.
spark.sql("""
    SELECT 
        s.State,
        HOUR(ch.Datetime) AS Hour,
        AVG(ch.`PM2.5`) AS Avg_PM25
    FROM 
        stations s
    JOIN 
        city_hour ch ON s.City = ch.City
    WHERE 
        ch.`PM2.5` IS NOT NULL
    GROUP BY 
        s.State, Hour
    ORDER BY 
        Avg_PM25 DESC
""").show()

# ==============================
# 3. Promedio de AQI por día y ciudad con filtro de contaminantes
# ==============================
# Este reporte analiza el promedio diario del AQI en ciudades específicas,
# considerando solo días con altos niveles de PM2.5 (> 75) y PM10 (> 100).
spark.sql("""
    SELECT 
        Date, 
        City,
        AVG(AQI) AS Avg_AQI,
        AVG(`PM2.5`) AS Avg_PM25,
        AVG(PM10) AS Avg_PM10
    FROM 
        city_day
    WHERE 
        `PM2.5` > 75 AND PM10 > 100 -- Días con contaminantes altos
    GROUP BY 
        Date, City
    ORDER BY 
        Avg_AQI DESC
""").show()

# ==============================
# 4. Temporadas críticas por estado
# ==============================
# Este reporte analiza el comportamiento del AQI promedio en los estados,
# agrupado por las estaciones del año.
spark.sql("""
    SELECT 
        s.State,
        CASE 
            WHEN MONTH(c.Date) IN (12, 1, 2) THEN 'Winter'
            WHEN MONTH(c.Date) IN (3, 4, 5) THEN 'Spring'
            WHEN MONTH(c.Date) IN (6, 7, 8) THEN 'Summer'
            WHEN MONTH(c.Date) IN (9, 10, 11) THEN 'Autumn'
        END AS Season,
        AVG(c.AQI) AS Avg_AQI
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    GROUP BY 
        s.State, Season
    ORDER BY 
        Season, Avg_AQI DESC
""").show()

# ==============================
# 5. Días con mayor contaminación por estado y contaminante
# ==============================
# Este reporte identifica los días más críticos por estado,
# analizando los niveles máximos de PM2.5 (> 100).
spark.sql("""
    SELECT 
        s.State,
        c.Date,
        MAX(c.`PM2.5`) AS Max_PM25
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    WHERE 
        c.`PM2.5` IS NOT NULL
    GROUP BY 
        s.State, c.Date
    HAVING 
        Max_PM25 > 100 -- Días críticos por PM2.5
    ORDER BY 
        Max_PM25 DESC
""").show()

# ==============================
# 6. Estados con peor calidad del aire en horas pico
# ==============================
# Este reporte muestra los estados con el AQI promedio más alto
# durante las horas pico (7 AM - 9 AM y 5 PM - 7 PM).
spark.sql("""
    SELECT 
        s.State,
        HOUR(ch.Datetime) AS Hour,
        AVG(ch.AQI) AS Avg_AQI
    FROM 
        stations s
    JOIN 
        city_hour ch ON s.City = ch.City
    WHERE 
        HOUR(ch.Datetime) IN (7, 8, 9, 17, 18, 19) -- Horas pico
    GROUP BY 
        s.State, Hour
    ORDER BY 
        Avg_AQI DESC
""").show()



+-------------+------------------+
|         City|           Avg_AQI|
+-------------+------------------+
|    Ahmedabad| 339.8616226978596|
|        Delhi| 258.7834743653559|
|        Patna|214.41496232508072|
|      Lucknow|212.20059731209557|
|     Gurugram| 210.7045860631328|
|      Talcher| 159.4172972972973|
|   Jorapokhar|145.20701454234387|
| Brajrajnagar|142.53731343283582|
|     Guwahati|139.80278884462152|
|      Kolkata| 138.9029484029484|
|       Jaipur|133.39766606822263|
|       Bhopal|132.26297577854672|
|     Amritsar|119.77149877149877|
|Visakhapatnam|117.41518467852258|
|      Chennai|114.72025883524141|
|       Mumbai| 113.1209556993529|
|    Hyderabad|109.75972083748754|
|        Kochi|104.62345679012346|
|    Amaravati| 97.92534174553101|
|   Chandigarh| 96.85197368421052|
+-------------+------------------+
only showing top 20 rows

+------+------------------+
|Season|           Avg_AQI|
+------+------------------+
|Winter|204.10201612903225|
|Autumn|  173.43197227

# **Paso 7: Creación de Dashboard**
**Reportes que exportaremos para Power BI:**

- Calidad del aire por ciudades.
- Calidad del aire por temporadas.
- Calidad del aire por estados y ciudades.
- Contaminación por hora del día.
- Estados con mayor contaminación por PM2.5 y PM10.
- Días críticos con AQI extremo (> 200).

In [0]:
import pandas as pd
import os

# Crear una función para exportar y forzar la descarga
def export_and_download(query, local_filename, dbfs_filename):
    # Ejecutar la consulta SQL
    result = spark.sql(query)
    
    # Convertir a Pandas DataFrame
    pandas_df = result.toPandas()
    
    # Guardar como archivo CSV local
    local_path = f"/tmp/{local_filename}"
    pandas_df.to_csv(local_path, index=False)
    
    # Copiar a DBFS
    dbfs_path = f"dbfs:/FileStore/shared_uploads/ismaelarista.28@gmail.com/{dbfs_filename}"
    dbutils.fs.cp(f"file:{local_path}", dbfs_path)
    
    print(f"Archivo disponible para descargar en: https://community.cloud.databricks.com/files/shared_uploads/ismaelarista.28@gmail.com/{dbfs_filename}")

# ==============================
# Exportar los reportes
# ==============================

# 1. Calidad del aire por ciudades
export_and_download(
    """
    SELECT City, AVG(AQI) AS Avg_AQI
    FROM city_day
    GROUP BY City
    ORDER BY Avg_AQI DESC
    """,
    "aqi_by_city.csv",
    "aqi_by_city.csv"
)

# 2. Calidad del aire por temporadas
export_and_download(
    """
    SELECT 
        CASE 
            WHEN MONTH(Date) IN (12, 1, 2) THEN 'Winter'
            WHEN MONTH(Date) IN (3, 4, 5) THEN 'Spring'
            WHEN MONTH(Date) IN (6, 7, 8) THEN 'Summer'
            WHEN MONTH(Date) IN (9, 10, 11) THEN 'Autumn'
        END AS Season,
        AVG(AQI) AS Avg_AQI
    FROM 
        city_day
    GROUP BY 
        Season
    ORDER BY 
        Avg_AQI DESC
    """,
    "aqi_by_season.csv",
    "aqi_by_season.csv"
)

# 3. Calidad del aire por estados y ciudades
export_and_download(
    """
    SELECT 
        s.State, 
        c.City, 
        AVG(c.AQI) AS Avg_AQI
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    GROUP BY 
        s.State, c.City
    ORDER BY 
        Avg_AQI DESC
    """,
    "aqi_by_state_and_city.csv",
    "aqi_by_state_and_city.csv"
)

# 4. Contaminación por hora del día
export_and_download(
    """
    SELECT 
        HOUR(Datetime) AS Hour,
        AVG(AQI) AS Avg_AQI
    FROM 
        city_hour
    GROUP BY 
        Hour
    ORDER BY 
        Avg_AQI DESC
    """,
    "aqi_by_hour.csv",
    "aqi_by_hour.csv"
)

# 5. Estados con mayor contaminación por PM2.5 y PM10
export_and_download(
    """
    SELECT 
        s.State,
        AVG(c.`PM2.5`) AS Avg_PM25,
        AVG(c.PM10) AS Avg_PM10
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    WHERE 
        c.`PM2.5` IS NOT NULL AND c.PM10 IS NOT NULL
    GROUP BY 
        s.State
    ORDER BY 
        Avg_PM25 DESC, Avg_PM10 DESC
    """,
    "pm25_pm10_by_state.csv",
    "pm25_pm10_by_state.csv"
)

result = spark.sql("""
    SELECT City, AVG(AQI) AS Avg_AQI
    FROM city_day
    GROUP BY City
    ORDER BY Avg_AQI DESC
""")

# ==============================
# 1. Calidad del aire por ciudades
# ==============================
result = spark.sql("""
    SELECT City, AVG(AQI) AS Avg_AQI
    FROM city_day
    GROUP BY City
    ORDER BY Avg_AQI DESC
""")
result.show(truncate=False, n=50)  # Cambia "n" según la cantidad de filas que quieras mostrar

# ==============================
# 2. Calidad del aire por temporadas
# ==============================
result = spark.sql("""
    SELECT 
        CASE 
            WHEN MONTH(Date) IN (12, 1, 2) THEN 'Winter'
            WHEN MONTH(Date) IN (3, 4, 5) THEN 'Spring'
            WHEN MONTH(Date) IN (6, 7, 8) THEN 'Summer'
            WHEN MONTH(Date) IN (9, 10, 11) THEN 'Autumn'
        END AS Season,
        AVG(AQI) AS Avg_AQI
    FROM 
        city_day
    GROUP BY 
        Season
    ORDER BY 
        Avg_AQI DESC
""")
result.show(truncate=False)

# ==============================
# 3. Calidad del aire por estados y ciudades
# ==============================
result = spark.sql("""
    SELECT 
        s.State, 
        c.City, 
        AVG(c.AQI) AS Avg_AQI
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    GROUP BY 
        s.State, c.City
    ORDER BY 
        Avg_AQI DESC
""")
result.show(truncate=False, n=50)

# ==============================
# 4. Contaminación por hora del día
# ==============================
result = spark.sql("""
    SELECT 
        HOUR(Datetime) AS Hour,
        AVG(AQI) AS Avg_AQI
    FROM 
        city_hour
    GROUP BY 
        Hour
    ORDER BY 
        Avg_AQI DESC
""")
result.show(truncate=False)

# ==============================
# 5. Estados con mayor contaminación por PM2.5 y PM10
# ==============================
result = spark.sql("""
    SELECT 
        s.State,
        AVG(c.`PM2.5`) AS Avg_PM25,
        AVG(c.PM10) AS Avg_PM10
    FROM 
        stations s
    JOIN 
        city_day c ON s.City = c.City
    WHERE 
        c.`PM2.5` IS NOT NULL AND c.PM10 IS NOT NULL
    GROUP BY 
        s.State
    ORDER BY 
        Avg_PM25 DESC, Avg_PM10 DESC
""")
result.show(truncate=False, n=50)

# ==============================
# 6. Días críticos con AQI extremo (> 200)
# ==============================
result = spark.sql("""
    SELECT Date, City, AQI
    FROM city_day
    WHERE AQI > 200
    ORDER BY AQI DESC
""")
result.show(truncate=False, n=50)


Archivo disponible para descargar en: https://community.cloud.databricks.com/files/shared_uploads/ismaelarista.28@gmail.com/aqi_by_city.csv
Archivo disponible para descargar en: https://community.cloud.databricks.com/files/shared_uploads/ismaelarista.28@gmail.com/aqi_by_season.csv
Archivo disponible para descargar en: https://community.cloud.databricks.com/files/shared_uploads/ismaelarista.28@gmail.com/aqi_by_state_and_city.csv
Archivo disponible para descargar en: https://community.cloud.databricks.com/files/shared_uploads/ismaelarista.28@gmail.com/aqi_by_hour.csv
Archivo disponible para descargar en: https://community.cloud.databricks.com/files/shared_uploads/ismaelarista.28@gmail.com/pm25_pm10_by_state.csv
+------------------+------------------+
|City              |Avg_AQI           |
+------------------+------------------+
|Ahmedabad         |339.8616226978596 |
|Delhi             |258.7834743653559 |
|Patna             |214.41496232508072|
|Lucknow           |212.20059731209557|
|