In [None]:
sc.install_pypi_package('boto3')
sc.install_pypi_package('pandas')
sc.install_pypi_package('scipy')

In [3]:
import json
import numpy as np
import pandas as pd
pd.set_option("display.max_columns", 100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
import boto3

BUCKET_NAME = 'ui1-tfm-data'

# Credencniales de autentificación de AWS
s3 = boto3.client('s3', aws_access_key_id = "xxx",
                        aws_secret_access_key = "xxx",
                        aws_session_token="xxx")

# Descargamos los ficheros con los que vamos a trabajar desde el bucket previamente creado en S3
s3.download_file(BUCKET_NAME, 'air-data/stations.json', '/tmp/stations.json')
s3.download_file(BUCKET_NAME, 'air-data/spain_airQuality.csv', '/tmp/spain_airQuality.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---
<br>

# Estaciones

In [5]:
# Cargamos el archivo que contiene las estaciones
data = json.load(open('/tmp/stations.json'))

# Creamos el dataframe
df_stations = pd.DataFrame(data["data"])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Filtramos las estaciones en España
df_stations_spain = df_stations.loc[df_stations['CountryOrTerritory'] == 'Spain']

# Eliminamos las estaciones de fuera de la península (Islas Canarias)
# lat > 35.512 AND lat < 44.512 AND lon > -10.415 AND lon < 5.054
df_stations_spain = df_stations_spain[(df_stations_spain.SamplingPoint_Latitude > 35.512) & (df_stations_spain.SamplingPoint_Latitude < 44.512) & (df_stations_spain.SamplingPoint_Longitude > -10.415) & (df_stations_spain.SamplingPoint_Longitude < 5.054)]

# Seleccionamos los campos relevantes
df_stations_spain = df_stations_spain[['StationLocalId', 'SamplingPoint_Latitude', 'SamplingPoint_Longitude']]

# Renombramos los campos
df_stations_spain.columns = ['Station', 'Latitude', 'Longitude']

# Reseteamos el índice del dataframe
df_stations_spain = df_stations_spain.reset_index(drop=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
print(len(df_stations_spain))
df_stations_spain.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

493
       Station  Latitude  Longitude
0  STA_ES0001R  39.54694   -4.35056
1  STA_ES0005R  42.72056   -8.92361
2  STA_ES0006R  39.87528    4.31639
3  STA_ES0007R  37.23722   -3.53417
4  STA_ES0008R  43.43917   -4.85000

---
El dataset final con la información sobre las estaciones de medición en España contiene los siguientes campos:

*   **Station**: Código identificativo de la estación
*   **Latitude**: Coordenada de latitud de la estación
*   **Longitude**: Coordenada de longitud de la estación

---
<br>

# Mediciones

In [8]:
# Cargamos el archivo y creamos el dataframe con las observaciones de los contaminantes
df = pd.read_csv('/tmp/spain_airQuality.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Seleccionamos los campos relevantes
df_measurements = df[['AirQualityStation', 'AirPollutant', 'Concentration', 'UnitOfMeasurement', 'DatetimeBegin']]

# Renombramos los campos
df_measurements.columns =['Station', 'AirPollutant', 'Concentration', 'UnitOfMeasurement', 'Datetime']

# Eliminamos las entradas cuyas mediciones son nulas
df_measurements = df_measurements[df_measurements['Concentration'].notna()]

# Convertimos el campo de la fecha a formato Datetime
df_measurements['Datetime'] = pd.to_datetime(df_measurements['Datetime'])

# Seleccinamos las entradas que se corresponden al mes de enero
df_measurements = df_measurements.loc[df_measurements['Datetime'] < '2020-2-1']

# Reseteamos el índice del dataframe
df_measurements = df_measurements.reset_index(drop=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# join con el dataframe de las estaciones para obtener su localizacion
combined_df = df_measurements.merge(df_stations_spain, left_on='Station', right_on='Station')
combined_df = combined_df[['AirPollutant', 'Concentration', 'UnitOfMeasurement', 'Station', 'Latitude', 'Longitude', 'Datetime']]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
print(len(combined_df))
combined_df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1646807
  AirPollutant  Concentration UnitOfMeasurement      Station   Latitude  \
0           NO           36.0             µg/m3  STA_ES1633A  37.993611   
1           NO           68.0             µg/m3  STA_ES1633A  37.993611   
2           NO           58.0             µg/m3  STA_ES1633A  37.993611   
3           NO           47.0             µg/m3  STA_ES1633A  37.993611   
4           NO           42.0             µg/m3  STA_ES1633A  37.993611   

   Longitude                  Datetime  
0  -1.144722 2020-01-01 00:00:00+01:00  
1  -1.144722 2020-01-01 01:00:00+01:00  
2  -1.144722 2020-01-01 02:00:00+01:00  
3  -1.144722 2020-01-01 03:00:00+01:00  
4  -1.144722 2020-01-01 04:00:00+01:00

---
Así, el dataset final en relación a las mediciones de calidad del aire queda con los siguientes campos:


*   **AirPollutant**: Nombre identificactivo del contaminante medido.
*   **Concentration**: Concentración medida para este tipo de contaminante.
*   **UnitOfMeasurement**: Unidad de medida para este tipo de contaminante.
*   **Station**: Código identificactivo de la estación de medición que ha recopilado la información.
*   **Latitude**: Coordenada de latitud de la estación de medición.
*   **Longitude**: Coordenada de longitud de la estación de medición.
*   **Datetime**: Hora inicial de la medición.

---
<br>

# Preparación del dataset final
<br>

## Funciones utilizadas

In [12]:
def get_grid(lon_steps, lat_steps, n):
  '''
  Función que genera un diccionario con la posición de las celdas resultantes de dividir el área en nxn celdas
  '''
  grid_dict = {}
  
  lat_stride = lat_steps[1] - lat_steps[0]
  lon_stride = lon_steps[1] - lon_steps[0]

  count = 0
  for lat in lat_steps[:-1]:
    for lon in lon_steps[:-1]:
      count = count + 1
      # Define dimensions of box in grid
      upper_left = [lon, lat + lat_stride]
      upper_right = [lon + lon_stride, lat + lat_stride]
      lower_right = [lon + lon_stride, lat]
      lower_left = [lon, lat]
      grid_dict[count] = [upper_left[0], upper_left[1], lower_right[0], lower_right[1]]
  return grid_dict

N_DIVISIONS = 100 # Número de divisiones horizontales y verticales
# lat > 35.512 AND lat < 44.512 AND lon > -10.415 AND lon < 5.054
x_steps = np.linspace(-10.415, 5.054, N_DIVISIONS + 1) # Longitude
y_steps = np.linspace(35.512, 44.512, N_DIVISIONS + 1) # Latitude
grid_dict = get_grid(x_steps, y_steps, N_DIVISIONS) # Diccionario que contiene las coordenadas de cada celda


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
def remove_outliers(df):
  '''
  Identifica y elimina los outliers del campo 'Concentration'
  '''
  return df[((df.Concentration - df.Concentration.mean()) / df.Concentration.std()).abs() < 3]

def group_by_day_station(df):
  '''
  Para cada día del mes, se obtiene la media de las mediciones en cada estación
  '''
  return df[['Datetime', 'Concentration', 'Station', 'Longitude', 'Latitude']].groupby([df['Datetime'].dt.day, 'Station']).mean().reset_index()

from scipy.interpolate import griddata
def interpolate(df, lon_steps, lat_steps, n):
  '''
  Crea una red con puntos cada 100Km y genera nuevos datos para estos puntos interpolando los datos ya conocidos
  '''
  x = df["Longitude"].to_numpy()    
  y = df["Latitude"].to_numpy()
  z = df["Concentration"].to_numpy()

  xi, yi = np.meshgrid(lon_steps, lat_steps)

  # interpolate
  zi = griddata((x,y),z,(xi,yi),method='linear')

  # Utilizamos los nuevos valores para crear un nuevo dataframe
  x_column =  []
  for i in xi:
    for j in i:
      x_column.append(j)
  
  y_column =  []
  for i in yi:
    for j in i:
      y_column.append(j)

  z_column =  []
  for i in zi:
    for j in i:
      z_column.append(j)

  data = [x_column, y_column, z_column]
  columns = ['x', 'y', 'z']
  return pd.DataFrame(np.array(data).T, columns=columns)

def interpolateAll(df, n):
  '''
  Tiene como resultado un dataframe en el que se guardan los datos generados cada día según el contaminante
  '''
  interpolated_pollutant_df = pd.DataFrame()

  for n_day in range(1,31):
    day_df = df.loc[df['Datetime'] == n_day]
    interpolated_day_df = interpolate(day_df, x_steps, y_steps, n)
    interpolated_day_df['Day'] = n_day
    interpolated_pollutant_df = interpolated_pollutant_df.append(interpolated_day_df)

  return interpolated_pollutant_df

def locate_point(grid_x, grid_y, point_x, point_y):
  '''
  Localiza la celda a la que pertenece un punto y devuelve sus índices
  '''  
  x_step = grid_x[1]-grid_x[0]
  y_step = grid_y[1]-grid_y[0]
  cell_x = ((point_x - grid_x[0])//x_step) + 1
  cell_y = ((point_y - grid_y[0])//y_step) + 1
  return cell_x, cell_y

def get_cell_num(cell_x, cell_y, n):
  '''
  Devuelve el número de una celda dados sus índices X e Y
  '''
  return (((cell_y - 1) * (n-1)) + cell_x)

def addCellToAll(df, n):
  '''
  Función que itera sobre todas las entradas y añade la celda en la que se encuentra el vuelo en cada momento
  '''
  df['Cell'] = ''
  for i, row in df.iterrows():
    point_x = row[0] # Longitude
    point_y = row[1] # Latitude
    cell_x, cell_y = locate_point(x_steps, y_steps, point_x, point_y)
    cell_num = get_cell_num(cell_x, cell_y, n+1)
    df.at[i,'Cell'] = int(cell_num)
  return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
print(len(combined_df))
combined_df.sample(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1646807
        AirPollutant  Concentration UnitOfMeasurement      Station   Latitude  \
18223             O3           52.0             µg/m3  STA_ES1690A  40.269444   
1274480   NOX as NO2           34.0             µg/m3  STA_ES1747A  42.826940   
436269            O3           26.0             µg/m3  STA_ES1225A  41.615795   
1605375          SO2            4.0             µg/m3  STA_ES0629A  36.185620   
29541             NO            4.3             µg/m3  STA_ES1096A  42.351944   

         Longitude                  Datetime  
18223    -0.078889 2020-01-22 10:00:00+01:00  
1274480  -1.649440 2020-01-12 18:00:00+01:00  
436269    0.615726 2020-01-07 02:00:00+01:00  
1605375  -5.488930 2020-01-05 01:00:00+01:00  
29541    -7.876944 2020-01-18 09:00:00+01:00

In [15]:
# Separamos por tipo de contaminante
df_NO = combined_df.loc[combined_df['AirPollutant'] == 'NO']
df_SO2 = combined_df.loc[combined_df['AirPollutant'] == 'SO2']
df_NO2 = combined_df.loc[combined_df['AirPollutant'] == 'NO2']
df_NOX = combined_df.loc[combined_df['AirPollutant'] == 'NOX as NO2']
df_CO = combined_df.loc[combined_df['AirPollutant'] == 'CO']
df_O3 = combined_df.loc[combined_df['AirPollutant'] == 'O3']
df_PM25 = combined_df.loc[combined_df['AirPollutant'] == 'PM2.5']
df_PM10 = combined_df.loc[combined_df['AirPollutant'] == 'PM10']
df_C6H6 = combined_df.loc[combined_df['AirPollutant'] == 'C6H6']

# Identificamos y eliminamos outliers de cada nuevo dataframe
df_NO = remove_outliers(df_NO)
df_SO2 = remove_outliers(df_SO2)
df_NO2 = remove_outliers(df_NO2)
df_NOX = remove_outliers(df_NOX)
df_CO = remove_outliers(df_CO)
df_O3 = remove_outliers(df_O3)
df_PM25 = remove_outliers(df_PM25)
df_PM10 = remove_outliers(df_PM10)
df_C6H6 = remove_outliers(df_C6H6)

# Agrupamos cada contaminante por día y estación y hacemos la media
df_NO_by_day = group_by_day_station(df_NO)
df_SO2_by_day = group_by_day_station(df_SO2)
df_NO2_by_day = group_by_day_station(df_NO2)
df_NOX_by_day = group_by_day_station(df_NOX)
df_CO_by_day = group_by_day_station(df_CO)
df_O3_by_day = group_by_day_station(df_O3)
df_PM25_by_day = group_by_day_station(df_PM25)
df_PM10_by_day = group_by_day_station(df_PM10)
df_C6H6_by_day = group_by_day_station(df_C6H6)

# Utilizamos los datos actuales para obtener un valor interpolado de los anteriores para cada 100Km
interpollated_df_NO = interpolateAll(df_NO_by_day, N_DIVISIONS)
interpollated_df_SO2 = interpolateAll(df_SO2_by_day, N_DIVISIONS)
interpollated_df_NO2 = interpolateAll(df_NO2_by_day, N_DIVISIONS)
interpollated_df_NOX = interpolateAll(df_NOX_by_day, N_DIVISIONS)
interpollated_df_CO = interpolateAll(df_CO_by_day, N_DIVISIONS)
interpollated_df_O3 = interpolateAll(df_O3_by_day, N_DIVISIONS)
interpollated_df_PM25 = interpolateAll(df_PM25_by_day, N_DIVISIONS)
interpollated_df_PM10 = interpolateAll(df_PM10_by_day, N_DIVISIONS)
interpollated_df_C6H6 = interpolateAll(df_C6H6_by_day, N_DIVISIONS)

# Renombramos la columna 'Concentration' en función del contaminante antes de unificar todos los dataframes un uno solo
interpollated_df_NO.columns = ['Longitude', 'Latitude', 'NO', 'Day']
interpollated_df_SO2.columns = ['Longitude', 'Latitude', 'SO2', 'Day']
interpollated_df_NO2.columns = ['Longitude', 'Latitude', 'NO2', 'Day']
interpollated_df_NOX.columns = ['Longitude', 'Latitude', 'NOX', 'Day']
interpollated_df_CO.columns = ['Longitude', 'Latitude', 'CO', 'Day']
interpollated_df_O3.columns = ['Longitude', 'Latitude', 'O3', 'Day']
interpollated_df_PM25.columns = ['Longitude', 'Latitude', 'PM2.5', 'Day']
interpollated_df_PM10.columns = ['Longitude', 'Latitude', 'PM10', 'Day']
interpollated_df_C6H6.columns = ['Longitude', 'Latitude', 'C6H6', 'Day']

# Por último, hacemos un join de todos los contaminantes para tenerlos en un único dataframe
from functools import reduce
final_df = reduce(lambda x,y: pd.merge(x,y, on=['Longitude' , 'Latitude', 'Day'], how='outer'), [interpollated_df_NO, interpollated_df_SO2, interpollated_df_NO2, interpollated_df_NOX, interpollated_df_CO, interpollated_df_O3, interpollated_df_PM25, interpollated_df_PM10, interpollated_df_C6H6])

# Eliminamos las entradas cuyas mediciones son nulas para todos los contaminantes
final_df = final_df.dropna(thresh=9)

# Rellenamos los valores nulos que que quedan con la media de las filas anteriores y las siguientes
final_df = final_df.where(final_df.notnull(), other=(final_df.fillna(method='ffill')+final_df.fillna(method='bfill'))/2)

# Lo nulos que no se han podido calcular, se rellenan con la media de toda la columna
for i in final_df.columns[final_df.isnull().any(axis=0)]:
    final_df[i].fillna(final_df[i].mean(), inplace=True)

# Reseteamos el índice del dataframe
final_df = final_df.reset_index(drop=True)

# Asignamos una celda a cada entrada del dataframe
final_df = addCellToAll(final_df, N_DIVISIONS)

# Ordenamos las columnas para mejor legibilidad
final_df = final_df[['Day', 'Cell', 'NO', 'SO2', 'NO2', 'NOX', 'CO', 'O3', 'PM2.5', 'PM10', 'C6H6']]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Comprobamos si existen entradas nulas
final_df.isnull().sum(axis = 0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Day      0
Cell     0
NO       0
SO2      0
NO2      0
NOX      0
CO       0
O3       0
PM2.5    0
PM10     0
C6H6     0
dtype: int64

In [17]:
print(len(final_df))
final_df.sample(15)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

135030
        Day  Cell         NO        SO2        NO2        NOX        CO  \
111561   25  7473   1.951532   1.201194   6.575093   8.939241  0.256835   
79942    18  7147   1.172529   4.189702   4.758433   5.525672  0.365007   
107452   24  8059   6.010746   1.897103  15.715557  24.951700  0.310899   
3450      1  6847   3.430196   1.931914   5.543974   8.867394  0.428484   
107960   25  1540   4.650546   9.794944  19.389237  25.682614  0.179127   
40444     9  8636   1.530117   2.895572   3.450328   4.986908  0.388407   
29213     7  5268   7.649132   3.962374  24.456884  36.009823  0.217872   
111849   25  7939   2.805563   2.285203   9.951065  15.693888  0.461068   
31097     7  7927  23.630916  12.948562  23.589224  56.799907  0.684477   
72789    17  3221   9.441261   2.433520  17.617761  31.380320  0.489707   
2359      1  5377   1.592851   1.427729  10.932229  11.180079  0.303671   
127164   29  4141   2.397997   1.903789   7.445855  13.511234  0.378385   
104062   24  2961 

---
El dataset final contiene los siguientes campos:

*   **Day**: Día (número) al que se corresponde la medición
*   **Cell**:Celda a la que se corresponde la medición
*   **NO**: Media de las mediciones de Óxido de nitrógeno para ese día en ese punto
*   **SO2**: Media de las mediciones de Dióxido de azufre para ese día en ese punto
*   **NO2**: Media de las mediciones de Dióxido de nitrógeno para ese día en ese punto
*   **NOX**: Media de las mediciones de otros compuestos de oxígeno y nitrógeno para ese día en ese punto
*   **CO**: Media de las mediciones de Monóxido de carbono para ese día en ese punto
*   **O3**: Media de las mediciones de Ozono para ese día en ese punto
*   **PM2.5**: Media de las mediciones de materia particulada 2.5 para ese día en ese punto
*   **PM10**: Media de las mediciones de materia particulada 10 para ese día en ese punto
*   **C6H6**: Media de las mediciones de Benceno para ese día en ese punto

---
<br>

In [18]:
# Guardamos el dataframe final como un archivo json y lo almacenamos en S3
final_df.to_json(r'/tmp/final_airQuality_dataset.json')
s3.upload_file('/tmp/final_airQuality_dataset.json', BUCKET_NAME, 'air-data/final_airQuality_dataset.json')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…