# Preprocesamiento de los datos para poder ser empleados por la _RNA_

Los datos empleados en el proyecto se encuentran en el directorio _"/data/"_, de forma que se dividen en puros (raw) y clasificados (classified) e integrados (integrated):

```
.
|-data/
   |- raw_data/ -> ficheros de datos sin procesar (procesado preliminar).
   |     |- sensores/ -> ficheros correspondientes a los datos de los sensores.
   |     |- pluviometro/ -> ficheros correspondientes a los datos del pluviómetro.
   |- classified_data/ -> ficheros de datos procesados.
   |     |- sensores/ -> ficheros correspondientes a los datos de los sensores.
   |     |- pluviometro/ -> ficheros correspondientes a los datos del pluviómetro.
   |- integrated_data/ -> fichero con todos los datos integrados. 
```

Importamos las librerías para el procesamiento de los datos.

Para la ETL (extracción, transformación y carga) emplearemos Pandas.

Por otro lado, en el caso de los datos del pluviómetro para comprobar los valores anómalos emplearemos la librería nativa json utilizando datos de la estación meteorológica de la AEMET situada en Aranda de Duero, ya que es la que más próxima se encuentra al viñedo (situado en el municipio de Anguix, Burgos).
El contenido de los directorios _"/data/raw_data/sensores"_ y _"/data/raw_data/pluviometro"_  será listado mediante el módulo nativo de Python _"os"_. 

In [9]:
import pandas as pd
import statsmodels.api as sm
import json
import os
import numpy as np

Importamos las constantes para establecer la dirección en la que se encuentran los ficheros de datos y el directorio en el que se deberá almacenar los resultados

In [10]:
# Rutas de directorios y variables globales
from env_paths import *
from global_variables import *

## Detección los valores atípicos
La _"detección de valores atípicos"_ permite detectar los valores anómalos (observaciones numéricamente diferentes al resto) en los datos debidos a diversos factores como errores humanos, mecánicos extremos de lecturas genuinas o por reemplazos de valores perdidos (_missing data_).

En este caso se empleará el análisis de _outliers_ por medio del rango intercuartílico mediante una ventana móvil establecida agrupando las muestras correspondientes al mismo día, de forma que por cada grupo se calcula su media, desviación típica y mediana.
Se establece un umbral (_threshold_) de X veces la desviación estándar, siendo aquellas muestras cuya diferencia con la media del grupo que se encuentren por encima de este umbral modificadas estableciendo la mediana de la variable correspondiente en el análisis.  

In [11]:
def outlier_detection(data, iqr):
    # Ventana de datos agrupados por dia
    rolling_win = data.groupby(pd.Grouper(freq="24H", key="ts"))

    for _, group in rolling_win:
        # Para cada columna realizamos el analisis por deteccion de valores atipicos
        for col in data.columns:
            if(col != 'ts'):
                # Calculo de la media y la desviacion tipica del grupo
                group_mean = group[col].mean()
                group_std = group[col].std()
                group_median = group[col].median()

                # Definicion del umbral para los valores atipicos (se establece x veces la desviacion estandar)
                threshold = iqr * group_std

                # Identificacion de los valores atipicos
                outliers = group[(group[col] - group_mean).abs() > threshold]

                data.loc[data['ts'].isin(outliers['ts']), col] = group_median

## Comparación de las lecturas del pluviómetro instalado con datos de API meteorológica
En "_/data/raw_data/_" se encuentra el fichero "_estacion_aranda.json_" que contiene los datos meteorológicos de la estación meteorológica de la AEMET en Aranda de Duero desde el 20/07/2021 hasta el 22/02/2023.

Los datos en este fichero JSON tienen la siguiente forma:

```
{
  "fecha" : "2021-07-20",
  "indicativo" : "2117D",
  "nombre" : "ARANDA DE DUERO",
  "provincia" : "BURGOS",
  "altitud" : "790",
  "tmed" : "24,2",
  "prec" : "0,0",
  "tmin" : "13,7",
  "horatmin" : "05:20",
  "tmax" : "34,8",
  "horatmax" : "14:00",
  "dir" : "21",
  "velmedia" : "1,4",
  "racha" : "8,1",
  "horaracha" : "15:00",
  "sol" : "7,9",
  "presMax" : "927,6",
  "horaPresMax" : "Varias",
  "presMin" : "923,9",
  "horaPresMin" : "14"
}
```

De los datos reflejados se emplearán "_fecha_" y "_prec_", este úiltimo parámetro medido en mm (se convertirá posteriormente a litros/m<sup>2</sup>).
Al cargar los datos de la API se realiza una transformación a otro formato, de forma que se instancia un diccionario con "_fecha_" como clave y los la estructura reflejada anteriormente como valor, de esta forma el acceso a la información de las precipitaciones se agilizará y no será necesario recorrer el vector del fichero JSON por cada uno de los grupos de datos.

