# <center><p style="color: Turquoise;"><strong>Proyecto Apache Spark</strong></p></center>

## <p style="color: green;">Introducción</p>

![](esquema.png)

En este proyecto, se aborda el diseño y la implementación de una solución integral para la extracción, procesamiento, almacenamiento y visualización de datos provenientes de la Agencia Estatal de Meteorología Española (AEMET). El flujo de trabajo se estructura en cuatro etapas principales: *data sources*, *ETL*, *data warehouse* y *visualization*. Cada una de estas etapas está orquestada y gestionada mediante Apache Airflow.

## <p style="color: green;">API AEMET</p>

Desde la URL indicada en el bloque de codigo siguiente, AEMET proporciona datos de las estaciones meteorológicas presentes en españa en ese momento.

Paras obtener esta información, se manda una *request GET* al *endpoint* del api y ésta deberá devolver una URL con la ubicación de los datos, a la cual se manda una segunda *request GET*.

Los datos vienen en formato json y se pueden cargar a pandas directamente.

In [None]:
import requests

url = "https://opendata.aemet.es/opendata/api/valores/climatologicos/inventarioestaciones/todasestaciones/"

querystring = {"api_key":"  "}

headers = {
    'cache-control': "no-cache"
    }

url_estaciones = requests.request("GET", url, headers=headers, params=querystring).json()['datos']
url_estaciones_metadata = requests.request("GET", url, headers=headers, params=querystring).json()['metadatos']

print(url_estaciones)
print(url_estaciones_metadata)

https://opendata.aemet.es/opendata/sh/1ed165ab
https://opendata.aemet.es/opendata/sh/0556af7a


Primero obtenemos los datos relativos a todas las estaciones meteorológicas de AEMET.

In [2]:
import pandas as pd

data = requests.request("GET", url_estaciones, headers=headers, params=querystring).json()
df_estaciones = pd.DataFrame(data)
df_estaciones.to_csv('datos.csv', index=False, header=True)
df_estaciones

Unnamed: 0,latitud,provincia,altitud,indicativo,nombre,indsinop,longitud
0,394924N,ILLES BALEARS,490,B013X,"ESCORCA, LLUC",08304,025309E
1,394744N,ILLES BALEARS,5,B051A,"SÓLLER, PUERTO",08316,024129E
2,394121N,ILLES BALEARS,60,B087X,BANYALBUFAR,,023046E
3,393445N,ILLES BALEARS,52,B103B,ANDRATX - SANT ELM,99103,022208E
4,393305N,ILLES BALEARS,50,B158X,"CALVIÀ, ES CAPDELLÀ",,022759E
...,...,...,...,...,...,...,...
942,424131N,LLEIDA,2467,9988B,CAP DE VAQUÈIRA,08936,005826E
943,424201N,LLEIDA,1161,9990X,"NAUT ARAN, ARTIES",08107,005237E
944,424634N,LLEIDA,722,9994X,BOSSÒST,,004123E
945,430528N,NAVARRA,334,9995Y,VALCARLOS/LUZAIDE,,011803W


Veamos que contiene la columna provincias y las asociaremos a su correspondiente comunidad autónoma para posteriormente obtener esta nueva categoría.

In [3]:
df_estaciones.provincia.unique()

array(['ILLES BALEARS', 'BALEARES', 'LAS PALMAS', 'STA. CRUZ DE TENERIFE',
       'SANTA CRUZ DE TENERIFE', 'TARRAGONA', 'BARCELONA', 'GIRONA',
       'NAVARRA', 'GIPUZKOA', 'ARABA/ALAVA', 'BIZKAIA', 'CANTABRIA',
       'ASTURIAS', 'LEON', 'LUGO', 'A CORUÑA', 'PONTEVEDRA', 'OURENSE',
       'SORIA', 'BURGOS', 'SEGOVIA', 'VALLADOLID', 'PALENCIA', 'AVILA',
       'MADRID', 'SALAMANCA', 'ZAMORA', 'GUADALAJARA', 'CUENCA', 'TOLEDO',
       'CACERES', 'ALBACETE', 'CIUDAD REAL', 'BADAJOZ', 'CORDOBA',
       'HUELVA', 'CEUTA', 'JAEN', 'GRANADA', 'ALMERIA', 'SEVILLA',
       'CADIZ', 'MELILLA', 'MALAGA', 'MURCIA', 'ALICANTE', 'VALENCIA',
       'TERUEL', 'CASTELLON', 'LA RIOJA', 'HUESCA', 'ZARAGOZA', 'LLEIDA'],
      dtype=object)

