# <font color='blue'>Data Science Academy</font>
# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

## <font color='blue'>Mini-Projeto 5</font>

### <font color='blue'>Machine Learning na Engenharia Civil com Apache Spark</font>

![title](imagens/MP5.png)

O  Mini-Projeto 5  aborda  todo  o  processo  de  Machine  Learning  no  contexto  de  um problema na área de 
Engenharia Civil.

Mas ao invés de aplicar as tarefas uma a uma, será criado módulos de automação. Ou seja, será desenvolvido um 
projeto próprio de sistema de AutoML, sem o uso de frameworks específicos e aplicando Machine Learning com o 
Spark MLlib no PySpark.

O concreto é o material mais importante na Engenharia Civil. 

A resistência à compressão do concreto é uma função altamente não linear da idade e dos ingredientes usados para 
prepararo concreto.

Nosso trabalho será construir um modelo preditivo capaz de prever a força do concreto com  base  em  uma  série  de
características  e  ingredientes  do  concreto.

Usaremos  um  dataset disponível publicamente no link abaixo:
    
https://archive.ics.uci.edu/ml/datasets/Concrete+Compressive+Strength
    
A variável “Concrete compressive strength”(coluna csMPano dataset) será a variável alvo e as demais serão as 
variáveis preditoras.

Como  iremos  prever  um  valor  numérico  que  representa  a  força  do  concreto  e  temos dados  de  entrada  
e  saída,  este  é  um  problema  de  regressão.  

Vamos  experimentar  diferentes algoritmos  de  regressão  e  escolher  o que  apresentar  a melhor performance. 

Técnicas  de otimização de hiperparâmetros serão exploradas para chegar ao melhor modelo possível.

Com o modelo treinado faremos previsões usando novos dados.

In [1]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

Versão da Linguagem Python Usada Neste Jupyter Notebook: 3.9.13


In [None]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
#!pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
#!pip install -q -U watermark

In [2]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [3]:
# Imports
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import * # * importa tudo do pacote
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler#
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.stat import Correlation
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Data Science Academy" --iversions

## Preparando o Ambiente Spark

In [4]:
# Criando o Spark Context
sc = SparkContext(appName= 'Mini-Projeto5')

23/04/28 14:22:39 WARN Utils: Your hostname, DataScience resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/04/28 14:22:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/28 14:22:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sc.setLogLevel("Error")

In [6]:
# Criando a sessão
spark  = SparkSession.builder.getOrCreate()

In [7]:
spark

## Carregando o Dataset

In [8]:
# Carrega os dados
dados = spark.read.csv('dados/dataset.csv', inferSchema=True, header=True)

                                                                                

In [9]:
type(dados)

pyspark.sql.dataframe.DataFrame

In [10]:
# Número de registros
dados.count()

1030

In [11]:
# Visualiza os dados no padrão do Spark DataFrame
dados.show(10)

+------+-----+------+-----+----------------+---------------+-------------+---+-----+
|cement| slag|flyash|water|superplasticizer|coarseaggregate|fineaggregate|age|csMPa|
+------+-----+------+-----+----------------+---------------+-------------+---+-----+
| 540.0|  0.0|   0.0|162.0|             2.5|         1040.0|        676.0| 28|79.99|
| 540.0|  0.0|   0.0|162.0|             2.5|         1055.0|        676.0| 28|61.89|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|270|40.27|
| 332.5|142.5|   0.0|228.0|             0.0|          932.0|        594.0|365|41.05|
| 198.6|132.4|   0.0|192.0|             0.0|          978.4|        825.5|360| 44.3|
| 266.0|114.0|   0.0|228.0|             0.0|          932.0|        670.0| 90|47.03|
| 380.0| 95.0|   0.0|228.0|             0.0|          932.0|        594.0|365| 43.7|
| 380.0| 95.0|   0.0|228.0|             0.0|          932.0|        594.0| 28|36.45|
| 266.0|114.0|   0.0|228.0|             0.0|          932.0|     

In [12]:
# Visualiza os dados no formato do Pandas
dados.limit(10).toPandas()

