## 4.5.1 Extraction of National Data

The goal of this notebook is to collect data from *INE* (Instituto Nacional de Estadística) and build a table that is ready to be joined with property data.
We use the INE API to fetch demographic and economic data at the census section level.
We ensure compliance with INE's terms of service and scraping policies.


### Import libraries

In [1]:
import pandas as pd
import re
import os
import requests
import zipfile
import io
import json
import time
import sys
from delta import *
from datetime import datetime
from openpyxl import load_workbook
from os import makedirs, getcwd
from os.path import exists, join
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf, when, coalesce, array, transform, lit, struct, concat, expr, regexp_replace, lpad, length
from pyspark.sql.types import StructType, ArrayType, StringType, DoubleType, BooleanType, LongType

### Define paths and functions

In [3]:
# Extraction configuration: rows are filtered below by EXTRACTION_TYPE and
# provide the REQUEST_ID / OUTPUT_NAME used for each download.
conf_table = pd.read_csv("../CONF/conf.csv")
# Folder (relative to this notebook) where the raw files are written.
raw = '../../../../data/3_external_data/census/RAW'

In [4]:
def sanitize_table_name(name: str) -> str:
    """Return ``name`` with every run of non-word characters collapsed to one underscore."""
    non_word_run = re.compile(r'\W+')
    return non_word_run.sub('_', name)

def df_to_delta(df: pd.DataFrame, path: str, table_name: str):
    """Write a pandas DataFrame to ``<path>/<sanitized table_name>.csv``.

    Despite its name, this helper writes a plain CSV file (without the index
    column), not a Delta table.

    Args:
        df: pandas DataFrame to persist (``DataFrame.to_csv`` is called on it).
        path: Destination directory; it is created when it does not exist yet.
        table_name: Logical table name; it is passed through
            ``sanitize_table_name`` to build the file name.
    """
    table_name = sanitize_table_name(table_name)
    # Create the target folder up front so the write does not fail on a
    # missing directory.
    if path:
        os.makedirs(path, exist_ok=True)
    df.to_csv(f'{path}/{table_name}.csv', index=False)

#### Extracting INE data using API requests
This is where the largest share of the data comes from.

In [5]:
def get_and_save_api_data(identificator, name, timeout=60):
    """Download one INE table through the JSON API and store it as a raw CSV.

    Args:
        identificator: Table identifier appended to the ``DATOS_TABLA`` endpoint.
        name: Output name; the data is saved as ``raw_<name>`` via ``df_to_delta``.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        RuntimeError: If the API answers with a status code other than 200.
    """
    url = fr'https://servicios.ine.es/wstempus/jsCache/ES/DATOS_TABLA/{identificator}'
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        # Raise instead of calling exit(): an exception stops this run and
        # keeps the failure visible without tearing down the whole session.
        raise RuntimeError(f"{response.status_code} Could not retrieve {name}")
    df_response = pd.DataFrame(response.json())
    df_to_delta(df_response, raw, f"raw_{name}")

In [8]:
# Configuration rows whose data is fetched through the INE JSON API.
tables_api_ine = conf_table[conf_table['EXTRACTION_TYPE'] == 'API_INE']

In [20]:
# Fetch and store every table configured for API extraction.
api_requests = zip(tables_api_ine['REQUEST_ID'], tables_api_ine['OUTPUT_NAME'])
for table_id, table_label in api_requests:
    get_and_save_api_data(table_id, table_label)

#### Extracting rent data from INE using a web scraper

In [9]:
# Configuration rows of type 'PADRON'; REQUEST_ID is used as the download URL of each file.
tablas_padron_ine = conf_table[conf_table['EXTRACTION_TYPE'] == 'PADRON']

In [23]:
# Download every PADRON workbook and store it unchanged in the raw folder.
for index, row in tablas_padron_ine.iterrows():
    url = row['REQUEST_ID']
    output_name = row['OUTPUT_NAME']
    response = requests.get(url, timeout=60)
    # Fail loudly on HTTP errors rather than saving an error page as .xlsx.
    response.raise_for_status()
    # The context manager closes the file handle as soon as the write finishes.
    with open(f'{raw}/{output_name}.xlsx', "wb") as xlsx_file:
        xlsx_file.write(response.content)