In [4]:
provincia_comunidad = {

    'ARABA/ALAVA': 'País Vasco',
    'Albacete': 'Castilla-La Mancha',
    'Alicante': 'Comunidad Valenciana',
    'ALMERIA': 'Andalucía',
    'Asturias': 'Asturias',
    'AVILA': 'Castilla y León',
    'Badajoz': 'Extremadura',
    'Barcelona': 'Cataluña',
    'Burgos': 'Castilla y León',
    'CACERES': 'Extremadura',
    'CADIZ': 'Andalucía',
    'Cantabria': 'Cantabria',
    'CASTELLON': 'Comunidad Valenciana',
    'Ciudad Real': 'Castilla-La Mancha',
    'CORDOBA': 'Andalucía',
    'Cuenca': 'Castilla-La Mancha',
    'Girona': 'Cataluña',
    'Granada': 'Andalucía',
    'Guadalajara': 'Castilla-La Mancha',
    'Gipuzkoa': 'País Vasco',
    'Huelva': 'Andalucía',
    'Huesca': 'Aragón',
    'ILLES BALEARS': 'Islas Baleares',
    'JAEN': 'Andalucía',
    'A CORUÑA': 'Galicia',
    'La Rioja': 'La Rioja',
    'Las Palmas': 'Canarias',
    'LEON': 'Castilla y León',
    'LLEIDA': 'Cataluña',
    'Lugo': 'Galicia',
    'Madrid': 'Comunidad de Madrid',
    'MALAGA': 'Andalucía',
    'Murcia': 'Región de Murcia',
    'Navarra': 'Navarra',
    'OURENSE': 'Galicia',
    'Palencia': 'Castilla y León',
    'Pontevedra': 'Galicia',
    'Salamanca': 'Castilla y León',
    'SANTA CRUZ DE TENERIFE': 'Canarias',
    'Segovia': 'Castilla y León',
    'Sevilla': 'Andalucía',
    'Soria': 'Castilla y León',
    'Tarragona': 'Cataluña',
    'Teruel': 'Aragón',
    'Toledo': 'Castilla-La Mancha',
    'Valencia': 'Comunidad Valenciana',
    'Valladolid': 'Castilla y León',
    'BIZKAIA': 'País Vasco',
    'Zamora': 'Castilla y León',
    'Zaragoza': 'Aragón',
    'Ceuta': 'Ceuta',
    'Melilla': 'Melilla',
    'STA. CRUZ DE TENERIFE': 'Canarias',
    'Baleares': 'Islas Baleares',
}
provincia_comunidad = {k.upper(): v for k, v in provincia_comunidad.items()}
df_estaciones['comunidad'] = df_estaciones['provincia'].map(provincia_comunidad)
df_estaciones

Unnamed: 0,latitud,provincia,altitud,indicativo,nombre,indsinop,longitud,comunidad
0,394924N,ILLES BALEARS,490,B013X,"ESCORCA, LLUC",08304,025309E,Islas Baleares
1,394744N,ILLES BALEARS,5,B051A,"SÓLLER, PUERTO",08316,024129E,Islas Baleares
2,394121N,ILLES BALEARS,60,B087X,BANYALBUFAR,,023046E,Islas Baleares
3,393445N,ILLES BALEARS,52,B103B,ANDRATX - SANT ELM,99103,022208E,Islas Baleares
4,393305N,ILLES BALEARS,50,B158X,"CALVIÀ, ES CAPDELLÀ",,022759E,Islas Baleares
...,...,...,...,...,...,...,...,...
942,424131N,LLEIDA,2467,9988B,CAP DE VAQUÈIRA,08936,005826E,Cataluña
943,424201N,LLEIDA,1161,9990X,"NAUT ARAN, ARTIES",08107,005237E,Cataluña
944,424634N,LLEIDA,722,9994X,BOSSÒST,,004123E,Cataluña
945,430528N,NAVARRA,334,9995Y,VALCARLOS/LUZAIDE,,011803W,Navarra


Desde la URL indicada en el bloque de codigo siguiente, AEMET proporciona datos meteorológicos diarios tomados por todas las estaciones meteorológicas presentes en españa en ese momento, comenzando en 1991 y hasta hoy. 

Para obtenerlos, se manda una request GET al endpoint del api y ésta deberá devolver una URL con la ubicación de los datos, a la cual se manda una segunda request GET.

Los datos vienen en formato json y se pueden cargar a pandas directamente.

Existen varias limitaciones: sólo está permitido obtener datos de un rango de 32 días consecutivos o menos (si se quiere un período mas largo, hay que lanzar mas GETs) y sólo se pueden mandar 50 requests por minuto por API key.

El siguiente código tiene en cuenta esas limitaciones y obtiene todos los datos entre *start_date* y *end_date*, dividiendo si fuera necesario el rango de fechas en lotes de tamaño 32 * (50/2) (ya que hay que mandar 2 request para obtener un conjunto de datos) y poniendo el tiempo de espera suficiente (indicado por *timeout_between_requests*). También va dejando logs por pantalla del avance del proceso, y si se perdieran datos en el camino, informa de cuales se han perdido.

In [None]:
import requests
from datetime import datetime, timedelta
import pandas as pd
import concurrent.futures
import time
import locale

locale.setlocale(locale.LC_TIME, 'es_ES.UTF-8')

