In [1]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook under the application name
# 'income_pred' (getOrCreate hands back an existing session when there is one).
session_builder = SparkSession.builder.appName('income_pred')
spark = session_builder.getOrCreate()

In [2]:
# Load the census CSV: the first row supplies the column names and the column
# types are inferred from the data.
donor_df = spark.read.csv(
    '/FileStore/tables/income_census.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Show the column names and the data types of the loaded DataFrame.
donor_df.printSchema()

In [4]:
# Preview the first 5 rows of the raw data.
donor_df.show(5)

In [5]:
# Keep only the rows that carry an income value (the prediction target).
has_income = donor_df['income'].isNotNull()
donor_df = donor_df.where(has_income)

In [6]:
# `f` is used by this cell but was only imported several cells further down,
# which raised a NameError on a fresh kernel (Restart & Run All). Import it here.
import pyspark.sql.functions as f

# Some income labels carry a trailing period ('>50K.' / '<=50K.'). Strip it so
# each class has a single spelling. The dot is escaped and anchored to the end
# of the value: an unescaped '.' is a regex wildcard that matches ANY character.
# (The null-income filter that used to be repeated here already ran in the
# previous cell, so it is not duplicated.)
donor_df = donor_df.withColumn('income', f.regexp_replace('income', r'\.$', ''))

In [7]:
### Class balance of the target column
# Count the whole dataset and each income class, then report the share of
# records labelled '>50K'.
above_50k_rows = donor_df.filter(donor_df.income == '>50K')
at_most_50k_rows = donor_df.filter(donor_df.income == '<=50K')

total_recs = donor_df.count()
income_abv50 = above_50k_rows.count()
perc_income = (income_abv50 / float(total_recs)) * 100
income_at_most_50 = at_most_50k_rows.count()

print('Total number of records in the dataset {}'.format(total_recs))
print('Number of individual records whose income is greater than $50000 : {}'.format(income_abv50))
print('Number of individual records whose income is  less than $50000 : {}'.format(income_at_most_50))
print('Percentage of individuals making more than $50,000 : {0:.2f}%'.format(perc_income))

In [8]:
### Starting to preprocess the data.
# Hyphenated column names are swapped for underscore versions so they can be
# used as attribute-style column references (e.g. donor_df.capital_gain).
column_renames = [
    ('education-num', 'education_num'),
    ('marital-status', 'marital_status'),
    ('capital-gain', 'capital_gain'),
    ('capital-loss', 'capital_loss'),
    ('hours-per-week', 'hours_per_week'),
    ('native-country', 'native_country'),
]
for old_name, new_name in column_renames:
    donor_df = donor_df.withColumnRenamed(old_name, new_name)



In [9]:
# Collect the extremes of both capital features in ONE aggregation pass.
# Previously each of the four values triggered its own Spark job, and each
# name was first bound to a Row and then rebound to a scalar.
capital_extremes = donor_df.selectExpr(
    'max(capital_gain)',
    'min(capital_gain)',
    'max(capital_loss)',
    'min(capital_loss)',
).first()
# A Row unpacks like a tuple, in the order the expressions were listed above.
max_capital_gain, min_capital_gain, max_capital_loss, min_capital_loss = capital_extremes

print('Max and min values of capital-gain feature {} and {}'.format(max_capital_gain,min_capital_gain))
print('Max and min values of capital-loss feature {} and {}'.format(max_capital_loss,min_capital_loss))


In [10]:
### Log-transform capital_gain and capital_loss in place.
# log(x + 1) is used so that a value of 0 maps to 0 instead of being undefined.
import pyspark.sql.functions as f
for capital_col in ('capital_gain', 'capital_loss'):
    shifted = donor_df[capital_col] + 1
    donor_df = donor_df.withColumn(capital_col, f.log(shifted))

In [11]:
## Gather the max / min of every numeric feature (age, education_num,
## hours_per_week, capital_gain, capital_loss) for the scaling step that follows.

from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder

# Build one max/min expression pair per feature and evaluate them all in a
# single aggregation, instead of scanning the data once per feature.
numeric_features = ['education_num', 'age', 'capital_gain', 'capital_loss', 'hours_per_week']
bound_exprs = []
for feature_name in numeric_features:
    bound_exprs.extend([f.max(feature_name), f.min(feature_name)])

# The resulting Row unpacks positionally: (max, min) for each feature, in the
# order given by `numeric_features`.
(max_education_num, min_education_num,
 max_age, min_age,
 max_capital_gain, min_capital_gain,
 max_capital_loss, min_capital_loss,
 max_hours_per_week, min_hours_per_week) = donor_df.select(*bound_exprs).first()



In [12]:
# Min-max scale every numeric feature: scaled = (x - min) / (max - min).
# The previous expression `x - min/max - min` was missing parentheses, so by
# operator precedence it computed x - (min/max) - min and never rescaled.
# NOTE: this cell overwrites the columns in place; re-run the cell that
# computes the min/max values before re-running this one.
scaling_bounds = [
    ('age', min_age, max_age),
    ('capital_loss', min_capital_loss, max_capital_loss),
    ('capital_gain', min_capital_gain, max_capital_gain),
    ('hours_per_week', min_hours_per_week, max_hours_per_week),
    ('education_num', min_education_num, max_education_num),
]
for col_name, col_min, col_max in scaling_bounds:
    col_range = col_max - col_min
    if col_range:
        scaled = (donor_df[col_name] - col_min) / col_range
    else:
        # Constant column: avoid dividing by zero, which would leave nulls
        # that the feature assembler cannot handle.
        scaled = f.lit(0.0)
    donor_df = donor_df.withColumn(col_name, scaled)

In [13]:
### Done preprocessing numeric data.
# List the distinct labels present in the income column before indexing it.
income_labels = donor_df.select('income').distinct()
income_labels.show()

In [14]:
# Map the string income label to a numeric index stored in 'income_index'.
indexer = StringIndexer(inputCol='income', outputCol='income_index')
income_index_model = indexer.fit(donor_df)
donor_df = income_index_model.transform(donor_df)

In [15]:
# Names of the categorical (string) columns. Despite its name this is a plain
# list of column names, not a DataFrame.
categorical_df = [
    'workclass',
    'education_level',
    'marital_status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native_country',
]
# Add a '<column>_index' numeric column for every categorical column.
for category in categorical_df:
    indexer = StringIndexer(inputCol=category, outputCol='{}_index'.format(category))
    index_model = indexer.fit(donor_df)
    donor_df = index_model.transform(donor_df)

In [16]:
# One-hot encode every '<column>_index' column into '<column>_encoded'.
for category in categorical_df:
    encoder = OneHotEncoder(inputCol = category+'_index', outputCol = category+'_encoded' )
    # From Spark 3.0 on, OneHotEncoder is an Estimator and must be fitted before
    # it can transform; calling .transform() on it directly fails there. On
    # Spark 2.x it is a plain Transformer with no fit(). Support both.
    one_hot = encoder.fit(donor_df) if hasattr(encoder, 'fit') else encoder
    donor_df = one_hot.transform(donor_df)

In [17]:
# Inspect a few of the one-hot encoded columns.
donor_df.select('race_encoded','native_country_encoded','education_level_encoded').show()

In [18]:
# List every column now present on the DataFrame (raw, indexed and encoded).
donor_df.columns

In [19]:
# Combine the scaled numeric columns and the one-hot encoded categorical
# columns into a single 'features' vector column.
numeric_inputs = [
    'age',
    'education_num',
    'capital_gain',
    'capital_loss',
    'hours_per_week',
]
encoded_inputs = [
    'workclass_encoded',
    'education_level_encoded',
    'marital_status_encoded',
    'occupation_encoded',
    'relationship_encoded',
    'race_encoded',
    'sex_encoded',
    'native_country_encoded',
]
assembler = VectorAssembler(inputCols=numeric_inputs + encoded_inputs, outputCol='features')
donor_df = assembler.transform(donor_df)

In [20]:
# Keep only what the models need: the feature vector and the indexed target.
model_columns = ['features', 'income_index']
final_donor_df = donor_df.select(model_columns)

In [21]:
# The classifiers and evaluators below are configured with labelCol='label',
# so expose the indexed target under that name.
final_donor_df = final_donor_df.withColumnRenamed(existing='income_index', new='label')

In [22]:
# 70/30 train/test split. A fixed seed is passed so the same rows land in each
# split on every run; without it the reported metrics change from run to run.
final_donor_train , final_donor_test = final_donor_df.randomSplit([0.7,0.3], seed=42)

final_donor_train.show(5)

In [23]:
from pyspark.ml.classification import LogisticRegression, NaiveBayes, DecisionTreeClassifier,RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [24]:
# Candidate classifiers. The tree-based models are pointed explicitly at the
# 'label' / 'features' columns; the logistic regression is left on its
# default column settings.
column_args = dict(labelCol='label', featuresCol='features')

lrc = LogisticRegression(maxIter=10)  ## regParam=0.3, elasticNetParam=0.8
dtc = DecisionTreeClassifier(**column_args)
rfc = RandomForestClassifier(**column_args)  ### numTrees=10
gbt = GBTClassifier(maxIter=10, **column_args)

In [25]:
# Evaluator used to score models during cross-validation; it reads the true
# class from the 'label' column.
bin_eval = BinaryClassificationEvaluator().setLabelCol('label')

In [26]:
# `params`: the cross product of 2 x 2 x 2 x 2 = 16 parameter maps spanning
# hyper-parameters of three different estimators (lrc, dtc, rfc).
# NOTE(review): a map entry belonging to another estimator presumably has no
# effect on the estimator being fitted - confirm before reusing this grid.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lrc.regParam, [0.1,0.01])
grid_builder.addGrid(lrc.elasticNetParam, [0,1])
grid_builder.addGrid(dtc.maxDepth, [1,5])
grid_builder.addGrid(rfc.numTrees, [1,10])
params = grid_builder.build()

# `params_dtc`: grid over the decision tree's depth only.
dtc_grid_builder = ParamGridBuilder()
dtc_grid_builder.addGrid(dtc.maxDepth, [1,5])
params_dtc = dtc_grid_builder.build()

In [27]:
### Hyper parameter tuning and model selection
# Cross-validate every candidate classifier against a grid built from its OWN
# hyper-parameters and keep the best CrossValidatorModel in `cv_model`.
# Previously the loop variable was never used: the decision tree was
# cross-validated four times against a grid made mostly of other estimators'
# parameters, and `cv_model` was simply overwritten on each pass.
candidate_grids = {
    'lrc': (lrc, ParamGridBuilder()
                 .addGrid(lrc.regParam, [0.1, 0.01])
                 .addGrid(lrc.elasticNetParam, [0.0, 1.0])
                 .build()),
    'rfc': (rfc, ParamGridBuilder().addGrid(rfc.numTrees, [1, 10]).build()),
    'dtc': (dtc, ParamGridBuilder().addGrid(dtc.maxDepth, [1, 5]).build()),
    'gbt': (gbt, ParamGridBuilder().addGrid(gbt.maxDepth, [3, 5]).build()),
}
estimators = ['lrc','rfc','dtc','gbt']

cv_model = None
best_cv_score = float('-inf')
for clf in estimators:
    estimator, grid = candidate_grids[clf]
    cross_model = CrossValidator(estimator=estimator,estimatorParamMaps=grid,evaluator=bin_eval,numFolds=5)
    fitted_cv = cross_model.fit(final_donor_train)
    # avgMetrics holds one fold-averaged score per grid point. Taking the max
    # assumes a larger-is-better metric - the evaluator is presumably on its
    # default areaUnderROC; confirm if its metric is ever changed.
    candidate_score = max(fitted_cv.avgMetrics)
    print('{}: best cross-validated score {:.4f}'.format(clf, candidate_score))
    if candidate_score > best_cv_score:
        cv_model, best_cv_score = fitted_cv, candidate_score

In [28]:
# Report the estimator that was cross-validated and the model it selected.
tuned_estimator = cv_model.getEstimator()
selected_model = cv_model.bestModel
print('Best estimator is: ',tuned_estimator)
print('Best estimator parameters: ',selected_model)


In [29]:
### Score the held-out test split with the model selected by cross-validation.
# A CrossValidatorModel delegates transform() to its bestModel, so calling the
# best model directly yields the same predictions.
selected_classifier = cv_model.bestModel
test_preds = selected_classifier.transform(final_donor_test)

In [30]:
# Display a sample of the test-set predictions.
test_preds.show()

In [31]:
# Evaluator for the held-out test predictions; the true class is read from
# the 'label' column.
evaluator = BinaryClassificationEvaluator()
evaluator = evaluator.setLabelCol('label')


In [32]:
### The dataset is imbalanced, so accuracy is not an appropriate measure of model quality.
### Area under the ROC curve is used to evaluate the model instead.
roc_metric_override = {evaluator.metricName: 'areaUnderROC'}
metrics = evaluator.evaluate(test_preds, roc_metric_override)

In [33]:
# Report the test-set area under the ROC curve, rounded to two decimals.
auc_report = 'Area under ROC curve: {0:.2f}'.format(metrics)
print(auc_report)