In [29]:
!pip install pyspark



In [30]:
import pyspark

In [32]:
from subprocess import check_output


In [34]:
from pyspark.sql import SparkSession


In [35]:

from pyspark.sql.functions import (count, col)


In [36]:

from pyspark.ml import Pipeline


In [39]:

from pyspark.ml.feature import (OneHotEncoder, 
                                StringIndexer, 
                                VectorAssembler)


In [40]:
from pyspark.ml.classification import (LogisticRegression, 
                                       DecisionTreeClassifier, 
                                       RandomForestClassifier)

In [41]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [44]:
# Create (or reuse) a Spark session in local mode.
# NOTE: despite the conventional meaning of the name, `sc` here is a
# SparkSession, not a SparkContext.
sc = (
    SparkSession.builder
    .master("local[*]")
    .appName('adult_census')
    .getOrCreate()
)

In [45]:
# Read the census CSV with Spark's built-in CSV reader.
# 'com.databricks.spark.csv' is the name of the old external package and is
# only kept as a backward-compatibility alias for this same reader.
#  - header=True      : the first line holds the column names
#  - inferSchema=True : column types are inferred from the data
# The frame is cached because the following cells scan it repeatedly.
df = sc.read.csv(
    'census_data2.csv',
    header=True,
    inferSchema=True,
).cache()
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hour_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



In [46]:
# Preview the first 10 rows, transposed so each column reads as one line.
# limit() runs inside Spark, so only those 10 rows are collected to the driver
# instead of converting the whole dataset to pandas and then slicing it.
df.limit(10).toPandas().transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
age,90,82,66,54,41,34,38,74,68,41
workclass,?,Private,?,Private,Private,Private,Private,State-gov,Federal-gov,Private
fnlwgt,77053,132870,186061,140359,264663,216864,150601,88638,422013,70037
education,HS-grad,HS-grad,Some-college,7th-8th,Some-college,HS-grad,10th,Doctorate,HS-grad,Some-college
education_num,9,9,10,4,10,9,6,16,9,10
marital_status,Widowed,Widowed,Widowed,Divorced,Separated,Divorced,Separated,Never-married,Divorced,Never-married
occupation,?,Exec-managerial,?,Machine-op-inspct,Prof-specialty,Other-service,Adm-clerical,Prof-specialty,Prof-specialty,Craft-repair
relationship,Not-in-family,Not-in-family,Unmarried,Unmarried,Own-child,Unmarried,Unmarried,Other-relative,Not-in-family,Unmarried
race,White,White,Black,White,White,White,White,White,White,White
sex,Female,Female,Female,Female,Female,Female,Male,Female,Female,Male


In [47]:
# Column handles for the underscore-named fields (backticks quote the name).
education_num = col("`education_num`")
capital_gain = col("`capital_gain`")
capital_loss = col("`capital_loss`")
# The schema printed above spells this column "hour_per_week" (no "s"); the
# Python variable keeps its original name so existing references still work.
hours_per_week = col("`hour_per_week`")
marital_status = col("`marital_status`")
native_country = col("`native_country`")


In [50]:
# Numeric feature columns (spelled exactly as in the loaded schema).
num_vars = [
    "age",
    "fnlwgt",
    "education_num",
    "capital_gain",
    "capital_loss",
    "hour_per_week",
]

# Categorical (string) feature columns.
cat_vars = [
    "workclass",
    "education",
    "marital_status",
    "occupation",
    "relationship",
    "race",
    "sex",
    "native_country",
]

In [51]:
def countNull(df, var):
    """Return how many rows of ``df`` have a null value in column ``var``."""
    is_missing = df[var].isNull()
    rows_with_null = df.filter(is_missing)
    return rows_with_null.count()

all_cols = num_vars + cat_vars

# Null count for every column, computed in a single Spark job rather than one
# job per column: count("*") counts all rows, count(column) counts only the
# non-null ones, so their difference is the number of nulls.
# NOTE: the row preview above shows missing values stored as the string "?"
# (e.g. in workclass and occupation); those are not nulls and are not counted.
df.select(
    [(count("*") - count(col(var))).alias(var) for var in all_cols]
).first().asDict()


{'age': 0,
 'fnlwgt': 0,
 'education_num': 0,
 'capital_gain': 0,
 'capital_loss': 0,
 'hour_per_week': 0,
 'workclass': 0,
 'education': 0,
 'marital_status': 0,
 'occupation': 0,
 'relationship': 0,
 'race': 0,
 'sex': 0,
 'native_country': 0}

In [53]:
# Collect the feature-engineering stages; nothing is executed here, the stages
# only run once the Pipeline is fitted.
stages = []
for cat_var in cat_vars:
    indexed_col = cat_var + "_indx"
    # Map each category string to a numeric index ...
    stringIndexer = StringIndexer(inputCol=cat_var, outputCol=indexed_col)
    # ... then one-hot encode that index into a "<name>_vec" vector column.
    encoder = OneHotEncoder(inputCols=[indexed_col], outputCols=[cat_var + "_vec"])
    stages.extend([stringIndexer, encoder])