api_key = ""
def fetch_df_from_period(start_date, end_date, api_key):
    fechaIniStr = start_date.strftime("%Y-%m-%dT%H:%M:%SUTC")
    fechaFinStr = end_date.strftime("%Y-%m-%dT%H:%M:%SUTC")
    url = f"https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/datos/fechaini/{fechaIniStr}/fechafin/{fechaFinStr}/todasestaciones"
    querystring = {"api_key": api_key}
    headers = {'cache-control': "no-cache"}

    success = False
    while not success:  # Intenta hasta que tenga éxito
        try:
            response = requests.get(url, headers=headers, params=querystring)
            response.raise_for_status()
            if response:
                url2 = response.json()['datos']
                data = requests.get(url2, headers=headers, params=querystring)
                if data:
                    df = pd.DataFrame(data.json())
                    success = True
                    return df
        except requests.exceptions.ConnectionError as e:
            print("Error de conexión:", e)
            print(f'No se obtuvieron datos para el período entre {start_date.strftime("%d-%b %Y")} y {end_date.strftime("%d-%b %Y")}')
            time.sleep(5)  # Espera un poco antes de intentarlo de nuevo

    if not success:
        print(f'No se pudieron obtener datos después de múltiples intentos para el período entre {start_date.strftime("%d-%b %Y")} y {end_date.strftime("%d-%b %Y")}')
        return pd.DataFrame()  # Si no puede obtener los datos, devuelve un DataFrame vacío

start_date = datetime(1991, 1, 1) 
end_date = datetime(2023, 12, 31)
# el intervalo máximo de días que deja pedir en una llamada al API de AEMET es 32 (incluyendo el primer día y el último).
interval = 32 
# el numero maximo de llamadas por minuto permitidas para un mismo API key de AEMET es 50.
batch_size = int(48 / 2) # Dividimos entre 2 porque por cada intervalo de fecha son 2 requests para obtener los datos.
timeout_between_requests = 60 # (un minuto)
batch_start_date = start_date - timedelta(days = 1)

all_batch_dfs = []
while batch_start_date <= end_date:
        task_start_time = time.time()
        batch_end_date = min(batch_start_date + (batch_size) * timedelta(days=interval), end_date)
        formatted_start_date = (batch_start_date + timedelta(days = 1)).strftime("%d-%b %Y")
        formatted_end_date = batch_end_date.strftime("%d-%b %Y")
        print(f'Obteniendo datos para las fechas comprendidas entre {formatted_start_date} y {formatted_end_date}...')
        with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                        executor.submit(
                                fetch_df_from_period,
                                batch_start_date + timedelta(days=interval * (i-1)+1),
                                min(batch_start_date + timedelta(days=interval * i), end_date),
                                api_key
                        )
                        for i in range(1, min(batch_size, int((batch_end_date - batch_start_date).days / interval)+1)+1)
                ]
        batch_df = pd.concat([future.result() for future in concurrent.futures.as_completed(futures)], ignore_index=True)
        all_batch_dfs.append(batch_df)
        task_end_time = time.time()
        elapsed_time = task_end_time - task_start_time
        print(f'¡Datos obtenidos!. Tiempo transcurrido: {round(elapsed_time, 3)} s.')
        batch_start_date = min(batch_end_date, end_date)
        if batch_start_date >= end_date:
                break
        remaining_time = max(0, timeout_between_requests - (task_end_time - task_start_time))
        if remaining_time == 0:
                # Si la llamada al API duró mas de 60 segundos, metemos un timeout de 60 segundos para el siguiente lote
                remaining_time = 60
        while remaining_time > 0:
                print(f'Esperando {int(remaining_time)} segundos más antes del siguiente lote...', end='\r')
                time.sleep(1)
                remaining_time -= 1
        print(' '*50, end='\r')
        time.sleep(5)  
print('¡Tarea completada!')
final_df = pd.concat(all_batch_dfs)

Obteniendo datos para las fechas comprendidas entre 01-ene. 1991 y 06-feb. 1993...
¡Datos obtenidos!. Tiempo transcurrido: 8.627 s.
Obteniendo datos para las fechas comprendidas entre 07-feb. 1993 y 16-mar. 1995...
¡Datos obtenidos!. Tiempo transcurrido: 15.097 s.
Obteniendo datos para las fechas comprendidas entre 17-mar. 1995 y 22-abr. 1997...
¡Datos obtenidos!. Tiempo transcurrido: 16.423 s.
Obteniendo datos para las fechas comprendidas entre 23-abr. 1997 y 30-may. 1999...
¡Datos obtenidos!. Tiempo transcurrido: 11.231 s.
Obteniendo datos para las fechas comprendidas entre 31-may. 1999 y 06-jul. 2001...
¡Datos obtenidos!. Tiempo transcurrido: 12.017 s.
Obteniendo datos para las fechas comprendidas entre 07-jul. 2001 y 13-ago. 2003...
¡Datos obtenidos!. Tiempo transcurrido: 11.905 s.
Obteniendo datos para las fechas comprendidas entre 14-ago. 2003 y 19-sep. 2005...
¡Datos obtenidos!. Tiempo transcurrido: 26.229 s.
Obteniendo datos para las fechas comprendidas entre 20-sep. 2005 y 27-

Mergeamos las dos matrices y ordenamos los datos por fecha

