# Importing Iris dataset

In [0]:
from functools import reduce

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import functions as f
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.types import *

# Widen pandas' display limits so large frames are not truncated when shown.
for option_name, option_value in (
    ("display.max_rows", 999),
    ("display.max_columns", 999),
    ("display.max_colwidth", None),  # None removes the column-width limit
):
    pd.set_option(option_name, option_value)

# Obtain the Spark session for this notebook: getOrCreate() reuses an active
# session when there is one, otherwise it starts one under this app name.
spark = (
    SparkSession.builder
    .appName("DecisionTreeSparkSession")
    .getOrCreate()
)

# Load DATA

In [0]:
#df = pd.read_csv("https://raw.githubusercontent.com/Apaulgithub/oibsip_task1/main/Iris.csv")

# Creating a spark dataframe

In [0]:
# df = spark.createDataFrame(df)
# display(df.limit(10))

# Creating a delta table in the workspace catalog and default schema

In [0]:
# df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.iris")

# Decision Tree - Spark 

## Reading the DATA

In [0]:
# Read the Iris table from the workspace catalog / default schema and show it.
IRIS_TABLE = "workspace.default.iris"
df = spark.table(IRIS_TABLE)
display(df)

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

# from pyspark.ml.linalg import Vector
# from pyspark.ml import training, testing
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# from pyspark.ml.tuning import ParamGridBuilder

# Encode the string class column "Species" as the numeric "label" column.
label_indexer = StringIndexer(inputCol="Species", outputCol="label")

# Combine the four measurement columns into one "features" vector column.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        "SepalLengthCm",
        "SepalWidthCm",
        "PetalLengthCm",
        "PetalWidthCm",
    ],
)

# Decision tree classifier reading "features" and predicting "label";
# the seed is fixed so repeated fits are comparable.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=10,
    seed=42,
)

# Run indexing, feature assembly and the classifier as one estimator.
pipeline = Pipeline(stages=[label_indexer, assembler, dt])








In [0]:

# Assemble the features: Spark ML expects them packed into a single vector column.
inputCols = ['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm']
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")

# Decision tree classifier. maxDepth=10 is only a placeholder here: the
# parameter grid below overrides maxDepth for every candidate model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=10, seed=42)

# Preprocessing and model are chained together; fit() runs all stages in sequence.
pipeline = Pipeline(stages=[label_indexer, assembler, dt])

# Split into a training set (80%) and a held-out test set (20%).
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Hyper-parameter candidates explored by cross-validation (values can be tuned).
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 3, 4, 5])
             .addGrid(dt.maxBins, [10, 20, 40])
             .build())

# Multiclass evaluator; accuracy is the metric used to rank grid candidates.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Wrap the pipeline in a 5-fold cross validator. The seed fixes the fold
# assignment so the selected model is reproducible between runs.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

# CrossValidator scores every parameter combination by its average metric over
# the folds, then refits the best combination on the whole training set.
cvModel = cv.fit(train)

# Score the held-out test rows only: evaluating on `df` would include the rows
# the model was trained on and overstate its quality. The transformed frame is
# built once and reused for every metric.
test_predictions = cvModel.transform(test)

# AUC is not reported: MulticlassClassificationEvaluator has no areaUnderROC metric.
for metric_label, metric_name in (
    ("Accuracy", "accuracy"),
    ("F1", "f1"),
    ("Precision", "weightedPrecision"),
    ("Recall", "weightedRecall"),
):
    print(metric_label + ":", evaluator.evaluate(test_predictions, {evaluator.metricName: metric_name}))