In [24]:
# NOTE(review): this rebinds `tablas_padron_ine` (previously the 'PADRON' rows) to the
# 'INE_SCRAPPER' rows; the cells below read REQUEST_ID / OUTPUT_NAME from this selection.
tablas_padron_ine = conf_table[conf_table['EXTRACTION_TYPE'] == 'INE_SCRAPPER']

In [30]:
driver_path = "../CONF/chromedriver"

# Configuring the browser
options = webdriver.ChromeOptions()
options.add_experimental_option("prefs", {
    # `raw` is a relative path; hand Chrome an absolute download directory.
    "download.default_directory": os.path.abspath(raw),
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "plugins.always_open_pdf_externally": True
})

s = Service(driver_path)
driver = webdriver.Chrome(options = options , service=s)

links_provinces = []
name_provinces = []
# {province name: {section name: element id}}
dic_provinces = {}

# try/finally guarantees the browser is closed even when a lookup fails midway.
try:
    url = tablas_padron_ine['REQUEST_ID'].iloc[0]
    driver.get(url)
    time.sleep(2)  # give the index page time to render

    todo = driver.find_element(By.ID, "listadoInebase")
    provinces = todo.find_elements(By.XPATH, './/li[@role = "treeitem"]')

    # Collect links and names first: the element handles from the index page
    # are not reused once the driver navigates to a province page.
    for province in provinces:
        id_province = province.find_element(By.TAG_NAME, "a")
        links_provinces.append(id_province.get_attribute('href'))
        name_provinces.append(id_province.text)

    # Visit each province page and record the id of every listed section.
    for province_name, province_link in zip(name_provinces, links_provinces):
        driver.get(province_link)
        time.sleep(0.5)
        apartados = driver.find_element(By.XPATH, './/ul[@role = "group"]')
        elementos = apartados.find_elements(By.TAG_NAME,'li')
        dic_provinces[province_name] = {}
        for elemento in elementos:
            id_apartado = elemento.get_attribute('id')
            name_apartado = elemento.text
            dic_provinces[province_name][name_apartado] = id_apartado
finally:
    driver.quit()

In [45]:
def peticion_INE(codigo, funcion = 'DATOS_TABLA', idioma = 'ES', DataFrame = True, timeout = 60):
    """
    Request a resource from the INE JSON web service.

    A single GET request is made; no retry is performed.

    Args:
        codigo: Identifier placed at the end of the request URL.
        funcion: Web-service function placed in the URL (default 'DATOS_TABLA').
        idioma: Language segment of the URL (default 'ES').
        DataFrame: When true, parse the JSON body into a pandas DataFrame;
            otherwise return the raw ``requests`` response untouched.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A pandas DataFrame built from the JSON payload, or the response object
        when ``DataFrame`` is false.

    Raises:
        requests.HTTPError: If ``DataFrame`` is true and the server answers
            with an error status.
    """
    response = requests.get(
        fr'https://servicios.ine.es/wstempus/js/{idioma}/{funcion}/{codigo}',
        timeout=timeout,
    )
    if DataFrame:
        # Surface HTTP errors here instead of failing later while parsing JSON.
        response.raise_for_status()
        return pd.DataFrame(response.json())
    return response

def seccion_scrappeo(dic_provinces, apartado):
    """
    Fetch one section for every province and stack the results.

    Args:
        dic_provinces: Mapping {province name: {section name: element id}}.
            The 'idt_' text is stripped from each id to obtain the request code.
        apartado: Section name to look up in every province's mapping.

    Returns:
        A pandas DataFrame with the rows of all provinces concatenated (original
        per-request indices are kept); an empty DataFrame when there are no provinces.

    Raises:
        KeyError: If a province has no entry for ``apartado``.
    """
    frames = []
    for province in dic_provinces:
        codigo = dic_provinces[province][apartado].replace('idt_', "")
        frames.append(peticion_INE(codigo))

    # Concatenate once at the end instead of growing a frame inside the loop.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames)