In [22]:
df_final = final_df.merge(df_estaciones, on='indicativo', how='inner')
df_final = df_final.sort_values('fecha')
df_final.head(5)

Unnamed: 0,fecha,indicativo,nombre_x,provincia_x,altitud_x,tmed,prec,tmin,horatmin,tmax,...,presMin,horaPresMin,sol,latitud,provincia_y,altitud_y,nombre_y,indsinop,longitud,comunidad
81153,1991-01-01,9573X,ALCAÑIZ,TERUEL,334,78.0,0,25.0,21:35,130.0,...,,,,410329N,TERUEL,334,ALCAÑIZ,8164.0,000830W,Aragón
1421018,1991-01-01,1249I,OVIEDO,ASTURIAS,336,73.0,0,30.0,08:30,116.0,...,9834.0,24.0,74.0,432112N,ASTURIAS,336,OVIEDO,8015.0,055227W,Asturias
1843274,1991-01-01,1466A,SILLEDA,PONTEVEDRA,435,48.0,0,0.0,,95.0,...,,,,424203N,PONTEVEDRA,435,SILLEDA,,081528W,Galicia
1059795,1991-01-01,2150H,"LA PINILLA, ESTACIÓN DE ESQUÍ",SEGOVIA,1798,,0,,,,...,,,,411131N,SEGOVIA,1798,"LA PINILLA, ESTACIÓN DE ESQUÍ",8143.0,032831W,Castilla y León
391220,1991-01-01,1014,"HONDARRIBIA, MALKARROA",GIPUZKOA,4,101.0,0,60.0,23:59,142.0,...,10242.0,24.0,58.0,432125N,GIPUZKOA,4,"HONDARRIBIA, MALKARROA",8029.0,014732W,País Vasco


Guardamos el dataframe que contiene los datos en un archivo .csv para su posterior estudio con Apache Spark

In [21]:
df_final.to_csv('dataset_1991_to_2023.csv', index=False, header=True)

Algunas de las columnas y filas no resultan de interés; pero no se van a eliminar todavía pues la parte de preprocesamiento de datos, se va a realizar con Apache Spark en lugar de usar Pandas.

## <p style="color: green;"> Spark ETL </p>

Veamos si encuentra Spark

In [1]:
import findspark
findspark.init()

Iniciamos una sesión Spark que podemos monitorizar desde http://localhost:4040/jobs/

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("AEMET_Spark")
    .master("local[10]")
    .config("spark.driver.memory", "7g")  # Configura la memoria del driver a 7 gigabytes
    .config("spark.driver.cores", "3")  # Configura el número de núcleos utilizados a 5
    .config("spark.executor.instances", "2")  # Configura el número de ejecutores a 5
    .getOrCreate()
)
spark

Cargamos los datos CSV en un dataframe para su posterior transformación. ETL

In [3]:
df = spark.read.csv("dataset_1991_to_2023.csv", header=True, inferSchema=True)
df.show(5)
print('Dimension de la matriz: ', df.count(), 'x', len(df.columns))
print(df.dtypes)

+----------+----------+--------------------+-----------+---------+----+----+----+--------+----+--------+-------+----+--------+-----+---------+-----+---------+-----+---------+-------+-----------+-------+-----------+----+-------+-----------+---------+--------------------+--------+--------+---------------+
|     fecha|indicativo|            nombre_x|provincia_x|altitud_x|tmed|prec|tmin|horatmin|tmax|horatmax|hrMedia| dir|velmedia|racha|horaracha|hrMax|horaHrMax|hrMin|horaHrMin|presMax|horaPresMax|presMin|horaPresMin| sol|latitud|provincia_y|altitud_y|            nombre_y|indsinop|longitud|      comunidad|
+----------+----------+--------------------+-----------+---------+----+----+----+--------+----+--------+-------+----+--------+-----+---------+-----+---------+-----+---------+-------+-----------+-------+-----------+----+-------+-----------+---------+--------------------+--------+--------+---------------+
|1991-01-01|     9573X|             ALCAÑIZ|     TERUEL|      334| 7,8| 0,0| 2,5|   2

Información de las variables iniciales