Al igual que en los datos de los sensores, los datos del pluviómetro se agruparán por frecuencias de días, de manera que se calculará la media del grupo. Si esta difiere en 0,5 litros/m<sup>2</sup> con los datos de la API, se establecen las precipitaciones de ese día a las medias de la estación meteorológica.

In [12]:
def pluviuometer_comparison(data):
    with open(PLUVIOMETER_FILES_DIRECTORY + "estacion_aranda.json") as f:
        # Transformacion del json original (creado como una lista de diccionarios)
        # en un diccionario con las fechas como claves y el resto de campos como valores
        original_api_data = json.load(f)
        api_data = dict()
        for sample in original_api_data:
            api_data.update({sample["fecha"]: sample})

        # Ventana de datos agrupados por dia
        rolling_win = data.groupby(pd.Grouper(freq="1H", key="ts"))
        for day, group in rolling_win:
            # Calculo de la media de las precipitaciones del grupo (precipitaciones
            # diarias en litros/m^2)
            group_mean = group["pluv_deltaMM"].mean()
            
            # Obtenemos el dia del grupo como cadena de texto para poder
            # emplearlo como clave en el diccionario y asi poder comparar las 
            # precipitaciones
            day = str(day.date())

            # Si las precipitaciones medias difieren en 0,5 litros/m^2 establecemos
            # las precipitaciones de ese dia a las de la API.
            if abs(group_mean - float(api_data[day]["prec"].replace(",", ".")) * 0.1) > 0.5:
                data.loc[data['ts'].isin(group['ts']), "pluv_deltaMM"] = float(api_data[day]["prec"].replace(",", "."))
            

# Conversión del conjunto de datos en datos de tendencia

Convertimos el conjunto de datos original con muestras tomadas cada 5 minutos en datos medios, para de esta forma obtener las tendencias de las diferentes variables empleando el filtro de 
Hodrick-Prescott

In [13]:
def filter_data(data, freq = "1H", lamb = 1600, filter_cols = [], cols = []):
    # Ventana de datos agrupados por frecuencia
    rolling_win = data.groupby(pd.Grouper(freq=freq, key="ts"))

    # Bucle para determinar las medias temporales de las diferentes variables teniendo en cuenta los posibles saltos
    series = []
    win_serie = []
    for time, group in rolling_win:
        # Si el grupo tiene datos, calculamos su media diaria de cada variable
        if group.shape[0] > 0:
            data_mean = group[filter_cols].mean()
            data_mean['ts'] = time
            win_serie.append(data_mean)
        else:
            # Si no hay datos es porque se ha producido un salto temporal, guardamos la serie completa
            # y la limpiamos para comenzar la siguiente ventana de datos presentes.
            if len(win_serie) > 0:
                series.append(win_serie)
                win_serie = []
    series.append(win_serie)
    
    dataset = pd.DataFrame(columns=cols)
    # Bucle para determinar las tendencias de cada variable mediante filtro de Hodrick-Prescott
    for serie in series:
        serie_df = pd.DataFrame(data=serie, columns=cols)    
        if serie_df.shape[0] > 1:
            for col in filter_cols:
                _, trend = sm.tsa.filters.hpfilter(serie_df[col], lamb=lamb)
                serie_df[col] = trend

        dataset = pd.concat([dataset, serie_df])
    return dataset


# Procesamiento de los datos del pluviómetro
Al igual que en el conjunto de datos de los sensores, se empleará Pandas para la carga al programa de las muestras realizadas y se realizará la conversión de las marcas de tiempo.

En cuanto a los valores desconocidos, se seguirá el mismo procedimiento que en la sección anterior (en esta ocasión la cantidad de valores es sutancialmente menor).
En función de los datos obtenidos en la API meteorológica se realizará la sustitución de las muestras correspondientes.

Para obtener los datos por horas y posteriormente poder unirlos a los sensores calculamos las medias de las precipitaciones.