In [46]:
# Persist the scraped {province: {section name: element id}} mapping as UTF-8 JSON;
# ensure_ascii=False writes accented characters as-is instead of \u escapes.
with open(f'{raw}/Codigos_Indicadores_renta.json', 'w', encoding='utf-8') as f:
    json.dump(dic_provinces, f, ensure_ascii=False, indent=4)

In [47]:
# Reload the saved mapping as a table; presumably provinces become columns and
# section names the index (pandas' default for a dict of dicts) — TODO confirm.
table_provinces = pd.read_json(f'{raw}/Codigos_Indicadores_renta.json')
# Index labels of that table, used below to pick the section to extract.
lista_apartados = table_provinces.index.tolist()

In [49]:
# Only the first listed section is extracted.
apartado = lista_apartados[0]
# One request per province, stacked into a single frame.
df = seccion_scrappeo(dic_provinces, apartado)
# The output file name comes from the first row of the current `tablas_padron_ine` selection.
output_name = tablas_padron_ine['OUTPUT_NAME'].iloc[0]
df.to_csv(fr'{raw}/{output_name}.csv', index=False)

#### Auxiliary Tables
These lookup tables let us map census sections to municipalities and their names.

In [5]:
# Configuration for the national municipality workbook: only the first 'MUNICIPIOS' row is used.
tablas_municipios_nacional = conf_table[conf_table['EXTRACTION_TYPE'] == 'MUNICIPIOS']
# Download URL and output file name (without extension) for that workbook.
url_tabla_municipios_nacional = tablas_municipios_nacional['REQUEST_ID'].iloc[0]
output_tabla_municipios_nacional = tablas_municipios_nacional['OUTPUT_NAME'].iloc[0]

In [15]:
# Download the municipality workbook and store it unchanged in the raw folder.
response = requests.get(url_tabla_municipios_nacional, timeout=60)
# Fail loudly on HTTP errors rather than saving an error page as .xlsx.
response.raise_for_status()
# The context manager closes the file handle as soon as the write finishes.
with open(f'{raw}/{output_tabla_municipios_nacional}.xlsx', "wb") as xlsx_file:
    xlsx_file.write(response.content)

306546

In [19]:
# Load the municipality workbook with the code columns as text.
# NOTE(review): the file name is hard-coded; presumably it matches
# `output_tabla_municipios_nacional` — confirm against the configuration.
t = pd.read_excel(f'{raw}/cod_mun_raw.xlsx', header = 1, dtype={'CODAUTO': str, 'CPRO' : str , 'CMUN' : str, 'DC':str})

# Province code + municipality code concatenated into a single key.
cod_pro_mun = t[['CPRO', 'CMUN']].apply(lambda row: ''.join(row.values.astype(str)), axis=1)
t.insert(0, 'COD_PRO_MUN', cod_pro_mun)

# Keep only the name column(s) plus the combined code.
col_nombre = [column_name for column_name in t.columns if 'NOMBRE' in column_name] + ['COD_PRO_MUN']
t = t[col_nombre]
t

Unnamed: 0,NOMBRE,COD_PRO_MUN
0,Agurain/Salvatierra,01051
1,Alegría-Dulantzi,01001
2,Amurrio,01002
3,Añana,01049
4,Aramaio,01003
...,...,...
8126,"Zaida, La",50296
8127,Zaragoza,50297
8128,Zuera,50298
8129,Ceuta,51001


In [20]:
# Workbook that lists the census-section code columns.
path = f'{raw}/indicators.xlsx'
# Columns concatenated to form the municipality code.
columnas_municipio = ['cpro', 'cmun']
# Columns concatenated to form the census-section code.
columnas_seccion_censal = ['cpro', 'cmun', 'dist', 'secc']

In [21]:
# Read the code columns as text so values are concatenated exactly as stored.
t1 = pd.read_excel(path, dtype={'cpro': str, 'cmun' : str , 'dist' : str, 'secc':str})

In [22]:
# Keep only the four code columns needed to build the keys.
t1 = t1[['cpro', 'cmun', 'dist', 'secc']]

