In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train a random-forest classifier on the iris CSV, persist the fitted model,
# and report its accuracy on a held-out split.

# Create (or reuse) the session explicitly: `spark` is not defined anywhere in
# this cell, so relying on it fails with NameError on a fresh kernel.
spark = SparkSession.builder.getOrCreate()

# Read the comma-separated file, taking column names from the header row and
# letting Spark infer the column types.
irisDF = (
    spark.read
    .option('inferSchema', 'true')
    .option('header', 'true')
    .option('delimiter', ',')
    .csv('datasets/iris-dataset.txt')
)

# Pack the first four columns into a single 'features' vector column.
vec = VectorAssembler(inputCols=irisDF.columns[0:4], outputCol='features')
irisDF = vec.transform(irisDF)

# Encode the string 'class' column as a numeric 'label' column.
indexer = StringIndexer(inputCol='class', outputCol='label')
indexModel = indexer.fit(irisDF)
irisDF = indexModel.transform(irisDF)

# Keep only the two columns the classifier and the evaluator consume.
irisDF = irisDF.select('features', 'label')

# Fixed seed so the 75/25 split is the same on every run.
trainDF, testDF = irisDF.randomSplit([0.75, 0.25], seed=1234)

classifier = RandomForestClassifier()
model = classifier.fit(trainDF)
# Overwrite any model left by a previous run; a plain save() raises when the
# target path already exists, which breaks re-running the notebook.
model.write().overwrite().save('savedModel')

# Score the held-out rows and show a sample of the predictions.
resultDF = model.transform(testDF)
resultDF.show()

# Fraction of test rows whose 'prediction' equals 'label'.
eva = MulticlassClassificationEvaluator(metricName='accuracy')
accuracy = eva.evaluate(resultDF)
print("Model Accuracy : ",accuracy)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.feature import VectorAssembler, StringIndexer, StringIndexerModel,PCAModel
# Score iris rows arriving as CSV files in the "logs" directory with the
# previously trained random-forest model and print the results to the console.
spark = SparkSession.builder.getOrCreate()

# Explicit schema for the incoming rows (streaming file sources need one):
# four float columns followed by the string 'class' column.
logSchema = (
    StructType()
    .add('sepal-length', 'float')
    .add('sepal-width', 'float')
    .add('petal-length', 'float')
    .add('petal-width', 'float')
    .add('class', 'string')
)

# Start listening to the "logs" folder for comma-separated files.
irisDF = spark.readStream.option("sep", ",").schema(logSchema).csv("logs")

# Assemble the four float columns into the 'features' vector the model expects.
vec = VectorAssembler(
    inputCols=['sepal-length', 'sepal-width', 'petal-length', 'petal-width'],
    outputCol='features',
)
irisDF = vec.transform(irisDF)

# Load the model from the path the training cell writes ('savedModel'); the
# previous path 'irisModel' is not produced anywhere in this notebook.
# NOTE(review): if 'irisModel' is produced by another notebook, revert this path.
model = RandomForestClassificationModel.load('savedModel')
resultDF = model.transform(irisDF)
resultDF = resultDF.select('features', 'class', 'prediction')

# Print each micro-batch of predictions to the console and block this cell
# until the streaming query stops.
query = resultDF.writeStream.outputMode("update").format("console").start()
query.awaitTermination()
