# conclusions
- the most convinient key for merge is ['id', 'idversion'] with 1,135,584 matchs
- Adding 'cs7idregistrospoblacionn' to merge adds only 134 observations and 908,764 so not coincide on ['id', 'idversion'] (which has 99.99% of birth date collisions), meaning a different hash might be used across tables
- the distribution of icv on the Superate beneficiaries (**with duplicated households removed**) is:  

	| icv | superate beneficiaries|
    | -- | -- |
    | 1.0 |	105,969  |
    | 2.0 |	600,520  |
	| 3.0 |	331,589  |
	| 4.0 |	34,896  |
	| Total |	1,072,974|

In [None]:
# magics
%load_ext autoreload
%autoreload 2
%config Completer.use_jedi = False

In [None]:
# pandas stack
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import gc
from sklearn.pipeline import make_pipeline

In [None]:
# get timestamp in utc format
from datetime import datetime
timestamp = datetime.utcnow().strftime('%Y%m%dT%H%MZ')
print(f'Timestamp for saving outputs: {timestamp}')

In [None]:
from projectetl.utils.config import data_dir
from criteriaetl.utils.display_func import cdisplay
from criteriaetl.transformers.fusion_base import MergeTransformer
from projectetl.utils.dataload import load_s3_data_do

In [None]:
import contextlib
import pandas.io.formats.format as pf

# copied from https://stackoverflow.com/questions/29663252/
# can-you-format-pandas-integers-for-display-like-pd-options-display-float-forma
@contextlib.contextmanager
def df_formatting(float_fmt='{:0,.1f}', integer_fmt='{:,d}'):
    orig_float_format = pd.options.display.float_format
    orig_int_format = pf.IntArrayFormatter

    pd.options.display.float_format = float_fmt.format
    class IntArrayFormatter(pf.GenericArrayFormatter):
        def _format_strings(self):
            formatter = self.formatter or integer_fmt.format
            fmt_values = [formatter(x) for x in self.values]
            return fmt_values
    pf.IntArrayFormatter = IntArrayFormatter
    yield
    pd.options.display.float_format = orig_float_format
    pf.IntArrayFormatter = orig_int_format
    

In [None]:
superate_beneficiaries_df = pd.read_csv(
    f'{data_dir}/AR/raw/SIUBEN-SUPERATE.csv',
    usecols=['CS1NoFormularioN', 'CS7IDRegistrosPoblacionN', 'ID',
             'IdVersion', 'CS12FechaNacimientoD', 'ICV'], 
    na_values=" ")
superate_beneficiaries_df.columns = superate_beneficiaries_df.columns.str.lower()
gc.collect()
cdisplay(superate_beneficiaries_df.head())

In [None]:
superate_beneficiaries_df.info()

In [None]:
superate_beneficiaries_df

In [None]:
miembros_parsed_df = load_s3_data_do('miembros')

In [None]:
cdisplay(miembros_parsed_df.head())

In [None]:
households_parsed_df = load_s3_data_do('hogares')
cdisplay(households_parsed_df.head())

In [None]:
with df_formatting('{:,}'):
    miembros_ids_df = miembros_parsed_df[[
        'cs7noformularion', 'id', 'idversion', 'cs12fechanacimientod']]
    miembros_ids_df.info()
    display(miembros_ids_df.head())

# Define merge

In [None]:
matched_obs = {}
for col in ['id', 'idversion']:
    matched_obs[col] = pd.to_numeric(
        superate_beneficiaries_df[col].dropna(), downcast='unsigned').isin(
        miembros_parsed_df[col]).sum()

In [None]:
matched_obs['noformularion'] = pd.to_numeric(superate_beneficiaries_df[
    'cs1noformularion'].dropna(), downcast='unsigned').isin(
     miembros_parsed_df['cs7noformularion']).sum()

In [None]:
to_merge_miembros_df = miembros_parsed_df[[
    'cs7noformularion', 'cs7idregistrospoblacionn', 'id', 'idversion',
    'cs12fechanacimientod']]

to_merge_superate_df = superate_beneficiaries_df[[
    'cs1noformularion', 'cs7idregistrospoblacionn', 'id', 'idversion']
].astype('Int64')

to_merge_superate_df.loc[:, 'cs12fechanacimientod'] = pd.to_datetime(
    superate_beneficiaries_df['cs12fechanacimientod'].str.slice(0, -8), 
    format='%m/%d/%Y')

In [None]:
merge_col = 'id'
merged_df = to_merge_miembros_df.merge(to_merge_superate_df.dropna(
    subset=[merge_col]), on=merge_col, indicator=True)
print(merged_df._merge.value_counts())
merged_df.cs12fechanacimientod_x.subtract(merged_df.cs12fechanacimientod_y).value_counts()

