# Fit-Spark Classification

In [1]:
## Spark Lib
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics


from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pyspark.sql.functions as F
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType

from pyspark.ml.linalg import Vectors
from pyspark.mllib.util import MLUtils


import time
start_time = time.time()
%matplotlib inline

# Spark Session

In [2]:
# Create (or reuse) the notebook's Spark session: local master with 8 worker
# threads, registered under the application name "MachineLearningIris".
spark = (
    SparkSession.builder
    .master("local[8]")
    .appName("MachineLearningIris")
    .getOrCreate()
)

# Classes

In [3]:
class Resultados:
    """Record describing one training/evaluation run of a classifier.

    Plain attribute container: every constructor argument is stored unchanged
    under an attribute of the same name. As filled in by the training
    functions of this notebook:

    algoritmo  -- short algorithm tag ('naive', 'svm' or 'dtree')
    acuracia   -- accuracy on the test set, in percent
    tempo      -- elapsed wall-clock time of the run, in seconds
    precisao   -- precision for label 1.0, in percent
    fpositivos -- false-positive rate for label 1.0, in percent
    recall     -- recall for label 1.0, in percent
    parametroA -- first hyper-parameter value used in the run
    parametroB -- second hyper-parameter value used in the run (None if unused)
    """

    def __init__(self, algoritmo, acuracia, tempo, precisao, fpositivos, recall, parametroA, parametroB):
        # Identification and headline figures of the run.
        self.algoritmo, self.acuracia, self.tempo = algoritmo, acuracia, tempo
        # Per-label confusion-matrix figures.
        self.precisao, self.fpositivos, self.recall = precisao, fpositivos, recall
        # Hyper-parameter values that produced the figures above.
        self.parametroA, self.parametroB = parametroA, parametroB
        

        
        
class Naive:
    """Hyper-parameter grid for the Naive Bayes runs.

    smoothing -- iterable of smoothing values; `naive()` trains one model
                 per value.
    """

    def __init__(self, smoothing):
        self.smoothing = smoothing
        
class SVM:
    """Hyper-parameter grid for the linear SVM runs.

    maxIter  -- iterable of maximum-iteration values
    regParam -- iterable of regularisation values

    `svm()` trains one model for every (maxIter, regParam) combination.
    """

    def __init__(self, maxIter, regParam):
        self.maxIter, self.regParam = maxIter, regParam
        
class Tree:
    """Hyper-parameter grid for the decision-tree runs.

    maxDepth           -- iterable of maximum tree depths
    checkpointInterval -- iterable of checkpoint-interval values

    `decTree()` trains one model for every (maxDepth, checkpointInterval)
    combination.
    """

    def __init__(self, maxDepth, checkpointInterval):
        self.maxDepth, self.checkpointInterval = maxDepth, checkpointInterval

# Funções

## Naive Bayes

