In [None]:
from IPython.display import display
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "last"
import matplotlib.pyplot as plt
import numpy as np

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("census-income").config("spark-master", "local").getOrCreate()
spark

### Set up schema and read data from CSV

In [None]:
from pyspark.sql.types import *
struct_fields_list = [
    StructField("age", IntegerType(), True),
    StructField("class_of_worker", StringType(), True),
    StructField("industry_code", StringType(), True),
    StructField("occupation_code", StringType(), True),
    StructField("education", StringType(), True),
    StructField("wage_per_hr", DoubleType(), True),
    StructField("enrolled_in_edu_inst_last_wk", StringType(), True),
    StructField("marital_status", StringType(), True),
    StructField("major_industry_code", StringType(), True),
    StructField("major_occupation_code", StringType(), True),
    StructField("race", StringType(), True),
    StructField("hispanic_origin", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("mem_labour_union", StringType(), True),
    StructField("unemployment_reason", StringType(), True),
    StructField("employment_status", StringType(), True),
    StructField("capital_gain", DoubleType(), True),
    StructField("capital_loss", DoubleType(), True),
    StructField("stock_dividends", DoubleType(), True),
    StructField("tax_filer_status", StringType(), True),
    StructField("prev_region", StringType(), True),
    StructField("prev_state", StringType(), True),
    StructField("household_status", StringType(), True),
    StructField("household_summary", StringType(), True),
    StructField("instance_weight", DoubleType(), True),
    StructField("migration_code_msa", StringType(), True),
    StructField("migration_code_region", StringType(), True),
    StructField("migration_code_within_region", StringType(), True),
    StructField("live_in_this_house_one_year_ago", StringType(), True),
    StructField("migration_prev_res_in_sunbelt", StringType(), True),
    StructField("num_persons_for_employer", IntegerType(), True),
    StructField("parent", StringType(), True),
    StructField("birth_country_father", StringType(), True),
    StructField("birth_country_mother", StringType(), True),
    StructField("birth_country_self", StringType(), True),
    StructField("citizenship", StringType(), True),
    StructField("own_business", StringType(), True),
    StructField("veteran_QA", StringType(), True),
    StructField("veteran_benefits", StringType(), True),
    StructField("weeks_worked_in_yr", IntegerType(), True),
    StructField("year", StringType(), True),
    StructField("income", StringType(), True),
]

schema = StructType(struct_fields_list)

# read data, remove trailing and leading whitespace, set null value to ?
spark_train = spark.read.csv("census-income.data", 
                             schema=schema, 
                             ignoreLeadingWhiteSpace=True,
                             ignoreTrailingWhiteSpace=True,
                             nullValue="?")
spark_test = spark.read.csv("census-income.test", 
                             schema=schema, 
                             ignoreLeadingWhiteSpace=True,
                             ignoreTrailingWhiteSpace=True,
                             nullValue="?")

TRAIN_SIZE = spark_train.count()
TEST_SIZE = spark_test.count()

print("Train set shape: ({},{})".format(TRAIN_SIZE, len(spark_train.columns)))
print("Test set shape: ({}, {})".format(TEST_SIZE, len(spark_test.columns)))

In [None]:
# Get full dataset 
spark_ds = spark_train.union(spark_test)
# Drop instance_weight column (according to dataset description)
spark_ds = spark_ds.drop("instance_weight")
DATASET_SIZE = spark_ds.count()
# Full dataset shape
print("Dataset shape: ({}, {})".format(DATASET_SIZE, len(spark_ds.columns)))
print()
# Print first 5 rows
spark_ds.show(5, truncate=False, vertical=True)

In [None]:
target_col = "income"
nominal_cols = [x[0] for x in spark_ds.dtypes if x[1] == "string" and x[0] != target_col]
numeric_cols = [x[0] for x in spark_ds.dtypes if x[1] != "string" and x[0] != target_col]

print("Nominal columns:", nominal_cols)
print("There are {} nominal columns.".format(len(nominal_cols)))
print()
print("Numeric columns:", numeric_cols)
print("There are {} numeric columns.".format(len(numeric_cols)))

### Empty values preprocessing

In [None]:
from pyspark.sql.functions import *

# Count null and empty values in each columns
# In numeric columns
print("Empty values percentage in numeric columns:")
spark_ds.select([(count(when((col(c).isNull()) | (col(c) == ""), c)) / DATASET_SIZE * 100)\
                 .alias(c) for c in numeric_cols]).show(vertical=True)
# In nominal columns
print("Empty values percentage in nominal columns:")
spark_ds.select([(count(when((col(c).isNull()) | (col(c) == ""), c)) / DATASET_SIZE * 100)\
                 .alias(c) for c in nominal_cols]).show(vertical=True)

We can see that all numeric columns do not have empty values. Let's inspect nominal columns which have empty values and process them.

In [None]:
# Inspect columns with empty counts
cols_with_empty_vals = ["prev_state", "migration_code_msa", "migration_code_region", "migration_code_within_region",
                       "migration_prev_res_in_sunbelt", "birth_country_father", "birth_country_mother", "birth_country_self"]

We can see from the output above that 4 columns (which start with migration_code*) have a really high percentage of missing values compared to all other columns (around 50% data missing). We will drop these columns because it might be misleading to include these columns in our classifier.

In [None]:
# Notice that 4 columns: migration_code_* have a really high percentage of null values, ~50%
# Inspect a sample migration_code_* column's unique values count
print("Unique values count of migration_code_msa column:")
spark_ds.groupBy("migration_code_msa")\
        .count()\
        .withColumn("count", col("count") / DATASET_SIZE * 100)\
        .orderBy(desc("count")).show()

# We will drop these columns
redundant_cols = ["migration_code_msa", "migration_code_region", "migration_code_within_region",
                         "migration_prev_res_in_sunbelt"]

spark_ds = spark_ds.drop(*redundant_cols)
cols_with_empty_vals = [c for c in cols_with_empty_vals if c not in redundant_cols]
nominal_cols = [c for c in nominal_cols if c not in redundant_cols]

print("Total number of columns after dropping:", len(spark_ds.columns))

In [None]:
# helper function to get unique values percentage in each column
def get_unique_values_percentage(col_name):
    return spark_ds.groupBy(col_name)\
                    .count()\
                    .withColumn("count", col("count") / DATASET_SIZE * 100)\
                    .orderBy(desc("count"))

# helper function to get most frequent value from unique value counts list
def get_most_freq_value(unique_counts, col_name):
    return unique_counts\
            .orderBy(desc("count"))\
            .select(col_name)\
            .collect()[0][col_name]

# helper function to replace null values in column with new specified value
def replace_nulls(col_name, new_value):
    return spark_ds.withColumn(col_name, \
         when(col(col_name).isNull(), new_value).otherwise(col(col_name)))

For remaining columns with lower percentage of null values, our strategy is to replace null values with the most frequent value in each corresponding column. 

In [None]:
# Deal wih remaining columns with empty values but smaller percentage
prev_state_count = get_unique_values_percentage("prev_state")

birth_country_father_count = get_unique_values_percentage("birth_country_father")

birth_country_mother_count = get_unique_values_percentage("birth_country_mother")

birth_country_self_count = get_unique_values_percentage("birth_country_self")

print("prev_state column unique values count:")
prev_state_count.show()
print("birth_country_father column unique values count:")
birth_country_father_count.show()
print("birth_country_mother column unique values count:")
birth_country_mother_count.show()
print("birth_country_self column unique values count:")
birth_country_self_count.show()

# We will replace null values with the most frequent value in each column
prev_state_most_freq = get_most_freq_value(prev_state_count, "prev_state")
print("Most frequent value in prev_state column:", prev_state_most_freq)

birth_country_father_most_freq = get_most_freq_value(birth_country_father_count, "birth_country_father")
print("Most frequent value in birth_country_father column:", birth_country_father_most_freq)

birth_country_mother_most_freq = get_most_freq_value(birth_country_mother_count, "birth_country_mother")
print("Most frequent value in birth_country_mother column:", birth_country_mother_most_freq)

birth_country_self_most_freq = get_most_freq_value(birth_country_self_count, "birth_country_self")
print("Most frequent value in birth_country_self column:", birth_country_self_most_freq)

# Replace null values
spark_ds = replace_nulls("prev_state", prev_state_most_freq)
spark_ds = replace_nulls("birth_country_father", birth_country_father_most_freq)
spark_ds = replace_nulls("birth_country_mother", birth_country_mother_most_freq)
spark_ds = replace_nulls("birth_country_self", birth_country_self_most_freq)

print("Verify results:")
# verify results
get_unique_values_percentage("prev_state").show()
get_unique_values_percentage("birth_country_father").show()
get_unique_values_percentage("birth_country_mother").show()
get_unique_values_percentage("birth_country_self").show()
print("Empty values count:")
spark_ds.select([(count(when((col(c).isNull()) | (col(c) == ""), c)) / DATASET_SIZE * 100)\
                 .alias(c) for c in cols_with_empty_vals]).show(vertical=True)


After some inspection, we see that some columns have a really high percentage of "Not in universe" values, which means the value recorded is not in the survey's values domain. We probably do not want to include these columns when we train our classifier. We set the threshold to be 90% (i.e. if the percentage of "Not in universe" values is more than 90% in a specific column, we will drop it).

In [None]:
# Show "Not in universe" percentage count in each column
spark_ds.select([(count(when(col(c) == "Not in universe", c)) / DATASET_SIZE * 100)\
                 .alias(c) for c in nominal_cols]).show(vertical=True)

We can see that these 6 columns have more than 90% of "Not in universe" values. We will drop these columns from our dataset.

In [None]:
high_niu_cols = ['enrolled_in_edu_inst_last_wk', 'mem_labour_union', 'unemployment_reason', 'prev_region', 'prev_state', 'veteran_QA']
# drop these columns
spark_ds = spark_ds.drop(*high_niu_cols)

In [None]:
print("Final dataset shape: ({}, {})".format(DATASET_SIZE, len(spark_ds.columns)))

### Convert target variable (income) for binary classification problem

In [None]:
# Show percentage of each unique values in our target variable
income_count = get_unique_values_percentage("income")
income_count.show()
# We could see that around 94% of our data belongs to -50000 class (below 50$K dollar income/year)
# This signifies a binary classification problem with imbalanced data
# We will convert major label to be 0 (negative) and minor label to be 1 (positive)
major_label = income_count.orderBy(desc("count"))\
                .select("income")\
                .collect()[0]["income"]
minor_label = income_count.orderBy(desc("count"))\
                .select("income")\
                .collect()[1]["income"]

print("Major label:", major_label)
print("Minor label:", minor_label)

# Convert to numeric
spark_ds = spark_ds.withColumn("income", \
         when(col("income") == minor_label, 1).otherwise(0))
# Convert income column to be numeric type (int)
spark_ds = spark_ds.withColumn("income", col("income").cast("int"))

# verify results
print("Verify results:")
get_unique_values_percentage("income").show()

## Exploration data analysis

In [None]:
target_col = "income"
nominal_cols = [x[0] for x in spark_ds.dtypes if x[1] == "string" and x[0] != target_col]
numeric_cols = [x[0] for x in spark_ds.dtypes if x[1] != "string" and x[0] != target_col]

print("Nominal columns:", nominal_cols)
print("There are {} nominal columns.".format(len(nominal_cols)))
print()
print("Numeric columns:", numeric_cols)
print("There are {} numeric columns.".format(len(numeric_cols)))

We will explore numeric columns first, then move on to nominal columns. We will also try to only show columns with meaningful data because the total number of columns might be too large.

### Age

#### General statistics

In [None]:
spark_ds.select("age").describe().show()
median_age = spark_ds.approxQuantile("age", [0.5], 0)[0]
print("Median age:", median_age)

#### Plot
We will plot histogram labeled by target column (i.e. income).

In [None]:
age_0 = spark_ds.where(col("income") == 0).select("age").rdd.flatMap(lambda x: x).collect()
age_1 = spark_ds.where(col("income") == 1).select("age").rdd.flatMap(lambda x: x).collect()

plt.figure(figsize=(12,8))
plt.hist(age_0, bins=50, label="Below $50K income")
plt.hist(age_1, bins=50, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Histogram of Age")
plt.xlabel('Age')
plt.ylabel('Frequency')
plt.show()

- We can see that people who have above \\$50K income are more than 20 years old (age > 20).
- Most of people who have > \\$50K+ income are from ~30 - ~60 years old.
- The data distribution is slightly right-skewed. You can see that from the point of age 50 onwards, the frequency column's height is almost half.
- More younger people (age < 50) takes part in the survey (census) than older people (age > 50).

### Wage/hr

#### General statistics

In [None]:
# convert to dollars first (original data is in cents unit)
converted_wage_per_hr = spark_ds.select((col("wage_per_hr") / 100).alias("wage_per_hour ($)"))

In [None]:
converted_wage_per_hr.describe().show()
median_wage_per_hr = converted_wage_per_hr.approxQuantile("wage_per_hour ($)", [0.5], 0)[0]
print("Median wage/hr:", median_wage_per_hr)

#### Plot

In [None]:
wage_per_hr_0 = spark_ds.where(col("income") == 0).select(col("wage_per_hr") / 100).rdd.flatMap(lambda x: x).collect()
wage_per_hr_1 = spark_ds.where(col("income") == 1).select(col("wage_per_hr") / 100).rdd.flatMap(lambda x: x).collect()

plt.figure(figsize=(12,8))
plt.hist(wage_per_hr_0, bins=50, label="Below $50K income")
plt.hist(wage_per_hr_1, bins=50, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Histogram of wage/hr")
plt.xlabel('Wage/hr')
plt.ylabel('Frequency')
plt.show()

- The data distribution is extremely right skewed. Most of the data's wage per hour is 0, meaning most of the people doing the survey does not have a job. We can also see that most of the people who have low wage/hr (below \\$20/hr) have below \\$50K+ income/yr.
- Interestingly, most of people who have above \$50k+ income (orange part), have wage/hr equals 0. This might mean they have a different source of income other than working or maybe the data might be corrupted.

### Capital loss, capital gain and stock dividends
Like wage/hr column above, these columns also have extremely right-skewed data distribution, i.e. most of the data have value 0.

#### General statistics

In [None]:
spark_ds.select("capital_loss", "capital_gain", "stock_dividends").describe().show()
median_capital_loss = spark_ds.approxQuantile("capital_loss", [0.5], 0)[0]
median_capital_gain = spark_ds.approxQuantile("capital_gain", [0.5], 0)[0]
median_stock_dividends = spark_ds.approxQuantile("stock_dividends", [0.5], 0)[0]
print("Median capital loss:", median_capital_loss)
print("Median capital gain:", median_capital_gain)
print("Median capital stock_dividends:", median_stock_dividends)

#### Plot

In [None]:
capital_loss_0 = spark_ds.where(col("income") == 0).select("capital_loss").rdd.flatMap(lambda x: x).collect()
capital_loss_1 = spark_ds.where(col("income") == 1).select("capital_loss").rdd.flatMap(lambda x: x).collect()
capital_gain_0 = spark_ds.where(col("income") == 0).select("capital_gain").rdd.flatMap(lambda x: x).collect()
capital_gain_1 = spark_ds.where(col("income") == 1).select("capital_gain").rdd.flatMap(lambda x: x).collect()
stock_0 = spark_ds.where(col("income") == 0).select("stock_dividends").rdd.flatMap(lambda x: x).collect()
stock_1 = spark_ds.where(col("income") == 1).select("stock_dividends").rdd.flatMap(lambda x: x).collect()

plt.figure(figsize=(12,12))
# Capital loss histogram
plt.subplot(2,2,1)
plt.hist(capital_loss_0, bins=50, label="Below $50K income")
plt.hist(capital_loss_1, bins=50, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Histogram of Capital loss")
plt.xlabel('Capital loss')
plt.ylabel('Frequency')

# Capital gain histogram
plt.subplot(2,2,2)
plt.hist(capital_gain_0, bins=50, label="Below $50K income")
plt.hist(capital_gain_1, bins=50, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Histogram of Capital gain")
plt.xlabel('Capital gain')
plt.ylabel('Frequency')

# Stock dividends histogram
plt.subplot(2,2,3)
plt.hist(stock_0, bins=50, label="Below $50K income")
plt.hist(stock_1, bins=50, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Histogram of Stock dividends")
plt.xlabel('Stock dividends')
plt.ylabel('Frequency')

plt.show()

### Weeks worked in year

#### General statistics

In [None]:
spark_ds.select("weeks_worked_in_yr").describe().show()
median_weeks_worked_in_yr = spark_ds.approxQuantile("weeks_worked_in_yr", [0.5], 0)[0]
print("Median weeks worked in a year:", median_weeks_worked_in_yr)

#### Plot

In [None]:
weeks_worked_0 = spark_ds.where(col("income") == 0).select("weeks_worked_in_yr").rdd.flatMap(lambda x: x).collect()
weeks_worked_1 = spark_ds.where(col("income") == 1).select("weeks_worked_in_yr").rdd.flatMap(lambda x: x).collect()

plt.figure(figsize=(12,8))
plt.hist(weeks_worked_0, bins=50, label="Below $50K income")
plt.hist(weeks_worked_1, bins=50, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Histogram of Weeks worked in 1 year")
plt.xlabel('Weeks worked in 1 year')
plt.ylabel('Frequency')
plt.show()

- The data distribution is very interesting. Most of the people either do not work (i.e. 0 week) or work for a whole year (i.e. 52 weeks). 
- A smaller number of people works part time during the year (lower frequency bars in the middle of the plot)
- We can clearly see that most of people who have above \$50K+ income/yr works for a whole year (orange part in week 52 bar).

### Scatter plot

#### Age vs Wage/hr

In [None]:
plt.figure(figsize=(12,8))
plt.scatter(age_0, wage_per_hr_0, label="Below $50K income")
plt.scatter(age_1, wage_per_hr_1, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Scatter of Age vs Wage/hr")
plt.xlabel('Age')
plt.ylabel('Wage/hr')
plt.show()

- We can see that most of the people who have above \\$50k+ income have wage/hr < \\$40/hr.
- Above the threshold of \\$40 wage/hr, there are more people (\*) with <50$K+ income than people with >\\$50K+ income. These people (\*) probably worked part time instead of full-time.

####  Age vs Capital gain

In [None]:
plt.figure(figsize=(12,8))
plt.scatter(age_0, capital_gain_0, label="Below $50K income")
plt.scatter(age_1, capital_gain_1, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Scatter of Age vs Capital gain")
plt.xlabel('Age')
plt.ylabel('Capital gain')
plt.show()

- We can see most of the people have capital gain below \\$40K. Capital gain below \\$40K does not necessarily mean that your income will be lower than \\$50K, but really high capital gain as we can see (around \\$100K) will result in >\\$50K+ income.

#### Age vs Capital loss

In [None]:
plt.figure(figsize=(12,8))
plt.scatter(age_0, capital_loss_0, label="Below $50K income")
plt.scatter(age_1, capital_loss_1, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Scatter of Age vs Capital loss")
plt.xlabel('Age')
plt.ylabel('Capital loss')
plt.show()

- We can see that almost all people with above $50K+ income have capital loss lower than \\$3000.
- Intuitively, people who have a high capital loss, i.e. above \\$3K, tend to have below \\$50K+ income.

#### Age vs stock dividends

In [None]:
plt.figure(figsize=(12,8))
plt.scatter(age_0, stock_0, label="Below $50K income")
plt.scatter(age_1, stock_1, label="Above $50K income")
plt.legend()
plt.grid()
plt.title("Scatter of Age vs Stock dividends")
plt.xlabel('Age')
plt.ylabel('Stock dividends')
plt.show()

- Below the threshold of \\$40K stock dividends, there are both type of income, either above or below \\$50K.
- Above the threshold of \\$40K stock dividends, there are only people with income above $50K.

### Nominal columns
We will mainly use stacked bar plot labeled by income for nominal columns to visualize our data.

#### Class of worker

In [None]:
# Query the data
worker_class_count = spark_ds.groupBy('class_of_worker').agg(
    count(when((col("income") == 0), True)).alias("count0"),
    count(when((col("income") == 1), True)).alias("count1")
)

# count rows labeled 0
worker_class_0_count = [row['count0'] for row in worker_class_count.collect()]
# count rows label 1
worker_class_1_count = [row['count1'] for row in worker_class_count.collect()]
# unique labels
worker_class_labels = [row['class_of_worker'] for row in worker_class_count.collect()]
index = np.arange(len(worker_class_labels))

# Plot the bar chart
plt.figure(figsize=(15,8))
plt.bar(index, worker_class_0_count, label="Below $50K income")
plt.bar(index, worker_class_1_count, label="Above $50K income")
plt.xticks(index, worker_class_labels, rotation=20)
plt.legend()
plt.grid()
plt.title("Class of worker bar chart")
plt.xlabel('Class of worker')
plt.ylabel('Frequency')
plt.show()

- It seems like the value "Not in universe" has the highest frequency in this column. Let's ignore that for now.
- Most people have "Private" class of worker, and the number of people with > \\$50k+ income mostly come from this category as well (the highest orange part in Private bar).

#### Education

In [None]:
# Query the data
education_count = spark_ds.groupBy('education').agg(
    count(when((col("income") == 0), True)).alias("count0"),
    count(when((col("income") == 1), True)).alias("count1")
)

# count rows labeled 0
education_0_count = [row['count0'] for row in education_count.collect()]
# count rows label 1
education_1_count = [row['count1'] for row in education_count.collect()]
# unique labels
education_labels = [row['education'] for row in education_count.collect()]
index = np.arange(len(education_labels))

# Plot the bar chart
plt.figure(figsize=(17,8))
plt.bar(index, education_0_count, label="Below $50K income")
plt.bar(index, education_1_count, label="Above $50K income")
plt.xticks(index, education_labels, rotation=80)
plt.legend()
plt.grid()
plt.title("Education bar chart")
plt.xlabel('Education')
plt.ylabel('Frequency')
plt.show()

- Most of the people taking the survey are High school graduate or Children.
- All children have income level 0 (below \\$50K)
- Bachelors degree have the highest proportion of people with income level 1 (above \\$50K). This is followed by "Some college but no degree", "High school graduate" and "Masters degree" with slightly less proportion of income level 1.

#### Marital status

In [None]:
# Query the data
marital_status_count = spark_ds.groupBy('marital_status').agg(
    count(when((col("income") == 0), True)).alias("count0"),
    count(when((col("income") == 1), True)).alias("count1")
)

# count rows labeled 0
marital_status_0_count = [row['count0'] for row in marital_status_count.collect()]
# count rows label 1
marital_status_1_count = [row['count1'] for row in marital_status_count.collect()]
# unique labels
marital_status_labels = [row['marital_status'] for row in marital_status_count.collect()]
index = np.arange(len(marital_status_labels))

# Plot the bar chart
plt.figure(figsize=(17,8))
plt.bar(index, marital_status_0_count, label="Below $50K income")
plt.bar(index, marital_status_1_count, label="Above $50K income")
plt.xticks(index, marital_status_labels, rotation=20)
plt.legend()
plt.grid()
plt.title("Marital status bar chart")
plt.xlabel('Marital status')
plt.ylabel('Frequency')
plt.show()

- Most of the people are "Never married", followed by "Married-Civillian spouse present".
- "Married-Civillian spouse present" has the highest proportion of people with income level 1 (highest orange part).

#### Industry

In [None]:
# Query the data
industry_count = spark_ds.groupBy('major_industry_code').agg(
    count(when((col("income") == 0), True)).alias("count0"),
    count(when((col("income") == 1), True)).alias("count1")
)

# count rows labeled 0
industry_0_count = [row['count0'] for row in industry_count.collect()]
# count rows label 1
industry_1_count = [row['count1'] for row in industry_count.collect()]
# unique labels
industry_labels = [row['major_industry_code'] for row in industry_count.collect()]
index = np.arange(len(industry_labels))

# Plot the bar chart
plt.figure(figsize=(17,8))
plt.bar(index, industry_0_count, label="Below $50K income")
plt.bar(index, industry_1_count, label="Above $50K income")
plt.xticks(index, industry_labels, rotation=80)
plt.legend()
plt.grid()
plt.title("Industry bar chart")
plt.xlabel('Industry')
plt.ylabel('Frequency')
plt.show()

- Most of the people taking the survey are "Not in universe or Children".
- The most popular industry is Retail trade with >20K people.
- The industry which has the most proportion of people with income level 1 is "Manufacturing-durable goods".

#### Sex

In [None]:
# Query the data
sex_count = spark_ds.groupBy('sex').agg(
    count(when((col("income") == 0), True)).alias("count0"),
    count(when((col("income") == 1), True)).alias("count1")
)

# count rows labeled 0
sex_0_count = [row['count0'] for row in sex_count.collect()]
# count rows label 1
sex_1_count = [row['count1'] for row in sex_count.collect()]
# unique labels
sex_labels = [row['sex'] for row in sex_count.collect()]
index = np.arange(len(sex_labels))

# Plot the bar chart
plt.figure(figsize=(8,8))
plt.bar(index, sex_0_count, label="Below $50K income")
plt.bar(index, sex_1_count, label="Above $50K income")
plt.xticks(index, sex_labels, rotation=80)
plt.legend()
plt.grid()
plt.title("Sex chart")
plt.xlabel('Sex')
plt.ylabel('Frequency')
plt.show()

- There are slightly more females than males taking the survey.
- Males have a higher proportion of people with income level 1 than female.

## Pyspark MLlib process

### Data preprocessing pipelines
The preprocessing pipeline will make sure that the features are converted correctly to be put into the ML classifier. This will include:
1. For numerical columns:  
    1.1. Use **VectorAssembler** to combine numeric columns into vector column "numericFeatures".  
    1.2. Use **StandardScaler** to standardize (z-score) the numeric features.  
2. For categorical columns:  
    2.1. First, use **StringIndexer** to convert categorical columns to be numeric.  
    2.2. Second, use **OneHotEncoderEstimator** to one hot encode the output of (2.1).  
3. Use **VectorAssembler** to combine all columns we have to a single vector column called "features". This will be the input to our ML classifier later.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# 1. Numerical columns
numeric_assembler = VectorAssembler(inputCols=numeric_cols, outputCol="numericFeatures")
scaler = StandardScaler(inputCol="numericFeatures", outputCol="scaledNumericFeatures",
                        withStd=True, withMean=True)

# 2. Categorical columns
# 2.1. StringIndexer
indexers = [StringIndexer(inputCol=c, outputCol="{}_indexed".format(c))
                 for c in nominal_cols]

# 2.1. One hot encode
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
             outputCol="{}_encoded".format(indexer.getOutputCol()))
             for indexer in indexers]

# 3. Combine into features vector column
final_assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                            + [scaler.getOutputCol()], outputCol="features")

# Execute pipeline on original dataset
preprocessing_pipeline = Pipeline(stages=[numeric_assembler, scaler] + indexers + encoders + [final_assembler])
transformed_ds = preprocessing_pipeline.fit(spark_ds).transform(spark_ds)

# Select only features and label (income) column
transformed_ds = transformed_ds.select("features", "income")
transformed_ds.show(5)

### Train and test set
Originally the dataset has provided us with train and test file but we combine them to perform data exploration analysis + preprocessing. Now we will split them like originally provided.

In [None]:
import pyspark.sql.functions as f
# Add a helper index column
tranformed_ds = transformed_ds.withColumn('index', f.monotonically_increasing_id())

# sort ascending and take first TRAIN_SIZE rows
spark_train = tranformed_ds.sort('index').limit(TRAIN_SIZE).select("features", "income")

# sort descending and TEST_SIZE rows
spark_test = tranformed_ds.sort('index', ascending=False).limit(TEST_SIZE).select("features", "income")

# Verify size
print("Train size:", spark_train.count())
print("Test size:", spark_test.count())

#### Distribution of label in train and test set

In [None]:
# Distribution of label in train and test set
print("Label distribution in train set:")
spark_train.groupBy("income")\
                .count()\
                .withColumn("count", col("count") / spark_train.count() * 100)\
                .orderBy(desc("count")).show()

print("Label distribution in test set")
spark_test.groupBy("income")\
                .count()\
                .withColumn("count", col("count") / spark_test.count() * 100)\
                .orderBy(desc("count")).show()

### Classification Models and Evaluation

#### 1. Logistic Regression

**A. Setup model + train-validation-split hyperparameters tuning**

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="income", featuresCol="features")

# Tuning evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="income", metricName="areaUnderROC")

# Tuning parameters grid for logistic regression model
lr_param_grid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01, 0.1])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .addGrid(lr.maxIter, [10, 100])\
    .build()

# Tuning train-validation-split settings
lr_tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=lr_param_grid,
                           evaluator=evaluator,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8,
                           parallelism=16)

**B. Train model and predict on test set**

In [None]:
# Run train-validation-split on train set and get the best model
lr_tvs_model = lr_tvs.fit(spark_train)

# Make predictions on test set
lr_predictions = lr_tvs_model.transform(spark_test)

In [None]:
# Best model found:
lr_best = lr_tvs_model.bestModel

print("Logistic Regression model summary:")
# Intercept
print("Intercept:", lr_best.intercept)
# Coefficients
print("Coefficients:")
coefficients = lr_best.coefficients
coefficients = [(float(c),) for c in coefficients]  # convert numpy type to float, and to tuple
coefficients_df = sqlContext.createDataFrame(coefficients, ["Feature Coefficients"])
coefficients_df.show()

# Regularization coefficient
print("Regularization coefficient:", lr_best._java_obj.getRegParam())
# Elastic net coefficient
print("Elastic net coefficient:", lr_best._java_obj.getElasticNetParam())
# Max iteration
print("Max iterations:", lr_best._java_obj.getMaxIter())

**C. Evaluation Metrics**

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Get overall statistics (i.e. accuracy, precision, recall and F1-score)
preds_labels_rdd = lr_predictions.rdd.map(lambda x: (float(x.prediction), float(x.income)))
lr_metrics = MulticlassMetrics(preds_labels_rdd)

# Overall statistics
lr_accuracy = lr_metrics.accuracy
# We are interested in the score of precision, recall and F1-score for positive class (few number of samples)
lr_precision = lr_metrics.precision(1.0)
lr_recall = lr_metrics.recall(1.0)
lr_f1Score = lr_metrics.fMeasure(1.0)
print("Logistic Regression summary stats:")
print("Accuracy = {}".format(lr_accuracy))
print("Precision = {}".format(lr_precision))
print("Recall = {}".format(lr_recall))
print("F1 Score = {}".format(lr_f1Score))

#### 2. Decision Tree

**A. Setup model + train-validation-split hyperparameters tuning**

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Create initial DecisionTree model
dt = DecisionTreeClassifier(labelCol="income", 
                            featuresCol="features", 
                            minInstancesPerNode=20)

# Tuning evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="income", metricName="areaUnderROC")

# Tuning parameters grid for decision tree model
dt_param_grid = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [15, 20, 30])\
    .addGrid(dt.maxBins, [20, 60])\
    .addGrid(dt.impurity, ['gini', 'entropy'])\
    .build()

# Tuning train-validation-split settings
dt_tvs = TrainValidationSplit(estimator=dt,
                           estimatorParamMaps=dt_param_grid,
                           evaluator=evaluator,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8,
                           parallelism=16)

**B. Train model and predict on test set**

In [None]:
# Run train-validation-split on train set and get the best model
dt_tvs_model = dt_tvs.fit(spark_train)

# Make predictions on test set
dt_predictions = dt_tvs_model.transform(spark_test)

In [None]:
# Best model found:
dt_best = dt_tvs_model.bestModel

print("Decision Tree model summary:")
print("Depth:", dt_best.depth)
print("Number of nodes:", dt_best.numNodes)
print("Impurity criteria:", dt_best._java_obj.getImpurity())
print("Max bins:", dt_best._java_obj.getMaxBins())

**C. Evaluation Metrics**

In [None]:
# Get overall statistics (i.e. accuracy, precision, recall and F1-score)
preds_labels_rdd = dt_predictions.rdd.map(lambda x: (float(x.prediction), float(x.income)))
dt_metrics = MulticlassMetrics(preds_labels_rdd)

# Overall statistics
dt_accuracy = dt_metrics.accuracy
# We are interested in the score of precision, recall and F1-score for positive class (few number of samples)
dt_precision = dt_metrics.precision(1.0)
dt_recall = dt_metrics.recall(1.0)
dt_f1Score = dt_metrics.fMeasure(1.0)
print("Decision Tree summary stats:")
print("Accuracy = {}".format(dt_accuracy))
print("Precision = {}".format(dt_precision))
print("Recall = {}".format(dt_recall))
print("F1 Score = {}".format(dt_f1Score))

#### 3. Random Forest

**A. Setup model + train-validation-split hyperparameters tuning**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Create initial RandomForest model
rf = RandomForestClassifier(labelCol="income", 
                            featuresCol="features",
                            impurity="entropy",                          
                            maxDepth=15,
                            maxBins=60,
                            minInstancesPerNode=20)

# Tuning evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="income", metricName="areaUnderROC")

# Tuning parameters grid for random forest model
rf_param_grid = ParamGridBuilder()\
    .addGrid(rf.numTrees, [10, 20, 30])\
    .build()

# Tuning train-validation-split settings
rf_tvs = TrainValidationSplit(estimator=rf,
                           estimatorParamMaps=rf_param_grid,
                           evaluator=evaluator,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8,
                           parallelism=16)

**B. Train model and predict on test set**

In [None]:
# Run train-validation-split on train set and get the best model
rf_tvs_model = rf_tvs.fit(spark_train)

# Make predictions on test set
rf_predictions = rf_tvs_model.transform(spark_test)

In [None]:
# Best model found:
rf_best = rf_tvs_model.bestModel

print("Random Forest model summary:")
print("Number of trees:", rf_best._java_obj.getNumTrees())

**C. Evaluation Metrics**

In [None]:
# Get overall statistics (i.e. accuracy, precision, recall and F1-score)
preds_labels_rdd = rf_predictions.rdd.map(lambda x: (float(x.prediction), float(x.income)))
rf_metrics = MulticlassMetrics(preds_labels_rdd)

# Overall statistics
rf_accuracy = rf_metrics.accuracy
# We are interested in the score of precision, recall and F1-score for positive class (few number of samples)
rf_precision = rf_metrics.precision(1.0)
rf_recall = rf_metrics.recall(1.0)
rf_f1Score = rf_metrics.fMeasure(1.0)
print("Random Forest summary stats:")
print("Accuracy = {}".format(rf_accuracy))
print("Precision = {}".format(rf_precision))
print("Recall = {}".format(rf_recall))
print("F1 Score = {}".format(rf_f1Score))

### Pyspark MLlib result summary

In [None]:
num_models = 3
index = np.arange(num_models)
width = 0.15 # width of bar

# trained models' results
spark_accuracy = [lr_accuracy, dt_accuracy, rf_accuracy]
spark_precision = [lr_precision, dt_precision, rf_precision]
spark_recall = [lr_recall, dt_recall, rf_recall]
spark_f1 = [lr_f1Score, dt_f1Score, rf_f1Score]

# side-by-side bar plot
plt.figure(figsize=(12,8))
plt.bar(index, spark_accuracy, width, label="Accuracy")
plt.bar(index + width, spark_precision, width, label="Precision")
plt.bar(index + width * 2, spark_recall, width, label="Recall")
plt.bar(index + width * 3, spark_f1, width, label="F1 Score")

# Additional info
plt.ylabel("Score")
plt.title("Pyspark MLlib Result summary")
plt.xticks(index + width * 1.5, ('Logistic Regression', 'Decision Tree', 'Random Forest'))
plt.yticks(np.arange(0, 1.1, 0.1))
plt.grid()
plt.legend()
plt.show()

### Convert to pandas dataframe

In [None]:
import pandas as pd
pd.set_option('display.max_columns', 50)
# Pandas dataframe
pd_dataset = spark_ds.toPandas()
# To get train and test set
pd_train = pd_dataset.iloc[:TRAIN_SIZE, :]
pd_test = pd_dataset.iloc[TRAIN_SIZE:, :]

pd_train.shape
pd_test.shape

In [None]:
pd_dataset.head()

In [None]:
# verify that there are no missing values :)
pd_dataset.isnull().sum().sort_values(ascending=False)