### Config

In [None]:
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

from typing import List

In [None]:
# --- Locations -------------------------------------------------------------
# Every directory is derived from the parent of the current working directory
# and is kept as a plain string so it can be interpolated into file paths.
PROJECT_NAME = 'costa_rica_poverty'
PACKAGE_ROOT = Path().resolve().parent
INPUT_DIR = "/".join(
    [str(PACKAGE_ROOT), PROJECT_NAME, "packages/model/model/input/data"]
)
ASSET_DIR = "/".join(
    [str(PACKAGE_ROOT), PROJECT_NAME, "packages/model/model/assets"]
)
OUTPUT_DIR = "/".join(
    [str(PACKAGE_ROOT), PROJECT_NAME, "packages/model/model/output"]
)

# --- File names ------------------------------------------------------------
# Looked up inside ASSET_DIR and INPUT_DIR respectively when the data is loaded.
CODEBOOK = 'codebook.csv'
DATA_FILE = 'train.csv'

### Data cleaning: test drafting

In [None]:
import pandas as pd
import numpy as np

In [None]:
# Read the codebook from ASSET_DIR and the data file from INPUT_DIR,
# then report the (rows, columns) size of the loaded data.
codebook = pd.read_csv("/".join((ASSET_DIR, CODEBOOK)))
df = pd.read_csv("/".join((INPUT_DIR, DATA_FILE)))
print(df.shape)

In [None]:
################## Head of households

#### TEST
def test_head_of_household_exists(raw_data):
    '''
    Check that every household in ``raw_data`` has exactly one head.

    ``raw_data`` is a pd.DataFrame with an ``idhogar`` (household id) column and a
    ``parentesco1`` column that is 1 on the head-of-household row.
    The check runs against the frame that is passed in, not against a notebook global.
    '''
    # Given: number of head-of-household flags per household
    hh_head = raw_data.groupby('idhogar')['parentesco1'].sum().reset_index()

    # When: keep the households whose count is anything other than exactly one
    hh_head_none = hh_head[hh_head['parentesco1'] != 1]

    # Then: no such household may exist
    assert hh_head_none.shape[0] == 0
    
#### FIX
class HeadOfHouseholdExist(BaseEstimator, TransformerMixin):
    '''
    Pipeline step that removes every household which does not have exactly one head.

    A household is identified by ``idhogar``; its head is the row where ``parentesco1`` is 1.
    '''

    def __init__(self, variables: List[str]) -> None:
        '''
        Specify variable input as list of strings representing column names of a pd.DataFrame.
        '''
        # Stored unchanged under the parameter's own name so that
        # BaseEstimator.get_params() / sklearn.base.clone() can read it back.
        # transform() itself works on the fixed 'idhogar' and 'parentesco1' columns.
        self.variables = variables

    def fit(self, X: pd.DataFrame, y: pd.Series = None):
        '''
        Required method for Sklearn TransformerMixin class.
        Remains inactive and performs no action for now.
        Leave as is.
        '''
        return self

    def transform(self, X: pd.DataFrame, y: pd.Series = None):
        '''
        Return a copy of X without the rows of households whose number of heads is not exactly one.

        The head count is computed from X itself (the frame handed over by the pipeline),
        so the step behaves the same on any dataset it is applied to.
        The input frame is not modified.
        '''
        X = X.copy()

        # Sum of the head flag per household: 1 means exactly one head.
        head_counts = X.groupby('idhogar')['parentesco1'].sum()
        invalid_households = head_counts[head_counts != 1].index

        # Keep only the rows that belong to valid households.
        X = X[~X['idhogar'].isin(invalid_households)]

        return X

#### TEST
def test_household_target_match(raw_data):
    '''
    Check that all members of a household in ``raw_data`` share one ``Target`` value.

    ``raw_data`` is a pd.DataFrame with an ``idhogar`` (household id) column and a ``Target`` column.
    The check runs against the frame that is passed in, not against a notebook global.
    '''
    # Given: per household, whether it has exactly one distinct Target value
    householdTargetsMatching = (raw_data.groupby('idhogar')['Target'].nunique() == 1).reset_index()

    # When: keep the households for which that is not the case
    householdTargetMisMatch = householdTargetsMatching[~householdTargetsMatching['Target']]

    # Then: no such household may exist
    assert householdTargetMisMatch.shape[0] == 0

    
