## Import libraries

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)


# ignore warnings
import warnings
warnings.filterwarnings('ignore')


# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/census-income/census-income.test
/kaggle/input/census-income/census-income.data
/kaggle/input/census-income/census-income.names
/kaggle/input/adult-census-income/adult.csv


### Make sure spark is installed 

In [2]:
try:
    import pyspark
except ModuleNotFoundError:
    print("Pyspark not found. Installing quietly")
    !pip install pyspark -q

Pyspark not found. Installing quietly
[0m

## Create a spark session

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/13 12:10:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Dataset description

In this project I use the US Census-Income dataset available on UCI's archive. This dataset contains 299 285 patterns and 40 features of weighted census data. The data contains demographic and employment related variables. 

The main task for the dataset is classification, with the goal of deciding if a person will earn above or below $50K/year. 

The two classes are: 

1. Below $50K.

2. Above $50K.


#### General information
- Name: Census-Income (KDD)
- Location: https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29
    - Download the census.tar.gz file
- Documentation: https://archive.ics.uci.edu/ml/machine-learning-databases/census-income-mld/census-income.names
- Files:
    - Training set: census-income.data
    - Testing set: census-income.test


#### Dataset columns are:

0. age
1. class of worker
2. industry code
3. occupation code
4. education
5. wage per hour
6. enrolled in edu inst last wk
7. marital status
8. major industry code
9. major occupation code
10. mace
11. hispanic Origin
12. sex
13. member of a labor union
14. reason for unemployment
15. full or part time employment stat
16. capital gains
17. capital losses
18. divdends from stocks
19. tax filer status
20. region of previous residence
21. state of previous residence
22. detailed household and family stat
23. detailed household summary in household
24. instance weight
25. migration code-change in msa
26. migration code-change in reg
27. migration code-move within reg
28. live in this house 1 year ago
29. migration prev res in sunbelt
30. num persons worked for employer
31. family members under 18
32. country of birth father
33. country of birth mother
34. country of birth self
35. citizenship
36. own business or self employed
37. fill inc questionnaire for veteran's admin
38. veterans benefits
39. weeks worked in year
40. year
41. income (class)

## Import dataset 

In [4]:
%%time

df_train = (spark.read
          .format("csv")
          .option("inferSchema", True)
          .load("/kaggle/input/census-income/census-income.data"))

df_test = (spark.read
          .format("csv")
          .option("inferSchema", True)
          .load("/kaggle/input/census-income/census-income.test"))

[Stage 3:>                                                          (0 + 4) / 4]

CPU times: user 13.5 ms, sys: 2.29 ms, total: 15.8 ms
Wall time: 10.7 s


                                                                                

In [5]:
df_train.show(5)

23/02/13 12:10:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+---+--------------------+---+----+--------------------+---+----------------+--------------+--------------------+--------------------+--------------------+----------+-------+----------------+----------------+--------------------+----+----+----+------------------+----------------+----------------+--------------------+--------------------+-------+-----------+------------+------------+--------------------+----------------+----+--------------------+--------------+--------------+--------------+--------------------+----+----------------+----+----+----+---------+
|_c0|                 _c1|_c2| _c3|                 _c4|_c5|             _c6|           _c7|                 _c8|                 _c9|                _c10|      _c11|   _c12|            _c13|            _c14|                _c15|_c16|_c17|_c18|             

The dataset does not contain headers, so the column names are encoded as _cx.

In the dataset description, it is suggested to drop column _24, so we'll do that for both of the dataframes before we continue:

In [6]:
df_test = df_test.drop("_c24")
df_train = df_train.drop("_c24")

## Exploratory Data Analysis

### Check test/training distribution

Lets check how much data we use for training and testing:

In [7]:
df_train.count(), df_test.count()

(199523, 99762)

We see that we use about 67% of the data for training and 33% for testing.

### Data types

Check what data types we have: 

In [8]:
df_train.dtypes

[('_c0', 'int'),
 ('_c1', 'string'),
 ('_c2', 'double'),
 ('_c3', 'double'),
 ('_c4', 'string'),
 ('_c5', 'double'),
 ('_c6', 'string'),
 ('_c7', 'string'),
 ('_c8', 'string'),
 ('_c9', 'string'),
 ('_c10', 'string'),
 ('_c11', 'string'),
 ('_c12', 'string'),
 ('_c13', 'string'),
 ('_c14', 'string'),
 ('_c15', 'string'),
 ('_c16', 'double'),
 ('_c17', 'double'),
 ('_c18', 'double'),
 ('_c19', 'string'),
 ('_c20', 'string'),
 ('_c21', 'string'),
 ('_c22', 'string'),
 ('_c23', 'string'),
 ('_c25', 'string'),
 ('_c26', 'string'),
 ('_c27', 'string'),
 ('_c28', 'string'),
 ('_c29', 'string'),
 ('_c30', 'double'),
 ('_c31', 'string'),
 ('_c32', 'string'),
 ('_c33', 'string'),
 ('_c34', 'string'),
 ('_c35', 'string'),
 ('_c36', 'double'),
 ('_c37', 'string'),
 ('_c38', 'double'),
 ('_c39', 'double'),
 ('_c40', 'double'),
 ('_c41', 'string')]

### Preview dataset

In [9]:
df_train.show(5)

+---+--------------------+---+----+--------------------+---+----------------+--------------+--------------------+--------------------+--------------------+----------+-------+----------------+----------------+--------------------+----+----+----+------------------+----------------+----------------+--------------------+--------------------+-----------+------------+------------+--------------------+----------------+----+--------------------+--------------+--------------+--------------+--------------------+----+----------------+----+----+----+---------+
|_c0|                 _c1|_c2| _c3|                 _c4|_c5|             _c6|           _c7|                 _c8|                 _c9|                _c10|      _c11|   _c12|            _c13|            _c14|                _c15|_c16|_c17|_c18|              _c19|            _c20|            _c21|                _c22|                _c23|       _c25|        _c26|        _c27|                _c28|            _c29|_c30|                _c31|    

### View summary of the dataframe

In [10]:
df_train.describe().toPandas().transpose()

                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
_c0,199523,34.494198663813194,22.310895206650255,0,90
_c1,199523,,,Federal government,Without pay
_c2,199523,15.352320283877047,18.067128797857993,0.0,51.0
_c3,199523,11.306556136385279,14.454203916353702,0.0,46.0
_c4,199523,,,10th grade,Some college but no degree
_c5,199523,55.426908175999756,274.8964539028428,0.0,9999.0
_c6,199523,,,College or university,Not in universe
_c7,199523,,,Divorced,Widowed
_c8,199523,,,Agriculture,Wholesale trade


### Check for missing values
The dataset contains missing values according to UCI's archive which are encoded as '?'. Lets check which columns contain these values and how many rows of each.

In [11]:
from pyspark.sql.functions import col,isnan, when, count

df_train.select([
    count(
        when(
            isnan(c) 
            | col(c).isNull() 
            | col(c).contains("?"), 
            c)
    ).alias(c) for c in df_train.columns
]).show()

df_test.select([
    count(
        when(
            isnan(c) 
            | col(c).isNull() 
            | col(c).contains("?"), 
            c)
    ).alias(c) for c in df_test.columns
]).show()

                                                                                

+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23| _c25| _c26| _c27|_c28| _c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0| 708|   0|   0|99696|99696|99696|   0|99696|   0|   0|6713|6119|3393|   0|   0|   0|   0|   0|   0|   0|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+-----+----+----+----+----+----+----+----+----+----+----+----+-



+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23| _c25| _c26| _c27|_c28| _c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+-----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0| 330|   0|   0|49946|49946|49946|   0|49946|   0|   0|3429|3072|1764|   0|   0|   0|   0|   0|   0|   0|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+-----+----+----+----+----+----+----+----+----+----+----+----+-

                                                                                

We see that both dataframes contain missing values in column _c25, _c26, _c27, _c29, _c32, _c33 and _c34. We'll replace these values with `null` for both dataframes.

In [12]:
df_train = df_train.select([when(col(c).contains("?"), None).otherwise(col(c)).alias(c) for c in df_train.columns])
df_test = df_test.select([when(col(c).contains("?"), None).otherwise(col(c)).alias(c) for c in df_test.columns])

df_train.show(5)

+---+--------------------+---+----+--------------------+---+----------------+--------------+--------------------+--------------------+--------------------+----------+-------+----------------+----------------+--------------------+----+----+----+------------------+----------------+----------------+--------------------+--------------------+-----------+------------+------------+--------------------+----------------+----+--------------------+--------------+--------------+--------------+--------------------+----+----------------+----+----+----+---------+
|_c0|                 _c1|_c2| _c3|                 _c4|_c5|             _c6|           _c7|                 _c8|                 _c9|                _c10|      _c11|   _c12|            _c13|            _c14|                _c15|_c16|_c17|_c18|              _c19|            _c20|            _c21|                _c22|                _c23|       _c25|        _c26|        _c27|                _c28|            _c29|_c30|                _c31|    

### Class distribution
Lets see the distribution of classes:

In [13]:
df_train.groupBy("_c41").count().show()

[Stage 22:>                                                         (0 + 4) / 4]

+---------+------+
|     _c41| count|
+---------+------+
| - 50000.|187141|
|  50000+.| 12382|
+---------+------+



                                                                                

We can see that the class distribution is imbalanced, with the majority class being <50K.

## Data preparation and feature engineering

#### Class balancing
Class imbalance with the majority of records in the <50K class was observed. To improve accuracy and generalizability, a portion of the underrepresented class will be sampled to balance the number of records between classes.

In [14]:
df_train.groupBy("_c41").count().show()

+---------+------+
|     _c41| count|
+---------+------+
| - 50000.|187141|
|  50000+.| 12382|
+---------+------+



Divide the training set into two dataframes based on the class column (_c41) value

In [15]:
below_50k = df_train.filter("_c41 == ' - 50000.'")
above_50k = df_train.filter("_c41 == ' 50000+.'")

below_50k.count(), above_50k.count()

                                                                                

(187141, 12382)

In [16]:
# Count the number of records in the above_50K dataframe
record_count = above_50k.count()

# Sample the same amount of records from the below_50K dataframe
sampled_below_50k = below_50k.sample(False, 1.0 * record_count / below_50k.count())

sampled_below_50k.count()

                                                                                

12255

In [17]:
# Union the above_50K and sampled_below_50K dataframes to create a balanced dataset
balanced_dataset = above_50k.union(sampled_below_50k)

In [18]:
balanced_dataset.groupBy("_c41").count().show()



+---------+-----+
|     _c41|count|
+---------+-----+
|  50000+.|12382|
| - 50000.|12255|
+---------+-----+



                                                                                

In [19]:
balance = True

if balance: 
    print("Using balanced dataset")
    df_train = balanced_dataset

Using balanced dataset


### Encode categorical columns
Categorical columns are converted and added as new columns to the dataframe with the prefix 's'.

In [20]:
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, ChiSqSelector
from pyspark.ml import Pipeline


dtypes = df_train.dtypes
dtypes = dtypes[0:-1]
string_cols = [c for (c,t) in dtypes if t == "string"]
numerical_cols = [c for (c,t) in dtypes if t != "string"]

indexers = [StringIndexer(inputCol=col, outputCol="s"+col).setHandleInvalid("skip") for col in string_cols]

In [21]:
cols = df_train.columns
feature_cols = cols[:-1]  # Use all features except the label
converted_cols = ["s"+col for col in string_cols]
feature_cols, converted_cols

(['_c0',
  '_c1',
  '_c2',
  '_c3',
  '_c4',
  '_c5',
  '_c6',
  '_c7',
  '_c8',
  '_c9',
  '_c10',
  '_c11',
  '_c12',
  '_c13',
  '_c14',
  '_c15',
  '_c16',
  '_c17',
  '_c18',
  '_c19',
  '_c20',
  '_c21',
  '_c22',
  '_c23',
  '_c25',
  '_c26',
  '_c27',
  '_c28',
  '_c29',
  '_c30',
  '_c31',
  '_c32',
  '_c33',
  '_c34',
  '_c35',
  '_c36',
  '_c37',
  '_c38',
  '_c39',
  '_c40'],
 ['s_c1',
  's_c4',
  's_c6',
  's_c7',
  's_c8',
  's_c9',
  's_c10',
  's_c11',
  's_c12',
  's_c13',
  's_c14',
  's_c15',
  's_c19',
  's_c20',
  's_c21',
  's_c22',
  's_c23',
  's_c25',
  's_c26',
  's_c27',
  's_c28',
  's_c29',
  's_c31',
  's_c32',
  's_c33',
  's_c34',
  's_c35',
  's_c37'])

### Prepare the data for spark ML

In [22]:
assembler = VectorAssembler(
    inputCols = converted_cols + numerical_cols,  # Use both categorical and numerical features
    outputCol="features")

### Encode the label

In [23]:
labelIndexer = StringIndexer(inputCol="_c41", outputCol="label")  # Income column

### Feature selection

In [24]:
# Feature selection did not improve accuracy and is excluded from the tests
selector = ChiSqSelector(numTopFeatures=5, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="label")

## Classification
In this section we train and evaluate multiple machine learning algorithms for comparison.

In [25]:
def fit_and_predict(model):
    pipeline = Pipeline(stages=indexers + [assembler, labelIndexer, model])
    model = pipeline.fit(df_train)
    predictions = model.transform(df_test)
    return predictions

### Preview predictions with random forest

In [26]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='label', 
                            featuresCol='features',
                            maxDepth=5,
                            numTrees=10, 
                            maxBins=64)

