In [None]:
import seaborn as sns
from pyspark.sql import SparkSession

# Obtain a Spark session for this notebook, registered under the app name 'iris'
spark = (
    SparkSession.builder
    .appName('iris')
    .getOrCreate()
)

# Pull seaborn's bundled iris dataset ...
iris = sns.load_dataset('iris')

# ... and turn it into a Spark DataFrame for the rest of the pipeline
iris_df = spark.createDataFrame(iris)


In [None]:
# Preview the first five rows of the freshly loaded Spark DataFrame
iris_df.show(n=5)

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the string "species" column as a numeric "label" column:
# fit the indexer on the data first, then apply the fitted model to it.
indexer = StringIndexer(inputCol="species", outputCol="label")
indexer_model = indexer.fit(iris_df)
iris_df = indexer_model.transform(iris_df)

# Preview the result, now including the "label" column
iris_df.show(n=5)

In [None]:
from pyspark.ml.feature import VectorAssembler

# The four measurement columns that are packed into one vector column
feature_cols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']

# Assembler that writes the combined vector to the 'features' column
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Append the 'features' column to the DataFrame
iris_df = assembler.transform(iris_df)

In [None]:
# Preview the first five rows after the 'features' column has been added
iris_df.show(n=5)

In [None]:
# Split the data into training (~70%) and test (~30%) sets.
# The fixed seed makes the random split repeatable, so the rows shown below
# and the metrics computed later do not change on every Restart & Run All.
train_data, test_data = iris_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Preview the first five rows of the training split
train_data.show(n=5)

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random Forest Classifier with 100 trees, reading the assembled 'features'
# vector and the indexed 'label' column.
# The fixed seed pins the estimator's internal randomness so that the fitted
# model (and the predictions derived from it) are repeatable between runs.
rf = RandomForestClassifier(
    featuresCol='features', labelCol='label',
    numTrees=100,
    seed=42,
)

# Train on the training split only
rf_model = rf.fit(train_data)

# Make predictions on the test data
predictions = rf_model.transform(test_data)


# from pyspark.ml.classification import LogisticRegression

# # Create a Logistic Regression model and fit it to the training data
# lr = LogisticRegression(featuresCol='features', labelCol='label')
# lr_model = lr.fit(train_data)

# # Make predictions on the test data
# predictions = lr_model.transform(test_data)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Re-type the "prediction" column as double ahead of evaluation
prediction_as_double = predictions["prediction"].cast("double")
predictions = predictions.withColumn("prediction", prediction_as_double)

# Accuracy of the predictions against the "label" column
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

# MulticlassMetrics is fed an RDD here, so hand it the (prediction, label)
# rows as an RDD and pull the confusion matrix out as an array.
predictionAndLabels = predictions.select("prediction", "label").rdd
metrics = MulticlassMetrics(predictionAndLabels)
confusion_matrix = metrics.confusionMatrix().toArray()

# Report both results
print("Confusion Matrix:\n", confusion_matrix)
print("Accuracy = %g" % accuracy)


In [None]:
# Stop the Spark session created at the top of the notebook; cells that use
# `spark` or the DataFrames above will not work after this runs.
spark.stop()