In [23]:
# Rebuild from the four base code columns so re-running this cell does not fail on
# already-inserted columns; .copy() keeps the inserts off a slice of another frame.
t1 = t1[columnas_seccion_censal].copy()
# Census-section key: cpro + cmun + dist + secc concatenated as text.
t1.insert(0, 'Seccion censal', t1[columnas_seccion_censal].apply(lambda row: ''.join(row.values.astype(str)), axis=1))
# Municipality key: cpro + cmun concatenated as text.
t1.insert(1, 'Municipio', t1[columnas_municipio].apply(lambda row: ''.join(row.values.astype(str)), axis=1))

In [24]:
# Attach the municipality name to every census section (left join keeps all sections),
# then drop the duplicated key and give the name column a clean label.
t_final = (
    pd.merge(t1, t, left_on='Municipio', right_on='COD_PRO_MUN', how='left')
    .drop(columns='COD_PRO_MUN')
    .rename(columns={'NOMBRE ': 'Nombre_municipio'})
)

In [26]:
# Save the census-section -> municipality mapping (index column omitted).
t_final.to_csv(f'{raw}/mapeo_cs_municipio.csv', index = False)

#### Obtaining Coordinate data for each municipality 

In [18]:
tablas_municipios_coordenadas = conf_table[conf_table['EXTRACTION_TYPE'] == 'MUNICIPIOS_COORDENADAS']
url_tabla_municipios_coordenadas = tablas_municipios_coordenadas['REQUEST_ID'].iloc[0]
output_tabla_municipios_coordenadas = tablas_municipios_coordenadas['OUTPUT_NAME'].iloc[0]

In [20]:
# Download the coordinates workbook and store it unchanged in the raw folder.
response = requests.get(url_tabla_municipios_coordenadas, timeout=60)
# Fail loudly on HTTP errors rather than saving an error page as .xlsx.
response.raise_for_status()
# The context manager closes the file handle as soon as the write finishes.
with open(f'{raw}/{output_tabla_municipios_coordenadas}.xlsx', "wb") as xlsx_file:
    xlsx_file.write(response.content)

1349120

In [23]:
# Load the coordinates workbook; the column headers are on the third row (header=2).
# NOTE(review): the file name is hard-coded; presumably it matches
# `output_tabla_municipios_coordenadas` — confirm against the configuration.
tabla_coordenadas = pd.read_excel(f'{raw}/coord_mun.xlsx', header=2)

In [25]:
# Rewrite "Name (X)" as "Name, X"; presumably this aligns the names with the
# "Zaida, La" style used in the municipality code table — TODO confirm.
# The vectorised .str methods leave missing values untouched, where the
# previous row-wise lambda raised on them.
tabla_coordenadas["Población"] = (
    tabla_coordenadas["Población"]
    .str.replace(" (", ", ", regex=False)
    .str.replace(")", "", regex=False)
)

In [26]:
# Hand-maintained lookup of [two-digit province code, province name] pairs ('01'..'52').
# The names are matched against the 'Provincia' column of the coordinates table below,
# so the spellings must agree with that workbook.
data_cpro_pro = [['01','Álava'], ['02','Albacete'], ['03','Alicante/Alacant'], ['04','Almería'], ['05','Ávila'], ['06','Badajoz'],
                ['07', 'Illes Balears'], ['08', 'Barcelona'], ['09', 'Burgos'], ['10', 'Cáceres'], ['11', 'Cádiz'], ['12', 'Castellón/Castelló'],
                ['13', 'Ciudad Real'], ['14', 'Córdoba'], ['15', 'A Coruña'], ['16', 'Cuenca'], ['17', 'Girona'], ['18', 'Granada'],
                ['19', 'Guadalajara'], ['20', 'Guipúzcoa'], ['21', 'Huelva'], ['22', 'Huesca'], ['23', 'Jaén'], ['24', 'León'],
                ['25', 'Lleida'], ['26', 'La Rioja'], ['27', 'Lugo'], ['28', 'Madrid'], ['29', 'Málaga'], ['30', 'Murcia'],
                ['31', 'Navarra'], ['32', 'Ourense'], ['33', 'Asturias'], ['34', 'Palencia'], ['35', 'Las Palmas'], ['36', 'Pontevedra'],
                ['37', 'Salamanca'], ['38', 'Santa Cruz de Tenerife'], ['39', 'Cantabria'], ['40', 'Segovia'], ['41', 'Sevilla'], ['42', 'Soria'],
                ['43', 'Tarragona'], ['44', 'Teruel'], ['45', 'Toledo'], ['46', 'Valencia/València'], ['47', 'Valladolid'], ['48', 'Vizcaya'],
                ['49', 'Zamora'], ['50', 'Zaragoza'], ['51', 'Ceuta'], ['52', 'Melilla']]