Unnamed: 0,cement,slag,flyash,water,superplasticizer,coarseaggregate,fineaggregate,age,csMPa
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89
2,332.5,142.5,0.0,228.0,0.0,932.0,594.0,270,40.27
3,332.5,142.5,0.0,228.0,0.0,932.0,594.0,365,41.05
4,198.6,132.4,0.0,192.0,0.0,978.4,825.5,360,44.3
5,266.0,114.0,0.0,228.0,0.0,932.0,670.0,90,47.03
6,380.0,95.0,0.0,228.0,0.0,932.0,594.0,365,43.7
7,380.0,95.0,0.0,228.0,0.0,932.0,594.0,28,36.45
8,266.0,114.0,0.0,228.0,0.0,932.0,670.0,28,45.85
9,475.0,0.0,0.0,228.0,0.0,932.0,594.0,28,39.29


In [13]:
#Schema
dados.printSchema()

root
 |-- cement: double (nullable = true)
 |-- slag: double (nullable = true)
 |-- flyash: double (nullable = true)
 |-- water: double (nullable = true)
 |-- superplasticizer: double (nullable = true)
 |-- coarseaggregate: double (nullable = true)
 |-- fineaggregate: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- csMPa: double (nullable = true)



## Módulo de Automação da Preparação de Dados

O MLlib requer que todas as colunas de entrada do dataframe sejam vetorizadas. Vamos criar uma função Python que irá automatizar nosso trabalho de preparação dos dados, incluindo a vetorização e todas as tarefas necessárias.

Primeiro, vamos listar e remover valores ausentes (se existirem). Vamos focar neste projeto em Machine Learning, mas lembre-se sempre de checar valores ausentes.

In [14]:
# Separamos os dados ausentes (se existirem) e removemos (se existirem)
dados_com_linhas_removidas = dados.na.drop()
print('Número de linhas antes de remover valores ausentes:', dados.count())
print('Número de linhas após remover valores ausentes:', dados_com_linhas_removidas.count())

Número de linhas antes de remover valores ausentes: 1030
Número de linhas após remover valores ausentes: 1030