In [4]:
def naive(train, test, param):
    """Train and score one multinomial Naive Bayes model per smoothing value.

    For every value in ``param.smoothing`` a ``NaiveBayes`` model is fitted on
    ``train`` and applied to ``test``. Accuracy comes from
    ``MulticlassClassificationEvaluator``; precision, false-positive rate and
    recall come from ``MulticlassMetrics`` and are computed for label ``1.0``
    only. One progress line is printed per run, followed by the mean and total
    wall-clock times.

    Args:
        train: Spark DataFrame used to fit each model.
        test: Spark DataFrame the fitted model is applied to; the result must
            expose ``prediction`` and ``label`` columns.
        param: object with an iterable ``smoothing`` attribute.

    Returns:
        A list with one ``Resultados`` per smoothing value, in iteration
        order. Metrics are percentages, ``parametroA`` is the smoothing value
        and ``parametroB`` is ``None``. The list is empty when
        ``param.smoothing`` yields no values.
    """
    resul = []
    timeList = []

    start_time_total = time.time()

    for x in param.smoothing:
        start_time = time.time()

        trainer = NaiveBayes(smoothing=x, modelType="multinomial")
        evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                      metricName="accuracy")

        model = trainer.fit(train)
        result_nb = model.transform(test)
        accuracy_nb = evaluator.evaluate(result_nb) * 100

        # The per-run time covers fit + transform + accuracy evaluation only;
        # the confusion-matrix metrics below are not included.
        timeFinal = time.time() - start_time
        timeList.append(timeFinal)

        print("Algorithm: Naive Bayes | Accuracy = %3.1f %% | Time = %3.1f s | Smoothing = %3.1f" % (accuracy_nb, timeFinal, x))

        # Confusion-matrix metrics. MulticlassMetrics takes an RDD of
        # (prediction, label) pairs; the counts it derives do not depend on
        # row order, so no sort is needed before handing the data over.
        preds_and_labels = result_nb.select(
            'prediction', F.col('label').cast(FloatType()).alias('label'))
        metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

        prec = metrics.precision(1.0) * 100
        fp = metrics.falsePositiveRate(1.0) * 100
        rec = metrics.recall(1.0) * 100

        resul.append(Resultados('naive', accuracy_nb, timeFinal, prec, fp, rec, x, None))

    timeTotal = time.time() - start_time_total

    # Guard the mean: an empty parameter grid would otherwise divide by zero.
    mean_time = sum(timeList) / len(timeList) if timeList else 0.0

    print("Tempo Médio: %3.1f s" % mean_time)
    print("Tempo Total: %3.1f s" % timeTotal)

    return resul
    
    



## SVM

In [5]:
def svm(train, test, param):
    """Train and score a one-vs-rest linear SVM for every grid combination.

    For every ``(maxIter, regParam)`` pair taken from ``param`` a ``LinearSVC``
    wrapped in ``OneVsRest`` is fitted on ``train`` and applied to ``test``.
    Accuracy comes from ``MulticlassClassificationEvaluator``; precision,
    false-positive rate and recall come from ``MulticlassMetrics`` and are
    computed for label ``1.0`` only. One progress line is printed per run,
    followed by the mean and total wall-clock times.

    Args:
        train: Spark DataFrame with ``features`` and ``label`` columns.
        test: Spark DataFrame the fitted model is applied to.
        param: object with iterable ``maxIter`` and ``regParam`` attributes.

    Returns:
        A list with one ``Resultados`` per combination, in iteration order
        (``maxIter`` outer, ``regParam`` inner). Metrics are percentages,
        ``parametroA`` is ``maxIter`` and ``parametroB`` is ``regParam``.
        The list is empty when the grid yields no combinations.
    """
    resul = []
    timeList = []

    start_time_total = time.time()

    for x in param.maxIter:
        for y in param.regParam:
            start_time = time.time()

            trainer = LinearSVC(featuresCol='features', labelCol='label',
                                maxIter=x, regParam=y)
            ovr_trainer = OneVsRest(classifier=trainer)

            model = ovr_trainer.fit(train)
            result_svm = model.transform(test)

            evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                          metricName="accuracy")
            accuracy_svm = evaluator.evaluate(result_svm) * 100

            # The per-run time covers fit + transform + accuracy evaluation
            # only; the confusion-matrix metrics below are not included.
            timeFinal = time.time() - start_time
            timeList.append(timeFinal)

            print("Algorithm: SVM | Accuracy = %3.1f %% | Time = %3.1f s | maxIter = %3.1f | regParam = %3.1f" % (accuracy_svm, timeFinal, x, y))

            # Confusion-matrix metrics. MulticlassMetrics takes an RDD of
            # (prediction, label) pairs; the counts it derives do not depend
            # on row order, so no sort is needed before handing the data over.
            preds_and_labels = result_svm.select(
                'prediction', F.col('label').cast(FloatType()).alias('label'))
            metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

            prec = metrics.precision(1.0) * 100
            fp = metrics.falsePositiveRate(1.0) * 100
            rec = metrics.recall(1.0) * 100

            resul.append(Resultados('svm', accuracy_svm, timeFinal, prec, fp, rec, x, y))

    timeTotal = time.time() - start_time_total

    # Guard the mean: an empty parameter grid would otherwise divide by zero.
    mean_time = sum(timeList) / len(timeList) if timeList else 0.0

    print("Tempo Médio: %3.1f s" % mean_time)
    print("Tempo Total: %3.1f s" % timeTotal)

    return resul


