# ETL .- INTEGRACIÓN DE DATOS GEOGRÁFICOS Y METEREOLÓGICOS DE ESPAÑA A PARTIR DE DIVERSAS FUENTES DE DATOS PÚBLICOS.

Se recomienda correr el código en [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb).
<table align="center">
 <td align="center"><a target="_blank" href="https://colab.research.google.com/github/andreadgalis/TFM/blob/main/TFM_Andrea_Delgado_Galisteo.ipynb">
        <img src="https://i.ibb.co/2P3SLwK/colab.png"  style="padding-bottom:5px;" />Run in Google Colab</a></td>
  <td align="center"><a target="_blank" href="https://github.com/andreadgalis/TFM/blob/main/TFM_Andrea_Delgado_Galisteo.ipynb">
        <img src="https://i.ibb.co/xfJbPmL/github.png"  height="70px" style="padding-bottom:5px;"  />View Source on GitHub</a></td>
</table>

Primero los imports e instalación de aquetes necesarios.

In [1]:
! pip install unidecode
! pip install dms2dec
! pip install geopandas
! pip install requests
! pip install pandas
! pip install zipfile
! pip install geopy
! pip install matplotlib
! pip install numpy 
! pip install shapely


import requests
import json
import pandas as pd
import zipfile
import io
import os
import unidecode
import geopy.distance
from dms2dec.dms_convert import dms2dec
import geopandas as gpd
import matplotlib.path as mplPath
import numpy as np
from shapely.geometry import Point, Polygon
from google_drive_downloader import GoogleDriveDownloader as gdd
import gdown
import datetime
from datetime import date
from dateutil.relativedelta import relativedelta
from google.colab import files
import time

with open('ETL_logging.txt', 'w') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - La instalación de todos los paquetes e imports fue correcta.'
  f.write(message)
  f.write('\n')
  f.close()


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting unidecode
  Downloading Unidecode-1.3.4-py3-none-any.whl (235 kB)
