# 1b. Survey Cleaning - Mexico 
Mexico survey data are from the 2015 partial census (IPUMS sample `mx2015a`), obtained from IPUMS International: https://international.ipums.org/international-action/sample_details/country/mx#tab_mx2015a

In [4]:
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import shutil
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from statsmodels.stats.weightstats import DescrStatsW

In [5]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import *
# Local Spark session used to pre-process the raw IPUMS extract.
# maxPartitionBytes caps each input split at 64 MiB (64 * 1024 * 1024 bytes).
spark = (
    SparkSession.builder
    .appName("random")
    .config("spark.sql.files.maxPartitionBytes", 64 * 1024 * 1024)
    .config("spark.driver.memory", "50g")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)
# Only show ERROR-level Spark log output in the notebook.
spark.sparkContext.setLogLevel("ERROR")

In [6]:
def save_df(df: 'SparkDataFrame', outfname: str, sep: str = ',') -> None:
    """
    Save a Spark DataFrame to a single CSV file at ``outfname``.

    Spark always writes a *directory* of part files. This helper coalesces the
    frame to one partition, writes it into a staging directory next to
    ``outfname``, moves the single ``.csv`` part file to ``outfname`` and then
    removes the staging directory.

    Args:
        df: Spark DataFrame to save.
        outfname: Destination file path (e.g. ``/path/to/out.csv``).
        sep: Field delimiter passed to Spark's CSV writer.

    Raises:
        FileNotFoundError: if Spark wrote no ``.csv`` part file.
    """
    root, ext = os.path.splitext(outfname)
    # Staging directory: the target path without its extension. If there is no
    # extension, add a suffix instead -- blindly chopping 4 characters could
    # point at the target's parent directory, which is rmtree'd below.
    outfolder = root if ext else outfname + '_parts'
    df.repartition(1).write.csv(path=outfolder, mode="overwrite", header="true", sep=sep)
    # Work around Spark's automatic part-file naming (part-00000-<uuid>.csv).
    part_files = sorted(fname for fname in os.listdir(outfolder) if fname.endswith('.csv'))
    if not part_files:
        raise FileNotFoundError('No .csv part file was written to %s' % outfolder)
    # os.replace overwrites an existing destination on every platform.
    os.replace(os.path.join(outfolder, part_files[0]), outfname)
    shutil.rmtree(outfolder)

In [10]:
def grouped_weighted_mean(df, agg_cols, feature_cols, weight_col):
    """
    Weighted mean of ``feature_cols`` within groups defined by ``agg_cols``.

    For each group and feature ``f`` the result is ``sum(f * w) / sum(w)``,
    where both sums run over the group's rows in which ``f`` is not missing.
    Missing feature values therefore neither add to the numerator nor dilute
    the denominator.

    Args:
        df: pandas DataFrame containing ``agg_cols``, ``feature_cols`` and
            ``weight_col``. It is not modified.
        agg_cols: list of column names to group by (rows with a missing key
            are dropped, as with ``DataFrame.groupby``).
        feature_cols: list of numeric column names to average.
        weight_col: name of the weight column.

    Returns:
        DataFrame with one row per group, sorted by ``agg_cols``, with a
        default integer index. Its columns are
        ``agg_cols + feature_cols + [weight_col]``. The weight column holds
        the total weight of all rows in the group. A feature that is missing
        for every row of a group is NaN.
    """
    weights = df[weight_col]
    features = df[feature_cols]
    # Group by the key Series directly so only the needed columns are summed
    # (summing every column of df is wasteful and can fail on non-numeric data).
    keys = [df[agg_col] for agg_col in agg_cols]
    weighted_sums = features.mul(weights, axis=0).groupby(keys).sum()
    # Per-feature denominator: total weight of the rows where that feature is observed.
    observed_weights = features.notna().mul(weights, axis=0).groupby(keys).sum()
    result = weighted_sums / observed_weights
    result[weight_col] = weights.groupby(keys).sum()
    return result.reset_index()[agg_cols + feature_cols + [weight_col]]

In [7]:
# Reduce the raw IPUMS extract to the household variables used below, recode
# them to 0/1 indicators (NaN = missing / unknown), and save to a single CSV.
RAW_COLUMNS = ['SERIAL', 'HHWT', 'GEO1_MX2015', 'GEO2_MX2015', 'ELECTRIC', 'PHONE', 'CELL', 'INTERNET', 'AUTOS',
               'HOTWATER', 'AIRCON', 'COMPUTER', 'WASHER', 'REFRIG', 'TV', 'RADIO', 'URBAN']
RENAMES = {'SERIAL': 'hhid', 'HHWT': 'weight', 'GEO1_MX2015': 'state', 'GEO2_MX2015': 'municipality'}