## Decision Tree

In [6]:
def decTree(train, test, param):
    """Train and score a decision tree for every grid combination.

    For every ``(maxDepth, checkpointInterval)`` pair taken from ``param`` a
    ``DecisionTreeClassifier`` (gini impurity, 32 bins) is fitted on ``train``
    and applied to ``test``. Accuracy comes from
    ``MulticlassClassificationEvaluator``; precision, false-positive rate and
    recall come from ``MulticlassMetrics`` and are computed for label ``1.0``
    only. One progress line is printed per run, followed by the mean and total
    wall-clock times.

    Args:
        train: Spark DataFrame with ``features`` and ``label`` columns.
        test: Spark DataFrame the fitted model is applied to.
        param: object with iterable ``maxDepth`` and ``checkpointInterval``
            attributes.

    Returns:
        A list with one ``Resultados`` per combination, in iteration order
        (``maxDepth`` outer, ``checkpointInterval`` inner). Metrics are
        percentages, ``parametroA`` is ``maxDepth`` and ``parametroB`` is
        ``checkpointInterval``. The list is empty when the grid yields no
        combinations.
    """
    resul = []
    timeList = []

    start_time_total = time.time()

    for x in param.maxDepth:
        for y in param.checkpointInterval:
            start_time = time.time()

            # NOTE(review): Spark's tree-parameter docs describe
            # checkpointInterval as used only when cacheNodeIds is true and a
            # checkpoint directory is set; with cacheNodeIds=False, sweeping
            # it may not change the fitted model -- confirm.
            trainer = DecisionTreeClassifier(featuresCol='features', labelCol='label', predictionCol='prediction', probabilityCol='probability',
                                             rawPredictionCol='rawPrediction', maxDepth=x, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                                             maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=y, impurity='gini', seed=None)

            evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                          metricName="accuracy")

            model = trainer.fit(train)
            result_dt = model.transform(test)
            accuracy_dt = evaluator.evaluate(result_dt) * 100

            # The per-run time covers fit + transform + accuracy evaluation
            # only; the confusion-matrix metrics below are not included.
            timeFinal = time.time() - start_time
            timeList.append(timeFinal)

            print("Algorithm: decTree | Accuracy = %3.1f %% | Time = %3.1f s | maxDepth = %3.1f | checkpointInterval = %3.1f" % (accuracy_dt, timeFinal, x, y))

            # Confusion-matrix metrics. MulticlassMetrics takes an RDD of
            # (prediction, label) pairs; the counts it derives do not depend
            # on row order, so no sort is needed before handing the data over.
            preds_and_labels = result_dt.select(
                'prediction', F.col('label').cast(FloatType()).alias('label'))
            metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

            prec = metrics.precision(1.0) * 100
            fp = metrics.falsePositiveRate(1.0) * 100
            rec = metrics.recall(1.0) * 100

            resul.append(Resultados('dtree', accuracy_dt, timeFinal, prec, fp, rec, x, y))

    timeTotal = time.time() - start_time_total

    # Guard the mean: an empty parameter grid would otherwise divide by zero.
    mean_time = sum(timeList) / len(timeList) if timeList else 0.0

    print("Tempo Médio: %3.1f s" % mean_time)
    print("Tempo Total: %3.1f s" % timeTotal)

    return resul




## Função que Executa Todos os Algoritmos de ML