predictions = fit_and_predict(model=rf)

                                                                                

In [27]:
predictions.select("prediction", "label", "rawPrediction", "probability", "features").show(5)

+----------+-----+--------------------+--------------------+--------------------+
|prediction|label|       rawPrediction|         probability|            features|
+----------+-----+--------------------+--------------------+--------------------+
|       1.0|  0.0|[2.80836683235338...|[0.28083668323533...|(40,[1,3,4,5,8,12...|
|       0.0|  0.0|[9.87407855759461...|[0.98740785575946...|(40,[0,1,3,12,15,...|
|       0.0|  0.0|[9.87407855759461...|[0.98740785575946...|(40,[0,1,3,7,8,12...|
|       1.0|  0.0|[1.32231686081115...|[0.13223168608111...|(40,[1,4,5,6,28,2...|
|       0.0|  0.0|[9.87407855759461...|[0.98740785575946...|(40,[0,1,3,8,12,1...|
+----------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [28]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics


metrics = [
    "accuracy",
    "weightedPrecision",
    "weightedRecall",
    "f1",
    "areaUnderROC",
]


def evaluate(predictions):
    results = {}
    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    results[metrics[-1]] = evaluator.evaluate(predictions)
    
    evaluator = MulticlassClassificationEvaluator(labelCol="label")
    
    for metric in metrics[:-1]: 
        results[metric] = evaluator.evaluate(predictions, {evaluator.metricName: metric })
    
    predictions.crosstab("prediction", "label").select('prediction_label', '`1.0`', '`0.0`', ).show(30, False)
    print(results)
    
    return results

In [29]:
evaluate(predictions)

                                                                                

+----------------+----+-----+
|prediction_label|1.0 |0.0  |
+----------------+----+-----+
|1.0             |2407|8623 |
|0.0             |276 |36073|
+----------------+----+-----+

{'areaUnderROC': 0.9241005172742401, 'accuracy': 0.8121741699909243, 'weightedPrecision': 0.9485660977281498, 'weightedRecall': 0.8121741699909243, 'f1': 0.8596659608547074}


{'areaUnderROC': 0.9241005172742401,
 'accuracy': 0.8121741699909243,
 'weightedPrecision': 0.9485660977281498,
 'weightedRecall': 0.8121741699909243,
 'f1': 0.8596659608547074}

## Tests 

In [30]:
def predict_and_evaluate(model):
    predictions = fit_and_predict(model)
    return evaluate(predictions)

In [31]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, NaiveBayes

logreg = LogisticRegression(featuresCol="features", labelCol="label")
dt = DecisionTreeClassifier(labelCol="label", 
                            featuresCol="features",
                            maxDepth=5,
                            maxBins=64)
rf = RandomForestClassifier(labelCol='label', 
                            featuresCol='features',
                            maxDepth=5,
                            numTrees=10, 
                            maxBins=64)
gbt = GBTClassifier(labelCol="label", 
                    featuresCol="features", 
                    maxIter=10, 
                    maxBins=64)
# nb = NaiveBayes(smoothing=1.0, modelType="bernoulli")

classifiers = {
    "Logistic Regression": logreg,
    "Decision Tree": dt,
    "Random Forest": rf,
    "Gradient-boosted tree": gbt,
#     "Naïve Bayes": nb
}

In [32]:
stats = {}

for name, classifier in classifiers.items(): 
    print(f"\nClassifier: {name}")
    history = predict_and_evaluate(model=classifier)
    stats[name] = history


Classifier: Logistic Regression


                                                                                

+----------------+----+-----+
|prediction_label|1.0 |0.0  |
+----------------+----+-----+
|1.0             |2349|7591 |
|0.0             |334 |37105|
+----------------+----+-----+

{'areaUnderROC': 0.9316686984207586, 'accuracy': 0.8327318010088858, 'weightedPrecision': 0.9483378702307008, 'weightedRecall': 0.8327318010088858, 'f1': 0.873423839697516}

Classifier: Decision Tree


                                                                                

+----------------+----+-----+
|prediction_label|1.0 |0.0  |
+----------------+----+-----+
|1.0             |2448|10056|
|0.0             |235 |34640|
+----------------+----+-----+

{'areaUnderROC': 0.8638368032426589, 'accuracy': 0.7827940648810654, 'weightedPrecision': 0.9481013378443416, 'weightedRecall': 0.7827940648810654, 'f1': 0.839620256206106}

Classifier: Random Forest


                                                                                

+----------------+----+-----+
|prediction_label|1.0 |0.0  |
+----------------+----+-----+
|1.0             |2407|8623 |
|0.0             |276 |36073|
+----------------+----+-----+

{'areaUnderROC': 0.9241030439720129, 'accuracy': 0.8121741699909243, 'weightedPrecision': 0.9485660977281498, 'weightedRecall': 0.8121741699909243, 'f1': 0.8596659608547074}

Classifier: Gradient-boosted tree




+----------------+----+-----+
|prediction_label|1.0 |0.0  |
+----------------+----+-----+
|1.0             |2376|7316 |
|0.0             |307 |37380|
+----------------+----+-----+

{'areaUnderROC': 0.9375447467334884, 'accuracy': 0.8391059330082948, 'weightedPrecision': 0.949569293073513, 'weightedRecall': 0.8391059330082948, 'f1': 0.877825541233962}


                                                                                

In [33]:
from prettytable import PrettyTable

def print_stats(stats):
    table = PrettyTable()
    table.field_names = ["Classifier", *metrics]

    for classifier, _metrics in stats.items():            
        values = [f"{(_metrics[metric]*100):.2f} %" for metric in metrics]
        table.add_row([classifier, *values])

    print(table)

In [34]:
print_stats(stats)

+-----------------------+----------+-------------------+----------------+---------+--------------+
|       Classifier      | accuracy | weightedPrecision | weightedRecall |    f1   | areaUnderROC |
+-----------------------+----------+-------------------+----------------+---------+--------------+
|  Logistic Regression  | 83.27 %  |      94.83 %      |    83.27 %     | 87.34 % |   93.17 %    |
|     Decision Tree     | 78.28 %  |      94.81 %      |    78.28 %     | 83.96 % |   86.38 %    |
|     Random Forest     | 81.22 %  |      94.86 %      |    81.22 %     | 85.97 % |   92.41 %    |
| Gradient-boosted tree | 83.91 %  |      94.96 %      |    83.91 %     | 87.78 % |   93.75 %    |
+-----------------------+----------+-------------------+----------------+---------+--------------+