- fecha --> Fecha del dia (AAAA-MM-DD)
- latitud --> Latitud geográfica de la estación
- provincia --> Provincia donde reside la estación
- comunidad --> Comunidad Autónoma donde reside la estación
- indicativo --> Indicativo climatológico de la estación
- altitud --> Altitud de la estación, "unidad": "m"
- nombre --> Ubicación de la estación
- indsinop --> Indicativo sinóptico
- longitud --> Longitud geográfica de la estación
- tmed --> Temperatura media diaria, "unidad": "°C"
- prec --> Precipitación diaria de 07 a 07, "unidad": "mm (Ip = inferior a 0,1 mm) (Acum = Precipitación acumulada)"
- tmin --> Temperatura Mínima del día, "unidad": "°C"
- horatmin --> Hora y minuto de la temperatura mínima, "unidad": "UTC"
- tmax --> Temperatura Máxima del día, "unidad": "°C"
- horatmax --> Hora y minuto de la temperatura máxima
- dir --> Dirección de la racha máxima, "unidad": "decenas de grado (99 = dirección variable)(88 = sin dato)"
- velmedia --> Velocidad media del viento, "unidad": "m/s"
- racha --> Racha máxima del viento, "unidad": "m/s"
- horaracha --> Hora y minuto de la racha máxima, "unidad": "UTC"
- sol --> Insolación, "unidad": "horas"
- presmax --> Presión máxima al nivel de referencia de la estación, "unidad": "hPa"
- horapresmax --> Hora de la presión máxima (redondeada a la hora entera más próxima), "unidad": "hPa"
- presmin --> Presión mínima al nivel de referencia de la estación, "unidad": "hPa"
- horapresmin --> Hora de la presión mínima (redondeada a la hora entera más próxima), "unidad": "hPa"
- hrmedia --> Humedad relativa media diaria, "unidad": "%"
- hrmax --> Humedad relativa máxima diaria, "unidad": "%"
- horahrmax --> Hora de la humedad relativa máxima diaria, "unidad": "UTC"
- hrmin --> Humedad relativa mínima diaria, "unidad": "hPa"
- horahrmin --> Hora de la humedad relativa mínima diaria, "unidad": "UTC"

ETL

In [4]:
from pyspark.sql.functions import split, avg, col, translate

def ETL1(df):
    # Eliminamos las columnas y filas que no resultan de interes o nos proporcionan información repetida
    df = df.filter(df["fecha"] != '2024-01-01')
    df = df.drop('nombre_y','provincia_y','altitud_y','hrMedia','horaPresMax','horaPresMin','hrMax','hrMin','horaHRMin','horaHRMax','horaracha','indsinop','dir')
    df = df.withColumnRenamed('nombre_x','nombre').withColumnRenamed('provincia_x','provincia').withColumnRenamed('altitud_x','altitud')
    df = df.dropna(subset=['tmed'])

    # Se definen 3 nuevas columnas
    df = df.withColumn("año", split(df["fecha"], "-")[0])
    df = df.withColumn("mes", split(df["fecha"], "-")[1])
    df = df.withColumn("dia", split(df["fecha"], "-")[2])

    # Cambiamos el tipo de datos de algunas columnas a float
    for column in ["tmed", "prec", "tmin", "tmax", "velmedia", "racha", "presMax", "presMin"]:
        df = df.withColumn(column, translate(col(column), ",", "."))
        df = df.withColumn(column, col(column).cast("float"))

    # Ordenamod el dataframe por fecha
    df = df.orderBy('fecha')
    
    return df

In [5]:
def ETL2(df):
    # Se eliminan más columnas que no resultan de interés
    df = df.drop('presMax', 'presMin', 'sol', 'velmedia', 'indicativo', 'horatmin','horatmax')
    ordered_columns = ['fecha', 'comunidad', 'provincia', 'nombre', 'altitud', 'latitud', 'longitud', 'tmed', 'prec', 'tmin', 'tmax', 'racha']
    df = df.select(*ordered_columns)

    # Calculamos la temperatura y precipitación promedio de cada mes y año a nivel nacional, sin distinción de estaciones
#     mes_año = ["tmed_mes_año", "prec_mes_año", "tmin_mes_año", "tmax_mes_año"]
#     tp = ["tmed", "prec", "tmin", "tmax"]
#     for i in range (0,4):
#             df_aux = df.groupBy("año", "mes").agg(avg(tp[i]).alias(mes_año[i])).withColumn(mes_año[i], col(mes_año[i]).cast("float"))
#             df = df.join(df_aux, ["año", "mes"], "inner")
    
    return df

In [6]:
df_after_ETL1 = ETL1(df)
df_after_ETL1.show(5)

+----------+----------+--------------------+----------+-------+----+----+----+--------+----+--------+--------+-----+-------+-------+----+-------+--------+---------------+----+---+---+
|     fecha|indicativo|              nombre| provincia|altitud|tmed|prec|tmin|horatmin|tmax|horatmax|velmedia|racha|presMax|presMin| sol|latitud|longitud|      comunidad| año|mes|dia|
+----------+----------+--------------------+----------+-------+----+----+----+--------+----+--------+--------+-----+-------+-------+----+-------+--------+---------------+----+---+---+
|1991-01-01|     9573X|             ALCAÑIZ|    TERUEL|    334| 7.8| 0.0| 2.5|   21:35|13.0|   14:30|     1.7| 11.4|   NULL|   NULL|NULL|410329N| 000830W|         Aragón|1991| 01| 01|
|1991-01-01|     1249I|              OVIEDO|  ASTURIAS|    336| 7.3| 0.0| 3.0|   08:30|11.6|   14:50|     0.6|  4.4|  988.1|  983.4| 7,4|432112N| 055227W|       Asturias|1991| 01| 01|
|1991-01-01|     1466A|             SILLEDA|PONTEVEDRA|    435| 4.8| 0.0| 0.0|  

