Para a construção de um modelo utilizando o Spark e MLlib são necessários 6 passos:

#1) Construir e iniciar a seção SPARK
#2) Implementar o carregamento dos dados para o spark: Carregar o arquivo, especificar o formato desejado e lê os dados como um Dataframe do Spark
#3) Identificar as características a serem utilizadas para treinamento e teste do modelo
#4) Instanciar as classes e os objetos dos algoritmos a serem utilizados
#5) Utilizar o método fit() para realizar o treinamento do modelo
#5) Avaliar o modelo

In [2]:
from pyspark.ml.classification import LogisticRegression as LR  # utilizada para importar a classificação através da regressão logística
from pyspark.ml.feature import VectorAssembler #Utilizada para construir o vetor/matriz que vamos utilizar para a classificação
from pyspark.ml.feature import StringIndexer #Atribui a cada valor literal um valor inteiro para utilizar no algoritmo 
from pyspark.ml import Pipeline # Utilizada para criar o pipeline de utilização dos dados

In [3]:
from pyspark.sql import SparkSession #importa a biblioteca que cria a seção do spark

In [4]:
#inicia a seção para a utilização do spark
spark = SparkSession.builder.appName("ClassificadorVinhos").getOrCreate() #cria a seção caso não exista ou obtém a já criada

In [5]:
%fs ls /FileStore/tables

path,name,size
dbfs:/FileStore/tables/d1995_07_01-24d0c.json,d1995_07_01-24d0c.json,7451741
dbfs:/FileStore/tables/d1995_07_02-c3f44.json,d1995_07_02-c3f44.json,9572086
dbfs:/FileStore/tables/d1995_07_03-f99af.json,d1995_07_03-f99af.json,14125303
dbfs:/FileStore/tables/d1995_07_04-c5a7f.json,d1995_07_04-c5a7f.json,12030333
dbfs:/FileStore/tables/d1995_07_05-0f261.json,d1995_07_05-0f261.json,14662194
dbfs:/FileStore/tables/d1995_07_06-20619.json,d1995_07_06-20619.json,15557682
dbfs:/FileStore/tables/d1995_07_07-2dd8d.json,d1995_07_07-2dd8d.json,15279295
dbfs:/FileStore/tables/d1995_07_08-83302.json,d1995_07_08-83302.json,7033852
dbfs:/FileStore/tables/d1995_07_09-f75d3.json,d1995_07_09-f75d3.json,5589076
dbfs:/FileStore/tables/d1995_07_10-160a3.json,d1995_07_10-160a3.json,10616482


In [6]:
diretorioDataset="/FileStore/tables/winequality_red-42ff5.csv"

In [7]:
#realiza a carga do dataset
wine_df = spark.read.format("csv"). \
    options(header="true",\
    inferschema="true",sep=';'). \
    load(diretorioDataset)

In [8]:
#realiza o print do esquema do dataframe
wine_df.printSchema()

In [9]:
#realiza o print do dataset
wine_df.show(15)

In [10]:
#realizando um filtro para o dataframe
wine_df.filter(wine_df.quality>7).show(5)

In [11]:
#realizando um filtro para o dataframe
wine_df.filter(wine_df.quality>8).show(5)

In [12]:
#importa a função para criar uma nova coluna no dataset (similar às funções do SQL)
from pyspark.sql.functions import when


In [13]:
#cria a nova coluna que contém a classificação como ruim, boa ou mediana
wine_df = wine_df.withColumn('quality_new',\
    when(wine_df['quality']< 5, 0 ).\
    otherwise(when(wine_df['quality']<8,1)\
    .otherwise(2)))

In [14]:
#realiza o print do dataset
wine_df.show(15)

In [15]:
#realiza o print do dataset
wine_df.filter(wine_df.quality_new>1).show(5)

In [16]:
#converte os valores para string - não executa, é apenas transformação
string_index = StringIndexer(inputCol='quality_new',\
    outputCol='quality'+'Index')

In [17]:
#Seleciona as colunas a serem utilizadas como entradas para a classificação - não executa, é apenas transformação
vectors = VectorAssembler(inputCols = \
    ['fixed acidity','volatile acidity',\
    'citric acid','residual sugar','chlorides',\
    'free sulfur dioxide', 'total sulfur dioxide', \
    'density','pH','sulphates', 'alcohol'],\
    outputCol = 'features')

In [18]:
#cria o "novo dataframe" - não executa, é apenas transformação
stages = [vectors, string_index]

In [19]:
#define o pipeline
pipeline = Pipeline().setStages(stages)

In [20]:
#aplicando as transformações ao dataset - executa o comando/ação
pipelineModel = pipeline.fit(wine_df)

In [21]:
#aplica as transformações obtidas através do pipeline
pl_data_df = pipelineModel.transform(wine_df)

In [22]:
#print do dataset após a aplicação do pipeline
pl_data_df.show(15)

In [23]:
#divide o dataset entre treinamento e teste (70% treinamento e 30% teste)
train_df, test_df = pl_data_df.randomSplit([0.7,0.3])

In [24]:
train_df.show(5)

In [25]:
test_df.show(5)

In [26]:
#instancia a classe para a execução do modelo através da regressão logística
classificador= LR(featuresCol = 'features', \
    labelCol='qualityIndex',\
               maxIter=50) #maximo de iterações

In [27]:
#aplica o treinamento do modelo
modelo = classificador.fit(train_df)

In [28]:
#obtém o sumário (dados de análise) para o modelo criado
modelSummary = modelo.summary

In [29]:
    # print das estatísticas do modelo
    accuracy = modelSummary.accuracy #acurácia da classificação
    fPR = modelSummary.weightedFalsePositiveRate #taxa de falsos positivos
    tPR = modelSummary.weightedTruePositiveRate #taxa de verdadeiros positivos
    fMeasure = modelSummary.weightedFMeasure() #f-score
    precision = modelSummary.weightedPrecision #precision
    recall = modelSummary.weightedRecall #recall
    print("Acurácia: {}\
        Taxa de verdadeiros positivos {} F-score {} Precision {} Recall {}"\
        .format(accuracy, tPR, fMeasure, precision, recall))