In [2]:
from pyspark.sql import SparkSession

In [3]:
# Reuse the active SparkSession if there is one, otherwise start one named 'Mushrooms'.
spark = (
    SparkSession.builder
    .appName('Mushrooms')
    .getOrCreate()
)

In [4]:
# Load the raw CSV: the first row holds the column names, and column types are
# inferred from the data.
df = spark.read.csv(
    'mushrooms.csv',
    inferSchema=True,
    header=True,
)

In [5]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import when, count, col, isnull
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [6]:
# Show the inferred column names and types (the saved output lists every column as string).
df.printSchema()

root
 |-- class: string (nullable = true)
 |-- cap-shape: string (nullable = true)
 |-- cap-surface: string (nullable = true)
 |-- cap-color: string (nullable = true)
 |-- bruises: string (nullable = true)
 |-- odor: string (nullable = true)
 |-- gill-attachment: string (nullable = true)
 |-- gill-spacing: string (nullable = true)
 |-- gill-size: string (nullable = true)
 |-- gill-color: string (nullable = true)
 |-- stalk-shape: string (nullable = true)
 |-- stalk-root: string (nullable = true)
 |-- stalk-surface-above-ring: string (nullable = true)
 |-- stalk-surface-below-ring: string (nullable = true)
 |-- stalk-color-above-ring: string (nullable = true)
 |-- stalk-color-below-ring: string (nullable = true)
 |-- veil-type: string (nullable = true)
 |-- veil-color: string (nullable = true)
 |-- ring-number: string (nullable = true)
 |-- ring-type: string (nullable = true)
 |-- spore-print-color: string (nullable = true)
 |-- population: string (nullable = true)
 |-- habitat: string (nullable = true)

In [7]:
# Preview the rows that remain after dropping any row containing a null.
# The result is only displayed: `df` is not reassigned, so later cells still
# operate on the unfiltered frame.
df.dropna().show()

+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+
|class|cap-shape|cap-surface|cap-color|bruises|odor|gill-attachment|gill-spacing|gill-size|gill-color|stalk-shape|stalk-root|stalk-surface-above-ring|stalk-surface-below-ring|stalk-color-above-ring|stalk-color-below-ring|veil-type|veil-color|ring-number|ring-type|spore-print-color|population|habitat|
+-----+---------+-----------+---------+-------+----+---------------+------------+---------+----------+-----------+----------+------------------------+------------------------+----------------------+----------------------+---------+----------+-----------+---------+-----------------+----------+-------+
|    p|        x|          s|        n|      t|   p|              f|           c|        n|   

In [None]:
# Per-column count of null cells: when() yields a value only on rows where the
# column is null, and count() tallies the non-null results.
df.select(*(
    count(when(isnull(name), name)).alias(name)
    for name in df.columns
)).show()

In [11]:
# Every column except the label 'class' is a categorical feature. Select the
# label by name rather than by schema position, so that a reordered CSV cannot
# silently turn the label into a feature.
assert 'class' in df.columns, "expected label column 'class' in mushrooms.csv"
in_cols = [c for c in df.columns if c != 'class']

# One StringIndexer per feature maps each category string to a numeric index
# stored in '<col>_idx'.
str_indexers = [StringIndexer(inputCol=c, outputCol=c + '_idx') for c in in_cols]

In [12]:
# Output column names for the one-hot vectors, one per categorical feature.
onehot_cols = ['%s_onehot' % c for c in in_cols]

# Expand each '<col>_idx' index into a one-hot vector. dropLast=False keeps a
# slot for every category instead of dropping the last one.
onehot_encoders = [
    OneHotEncoder(inputCol=c + '_idx', outputCol=out_col, dropLast=False)
    for c, out_col in zip(in_cols, onehot_cols)
]

print(onehot_cols[:4])

['cap-shape_onehot', 'cap-surface_onehot', 'cap-color_onehot', 'bruises_onehot']


In [13]:
# Concatenate all of the one-hot vectors into a single 'features' vector column.
feat_assembler = VectorAssembler(
    outputCol='features',
    inputCols=onehot_cols,
)


In [15]:
# Index the label column by name ('class', the same name the count cell below
# uses) rather than by its position in the schema.
# StringIndexer orders labels by descending frequency by default. With the
# counts shown below ('e' 4208 > 'p' 3916), 'e' maps to 0.0 and 'p' to 1.0,
# so `poisonous` is 1.0 for poisonous mushrooms.
label_indexer = StringIndexer(inputCol='class', outputCol='poisonous')