In [15]:
# Função de preparação dos dados
def func_modulo_prep_dados(df,
                           variaveis_entrada,
                           variaveis_saida,
                           tratar_outliers = True,
                           padronizar_dados = True):
    
    #Gerar um novo dataframe, renomeando o argumento que representa a variavél saida, por exigência do Spark
    novo_df = df.withColumnRenamed(variavel_saida,'label')
    
    #Converte a variável alvo para o tipo numérico como float(Encondig)
    if str(novo_df.schema['label'].dataType) != 'IntegerType':
        novo_df = novo_df.withColumn('label', novo_df['label'].cast(FloatType()))
    
    
    #Listas de Controle para as variáveis
    variaveis_numericas =[]
    variaveis_categoricas =[]
    
   # Se tiver variáveis de entrada do tipo String, converte para numérico
    for coluna in variaveis_entrada:
        
        # Verifica se a variável é do tipo string
        if str(novo_df.schema[coluna].dataType) == 'StringType':
            
            #Define a variável com um sufixo
            novo_nome_coluna = coluna + '_num'
            
            # Adicionamos à lista de variáveis categóricas
            variaveis_categoricas.append(novo_nome_coluna)
        
        else:
            
            # Adiciona variávies numéricas na lista correspondente
            variaveis_numericas.append(coluna)
            
            # Colocamos os dados no dataframe de variáveis indexadas
            df_indexed = novo_df
    
    # Se o dataframe tiver dados do tipo String, aplicamos a indexação
    # Verificamos se a lista de variávies categóricas não está vazia
    if len(variaveis_categoricas) != 0:
        
        #loop pelas colunas
        for coluna in novo_df:
            
            # Se a variável é do tipo String, é criada, treinada e aplicado indexador
            if str(novo_df.schema[coluna].dataType) == 'StringType':
                
                # Cria o indexador
                indexer = StringIndexer(inputCol= coluna, outputCol= coluna + '_num')
                
                # Treina e aplica o indexador
                df_indexed = indexer.fit(novo_df).transform(novo_df)
    else:
        
        # Se não tem variável categóricas, coloca os dados no dataframe de variáveis indexadas
        df_indexed = novo_df
        
    # Tratamento de outliers
    if tratar_outliers == True:
        print('\nAplicando o tratamento de outliers...')
        
        # Cria dicionário para Outliers
        d = {}
        
        # Dicionário de quartis das variáveis do dataframe indexado(Para veriáveis numéricas)
        for col in variaveis_numericas:
            d[col] = df_indexed.approxQuantile(col,[0.01,0.99],0.25)
        
        # Agora aplicamos a transformação dependendo da distribuição de cada variável
        for col in variaveis_numericas:
            
            # Extraimos a assimetria dos dados e usamos isso para tratar os outliers
            skew = df_indexed.agg(skewness(df_indexed[col])).collect()
            skew = skew[0][0]
            
            #Verificamos a assimetria e então aplicamos
            
            #trnasformação de log +1 se a assimetria for positiva
            if skew > 1:
                indexed = df_indexed.withColumn(col, log(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] ) + 1).alias(col))
                print("\nA variável " + col + " foi tratada para assimetria positiva (direita) com skew =", skew)
            
            # Transformação exponencial se a assimetria for negativa
            elif skew < -1:
                indexed = df_indexed.withColumn(col, \
                exp(when(df[col] < d[col][0], d[col][0])\
                .when(df_indexed[col] > d[col][1], d[col][1])\
                .otherwise(df_indexed[col] )).alias(col))
                print("\nA variável " + col + " foi tratada para assimetria negativa (esquerda) com skew =", skew)
    
    # Vetorização
    
    # Lista final de atributos
    lista_atributos = variaveis_numericas + variaveis_categoricas#Concatena as variávies
    
    # Cria o vetorizador para os atributos
    vetorizador = VectorAssembler(inputCols = lista_atributos, outputCol = 'features')
    
    # Aplica o vetorizadoe ao conjunto de dados
    dados_vetorizados = vetorizador.transform(df_indexed).select('features','label')
    
    # Se a flag padronizar_dados está como True, então padronizamos os dados colocando-os na mesma escala
    if padronizar_dados == True:
        print("\nPadronizando o conjunto de dados para o intervalo de 0 a 1...")
        
        # Cria o scaler
        scaler = MinMaxScaler(inputCol= 'features', outputCol= 'scaledFeatures')
        
        # Calcula o sumário de estatísticas e gera o padronizador
        global scalerModel
        scalerModel = scaler.fit(dados_vetorizados)
        
        #Padroniza as variáveis para o intervalo[min,max]
        dados_padronizados = scalerModel.transform(dados_vetorizados)
        
        # Gera os dados finais
        dados_finais = dados_padronizados.select('label','scaledFeatures')
        
        #Renomeia as colunas(requerido pelo Spark)
        dados_finais = dados_finais.withColumnRenamed('scaledFeatures','features')
        
        print("\nPadronização Concluída!")

   # Se a flag está como False, então não padronizamos os dados
    else:
        print("\nOs dados não serão padronizados pois a flag padronizar_dados está com o valor False.")
        dados_finais = dados_vetorizados
    
    return dados_finais

> Agora aplicamos o módulo de preparação dos dados.

In [16]:
# Lista de variáveis de entrada(todas menos a target)
variaveis_entrada = dados.columns[:-1]

In [17]:
# Variável Alvo
variavel_saida = dados.columns[-1]

In [18]:
# Aplica a função
dados_finais = func_modulo_prep_dados(dados, variaveis_entrada, variavel_saida)


Aplicando o tratamento de outliers...

A variável age foi tratada para assimetria positiva (direita) com skew = 3.2644145354168086

Padronizando o conjunto de dados para o intervalo de 0 a 1...

Padronização Concluída!


In [19]:
# Visualiza
dados_finais.show(10, truncate=False)