[K     |████████████████████████████████| 235 kB 29.2 MB/s 
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.4
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting dms2dec
  Downloading dms2dec-0.1-py3-none-any.whl (3.0 kB)
Installing collected packages: dms2dec
Successfully installed dms2dec-0.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting geopandas
  Downloading geopandas-0.10.2-py2.py3-none-any.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 26.3 MB/s 
[?25hCollecting fiona>=1.8
  Downloading Fiona-1.8.21-cp37-cp37m-manylinux2014_x86_64.whl (16.7 MB)
[K     |████████████████████████████████| 16.7 MB 250 kB/s 
[?25hCollecting pyproj>=2.2.

## Municipios

Antes de comenzar con el tratamiento de datos climatológicos, es de vital importancia trabajarlos sobre un marco geográfico ordenado. Esto puede parecer sencillo y una idea muy básica pero este tipo de datos, los geográficos, suelen presentar muchos problemas ya que pueden no estar siempre referenciados de la misma manera, su homogeneización es pues fundamental. 

Lo primero es saber que las observaciones climatológicas se producen siempre en una red de estaciones meterelógicas presentes en toda la red nacional. Por esto en la web del AEMET, por ejemplo, presentan un inventario con todos los municipios del territorio junto con algunos datos de interés geográfico.

Siguiendo pues la misión de obtener una tabla entidad de nuestra futura base de datos de municipios, otra de las fuentes de datos que vamos a emplear en esta ETL es el listado de todos los municipios de España disponible en el INE. 

Esta web será visitada a lo largo del código para obtener otras informaciones.

Primero obtenemos el fichero maestro de municipios disponible en la web de aemet OpenData.

Para ello, hacemos uso del servicio API que tienen disponible. Antes es necesario haber obtenido una key. Con la URL y un simple get tenemos el primer dataframe.

In [2]:

#ctes necesarias para las APIs en la web de AEMET
aemet_api_key = {"api_key":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhbmRyZWFkZ2FsaXNAZ21haWwuY29tIiwianRpIjoiZGFjZDgwNmYtNTk2Ny00MTkyLWE2YzYtM2I3M2VlYTQ5ZWY0IiwiaXNzIjoiQUVNRVQiLCJpYXQiOjE2NjA4NDExMjIsInVzZXJJZCI6ImRhY2Q4MDZmLTU5NjctNDE5Mi1hNmM2LTNiNzNlZWE0OWVmNCIsInJvbGUiOiIifQ.faXcsh-CbHKYIkI0nO0vJp5eE4uEbq5B-n9v7LwMw3w"}
headers = {'cache-control': "no-cache"}

#URL

aemet_municipios_url = "https://opendata.aemet.es/opendata/api/maestro/municipios/"


with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Comenzando descarga dataframe municipios API AEMET.'
  f.write(message)
  f.write('\n')
  f.close()


#GET

try:
  municipios_aemet_json = requests.request("GET", aemet_municipios_url, headers=headers, params=aemet_api_key, timeout = (5, 10)).json()
  #Tabla de municipios de la AEMET
  municipios_aemet = pd.json_normalize(municipios_aemet_json)
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_1 = len(municipios_aemet)
    message_ok = ahora+' - INFO - Acceso API AEMET a dataframe municipios llevado con éxito. El dataframe contiene '+str(num_rows_1)+' líneas.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except: 
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - No se ha podido acceder a dataframe municipios AEMET API.'
    f.write(message)
    f.write('\n')
    f.close()


Descargarmos ahora el fichero disponible en la web del INE con el listado completo  de municipios con los códigos correspondientes 
de provincias y comunidades y ciudades autonómicas.

En esta ocasión el link es directo y público a un fichero xlsx de la web del INE.

In [3]:
#URL, esta vez es directa a un xml
ine_municipios_url = 'https://www.ine.es/daco/daco42/codmun/codmun20/20codmun.xlsx'

with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Comenzando descarga dataframe municipios INE doc.'
  f.write(message)
  f.write('\n')
  f.close()

try:
  ine_municipios_file = pd.ExcelFile(ine_municipios_url)
  #Tabla de municipios del INE
  municipios_ine = pd.read_excel(ine_municipios_file, 'dic19', skiprows=1, converters={'CODAUTO':str,'CPRO':str, 'CMUN':str, 'DC':str}) 
  #Se castean a texto los códigos porque si no se interpretan como números enteros
  #Eliminamos los espacios previos y posteriores de los nombres de las columnas
  municipios_ine = municipios_ine.rename(columns=lambda x: x.strip())
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_2 = len(municipios_ine)
    message_ok = ahora+' - INFO - Acceso documento INE de municipios llevado con éxito. El dataframe contiene '+str(num_rows_2)+' líneas.'
    f.write(message_ok)
    f.write('\n')
    f.close()    
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - No se ha podido acceder a dataframe municipios INE.'
    f.write(message)
    f.write('\n')
    f.close()  


También será necesaria la siguiente tabla que contiene el nombre de las provincias y comunidades autónomas junto  con los códigos.
En esta ocasión leemos los datos directamente de la web (empleando read_html).

In [4]:
#URL, esta vez directa a la página web del INE
ine_ccaa_prov_url = 'https://www.ine.es/daco/daco42/codmun/cod_ccaa_provincia.htm'

with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Comienza lectura tabla web INE códigos autonómicos y provinciales.'
  f.write(message)
  f.write('\n')
  f.close()

try:
  table_MN = pd.read_html(ine_ccaa_prov_url, skiprows=[51], converters={'CODAUTO':str,'CPRO':str}) 
  #La razón del skip de la línea 51 es que era otro cabecero donde especificaba que empezaba las secciones de ciudades autonómicas
  prov_com_codigos_ine = table_MN[0]
  #Se eliminan los espacios en blanco, que al emplear este método de leer desde la web son muy frecuentes
  prov_com_codigos_ine = prov_com_codigos_ine.rename(columns=lambda x: x.strip())
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_3 = len(prov_com_codigos_ine)
    message_ok = ahora+' - INFO - Acceso tabla web INE al dataframe de códigos autonómicos y provinciales llevado con éxito. El dataframe contiene '+str(num_rows_3)+' líneas.'
    f.write(message_ok)
    f.write('\n')
    f.close()  
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - No se ha podido acceder a dataframede códigos autonómicos y provinciales del INE via web.'
    f.write(message)
    f.write('\n')
    f.close()  

Creamos los dataframes comunidades autónomas y provincias. Serán dos tablas a ser contenidas en la base de datos.

In [5]:
comunidades_autonomas = prov_com_codigos_ine[['CODAUTO', 'Comunidad Autónoma']]
comunidades_autonomas.rename(columns = {'Comunidad Autónoma':'Comunidad_autonoma'}, inplace = True)
comunidades_autonomas = comunidades_autonomas.drop_duplicates()



provincias = prov_com_codigos_ine[['CPRO', 'Provincia']]
provincias = provincias.drop_duplicates()


with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Entidades provincias y comunidades_autonomas creadas.'
  f.write(message)
  f.write('\n')
  f.close()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


Las dos tablas de municipios contienen información complementaria. Mientras que de la tabla del INE nos quedaremos con los identificadores; de la tabla del Aemet extraemos otros datos como la altitud o coordenadas geográficas del centro de cada municipio.

La primera unión lógica entre estos dataframes descargados, es unir la tabla de municipios del INE con la tabla que tiene el nombre de las provincias y comunidades autónomas. Ello es posible ya que ambas tablas incluyen los códigos de comunidad autónoma y provincia.

Un nuevo campo que voy a introducir y que jugará un papel fundamental es el ID de municipio del INE. Este se consigue uniendo el CPRO con el CMUN, lo que resulta en un identificador único de largo 5 para todos los municipios. 

In [6]:
#MERGE

total_ine_municipios = pd.merge(municipios_ine, prov_com_codigos_ine, on=['CODAUTO', 'CPRO'])

#Nuevo campo identificador

total_ine_municipios["INE_MUN_ID"] = total_ine_municipios["CPRO"]+total_ine_municipios["CMUN"]

with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  num_rows_4 = len(total_ine_municipios)
  message = ahora+' - INFO - Merge datos INE municipales e INE códigos provinciales y autonómicos existoso. El dataframe resultante contiene '+str(num_rows_4)+' líneas.'
  f.write(message)
  f.write('\n')
  f.close()


Esto nos deja todos los datos proporcionados por el INE de la siguiente forma:

In [7]:
total_ine_municipios

Unnamed: 0,CODAUTO,CPRO,CMUN,DC,NOMBRE,Comunidad Autónoma,Provincia,INE_MUN_ID
0,16,01,051,3,Agurain/Salvatierra,País Vasco,Araba/Álava,01051
1,16,01,001,4,Alegría-Dulantzi,País Vasco,Araba/Álava,01001
2,16,01,002,9,Amurrio,País Vasco,Araba/Álava,01002
3,16,01,049,3,Añana,País Vasco,Araba/Álava,01049
4,16,01,003,5,Aramaio,País Vasco,Araba/Álava,01003
...,...,...,...,...,...,...,...,...
8126,02,50,296,7,"Zaida, La",Aragón,Zaragoza,50296
8127,02,50,297,3,Zaragoza,Aragón,Zaragoza,50297
8128,02,50,298,9,Zuera,Aragón,Zaragoza,50298
8129,18,51,001,3,Ceuta,Ceuta,Ceuta,51001


La siguiente unión que puede realizarse es pues la unión de esta tabla contenedora del nuevo identificador por municipio con la tabla de municipios del Aemet que como ya se ha dicho añade otro tipo de información (geográfica y demográfica).

Para ello, tuve que interpretar uno de los campos que presentaba este dataframe desde el principio, el campo "id". Tras fijarme en su estructura y comparar varios casos, es claro que eliminando de este código la cadena de strings "id", el código resultante encaja a la perfección con el ID que se ha conformado reviamente para cada municipio haciendo uso de los códigos de provincia y municipio del INE.

In [8]:
try:
 #Eliminando la cadena de texto 'id'
  municipios_aemet['id'] = municipios_aemet['id'].str.replace('id','')
  municipios_aemet.rename(columns = {'id':'INE_MUN_ID'}, inplace = True)
  #MERGE
  total_municipios = pd.merge(municipios_aemet, total_ine_municipios, on='INE_MUN_ID', how='inner')
  MUNICIPIOS = total_municipios[['INE_MUN_ID', 'CPRO', 'CODAUTO','NOMBRE','capital','latitud','longitud','latitud_dec','longitud_dec','altitud','num_hab']]
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_5 = len(MUNICIPIOS)
    message_ok = ahora+' - INFO - Merge datos municipales INE y AEMET exitoso. El dataframe MUNICIPIOS contiene '+str(num_rows_5)+' líneas.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Merge datos municipales INE y AEMET no se ha podido realizar. FATAL error, no se puede continuar'
    f.write(message)
    f.write('\n')
    f.close()  


Quedando así la tabla con todas las uniones. Esta tabla consistirá pues en una de las entidades fundamentales de la base de datos; será la entidad MUNICIPIOS.

In [9]:
MUNICIPIOS

Unnamed: 0,INE_MUN_ID,CPRO,CODAUTO,NOMBRE,capital,latitud,longitud,latitud_dec,longitud_dec,altitud,num_hab
0,44001,44,02,Ababuj,Ababuj,"40º32'54.450744""","-0º48'28.084212""",40.54845854,-0.80780117,1372,65
1,40001,40,07,Abades,Abades,"40º54'58.824504""","-4º16'4.346004""",40.91634014,-4.26787389,971,873
2,48001,48,16,Abadiño,Abadiño-Zelaieta,"43º8'51.525564""","-2º36'24.743484""",43.14764599,-2.60687319,144,7504
3,10001,10,11,Abadía,Abadía,"40º15'34.315272""","-5º58'40.289016""",40.25953202,-5.97785806,451,324
4,27001,27,12,Abadín,Abadín o Provecende,"43º21'46.874736""","-7º28'19.72182""",43.36302076,-7.47214495,515,2646
...,...,...,...,...,...,...,...,...,...,...,...
8115,08153,08,09,Òrrius,Òrrius,"41º33'18.670032""","2º21'17.785224""",41.55518612,2.35494034,255,690
8116,42134,42,07,Ólvega,Ólvega,"41º46'49.646784""","-1º58'57.597312""",41.78045744,-1.98266592,1030,3814
8117,18147,18,01,Órgiva,Órgiva,"36º54'4.3173""","-3º25'26.079168""",36.90119925,-3.42391088,455,5393
8118,23092,23,01,Úbeda,Úbeda,"38º0'41.393628""","-3º22'16.28976""",38.01149823,-3.37119160,750,35177


### Códigos postales

Otra de las informaciones relativas a los municipios que suelen ser muy necesarias es la de los códigos postales. En España un mismo municipio puede tener diferentes códigos postales pero además, un mismo código postal puede emplearse por más de un municipio.

En un principio, esta información la proporciona Correos pero es una información de pago. Sin embargo, he descubierto otra forma posible de obtener esta información. Esta es a través del callejero del censo electoral que prepara el INE también. 

En esta url, directa a un fichero zip directamente de la web del INE de nuevo, se presentan varias tablas en formato de ancho fijo, se supone para ser procesadas por programas especializados en interpretar y graficar datos geográficos. 

Se hace entonces un tratamiento sobre una de las tablas (TRAM.P01-52.D220630.G220704), la que se supone contiene todas las direcciones postales del territorio español. Tras una serie de recortes dentro de diversos códigos de los que no he conseguido interpretación ni documentación alguna, pude obtenerse el código ZIP así como el código INE de cada municipio. De nuevo se compararon varios casos antes de aplicar la transoformación a la tabla entera para asegurar.

Ha sido necesario emplear el encoding ISO-8859-1 en lugar del habitual UTF-8 porque se incluían algunos caracteres del diccionario español colo la letra Ñ.

In [10]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Comienza lectura tabla web INE callejero electoral con todas direcciones postales españolas.'
  f.write(message)
  f.write('\n')
  f.close()

try:
  #Descargamos la carpeta comprimida zip contenedora de todos los ficheros de tablas de ancho fijo
  !wget -nc 'http://www.ine.es/prodyser/callejero/caj_esp/caj_esp_072022.zip' #nc para que no vuelva a descargarla en caso de que ya se haya descargado
  zip = zipfile.ZipFile('caj_esp_072022.zip')

  fichero_direcciones = io.StringIO(zip.read('TRAM.P01-52.D220630.G220704').decode('ISO-8859-1'))
  #Para separar todos los campos, especifico los anchos de cada columna
  df = pd.read_fwf(fichero_direcciones, widths = [13, 29, 11, 8, 9, 8, 32, 50, 5, 25, 67, 11, 5], header=None)
  df.columns = ['ine_codigo_string', 'string_2', 'zip_string', 'string_4', 'string_5', 'string_6', 'string_7', 'nombre_ciudad', 'string_9', 'string_10', 'string_11', 'string_12', 'string_13']
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_6 = len(df)
    message_ok = ahora+' - INFO - Descarga callejero electoral exitosa. El dataframe contiene '+str(num_rows_6)+' líneas.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except: 
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Descarga callejero electoral fallida'
    f.write(message)
    f.write('\n')
    f.close()  


--2022-09-20 16:20:12--  http://www.ine.es/prodyser/callejero/caj_esp/caj_esp_072022.zip
Resolving www.ine.es (www.ine.es)... 195.254.149.130, 195.254.149.129
Connecting to www.ine.es (www.ine.es)|195.254.149.130|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://www.ine.es/prodyser/callejero/caj_esp/caj_esp_072022.zip [following]
--2022-09-20 16:20:12--  https://www.ine.es/prodyser/callejero/caj_esp/caj_esp_072022.zip
Connecting to www.ine.es (www.ine.es)|195.254.149.130|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/zip]
Saving to: ‘caj_esp_072022.zip’

caj_esp_072022.zip      [     <=>            ]  40.05M  1.29MB/s    in 32s     

2022-09-20 16:20:44 (1.26 MB/s) - ‘caj_esp_072022.zip’ saved [41992096]



Sobre toda la tabla anterior, hacemos pues el subset de las columnas que nos interesan y aplicamos las transformaciones necesarias:

In [11]:
subset = df[['ine_codigo_string', 'zip_string']]
subset['ine_codigo_string'] = subset['ine_codigo_string'].str[:5]
subset['zip_string'] = subset['zip_string'].str[:5]
ZIPs = subset.drop_duplicates()
ZIPs.rename(columns = {'zip_string':'ZIP'}, inplace = True)
ZIPs.rename(columns = {'ine_codigo_string':'INE_MUN_ID'}, inplace = True)
ZIPs = pd.merge(MUNICIPIOS, ZIPs, on ='INE_MUN_ID', how='inner')
ZIPs = ZIPs[['INE_MUN_ID', 'ZIP']]
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Tabla relación entre códigos postales y IDs de municipios creada correctamente.'
  f.write(message)
  f.write('\n')
  f.close()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


In [12]:
ZIPs

Unnamed: 0,INE_MUN_ID,ZIP
0,44001,44155
1,40001,40141
2,48001,48220
3,10001,10748
4,27001,27730
...,...,...
14566,23092,23314
14567,23092,23509
14568,23092,23469
14569,23092,23413


Ya conseguimos con esto una tabla entidad donde podemos consultar los códigos ZIPs de cada municipio y/o los municipios donde un ZIP es empleado.

De cara a poder formar una vista materializada total de municipios, contener esta información también será interesante. Con el fin de no multiplicar líneas (ya que los municipios pueden contener más de un código ZIP), los añadiré a la entidad municipios en forma de array. 

In [13]:
ZIPs_array = ZIPs.groupby('INE_MUN_ID').ZIP.apply(list).reset_index()
ZIPs_array.columns = ['INE_MUN_ID', 'ZIPs_array']
ZIPs_array.head()

Unnamed: 0,INE_MUN_ID,ZIPs_array
0,1001,"[01240, 01193]"
1,1002,"[01470, 01450, 01468]"
2,1003,"[01169, 01160, 01165]"
3,1004,"[01474, 01478]"
4,1006,"[01220, 01211]"


Con ello, hacemos un merge más y ya tenemos toda la información de los municipios unificada, en lo que será la vista materializada:

In [14]:
total_municipios = pd.merge(total_municipios, ZIPs_array, left_on ='INE_MUN_ID', right_on='INE_MUN_ID')
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Información ZIPs agrupada en array agregada a vista materializada total_municipios.'
  f.write(message)
  f.write('\n')
  f.close()

Dado que en esta tabla resultante alguna de las informaciones se han duplicado durante los joins y de cara a tener la información ordenada de una manera más lógica, reordeno y selecciono los campos.

In [15]:
total_municipios = total_municipios[['CODAUTO', 'CPRO','INE_MUN_ID','Comunidad Autónoma','Provincia','NOMBRE','capital','latitud','longitud','latitud_dec','longitud_dec','altitud','num_hab', 'ZIPs_array']]
total_municipios.head()

Unnamed: 0,CODAUTO,CPRO,INE_MUN_ID,Comunidad Autónoma,Provincia,NOMBRE,capital,latitud,longitud,latitud_dec,longitud_dec,altitud,num_hab,ZIPs_array
0,2,44,44001,Aragón,Teruel,Ababuj,Ababuj,"40º32'54.450744""","-0º48'28.084212""",40.54845854,-0.80780117,1372,65,[44155]
1,7,40,40001,Castilla y León,Segovia,Abades,Abades,"40º54'58.824504""","-4º16'4.346004""",40.91634014,-4.26787389,971,873,[40141]
2,16,48,48001,País Vasco,Bizkaia,Abadiño,Abadiño-Zelaieta,"43º8'51.525564""","-2º36'24.743484""",43.14764599,-2.60687319,144,7504,[48220]
3,11,10,10001,Extremadura,Cáceres,Abadía,Abadía,"40º15'34.315272""","-5º58'40.289016""",40.25953202,-5.97785806,451,324,[10748]
4,12,27,27001,Galicia,Lugo,Abadín,Abadín o Provecende,"43º21'46.874736""","-7º28'19.72182""",43.36302076,-7.47214495,515,2646,"[27730, 27843, 27849, 27845, 27738, 27737]"


### Geometrías

Otra información que resultará fundamental de cara a poder localizar puntos geográficos en un municipio (como más delante se hará con las estaciones metereológicas), es el polígono recinto de cada uno de los municipios. Esta información, más compleja, la descargamos del instituto nacional de geografía, a través de una petición post tras haber estudiado el código fuente de la página ya que no ocurre como en otras entidades donde se ponían a disposición APIs o ficheros directamente.

In [16]:
cnig_url = 'https://centrodedescargas.cnig.es/CentroDescargas/descargaDir'

secuencial = {'secuencialDescDir': '9000029'}

with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Comienza descarga web IGN de los polígonos recintos de cada municipio, provincia y comunidad.'
  f.write(message)
  f.write('\n')
  f.close()

try:
  ign_response = requests.post(cnig_url, data = secuencial, timeout = (10, 30))

  lineas_limite = ign_response.content
  with open('lineas_limite.zip', 'wb') as s:
      s.write(lineas_limite)

  with zipfile.ZipFile('lineas_limite.zip', 'r') as zipObj:
    # Extrae todo el contenido del Zip
    zipObj.extractall()

  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message_ok = ahora+' - INFO - Descarga datos geométricos poligonales correcta.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Descarga datos geométricos poligonales fallida. FATAL no puede continuar.'
    f.write(message)
    f.write('\n')
    f.close()        

Lo que se descara aquí son unos ficheros en formatos .shp que contienen el polygon de cada uno de los municipios. Para poder tratar estos datos se hace uso del paquete geopandas; pero además, han de ir acompañados en el directorio donde se ubican de otros ficheros homónimos de diferentes extensiones (.cpg, .dbf, .prj, .shx).

Los municipios de Canarias vienen en ficheros separados a los de la península y baleares por lo que es necesario leer dos ficheros y concatenar ambos dataframes.

En el paso anterior ya han sido descargados y extráidos del zip por lo que pueden ser leídos directamente con la siguiente orden:

In [17]:
sf1 = gpd.read_file("SHP_ETRS89/recintos_municipales_inspire_peninbal_etrs89/recintos_municipales_inspire_peninbal_etrs89.shp")
sf2 = gpd.read_file("SHP_WGS84/recintos_municipales_inspire_canarias_wgs84/recintos_municipales_inspire_canarias_wgs84.shp")
sf = pd.concat([sf1, sf2], ignore_index=True)
sf.head()

Unnamed: 0,INSPIREID,COUNTRY,NATLEV,NATLEVNAME,NATCODE,NAMEUNIT,CODNUT1,CODNUT2,CODNUT3,geometry
0,ES.IGN.BDDAE.34010404096,ES,https://inspire.ec.europa.eu/codelist/Administ...,Municipio,34010404096,Urrácal,ES6,ES61,ES611,"POLYGON ((-2.37995 37.38348, -2.38005 37.38547..."
1,ES.IGN.BDDAE.34010404903,ES,https://inspire.ec.europa.eu/codelist/Administ...,Municipio,34010404903,La Mojonera,ES6,ES61,ES611,"POLYGON ((-2.71831 36.81474, -2.71757 36.81405..."
2,ES.IGN.BDDAE.34011818188,ES,https://inspire.ec.europa.eu/codelist/Administ...,Municipio,34011818188,Villanueva Mesía,ES6,ES61,ES614,"POLYGON ((-4.03369 37.24215, -4.03279 37.24251..."
3,ES.IGN.BDDAE.34010404097,ES,https://inspire.ec.europa.eu/codelist/Administ...,Municipio,34010404097,Velefique,ES6,ES61,ES611,"POLYGON ((-2.45634 37.21622, -2.45601 37.21650..."
4,ES.IGN.BDDAE.34010404098,ES,https://inspire.ec.europa.eu/codelist/Administ...,Municipio,34010404098,Vélez-Blanco,ES6,ES61,ES611,"POLYGON ((-2.25680 37.88997, -2.25078 37.89718..."


De cara a obtener el INE_MUN_ID  que venimos manejando en todas las tablas anteriores, es necesario hacer recortes dentro del campo NATCODE. Además, seleccioanremos tan solo esta columna junto con la que guarda la geometría.

In [18]:
sf['INE_MUN_ID'] = sf['NATCODE'].str[-5:]
MUN_GEOMETRY = sf[['INE_MUN_ID', 'geometry']]
MUN_GEOMETRY.rename(columns = {'geometry':'MUN_geometry'}, inplace = True)
MUN_GEOMETRY = pd.merge(MUNICIPIOS, MUN_GEOMETRY, on='INE_MUN_ID', how='inner')
MUN_GEOMETRY = MUN_GEOMETRY[['INE_MUN_ID', 'MUN_geometry']]
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Entidad geometría municipios MUN_GEOMETRY creada con éxito.'
  f.write(message)
  f.write('\n')
  f.close()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


Lo que interesa ahora es pues agregar a la tabla que será la vista materializada de municipios la geometría de cada uno de ellos:

In [19]:
total_municipios = total_municipios.merge(MUN_GEOMETRY, on='INE_MUN_ID')
total_municipios.head()
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Información geométrica municipal agregada a vista materializada total_municipios.'
  f.write(message)
  f.write('\n')
  f.close()

Otra de las geometrías descargadas y que pueden resultar de gran utilidad es la correspondiente a los polígonos de las provincias (que podrían obtenerse como resultado de sumar los polígonos de los municipios de cada una de ellas pero teniendo el dato ya descragado evitamos el esfuerzo computacional).

In [20]:
sf_p_1 = gpd.read_file("SHP_ETRS89/recintos_provinciales_inspire_peninbal_etrs89/recintos_provinciales_inspire_peninbal_etrs89.shp")
sf_p_2 = gpd.read_file("SHP_WGS84/recintos_provinciales_inspire_canarias_wgs84/recintos_provinciales_inspire_canarias_wgs84.shp")
sf_p = pd.concat([sf_p_1, sf_p_2], ignore_index=True)
sf_p.head()

Unnamed: 0,INSPIREID,COUNTRY,NATLEV,NATLEVNAME,NATCODE,NAMEUNIT,CODNUT1,CODNUT2,CODNUT3,geometry
0,ES.IGN.BDDAE.34160100000,ES,https://inspire.ec.europa.eu/codelist/Administ...,Provincia,34160100000,Araba/Álava,ES2,ES21,,"POLYGON ((-2.76808 42.61408, -2.76863 42.61334..."
1,ES.IGN.BDDAE.34080200000,ES,https://inspire.ec.europa.eu/codelist/Administ...,Provincia,34080200000,Albacete,ES4,ES42,,"POLYGON ((-2.55212 38.08501, -2.55207 38.08517..."
2,ES.IGN.BDDAE.34100300000,ES,https://inspire.ec.europa.eu/codelist/Administ...,Provincia,34100300000,Alacant/Alicante,ES5,ES52,,"MULTIPOLYGON (((-0.75223 37.88713, -0.75223 37..."
3,ES.IGN.BDDAE.34010400000,ES,https://inspire.ec.europa.eu/codelist/Administ...,Provincia,34010400000,Almería,ES6,ES61,,"MULTIPOLYGON (((-3.03624 35.93791, -3.03637 35..."
4,ES.IGN.BDDAE.34070500000,ES,https://inspire.ec.europa.eu/codelist/Administ...,Provincia,34070500000,Ávila,ES4,ES41,,"POLYGON ((-5.43382 40.24491, -5.43430 40.24276..."


Ahora busco cómo obtener el CPRO y hago la selección de las dos columnas que interesan:

In [21]:
sf_p['CPRO'] = (sf_p['NATCODE'].str[:6]).str[-2:]
PRO_GEOMETRY = sf_p[['CPRO', 'geometry']]
PRO_GEOMETRY.rename(columns = {'geometry':'PRO_geometry'}, inplace = True)
PRO_GEOMETRY = pd.merge(provincias, PRO_GEOMETRY, on='CPRO', how='inner')
PRO_GEOMETRY = PRO_GEOMETRY[['CPRO', 'PRO_geometry']]
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Entidad geometría municipios PRO_GEOMETRY creada con éxito.'
  f.write(message)
  f.write('\n')
  f.close()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


##Inventario de estaciones AEMET

Teniendo ya unas tablas que almacenan información geográfica suficiente y unificada, pude pasarse entonces a conocer las redes de estaciones metereológicas de aquellas institutos y organismos que vayamos a consultar.

Descargamos primero la información relativa al inventario de estaciones del que dispone la AEMET.

Para ello se hace de nuevo una llamada vía API.

In [22]:
#URL
inventario_estaciones_url = "https://opendata.aemet.es/opendata/api/valores/climatologicos/inventarioestaciones/todasestaciones/"

try:
  inventario_estaciones_req = requests.request("GET", inventario_estaciones_url, headers=headers, params=aemet_api_key, timeout=(5,10))
  estaciones_data =  inventario_estaciones_req.json()['datos']
  #GET
  estaciones_data_json = requests.get(estaciones_data, timeout=(5,10)).json()
  estaciones_AEMET = pd.json_normalize(estaciones_data_json)
  estaciones_AEMET.head()
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_7 = len(estaciones_AEMET)    
    message_ok = ahora+' - INFO - Lectura datos inventario estaciones AEMET API correcta. El dataframe contiene '+str(num_rows_7)+' líneas.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - API AEMET inventario estaciones, la lectura no ha sido posible.'
    f.write(message)
    f.write('\n')
    f.close()   

Dado que las coordenadas no presentan el formato adecuado, se aplica transformación en estas columnas para poder transformarlas después a coordenadas en formato decimal.

In [23]:
estaciones_AEMET['latitud'] = estaciones_AEMET['latitud'].str[:2]+"º"+estaciones_AEMET['latitud'].str[2:4]+str("'")+estaciones_AEMET['latitud'].str[4:6]+str("''")+estaciones_AEMET['latitud'].str[6:8]
estaciones_AEMET['longitud'] = estaciones_AEMET['longitud'].str[:2]+"º"+estaciones_AEMET['longitud'].str[2:4]+str("'")+estaciones_AEMET['longitud'].str[4:6]+str("''")+estaciones_AEMET['longitud'].str[6:8]

In [24]:
estaciones_AEMET['latitud_dec'] = estaciones_AEMET['latitud'].apply(lambda x: dms2dec(x))
estaciones_AEMET['longitud_dec'] = estaciones_AEMET['longitud'].apply(lambda x: dms2dec(x))
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Formato coordenadas geográficas entidad estaciones_AEMET corregido.'
  f.write(message)
  f.write('\n')
  f.close()

A pesar de ser una tabla descargada del mismo enlace que el primer inventario de municipios, las estaciones no vinen referenciadas a ninguno de ellos, tan solo encontramos la provincia, una descripción textual de su ubicación y unas coordenadas.

En un primer momento lo único que podemos unir es entonces el código de provincia. Para conseguirlo, también necesité hacer una serie de transformaciones sobre la forma en la que estas están escritas en ambas tablas (la de MUNICIPIOS y esta que nos ocupa ahora). Lo conseguí aplicando algunas de las técnicas de normalización de textos aprendidas en el módulo de text mining (eliminar artículos, quitar acentos, eliminar traducciones y expresar todo en minúscula).

Primero sobre el dataframe de MUNICIPIOS:

In [25]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Se inicia módulo depuración de textos.'
  f.write(message)
  f.write('\n')
  f.close()

prov_cod = total_municipios[['CPRO', 'Provincia']]
prov_cod = prov_cod.drop_duplicates()

#elimino acentos
prov_cod['provincia_clean'] = prov_cod['Provincia'].apply(lambda x: unidecode.unidecode(x))
#Paso todo a minúsculas
prov_cod['provincia_clean'] = prov_cod['provincia_clean'].str.lower()
#se homogeniza la forma de exresar Santa
prov_cod['provincia_clean'] = prov_cod['provincia_clean'].str.replace('santa','sta.')

with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Minúsculas, acentos y abreviaturas corregidas en la columna provincia de la entidad municipios.'
  f.write(message)
  f.write('\n')
  f.close()

Sobre el dataframe de inventario de estaciones:

In [26]:
#Se quitan las tildes
estaciones_AEMET['provincia_clean'] = estaciones_AEMET['provincia'].apply(lambda x: unidecode.unidecode(x))
#Se pasa todo a minísculas
estaciones_AEMET['provincia_clean'] = estaciones_AEMET['provincia_clean'].str.lower()
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - Minúsculas y acentos corregidos en la columna provincia de la entidad estiones_AEMET.'
  f.write(message)
  f.write('\n')
  f.close()

Algunas funciones más para terminar de tener la equivalencia entre los nombres de las provincias:

In [27]:
def delete_translation(name):
  position = name.find('/')
  if position>0:
    name = name[:position]
  else:
    name = name
  return name


def delete_article(name):
  position = name.find(',')
  if position>0:
    name = name[:position]
  else:
    name = name
  return name


def delete_article_2(name):
  name = (((name.replace('la ', '')).replace('a ', '')).replace('illes ', '')).replace('las ', '')
  return name  


prov_cod['provincia_clean'] = prov_cod['provincia_clean'].apply(lambda x: delete_translation(x))
estaciones_AEMET['provincia_clean'] = estaciones_AEMET['provincia_clean'].apply(lambda x: delete_translation(x))
prov_cod['provincia_clean'] = prov_cod['provincia_clean'].apply(lambda x: delete_article(x))
estaciones_AEMET['provincia_clean'] = estaciones_AEMET['provincia_clean'].apply(lambda x: delete_article_2(x))


with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message1 = ahora+' - INFO - Se eliminan artículos y traducciones del campo provincia de ambas entidades.'
  message2 = ahora+' - DEBUG - Finaliza módulo depuración de textos.'
  f.write(message1)
  f.write('\n')
  f.write(message2)
  f.write('\n')
  f.close()


Quedando el inventario de estaciones con los códigos de las provincias:

In [28]:
estaciones_AEMET = pd.merge(estaciones_AEMET, prov_cod, on=['provincia_clean'], how='left')
estaciones_AEMET.head()
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - INFO - La entidad estiones_AEMET contiene ahora CPRO.'
  f.write(message)
  f.write('\n')
  f.close()

A pesar de conocer ya las provincias donde se encuentra cada estación, no podemos conocer en qué municipio se encuentran y es que, aunque a veces la descripción verbal de la localización de la estación sea el nombre de un municipio, muchas otras no. 

Con esto, lo que sí podemos hacer es entonces preguntarnos si las coordenadas pertencen a un municipio u otro gracias a que conocemos los recintos de cada uno de ellos, haciendo uno de los polígonos.

Para esto, defino la función localizar_municipio. En ella, conociendo las coordenadas de cada estación y también la provincia, se elige una subselect de los pueblos de la provincia y se escoge uno de ellos como contenedor de la estación empleando poly.contains(coord_est). La aplico a todo el dataframe de las estaciones metereológicas.

In [29]:
def localizar_municipio (lat_estacion:float, long_estacion:float, cod_prov:str):
  id_mun_0 = '00000'
  coord_est = Point(long_estacion, lat_estacion)
  mun_list = (total_municipios.loc[total_municipios["CPRO"] == cod_prov, 'INE_MUN_ID'])
  for municipio in mun_list:
    poly = (total_municipios.loc[total_municipios["INE_MUN_ID"] == municipio, 'MUN_geometry'])
    poly = np.array(poly)
    poly = poly[0]
    if poly.contains(coord_est):
      id_mun_0 = municipio
    else:
      pass
  return id_mun_0

In [30]:
try:
  estaciones_AEMET['INE_MUN_ID'] =  estaciones_AEMET.apply(lambda x: localizar_municipio(x['latitud_dec'], x['longitud_dec'], x['CPRO']), axis=1)
  estaciones_AEMET.head()
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message_ok = ahora+' - INFO - Intento de localizar estaciones en municipio correcto.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Intento de localizar estaciones en municipio fallido.'
    f.write(message)
    f.write('\n')
    f.close()      

In [31]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  faltantes = len( estaciones_AEMET.loc[estaciones_AEMET['INE_MUN_ID']=='00000'])
  if faltantes==0:
    message = ahora+' - INFO - La entidad estiones_AEMET contiene ahora INE_MUN_ID.'
  else:
    message = ahora+' - INFO - La entidad estiones_AEMET aun presenta municipios sin INE_MUN_ID. Se ejecuta función locaclización 2.'
  f.write(message)
  f.write('\n')
  f.close()

Ocurre que algunas estaciones siguen quedando sin localizar en un municipio. Para tratar de solventar esto, definí la misma función haciendo que no se consideraran los pueblos de la provincia si no todos los pueblos del territorio, pensando en que algunas pudieran estar en zonas limítrofes entre provincias y pertenecieran en realidad a otra provincia que la marcada por el dataframe original.

Esto sí resolvió el problema en algunos casos pero no en su totalidad.

In [32]:
def localizar_municipio_2 (lat_estacion:float, long_estacion:float):
  id_mun_0 = '00000'
  coord_est = Point(long_estacion, lat_estacion)
  mun_list = (total_municipios.loc[total_municipios["CPRO"] != "0000", 'INE_MUN_ID'])
  for municipio in mun_list:
    poly = (total_municipios.loc[total_municipios["INE_MUN_ID"] == municipio, 'MUN_geometry'])
    poly = np.array(poly)
    poly = poly[0]
    if poly.contains(coord_est):
      id_mun_0 = municipio
    else:
      pass
  return id_mun_0

In [33]:
try:
  estaciones_AEMET['INE_MUN_ID'] =  estaciones_AEMET.apply(lambda x: localizar_municipio_2(x['latitud_dec'], x['longitud_dec']) if x['INE_MUN_ID'] == '00000' else x['INE_MUN_ID'], axis=1)
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message_ok = ahora+' - INFO - Intento 2 de localizar estaciones en municipio correcto.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Intento 2 de localizar estaciones en municipio fallido.'
    f.write(message)
    f.write('\n')
    f.close()     


Los casos mostrados, se deben a que las estaciones estsán colocadas muy al borde de la costa. Esto hace que al localizarlas en el mapa, cayeran en el mar. También hay un caso especial en la provincia de Navarra que cae en una base militar de la OTAN y es un territorio que no se considera perteneciente a ningún municipio.

Para solventarlo, creé una tercera función. En este caso, en el dataframe de municipios disponemos también las coordenadas del centro de cada uno de ellos. Usando entonces este dato, la función escoge el municipio cuyo centro se encuentre a menor distancia de la estación metereológica.

In [34]:
def localizar_municipio_3 (lat_estacion:float, long_estacion:float, cod_prov:str):
  id_mun_0 = '00000'
  distancia_0 = 1000000
  coord_est = (lat_estacion, long_estacion)
  mun_list = (MUNICIPIOS.loc[MUNICIPIOS["CPRO"] == cod_prov, 'INE_MUN_ID'])
  for municipio in mun_list:
    lat_mun =   float((MUNICIPIOS.loc[MUNICIPIOS["INE_MUN_ID"] == str(municipio), 'latitud_dec']).to_list()[0])
    long_mun =  float((MUNICIPIOS.loc[MUNICIPIOS["INE_MUN_ID"] == str(municipio), 'longitud_dec']).to_list()[0])
    coord_mun = (lat_mun, long_mun)
    d = (geopy.distance.geodesic(coord_mun, coord_est).km)
    if d<distancia_0:
      distancia_0 = d
      id_mun_0 = municipio
    else:
      distancia_0= distancia_0
      id_mun_0 = id_mun_0

  return id_mun_0


In [35]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  faltantes = len(  estaciones_AEMET.loc[estaciones_AEMET['INE_MUN_ID']=='00000'])
  if faltantes == 0:
    message = ahora+' - INFO - La entidad estiones_AEMET contiene ahora INE_MUN_ID.'
  else:
    message = ahora+' - INFO - La entidad estiones_AEMET aun presenta municipios sin INE_MUN_ID. Se ejecuta función locaclización 3.'
  f.write(message)
  f.write('\n')
  f.close()

In [36]:
try:
  estaciones_AEMET['INE_MUN_ID'] =  estaciones_AEMET.apply(lambda x: localizar_municipio_3(x['latitud_dec'], x['longitud_dec'], x['CPRO']) if x['INE_MUN_ID'] == '00000' else x['INE_MUN_ID'], axis=1)
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message_ok = ahora+' - INFO - Intento 3 de localizar estaciones en municipio correcto.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Intento 3 de localizar estaciones en municipio fallido.'
    f.write(message)
    f.write('\n')
    f.close()     


In [37]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  faltantes = len(  estaciones_AEMET.loc[estaciones_AEMET['INE_MUN_ID']=='00000'])
  if faltantes == 0:
    message = ahora+' - INFO - La entidad estiones_AEMET contiene ahora INE_MUN_ID.'
  else:
    message = ahora+' - ERROR - La entidad estiones_AEMET aun presenta municipios sin INE_MUN_ID.'
  f.write(message)
  f.write('\n')
  f.close()

In [38]:
estaciones_AEMET = estaciones_AEMET[['indicativo', 'nombre', 'INE_MUN_ID','latitud', 'longitud', 'latitud_dec', 'longitud_dec', 'altitud']]
estaciones_AEMET

Unnamed: 0,indicativo,nombre,INE_MUN_ID,latitud,longitud,latitud_dec,longitud_dec,altitud
0,0252D,ARENYS DE MAR,08006,41º35'15''N,02º32'24''E,41.587500,2.540000,74
1,0076,BARCELONA AEROPUERTO,08169,41º17'34''N,02º04'12''E,41.292778,2.070000,4
2,0200E,"BARCELONA, FABRA",08019,41º25'06''N,02º07'27''E,41.418333,2.124167,408
3,0201D,BARCELONA,08019,41º23'26''N,02º12'00''E,41.390556,2.200000,6
4,0149X,MANRESA,08113,41º43'12''N,01º50'25''E,41.720000,1.840278,291
...,...,...,...,...,...,...,...,...
286,C329Z,SAN SEBASTIÁN DE LA GOMERA,38036,28º05'23''N,17º06'41''W,28.089722,-17.111389,15
287,C449C,STA.CRUZ DE TENERIFE,38038,28º27'48''N,16º15'19''W,28.463333,-16.255278,35
288,C129Z,TAZACORTE,38045,28º35'45''N,17º54'55''W,28.595833,-17.915278,62
289,C447A,TENERIFE NORTE AEROPUERTO,38023,28º28'39''N,16º19'46''W,28.477500,-16.329444,632


Con esto, ya sí quedan todas las estaciones localizadas en un municipio.

## Inventario de estaciones NOAA

Otra de las webs de las que vamos a extraer información para trabajar con ella es la web de NOAA (National Oceanic and Atmospheric Administration), una web oficial del gobierno de EEUU. De manera igual a lo que hemos hecho con Aemet, tendremos que descargar su catálogo de estaciones disponibles en España y localizarlas en un municipio. La llamada se realiza vía API.

In [39]:
#noaa stationsin
noaa_headers = {"token":"njEAHYYVIkaPyuJQKEZyAuXRVOwCPodq"}
noaa_url_stations = 'https://www.ncei.noaa.gov/cdo-web/api/v2/stations?locationid=FIPS:SP&limit=1000'

try:
  noa_stations = requests.request("GET", noaa_url_stations,  headers=noaa_headers, timeout = (5, 10))
  noa_stations = noa_stations.json()
  estaciones_NOAA = pd.json_normalize(noa_stations['results'])
  estaciones_NOAA = estaciones_NOAA[['id', 'latitude', 'longitude', 'name', 'elevation']]
  estaciones_NOAA['id'] = estaciones_NOAA['id'].str.replace('GHCND:','')
  estaciones_NOAA
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_8 = len(estaciones_NOAA)
    message_ok = ahora+' - INFO - La entidad estaciones_NOAA ha sido creada con éxito. API NOAA correcta. El dataframe contiene '+str(num_rows_8)+' líneas.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Error, no se ha podido crear la entidad estaciones_NOAA. FATAL no es posible continuar.'
    f.write(message)
    f.write('\n')
    f.close()    

In [40]:
estaciones_NOAA

Unnamed: 0,id,latitude,longitude,name,elevation
0,SP000003195,40.411700,-3.678100,"MADRID RETIRO, SP",667.0
1,SP000004452,38.883100,-6.829200,"BADAJOZ TALAVERA LA REAL, SP",185.0
2,SP000006155,36.666700,-4.488100,"MALAGA AEROPUERTO, SP",7.0
3,SP000008027,43.307500,-2.039200,"SAN SEBASTIAN IGUELDO, SP",251.0
4,SP000008181,41.292800,2.069700,"BARCELONA AEROPUERTO, SP",4.0
...,...,...,...,...,...
194,SPW00013024,37.174917,-5.615944,"MORON DE LA FRONTERA, SP",86.9
195,SPW00013025,36.650000,-6.350000,"ROTA NAS, SP",25.6
196,SPW00014010,41.666242,-1.041553,"ZARAGOZA, SP",263.0
197,SPW00014011,40.496747,-3.445872,"TORREJON, SP",617.5


Siguiendo la misma estrategia que con el inventario de estaciones de la AEMET, emplearemos funciones para localizar cada estación en un municipio. En esta ocasión será un proceso más costoso ya que no conocemos en qué provincia se encuentra la estación y habrá de buscarse entre todos los municipios del territorio (unos 8100)

In [41]:
try:
  estaciones_NOAA['latitude'] = estaciones_NOAA['latitude'].astype(float)
  estaciones_NOAA['longitude'] = estaciones_NOAA['longitude'].astype(float)
  estaciones_NOAA['INE_MUN_ID'] =  estaciones_NOAA.apply(lambda x: localizar_municipio_2(x['latitude'], x['longitude']) , axis=1)
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message_ok = ahora+' - INFO - Intento 1 de localizar estaciones en municipio correcto estaciones_NOAA.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Intento 3 de localizar estaciones en municipio fallido estaciones_NOAA.'
    f.write(message)
    f.write('\n')
    f.close()

In [42]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  faltantes = len(estaciones_NOAA.loc[estaciones_NOAA['INE_MUN_ID']=='00000'])
  if faltantes == 0:
    message = ahora+' - INFO - La entidad estiones_NOAA contiene ahora INE_MUN_ID.'
  else:
    message = ahora+' - INFO - La entidad estiones_NOAA aun presenta municipios sin INE_MUN_ID.'
  f.write(message)
  f.write('\n')
  f.close()

In [43]:
def localizar_municipio_4 (lat_estacion:float, long_estacion:float):
  id_mun_0 = '00000'
  distancia_0 = 1000000
  coord_est = (lat_estacion, long_estacion)
  mun_list = (MUNICIPIOS.loc[MUNICIPIOS["CPRO"] != "0000", 'INE_MUN_ID'])
  for municipio in mun_list:
    lat_mun =   float((MUNICIPIOS.loc[MUNICIPIOS["INE_MUN_ID"] == str(municipio), 'latitud_dec']).to_list()[0])
    long_mun =  float((MUNICIPIOS.loc[MUNICIPIOS["INE_MUN_ID"] == str(municipio), 'longitud_dec']).to_list()[0])
    coord_mun = (lat_mun, long_mun)
    d = (geopy.distance.geodesic(coord_mun, coord_est).km)
    if d<distancia_0:
      distancia_0 = d
      id_mun_0 = municipio
    else:
      distancia_0= distancia_0
      id_mun_0 = id_mun_0

  return id_mun_0

In [44]:
try:
  estaciones_NOAA['INE_MUN_ID'] =  estaciones_NOAA.apply(lambda x: localizar_municipio_4(x['latitude'], x['longitude']) if x['INE_MUN_ID'] == '00000' else x['INE_MUN_ID'], axis=1)
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message_ok = ahora+' - INFO - Intento 2 de localizar estaciones en municipio correcto estaciones_NOAA.'
    f.write(message_ok)
    f.write('\n')
    f.close()
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - Intento 2 de localizar estaciones en municipio fallido estaciones_NOAA.'
    f.write(message)
    f.write('\n')
    f.close()

In [45]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  faltantes = len(estaciones_NOAA.loc[estaciones_NOAA['INE_MUN_ID']=='00000'])
  if faltantes == 0:
    message = ahora+' - INFO - La entidad estiones_NOAA contiene ahora INE_MUN_ID.'
  else:
    message = ahora+' - INFO - La entidad estiones_NOAA aun presenta municipios sin INE_MUN_ID.'
  f.write(message)
  f.write('\n')
  f.close()

Donde finalmente se consiguen localizar todas las estaciones en un municipio.

In [46]:
estaciones_NOAA = estaciones_NOAA[['id', 'name', 'INE_MUN_ID','latitude', 'longitude', 'elevation']]
estaciones_NOAA

Unnamed: 0,id,name,INE_MUN_ID,latitude,longitude,elevation
0,SP000003195,"MADRID RETIRO, SP",28079,40.411700,-3.678100,667.0
1,SP000004452,"BADAJOZ TALAVERA LA REAL, SP",06015,38.883100,-6.829200,185.0
2,SP000006155,"MALAGA AEROPUERTO, SP",29067,36.666700,-4.488100,7.0
3,SP000008027,"SAN SEBASTIAN IGUELDO, SP",20069,43.307500,-2.039200,251.0
4,SP000008181,"BARCELONA AEROPUERTO, SP",08169,41.292800,2.069700,4.0
...,...,...,...,...,...,...
194,SPW00013024,"MORON DE LA FRONTERA, SP",41011,37.174917,-5.615944,86.9
195,SPW00013025,"ROTA NAS, SP",11030,36.650000,-6.350000,25.6
196,SPW00014010,"ZARAGOZA, SP",50297,41.666242,-1.041553,263.0
197,SPW00014011,"TORREJON, SP",28148,40.496747,-3.445872,617.5


## Observaciones metereológicas diarias

Comienzan a partir de aquí las descargas de observaciones metereológicas diarias. En concreto, de las dos páginas webs de las que ya he hablado, AEMET y NOAA. Para ello, realizaremos diferentes queries vía API.

La descarga de información de ambas plataformas supondrán otras dos entidades de nuestra base de datos.

En el caso de la API del AEMET, para este tipo de dataset se nos limita la query a un máximo de 31 días de consulta por lo que se diseña un loop que construye una url diferente para cada uno de los meses que queramos consultar entre un rango de fechas.

### AEMET

In [47]:
with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Se inicia módulo de carga de datos observaciones metereológicas.'
  f.write(message)
  f.write('\n')
  f.close()

#Datos metereológicos, observaciones diarias
start_date = datetime.date(2009,12,1)
end_date = datetime.date(2021,12,31)

num_months = (end_date.year - start_date.year) * 12 + (end_date.month - start_date.month)

aemet_df_joined = []

try:

  for i in range(num_months):
    new_start_date = (start_date+relativedelta(months=+1))
    new_end_date = ((start_date+relativedelta(months=+2))-relativedelta(days=1))
    my_monthly_aemet_url = 'https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/datos/fechaini/'+(new_start_date.strftime("%Y-%m-%dT%H:%M:%SUTC"))+'/fechafin/'+(new_end_date.strftime("%Y-%m-%dT%H:%M:%SUTC"))+'/todasestaciones'
    observaciones_aemet_1_req = requests.get(my_monthly_aemet_url,  params=aemet_api_key)
    observaciones_aemet_data = observaciones_aemet_1_req.json()['datos']
    observaciones_aemet_json = requests.get(observaciones_aemet_data).json()
    observaciones_aemet = pd.json_normalize(observaciones_aemet_json)
    aemet_df_joined.append(observaciones_aemet)
    start_date = new_start_date
    end_date = new_end_date
    time.sleep(5)

  AEMET_diarios = pd.concat(aemet_df_joined)
  AEMET_diarios = AEMET_diarios[['indicativo', 'fecha', 'tmed','prec','tmin','horatmin','tmax','horatmax','dir','velmedia','racha','horaracha','presMax','horaPresMax','presMin','horaPresMin','sol']]

  #Elimino algunos string feos que se corresponden con NaNs
  AEMET_diarios['prec'] = AEMET_diarios['prec'].str.replace('Ip','')
  AEMET_diarios['prec'] = AEMET_diarios['prec'].str.replace('Acum','')

  #Convierto el separador decimal para que sea el mismo que en los datasets anteriores
  AEMET_diarios['tmed'] = AEMET_diarios['tmed'].str.replace(',','.')
  AEMET_diarios['prec'] = AEMET_diarios['prec'].str.replace(',','.')
  AEMET_diarios['tmin'] = AEMET_diarios['tmin'].str.replace(',','.')
  AEMET_diarios['tmax'] = AEMET_diarios['tmax'].str.replace(',','.')
  AEMET_diarios['dir'] = AEMET_diarios['dir'].str.replace(',','.')
  AEMET_diarios['velmedia'] = AEMET_diarios['velmedia'].str.replace(',','.')
  AEMET_diarios['racha'] = AEMET_diarios['racha'].str.replace(',','.')
  AEMET_diarios['presMax'] = AEMET_diarios['presMax'].str.replace(',','.')
  AEMET_diarios['presMin'] = AEMET_diarios['presMin'].str.replace(',','.')
  AEMET_diarios['sol'] = AEMET_diarios['sol'].str.replace(',','.')

  #Más strings que no aportan información, son NaNs
  AEMET_diarios['horatmin'] = AEMET_diarios['horatmin'].str.replace('Varias','')
  AEMET_diarios['horatmax'] = AEMET_diarios['horatmax'].str.replace('Varias','')
  AEMET_diarios['horaracha'] = AEMET_diarios['horaracha'].str.replace('Varias','')
  AEMET_diarios['horaPresMax'] = AEMET_diarios['horaPresMax'].str.replace('Varias','')
  AEMET_diarios['horaPresMin'] = AEMET_diarios['horaPresMin'].str.replace('Varias','')  
  
  #NaNs por vacíos
  AEMET_diarios['tmed'] = AEMET_diarios['tmed'].str.replace('nan','')
  AEMET_diarios['prec'] = AEMET_diarios['prec'].str.replace('nan','')
  AEMET_diarios['tmin'] = AEMET_diarios['tmin'].str.replace('nan','')
  AEMET_diarios['tmax'] = AEMET_diarios['tmax'].str.replace('nan','')
  AEMET_diarios['dir'] = AEMET_diarios['dir'].str.replace('nan','')
  AEMET_diarios['velmedia'] = AEMET_diarios['velmedia'].str.replace('nan','')
  AEMET_diarios['racha'] = AEMET_diarios['racha'].str.replace('nan','')
  AEMET_diarios['presMax'] = AEMET_diarios['presMax'].str.replace('nan','')
  AEMET_diarios['presMin'] = AEMET_diarios['presMin'].str.replace('nan','')
  AEMET_diarios['sol'] = AEMET_diarios['sol'].str.replace('nan','')


  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_9 = len(AEMET_diarios)
    message = ahora+' - INFO - Observaciones metereológicas AEMET diarias cargadas con éxito. El dataframe contiene '+str(num_rows_9)+' líneas.'
    f.write(message)
    f.write('\n')
    f.close()    
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - No fue posible cargar las observaciones metereológicas diarias de AEMET a partir de la fecha '+str(start_date)+'.'
    f.write(message)
    f.write('\n')
    f.close()  

### NOAA

En el caso de la API de NOAA, la consulta es nominal por cada una de las estaciones disponibles en su catálogo por lo que se diseña otro bucle que escribe una url diferente cada vez iterando por el inventario de estaciones de NOAA.

In [48]:
noaa_df_joined = []

try:    
  for estacion in estaciones_NOAA['id']:
    noaa_api_url = 'https://www.ncei.noaa.gov/access/services/data/v1?dataset=daily-summaries&stations='+estacion+'&startDate=2010-01-01&endDate=2021-12-31&includeAttributes=false&includeStationName=false&includeStationLocation=0&units=metric&format=json'
    observaciones_noaa_req = requests.request("GET", noaa_api_url, timeout=(40,60)).json()
    observaciones_noaa = pd.json_normalize(observaciones_noaa_req)
    noaa_df_joined.append(observaciones_noaa)

  NOAA_diarios = pd.concat(noaa_df_joined)
  NOAA_diarios = NOAA_diarios[['STATION', 'DATE', 'TMAX', 'TMIN', 'TAVG', 'SNWD']]
  NOAA_diarios.reset_index()
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    num_rows_10 = len(NOAA_diarios)
    message = ahora+' - INFO - Observaciones metereológicas NOAA diarias cargadas con éxito. El dataframe contiene '+str(num_rows_10)+' líneas.'
    f.write(message)
    f.write('\n')
    f.close()  
except:
  with open('ETL_logging.txt', 'a') as f:
    ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    message = ahora+' - ERROR - No fue posible cargar las observaciones metereológicas diarias de NOAA entre las fechas escogidas.'
    f.write(message)
    f.write('\n')
    f.close()      


with open('ETL_logging.txt', 'a') as f:
  ahora = datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")
  message = ahora+' - DEBUG - Se finaliza módulo de carga de datos observaciones metereológicas.'
  f.write(message)
  f.write('\n')
  f.close()

In [49]:
NOAA_diarios

Unnamed: 0,STATION,DATE,TMAX,TMIN,TAVG,SNWD
0,SP000003195,2010-01-01,7.6,3.6,,
1,SP000003195,2010-01-02,7.5,2.6,,
2,SP000003195,2010-01-03,7.4,4.2,,
3,SP000003195,2010-01-04,9.2,6.4,,
4,SP000003195,2010-01-05,9.8,6.2,,
...,...,...,...,...,...,...
4370,SPW00014011,2021-12-27,,,12.4,
4371,SPW00014011,2021-12-28,,,9.7,
4372,SPW00014011,2021-12-29,,,8.2,
4373,SPW00014011,2021-12-30,,,6.5,


## CSV y descargas

Finalmente, se salvan todas las tablas necesarias para crear la base de datos mySQL a formato CSV. 

In [50]:
comunidades_autonomas.to_csv('comunidades_autonomas.csv', index=False, sep =';')
provincias.to_csv('provincias.csv', index=False, sep =';')
MUNICIPIOS.to_csv('MUNICIPIOS.csv', index=False, sep =';')
ZIPs.to_csv('ZIPs.csv', index=False, sep =';')
PRO_GEOMETRY.to_csv('PRO_GEOMETRY.csv', index=False, sep =';')
MUN_GEOMETRY.to_csv('MUN_GEOMETRY.csv', index=False, sep =';')
total_municipios.to_csv('total_municipios.csv', index=False, sep =';')
estaciones_AEMET.to_csv('estaciones_AEMET.csv', index=False, sep =';')
estaciones_NOAA.to_csv('estaciones_NOAA.csv', index=False, sep =';')
AEMET_diarios.to_csv('AEMET_observaciones.csv', index=False, sep =';')
NOAA_diarios.to_csv('NOAA_observaciones.csv', index=False, sep =';')

In [52]:
files.download('comunidades_autonomas.csv')
files.download('provincias.csv')
files.download('MUNICIPIOS.csv')
files.download('ZIPs.csv')
files.download('PRO_GEOMETRY.csv')
files.download('MUN_GEOMETRY.csv')
files.download('total_municipios.csv')
files.download('estaciones_AEMET.csv')
files.download('estaciones_NOAA.csv')
files.download('AEMET_observaciones.csv')
files.download('NOAA_observaciones.csv')
files.download('ETL_logging.txt')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>