# Data Pipeline

# Goal
Create a data pipeline for the Census and Covid-19. And save to s3.

# Methodology
- Clean the data with the foundings in Exploration notebooks
- Concatenate and create the complete dataset for census data. (One for personas and another for fallecidos)
- This data will be upload to S3.


## Sections
1. [**Requirements**](#Requirements)
2. [**Functions**](#Functions)
3. [**Inputs**](#Inputs)
4. [**Pipeline**](#Pipeline)
    - [**Load_data**](#Load_data)
    - [**JoinTables**](#JoinTables)

# Requirements

In [1]:
#installing packages
sc.install_pypi_package("pandas")
sc.install_pypi_package("boto3")
sc.setCheckpointDir('hdfs:///covid')

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1594762576659_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas
  Using cached https://files.pythonhosted.org/packages/af/f3/683bf2547a3eaeec15b39cef86f61e921b3b187f250fcd2b5c5fb4386369/pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas)
  Using cached https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.8.1

Collecting boto3
  Using cached https://files.pythonhosted.org/packages/3c/f4/41c1d8a69b07b2a087a7e552cbed21111ff36706fec2f1ba9983fba95771/boto3-1.14.20-py2.py3-none-any.whl
Collecting botocore<1.18.0,>=1.17.20 (from boto3)
  Using cached https://files.pythonhosted.org/packages/87/a6/1710181d97a6763ccced7f69fff8beea751633af2a101c3d02826cf4acce/botocore-1.17.20-py2.py3-none-any.whl
Collecting s3transfer<0.4.0,>=0.3.0 (from boto3)
  Using cached https://files.pythonhosted.org/packages

In [2]:
import time
import os
import boto3
import gc
import sys
import numpy as np
import pandas as pd
import pickle
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (FloatType, DateType, StructType, StructField, StringType, LongType, 
    IntegerType, ArrayType, BooleanType, DoubleType)
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, QuantileDiscretizer
gc.enable()

spark = SparkSession.builder.config("spark.sql.shuffle.partitions", 20).appName("covid").getOrCreate()
print(spark.sparkContext.getConf().get('spark.driver.memory'))
print(spark.sparkContext.getConf().get("spark.sql.shuffle.partitions"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2048M
20

# Functions

## Loading data

In [3]:
def build_schema_census(source="vivienda"):
    """
    Build schema for different sources

    Parameters:
    -----------
    source : str
        Table source may be: "VIV", "PER", "HOG", "FALL", "MGN"

    Return:
    -------
    schema : spark.schema
        Spark schema for loading source table
    """
    if source == "VIV":
        schema = StructType([StructField("TIPO_REG", IntegerType()),
                             StructField("U_DPTO", IntegerType()),
                             StructField("U_MPIO", IntegerType()),
                             StructField("UA_CLASE", IntegerType()),
                             StructField("U_EDIFICA", IntegerType()),
                             StructField("COD_ENCUESTAS", IntegerType()),
                             StructField("U_VIVIENDA", IntegerType()),
                             StructField("UVA_ESTATER", IntegerType()),
                             StructField("UVA1_TIPOTER", DoubleType()),
                             StructField("UVA2_CODTER", DoubleType()),
                             StructField("UVA_ESTA_AREAPROT", IntegerType()),
                             StructField("UVA1_COD_AREAPROT", DoubleType()),
                             StructField("UVA_USO_UNIDAD", IntegerType()),
                             StructField("V_TIPO_VIV", DoubleType()),
                             StructField("V_CON_OCUP", DoubleType()),
                             StructField("V_TOT_HOG", DoubleType()),
                             StructField("V_MAT_PARED", DoubleType()),
                             StructField("V_MAT_PISO", DoubleType()),
                             StructField("VA_EE", DoubleType()),
                             StructField("VA1_ESTRATO", DoubleType()),
                             StructField("VB_ACU", DoubleType()),
                             StructField("VC_ALC", DoubleType()),
                             StructField("VD_GAS", DoubleType()),
                             StructField("VE_RECBAS", DoubleType()),
                             StructField("VE1_QSEM", DoubleType()),
                             StructField("VF_INTERNET", DoubleType()),
                             StructField("V_TIPO_SERSA", DoubleType()),
                             StructField("L_TIPO_INST", DoubleType()),
                             StructField("L_EXISTEHOG", DoubleType()),
                             StructField("L_TOT_PERL", DoubleType())
                             ])
    elif source == "HOG":
        schema = StructType([StructField("TIPO_REG", IntegerType()),
                             StructField("U_DPTO", IntegerType()),
                             StructField("U_MPIO", IntegerType()),
                             StructField("UA_CLASE", IntegerType()),
                             StructField("COD_ENCUESTAS", IntegerType()),
                             StructField("U_VIVIENDA", IntegerType()),
                             StructField("H_NROHOG", DoubleType()),
                             StructField("H_NRO_CUARTOS", DoubleType()),
                             StructField("H_NRO_DORMIT", DoubleType()),
                             StructField("H_DONDE_PREPALIM", DoubleType()),
                             StructField("H_AGUA_COCIN", DoubleType()),
                             StructField("HA_NRO_FALL", DoubleType()),
                             StructField("HA_TOT_PER", DoubleType())
                             ])
    elif source == "PER":
        schema = StructType([StructField("TIPO_REG", IntegerType()),
                             StructField("U_DPTO", IntegerType()),
                             StructField("U_MPIO", IntegerType()),
                             StructField("UA_CLASE", IntegerType()),
                             StructField("U_EDIFICA", IntegerType()),
                             StructField("COD_ENCUESTAS", IntegerType()),
                             StructField("U_VIVIENDA", IntegerType()),
                             StructField("P_NROHOG", DoubleType()),
                             StructField("P_NRO_PER", IntegerType()),
                             StructField("P_SEXO", IntegerType()),
                             StructField("P_EDADR", IntegerType()),
                             StructField("P_PARENTESCOR", DoubleType()),
                             StructField("PA1_GRP_ETNIC", IntegerType()),
                             StructField("PA11_COD_ETNIA", DoubleType()),
                             StructField("PA12_CLAN", DoubleType()),
                             StructField("PA21_COD_VITSA", DoubleType()),
                             StructField("PA22_COD_KUMPA", DoubleType()),
                             StructField("PA_HABLA_LENG", DoubleType()),
                             StructField("PA1_ENTIENDE", DoubleType()),
                             StructField("PB_OTRAS_LENG", DoubleType()),
                             StructField("PB1_QOTRAS_LENG", DoubleType()),
                             StructField("PA_LUG_NAC", IntegerType()),
                             StructField("PA_VIVIA_5ANOS", DoubleType()),
                             StructField("PA_VIVIA_1ANO", DoubleType()),
                             StructField("P_ENFERMO", DoubleType()),
                             StructField("P_QUEHIZO_PPAL", DoubleType()),
                             StructField("PA_LO_ATENDIERON", DoubleType()),
                             StructField("PA1_CALIDAD_SERV", DoubleType()),
                             StructField("CONDICION_FISICA", DoubleType()),
                             StructField("P_ALFABETA", DoubleType()),
                             StructField("PA_ASISTENCIA", DoubleType()),
                             StructField("P_NIVEL_ANOSR", DoubleType()),
                             StructField("P_TRABAJO", DoubleType()),
                             StructField("P_EST_CIVIL", DoubleType()),
                             StructField("PA_HNV", DoubleType()),
                             StructField("PA1_THNV", DoubleType()),
                             StructField("PA2_HNVH", DoubleType()),
                             StructField("PA3_HNVM", DoubleType()),
                             StructField("PA_HNVS", DoubleType()),
                             StructField("PA1_THSV", DoubleType()),
                             StructField("PA2_HSVH", DoubleType()),
                             StructField("PA3_HSVM", DoubleType()),
                             StructField("PA_HFC", DoubleType()),
                             StructField("PA1_THFC", DoubleType()),
                             StructField("PA2_HFCH", DoubleType()),
                             StructField("PA3_HFCM", DoubleType()),
                             StructField("PA_UHNV", DoubleType()),
                             StructField("PA1_MES_UHNV", DoubleType()),
                             StructField("PA2_ANO_UHNV", DoubleType())
                             ])
    elif source == "FALL":
        schema = StructType([StructField("TIPO_REG", IntegerType()),
                             StructField("U_DPTO", IntegerType()),
                             StructField("U_MPIO", IntegerType()),
                             StructField("UA_CLASE", IntegerType()),
                             StructField("COD_ENCUESTAS", IntegerType()),
                             StructField("U_VIVIENDA", IntegerType()),
                             StructField("F_NROHOG", IntegerType()),
                             StructField("FA1_NRO_FALL", IntegerType()),
                             StructField("FA2_SEXO_FALL", IntegerType()),
                             StructField("FA3_EDAD_FALL", IntegerType()),
                             StructField("FA4_CERT_DEFUN", IntegerType())
                             ])
    elif source == "MGN":
        schema = StructType([StructField("U_DPTO", IntegerType()),
                             StructField("U_MPIO", IntegerType()),
                             StructField("UA_CLASE", IntegerType()),
                             StructField("UA1_LOCALIDAD", IntegerType()),
                             StructField("U_SECT_RUR", IntegerType()),
                             StructField("U_SECC_RUR", IntegerType()),
                             StructField("UA2_CPOB", IntegerType()),
                             StructField("U_SECT_URB", IntegerType()),
                             StructField("U_SECC_URB", IntegerType()),
                             StructField("U_MZA", IntegerType()),
                             StructField("U_EDIFICA", IntegerType()),
                             StructField("COD_ENCUESTAS", IntegerType()),
                             StructField("U_VIVIENDA", IntegerType())
                             ])
    else:
        print("Source not valid. Enter one of the following sources: VIV, PER, HOG, FALL, MGN")
    return schema


def build_schema_covid(source="covid"):
    """
    Build schema for different covid sources

    Parameters:
    -----------
    source : str
        Table source may be: "covid", "tests"

    Return:
    -------
    schema : spark.schema
        Spark schema for loading source table
    """
    if source == "covid":
        schema = StructType([StructField("fecha_de_notificaci_n", DateType()),
                             StructField("c_digo_divipola", StringType()),
                             StructField("ciudad_de_ubicaci_n", StringType()),
                             StructField("departamento", StringType()),
                             StructField("atenci_n", StringType()),
                             StructField("edad", IntegerType()),
                             StructField("sexo", StringType()),
                             StructField("tipo", StringType()),
                             StructField("estado", StringType()),
                             StructField("pa_s_de_procedencia", StringType()),
                             StructField("fis", DateType()),
                             StructField("fecha_diagnostico", DateType()),
                             StructField("fecha_recuperado", DateType()),
                             StructField("fecha_reporte_web", DateType()),
                             StructField("tipo_recuperaci_n", StringType()),
                             StructField("codigo_departamento", StringType()),
                             StructField("codigo_pais", StringType()),
                             StructField("pertenencia_etnica", StringType()),
                             StructField("nombre_grupo_etnico", StringType()),
                             StructField("fecha_de_muerte", DateType()),
                             StructField("Asintomatico", IntegerType()),
                             StructField("divipola_dpto", IntegerType()),
                             StructField("divipola_mpio", IntegerType())
                             ])
    elif source == "tests":
        schema = StructType([StructField("fecha", DateType()),
                             StructField("acumuladas", DoubleType()),
                             StructField("amazonas", DoubleType()),
                             StructField("antioquia", DoubleType()),
                             StructField("arauca", DoubleType()),
                             StructField("atlantico", DoubleType()),
                             StructField("bogota", DoubleType()),
                             StructField("bolivar", DoubleType()),
                             StructField("boyaca", DoubleType()),
                             StructField("caldas", DoubleType()),
                             StructField("caqueta", DoubleType()),
                             StructField("casanare", DoubleType()),
                             StructField("cauca", DoubleType()),
                             StructField("cesar", DoubleType()),
                             StructField("choco", DoubleType()),
                             StructField("cordoba", DoubleType()),
                             StructField("cundinamarca", DoubleType()),
                             StructField("guainia", DoubleType()),
                             StructField("guajira", DoubleType()),
                             StructField("guaviare", DoubleType()),
                             StructField("huila", DoubleType()),
                             StructField("magdalena", DoubleType()),
                             StructField("meta", DoubleType()),
                             StructField("narino", DoubleType()),
                             StructField("norte_de_santander", DoubleType()),
                             StructField("putumayo", DoubleType()),
                             StructField("quindio", DoubleType()),
                             StructField("risaralda", DoubleType()),
                             StructField("san_andres", DoubleType()),
                             StructField("santander", DoubleType()),
                             StructField("sucre", DoubleType()),
                             StructField("tolima", DoubleType()),
                             StructField("valle_del_cauca", DoubleType()),
                             StructField("vaupes", DoubleType()),
                             StructField("vichada", DoubleType()),
                             StructField("procedencia_desconocida", DoubleType()),
                             StructField("positivas_acumuladas", DoubleType()),
                             StructField("negativas_acumuladas", DoubleType()),
                             StructField("positividad_acumulada", DoubleType()),
                             StructField("indeterminadas", DoubleType()),
                             StructField("barranquilla", DoubleType()),
                             StructField("cartagena", DoubleType()),
                             StructField("santa_marta", DoubleType())
                             ])
    else:
        print("Source not valid. Enter one of the following sources: 'covid', 'tests'")
    return schema
              
def build_schema_divipola(source="divipola"):
    """
    Build schema for different covid sources

    Parameters:
    -----------
    source : str
        Table source may be: "covid", "tests"

    Return:
    -------
    schema : spark.schema
        Spark schema for loading source table
    """
    if source == "divipola":
        schema = StructType([StructField("cod_depto", IntegerType()),
                             StructField("cod_mpio", IntegerType()),
                             StructField("dpto", StringType()),
                             StructField("nom_mpio", StringType()),
                             StructField("tipo_municipio", StringType())
                             ])
    else:
        print("Source not valid. Enter one of the following sources: 'covid', 'tests'")
    return schema
              
def get_censo_paths(bucket_s3, directory_key):
    """
    Get dictionary of census data for each department
    
    Parameters:
    -----------
    bucket_s3 : s3.Bucket
        Boto3 Bucket object
    directory_key : path
        Directory key in S3
    
    Return:
    -------
    dict_paths_departments : dict
        Dictionary with the data path for each departtment
    """
    dict_paths_departments = {}
    for object_summary in bucket_s3.objects.filter(Prefix=directory_key):
        name = object_summary.key
        if name.endswith(".CSV"):
            list_paths = name.split("/")
            department = list_paths[2].split("_")[1]
            if "MGN" in list_paths[-1]:
                if not(department in dict_paths_departments):
                    dict_paths_departments[department] = {}
                dict_paths_departments[department].update({"MGN": os.path.join(f"s3a://{bucket_s3.name}", name)})                
            elif "FALL" in list_paths[-1]:
                if not(department in dict_paths_departments):
                    dict_paths_departments[department] = {}
                dict_paths_departments[department].update({"FALL": os.path.join(f"s3a://{bucket_s3.name}", name)})
            elif "HOG" in list_paths[-1]:
                if not(department in dict_paths_departments):
                    dict_paths_departments[department] = {}
                dict_paths_departments[department].update({"HOG": os.path.join(f"s3a://{bucket_s3.name}", name)})
            elif "VIV" in list_paths[-1]:
                if not(department in dict_paths_departments):
                    dict_paths_departments[department] = {}
                dict_paths_departments[department].update({"VIV": os.path.join(f"s3a://{bucket_s3.name}", name)})
            elif "PER" in list_paths[-1]:
                if not(department in dict_paths_departments):
                    dict_paths_departments[department] = {}
                dict_paths_departments[department].update({"PER": os.path.join(f"s3a://{bucket_s3.name}", name)})
    return dict_paths_departments

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Preprocessing

In [4]:
def add_suffix_to_cols(df, suffix):
    for col in df.columns:
        df = df.withColumnRenamed(col, col + suffix)
    return df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Inputs

In [6]:
metadata = {"CENSO": {"VIVIENDA": {"useful_columns": ['U_DPTO', 'U_MPIO', 'UA_CLASE', 'U_EDIFICA',
                                                      'COD_ENCUESTAS', 'U_VIVIENDA', 'UVA_USO_UNIDAD',
                                                      'V_TIPO_VIV', 'V_CON_OCUP', 'V_TOT_HOG',
                                                      'V_MAT_PARED', 'V_MAT_PISO', 'VA_EE', 'VA1_ESTRATO', 'VB_ACU', 'VC_ALC',
                                                      'VD_GAS', 'VE_RECBAS', 'VE1_QSEM', 'VF_INTERNET', 'V_TIPO_SERSA',
                                                      'L_TIPO_INST', 'L_EXISTEHOG', 'L_TOT_PERL']
                                   },
                      "HOGAR": {"useful_columns": ['U_DPTO', 'U_MPIO', 'UA_CLASE', 'COD_ENCUESTAS',
                                                   'U_VIVIENDA', 'H_NROHOG', 'H_NRO_CUARTOS', 'H_NRO_DORMIT',
                                                   'H_DONDE_PREPALIM', 'H_AGUA_COCIN', 'HA_NRO_FALL', 'HA_TOT_PER']},
                      "PERSONAS": {"useful_columns": ['U_DPTO', 'U_MPIO', 'UA_CLASE', 'U_EDIFICA',
                                                      'COD_ENCUESTAS', 'U_VIVIENDA', 'P_NROHOG', 'P_NRO_PER', 'P_SEXO',
                                                      'P_EDADR', 'P_PARENTESCOR', 'PA_LUG_NAC',
                                                      'PA_VIVIA_5ANOS', 'PA_VIVIA_1ANO', 'P_ENFERMO', 'P_QUEHIZO_PPAL',
                                                      'PA_LO_ATENDIERON', 'PA1_CALIDAD_SERV', 'CONDICION_FISICA',
                                                      'P_ALFABETA', 'PA_ASISTENCIA', 'P_NIVEL_ANOSR', 'P_TRABAJO',
                                                      'P_EST_CIVIL', 'PA_HNV', 'PA1_THNV', 'PA2_HNVH', 'PA3_HNVM', 'PA_HNVS',
                                                      'PA1_THSV', 'PA2_HSVH', 'PA3_HSVM', 'PA_HFC', 'PA1_THFC', 'PA2_HFCH',
                                                      'PA3_HFCM']},
                      "FALLECIDOS": {"useful_columns": ['U_DPTO', 'U_MPIO', 'UA_CLASE', 'COD_ENCUESTAS',
                                                        'U_VIVIENDA', 'F_NROHOG', 'FA1_NRO_FALL', 'FA2_SEXO_FALL',
                                                        'FA3_EDAD_FALL', 'FA4_CERT_DEFUN']},
                      "GEOREFERENCIACION": {"useful_columns": ['U_DPTO', 'U_MPIO', 'UA_CLASE', 'UA1_LOCALIDAD', 'U_SECT_RUR',
                                                               'U_SECC_RUR', 'UA2_CPOB', 'U_SECT_URB', 'U_SECC_URB', 'U_MZA',
                                                               'U_EDIFICA', 'COD_ENCUESTAS', 'U_VIVIENDA']},
                      "DIVIPOLA": {"useful_columns": ['cod_depto', 'cod_mpio', 'dpto', 'nom_mpio', 'tipo_municipio']}
                      },
            }

bucket='censo-covid'
s3_resource = boto3.resource('s3')
bucket_s3 = s3_resource.Bucket(bucket)
show = True

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Paths**

In [7]:
censo_covid_bucket_s3 = f"s3a://{bucket}"

raw_data_path = os.path.join(censo_covid_bucket_s3, "raw-data")
censo_data_path = os.path.join(raw_data_path, "censo")
covid_tests_path = os.path.join(raw_data_path, "covid-tests.csv")
covid_path = os.path.join(raw_data_path, "covid.csv")
divipola_path = os.path.join(raw_data_path, "divipola.csv")

dict_paths_departments = get_censo_paths(bucket_s3, directory_key=os.path.join("raw-data", "censo"))

final_data_path = os.path.join(censo_covid_bucket_s3, "final-data")
complete_personas_path = os.path.join(final_data_path, "complete_personas")
complete_fallecidos_path = os.path.join(final_data_path, "complete_fallecidos")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Pipeline

### Load_data

In [8]:
covid_data = spark.read.option("header", "true").csv(covid_path,
                            schema=build_schema_covid(source="covid"))
if show:
    covid_data.limit(10).toPandas().T
    print("Length: ", covid_data.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                 0  ...            9
fecha_de_notificaci_n   2020-03-02  ...   2020-03-12
c_digo_divipola              11001  ...        11001
ciudad_de_ubicaci_n    Bogotá D.C.  ...  Bogotá D.C.
departamento           Bogotá D.C.  ...  Bogotá D.C.
atenci_n                Recuperado  ...   Recuperado
edad                            19  ...           36
sexo                             F  ...            F
tipo                     Importado  ...    Importado
estado                        Leve  ...         Leve
pa_s_de_procedencia         ITALIA  ...       ESPAÑA
fis                     2020-02-27  ...   2020-03-06
fecha_diagnostico       2020-03-06  ...   2020-03-12
fecha_recuperado        2020-03-13  ...   2020-03-21
fecha_reporte_web       2020-03-06  ...   2020-03-12
tipo_recuperaci_n              PCR  ...          PCR
codigo_departamento             11  ...           11
codigo_pais                    380  ...          724
pertenencia_etnica            Otro  ...       

In [9]:
covid_tests_data = spark.read.option("header", "true").csv(covid_tests_path,
                                  schema=build_schema_covid(source="tests"))
if show:
    covid_tests_data.limit(1000).toPandas().T
    print("Length: ", covid_tests_data.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                0           1    ...          130          131
fecha                    2020-03-05  2020-03-06  ...   2020-07-13   2020-07-14
acumuladas                      636         739  ...  1.05681e+06  1.08242e+06
amazonas                        NaN         NaN  ...        11640        11809
antioquia                       NaN         NaN  ...       117268       120233
arauca                          NaN         NaN  ...         3329         3342
atlantico                       NaN         NaN  ...        29314        30522
bogota                          NaN         NaN  ...       326305       335193
bolivar                         NaN         NaN  ...         7049         7079
boyaca                          NaN         NaN  ...        16583        16740
caldas                          NaN         NaN  ...        10383        10421
caqueta                         NaN         NaN  ...         2422         2452
casanare                        NaN         NaN  ...

In [10]:
divipola_data = spark.read.option("header", "true").csv(divipola_path, 
                                  schema=build_schema_divipola(source="divipola"))
if show:
    divipola_data.limit(10).toPandas().T
    print("Length: ", divipola_data.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                        0          1  ...          8          9
cod_depto               5          5  ...          5          5
cod_mpio                1          2  ...         38         40
dpto            ANTIOQUIA  ANTIOQUIA  ...  ANTIOQUIA  ANTIOQUIA
nom_mpio         MEDELLÍN  ABEJORRAL  ...  ANGOSTURA      ANORÍ
tipo_municipio  Municipio  Municipio  ...  Municipio  Municipio

[5 rows x 10 columns]
Length:  1121

In [11]:
census_data_dict={}
for department, val in dict_paths_departments.items():
    for table, table_path in val.items():
        if not(table in census_data_dict):
            census_data_dict[table] = spark.read.option("header", "true").csv(table_path, schema=build_schema_census(source=table))
        else:
            aux = spark.read.option("header", "true").csv(table_path, schema=build_schema_census(source=table))
            census_data_dict[table] = census_data_dict[table].union(aux)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Clean data

In [12]:
drop_viv_cols = [col for col in census_data_dict["VIV"].columns\
                 if col not in metadata["CENSO"]["VIVIENDA"]["useful_columns"]]
census_data_dict["VIV"] = census_data_dict["VIV"].drop(*drop_viv_cols)
if show:
    census_data_dict["VIV"].limit(10).toPandas().T
    print("Length: ",census_data_dict["VIV"].count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                   0     1     2     3     4     5     6     7     8     9
U_DPTO             5     5     5     5     5     5     5     5     5     5
U_MPIO             1     1     1     1     1     1     1     1     1     1
UA_CLASE           1     1     1     1     1     1     1     1     1     1
U_EDIFICA          1     1     1     1     1     1     1     1     1     1
COD_ENCUESTAS   3646  3649  3658  3662  3666  3694  3697  3698  3699  3700
U_VIVIENDA         5     6    38    40    39    15     1    46    45    37
UVA_USO_UNIDAD     1     1     1     1     1     1     1     1     1     1
V_TIPO_VIV         1     1     2     1     2     2     2     2     2     2
V_CON_OCUP         1     1     1     1     1     1     1     1     1     1
V_TOT_HOG          2     1     6     3     1     1     1     1     1     1
V_MAT_PARED        1     1     1     1     8     1     1     1     1     1
V_MAT_PISO         2     2     5     2     4     2     2     2     2     2
VA_EE              1     

In [13]:
drop_hog_cols = [col for col in census_data_dict["HOG"].columns\
                 if col not in metadata["CENSO"]["HOGAR"]["useful_columns"]]
census_data_dict["HOG"] = census_data_dict["HOG"].drop(*drop_hog_cols)
census_data_dict["HOG"] = census_data_dict["HOG"].fillna(99, subset=['H_NROHOG'])
census_data_dict["HOG"] = census_data_dict["HOG"].withColumn("H_NROHOG", F.col("H_NROHOG").cast(IntegerType()))
if show:
    census_data_dict["HOG"].limit(10).toPandas().T
    print("Length: ",census_data_dict["HOG"].count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                     0     1     2     3     4     5     6     7     8     9
U_DPTO               5     5     5     5     5     5     5     5     5     5
U_MPIO               1     1     1     1     1     1     1     1     1     1
UA_CLASE             1     1     1     1     1     1     1     1     1     1
COD_ENCUESTAS     3626  3629  3630  3630  3632  3634  3635  3636  3640  3641
U_VIVIENDA           4     2     1     1     6     9     6    10     1     5
H_NROHOG             1     1     1     2     1     1     1     1     1     1
H_NRO_CUARTOS        4     4     3     3     3     3     3     3     3     2
H_NRO_DORMIT         2     3     2     2     2     2     2     2     2     2
H_DONDE_PREPALIM     1     1     1     1     1     1     1     1     1     1
H_AGUA_COCIN         1     1     1     1     1     1     1     1     1     1
HA_NRO_FALL       None  None  None  None  None  None  None  None  None  None
HA_TOT_PER           5     3     3     4     3     2     1     3     2     3

In [14]:
drop_per_cols = [col for col in census_data_dict["PER"].columns\
                 if col not in metadata["CENSO"]["PERSONAS"]["useful_columns"]]
census_data_dict["PER"] = census_data_dict["PER"].drop(*drop_per_cols)
census_data_dict["PER"] = census_data_dict["PER"].fillna(99, subset=['P_NROHOG'])
census_data_dict["PER"] = census_data_dict["PER"].withColumn("P_NROHOG", F.col("P_NROHOG").cast(IntegerType()))
if show:
    census_data_dict["PER"].limit(10).toPandas().T
    print("Length: ",census_data_dict["PER"].count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                     0     1     2     3     4     5     6     7     8     9
U_DPTO               5     5     5     5     5     5     5     5     5     5
U_MPIO               1     1     1     1     1     1     1     1     1     1
UA_CLASE             1     1     1     1     1     1     1     1     1     1
U_EDIFICA            6     6     6     6     6     6     6     6     5     5
COD_ENCUESTAS     3626  3626  3626  3626  3626  3629  3629  3629  3630  3630
U_VIVIENDA           4     4     4     4     4     2     2     2     1     1
P_NROHOG             1     1     1     1     1     1     1     1     1     1
P_NRO_PER            1     2     3     4     5     1     2     3     5     6
P_SEXO               1     2     2     2     1     1     2     1     2     1
P_EDADR              5    11     7     1    11     6    12     8    12     7
P_PARENTESCOR        3     1     3     4     2     5     4     1     1     3
PA_LUG_NAC           2     2     1     1     1     2     2     2     1     1

In [15]:
drop_fall_cols = [col for col in census_data_dict["FALL"].columns\
                 if col not in metadata["CENSO"]["FALLECIDOS"]["useful_columns"]]
census_data_dict["FALL"] = census_data_dict["FALL"].drop(*drop_fall_cols)
census_data_dict["FALL"] = census_data_dict["FALL"].fillna(99, subset=['F_NROHOG'])
census_data_dict["FALL"] = census_data_dict["FALL"].withColumn("F_NROHOG", F.col("F_NROHOG").cast(IntegerType()))
if show:
    census_data_dict["FALL"].limit(10).toPandas().T
    print("Length: ",census_data_dict["FALL"].count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                   0     1     2     3     4     5     6     7     8     9
U_DPTO             5     5     5     5     5     5     5     5     5     5
U_MPIO             1     1     1     1     1     1     1     1     1     1
UA_CLASE           1     1     1     1     1     1     1     1     1     1
COD_ENCUESTAS   3658  3799  3839  4342  4367  4741  9326  9341  9346  9346
U_VIVIENDA        38     6    12     5     5    28     5     8     4     4
F_NROHOG           6     1     1     1     1     1     1     1     1     2
FA1_NRO_FALL       1     1     1     1     1     1     1     1     1     1
FA2_SEXO_FALL      2     2     1     1     1     2     1     2     2     2
FA3_EDAD_FALL     90    90    89    58    33    94    71    97    64    64
FA4_CERT_DEFUN     1     1     1     2     1     9     1     2     1     1
Length:  94219

In [16]:
drop_mgn_cols = [col for col in census_data_dict["MGN"].columns\
                 if col not in metadata["CENSO"]["GEOREFERENCIACION"]["useful_columns"]]
census_data_dict["MGN"] = census_data_dict["MGN"].drop(*drop_mgn_cols)
if show:
    census_data_dict["MGN"].limit(10).toPandas().T
    print("Length: ",census_data_dict["MGN"].count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                       0          1          2  ...          7          8          9
U_DPTO                 5          5          5  ...          5          5          5
U_MPIO                 1          1          1  ...          1          1          1
UA_CLASE               1          1          1  ...          1          1          1
UA1_LOCALIDAD          0          0          0  ...          0          0          0
U_SECT_RUR             0          0          0  ...          0          0          0
U_SECC_RUR             0          0          0  ...          0          0          0
UA2_CPOB               0          0          0  ...          0          0          0
U_SECT_URB             0          0          0  ...          0          0          0
U_SECC_URB             0          0          0  ...          0          0          0
U_MZA                  0          0          0  ...          0          0          0
U_EDIFICA              1          1          1  ...          1   

### JoinTables

#### Personas

In [17]:
df_per_complete = census_data_dict["PER"].join(census_data_dict["VIV"], 
                                               on=['U_DPTO', 'U_MPIO', 'UA_CLASE', 
                                                   'U_EDIFICA', 'COD_ENCUESTAS',
                                                   'U_VIVIENDA'],
                                               how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
assert df_per_complete.filter(F.col("UVA_USO_UNIDAD").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
census_data_dict["HOG"] = add_suffix_to_cols(census_data_dict["HOG"], "_H")
df_per_complete = df_per_complete.join(census_data_dict["HOG"],
                                       (df_per_complete.U_DPTO==census_data_dict["HOG"].U_DPTO_H)&\
                                       (df_per_complete.U_MPIO==census_data_dict["HOG"].U_MPIO_H)&\
                                       (df_per_complete.UA_CLASE==census_data_dict["HOG"].UA_CLASE_H)&\
                                       (df_per_complete.COD_ENCUESTAS==census_data_dict["HOG"].COD_ENCUESTAS_H)&\
                                       (df_per_complete.U_VIVIENDA==census_data_dict["HOG"].U_VIVIENDA_H)&\
                                       (df_per_complete.P_NROHOG==census_data_dict["HOG"].H_NROHOG_H),
                                       how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
assert df_per_complete.filter(F.col("H_NROHOG_H").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
df_per_complete = df_per_complete.join(census_data_dict["MGN"],
                                       on=['U_DPTO', 'U_MPIO', 'UA_CLASE', 'U_EDIFICA', 
                                           'COD_ENCUESTAS', 'U_VIVIENDA'],
                                       how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
assert df_per_complete.filter(F.col("UA1_LOCALIDAD").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
df_per_complete = df_per_complete.join(divipola_data, 
                                       (df_per_complete.U_DPTO==divipola_data.cod_depto)&\
                                       (df_per_complete.U_MPIO==divipola_data.cod_mpio),
                                       how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
assert df_per_complete.filter(F.col("nom_mpio").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
duplicate_columns = ['cod_depto', 'cod_mpio', 'U_DPTO_H', 'U_MPIO_H', 'UA_CLASE_H',
                     'COD_ENCUESTAS_H', 'U_VIVIENDA_H', 'H_NROHOG_H']
df_per_complete = df_per_complete.drop(*duplicate_columns)

if show:
    df_per_complete.limit(10).toPandas().T
    print("Length: ", df_per_complete.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                        0          1  ...          8          9
U_DPTO                  5          5  ...          5          5
U_MPIO                  1          1  ...          1          1
UA_CLASE                1          1  ...          1          1
U_EDIFICA               1          1  ...          1          1
COD_ENCUESTAS        3702       3702  ...       4386       4402
...                   ...        ...  ...        ...        ...
U_SECC_URB              6          6  ...          1          6
U_MZA                   6          6  ...         21         23
dpto            ANTIOQUIA  ANTIOQUIA  ...  ANTIOQUIA  ANTIOQUIA
nom_mpio         MEDELLÍN   MEDELLÍN  ...   MEDELLÍN   MEDELLÍN
tipo_municipio  Municipio  Municipio  ...  Municipio  Municipio

[70 rows x 10 columns]
Length:  20386281

In [26]:
df_per_complete.write.partitionBy('dpto')\
        .mode('overwrite').option("header","true").csv(complete_personas_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Fallecidos

In [27]:
df_fall_complete = census_data_dict["FALL"].join(census_data_dict["VIV"], 
                                               on=['U_DPTO', 'U_MPIO', 'UA_CLASE', 
                                                   'COD_ENCUESTAS', 'U_VIVIENDA'],
                                               how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
assert df_fall_complete.filter(F.col("UVA_USO_UNIDAD").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
df_fall_complete = df_fall_complete.join(census_data_dict["HOG"],
                                       (df_fall_complete.U_DPTO==census_data_dict["HOG"].U_DPTO_H)&\
                                       (df_fall_complete.U_MPIO==census_data_dict["HOG"].U_MPIO_H)&\
                                       (df_fall_complete.UA_CLASE==census_data_dict["HOG"].UA_CLASE_H)&\
                                       (df_fall_complete.COD_ENCUESTAS==census_data_dict["HOG"].COD_ENCUESTAS_H)&\
                                       (df_fall_complete.U_VIVIENDA==census_data_dict["HOG"].U_VIVIENDA_H)&\
                                       (df_fall_complete.F_NROHOG==census_data_dict["HOG"].H_NROHOG_H),
                                       how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
assert df_fall_complete.filter(F.col("H_NROHOG_H").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
df_fall_complete = df_fall_complete.join(census_data_dict["MGN"],
                                       on=['U_DPTO', 'U_MPIO', 'UA_CLASE', 'U_EDIFICA', 
                                           'COD_ENCUESTAS', 'U_VIVIENDA'],
                                       how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
assert df_fall_complete.filter(F.col("UA1_LOCALIDAD").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
df_fall_complete = df_fall_complete.join(divipola_data, 
                                       (df_fall_complete.U_DPTO==divipola_data.cod_depto)&\
                                       (df_fall_complete.U_MPIO==divipola_data.cod_mpio),
                                       how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
assert df_fall_complete.filter(F.col("nom_mpio").isNull()).count() == 0.0

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
duplicate_columns = ['cod_depto', 'cod_mpio', 'U_DPTO_H', 'U_MPIO_H', 'UA_CLASE_H',
                     'COD_ENCUESTAS_H', 'U_VIVIENDA_H', 'H_NROHOG_H']
df_fall_complete = df_fall_complete.drop(*duplicate_columns)

if show:
    df_fall_complete.limit(10).toPandas().T
    print("Length: ", df_fall_complete.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                            0          1  ...          8          9
U_DPTO                      5          5  ...          5          5
U_MPIO                      1          1  ...          1          1
UA_CLASE                    1          1  ...          1          1
U_EDIFICA                   1          1  ...          1          1
COD_ENCUESTAS          156689     752560  ...    2232362    2417415
U_VIVIENDA                 12          4  ...        214         15
F_NROHOG                    1          1  ...          1          1
FA1_NRO_FALL                1          1  ...          1          1
FA2_SEXO_FALL               2          2  ...          1          2
FA3_EDAD_FALL              88         30  ...         51         26
FA4_CERT_DEFUN              1          1  ...          1          1
UVA_USO_UNIDAD              1          1  ...          1          1
V_TIPO_VIV                  2          1  ...          2          1
V_CON_OCUP                  1          1  ...   

In [39]:
df_fall_complete.repartition(1).write.partitionBy('dpto')\
    .mode('overwrite').option("header","true").csv(complete_fallecidos_path)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…