+-----+------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                      |
+-----+------------------------------------------------------------------------------------------------------------------------------+
|79.99|[1.0,0.0,0.0,0.3210862619808307,0.07763975155279502,0.6947674418604651,0.20572002007024587,0.07417582417582418]               |
|61.89|[1.0,0.0,0.0,0.3210862619808307,0.07763975155279502,0.7383720930232558,0.20572002007024587,0.07417582417582418]               |
|40.27|[0.526255707762557,0.3964941569282137,0.0,0.8482428115015974,0.0,0.3808139534883721,0.0,0.739010989010989]                    |
|41.05|[0.526255707762557,0.3964941569282137,0.0,0.8482428115015974,0.0,0.3808139534883721,0.0,1.0]                                  |
|44.3 |[0.22054794520547943,0.3683917640511965,0.0,0.56

## Verificando a Correlação

Vamos nos certificar de que não temos multicolinearidade antes de prosseguirmos. Lembre-se das seguintes diretrizes para o Coeficiente de Correlação de Pearson:

- .00-.19 (correlação muito fraca)
- .20-.39 (correlação fraca)
- .40-.59 (correlação moderada)
- .60-.79 (correlação forte)
- .80-1.0 (correlação muito forte)

In [20]:
# Extrai a correlação
coeficientes_corr = Correlation.corr(dados_finais, 'features','pearson').collect()[0][0]

[Stage 52:>                                                         (0 + 1) / 1]                                                                                

In [21]:
# Converte o resultado em array
array_corr = coeficientes_corr.toArray()

In [22]:
array_corr

array([[ 1.        , -0.27521591, -0.39746734, -0.08158675,  0.09238617,
        -0.10934899, -0.22271785,  0.08194602],
       [-0.27521591,  1.        , -0.3235799 ,  0.10725203,  0.04327042,
        -0.28399861, -0.28160267, -0.04424602],
       [-0.39746734, -0.3235799 ,  1.        , -0.25698402,  0.37750315,
        -0.00996083,  0.07910849, -0.15437052],
       [-0.08158675,  0.10725203, -0.25698402,  1.        , -0.65753291,
        -0.1822936 , -0.45066117,  0.27761822],
       [ 0.09238617,  0.04327042,  0.37750315, -0.65753291,  1.        ,
        -0.26599915,  0.22269123, -0.19270003],
       [-0.10934899, -0.28399861, -0.00996083, -0.1822936 , -0.26599915,
         1.        , -0.17848096, -0.00301588],
       [-0.22271785, -0.28160267,  0.07910849, -0.45066117,  0.22269123,
        -0.17848096,  1.        , -0.1560947 ],
       [ 0.08194602, -0.04424602, -0.15437052,  0.27761822, -0.19270003,
        -0.00301588, -0.1560947 ,  1.        ]])

In [23]:
# Lista a correlação entre os atributos e a variável alvo
for item in array_corr:
    print(item[7])

0.08194602387182176
-0.044246019304454175
-0.15437051606792915
0.27761822152100296
-0.19270002804347258
-0.0030158803467436645
-0.15609470264758615
1.0


## Divisão em Dados de Treino e Teste

In [24]:
# Divisão com proporção 70/30
dados_treino, dados_teste = dados_finais.randomSplit([0.7,0.3])

## Módulo de AutoML (Automated Machine Learning)

https://spark.apache.org/docs/latest/ml-classification-regression.html#regression

Vamos criar uma função para automatizar o uso de diversos algoritmos. Nossa função irá criar, treinar e avaliar cada um deles com diferentes combinações de hiperparâmetros. E então escolheremos o modelo de melhor performance.

In [25]:
# Módulo de Machine Learning
def func_modulo_ml(algoritmo_regressao):

    # Função para obter o tipo do algoritmo de regressão e criar a instância do objeto
    # Usaremos isso para automatizar nosso processo
    def func_tipo_algo(algo_regressao):
        algoritmo = algo_regressao
        tipo_algo = type(algoritmo).__name__
        return tipo_algo
    
    # Aplica a função anterior
    tipo_algo = func_tipo_algo(algoritmo_regressao)

    # Se o algoritmo for Regressão Linear, entramos neste bloco if
    if tipo_algo == "LinearRegression":
        
        # Treinamos a primeira versão do modelo sem validação cruzada
        modelo = regressor.fit(dados_treino)
        
        # Métricas do modelo
        print('\033[1m' + "Modelo de Regressão Linear Sem Validação Cruzada:" + '\033[0m')
        print("")
        
        # Avalia o modelo com dados de teste
        resultado_teste = modelo.evaluate(dados_teste)

        # Imprime as métricas de erro do modelo com dados de teste
        print("RMSE em Teste: {}".format(resultado_teste.rootMeanSquaredError))
        print("Coeficiente R2 em Teste: {}".format(resultado_teste.r2))
        print("")
        
        # Agora vamos criar a segunda versão do modelo com mesmo algoritmo, mas usando validação cruzada
        
        # Prepara o grid de hiperparâmetros
        paramGrid = (ParamGridBuilder().addGrid(regressor.regParam, [0.1, 0.01]).build())
        
        # Cria os avaliadores
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        eval_r2 = RegressionEvaluator(metricName = "r2")
        
        # Cria o Cross Validator
        crossval = CrossValidator(estimator = regressor,
                                  estimatorParamMaps = paramGrid,
                                  evaluator = eval_rmse,
                                  numFolds = 3) 
        
        print('\033[1m' + "Modelo de Regressão Linear Com Validação Cruzada:" + '\033[0m')
        print("")
        
        # Treina o modelo com validação cruzada
        modelo = crossval.fit(dados_treino)
        
        # Salva o melhor modelo da versão 2
        global LR_BestModel 
        LR_BestModel = modelo.bestModel
                
        # Previsões com dados de teste
        previsoes = LR_BestModel.transform(dados_teste)
        
        # Avaliação do melhor modelo
        resultado_teste_rmse = eval_rmse.evaluate(previsoes)
        print('RMSE em Teste:', resultado_teste_rmse)
        
        resultado_teste_r2 = eval_r2.evaluate(previsoes)
        print('Coeficiente R2 em Teste:', resultado_teste_r2)
        print("")
    
        # Lista de colunas para colocar no dataframe de resumo
        columns = ['Regressor', 'Resultado_RMSE', 'Resultado_R2']
        
        # Formata os resultados e cria o dataframe
        
        # Formata as métricas e nome do algoritmo
        rmse_str = [str(resultado_teste_rmse)] 
        r2_str = [str(resultado_teste_r2)] 
        tipo_algo = [tipo_algo] 
        
        # Cria o dataframne
        df_resultado = spark.createDataFrame(zip(tipo_algo, rmse_str, r2_str), schema = columns)
        
        # Grava os resultados no dataframe
        df_resultado = df_resultado.withColumn('Resultado_RMSE', df_resultado.Resultado_RMSE.substr(0, 5))
        df_resultado = df_resultado.withColumn('Resultado_R2', df_resultado.Resultado_R2.substr(0, 5))
        
        return df_resultado

    else:
        
        # Verificamos se o algoritmo é o Decision Tree e criamos o grid de hiperparâmetros
        if tipo_algo in("DecisionTreeRegressor"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.maxBins, [10, 20, 40]).build())

        # Verificamos se o algoritmo é o Random Forest e criamos o grid de hiperparâmetros
        if tipo_algo in("RandomForestRegressor"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.numTrees, [5, 20]).build())

        # Verificamos se o algoritmo é o GBT e criamos o grid de hiperparâmetros
        if tipo_algo in("GBTRegressor"):
            paramGrid = (ParamGridBuilder() \
                         .addGrid(regressor.maxBins, [10, 20]) \
                         .addGrid(regressor.maxIter, [10, 15])
                         .build())
            
        # Verificamos se o algoritmo é o Isotonic 
        if tipo_algo in("IsotonicRegression"):
            paramGrid = (ParamGridBuilder().addGrid(regressor.isotonic, [True, False]).build())

        # Cria os avaliadores
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        eval_r2 = RegressionEvaluator(metricName = "r2")
        
        # Prepara o Cross Validator
        crossval = CrossValidator(estimator = regressor,
                                  estimatorParamMaps = paramGrid,
                                  evaluator = eval_rmse,
                                  numFolds = 3) 
        
        # Treina o modelo usando validação cruzada
        modelo = crossval.fit(dados_treino)
        
        # Extrai o melhor modelo
        BestModel = modelo.bestModel

        # Resumo de cada modelo
        
        # Métricas do modelo
        if tipo_algo in("DecisionTreeRegressor"):
            
            # Variável global
            global DT_BestModel 
            DT_BestModel = modelo.bestModel
            
            # Previsões com dados de teste
            previsoes_DT = DT_BestModel.transform(dados_teste)
            
            print('\033[1m' + "Modelo Decision Tree Com Validação Cruzada:" + '\033[0m')
            print(" ")
            
            # Avaliação do modelo
            resultado_teste_rmse = eval_rmse.evaluate(previsoes_DT)
            print('RMSE em Teste:', resultado_teste_rmse)
        
            resultado_teste_r2 = eval_r2.evaluate(previsoes_DT)
            print('Coeficiente R2 em Teste:', resultado_teste_r2)
            print("")
        
        # Métricas do modelo
        if tipo_algo in("RandomForestRegressor"):
            
            # Variável global
            global RF_BestModel 
            RF_BestModel = modelo.bestModel
            
            # Previsões com dados de teste
            previsoes_RF = RF_BestModel.transform(dados_teste)
            
            print('\033[1m' + "Modelo RandomForest Com Validação Cruzada:" + '\033[0m')
            print(" ")
            
            # Avaliação do modelo
            resultado_teste_rmse = eval_rmse.evaluate(previsoes_RF)
            print('RMSE em Teste:', resultado_teste_rmse)
        
            resultado_teste_r2 = eval_r2.evaluate(previsoes_RF)
            print('Coeficiente R2 em Teste:', resultado_teste_r2)
            print("")
        
        # Métricas do modelo
        if tipo_algo in("GBTRegressor"):

            # Variável global
            global GBT_BestModel 
            GBT_BestModel = modelo.bestModel
            
            # Previsões com dados de teste
            previsoes_GBT = GBT_BestModel.transform(dados_teste)
            
            print('\033[1m' + "Modelo Gradient-Boosted Tree (GBT) Com Validação Cruzada:" + '\033[0m')
            print(" ")
            
            # Avaliação do modelo
            resultado_teste_rmse = eval_rmse.evaluate(previsoes_GBT)
            print('RMSE em Teste:', resultado_teste_rmse)
        
            resultado_teste_r2 = eval_r2.evaluate(previsoes_GBT)
            print('Coeficiente R2 em Teste:', resultado_teste_r2)
            print("")
            
        # Métricas do modelo
        if tipo_algo in("IsotonicRegression"):

            # Variável global
            global ISO_BestModel 
            ISO_BestModel = modelo.bestModel
            
            # Previsões com dados de teste
            previsoes_ISO = ISO_BestModel.transform(dados_teste)
            
            print('\033[1m' + "Modelo Isotonic Com Validação Cruzada:" + '\033[0m')
            print(" ")
            
            # Avaliação do modelo
            resultado_teste_rmse = eval_rmse.evaluate(previsoes_ISO)
            print('RMSE em Teste:', resultado_teste_rmse)
        
            resultado_teste_r2 = eval_r2.evaluate(previsoes_ISO)
            print('Coeficiente R2 em Teste:', resultado_teste_r2)
            print("")
                    
        # Lista de colunas para colocar no dataframe de resumo
        columns = ['Regressor', 'Resultado_RMSE', 'Resultado_R2']
        
        # Faz previsões com dados de teste
        previsoes = modelo.transform(dados_teste)
        
        # Avalia o modelo para gravar o resultado
        eval_rmse = RegressionEvaluator(metricName = "rmse")
        rmse = eval_rmse.evaluate(previsoes)
        rmse_str = [str(rmse)]
        
        eval_r2 = RegressionEvaluator(metricName = "r2")
        r2 = eval_r2.evaluate(previsoes)
        r2_str = [str(r2)]
         
        tipo_algo = [tipo_algo] 
        
        # Cria o dataframe
        df_resultado = spark.createDataFrame(zip(tipo_algo, rmse_str, r2_str), schema = columns)
        
        # Grava o resultado no dataframe
        df_resultado = df_resultado.withColumn('Resultado_RMSE', df_resultado.Resultado_RMSE.substr(0, 5))
        df_resultado = df_resultado.withColumn('Resultado_R2', df_resultado.Resultado_R2.substr(0, 5))
        
        return df_resultado

