### I. Classification des champignons avec PySpark et MLlib

In [108]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

#### Initialisation de SparkContext et SparkSession

In [110]:
# Build (or reuse) the SparkSession first so that the application name is
# applied when the underlying context is created, then take the SparkContext
# from the session. Unlike a bare `SparkContext()`, this cell can be re-run in
# the same kernel without raising "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.appName("MushroomClassification").getOrCreate()

sc = spark.sparkContext

### Chargement des données et schéma du DataFrame

In [111]:
# Load the mushroom dataset from CSV. The first row of the file is treated as
# the header, and Spark is asked to infer the column types.
data_mushrooms = (
    spark.read
    .format('csv')
    .option('path', 'mushrooms.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load()
)

In [75]:
# Print the column names and inferred types of the loaded DataFrame.
data_mushrooms.printSchema()

root
 |-- class: string (nullable = true)
 |-- cap-shape: string (nullable = true)
 |-- cap-surface: string (nullable = true)
 |-- cap-color: string (nullable = true)
 |-- bruises: string (nullable = true)
 |-- odor: string (nullable = true)
 |-- gill-attachment: string (nullable = true)
 |-- gill-spacing: string (nullable = true)
 |-- gill-size: string (nullable = true)
 |-- gill-color: string (nullable = true)
 |-- stalk-shape: string (nullable = true)
 |-- stalk-root: string (nullable = true)
 |-- stalk-surface-above-ring: string (nullable = true)
 |-- stalk-surface-below-ring: string (nullable = true)
 |-- stalk-color-above-ring: string (nullable = true)
 |-- stalk-color-below-ring: string (nullable = true)
 |-- veil-type: string (nullable = true)
 |-- veil-color: string (nullable = true)
 |-- ring-number: string (nullable = true)
 |-- ring-type: string (nullable = true)
 |-- spore-print-color: string (nullable = true)
 |-- population: string (nullable = true)
 |-- habitat: string (nullable = true)

In [94]:
# Check the Python type of the loaded object.
type(data_mushrooms)

pyspark.sql.dataframe.DataFrame

### Preprocessing des features 

In [112]:
# The first column of the DataFrame is the label; every remaining column is a
# categorical feature.
label_col, *in_cols = data_mushrooms.schema.names

# Names of the intermediate columns produced for each feature:
#   <feature>_idx    -> integer index of the category level
#   <feature>_onehot -> one-hot encoded vector of that index
idx_cols = [name + '_idx' for name in in_cols]
onehot_cols = [name + '_onehot' for name in in_cols]

# One StringIndexer per feature column: maps each string level to an integer index.
str_indexers = [
    StringIndexer(inputCol=src, outputCol=dst)
    for src, dst in zip(in_cols, idx_cols)
]

# One OneHotEncoder per feature column: turns the integer index into a one-hot
# vector. dropLast=False keeps a slot for every level.
onehot_encoders = [
    OneHotEncoder(dropLast=False, inputCol=src, outputCol=dst)
    for src, dst in zip(idx_cols, onehot_cols)
]

# Concatenate all one-hot columns into a single vector column named 'features'.
# Each row then holds every one-hot value side by side, e.g.
# +-----+-----+-----+-----+---------------------+
# |cat_0|cat_1|cat_2|cat_3|             features|
# +-----+-----+-----+-----+---------------------+
# |    1|    0|    0|    0| [1.0, 0.0, 0.0, 0.0]|
# |    0|    1|    0|    0| [0.0, 1.0, 0.0, 0.0]|
# |    0|    0|    0|    1| [0.0, 0.0, 0.0, 1.0]|
# +-----+-----+-----+-----+---------------------+
feat_assembler = VectorAssembler(inputCols=onehot_cols, outputCol='features')

# Index the label column {e, p} into the numeric column 'poisonous'.
# StringIndexer gives more frequent levels a lower index, so the majority
# class 'e' is expected to become 0 and 'p' to become 1.
# Run `data_mushrooms.groupby('class').count().show()` to check the class counts.
label_indexer = StringIndexer(inputCol=label_col, outputCol='poisonous')

# Chain every transformation: index features -> one-hot encode -> assemble -> index label.
pipeline = Pipeline(stages=[*str_indexers, *onehot_encoders, feat_assembler, label_indexer])

# Fit the pipeline on the raw DataFrame, apply it to the same data, and cache
# the transformed result because it is reused by the following cells.
pipeline_model = pipeline.fit(data_mushrooms)
data_mushrooms_trans = pipeline_model.transform(data_mushrooms).cache()

In [113]:
 # Count the rows in each class to see how balanced the labels are.
 data_mushrooms.groupby('class').count().show()

+-----+-----+
|class|count|
+-----+-----+
|    e| 4208|
|    p| 3916|
+-----+-----+



### Split du dataset en train et test

In [114]:
# Split the transformed rows into training and validation sets using 0.7 / 0.3
# weights. A fixed seed is passed so the split can be reproduced.
split_weights = [0.7, 0.3]
mushrooms_train, mushrooms_val = data_mushrooms_trans.randomSplit(split_weights, seed=2017)

### Train model with Random forest

In [116]:
# Train a 200-tree random forest on the training split.
# An explicit seed is passed so that the random sampling done while growing the
# trees, and therefore the accuracy reported below, is reproducible after a
# kernel restart. The same seed value as the train/validation split is used.
rf_classifier = RandomForestClassifier(
    labelCol='poisonous',
    featuresCol='features',
    numTrees=200,
    seed=2017,
)
model = rf_classifier.fit(mushrooms_train)

### Predict and evaluate the model

In [117]:
# Score the held-out split with the trained forest.
pred = model.transform(mushrooms_val)

# Keep only the columns needed to judge the predictions.
results = pred.select('probability', 'prediction', 'poisonous')

# Bring the predictions to the driver; after .collect() this is a list of Row objects.
results_collect = results.collect()

# 1 where the predicted label equals the true label, 0 otherwise.
is_correct = (results['prediction'] == results['poisonous']).cast('integer')
correct = results.withColumn('correct', is_correct).select('correct')

# The mean of the 0/1 column is the fraction of correct predictions.
(mean_row,) = correct.agg({'correct': 'mean'}).collect()
accuracy = mean_row[0]

print('Test accuracy:', accuracy)

Test accuracy: 0.988529291274068


###### Some intermediate results

In [118]:
# Preview the first five rows of class probabilities, predicted label and true label.
results.show(5)

+--------------------+----------+---------+
|         probability|prediction|poisonous|
+--------------------+----------+---------+
|[0.95982912711196...|       0.0|      0.0|
|[0.96550770066792...|       0.0|      0.0|
|[0.96195439264680...|       0.0|      0.0|
|[0.94786380659111...|       0.0|      0.0|
|[0.95354238014707...|       0.0|      0.0|
+--------------------+----------+---------+
only showing top 5 rows



In [121]:
# Preview the first five rows of the 0/1 correctness indicator.
correct.show(5)

+-------+
|correct|
+-------+
|      1|
|      1|
|      1|
|      1|
|      1|
+-------+
only showing top 5 rows



In [122]:
# Display the accuracy computed on the validation split.
accuracy

0.988529291274068