## Dependencias 

In [11]:
import numpy as np
import pandas as pd
from glob import glob
from auxiliares import freq
import os

import cufflinks as cf

from multiprocessing import Pool

from functools import reduce
import warnings
warnings.filterwarnings('ignore') 
cf.go_offline()
pd.set_option('display.max_columns', None)

In [2]:
def make_pool(func, params, threads):
    """
    Ejecuta de forma asíncrona múltiples llamadas a una función
    :param func: function, objeto función a paralelizar
    :param params: iterable, parámetros de evaluación paralela
    :param threads: int, número de hilos de multiprocesamiento
    :return: list, resultado de la ejecución paralela agrupada en una lista
    """
    pool = Pool(threads)
    data = pool.starmap(func, params)
    pool.close()
    pool.join()
    del pool
    return [x for x in data]

## Lectura de datos

In [6]:
ruta = '/media/jose/090f6b94-de30-4aaf-9f8a-4e18b120d7f6/bd/02.  Para Ingeniería/aire/csv/*.csv'
dest = '/media/jose/090f6b94-de30-4aaf-9f8a-4e18b120d7f6/bd/02.  Para Ingeniería/aire/parquet/'

In [7]:
archivos = glob(ruta)
len(archivos)

107

In [8]:
nombres = [a.split('/')[-1].split('.')[0] for a in archivos ]
nombres

['2019SO2',
 '2017SO2',
 '2014O3',
 '2019PM10',
 '2017NO',
 '2012NO',
 '2015O3',
 '2017PMCO',
 '2013NO',
 '2020NO',
 '2010NOX',
 '2019O3',
 '2021O3',
 '2010NO2',
 '2016PM25',
 '2014SO2',
 '2011CO',
 '2021NO2',
 '2019NO',
 '2020SO2',
 '2018NO2',
 '2017PM10',
 '2010PM25',
 '2017O3',
 '2014CO',
 '2012PMCO',
 '2017CO',
 '2019NOX',
 '2011PMCO',
 '2016NO',
 '2017NOX',
 '2018CO',
 '2013PM25',
 '2015PM10',
 '2013SO2',
 '2015PM25',
 '2012NOX',
 '2011PM10',
 '2013O3',
 '2015NOX',
 '2018PM10',
 '2012PM25',
 '2014PM10',
 '2017PM25',
 '2020NO2',
 '2013CO',
 '2012PM10',
 '2021PM25',
 '2011NO',
 '2013PM10',
 '2016CO',
 '2014PMCO',
 '2020PM25',
 '2013PMCO',
 '2021NOX',
 '2014PM25',
 '2011PM25',
 '2016NO2',
 '2014NO',
 '2021NO',
 '2020O3',
 '2016O3',
 '2019PM25',
 '2016NOX',
 '2021CO',
 '2013NO2',
 '2018NO',
 '2016SO2',
 '2016PM10',
 '2015CO',
 '2020PM10',
 '2018SO2',
 '2015PMCO',
 '2011NOX',
 '2012NO2',
 '2020PMCO',
 '2021SO2',
 '2018PM25',
 '2014NO2',
 '2015NO2',
 '2014NOX',
 '2019NO2',
 '2010NO',
 '

In [9]:
def leerArchivo(arch:str)->pd.DataFrame:
    df = pd.read_csv(arch,dtype=str).dropna().reset_index(drop=True)
    df['FECHA'] = pd.to_datetime(df['FECHA'],format='%Y-%m-%d')
    df['HORA'] = pd.to_numeric(df['HORA']).astype(int)
    df['VALOR'] = pd.to_numeric(df['VALOR']).astype(float)
    df['HORA'].replace({24:0},inplace=True)
    return df

In [12]:
for a,d in zip(nombres,make_pool(leerArchivo,[(x,) for x in archivos],15)):
    d.to_parquet(os.path.join(dest,a+'.parquet'))

## Selección de variables

In [None]:
contPred = 'O3'
vobs = 24
vdes = 6
step = 6
um = ['ESTACION','ancla']
varc = ['VALOR']
dim = ['CONTAMINANTE']

## Ingeniería de variables

In [None]:
anclai,anclaf = vobs, df['id_hora'].max()-vdes
anclai,anclaf

In [None]:
cruzar = lambda x,y:x.merge(y,on=um,how='outer')
apilar = lambda x,y:pd.concat([x,y],ignore_index=True)

In [None]:
def ingX(df:pd.DataFrame,ancla:int,sub:int)->pd.DataFrame:
    aux = df.loc[(df['id_hora'] <= ancla) & (df['id_hora'] >=
                                            (ancla-sub+1))].copy().reset_index(drop=True)
    
    funciones = [np.mean,np.median,np.std,np.min,np.max]
    nombres = ['media','mediana','desv','min','max']
  
    l = []
    for c,d in aux.groupby(dim):
        piv = d.pivot_table(index=['ESTACION'],
                        columns=['id_hora'],values=varc)
        for f,n in zip(funciones,nombres):
            piv[f'x_{n}_{c}_{sub}'] = piv.apply(f,axis=1)
        ing = piv.filter(like='x_').reset_index()
        ing.insert(1,'ancla',ancla)
        l.append(ing)
    return reduce(cruzar,l)

In [None]:
def vectorX(ancla:int)->pd.DataFrame:
    return reduce(cruzar, map(lambda sub: ingX(df, ancla, sub),
       range(step, vobs+step, step)))

In [None]:
X= reduce(apilar,
       make_pool(vectorX,[(ancla,) for ancla in range(anclai,anclaf+1)],30))

In [None]:
X.to_parquet('../data/aire_X.parquet')