In [27]:
# Turn the lookup list into a two-column table: province code and province name.
df_cpro_pro = pd.DataFrame(data_cpro_pro, columns=['cpro', 'Nombre provincia'])
df_cpro_pro.head()

Unnamed: 0,cpro,Nombre provincia
0,1,Álava
1,2,Albacete
2,3,Alicante/Alacant
3,4,Almería
4,5,Ávila


In [28]:
# Add the province code to each coordinates row by matching on the province name.
# With a left join, rows whose 'Provincia' spelling is not in the lookup keep a missing cpro.
tabla_coordenadas_final = tabla_coordenadas.merge(df_cpro_pro, left_on=['Provincia'], right_on=['Nombre provincia'], how='left').drop('Nombre provincia', axis = 1)

In [36]:
# Check the exact column labels: the name column is 'NOMBRE ' with a trailing space,
# and that exact label is used as the merge key further down.
t.columns

Index(['NOMBRE ', 'COD_PRO_MUN'], dtype='object')

In [32]:
# Preview the coordinates table with the province code attached.
tabla_coordenadas_final.head()

Unnamed: 0,Comunidad,Provincia,Población,Latitud,Longitud,Altitud,Habitantes,Hombres,Mujeres,cpro
0,Andalucía,Almería,Abla,37.14114,-2.780104,871.1684,1504,783,721,4
1,Andalucía,Almería,Abrucena,37.13305,-2.797098,976.9387,1341,682,659,4
2,Andalucía,Almería,Adra,36.74807,-3.022522,10.97898,24373,12338,12035,4
3,Andalucía,Almería,Albánchez,37.2871,-2.181163,481.3123,815,422,393,4
4,Andalucía,Almería,Alboloduy,37.03319,-2.62175,388.4346,674,334,340,4


In [39]:
# Attach latitude / longitude / altitude to each municipality, matching on the name.
# NOTE(review): the join key is the municipality name only; the `cpro` column built
# above is not used, so identically named municipalities in different provinces
# would match each other — confirm whether the province code should be part of the key.
# NOTE(review): this rebinds `t_final`, which previously held the census-section mapping.
t_final = t.merge(tabla_coordenadas_final[['Latitud', 'Longitud', 'Altitud', 'Población']], 
              left_on=['NOMBRE '], right_on=['Población'], how = 'left').drop('Población', axis = 1)

#### Inspect the merged municipality and coordinate data

In [40]:
# Preview the result; municipalities without a name match keep missing coordinates.
t_final.head()

Unnamed: 0,NOMBRE,COD_PRO_MUN,Latitud,Longitud,Altitud
0,Agurain/Salvatierra,1051,,,
1,Alegría-Dulantzi,1001,42.84149,-2.513507,561.6857
2,Amurrio,1002,43.05265,-3.001022,219.691
3,Añana,1049,42.80235,-2.982607,628.5115
4,Aramaio,1003,43.054,-2.566,381.8797


#### Write the merged data to a CSV

In [41]:
# Reuse `raw` (same folder as the previously hard-coded path) so the location is defined once.
# NOTE(review): this is the same file that was written earlier from the census-section
# mapping; this write replaces it with the municipality/coordinates table — confirm
# that overwriting is intended.
t_final.to_csv(f'{raw}/mapeo_cs_municipio.csv', index = False)