# Limpieza de datos históricos de calidad del aire

### En este archivo se leen datos en bruto obtenidos del portal de datos abiertos del ayuntamiento de Madrid, se limpian y se enriquecen con datos de estaciones de tráfico, para su posterior análisis.

In [46]:
import pymongo 
import pandas as pd
import json
import os
from os import listdir
from os.path import isfile, join
from pandas.io.json import json_normalize
import zipfile
from datetime import date, datetime, timedelta
import time
from dateutil.parser import *

## Rutas
##### Se almacenan en variables, las rutas de las carpetas donde se encuentran los archivos sobre calidad del aire.

In [45]:
PATH = "./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015"
aire_historico_2015 = "./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015"
aire_historico_2016 = "./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016"
aire_historico_2017 = "./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2017"


estaciones = "./EnriquecimientoTrafico-CalidadAire.csv"    # sep = ;
estacionesInfo ="./informacion_estaciones_red_calidad_aire.xls"

ruta_magnitud_Tecnica = "./interprete.xlsx"  

## Funciones auxiliares

In [5]:
def extract_files(path):
    """
    Función que dada una ruta a una carpeta de archivos, devuelve una lista con las rutas a cada uno de los ficheros en la carpeta
    
    input: directorio donde están los ficheros zip de un año. Uno por  mes
    output: devuelve la lista de los ficheros de cada mes
    """
    return [ path +'/' + f for f in listdir(path) ]


## Comprobación de existencia de ficheros mensuales históricos

##### Utilizando la función extract_files definida previamente, se comprueba que los ficheros dentro de las carpetas son los correctos, estos archivos son datsets históricos de calidad del aire

In [4]:
## 2015
#---------------------------------------------------------------------------
# proceso de validación de los ficheros
# comprobar que dentro haya un fichero XLSX por cada mes
#---------------------------------------------------------------------------
monthfiles_2015 = extract_files(aire_historico_2015)
for i , file in enumerate(monthfiles_2015):     
    print(i, "-", file)

0 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Abril2015.xlsx
1 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Agosto2015.xlsx
2 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Dic2015.xlsx
3 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Enero2015.xlsx
4 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Febrero2015.xlsx
5 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Julio2015.xlsx
6 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Junio2015.xlsx
7 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Marzo2015.xlsx
8 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Mayo2015.xlsx
9 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2015/HistoricoHorario-CalidadAire-Nov2015.xlsx
10 - ./DATAS

In [5]:
## 2016
#---------------------------------------------------------------------------
# proceso de validación de los ficheros
# comprobar que dentro haya un fichero XLSX por cada mes
#---------------------------------------------------------------------------
monthfiles_2016 = extract_files(aire_historico_2016)
for i , file in enumerate(monthfiles_2016):     
    print(i, "-", file)

0 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Abril2016.xlsx
1 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Agosto2016.xlsx
2 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Dic2016.xlsx
3 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Enero2016.xlsx
4 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Febrero2016.xlsx
5 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Julio2016.xlsx
6 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Junio2016.xlsx
7 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Marzo2016.xlsx
8 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Mayo2016.xlsx
9 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2016/HistoricoHorario-CalidadAire-Nov2016.xlsx
10 - ./DATAS

In [6]:
## 2017
#---------------------------------------------------------------------------
# proceso de validación de los ficheros
# comprobar que dentro haya un fichero XLSX por cada mes
#---------------------------------------------------------------------------
monthfiles_2017 = extract_files(aire_historico_2017)
for i , file in enumerate(monthfiles_2017):     
    print(i, "-", file)

0 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2017/2017-Enero-HistoricoHorario-CalidadAire.xlsx
1 - ./DATASETS_AIRE/HistoricoHorarios-CalidadAire-2017/2017-Febrero-HistoricoHorario-CalidadAire.xlsx


### Enriquecimiento: 
##### A continuación se carga en un DataSet las distancias de las estaciones de tráfico a las de calidad del aire, y se enriquece la información de las estaciones de calidad del aire indicando si hay estaciones de tráfico en unos radios determinados

#### Distancias entre estaciones

In [13]:
# distancias estaciones contaminación - Trafico
df_distEstTraf = pd.read_csv(estaciones , sep = ';', encoding='latin-1')
# Se modifica el codigoEst, ya que viene en diferente formato en los ficheros
df_distEstTraf["codigoEst"] = df_distEstTraf["codigoEst"] + 28079000
df_distEstTraf.head()

Unnamed: 0,codigoEst,estacion,cod_cent,nombre,Distacia EstAire-EstTraf,Ranking(el más cercano),sum100,sum250,sum500,sum1000
0,28079004,Pza. de España,16006,Pl. Espa a N-S - San Leonardo-Cuesta San Vicente,573.815.289.844,1,1,1,1,1
1,28079004,Pza. de España,16041,(TACTICO) Pl. Espa-a (Salida del Aparcamiento)...,701.060.111.109,2,1,1,1,1
2,28079004,Pza. de España,16008,Pl. Espa a S-N - Reyes-San Leonardo,977.035.393.033,3,1,1,1,1
3,28079004,Pza. de España,16007,Cuesta San Vicente O-E - Gran Via-Ferraz,102.306.925.836,4,0,1,1,1
4,28079004,Pza. de España,16005,San Leonardo E-O - Maestro Guerrero-Princesa,1.314.466.851.072,5,0,1,1,1