In [7]:
def autoChoice(train, test, choice, naivep, svmp, dtreep):
    """Run the selected algorithm(s) and return their per-run results.

    Args:
        train: training DataFrame, forwarded to the selected function(s).
        test: test DataFrame, forwarded to the selected function(s).
        choice: 'naive', 'svm' or 'tree' to run a single algorithm, or 'auto'
            to run all three.
        naivep: parameter object forwarded to ``naive``.
        svmp: parameter object forwarded to ``svm``.
        dtreep: parameter object forwarded to ``decTree``.

    Returns:
        The list of ``Resultados`` produced by the selected function. For
        'auto', the three lists concatenated and sorted by accuracy (best
        first); the best algorithm and its accuracy are also printed. For an
        unrecognised ``choice`` a message is printed and an empty list is
        returned, so callers always receive a list.
    """
    if choice == 'auto':
        # Each function already returns a list of Resultados; merge them
        # rather than wrapping each list in a new Resultados.
        resul = []
        resul.extend(naive(train, test, naivep))
        resul.extend(svm(train, test, svmp))
        resul.extend(decTree(train, test, dtreep))

        resul.sort(key=lambda x: x.acuracia, reverse=True)
        if resul:
            print(resul[0].algoritmo, resul[0].acuracia, sep=' ')
        return resul

    if choice == 'naive':
        return naive(train, test, naivep)
    if choice == 'svm':
        return svm(train, test, svmp)
    if choice == 'tree':
        return decTree(train, test, dtreep)

    print("Opção Inválida")
    return []

# Carregamento dos Dados

In [8]:
# Datasets tagged "OK" have already been cleaned and tested.
# NOTE(review): the paths below are absolute and machine-specific.

#orig_data = spark.read.format("csv").options(sep=',',header='true',inferschema='true').\
#            load('/home/tiago/Mestrado/Dissertacao/dataset/iris.data')#OK

#orig_data = spark.read.format("csv").options(sep=',',header='true',inferschema='true').\
#            load('/home/tiago/Mestrado/Dissertacao/dataset/breast-cancer-wisconsin.data')#OK

#orig_data = spark.read.format("csv").options(sep=',',header='true',inferschema='true').\
#            load('/home/tiago/Mestrado/Dissertacao/dataset/glass.data') #OK

#orig_data = spark.read.format("csv").options(sep=',',header='true',inferschema='true').\
#            load('/home/tiago/Mestrado/Dissertacao/dataset/sonar.data')#OK

# Active dataset: comma-separated CSV with a header row and inferred types.
orig_data = (
    spark.read.format("csv")
    .options(sep=',', header='true', inferschema='true')
    .load('/home/tiago/Mestrado/Dissertacao/dataset/sonar.data')
)



# Parâmetros

In [9]:
# Weights handed to randomSplit for the training and test sets.
train_sample = 0.7
test_sample = 0.3

# Algorithm selector passed to autoChoice:
# 'auto'  = run everything
# 'tree'  = Decision Tree
# 'naive' = Naive Bayes
# 'svm'   = SVM
choice = 'naive'

# Naive Bayes parameter
# Name = Smoothing
#smoothing = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
# 100 values: 0.0, 0.1, ..., 9.9
smoothing = [round(step * 0.1, 1) for step in range(100)]

#smoothing = [1, 2, 3]
naiveParam = Naive(smoothing)

# SVM parameters
maxIter = [80, 90, 100]
regParam = [0.1, 0.2, 0.3]
svmParam = SVM(maxIter, regParam)

# Decision Tree parameters
# Depth / MaxIter
# NOTE: `MaxIter` (capital M) is a different variable from the SVM `maxIter`
# above; it is passed to Tree as its checkpointInterval values.
maxDepth = [1, 2, 3, 4, 5]
MaxIter = [10, 15, 20]
dtreeParam = Tree(maxDepth, MaxIter)

################


#amostra = ""
#execucao = "dtree-MD-custom-30-40-CP-custom-15-30"

# Pré-Processamento/Tratamento Padrão

In [10]:
# Encode the string "class" column as a numeric "label" column.
indexer = StringIndexer(inputCol="class", outputCol="label").fit(orig_data)
label_data = indexer.transform(orig_data)

# Converter from "label" indices back to strings; only configured here,
# it is not applied anywhere in this cell.
labelReverse = IndexToString().setInputCol("label")

label_data = label_data.drop("class")