In [14]:
def pluviometer_data_processing(pluviometer_files):
    for file in pluviometer_files:
        dataset = pd.read_csv(PLUVIOMETER_FILES_DIRECTORY + file)

        # Eliminar campos sin valores
        dataset = dataset.dropna()

        dataset['ts'] = pd.to_datetime(dataset['ts'], unit="ms")

        # Eliminar columnas irrelevantes o con datos que expresan el mismo dominio del problema
        dataset = dataset.drop(['fecha', 'bateria', 'pluv', 'pluv_delta'], axis=1)

        # Eliminar outliers
        outlier_detection(dataset, 1.5)

        # Sustitucion de los datos del pluviometro en funcion de las lluvias diarias
        pluviuometer_comparison(dataset)

        # Datos medios por hora
        cols = ['ts', 'pluv_deltaMM']
        day_data = pd.DataFrame(columns=cols)
        rolling_win = dataset.groupby(pd.Grouper(freq="1H", key="ts"))
        for time, group in rolling_win:
            # Si el grupo tiene datos, calculamos la media de las precitipaciones en la hora
            if group.shape[0] > 0:
                data_mean = group[['pluv_deltaMM']].mean()
                data_mean['ts'] = time
                serie_df = pd.DataFrame(data=[data_mean], columns=cols) 

                day_data = pd.concat([day_data, serie_df])
        
        day_data['ts'] = pd.to_numeric(day_data['ts'])

        day_data.to_csv(CLASSIFIED_PLUVIOMETER_PATH + file, index = False)

## Procesamiento de los datos de los sensores
Para cada uno de los ficheros de los sensores realizaremos varias operaciones. En primer lugar, empleando Pandas cargaremos el "_DataFrame_" en el programa y tras esto eliminaremos las filas con valores marcados como inválidos (procesamiento preliminar realizado de forma visual).

Para agrupar los datos en frecuencias por horas es necesario realizar una conversión "_cast_" del tipo de dato de la columna "_ts_" del conjunto de datos.
Las columnas "_fecha_", "_h_C_" y "_h_L_" son redundantes, en el primer caso puesto que la información correspondiente se encuentra en forma de _Linux Epochs_ en la columna "_ts_" de la que previamente se ha realizado una transformación y que actúa como índice temporal del conjunto de datos. En el caso de las otras dos columnas, se tratan de datos relacionados con la humedad del suelo sin calibrar a diferentes profundidades, encontrandose estas calibradas en "_h_C_cal_" y "_h_L_cal_" respectivamente.

Posteriormente se eliminarán los valores desconocidos ("_Missing Data_") debido a la gran cantidad de datos disponibles (en las visualizaciones previas se comprobó visualmente los posibles impactos en el tamaño del conjunto de datos). Finalmente se realizará la detección de valores extremos ("_outliers_") con diferentes umbrales y se reconvertirá las marcas de tiempo de nuevo a _Linux Epochs_.

Finalmente se obtienen las tendecias de los atributos cada hora con el filtrado de Hodrick-Prescott.

In [15]:
def sensor_data_processing(sensor_files):
    for file in sensor_files:
        dataset = pd.read_csv(SENSOR_FILES_DIRECTORY + file)

        # Eliminamos los datos marcados como invalidos
        dataset = dataset[dataset['validez'] != 0]

        # Transformamos los timestamps en datetimes, para posteriormente poder
        # agrupar los datos por dia.
        dataset['ts'] = pd.to_datetime(dataset['ts'], unit="ms")

        # Eliminamos las columnas de fecha, h_C, h_L (por redundancia) y la columna
        # de la bateria (no es esencial en la regresion).
        # Por otro, lado deshechamos la columna de validez (no es necesaria)
        dataset = dataset.drop(['fecha', 'h_C', 'h_L', 'bateria', 'validez'], axis=1)

        # Eliminar nulos (data missing)
        dataset = dataset.dropna()

        # Eliminar outliers
        outlier_detection(dataset, 1.5)

        # Datos por hora
        dataset = filter_data(dataset, freq="1H", lamb=100, filter_cols=OUTPUT_COLS, cols=dataset.columns.to_list())

        dataset['ts'] = pd.to_numeric(dataset['ts'])
        
        dataset.to_csv(CLASSIFIED_SENSOR_PATH + file, index = False)

        print(f"Sensor: {file} procesado")
        

## Ejecución del procesamiento de datos
Se obtendrá los ficheros de los datos sin procesar almacenados en los directorios expuestos con anterioridad, invocando a las funciones de procesamiento correspondientes.

In [16]:
if __name__ == "__main__":
    sensor_files = [file for file in os.listdir(SENSOR_FILES_DIRECTORY) if ".csv" in file]
    pluviometer_files = [file for file in os.listdir(PLUVIOMETER_FILES_DIRECTORY) if ".csv" in file]

    sensor_data_processing(sensor_files)
    pluviometer_data_processing(pluviometer_files)

Sensor: sensor1.csv procesado
Sensor: sensor2.csv procesado
Sensor: sensor3.csv procesado
Sensor: sensor4.csv procesado
Sensor: sensor5.csv procesado
Sensor: sensor6.csv procesado
Sensor: sensor7.csv procesado
Sensor: sensor8.csv procesado