#### FIX
class HouseholdTargetsMatch(BaseEstimator, TransformerMixin):
    '''
    Pipeline step that makes ``Target`` consistent within each household.

    For every household (``idhogar``) that does not have exactly one distinct ``Target`` value,
    all member rows receive the ``Target`` of the head of household (the row with ``parentesco1`` == 1).
    '''

    def __init__(self, variables: List[str]) -> None:
        '''
        Specify variable input as list of strings representing column names of a pd.DataFrame.
        '''
        # Stored unchanged under the parameter's own name so that
        # BaseEstimator.get_params() / sklearn.base.clone() can read it back.
        # transform() itself works on the fixed 'idhogar', 'parentesco1' and 'Target' columns.
        self.variables = variables

    def fit(self, X: pd.DataFrame, y: pd.Series = None):
        '''
        Required method for Sklearn TransformerMixin class.
        Remains inactive and performs no action for now.
        Leave as is.
        '''
        return self

    def transform(self, X: pd.DataFrame, y: pd.Series = None):
        '''
        Return a copy of X in which mismatching households carry their head's Target on every row.

        Only X is consulted, both to find the mismatches and to select the rows to overwrite.
        A household without a head row, or whose head has a missing Target, is left unchanged.
        If several rows of a household are flagged as head, the first one is used.
        The input frame is not modified.
        '''
        X = X.copy()

        # Identify household target mismatches
        targets_per_household = X.groupby('idhogar')['Target'].nunique()
        mismatched_households = targets_per_household[targets_per_household != 1].index

        # set all member targets to head of household target
        is_head = X['parentesco1'] == 1
        for household in mismatched_households:
            in_household = X['idhogar'] == household
            head_targets = X.loc[in_household & is_head, 'Target'].dropna()
            if head_targets.empty:
                # Nothing to copy the label from.
                continue
            X.loc[in_household, 'Target'] = int(head_targets.iloc[0])

        return X


In [None]:
#### FIX
class BinaryEncoder(BaseEstimator, TransformerMixin):
    '''
    Pipeline step that turns "yes"/"no" entries into 1/0 and casts the affected columns to float.
    '''

    def __init__(self, variables: List[str]) -> None:
        '''
        Specify variable input as list of strings representing column names of a pd.DataFrame.
        '''
        # Stored unchanged under the parameter's own name so that
        # BaseEstimator.get_params() / sklearn.base.clone() can read it back.
        self.variables = variables

    def fit(self, X: pd.DataFrame, y: pd.Series = None):
        '''
        Required method for Sklearn TransformerMixin class.
        Remains inactive and performs no action for now.
        Leave as is.
        '''
        return self

    def transform(self, X: pd.DataFrame, y: pd.Series = None):
        '''
        Return a copy of X in which each configured column has "yes" replaced by 1 and "no" by 0,
        and is then converted to float.

        The columns are taken from ``variables``; when that list is empty or None the
        columns 'edjefa', 'edjefe' and 'dependency' are encoded.
        The input frame is not modified.
        '''
        X = X.copy()

        string_to_binary = {"yes": 1, "no": 0}
        columns = self.variables if self.variables else ['edjefa', 'edjefe', 'dependency']

        for column in columns:
            X[column] = X[column].replace(string_to_binary).astype(float)

        return X

In [None]:
# Pairwise correlation between the numeric (and boolean) columns only.
# Text columns cannot be correlated, and on pandas >= 2.0 DataFrame.corr() raises
# instead of silently skipping them, so they are filtered out explicitly.
corr_matrix = df.select_dtypes(include=['number', 'bool']).corr()

In [None]:
# Show the correlation matrix as the cell output.
corr_matrix

In [None]:
# Keep only the entries strictly above the diagonal (k=1 excludes the diagonal itself);
# everything else becomes NaN, so each pair of columns appears exactly once.
upper = corr_matrix.where(
    np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
)


In [None]:
# Show the masked (upper-triangle) correlation matrix as the cell output.
upper

In [None]:

# Names of the columns whose absolute correlation with at least one earlier column exceeds 0.95
to_drop = upper.columns[(upper.abs() > 0.95).any().to_numpy()].tolist()



In [None]:
# Show the list of highly correlated columns as the cell output.
to_drop

In [None]:
# NOTE(review): bare list literal with no assignment; presumably a pasted copy of
# the `to_drop` result kept for reference — confirm before relying on it.
['coopele', 'area2', 'tamhog', 'hhsize', 'hogar_total']

In [None]:
# Collapse the four one-hot electricity indicator columns into a single code per row:
#   0 = noelec, 1 = coopele, 2 = public, 3 = planpri, NaN = none of the four is set.
# np.select takes the first matching condition, which preserves the priority of the
# original if/elif chain while avoiding a slow row-by-row loop.
electricity_conditions = [
    df['noelec'] == 1,
    df['coopele'] == 1,
    df['public'] == 1,
    df['planpri'] == 1,
]
elec = np.select(electricity_conditions, [0, 1, 2, 3], default=np.nan)

df['elec'] = elec

In [None]:
# Remove the four one-hot electricity indicator columns from df.
df = df.drop(['noelec', 'coopele', 'public', 'planpri'], axis=1)

In [None]:
# Tally how often each value occurs in `elec` and show the counts as the cell output.
pd.DataFrame(elec)[0].value_counts()