In [54]:
# Index the string target column "income" into a numeric "label" column and
# queue it after the categorical feature stages.
label_indx = StringIndexer(inputCol="income", outputCol="label")
stages.append(label_indx)

In [55]:
# Inputs of the final feature vector: the one-hot "<name>_vec" columns of the
# categorical variables first, followed by the raw numeric columns.
assembler_inputs = [f"{name}_vec" for name in cat_vars]
assembler_inputs.extend(num_vars)

# Concatenate all of them into a single "features" vector column.
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(assembler)

In [56]:
# Chain every stage collected so far into one Pipeline.
pipeline = Pipeline(stages=stages)

# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
# NOTE: `df` is rebound to the transformed frame, so re-running this cell
# without reloading the data would operate on already-transformed columns.
df = pipeline.fit(df).transform(df)

# Keep the model inputs plus the original columns for later evaluation.
selectedcols = ["label", "features"] + all_cols
df = df.select(selectedcols)

# Preview 15 rows. limit() is applied in Spark, so only those rows (not the
# entire dataset with its feature vectors) are converted to pandas.
df.limit(15).toPandas()

Unnamed: 0,label,features,age,fnlwgt,education_num,capital_gain,capital_loss,hour_per_week,workclass,education,marital_status,occupation,relationship,race,sex,native_country
0,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...",90,77053,9,0,4356,40,?,HS-grad,Widowed,?,Not-in-family,White,Female,United-States
1,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...",82,132870,9,0,4356,18,Private,HS-grad,Widowed,Exec-managerial,Not-in-family,White,Female,United-States
2,0.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",66,186061,10,0,4356,40,?,Some-college,Widowed,?,Unmarried,Black,Female,United-States
3,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",54,140359,4,0,3900,40,Private,7th-8th,Divorced,Machine-op-inspct,Unmarried,White,Female,United-States
4,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",41,264663,10,0,3900,40,Private,Some-college,Separated,Prof-specialty,Own-child,White,Female,United-States
5,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...",34,216864,9,0,3770,45,Private,HS-grad,Divorced,Other-service,Unmarried,White,Female,United-States
6,0.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",38,150601,6,0,3770,40,Private,10th,Separated,Adm-clerical,Unmarried,White,Male,United-States
7,1.0,"(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...",74,88638,16,0,3683,20,State-gov,Doctorate,Never-married,Prof-specialty,Other-relative,White,Female,United-States
8,0.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, ...",68,422013,9,0,3683,40,Federal-gov,HS-grad,Divorced,Prof-specialty,Not-in-family,White,Female,United-States
9,1.0,"(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",41,70037,10,0,3004,60,Private,Some-college,Never-married,Craft-repair,Unmarried,White,Male,?


In [57]:
# 80/20 train/test split with a fixed seed, then print the size of each part
# (train on the first line, test on the second).
train, test = df.randomSplit([0.8, 0.2], seed=777)
print(train.count(), test.count(), sep="\n")

25949
6612


In [58]:
# 1. logistic regression model
logr = LogisticRegression(
    maxIter=10,
    regParam=0.05,
    labelCol="label",
    featuresCol="features"
)

# 2. decision tree model
d_tree = DecisionTreeClassifier(
    maxDepth=10,
    labelCol="label",
    featuresCol="features"
)

# 3. random forest model
# The forest is built with random sampling; pinning the seed (same value as
# the train/test split) makes the fitted model repeatable across kernel
# restarts instead of depending on a per-session default.
r_forest = RandomForestClassifier(
    numTrees=10,
    labelCol="label",
    featuresCol="features",
    seed=777
)

# fit all three models on the training split
lr_model = logr.fit(train)
dt_model = d_tree.fit(train)
rf_model = r_forest.fit(train)

In [59]:
# model evaluator
def testModel(model, df, metric_name="areaUnderROC"):
    """Score a fitted binary classifier on ``df``.

    Args:
        model: fitted model exposing ``transform(df)``.
        df: DataFrame with a "label" column and the inputs ``model`` expects.
        metric_name: metric passed to BinaryClassificationEvaluator. The
            default, "areaUnderROC", is also the evaluator's own default, so
            existing two-argument calls behave as before. Note this is an
            area-under-curve score, not classification accuracy.

    Returns:
        The metric value computed by the evaluator on the model's predictions.
    """
    pred = model.transform(df)
    evaluator = BinaryClassificationEvaluator(
        labelCol="label",
        metricName=metric_name,
    )
    return evaluator.evaluate(pred)

# Fitted classifiers keyed by the name shown in the results.
models = dict([
    ("Logistic regression", lr_model),
    ("Decision tree", dt_model),
    ("Random forest", rf_model),
])

# Model performance comparison on the held-out test split. The value is
# whatever testModel returns: the BinaryClassificationEvaluator score (area
# under the ROC curve by default), not classification accuracy.
{name: testModel(models[name], test) for name in models}

{'Logistic regression': 0.8917868728349819,
 'Decision tree': 0.7602042371686096,
 'Random forest': 0.8830906651875323}