In [17]:
# Class balance, followed by the distribution of cap shapes.
for column_name in ('class', 'cap-shape'):
    df.groupBy(column_name).count().show()

+-----+-----+
|class|count|
+-----+-----+
|    e| 4208|
|    p| 3916|
+-----+-----+

+---------+-----+
|cap-shape|count|
+---------+-----+
|        x| 3656|
|        f| 3152|
|        k|  828|
|        c|    4|
|        b|  452|
|        s|   32|
+---------+-----+



In [18]:
# Stage order matters: each feature is first indexed, then one-hot encoded, then
# all encodings are assembled into 'features'; the label indexer runs last.
pipeline = Pipeline(stages=[*str_indexers, *onehot_encoders, feat_assembler, label_indexer])


In [20]:
# Fit the indexing/encoding pipeline on the full frame, so the StringIndexers
# see every category value. Then expose the indexed target under the name
# 'label', which is the column the classifier and evaluator read.
mushrooms_trans = pipeline.fit(df).transform(df)
mushrooms_trans = mushrooms_trans.withColumnRenamed('poisonous', 'label')

# One seed shared by the split, the forest and the CV fold assignment, so that
# Restart & Run All reproduces the same model.
SEED = 2021

# NOTE(review): this trains on only ~5% of rows and holds out ~95%. Kept as-is;
# confirm it is intentional rather than a swapped [0.95, 0.05].
mushrooms_train, mushrooms_val = mushrooms_trans.randomSplit([0.05, 0.95], seed=SEED)

# numTrees is not set here: the grid below always overrides it, so any
# constructor value would never be used.
rf = RandomForestClassifier(labelCol='label', featuresCol='features', seed=SEED)

ppl = Pipeline(stages=[rf])

# 3 depths x 2 forest sizes x 3 feature-subset strategies = 18 candidates.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [6, 8, 10])
             .addGrid(rf.numTrees, [100, 200])
             .addGrid(rf.featureSubsetStrategy, ['onethird', 'sqrt', 'log2'])
             .build())

# Candidates are ranked by area under the ROC curve on each held-out fold.
crossval = CrossValidator(estimator=ppl,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol='label',
                                                                  metricName='areaUnderROC'),
                          numFolds=2,
                          seed=SEED)

# `model` (distinct from the `rf` estimator) holds the fitted cross-validation
# result, which is used to predict on mushrooms_val.
model = crossval.fit(mushrooms_train)

In [21]:
pred = model.transform(mushrooms_val)
pred.select("probability", "prediction", "label").show()

# The preview covers only the first 20 rows (all with label 0.0 in the saved
# output), so also score the entire validation split.
val_auc = BinaryClassificationEvaluator(labelCol='label',
                                        metricName='areaUnderROC').evaluate(pred)
val_accuracy = MulticlassClassificationEvaluator(labelCol='label',
                                                 metricName='accuracy').evaluate(pred)
print('Validation AUC: {:.4f}  accuracy: {:.4f}'.format(val_auc, val_accuracy))

+--------------------+----------+-----+
|         probability|prediction|label|
+--------------------+----------+-----+
|[0.97377047170335...|       0.0|  0.0|
|[0.99044388205378...|       0.0|  0.0|
|[0.96902769956806...|       0.0|  0.0|
|[0.98938102701819...|       0.0|  0.0|
|[0.97509103500866...|       0.0|  0.0|
|[0.98819777869242...|       0.0|  0.0|
|[0.97034826287337...|       0.0|  0.0|
|[0.99162713037954...|       0.0|  0.0|
|[0.97877047170335...|       0.0|  0.0|
|[0.99040542051532...|       0.0|  0.0|
|[0.97398923802960...|       0.0|  0.0|
|[0.98938102701819...|       0.0|  0.0|
|[0.98009103500866...|       0.0|  0.0|
|[0.98815931715396...|       0.0|  0.0|
|[0.99126998752240...|       0.0|  0.0|
|[0.97084190027477...|       0.0|  0.0|
|[0.98671494432484...|       0.0|  0.0|
|[0.96272733326769...|       0.0|  0.0|
|[0.98902388416104...|       0.0|  0.0|
|[0.97216246358009...|       0.0|  0.0|
+--------------------+----------+-----+
only showing top 20 rows