# Every remaining column except the label is a model input.
# The column list is deliberately NOT named `list`: that would shadow the
# builtin for every later cell in the kernel session.
ignore = ['label']
feature_cols = [x for x in label_data.columns if x not in ignore]

assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol='features')

# Final modelling frame: numeric label plus one assembled feature vector.
data = (assembler.transform(label_data).select("label","features"))


# RandomSplit Train/Test

In [11]:
# Fixed seed (1234) so the train/test partition is the same on every run.
train, test = data.randomSplit([train_sample, test_sample], seed=1234)

In [12]:
# Run the algorithm selected by `choice`; the per-run results are kept in
# `resul` for the ranking cells below. No pre-initialisation: a stale empty
# list would only hide a failure of this call.
resul = autoChoice(train, test, choice, naiveParam, svmParam, dtreeParam)

Algorithm: Naive Bayes | Accuracy = 72.2 % | Time = 3.1 s | Smoothing = 0.0
Algorithm: Naive Bayes | Accuracy = 72.2 % | Time = 1.2 s | Smoothing = 0.1
Algorithm: Naive Bayes | Accuracy = 72.2 % | Time = 1.1 s | Smoothing = 0.2
Algorithm: Naive Bayes | Accuracy = 73.6 % | Time = 0.9 s | Smoothing = 0.3
Algorithm: Naive Bayes | Accuracy = 73.6 % | Time = 0.8 s | Smoothing = 0.4
Algorithm: Naive Bayes | Accuracy = 73.6 % | Time = 0.7 s | Smoothing = 0.5
Algorithm: Naive Bayes | Accuracy = 73.6 % | Time = 0.7 s | Smoothing = 0.6
Algorithm: Naive Bayes | Accuracy = 73.6 % | Time = 0.7 s | Smoothing = 0.7
Algorithm: Naive Bayes | Accuracy = 73.6 % | Time = 0.7 s | Smoothing = 0.8
Algorithm: Naive Bayes | Accuracy = 73.6 % | Time = 0.7 s | Smoothing = 0.9
Algorithm: Naive Bayes | Accuracy = 75.0 % | Time = 0.7 s | Smoothing = 1.0
Algorithm: Naive Bayes | Accuracy = 75.0 % | Time = 0.8 s | Smoothing = 1.1
Algorithm: Naive Bayes | Accuracy = 76.4 % | Time = 0.6 s | Smoothing = 1.2
Algorithm: N

In [13]:
# Rank the runs in place, best accuracy first. The in-place sort is
# intentional: later cells read resul[0] as the best run.
resul.sort(key=lambda x: x.acuracia, reverse=True)

# Show up to five best runs; bounded by len(resul) so a grid with fewer than
# five combinations does not raise IndexError.
# NOTE: the last field is always labelled "Smoothing", but parametroA holds
# maxIter for 'svm' runs and maxDepth for 'dtree' runs.
for i in range(min(5, len(resul))):
    print("Algorithm: %s | Accuracy = %3.1f %% | Time = %3.1f s | Smoothing = %3.1f" % (resul[i].algoritmo, resul[i].acuracia, resul[i].tempo, resul[i].parametroA))

Algorithm: naive | Accuracy = 77.8 % | Time = 0.6 s | Smoothing = 2.8
Algorithm: naive | Accuracy = 77.8 % | Time = 0.6 s | Smoothing = 2.9
Algorithm: naive | Accuracy = 77.8 % | Time = 0.6 s | Smoothing = 3.0
Algorithm: naive | Accuracy = 77.8 % | Time = 0.6 s | Smoothing = 3.1
Algorithm: naive | Accuracy = 77.8 % | Time = 0.6 s | Smoothing = 3.2


In [14]:
# Precision, false-positive rate and recall (all in percent) of the first
# entry of `resul`; this is the best run only if `resul` has already been
# sorted by accuracy, as the preceding cell does.
print(resul[0].precisao, resul[0].fpositivos, resul[0].recall)

81.48148148148148 12.82051282051282 66.66666666666666