> Agora executamos o módulo de Machine Learning.

In [26]:
# Lista de algoritmos
regressores = [LinearRegression(),
               DecisionTreeRegressor(),
               RandomForestRegressor(),
               GBTRegressor(),
               IsotonicRegression()] 

In [27]:
# Lista de colunas e valores
colunas = ['Regressor', 'Resultado_RMSE', 'Resultado_R2']
valores = [("N/A", "N/A", "N/A")]

In [28]:
# Prepara a tabela de resumo
df_resultados_treinamento = spark.createDataFrame(valores, colunas)

In [29]:
# Loop de treinamento
for regressor in regressores:
    
    # Para cada regressor obtém o resultado
    resultado_modelo = func_modulo_ml(regressor)
    
    # Grava os resultados
    df_resultados_treinamento = df_resultados_treinamento.union(resultado_modelo)

[1mModelo de Regressão Linear Sem Validação Cruzada:[0m

RMSE em Teste: 10.638716332789414
Coeficiente R2 em Teste: 0.5996076456770542

[1mModelo de Regressão Linear Com Validação Cruzada:[0m

RMSE em Teste: 10.642212240017546
Coeficiente R2 em Teste: 0.5993444627027547

[1mModelo Decision Tree Com Validação Cruzada:[0m
 
RMSE em Teste: 8.519208550017536
Coeficiente R2 em Teste: 0.7432527401284419





[1mModelo RandomForest Com Validação Cruzada:[0m
 
RMSE em Teste: 7.54231110538922
Coeficiente R2 em Teste: 0.7987591226907497

[1mModelo Gradient-Boosted Tree (GBT) Com Validação Cruzada:[0m
 
RMSE em Teste: 6.554795407372699
Coeficiente R2 em Teste: 0.8480062958690837

[1mModelo Isotonic Com Validação Cruzada:[0m
 
RMSE em Teste: 14.146547862028632
Coeficiente R2 em Teste: 0.29204074424230897



In [30]:
# Retorna as linhas diferentes de N/A
df_resultados_treinamento = df_resultados_treinamento.where("Regressor!='N/A'")

In [31]:
# Imprime
df_resultados_treinamento.show(10, False)

+---------------------+--------------+------------+
|Regressor            |Resultado_RMSE|Resultado_R2|
+---------------------+--------------+------------+
|LinearRegression     |10.64         |0.599       |
|DecisionTreeRegressor|8.519         |0.743       |
|RandomForestRegressor|7.542         |0.798       |
|GBTRegressor         |6.554         |0.848       |
|IsotonicRegression   |14.14         |0.292       |
+---------------------+--------------+------------+



 > O modelo GBT apresentou a melhor performance geral e será usado em produção.

## Fazendo Previsões com o Modelo Treinado

Para fazer as previsões com o modelo treinado, vamos preparar um registro com novos dados.

- Cement: 540
- Blast Furnace Slag: 0
- Fly Ash: 0
- Water: 162
- Superplasticizer: 2.5
- Coarse Aggregate: 1040
- Fine Aggregate: 676
- Age: 28

In [33]:
# Lista dos valores de entrada
values = [(540,0.0,0.0,162,2.5,1040,676,28)]

In [34]:
# Nomes das colunas
column_names = dados.columns
column_names = column_names[0:8]

In [35]:
# Associa valores aos nomes de coluna
novos_dados = spark.createDataFrame(values, column_names)

In [38]:
# Aplicamos na coluna age a mesma transformação aplicada na preparação dos dados.
novos_dados = novos_dados.withColumn("age", log("age") +1)

In [36]:
# Lista de atributos
lista_atributos = ["cement",
                   "slag",
                   "flyash",
                   "water",
                   "superplasticizer",
                   "coarseaggregate",
                   "fineaggregate",
                   "age"]

In [39]:
# Cria o vetorizador
assembler = VectorAssembler(inputCols = lista_atributos, outputCol = 'features')

In [40]:
# Transforma os dados em vetor
novos_dados = assembler.transform(novos_dados).select('features')

In [41]:
# Padroniza os dados (mesma transformação aplicada aos dados de treino)
novos_dados_scaled = scalerModel.transform(novos_dados)

In [42]:
# Seleciona a coluna resultante
novos_dados_final = novos_dados_scaled.select('scaledFeatures')

In [43]:
# Renomeia a coluna (requerimento do MLlib)
novos_dados_final = novos_dados_final.withColumnRenamed('scaledFeatures','features')

In [44]:
# Previsões com novos dados usando o modelo de melhor performance
previsoes_novos_dados = GBT_BestModel.transform(novos_dados_final)

In [45]:
# Resultado
previsoes_novos_dados.show()

+--------------------+-----------------+
|            features|       prediction|
+--------------------+-----------------+
|[1.0,0.0,0.0,0.32...|46.02643612888154|
+--------------------+-----------------+



# Fim