# Introduction
This small project applies **classification** with a **big data** tool, **PySpark**. The data used for exploration and classification comes from a widely used source, the [census income dataset from the UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/machine-learning-databases/adult/).
- Google Colab is used to run the classification. In the first part, PySpark with **Hadoop** is set up in the Colab environment.
- In the second part, the data is read with PySpark into a DataFrame, which is then explored and classified with **logistic regression**. The data comes in two parts, a training set and a test set, which are consistent and representative.
- The loaded data is then explored to see how the features relate to the label.
- Each feature is either continuous or categorical. To be usable as classifier input, the categorical features must be converted to numbers. **StringIndexer** and **OneHotEncoderEstimator** convert the **nominal categorical features** into a numeric format.
- A **Pipeline** is used to avoid repeating the same steps by hand. Many categorical features need to be indexed and encoded, and **VectorAssembler** combines all required features into a single vector column that is passed to the classifier.
- Next, **logistic regression** is used to classify the observations in the dataset. There are only two classes: income above 50K and income at or below 50K. The classifier's **regularization parameter** is first set to 0.2, an arbitrary starting value that is tuned later to find a better one.
- An evaluation step then measures how well the classifier performs on the test data. **Accuracy** is used as the main metric. Note that the class counts printed in section 5 show a split of roughly 76% / 24% in the training data rather than a balanced one, so accuracy should be read against that majority share. Logistic regression reaches **accuracy of about 82%**, compared with the best **accuracy of about 85%** reported by the authors of the dataset (see the file 'data/adult.names'). In addition, the area under the ROC curve is computed and comes out at about 0.88.
- Finally, a **grid search** combined with **CrossValidator** is used to tune the regularization hyperparameter. The best value found is 0.01, with accuracy close to 84%.
 
# 1. Set up Google Colab for PySpark

In [0]:
'''
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

from google.colab import drive
drive.mount('/content/gdrive')

!cp '/content/gdrive/My Drive/pyspark/spark-2.4.5-bin-hadoop2.7.tgz' .

!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
'''
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()


# 2. Load and clean the data

In [0]:
# set up Spark to read the file (via SparkContext and SQLContext)
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkFiles
from pyspark.sql.types import StructType, StructField, FloatType, StringType
from pyspark.sql.functions import col

conf = SparkConf().setAppName('spark_tutorial')
sc = SparkContext.getOrCreate(conf = conf)
sqlcontext = SQLContext(sc)

# reading data 'adult.data'
url = '/content/gdrive/My Drive/pyspark/adult.data'
df = sqlcontext.read.csv(url, inferSchema=True, header = False)
df.show(2)

# change column names
col_new = (['age', 'workclass', 'fnlwgt', 'education', 'education_num'
           ,'marital_status', 'occupation', 'relationship', 'race', 'sex'
           ,'capital_gain', 'capital_loss', 'hour_per_week', 'country', 'label']) 
col_old = df.columns
rename = dict(zip(col_old, col_new))
df =  df.select([col(c).alias(rename.get(c)) for c in col_old])

# show some short results
df.show(2)
df.printSchema()

+---+-----------------+-------+----------+----+-------------------+----------------+--------------+------+-----+------+----+----+--------------+------+
|_c0|              _c1|    _c2|       _c3| _c4|                _c5|             _c6|           _c7|   _c8|  _c9|  _c10|_c11|_c12|          _c13|  _c14|
+---+-----------------+-------+----------+----+-------------------+----------------+--------------+------+-----+------+----+----+--------------+------+
| 39|        State-gov|77516.0| Bachelors|13.0|      Never-married|    Adm-clerical| Not-in-family| White| Male|2174.0| 0.0|40.0| United-States| <=50K|
| 50| Self-emp-not-inc|83311.0| Bachelors|13.0| Married-civ-spouse| Exec-managerial|       Husband| White| Male|   0.0| 0.0|13.0| United-States| <=50K|
+---+-----------------+-------+----------+----+-------------------+----------------+--------------+------+-----+------+----+----+--------------+------+
only showing top 2 rows

+---+-----------------+-------+----------+-------------+-------

# 3. Explore the data

In [0]:
# attributes (columns) are divided into two groups:
# -> Continuous attributes
# -> Categorical attributes
con_attributes = ['age', 'fnlwgt', 'education_num','capital_gain', 'capital_loss', 'hour_per_week']
cat_attributes = [attribute for attribute in col_new if attribute not in con_attributes]
print('categorical: ', cat_attributes)
print('continuous: ', con_attributes) 

categorical:  ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'country', 'label']
continuous:  ['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hour_per_week']


In [0]:
# see how age relates to the label (income):
# which ages fall more often in the >50K class
# using crosstab()
income_label = (df.crosstab('age', 'label')
                  .orderBy('age_label'))
income_label.show(10)

+---------+------+-----+
|age_label| <=50K| >50K|
+---------+------+-----+
|       17|   395|    0|
|       18|   550|    0|
|       19|   710|    2|
|       20|   753|    0|
|       21|   717|    3|
|       22|   752|   13|
|       23|   865|   12|
|       24|   767|   31|
|       25|   788|   53|
|       26|   722|   63|
+---------+------+-----+
only showing top 10 rows



In [0]:
# how education and income are related
education_label = (df.crosstab('education','label')
                     .orderBy('education_label'))
education_label.show(10)

+---------------+------+-----+
|education_label| <=50K| >50K|
+---------------+------+-----+
|           10th|   871|   62|
|           11th|  1115|   60|
|           12th|   400|   33|
|        1st-4th|   162|    6|
|        5th-6th|   317|   16|
|        7th-8th|   606|   40|
|            9th|   487|   27|
|     Assoc-acdm|   802|  265|
|      Assoc-voc|  1021|  361|
|      Bachelors|  3134| 2221|
+---------------+------+-----+
only showing top 10 rows
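
The counts above are easier to compare once they are turned into a share of `>50K` earners per row. A minimal sketch in plain Python, using three rows copied from the output above (the helper name `high_income_share` is made up for this illustration):

```python
# Crosstab counts copied from the output above: (<=50K, >50K)
education_counts = {
    "10th": (871, 62),
    "Assoc-voc": (1021, 361),
    "Bachelors": (3134, 2221),
}

def high_income_share(low, high):
    """Fraction of rows in a group that fall in the >50K class."""
    return high / (low + high)

shares = {level: high_income_share(low, high)
          for level, (low, high) in education_counts.items()}

# Print the groups from lowest to highest share
for level, share in sorted(shares.items(), key=lambda item: item[1]):
    print(f"{level:>10}: {share:.1%}")
```

In this sample the share rises from roughly 7% for `10th` to roughly 41% for `Bachelors`, which suggests that education carries useful signal for the label.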



In [0]:
# how marital status and capital gain are related
# using groupBy() and mean() to compute the average per group
marital_gain = (df.groupBy('marital_status')
                  .agg({'capital_gain':'mean'})
                  .sort('avg(capital_gain)', ascending = False))
marital_gain.show() 

+--------------------+------------------+
|      marital_status| avg(capital_gain)|
+--------------------+------------------+
|  Married-civ-spouse|1764.8595085470085|
|            Divorced| 728.4148098131893|
| Married-spouse-a...| 653.9832535885167|
|             Widowed| 571.0715005035247|
|           Separated| 535.5687804878049|
|   Married-AF-spouse| 432.6521739130435|
|       Never-married|376.58831788823363|
+--------------------+------------------+



In [0]:
# count the number of records per country
# and remove the country with only a single record
country_count = (df.groupBy('country')
                   .count()
                   .sort('count', ascending = True))
country_count.show()
print('number of training before remove: ', df.count())

#df.filter(df.country != ' Holand-Netherlands').count()
df = df.filter(df.country != ' Holand-Netherlands')
country_count = (df.groupBy('country')
                   .count()
                   .sort('count', ascending = True))
country_count.show()
print('number of training after remove: ', df.count())

+--------------------+-----+
|             country|count|
+--------------------+-----+
|  Holand-Netherlands|    1|
|            Scotland|   12|
|             Hungary|   13|
|            Honduras|   13|
| Outlying-US(Guam...|   14|
|          Yugoslavia|   16|
|                Laos|   18|
|            Thailand|   18|
|            Cambodia|   19|
|     Trinadad&Tobago|   19|
|                Hong|   20|
|             Ireland|   24|
|             Ecuador|   28|
|              Greece|   29|
|              France|   29|
|                Peru|   31|
|           Nicaragua|   34|
|            Portugal|   37|
|                Iran|   43|
|               Haiti|   44|
+--------------------+-----+
only showing top 20 rows

+--------------------+-----+
|             country|count|
+--------------------+-----+
|            Scotland|   12|
|            Honduras|   13|
|             Hungary|   13|
| Outlying-US(Guam...|   14|
|          Yugoslavia|   16|
|                Laos|   18|
|            Thai

32560
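
The filter above compares against `' Holand-Netherlands'` with a leading space because the raw file separates fields with a comma followed by a space, and that space stays in each string value (it is visible in the `show()` output). The sketch below reproduces the effect with Python's standard `csv` module on a made-up sample; `skipinitialspace=True` is an option of the `csv` module, not of Spark.

```python
import csv
import io

# Made-up sample in the same ", "-separated style as adult.data
raw = "39, State-gov, United-States\n32, Private, Holand-Netherlands\n"

# Default parsing keeps the space after each comma
rows_raw = list(csv.reader(io.StringIO(raw)))

# skipinitialspace=True drops whitespace right after the delimiter
rows_clean = list(csv.reader(io.StringIO(raw), skipinitialspace=True))

print(repr(rows_raw[1][2]))    # value with the leading space kept
print(repr(rows_clean[1][2]))  # value with the leading space removed
```

In Spark, one option would be to trim the string columns (for example with `pyspark.sql.functions.trim`) before filtering, so that the space does not have to be hard-coded.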

# 4. Convert categorical attributes to numerical attributes


In [0]:
# use a one-hot encoder to convert categorical values to numerical ones;
# StringIndexer must first map each category to an index
# df_ is a second reference to df (DataFrames are immutable, so no copy is needed)
df_ = df
df_.show(2)

# converting 
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

stringindexer_ = StringIndexer(inputCol='workclass', outputCol='workclass_index').fit(df_).transform(df_)
onehot_ = OneHotEncoder(dropLast=False, inputCol='workclass_index', outputCol='onehot').transform(stringindexer_)
onehot_.show(2)

# show the distinct workclass values
onehot_.select('workclass').distinct().show()

+---+-----------------+-------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+-------------+--------------+------+
|age|        workclass| fnlwgt| education|education_num|     marital_status|      occupation|  relationship|  race|  sex|capital_gain|capital_loss|hour_per_week|       country| label|
+---+-----------------+-------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+-------------+--------------+------+
| 39|        State-gov|77516.0| Bachelors|         13.0|      Never-married|    Adm-clerical| Not-in-family| White| Male|      2174.0|         0.0|         40.0| United-States| <=50K|
| 50| Self-emp-not-inc|83311.0| Bachelors|         13.0| Married-civ-spouse| Exec-managerial|       Husband| White| Male|         0.0|         0.0|         13.0| United-States| <=50K|
+---+-----------------+-------+----------+-------------+-------------------+----
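
To make the two steps concrete, here is a plain-Python sketch of what they compute. It assumes the default StringIndexer ordering (the most frequent category gets index 0) and `dropLast=False` as in the cell above. The sample values and the helper names `fit_string_index` and `one_hot` are made up for illustration and are not part of the Spark API.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent category first."""
    ordered = [cat for cat, _ in Counter(values).most_common()]
    return {cat: idx for idx, cat in enumerate(ordered)}

def one_hot(index, size):
    """Return a list of length `size` with a single 1.0 at `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]

# Made-up workclass sample: 3x Private, 2x State-gov, 1x Local-gov
workclass = ["Private", "State-gov", "Private", "Local-gov", "Private", "State-gov"]

mapping = fit_string_index(workclass)
encoded = [one_hot(mapping[w], len(mapping)) for w in workclass]

print(mapping)
print(encoded[1])  # one-hot vector for the second row (State-gov)
```

With `dropLast=True`, which is the default of OneHotEncoderEstimator used in the pipeline below, the last category is represented by an all-zero vector, so each encoded vector is one element shorter.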

# 5. Pipeline the process
The pipeline chains a StringIndexer and a OneHotEncoderEstimator stage for each categorical feature, followed by a VectorAssembler that combines all required features into one vector column.

In [0]:
# there are many categorical columns, so encoding
# them one by one is tedious; a Pipeline chains
# all the stages instead
df.show(2)

# pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler

stages = []
# string indexer and onehot encoder
for column in cat_attributes:
  stringindexer = StringIndexer(inputCol=column, outputCol=column + 'index')
  onehot = (OneHotEncoderEstimator( inputCols=[stringindexer.getOutputCol()] 
                                   ,outputCols=[stringindexer.getOutputCol() + 'onehot']))
  stages += [stringindexer, onehot]

# prepare columns as total attributes
columns = (con_attributes + [coln + 'index' + 'onehot' 
                             for coln in cat_attributes 
                             if coln not in ['label']]) 

# using assembler 
vectassembler = VectorAssembler(inputCols=columns, outputCol='features')
stages += [vectassembler]

# pipeline all stages
pipeline = Pipeline(stages = stages)
pipelinemodel = pipeline.fit(df)
model = pipelinemodel.transform(df)

model.show(2)

+---+-----------------+-------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+-------------+--------------+------+
|age|        workclass| fnlwgt| education|education_num|     marital_status|      occupation|  relationship|  race|  sex|capital_gain|capital_loss|hour_per_week|       country| label|
+---+-----------------+-------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+-------------+--------------+------+
| 39|        State-gov|77516.0| Bachelors|         13.0|      Never-married|    Adm-clerical| Not-in-family| White| Male|      2174.0|         0.0|         40.0| United-States| <=50K|
| 50| Self-emp-not-inc|83311.0| Bachelors|         13.0| Married-civ-spouse| Exec-managerial|       Husband| White| Male|         0.0|         0.0|         13.0| United-States| <=50K|
+---+-----------------+-------+----------+-------------+-------------------+----
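
Conceptually, the VectorAssembler stage concatenates its input columns into one flat vector, in the order given by `inputCols`. A plain-Python sketch of that idea follows; the one-hot blocks are made up and `assemble` is a hypothetical helper, not a Spark function.

```python
def assemble(continuous, one_hot_blocks):
    """Concatenate continuous values and one-hot blocks into one flat vector."""
    vector = list(continuous)
    for block in one_hot_blocks:
        vector.extend(block)
    return vector

# First three continuous values of the first row shown above
continuous = [39.0, 77516.0, 13.0]

# Made-up one-hot blocks for two categorical columns
one_hot_blocks = [[0.0, 1.0, 0.0], [1.0, 0.0]]

features = assemble(continuous, one_hot_blocks)
print(features)
print(len(features))
```

Because `columns` lists the continuous attributes first, the assembled `features` vectors start with the continuous values of each row.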

In [0]:
# use labelindex as the target:
# select the label index and the feature vector
# as inputs to the classification model
from pyspark.ml.linalg import DenseVector

df_train = (model.rdd
            .map(lambda x: (x.labelindex, DenseVector(x.features)))
            .toDF(['label','features']))
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[39.0,77516.0,13....|
|  0.0|[50.0,83311.0,13....|
+-----+--------------------+
only showing top 2 rows



In [0]:
# check the balance of the class in the df_train
df_train.groupBy('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|24719|
|  1.0| 7841|
+-----+-----+
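
These counts give a useful reference point for the accuracy figures later on: a classifier that always predicts the majority class `0.0` would already be right on the majority share of rows. A small sketch using the two counts printed above (the variable names are illustrative):

```python
# Class counts copied from the groupBy output above
class_counts = {0.0: 24719, 1.0: 7841}

total = sum(class_counts.values())
class_share = {label: count / total for label, count in class_counts.items()}

# Accuracy of always predicting the most frequent class
majority_baseline = max(class_share.values())

print(f"total rows: {total}")
print(f"class shares: {class_share}")
print(f"majority-class baseline accuracy: {majority_baseline:.4f}")
```

On the training data this baseline is about 0.76, so a model's accuracy is only informative to the extent that it exceeds that value.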



# 6. Train logistic regression on df_train
### 6.1 Train the logistic regression model on df_train

In [0]:
# prepare the model
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=20, regParam=0.2)
model = lr.fit(df_train)

### 6.2 Prepare the test data (df_test)
The test file is read from the mounted drive.

In [0]:
# prepare df_test to check the model
# reading data 'adult.test'
url = '/content/gdrive/My Drive/pyspark/adult.test'
df_te = sqlcontext.read.csv(url, inferSchema=True, header = False)
col_new_test = (['age', 'workclass', 'fnlwgt', 'education', 'education_num'
           ,'marital_status', 'occupation', 'relationship', 'race', 'sex'
           ,'capital_gain', 'capital_loss', 'hour_per_week', 'country', 'label']) 
col_old_test = df_te.columns
rename = dict(zip(col_old_test, col_new_test))
df_te =  df_te.select([col(c).alias(rename.get(c)) for c in col_old_test])
df_te.show(2)

# convert categorical to numerical 
stages_te = []
# string indexer and onehot encoder
for column in cat_attributes:
  stringindexer_te = StringIndexer(inputCol=column, outputCol=column + 'index')
  onehot_te = (OneHotEncoderEstimator( inputCols=[stringindexer_te.getOutputCol()] 
                                   ,outputCols=[stringindexer_te.getOutputCol() + 'onehot']))
  stages_te += [stringindexer_te, onehot_te]

# prepare columns as total attributes
columns_te = (con_attributes + [coln + 'index' + 'onehot' 
                             for coln in cat_attributes 
                             if coln not in ['label']]) 

# using assembler 
vectassembler_te = VectorAssembler(inputCols=columns_te, outputCol='features')
stages_te += [vectassembler_te]

# pipeline all test stages (stages_te, not the training list `stages`)
pipeline_te = Pipeline(stages = stages_te)
pipelinemodel_te = pipeline_te.fit(df_te)
model_te = pipelinemodel_te.transform(df_te)
model_te.show(2)

# convert to dataframe test 
df_test = (model_te.rdd
                   .map(lambda x: (x.labelindex, DenseVector(x.features)))
                   .toDF(['label','features']))
df_test.show(2)
# check the balance of the classes in df_test
df_test.groupBy('label').count().show()

+---+---------+--------+---------+-------------+-------------------+------------------+------------+------+-----+------------+------------+-------------+--------------+-------+
|age|workclass|  fnlwgt|education|education_num|     marital_status|        occupation|relationship|  race|  sex|capital_gain|capital_loss|hour_per_week|       country|  label|
+---+---------+--------+---------+-------------+-------------------+------------------+------------+------+-----+------------+------------+-------------+--------------+-------+
| 25|  Private|226802.0|     11th|          7.0|      Never-married| Machine-op-inspct|   Own-child| Black| Male|         0.0|         0.0|         40.0| United-States| <=50K.|
| 38|  Private| 89814.0|  HS-grad|          9.0| Married-civ-spouse|   Farming-fishing|     Husband| White| Male|         0.0|         0.0|         50.0| United-States| <=50K.|
+---+---------+--------+---------+-------------+-------------------+------------------+------------+------+-----+--
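
One caveat of fitting a second pipeline on the test data: StringIndexer assigns indices by category frequency, so an index learned from the test set can differ from the one learned from the training set whenever the frequency order differs. The plain-Python sketch below illustrates this with made-up samples; `fit_string_index` is a hypothetical helper that mimics the default frequency ordering, not a Spark function.

```python
from collections import Counter

def fit_string_index(values):
    """Mimic frequency-ordered indexing: most frequent category -> 0."""
    ordered = [cat for cat, _ in Counter(values).most_common()]
    return {cat: idx for idx, cat in enumerate(ordered)}

# Made-up samples whose frequency order differs between the two sets
train_race = ["White", "White", "White", "Black", "Black", "Other"]
test_race = ["Black", "Black", "Black", "White", "White", "Other"]

train_index = fit_string_index(train_race)
test_index = fit_string_index(test_race)

# The same category ends up at a different index in each mapping
print(train_index["White"], test_index["White"])

# Reusing the mapping learned on the training data keeps the encoding aligned
test_encoded = [train_index[value] for value in test_race]
print(test_encoded)
```

In Spark, the equivalent would be to call `transform` on the already fitted `pipelinemodel` instead of fitting again. For this dataset the test labels carry a trailing period (`<=50K.`, as shown in the output above), which would presumably have to be stripped first so that they match the training labels.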

### 6.3 Evaluate the model on the test data (df_test)
#### --> metric used: accuracy

In [0]:
# use model to predict on the df_test
predictions = model.transform(df_test)
predictions.show(5)
# accuracy is the fraction of rows whose
# predicted label equals the actual label
accuracy = predictions.filter(predictions.label == predictions.prediction).count()/predictions.count()
accuracy

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[25.0,226802.0,7....|[3.20986998856715...|[0.96120401759308...|       0.0|
|  0.0|[38.0,89814.0,9.0...|[0.61505681392989...|[0.64909346492769...|       0.0|
|  1.0|[28.0,336951.0,12...|[0.42485421330410...|[0.60464423499671...|       0.0|
|  1.0|[44.0,160323.0,10...|[0.85920469520179...|[0.70249446560256...|       0.0|
|  0.0|[18.0,103497.0,10...|[3.14858119337005...|[0.95885278045267...|       0.0|
+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



0.819851360481543
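
The `probability` column follows from `rawPrediction` through the logistic (sigmoid) function. As a quick check, the sketch below applies it to the first `rawPrediction` entry of the first two rows printed above. The printed values are truncated, so the comparison uses a small tolerance.

```python
import math

def sigmoid(z):
    """Logistic function: maps a raw score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# (rawPrediction[0], probability[0]) pairs copied from the output above
printed_rows = [
    (3.20986998856715, 0.96120401759308),
    (0.61505681392989, 0.64909346492769),
]

for raw_score, printed_probability in printed_rows:
    computed = sigmoid(raw_score)
    print(f"raw={raw_score:.4f} computed={computed:.6f} printed={printed_probability:.6f}")
    assert abs(computed - printed_probability) < 1e-6

# Class 0 is predicted when its probability exceeds the default 0.5 threshold
predicted_labels = [0.0 if sigmoid(raw) > 0.5 else 1.0 for raw, _ in printed_rows]
print(predicted_labels)
```

This also explains why all five rows shown are predicted as `0.0`: in each of them the first probability entry is above 0.5, even for the two rows whose actual label is `1.0`.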

#### --> metric used: area under the ROC curve

In [0]:
# ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction')
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8805142853080808
areaUnderROC
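
Area under the ROC curve can be read as the probability that a randomly chosen positive row receives a higher score than a randomly chosen negative one. The sketch below computes it by direct pairwise comparison on a tiny made-up sample. It is meant to illustrate the metric, not to reproduce how Spark computes it, and `pairwise_auc` is a hypothetical helper.

```python
def pairwise_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly.

    A tie between a positive and a negative score counts as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up labels and scores for the positive class
labels = [0, 0, 1, 1, 0, 1]
scores = [0.10, 0.40, 0.35, 0.80, 0.20, 0.70]

auc = pairwise_auc(labels, scores)
print(auc)
```

Unlike accuracy, this value does not depend on a single decision threshold, which makes it a helpful second metric when the classes are unevenly sized.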


# 7. Tune the model
#### --> vary the regularization parameter

In [0]:
# tune the model
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# set the parameter grid
# regParam: candidate regularization values are 0.01 and 0.4
paramGrid = (ParamGridBuilder()
                               .addGrid(lr.regParam, [0.01, 0.4])
                               .build())

# create crossvalidator model
cv = CrossValidator(estimator = lr
                   ,estimatorParamMaps = paramGrid 
                   ,evaluator = evaluator 
                   ,numFolds = 5)

# fit every candidate and predict with the best model
# 5 folds x 2 regParam values = 10 fits during cross-validation,
# plus a final refit of the best setting on all of df_train
cvModel = cv.fit(df_train)
predictions = cvModel.transform(df_test)

# compute the accuracy
accuracy_best = predictions.filter(predictions.label == predictions.prediction).count()/predictions.count()
accuracy_best

0.8368650574289048
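
The cell above reports the accuracy of the best model but not which `regParam` was selected. CrossValidator picks the candidate with the best average metric across folds (here `areaUnderROC`, since that is what `evaluator` computes). The selection rule can be sketched in plain Python as follows. The metric values are invented placeholders and `pick_best` is a hypothetical helper. In PySpark the per-candidate averages are exposed as `cvModel.avgMetrics`, in the same order as `paramGrid`.

```python
def pick_best(candidates, avg_metrics, larger_is_better=True):
    """Return the (candidate, metric) pair with the best average metric."""
    paired = list(zip(candidates, avg_metrics))
    chooser = max if larger_is_better else min
    return chooser(paired, key=lambda pair: pair[1])

# Candidate regParam values from the grid above
reg_params = [0.01, 0.4]

# Placeholder averages, NOT results from the notebook
placeholder_avg_auc = [0.90, 0.85]

best_reg_param, best_metric = pick_best(reg_params, placeholder_avg_auc)
print(best_reg_param, best_metric)
```

Printing the real `cvModel.avgMetrics` next to the grid would show how close the two candidates are and confirm which one was chosen.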

# Conclusion
Changing the regularization parameter from 0.2 to 0.01 improves the test accuracy slightly, from about 82% to about 84%.