In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.Builder().appName('adult').getOrCreate()

[Dataset](http://archive.ics.uci.edu/ml/datasets/Adult)


In [3]:
from pyspark.sql.types import StructType, StructField, DecimalType, StringType, IntegerType

In [4]:
data_schema = StructType([
    StructField('age', IntegerType()),
    StructField('workclass', StringType()),
    StructField('fnlwgt', IntegerType()),
    StructField('education', StringType()),
    StructField('education-num', IntegerType()),
    StructField('marital-status', StringType()),
    StructField('occupation', StringType()),
    StructField('relationship', StringType()),
    StructField('race', StringType()),
    StructField('sex', StringType()),
    StructField('capital-gain', IntegerType()),
    StructField('capital-loss', IntegerType()),
    StructField('hours-per-week', IntegerType()),
    StructField('native-country', StringType()),
    StructField('labels', StringType())
])

In [43]:
df = spark.read.load('hdfs://localhost/user/halamix2/input/adult.data', format='csv', sep=', ', header=False, schema=data_schema)

In [13]:
data = df.replace('?', None) # replace all strings '?' with null value

In [14]:
# data = data.na.drop() #remove all lines containing at least one null value
data = data.na.fill('unknown')

In [15]:
print(df.count())
print(data.count())

32561
32561


In [16]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [17]:
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = false)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = false)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = false)
 |-- occupation: string (nullable = false)
 |-- relationship: string (nullable = false)
 |-- race: string (nullable = false)
 |-- sex: string (nullable = false)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = false)
 |-- labels: string (nullable = false)



In [18]:
workclass_indexer = StringIndexer(inputCol='workclass', outputCol='workclass_index')
workclass_encoder = OneHotEncoder(inputCol='workclass_index', outputCol='workclass_encoded')

education_indexer = StringIndexer(inputCol='education', outputCol='education_index')
education_encoder = OneHotEncoder(inputCol='education_index', outputCol='education_encoded')

marital_status_indexer = StringIndexer(inputCol='marital-status', outputCol='marital-status_index')
marital_status_encoder = OneHotEncoder(inputCol='marital-status_index', outputCol='marital-status_encoded')

occupation_indexer = StringIndexer(inputCol='occupation', outputCol='occupation_index')
occupation_encoder = OneHotEncoder(inputCol='occupation_index', outputCol='occupation_encoded')

relationship_indexer = StringIndexer(inputCol='relationship', outputCol='relationship_index')
relationship_encoder = OneHotEncoder(inputCol='relationship_index', outputCol='relationship_encoded')

race_indexer = StringIndexer(inputCol='race', outputCol='race_index')
race_encoder = OneHotEncoder(inputCol='race_index', outputCol='race_encoded')

sex_indexer = StringIndexer(inputCol='sex', outputCol='sex_index') #only 2  possibilities

native_country_indexer = StringIndexer(inputCol='native-country', outputCol='native-country_index')
native_country_encoder = OneHotEncoder(inputCol='native-country_index', outputCol='native-country_encoded')

labels_indexer = StringIndexer(inputCol='labels', outputCol='labels_index') # only 2 possibilities

In [19]:
from pyspark.ml.feature import VectorAssembler

In [20]:
assembler = VectorAssembler(inputCols=['age',
 'workclass_encoded',
 'fnlwgt',
 'education_encoded',
 'education-num',
 'marital-status_encoded',
 'occupation_index',
 'relationship_encoded',
 'race_encoded',
 'sex_index',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country_encoded',
 'labels_index'], outputCol='features')

In [21]:
from pyspark.ml import Pipeline

In [22]:
pipeline = Pipeline(stages=[workclass_indexer,
                            workclass_encoder,
                            education_indexer,
                            education_encoder,
                            marital_status_indexer,
                            marital_status_encoder,
                            occupation_indexer,
                            occupation_encoder,
                            relationship_indexer,
                            relationship_encoder,
                            race_indexer,
                            race_encoder,
                            sex_indexer,
                            native_country_indexer,
                            native_country_encoder,
                            labels_indexer,
                            assembler
                            ])

In [23]:
prepared_data_pipeline = pipeline.fit(data)

In [24]:
prepared_data = prepared_data_pipeline.transform(data)

In [25]:
prepared_data = prepared_data.select(['features', 'labels_index'])

In [26]:
prepared_data.show(3, truncate=False)

+---------------------------------------------------------------------------------------------------+------------+
|features                                                                                           |labels_index|
+---------------------------------------------------------------------------------------------------+------------+
|(88,[0,5,9,12,25,27,32,34,38,43,45,46],[39.0,1.0,77516.0,1.0,13.0,1.0,3.0,1.0,1.0,2174.0,40.0,1.0])|0.0         |
|(88,[0,2,9,12,25,26,32,33,38,45,46],[50.0,1.0,83311.0,1.0,13.0,1.0,2.0,1.0,1.0,13.0,1.0])          |0.0         |
|(88,[0,1,9,10,25,28,32,34,38,45,46],[38.0,1.0,215646.0,1.0,9.0,1.0,9.0,1.0,1.0,40.0,1.0])          |0.0         |
+---------------------------------------------------------------------------------------------------+------------+
only showing top 3 rows



In [27]:
train_data, test_data = prepared_data.randomSplit([0.7, 0.3])

In [28]:
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier

In [29]:
rf_classifier = RandomForestClassifier(featuresCol='features', labelCol='labels_index', numTrees=15)
# dc_classifier = DecisionTreeClassifier(featuresCol='features', labelCol='labels_index')

In [30]:
model = rf_classifier.fit(train_data)
# model = dc_classifier.fit(train_data)

In [31]:
prediction = model.transform(test_data)

In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [33]:
mc_evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='labels_index',
    metricName="accuracy")

In [34]:
results = mc_evaluator.evaluate(prediction)

In [35]:
results

0.9905516610789393

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [37]:
bc_evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='labels_index', metricName='areaUnderROC')

In [38]:
bc_results = bc_evaluator.evaluate(prediction)

In [39]:
print(bc_results)

0.9998481862492663