In [7]:
df_after_ETL2 = ETL2(df_after_ETL1)
df_after_ETL2.show(5)
print('Dimension de la matriz: ', df_after_ETL2.count(), 'x', len(df_after_ETL2.columns))
print(df_after_ETL2.dtypes)

+----------+---------------+----------+--------------------+-------+-------+--------+----+----+----+----+-----+
|     fecha|      comunidad| provincia|              nombre|altitud|latitud|longitud|tmed|prec|tmin|tmax|racha|
+----------+---------------+----------+--------------------+-------+-------+--------+----+----+----+----+-----+
|1991-01-01|         Aragón|    TERUEL|             ALCAÑIZ|    334|410329N| 000830W| 7.8| 0.0| 2.5|13.0| 11.4|
|1991-01-01|       Asturias|  ASTURIAS|              OVIEDO|    336|432112N| 055227W| 7.3| 0.0| 3.0|11.6|  4.4|
|1991-01-01|        Galicia|PONTEVEDRA|             SILLEDA|    435|424203N| 081528W| 4.8| 0.0| 0.0| 9.5| NULL|
|1991-01-01|     País Vasco|  GIPUZKOA|HONDARRIBIA, MALK...|      4|432125N| 014732W|10.1| 0.0| 6.0|14.2|  6.1|
|1991-01-01|Castilla y León|    BURGOS|   BURGOS AEROPUERTO|    891|422125N| 033717W| 1.6| 0.0|-1.4| 4.6|  6.7|
+----------+---------------+----------+--------------------+-------+-------+--------+----+----+----+----

Veamos lo valores NaN que tenemos sobre el conjunto de datos despues del preprocesamiento de datos. Es importante tener esto en cuenta pues si queremos trabajar con alguna de las variables que contienen estos valores, hay que procesar los dados de nuevo. 

Por ejemplo, para estudiar las precipitaciones a lo largo de los años en España, debemos eliminar las filas que contienen esos valores. No lo hacemos ahora pues no queremos que afecte al histórico de temperaturas.

In [8]:
from pyspark.sql import functions as F

null_counts = df_after_ETL2.agg(*[F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df_after_ETL2.columns])
null_counts.show()

+-----+---------+---------+------+-------+-------+--------+----+------+----+----+-------+
|fecha|comunidad|provincia|nombre|altitud|latitud|longitud|tmed|  prec|tmin|tmax|  racha|
+-----+---------+---------+------+-------+-------+--------+----+------+----+----+-------+
|    0|        0|        0|     0|      0|      0|       0|   0|219836|   0|   0|1306240|
+-----+---------+---------+------+-------+-------+--------+----+------+----+----+-------+



In [7]:
#spark.stop() # Este comando detendrá la sesión de Spark actual y liberará los recursos asociados con ella
# del spark # eliminar completamente la variable spark de tu entorno

## <p style="color: green;">SQLite</p>

Elegimos SQLite para crear la base de datos dada la versatilidad y sencillez de la misma. Para esto, primero debemos pasar la estructura de datos de apache a dataframe de pandas.

In [9]:
df_SQlite = df_after_ETL2.toPandas()

Hacemos una funcion que contiene las consultas necesarias para obtener los dos dataframe que necesitamos para nuestro dashboard

In [10]:
def consulta_datos_historicos(conn):
    query = """SELECT fecha, comunidad, provincia, tmed, prec, tmin, tmax FROM my_table"""

    df_historico = pd.read_sql_query(query, conn)
    return df_historico

def consulta_datos_TOP5(conn):
    query = """SELECT DISTINCT nombre, provincia, altitud, latitud, longitud, tmax, tmin, racha FROM my_table ORDER BY altitud DESC"""

    df_aux = pd.read_sql_query(query, conn)
    return df_aux

Creamos una conexión a la base de datos SQLite, guardamos nuestro df_SQlite en la misma y posteriormente se realizan las consultas definidad con anterioridad.

In [11]:
import sqlite3
import pandas as pd

conn = sqlite3.connect('my_database.db')
df_SQlite.to_sql('my_table', conn, if_exists='replace', index=False)
df_historico = consulta_datos_historicos(conn)
df_aux = consulta_datos_TOP5(conn)
conn.close()

## <p style="color: green;">Visualización</p>

Se procesan los datos para poder representar adecuadamente en nuestro dashboard los resultados.

In [13]:
import pandas as pd
from pyspark.sql import functions as F

top_5_altura = df_aux.head(5).copy()
bottom_5_altura = df_aux.sort_values(by='altitud').head(5).copy()
top_5_tmax = df_aux.sort_values(by='tmax', ascending=False).head(5).copy()
top_5_tmin = df_aux.sort_values(by='tmin', ascending=True).head(5).copy()
top_5_racha = df_aux.sort_values(by='racha', ascending=False).head(5).copy()

top_5_altura.loc[:,'clase'] = 'Top 5 alturas más altas'
bottom_5_altura.loc[:,'clase'] = 'Top 5 alturas más bajas'
top_5_tmax.loc[:,'clase'] = 'Top 5 temperaturas máximas'
top_5_tmin.loc[:,'clase'] = 'Top 5 temperaturas mínimas'
top_5_racha.loc[:,'clase'] = 'Top 5 rachas de viento'

