In [1]:
import os

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import Client

In [2]:
# Connect to a Dask scheduler; with no arguments a local cluster is started.
client = Client()
# Bare expression so the client summary is rendered as the cell output.
client

0,1
Client  Scheduler: tcp://127.0.0.1:52672  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 19.84 GiB


# Loading Data

In [3]:
# Earlier selection of raw survey column codes, kept for reference only:
# subset = ["v101","v025", "d104", "d106", "d107", "d108", "d113", "d121", "d122a", "d122b", "v044",
#           "v133","v137", "v153", "v190", "v501", "v502", "v511", "v512", "v730", "v731", "v741", "v012",
#           "v745a","v745b", "v715", "v744a","v744b", "v744c", "v744d", "v744e"]

# Columns selected from every cleaned country frame, in output order.
var = [
    'country',
    'violence',
    'age',
    'age_first',
    'duration',
    'urban',
    'edu',
    'n_children',
    'employ_paid',
    'own_house',
    'own_land',
    'att_wb',
    'fam_ipv',
    'part_age',
    'part_edu',
    'alcohol',
    'wealth_idx',
    'telephone_yes',
]

In [4]:
def _flag_in(series, labels):
    '''Return an int64 series: 1 where `series` holds one of `labels`, else 0.

    `isin` reports missing values as "not a member", so they are coded 0.
    '''
    return series.isin(labels).astype('int64')


def clean_db(file):
    '''
    Read one survey CSV with Dask and add the derived analysis columns.

    Parameters
    ----------
    file : str or os.PathLike
        Location of the CSV. The first two characters of the file *name*
        (directories ignored) become the `country` column,
        e.g. 'PK.csv' -> 'PK'.

    Returns
    -------
    dask.dataframe.DataFrame
        The frame as read, with the derived columns added, the v744a-e
        columns recoded to 0/1, and v137/v133/v012/v511/v512/v730/v715
        renamed to n_children/edu/age/age_first/duration/part_age/part_edu.
    '''
    df = dd.read_csv(file, dtype={'v715': 'object', 'v133': 'int64'})

    # Use the base name so a path such as 'data/PK.csv' still yields 'PK'.
    df['country'] = os.path.basename(file)[:2]

    # create outcome variable
    df['emo_vio'] = _flag_in(df['d104'], ['yes'])
    df['phy_vio'] = _flag_in(df['d106'], ['yes']) | _flag_in(df['d107'], ['yes'])
    df['sex_vio'] = _flag_in(df['d108'], ['yes'])
    df['violence'] = df['emo_vio'] | df['phy_vio'] | df['sex_vio']

    # clean individual-level features
    # NOTE(review): the three "not equal" codings below (urban, own_land,
    # own_house) give 1 for every value other than the named label, missing
    # values included -- unchanged from before; confirm that is intended.
    df['urban'] = 1 - _flag_in(df['v025'], ['rural'])
    df['employ_paid'] = _flag_in(df['v741'], ['in-kind only', 'cash only', 'cash and in-kind'])
    df['own_land'] = 1 - _flag_in(df['v745b'], ['does not own'])
    df['own_house'] = 1 - _flag_in(df['v745a'], ['does not own'])
    df['fam_ipv'] = _flag_in(df['d121'], ['yes'])

    # att_wb is 1 when at least one of the five v744 items was answered 'yes'.
    attitude_cols = ['v744a', 'v744b', 'v744c', 'v744d', 'v744e']
    for col in attitude_cols:
        df[col] = _flag_in(df[col], ['yes'])
    df['att_wb'] = (df[attitude_cols].sum(axis=1) >= 1).astype(int)

    df = df.rename(columns={'v137': 'n_children', 'v133': 'edu', 'v012': 'age',
                            'v511': 'age_first', 'v512': 'duration'})

    # clean partner features
    # v715 is read as text; blank out "don't know" and convert the rest to
    # float so missing values can be represented without a sentinel.
    part_edu = df['v715'].mask(df['v715'].isin(["don't know"]))
    df['v715'] = part_edu.astype('float64')
    df = df.rename(columns={'v730': 'part_age', 'v715': 'part_edu'})
    df['alcohol'] = _flag_in(df['d113'], ['yes'])

    # household-level features
    # replace() leaves any label outside this mapping untouched.
    df['wealth_idx'] = df['v190'].replace({'poorest': 0, 'poorer': 1, 'middle': 2, 'richer': 3, 'richest': 4})
    df['telephone_yes'] = _flag_in(df['v153'], ['yes'])

    return df
    

# Combining Multiple Datasets

In [5]:
# Clean each country file, keep only the columns listed in `var`, and drop
# rows with a missing value in any of them. The results are still Dask
# frames; they are materialised with .compute() in a later cell.
PK, IA, NP = (
    clean_db(csv_name)[var].dropna()
    for csv_name in ('PK.csv', 'IA.csv', 'NP.csv')
)

In [7]:
# Materialise each Dask frame as pandas, then stack them row-wise in
# PK, NP, IA order.
pooled = pd.concat([lazy_frame.compute() for lazy_frame in (PK, NP, IA)])

In [8]:
# Show the (rows, columns) size of the pooled frame.
pooled.shape

(73156, 18)

In [9]:
# Write the pooled frame to disk; index=False leaves the row index out of the file.
pooled.to_csv('pooled_clean.csv', index=False)