In [None]:
merge_col = 'cs7idregistrospoblacionn'
merged_df_a = to_merge_miembros_df.merge(to_merge_superate_df.dropna(
    subset=[merge_col]), on=merge_col, indicator=True)
print(merged_df_a._merge.value_counts())
merged_df_a.cs12fechanacimientod_x.subtract(merged_df_a.cs12fechanacimientod_y).value_counts()

In [None]:
merge_col = 'idversion'
merged_df = to_merge_miembros_df.drop_duplicates(subset=[merge_col]).merge(
    to_merge_superate_df.dropna(
    subset=[merge_col]), on=merge_col, indicator=True, how='right')
display(merged_df._merge.value_counts())
merged_df.cs12fechanacimientod_x.subtract(merged_df.cs12fechanacimientod_y).value_counts()

In [None]:
merge_left_col = 'cs7noformularion'
merge_right_col = 'cs1noformularion'
merged_df = to_merge_miembros_df.drop_duplicates(subset=[merge_left_col]).merge(
    to_merge_superate_df.dropna(
    subset=[merge_right_col]), left_on=merge_left_col, right_on=merge_right_col, 
    indicator=True, how='right')
display(merged_df._merge.value_counts())
merged_df.cs12fechanacimientod_x.subtract(merged_df.cs12fechanacimientod_y).value_counts()

In [None]:
merge_both_col = 'cs7idregistrospoblacionn'
merge_left_col = 'cs7noformularion'
merge_right_col = 'cs1noformularion'
merged_df = to_merge_miembros_df.merge(
    to_merge_superate_df.dropna(
    subset=[merge_both_col, merge_right_col]), left_on=[merge_both_col, merge_left_col],
    right_on=[merge_both_col, merge_right_col], indicator=True, how='right')
display(merged_df._merge.value_counts())
merged_df.cs12fechanacimientod_x.subtract(merged_df.cs12fechanacimientod_y).value_counts()

In [None]:
merge_col = ['id', 'idversion']
merged_df_b = to_merge_miembros_df.merge(to_merge_superate_df.dropna(
    subset=merge_col), on=merge_col, indicator=True)
print(merged_df_b._merge.value_counts())
merged_df_b.cs12fechanacimientod_x.subtract(merged_df_b.cs12fechanacimientod_y).value_counts()

In [None]:
merge_stats = pd.Series(matched_obs).to_frame(name='# coincidencias')


merge_stats.loc[:, '# igual fecha nacimiento'] = [1135517, np.nan, np.nan]
merge_stats.loc['id & idversion'] = [1135584, 1135517]

with df_formatting('{:,}'):
    display(merge_stats)

In [None]:
merged_df_b

In [None]:
(~ merged_df_a.cs7idregistrospoblacionn.isin(merged_df_b.cs7idregistrospoblacionn_x)).sum()

In [None]:
merged_df_b.cs7idregistrospoblacionn_x.ne(
    merged_df_b.cs7idregistrospoblacionn_y).sum()

# Get ICV statistics

In [None]:
icv_superate = superate_beneficiaries_df.icv.value_counts().sort_index()
print(f'Superate beneficieries across ICV: \n{icv_superate}')

df_ = icv_superate.to_frame('superate beneficiaries')
df_.loc['Total'] = icv_superate.sum()
with df_formatting('{:,}'):
    display(df_)

In [None]:
merge_cols = ['id', 'idversion']
household_key = {
    'member': 'cs7noformularion',
    'household': 'cs1noformularion'
}
to_merge_miembros_df = miembros_parsed_df[[
    'cs7noformularion', 'id', 'idversion']]

to_merge_superate_df = superate_beneficiaries_df[[
    'id', 'idversion', 'icv']
].astype({'id': 'Int64', 'idversion': 'Int64', 'icv': 'category'})

In [None]:
merge_pipe = make_pipeline(
    MergeTransformer(lambda: to_merge_superate_df.dropna(
    subset=merge_col), merge_kwargs=dict(
    on=merge_cols)),
    MergeTransformer(lambda: households_parsed_df[[
        household_key['household'], 'nivelpobreza'
    ]], merge_kwargs=dict(
    left_on=household_key['member'], right_on=household_key['household'])
))
merged_df = merge_pipe.transform(to_merge_miembros_df)


In [None]:
merged_df

In [None]:
merged_df.icv.eq(merged_df.nivelpobreza).value_counts(normalize=True)

In [None]:
icv_superate_wo_duplicates = merged_df.groupby(household_key['household'])[
    'icv'].nth(0).value_counts().sort_index()
display(icv_superate_wo_duplicates)
df_ = icv_superate_wo_duplicates.to_frame('superate beneficiaries')
df_.loc['Total'] = icv_superate_wo_duplicates.sum()
with df_formatting('{:,}'):
    display(df_)