df_map = pd.concat([top_5_altura, bottom_5_altura, top_5_tmax, top_5_tmin, top_5_racha])

La columna de latitud y longitud está dada en grados por lo que necesitamos pasar los datos a formato decimal si posteriormente se quieren representar en un mapa.

In [14]:
import re

def dms_to_decimal(dms):
    degrees, minutes, seconds, direction = re.match('(\d{2})(\d{2})(\d{2})(\w)', dms).groups()
    decimal = int(degrees) + int(minutes)/60 + int(seconds)/(60*60)
    if direction in ['S','W']:
        decimal *= -1
    return decimal

df_map['latitud'] = df_map['latitud'].apply(dms_to_decimal)
df_map['longitud'] = df_map['longitud'].apply(dms_to_decimal)

Se procesan los datos para estudiar la temperatura y la precipitacion a lo largo de los años, a nivel provincial y autonómico.

In [15]:
import pandas as pd
final_df = df_historico
final_df['fecha'] = pd.to_datetime(final_df['fecha'])

A continuación, se presenta un pequeño dashboard de la evolución de las temperaturas y precipitaciones anuales con un selector para la provincia, la comunidad autónoma y el tipo de temperatura; así como una línea de tendencia (regresion lineal) que aporta información a cerca del aumento de temperaturas en el intervalo de tiempo preseleccionado y la tentencia que sigue en el mismo. Además, se representa en un mapa, la información relativa al TOP 5 histórico de temperatura máxima, minima, altura máxima y mínima de la estación y la racha máxima.

In [16]:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
from sklearn.linear_model import LinearRegression

# Suponiendo que final_df está previamente definido
unique_years = pd.to_datetime(final_df['fecha']).dt.year.unique()
comunidades = final_df['comunidad'].unique()
provincias = final_df['provincia'].unique()

app = dash.Dash(__name__)
app.layout = html.Div([

    html.Div([
            dcc.RadioItems(
                id='filter-type',
                options=[{'label': 'Comunidad', 'value': 'comunidad'}, {'label': 'Provincia', 'value': 'provincia'}],
                value='provincia'
            ),
            dcc.RadioItems(
                id='temp-or-prec',
                options=[{'label': 'Temperatura', 'value': 'temperatura'}, {'label': 'Precipitación', 'value': 'precipitacion'}],
                value='temperatura'
            ),
    ], style={'display': 'flex', 'flex-direction': 'row'}),

    html.Div([
        dcc.Dropdown(
            id='comunidad-dropdown',
            options=[{'label': com, 'value': com} for com in final_df['comunidad'].unique()],
            value='Castilla y León',
            clearable=False
        ),
    ], style={'width': '50%', 'display': 'inline-block'}),

    html.Div([
        dcc.Dropdown(
            id='provincia-dropdown',
            options=[{'label': prov, 'value': prov} for prov in final_df['provincia'].unique()],
            value='LEON',
            clearable=False
        ),
    ], style={'width': '50%', 'display': 'inline-block'}),

    html.Div([
        dcc.Dropdown(
            id='temp-dropdown',
            options=[{'label': 'Minimas', 'value': 'Minimas'}, {'label': 'Maximas', 'value': 'Maximas'}, {'label': 'Medias', 'value': 'Medias'}],
            value='Medias',
            clearable=False
        ),
    ], style={'width': '50%', 'display': 'inline-block'}),

    html.Div([
        dcc.Dropdown(
            id='data-frequency-dropdown',
            options=[{'label': 'Diario', 'value': 'diarias'}, {'label': 'Mensual', 'value': 'mensuales'}, {'label': 'Anual', 'value': 'anuales'}],
            value='diarias',
            clearable=False
        ),
    ], style={'width': '50%', 'display': 'inline-block'}),

    html.Div([
        dcc.RangeSlider(
            id='year-slider',
            min=unique_years.min(),
            max=unique_years.max(),
            value=[unique_years.min(), unique_years.max()],
            marks={str(year): str(year) for year in unique_years}
        )
    ], style={'width': '100%', 'margin': 'auto'}),

    dcc.Graph(id='temperature-graph'),
    dcc.Graph(id='geo-scatter-graph')  
])