In [8]:
df_distEstTraf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 92832 entries, 0 to 92831
Data columns (total 10 columns):
codigoEst                   92832 non-null int64
estacion                    92832 non-null object
cod_cent                    92832 non-null object
nombre                      92832 non-null object
Distacia EstAire-EstTraf    92832 non-null object
Ranking(el más cercano)     92832 non-null int64
sum100                      92832 non-null int64
sum250                      92832 non-null int64
sum500                      92832 non-null int64
sum1000                     92832 non-null int64
dtypes: int64(6), object(4)
memory usage: 7.1+ MB


#### Se leen los datos de las distancias y se convierten a json para tratarlos como diccionario

In [14]:
d = df_distEstTraf.to_json(orient='records', date_format = 'iso')
data_json = json.loads(d)
data_json[0: 5]

[{'Distacia EstAire-EstTraf': '573.815.289.844',
  'Ranking(el más cercano)': 1,
  'cod_cent': '16006',
  'codigoEst': 28079004,
  'estacion': 'Pza. de España',
  'nombre': 'Pl. Espa a N-S - San Leonardo-Cuesta San Vicente',
  'sum100': 1,
  'sum1000': 1,
  'sum250': 1,
  'sum500': 1},
 {'Distacia EstAire-EstTraf': '701.060.111.109',
  'Ranking(el más cercano)': 2,
  'cod_cent': '16041',
  'codigoEst': 28079004,
  'estacion': 'Pza. de España',
  'nombre': '(TACTICO) Pl. Espa-a (Salida del Aparcamiento) - (TACTICO) Pl. Espana (Salida del Aparcamiento)',
  'sum100': 1,
  'sum1000': 1,
  'sum250': 1,
  'sum500': 1},
 {'Distacia EstAire-EstTraf': '977.035.393.033',
  'Ranking(el más cercano)': 3,
  'cod_cent': '16008',
  'codigoEst': 28079004,
  'estacion': 'Pza. de España',
  'nombre': 'Pl. Espa a S-N - Reyes-San Leonardo',
  'sum100': 1,
  'sum1000': 1,
  'sum250': 1,
  'sum500': 1},
 {'Distacia EstAire-EstTraf': '102.306.925.836',
  'Ranking(el más cercano)': 4,
  'cod_cent': '16007',
 

#### Se genera un diccionario donde la clave es la estación, el valor es otro diccionario con cuatro claves: 'sum100', 'sum250', 'sum500' y 'sum1000' con valores de 0 o 1

In [15]:
codEst = df_distEstTraf["codigoEst"].values
distancias = {}
codEst = df_distEstTraf["codigoEst"].values
for e in codEst:
    distancias[e] =  {'sum100':[], 'sum1000':[], 'sum250':[], 'sum500':[]}
for d in data_json:
    est = d['codigoEst']
    cen = d['cod_cent']        
    if d['sum100'] == 1:
        distancias[est]['sum100'].append(cen)
    if d['sum1000'] == 1:
        distancias[est]['sum1000'].append(cen)
    if d['sum250'] == 1:
        distancias[est]['sum250'].append(cen)
    if d['sum500'] == 1:
        distancias[est]['sum500'].append(cen)

#### Información de estaciones contaminación

In [12]:
def_estacionesInfo = pd.read_excel(estacionesInfo,  skiprows = 4)
def_estacionesInfo.head()

Unnamed: 0.1,Unnamed: 0,NÚMERO,ESTACIÓN,DIRECCIÓN,LONGITUD,LATITUD,Xcoord,Ycoord,ALTITUD,TIPO ESTACION *,...,BTX,HC,UV,VV,DV,TMP,HR,PRB,RS,LL
0,,4,Pza. de España,Plaza de España,"3º 42' 44,09''O","40º 25' 25,87''N",-3.712247,40.423853,635.0,UT,...,,,,X,X,X,X,,,X
1,,8,Escuelas Aguirre,Entre C/ Alcalá y C/ O’ Donell,"3º 40' 56,35''O","40º 25' 17,63''N",-3.682319,40.421564,670.0,UT,...,X,X,,,,,,,,
2,,11,Avda. Ramón y Cajal,Avda. Ramón y Cajal esq. C/ Príncipe de Vergara,"3º 40' 38,48''O","40º 27' 05,31''N",-3.677356,40.451475,708.0,UT,...,X,,,,,,,,,X
3,,16,Arturo Soria,C/ Arturo Soria esq. C/ Vizconde de los Asilos,"3º 38' 21,24''O","40º 26' 24,17''N",-3.639233,40.440047,693.0,UF,...,,,,,,,,,,X
4,,17,Villaverde,C/. Juan Peñalver,"3º 42' 47,96''O","40º 20' 49,70''N",-3.713322,40.347139,604.0,UF,...,,,,,,,,,,


#### Coordenadas de las estaciones de contaminación

In [13]:
df_coor_estaciones = pd.read_excel(estacionesInfo,  skiprows = 4, 
                                  parse_cols = [1, 2, 6, 7] , skip_footer=4,
                                  )
df_coor_estaciones.tail()

Unnamed: 0,NÚMERO,ESTACIÓN,Xcoord,Ycoord
19,56,Pza. Fernández Ladreda,-3.718728,40.384964
20,57,Sanchinarro,-3.660503,40.494208
21,58,El Pardo,-3.774611,40.518058
22,59,Juan Carlos I,-3.609072,40.46525
23,60,Tres Olivos,-3.689761,40.500589


#### A continuación se edita el número de identificación de las estaciones para que coincida con los demás documentos

In [14]:
# el código de estación lo almacenao como 28079000 + número 
df_coor_estaciones["NÚMERO"] = df_coor_estaciones["NÚMERO"].apply(int)
df_coor_estaciones["NÚMERO"] = df_coor_estaciones["NÚMERO"] + 28079000
df_coor_estaciones.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24 entries, 0 to 23
Data columns (total 4 columns):
NÚMERO      24 non-null int64
ESTACIÓN    24 non-null object
Xcoord      24 non-null float64
Ycoord      24 non-null float64
dtypes: float64(2), int64(1), object(1)
memory usage: 848.0+ bytes


In [15]:
df_coor_estaciones.head()

Unnamed: 0,NÚMERO,ESTACIÓN,Xcoord,Ycoord
0,28079004,Pza. de España,-3.712247,40.423853
1,28079008,Escuelas Aguirre,-3.682319,40.421564
2,28079011,Avda. Ramón y Cajal,-3.677356,40.451475
3,28079016,Arturo Soria,-3.639233,40.440047
4,28079017,Villaverde,-3.713322,40.347139


## Fase 1  - Proceso 2015 - 2016 - 2017 

En esta fase se generan ficheros de tipo CSV enriquecidos con los siguientes datos:
* Fecha
* Coordenadas
* Magnitud técnica, etc

En la fase 2 se realiza el enriquecimiento de la intensidad de tráfico.
* Lectura de los ficheros enriquecidos en la fase 1
* Lectura de los datos enriquecidos de tráfico
* Generación de ficheros CSV de aire totalmente enriquecidos

### Funciones auxiliares

In [17]:
# función para convertir a fecha datetime la fecha que llega del fichero
def todate(x):
    int(x)
    year = (x // 10000) + 2000
    month = (x // 100) % 100
    day = x % 100
    return datetime(year, month, day)


In [18]:
# función para convertir a fecha datetime la fecha que llega del fichero
def totimestamp(x):
    int(x)
    year = (x // 10000) + 2000
    month = (x // 100) % 100
    day = x % 100
    return pd.Timestamp(datetime(year, month, day))

In [19]:
#Función que transforma las unidades de hora que se suman en  tipo timedelta
def gethora(d, serie):
    return pd.to_timedelta(serie.map(d), unit = 'h')

In [20]:
mag = pd.read_excel(ruta_magnitud_Tecnica ,sheetname = 0)
tec = pd.read_excel(ruta_magnitud_Tecnica ,sheetname = 1)
magtec = pd.merge(mag,tec, on=['tecnicaCod'])
magtec

Unnamed: 0,magnitudCod,magnitudNombre,Formula,tecnicaCod,tecnicaNom
0,1,Dióxido de Azufre,SO2,38,Fluorescencia ultravioleta
1,6,Monóxido de Carbono,CO2,48,Absorción infrarroja
2,7,Monóxido de Nitrógeno,NO,8,Quimioluminiscencia
3,8,Dióxido de Nitrógeno,NO2,8,Quimioluminiscencia
4,12,Óxidos de Nitrógeno,Nox,8,Quimioluminiscencia
5,9,Partículas < 2.5 µm,PM2.5,47,Microbalanza
6,10,Partículas < 10 µm,PM10,47,Microbalanza
7,14,Ozono,O3,6,Absorción ultravioleta
8,20,Tolueno,TOL,59,Cromatografía de gases
9,30,Benceno,BEN,59,Cromatografía de gases


## A continuación se hace un bucle sobre los ficheros correspondientes a los meses de cada año.
#### Se extraen las horas por día y los valores que les corresponden, los valores se modifican ligeramente y se vuelven a añadir a la matriz del mes. Se da formato a las fechas, se añaden las coordenadas y la información de la magnitud técnica

### Año 2017

In [None]:
for mesxlsx in monthfiles_2017:
    month = pd.read_excel(mesxlsx)
    d2 = month.iloc[:,-24:].to_json(orient = "index")
    d2 = json.loads(d2)
    m = []
    for indice, v in d2.items():
        for h, mh in v.items():
            m.append( [int(indice), h, mh] )   
    # Se crea el tataframe a partir de m    
    df = pd.DataFrame(m, columns = ['indice', "hora", "valor"] )
    df.valor = df.valor.map(lambda x: x.replace('V', ''))
    month1 = pd.merge(df, month.iloc[:,:-24], left_on = 'indice',
                          right_index = True)
    # Se transforma Año+Mes+Dia
    month1.AnoMesDia = month1.AnoMesDia.map(totimestamp)
    horas = dict(zip(list(month.iloc[:,-24:].columns), range(1, 25)))
    month1["fechaHora"] = gethora(horas, month1.hora) + month1.AnoMesDia
    # Se añaden las coordenadas
    month2 = pd.merge(month1, df_coor_estaciones, 
                      left_on = 'Estacion', 
                      right_on = 'NÚMERO')
    # Se añade la información de la magnitud técnica
    month3 = pd.merge(month2,magtec, left_on = 'Magnitud', right_on = 'magnitudCod')
    month4 = month3.reindex(columns = ['Estacion', 'ESTACIÓN', 'Xcoord', 'Ycoord',
                                  'fechaHora',  'Dato Horario', "magnitudCod",
                                  "magnitudNombre", 
                                  'tecnicaCod', 'tecnicaNom', 'valor'])
    # guardo el dataframe en un fichero csv
    t = month4.fechaHora[0]
    tofile = './procesados-Prev/' + str(t)[5:7] + '-' +  str(t.year) + '.csv'
    month4.to_csv(tofile, index = False)

### Año 2016

In [106]:
for mesxlsx in monthfiles_2016:
    month = pd.read_excel(mesxlsx)
    d2 = month.iloc[:,-24:].to_json(orient = "index")
    d2 = json.loads(d2)
    m = []
    for indice, v in d2.items():
        for h, mh in v.items():
            m.append( [int(indice), h, mh] )   
    # Se crea el tataframe a partir de m    
    df = pd.DataFrame(m, columns = ['indice', "hora", "valor"] )
    df.valor = df.valor.map(lambda x: x.replace('V', ''))
    month1 = pd.merge(df, month.iloc[:,:-24], left_on = 'indice',
                          right_index = True)
    # Se transforma Año+Mes+Dia
    month1.AnoMesDia = month1.AnoMesDia.map(totimestamp)
    horas = dict(zip(list(month.iloc[:,-24:].columns), range(1, 25)))
    month1["fechaHora"] = gethora(horas, month1.hora) + month1.AnoMesDia
    
    # Se añaden las coordenadas
    month2 = pd.merge(month1, df_coor_estaciones, 
                      left_on = 'Estacion', 
                      right_on = 'NÚMERO')
    
    # Se añade la información de la magnitud técnica
    month3 = pd.merge(month2,magtec, left_on = 'Magnitud', right_on = 'magnitudCod')
    month4 = month3.reindex(columns = ['Estacion', 'ESTACIÓN', 'Xcoord', 'Ycoord',
                                  'fechaHora',  'Dato Horario', "magnitudCod",
                                  "magnitudNombre", 
                                  'tecnicaCod', 'tecnicaNom', 'valor'])
    # guardo el dataframe en un fichero csv
    t = month4.fechaHora[0]
    tofile = './procesados-Prev/' + str(t)[5:7] + '-' +  str(t.year) + '.csv'
    month4.to_csv(tofile, index = False)

### Año 2015

In [107]:
for mesxlsx in monthfiles_2015:
    month = pd.read_excel(mesxlsx)
    d2 = month.iloc[:,-24:].to_json(orient = "index")
    d2 = json.loads(d2)
    m = []
    for indice, v in d2.items():
        for h, mh in v.items():
            m.append( [int(indice), h, mh] )   
    # Se crea el tataframe a partir de m    
    df = pd.DataFrame(m, columns = ['indice', "hora", "valor"] )
    df.valor = df.valor.map(lambda x: x.replace('V', ''))
    month1 = pd.merge(df, month.iloc[:,:-24], left_on = 'indice',
                          right_index = True)
    # Se transforma Año+Mes+Dia
    month1.AnoMesDia = month1.AnoMesDia.map(totimestamp)
    horas = dict(zip(list(month.iloc[:,-24:].columns), range(1, 25)))
    month1["fechaHora"] = gethora(horas, month1.hora) + month1.AnoMesDia
    
    # Se añaden las coordenadas
    month2 = pd.merge(month1, df_coor_estaciones, 
                      left_on = 'Estacion', 
                      right_on = 'NÚMERO')
    
    # Se añade la información de la magnitud técnica
    month3 = pd.merge(month2,magtec, left_on = 'Magnitud', right_on = 'magnitudCod')
    if len(month3) == 0:
        continue
    
    month4 = month3.reindex(columns = ['Estacion', 'ESTACIÓN', 'Xcoord', 'Ycoord',
                                  'fechaHora',  'Dato Horario', "magnitudCod",
                                  "magnitudNombre", 
                                  'tecnicaCod', 'tecnicaNom', 'valor'])
    # guardo el dataframe en un fichero csv
    t = month4.fechaHora[0]
    tofile = './procesados-Prev/' + str(t)[5:7] + '-' +  str(t.year) + '.csv'
    month4.to_csv(tofile, index = False)

## Fase 2 : Intensidad de tráfico
#### Por cada fichero de aire se abren dos ficheros de tráfico. Esto es porque la última hora de cada més  se corresponde al mes siguiente, por ejemplo: en el fichero 01-2015 se tienen los datos de enero y los datos asociados a la hora: 2015-02-01 00:00:00 

In [1]:
# Función que dados un mes y año, devielve un nombre de archivo correspondientes a estos y al siguiente
def buildNameFile(month, year):
    """
    input: mes, año
    Output: lista con nombre de archivo del mes dado y del mes siguiente.
    """
    monthsig = month + 1
    if monthsig == 13:
        monthsig = 1
        yearsig = year + 1
    else:
        yearsig = year
    mes = datetime(year, month, 1)
    messig= datetime(yearsig, monthsig, 1)
    f = str(mes)[5:7] + '-' + str(mes)[:4] + '.csv'
 
    fsig = str(messig)[5:7] + '-' + str(messig)[:4] + '.csv'
    return [f, fsig]


In [123]:
# RUTA FICHEROS PROCESADOS y enriquecidos en la fase 1
ruta_procesados_aire = "./procesados-Prev"
month_files = extract_files(ruta_procesados_aire)
openTraf = {}
for i , file in enumerate(month_files):   
    mes= int( file[-11:-9])
    year = int(file[-8:-4] )
    openTraf[file] = buildNameFile(mes, year)

In [124]:
"""
Lista de ficheros a abrir
Por ejemplo, para enriquecer aire del mes de enero
"""

openTraf

{'./procesados-Prev/01-2015.csv': ['01-2015.csv', '02-2015.csv'],
 './procesados-Prev/01-2017.csv': ['01-2017.csv', '02-2017.csv'],
 './procesados-Prev/02-2015.csv': ['02-2015.csv', '03-2015.csv'],
 './procesados-Prev/02-2017.csv': ['02-2017.csv', '03-2017.csv'],
 './procesados-Prev/03-2015.csv': ['03-2015.csv', '04-2015.csv'],
 './procesados-Prev/05-2015.csv': ['05-2015.csv', '06-2015.csv'],
 './procesados-Prev/05-2016.csv': ['05-2016.csv', '06-2016.csv'],
 './procesados-Prev/06-2015.csv': ['06-2015.csv', '07-2015.csv'],
 './procesados-Prev/06-2016.csv': ['06-2016.csv', '07-2016.csv'],
 './procesados-Prev/07-2015.csv': ['07-2015.csv', '08-2015.csv'],
 './procesados-Prev/07-2016.csv': ['07-2016.csv', '08-2016.csv'],
 './procesados-Prev/08-2015.csv': ['08-2015.csv', '09-2015.csv'],
 './procesados-Prev/08-2016.csv': ['08-2016.csv', '09-2016.csv'],
 './procesados-Prev/09-2015.csv': ['09-2015.csv', '10-2015.csv'],
 './procesados-Prev/09-2016.csv': ['09-2016.csv', '10-2016.csv'],
 './proces

In [125]:
""" 
Funciones para calcular los niveles de intensidad
"""

def f100(row):
    def inten(dfTrafico): 
        if bool(row['s100']):        
            f1 = dfTrafico[dfTrafico.fecha == row['fechaHora']]
            f2 = f1[ f1.identif.isin(row['s100'])].intensidad.sum()   
            return f2 # float
        else:
            return 0
    return inten

def f1000(row):
    def inten(dfTrafico):  
        if bool(row['s1000']):
        #f1 = dfTrafico[dfTrafico.identif.isin(row['s1000'])]
            f1 = dfTrafico[dfTrafico.fecha == row['fechaHora']]
        #print(len(f1)), print(len(f11))
            f2 = f1[ f1.identif.isin(row['s1000'])].intensidad.sum()   
            return f2
        else:
            return 0
    return inten

def f500(row):
    def inten(dfTrafico): 
        if bool(row['s500']):        
        #f1 = dfTrafico[dfTrafico.identif.isin(row['s1000'])]
            f1 = dfTrafico[dfTrafico.fecha == row['fechaHora']]
        #print(len(f1)), print(len(f11))
            f2 = f1[ f1.identif.isin(row['s500'])].intensidad.sum()   
            return f2 
        else:
            return 0
    return inten

def f250(row):
    def inten(dfTrafico):
        if bool(row['s250']):       
        #f1 = dfTrafico[dfTrafico.identif.isin(row['s1000'])]
            f1 = dfTrafico[dfTrafico.fecha == row['fechaHora']]
        #print(len(f1)), print(len(f11))
            f2 = f1[ f1.identif.isin(row['s250'])].intensidad.sum()   
            return f2 
        else:
            return 0
    return inten


In [33]:
"""
Número de columnas a añadir al dataframe
"""
sdis = ['s100', 's1000','s250','s500' ]

In [126]:
"""
Esta celda se ejecuta tres veces, una por año 
"""

print (time.strftime('%H:%M:%S'))

for mesxlsx in month_files:
    # juego solo con los de 2017 
    if  int(mesxlsx[-8:-4]) != 2017 :
        continue
    # juego solo con los de 2016
    """if  int(mesxlsx[-8:-4]) != 2016 :
        continue"""
    # juego solo con los de 2015
    """if  int(mesxlsx[-8:-4]) != 2015 :
    continue"""
    

    month = pd.read_csv(mesxlsx,  encoding='latin-1', 
                          parse_dates = [4])
    d4 = month.Estacion.map(distancias)
    month['s100'] = d4.map(lambda x: x['sum100'])
    month['s1000'] = d4.map(lambda x: x['sum1000'])
    month['s250'] = d4.map(lambda x: x['sum250'])
    month['s500'] = d4.map(lambda x: x['sum500'])
    # ficheros de tráfico a consultar:
    files_intensidad = openTraf[mesxlsx]
    print(mesxlsx, files_intensidad)
    try:
        df_traf = pd.read_csv('../TRAFICO/procesados/' + files_intensidad[0], 
               parse_dates = [1], 
               usecols = [1,2, 4])   
        df_traf.fecha = df_traf.fecha.map(lambda x: pd.Timestamp(x))
    except:
        print("file 1 do not exist")
        continue
    #m1.idelem = m1.idelem.map(lambda x : str(x))
    if len(files_intensidad) == 2:
        try:
            m2 = pd.read_csv('../TRAFICO/procesados/' + files_intensidad[1], 
               parse_dates = [1], 
               usecols = [1,2, 4])
            m2.fecha = m2.fecha.map(lambda x: pd.Timestamp(x))
        except:
            print("file 2 do not exist")
            continue
        #m2.idelem = m2.idelem.map(lambda x : str(x))
        
        df_traf = pd.concat([df_traf, m2], ignore_index = True)
        
    # completo con las intensidades
    # tarda 15 minutos por columna (1 hora por mes)
    # INTENSIDAD D100 ----
    print (time.strftime('%H:%M:%S'), 's100')
    sdis = ['s100']
    month['s100'] = d4.map(lambda x: x['sum100'])
    serief100 = month.apply(f100, axis = 1)   # serie de funciones
    month['d100'] = serief100.map(lambda x: x(df_traf))   # nueva col. de tipo float
    month.drop(sdis,axis=1, inplace = True)
    print (time.strftime('%H:%M:%S'),  's1000')
    # INTENSIDAD D1000 ----
    sdis = ['s1000']
    month['s1000'] = d4.map(lambda x: x['sum1000'])
    serief1000 = month.apply(f1000, axis = 1)
    month['d1000'] = serief1000.map(lambda x: x(df_traf))
    month.drop(sdis,axis=1, inplace = True)
    print (time.strftime('%H:%M:%S'),  's500')
     # INTENSIDAD D500 ---
    sdis = ['s500']
    month['s500'] = d4.map(lambda x: x['sum500'])
    serief500 = month.apply(f500, axis = 1)
    month['d500'] = serief500.map(lambda x: x(df_traf))
    month.drop(sdis,axis=1, inplace = True)
    print (time.strftime('%H:%M:%S'),  's250')        
    #· INTENSIDAD D250 --------
    sdis = ['s250'] 
    month['s250'] = d4.map(lambda x: x['sum250'])
    serief250 = month.apply(f250, axis = 1)
    month['d250'] = serief250.map(lambda x: x(df_traf))
    month.drop(sdis,axis=1, inplace = True)
    print (time.strftime('%H:%M:%S'),  'fin intensidad')
        
    # guardar en un csv    
    savefile = mesxlsx.replace('Prev', 'inten')
    month.to_csv(savefile, index = False)
    
    
print(time.strftime('%H:%M:%S'))

12:09:02
./procesados-Prev/01-2017.csv ['01-2017.csv', '02-2017.csv']
./procesados-Prev/02-2017.csv ['02-2017.csv', '03-2017.csv']
12:09:44


## Volcado de ficheros csv a Mongo ( fin)

In [128]:
# MONGO
def open_conection():
    client = pymongo.MongoClient('localhost',27017)
    return client

def storeTrafico(col, tabla):
    d = tabla.to_json(orient='records', date_format = 'iso')
    data_json = json.loads(d)
    print(data_json[0])
    for d in data_json:
        d['fechaHora']= parse(d['fechaHora'])
    col.insert_many(data_json) 

In [131]:
# RUTA FICHEROS PROCESADOS
ruta_enriquecidos_aire = "./procesados-inten"
month_files_inten = extract_files(ruta_enriquecidos_aire)
month_files_inten

['./procesados-inten/01-2015.csv',
 './procesados-inten/01-2016.csv',
 './procesados-inten/01-2017.csv',
 './procesados-inten/02-2015.csv',
 './procesados-inten/02-2016.csv',
 './procesados-inten/02-2017.csv',
 './procesados-inten/03-2015.csv',
 './procesados-inten/03-2016.csv',
 './procesados-inten/04-2016.csv',
 './procesados-inten/05-2015.csv',
 './procesados-inten/05-2016.csv',
 './procesados-inten/06-2015.csv',
 './procesados-inten/06-2016.csv',
 './procesados-inten/07-2015.csv',
 './procesados-inten/07-2016.csv',
 './procesados-inten/08-2015.csv',
 './procesados-inten/08-2016.csv',
 './procesados-inten/09-2015.csv',
 './procesados-inten/09-2016.csv',
 './procesados-inten/10-2015.csv',
 './procesados-inten/10-2016.csv',
 './procesados-inten/11-2015.csv',
 './procesados-inten/11-2016.csv',
 './procesados-inten/12-2015.csv',
 './procesados-inten/12-2016.csv']

In [130]:
client = open_conection()
db = client.aire
client, db

(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True),
 Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'aire'))

In [134]:
col = db.aire2015
#col.drop() # borro la colección entera
for file in month_files_inten:   
    # vuelvo los de de 2017 
    #if  int(file[-8:-4]) != 2017 :
    #    continue
    
    # juego solo con los de 2016
    #if  int(file[-8:-4]) != 2016 :
    #    continue
    
    # juego solo con los de 2015
    if  int(file[-8:-4]) != 2015 :
        continue
    
    
    month = pd.read_csv(file,  encoding='latin-1', parse_dates = [4])
    old = month.columns
    new = month.columns

    names = dict(zip(old, new))
    names['Estacion'] = 'codEstacion'
    names['ESTACION'] = 'estacion'
    names['ESTACIÓN'] = 'estacion'
    names['d100'] = 'intensidad100'
    names['d1000'] = 'intensidad1000'
    names['d500'] = 'intensidad500'
    names['d250'] = 'intensidad250'
    month = month.rename(columns = names) 

    storeTrafico(col, month)

{'codEstacion': 28079004, 'estacion': 'Pza. de España', 'Xcoord': -3.7122472222, 'Ycoord': 40.4238527778, 'fechaHora': '2015-01-01T01:00:00.000Z', 'Dato Horario': 2, 'magnitudCod': 1, 'magnitudNombre': 'Dióxido de Azufre', 'tecnicaCod': 38, 'tecnicaNom': 'Fluorescencia ultravioleta', 'valor': '00016', 'intensidad100': 0.0, 'intensidad1000': 1515.75, 'intensidad500': 0.0, 'intensidad250': 0.0}
{'codEstacion': 28079004, 'estacion': 'Pza. de España', 'Xcoord': -3.7122472222, 'Ycoord': 40.4238527778, 'fechaHora': '2015-02-01T01:00:00.000Z', 'Dato Horario': 2, 'magnitudCod': 1, 'magnitudNombre': 'Dióxido de Azufre', 'tecnicaCod': 38, 'tecnicaNom': 'Fluorescencia ultravioleta', 'valor': '00006', 'intensidad100': 2022.5, 'intensidad1000': 52931.0, 'intensidad500': 21987.25, 'intensidad250': 12548.25}
{'codEstacion': 28079004, 'estacion': 'Pza. de España', 'Xcoord': -3.7122472222, 'Ycoord': 40.4238527778, 'fechaHora': '2015-03-01T01:00:00.000Z', 'Dato Horario': 2, 'magnitudCod': 1, 'magnitudNo

# Prueba para enriqueceser intensiad consultando mongo. Descartado. Tarda mucho.

In [72]:
# consultas a mongo
def open_conection():
    client = pymongo.MongoClient('localhost',27017)
    return client

client.close()
client = open_conection()
db = client.trafico_timestamp
col2017 = db.trafico2017
col2017

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'trafico_timestamp'), 'trafico2017')

In [130]:
month4.head(2)

Unnamed: 0,Estacion,ESTACIÓN,Xcoord,Ycoord,fechaHora,Dato Horario,magnitudCod,magnitudNombre,tecnicaCod,tecnicaNom,valor
0,28079004,Pza. de España,-3.712247,40.423853,2017-01-01 01:00:00,2,1,Dióxido de Azufre,38,Fluorescencia ultravioleta,00007V
1,28079004,Pza. de España,-3.712247,40.423853,2017-01-01 02:00:00,2,1,Dióxido de Azufre,38,Fluorescencia ultravioleta,00006V


In [144]:
# sin agregado (esta funciona)
def intensidad(coleccion, d4 , hora):
    """
      input: colección donde hay qeu consultar
             lista de estaciones de tráfico
             e : estación que estamos enriqueciendo los datos
             hora 
      output: nivel de intensidad
    """
    res = {'sum100':0, 'sum250': 0, 'sum500':0, 'sum1000': 0}
    listaIdent1000 = d4['sum1000']
    if bool(listaIdent1000):  # si hay estaciones de tráfico cerca        
        cursor = coleccion.find(
                            {'identif' : { '$in': listaIdent1000},
                            'fecha': {'$eq': hora}}    ,
                     {'identif': 1, 'fecha': 1, 'intensidad':1})
        if (cursor.alive) :  # si ha encontrado datos
            # construyo el dataframe
            df = pd.DataFrame(list(cursor))
            res['sum100'] = df[df.identif.map(lambda x: x in d4['sum100'])].intensidad.sum()
            res['sum250'] = df[df.identif.map(lambda x: x in d4['sum250'])].intensidad.sum()
            res['sum500'] = df[df.identif.map(lambda x: x in d4['sum500'])].intensidad.sum()
            res['sum1000'] = df[df.identif.map(lambda x: x in d4['sum1000'])].intensidad.sum()
            return res
        else:
            return res
    else:
        return res

In [177]:
def consulta(col, lista, hora):
    m = 0
    q = col.aggregate([
     {
         '$match': {
             '$and': [
                        {'identif' : { '$in': lista}},
                        {'fecha': {'$eq': hora}}
              ]
        }
    },
    {   
        '$group': {
            '_id': 'A',
            'cantidad': { '$sum' : 1 },
            'intensidad': { '$sum' : "$intensidad" }
        }}
        ])
    if q.alive:
        m = list(q)[0]['intensidad']
    return m

In [178]:
%time t = consulta(col2017, d4['sum1000'], hora)
t

Wall time: 3.44 s


23715.5833333334

In [175]:
if t.alive:
    m = list(t)[0]['intensidad']
m

704.75

#### prueba acceso mongo para ver tiempos (tarda mucho - descartar opción)

In [181]:
# sin agregado (esta funciona)
def intensidad2(coleccion, d4 , hora):
    """
      input: colección donde hay qeu consultar
             lista de estaciones de tráfico
             e : estación que estamos enriqueciendo los datos
             hora 
      output: nivel de intensidad
    """
    res = {'sum100':0, 'sum250': 0, 'sum500':0, 'sum1000': 0}

    res['sum100'] =  consulta(coleccion, d4['sum100'], hora)
    res['sum250'] = consulta(coleccion, d4['sum250'], hora)
    res['sum500'] = consulta(coleccion, d4['sum500'], hora)
    res['sum1000'] = consulta(coleccion, d4['sum1000'], hora)
    return res

In [145]:
# test
d4 = distancias[28079004]
fechah  = month4.fechaHora[0]
# test
valorunico = intensidad(col2017, d4, fechah)

In [146]:
valorunico

{'sum100': 704.75,
 'sum1000': 23715.5833333334,
 'sum250': 4658.25,
 'sum500': 8613.0833333333}

In [186]:
def intensidadserie1(col, d4, serieFecha ):
    val = serieFecha.map(lambda x: intensidad(col, d4,  x))
    return val   # devuelve un diccionario con las 4 intensidades

In [187]:
def intensidadserie2(col, d4, serieFecha ):
    val = serieFecha.map(lambda x: intensidad2(col, d4,  x))
    return val   # devuelve un diccionario con las 4 intensidades

In [188]:
%time t  = intensidadserie1( col2017, d4 , month4.head(5).fechaHora )  # 85 horas

Wall time: 17.6 s


In [189]:
# esta es peor opción
%time t  = intensidadserie2( col2017, d4 , month4.head(5).fechaHora )  # +85 horas

Wall time: 1min 6s


In [190]:
t

0    {'sum100': 704.75, 'sum250': 4658.25, 'sum500'...
1    {'sum100': 1417.25, 'sum250': 10590.0, 'sum500...
2    {'sum100': 1645.25, 'sum250': 11090.0, 'sum500...
3    {'sum100': 1288.0, 'sum250': 8625.25, 'sum500'...
4    {'sum100': 934.75, 'sum250': 5497.25, 'sum500'...
Name: fechaHora, dtype: object

In [155]:
## aqui, pero tarda bastante ...  medir tiempos....   (calculo 85 horas)
len(month4) * 17 / 5 / 3600

85.49866666666667

In [286]:
def intensidadmonth(coleccion, serieEstacion, distancias , serieFecha):
    s = serieEstacion.map(distancias)
  
    valor_i  = intensidadserie(coleccion, s, serieFecha)
    
    
    return valor_i

In [287]:
s = intensidadmonth(col, month4.Estacion, distancias, month4.fechaHora)

NameError: name 'intensidadserie' is not defined

In [159]:
nuevas_col = ['nivelIntensidadTrafico100', 'nivelIntensidadTrafico250',
              'nivelIntensidadTrafico500', 'nivelIntensidadTrafico1000',]
col = col2017


q100 = col.aggregate([
     {
         '$match': {
             '$and': [
                        {'identif' : { '$in': d4['sum1000']}},
                        {'fecha': {'$eq': hora}}
              ]
        }
    },
    {   
        '$group': {
            '_id': 'A',
            'cantidad': { '$sum' : 1 },
            'intensidad': { '$sum' : "$intensidad" }
        }}
        ])


q100

<pymongo.command_cursor.CommandCursor at 0x23107a204a8>

### consultar mongo 

In [186]:
# consultas a mongo
def open_conection():
    client = pymongo.MongoClient('localhost',27017)
    return client

client = open_conection()
db = client.trafico_timestamp
col2017 = db.trafico2017_timestamp
col2017

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'trafico_timestamp'), 'trafico2017_timestamp')

In [162]:
cu = col2017.aggregate([
     {
         '$match': {
             '$and': [
                        {'identif' : { '$in':d4['sum100']}},
                        {'fecha': {'$eq': hora}}
              ]
        }
    },
    {   
        '$group': {
            '_id': 28079004,
            'cantidad_de_ordenes': { '$sum' : 1 },
            'intensidad': { '$sum' : "$intensidad" }
        }}
        ])

In [163]:
cu.alive

True

In [164]:
for d in cu:
    print(d)

{'_id': 28079004, 'cantidad_de_ordenes': 3, 'intensidad': 704.75}


# Consulta mongo agrregado para test


In [None]:
db.getCollection('trafico2017').aggregate([
     {
         '$match': {
             '$and': [
                        {'identif' : { '$in': ['16006', '16008', '16041']}},
                        {'fecha': {'$eq': ISODate("2017-02-01T00:00:00.000Z")}}
              ]
        }
    },
    {   
        '$group': {
            '_id': 'A',
            'cantidad': { '$sum' : 1 },
            'intensidad': { '$sum' : "$intensidad" }
        }}
        ])
                            
      