# Codes 1 -> 1 and 2 -> 0; every other code becomes NaN.
CODES_1_2 = [(1, 1), (2, 0)]
# (output column, source column, ordered first-match (code, value) pairs, value for all other codes)
RECODES = [
    ('ELECTRIC', 'ELECTRIC', CODES_1_2, np.nan),
    ('PHONE', 'PHONE', CODES_1_2, np.nan),
    ('CELL', 'CELL', CODES_1_2, np.nan),
    ('INTERNET', 'INTERNET', CODES_1_2, np.nan),
    ('AUTOS', 'AUTOS', [(0, 0), (8, np.nan), (9, np.nan)], 1),
    ('HOTWATER', 'HOTWATER', CODES_1_2, np.nan),
    ('AIRCON', 'AIRCON', [(10, 0), (0, np.nan), (99, np.nan)], 1),
    ('COMPUTER', 'COMPUTER', CODES_1_2, np.nan),
    ('WASHER', 'WASHER', [(0, np.nan), (9, np.nan), (1, 0)], 1),
    ('REFRIG', 'REFRIG', CODES_1_2, np.nan),
    ('TV', 'TV', [(0, np.nan), (99, np.nan), (10, 0)], 1),
    ('RADIO', 'RADIO', CODES_1_2, np.nan),
    ('RURAL', 'URBAN', [(2, 0), (1, 1)], np.nan),
]


def _recode(source, cases, default):
    """Build a Spark CASE WHEN expression mapping the codes of column ``source``.

    ``cases`` is checked in order (first match wins); rows matching none of
    them get ``default``. A null raw value is mapped to NaN up front, so it
    can never fall through to ``default`` (previously AUTOS/AIRCON/WASHER/TV
    nulls silently became 1).
    """
    expr = when(col(source).isNull(), np.nan)
    for code, value in cases:
        expr = expr.when(col(source) == code, value)
    return expr.otherwise(default)


df = spark.read.csv('/data/mosaiks/ipums/raw/ipumsi_00001.csv', header=True).select(RAW_COLUMNS)
for old_name, new_name in RENAMES.items():
    df = df.withColumnRenamed(old_name, new_name)
for target, source, cases, default in RECODES:
    df = df.withColumn(target, _recode(source, cases, default))

save_df(df, '/data/mosaiks/replication/ipums/raw/mexico_reduced.csv')

In [8]:
# Read in data: one row per individual; keep one row per household for the index.
indiv = pd.read_csv('/data/mosaiks/replication/ipums/raw/mexico_reduced.csv')
households = indiv.drop_duplicates(subset=['hhid'])
print('Number of households: %i' % len(households))

# Create asset index: first principal component of the standardized asset indicators.
assets = ['ELECTRIC', 'PHONE', 'CELL', 'INTERNET', 'AUTOS', 'HOTWATER', 'AIRCON', 'COMPUTER', 'WASHER',
         'REFRIG', 'TV', 'RADIO']
# .copy() so the column assignments below act on an independent frame.
households = households.dropna(subset=assets).copy()
print('Number of households with all assets: %i' % len(households))
# random_state seeds arpack's starting vector so re-runs are reproducible.
pca = PCA(n_components=1, svd_solver='arpack', random_state=0)
scaler = StandardScaler()
standardized_assets = scaler.fit_transform(households[assets])
asset_index = pca.fit_transform(standardized_assets)
# The sign of a principal component is arbitrary. Orient it so the loadings sum
# to a non-negative value (owning assets raises the index on balance); the
# "poor = bottom of the index" rule below depends on this direction.
if pca.components_[0].sum() < 0:
    pca.components_ = -pca.components_
    asset_index = -asset_index
households['asset_index'] = asset_index[:, 0]
print('Explained variance: %.2f' % pca.explained_variance_ratio_[0])

# Get poor indicator - bottom 41.9% of households by survey-weighted asset index
POOR_SHARE = 0.419
wq = DescrStatsW(data=households['asset_index'].values, weights=households['weight'].values)
cutoff = wq.quantile(probs=np.array([POOR_SHARE]), return_pandas=False)[0]
households['poor'] = (households['asset_index'] < cutoff).astype('int')

# Attach the household-level results to every individual of those households.
df = indiv.merge(households[['hhid', 'asset_index', 'poor']], on='hhid', how='inner')
df = df.rename({'RURAL': 'rural'}, axis=1)
print('Number of individuals with all assets: %i' % len(df))
print('Number of municipalities: %i' % len(df['municipality'].unique()))

Number of households: 2927196
Number of households with all assets: 2871373
Explained variance: 0.34
Number of individuals with all assets: 11137182
Number of municipalities: 2446


In [13]:
# Aggregate household asset index and rural status to municipalities (survey-weighted).
hh = df.drop_duplicates(subset=['hhid'])
grouped_hh = grouped_weighted_mean(hh, ['municipality', 'state'], ['asset_index', 'rural'], 'weight')
# A municipality counts as rural when more than half of its weighted households are rural.
grouped_hh['rural'] = (grouped_hh['rural'] > 0.5).astype('int')
grouped = grouped_hh.drop('weight', axis=1)
# mean() of a 0/1 column is a fraction; scale by 100 so the "Percent" label is accurate.
print('Percent of municipalities where more than half of HH are rural: %.1f%%' % (100 * grouped['rural'].mean()))
print('Correlation between rural and wealth: %.2f' % np.corrcoef(grouped['rural'], grouped['asset_index'])[0][1])
grouped[['municipality', 'state', 'asset_index', 'rural']]\
    .to_csv('/data/mosaiks/replication/surveys/mexico/grouped.csv', index=False)

Percent of municipalities where more than half of HH are rural: 0.56
Correlation between rural and wealth: -0.51