@app.callback(
    Output('temperature-graph', 'figure'),
    [Input('temp-dropdown', 'value'),
    Input('provincia-dropdown', 'value'),
    Input('comunidad-dropdown', 'value'),
    Input('year-slider', 'value'),
    Input('filter-type', 'value'),
    Input('temp-or-prec', 'value'),
    Input('data-frequency-dropdown', 'value')],
)
def update_graph(selected_temp, selected_provincia, selected_comunidad, selected_years, filter_type, selected_temp_or_prec, data_frequency):
    if filter_type == 'comunidad':
        filtered_df = final_df[(final_df['comunidad'] == selected_comunidad) & (final_df['fecha'].dt.year.between(selected_years[0], selected_years[1]))].copy()
    else:
        filtered_df = final_df[(final_df['provincia'] == selected_provincia) & (final_df['fecha'].dt.year.between(selected_years[0], selected_years[1]))].copy()

    if selected_temp_or_prec == 'temperatura':
        if selected_temp == 'Minimas':
            column = 'tmin'
        elif selected_temp == 'Maximas':
            column = 'tmax'
        elif selected_temp == 'Medias':
            column = 'tmed'
        y_title = 'Temperatura (ºC)'
    elif selected_temp_or_prec == 'precipitacion':
        column = 'prec'
        y_title = 'Precipitación (mm)'
        filtered_df = filtered_df.dropna(subset=['prec'])
    else:
        raise ValueError('selected_temp_or_prec debe ser "temperatura" o "precipitacion"')

    filtered_df['fecha'] = pd.to_datetime(filtered_df['fecha'])
    filtered_df['year'] = filtered_df['fecha'].dt.year
    filtered_df['month'] = filtered_df['fecha'].dt.month

    if data_frequency == 'diarias':
        data = filtered_df.groupby('fecha')[column].mean().reset_index()
        data['year_month'] = data['fecha'].dt.strftime('%Y-%m')
        x_valor = 'fecha'
    elif data_frequency == 'mensuales':
        data = filtered_df.groupby(['year', 'month'])[column].mean().reset_index()
        data['year_month'] = data['year'].astype(str) + '-' + data['month'].astype(str).str.zfill(2)
        x_valor = 'year_month'
    elif data_frequency == 'anuales':
        data = filtered_df.groupby(['year'])[column].mean().reset_index()
        data['year_month'] = data['year'].astype(str)
        x_valor = 'year_month'
    else:
        raise ValueError('data_frequency debe ser "diarias", "mensuales" o "anuales"')

    fig = px.line(data, x=x_valor, y=column, title=f'Promedio {selected_temp_or_prec} {selected_temp.lower()} {data_frequency.lower()} de {selected_provincia if filter_type == "provincia" else selected_comunidad}', color_discrete_sequence=['blue'])

    X = data.index.values.reshape(-1, 1)
    y = data[column]
    model = LinearRegression()
    model.fit(X, y)
    y_pred = model.predict(X)
    data['y_pred'] = y_pred
    diff = data.y_pred.iloc[-1] - data.y_pred.iloc[0]
    if selected_temp_or_prec == 'temperatura':
        fig.add_trace(go.Scatter(x=data['year_month'], y=data['y_pred'], mode='lines', name=f'Trendline ({diff:+.2f}ºC)', line=dict(color='red', width=2)))
    else:
        fig.add_trace(go.Scatter(x=data['year_month'], y=data['y_pred'], mode='lines', name=f'Trendline ({diff:+.2f}mm)', line=dict(color='red', width=2)))

    fig.update_layout(
        plot_bgcolor='rgba(1, 1, 1, 0.05)',
        xaxis_title='Año',
        yaxis_title=y_title,
    )

    return fig

@app.callback(
    Output('geo-scatter-graph', 'figure'),
    [Input('filter-type', 'value')]
)
def update_geo_graph(filter_type):

    fig = px.scatter_geo(
        df_map,
        lat='latitud',
        lon='longitud',
        color='clase', 
        hover_data=['altitud', 'tmax', 'tmin', 'racha'],
        hover_name='nombre',
        projection='natural earth',
        title='TOP 5 histórico de estaciones meteorológicas en función de altura, temperaturas y rachas de viento'
    )

    fig.update_layout(
        geo=dict(
            showland=True,
            landcolor="green",
            showcoastlines=True,
            coastlinecolor="black",
            showocean=True,
            oceancolor="lightblue"
        )
    )

    return fig

@app.callback(
    Output('temp-dropdown', 'options'),
    [Input('temp-or-prec', 'value')]
)
def update_temp_dropdown(selected_temp_or_prec):
    if selected_temp_or_prec == 'temperatura':
                return [{'label': 'Minimas', 'value': 'Minimas'}, {'label': 'Maximas', 'value': 'Maximas'}, {'label': 'Medias', 'value': 'Medias'}]
    elif selected_temp_or_prec == 'precipitacion':
        return [{'label': 'Precipitación', 'value': 'precipitacion'}]

@app.callback(
    Output('provincia-dropdown', 'options'),
    [Input('filter-type', 'value')]
)
def update_provincia_dropdown(filter_type):
    if filter_type == 'provincia':
        return [{'label': provincia, 'value': provincia} for provincia in provincias]
    elif filter_type == 'comunidad':
        return []

@app.callback(
    Output('comunidad-dropdown', 'options'),
    [Input('filter-type', 'value')]
)
def update_comunidad_dropdown(filter_type):
    if filter_type == 'comunidad':
        return [{'label': comunidad, 'value': comunidad} for comunidad in comunidades]
    elif filter_type == 'provincia':
        return []

if __name__ == '__main__':
    #app.run_server(debug=True)
    app.run(jupyter_mode="tab")


Dash app running on http://127.0.0.1:8050/


<IPython.core.display.Javascript object>