In [None]:
# PARA TRABALHAR COM O GOOGLE COLAB É NECESSÁRIO:
# !apt-get update -qq
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
# !tar xf spark-3.1.2-bin-hadoop2.7.tgz
# !pip install -q findspark

import os
import findspark

os.environ["SPARK_HOME"] = "D:/spark-3.5.5-bin-hadoop3"
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

import pandas as pd

### Obs: para este projeto estamos trabalhando com o google colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')
SPARK = spark.read.json('/content/drive/ml/imoveis.json')

In [None]:
# TRANSFORMAR COLUNAS MULTICATEGORICAS STRING PARA BINÁRIOS COM PIVOT (DUMMIES)
SPARK.groupBy('ID_UNICO'.pivot('COLUNA_2')).agg.(f.lit(1)).na.fill(0)

In [None]:
# PARA UTILIZAR ML COM SPARK, NECESSITAMOS VETORIZAR O DATAFRAME
from pyspark.ml.feature import VectorAssembler

## Importante: Para o modelo de regressão é esperado que o valor a ser previsto tenha o nome "label"

In [None]:
SPARK = SPARK.withColumnRenamed('COLUNA_2', 'label')

In [None]:
# VAMOS SEPARAR EM VETORES VARIAVEIS EXPLICATIVAS
X = ['COLUNA_5',
    'COLUNA_6',
    'COLUNA_7',
    'COLUNA_8',
    'COLUNA_9']

In [None]:
# A VETORIZAÇÃO É FEITA PELO OBJETO VETOR E AS FUNÇÕIES DO SPARK ESPERAM RECEBER O OUTPUT CO0M NOME 'features'
ASSEMBLER = VectorAssembler(inputCols=X, outputCol='features')

DF_PREP = ASSEMBLER.transform(SPARK).select('features', 'label')

In [None]:
from pyspark.ml.stat import Correlation

# TRABALHANDO APENAS COM O METODO PEARSON (MATRIZES DENSAS)
CORRELACAO = Correlation.corr(DF_PREP, 'features').collect()[0][0]

In [None]:
# TRANSFORMANDO EM VETOR PARA TRABALHAR DE FORMA MAIS FACIL
# CRIANDO MATRIZ DE CORRELAÇÃO EM UM DATAFRAME DO PANDAS
DF_CORRELACAO = pd.DataFrame(CORRELACAO.toArray(), columns=X, index=X)

In [None]:
# PARA FACILITAR A VISUALIZAÇÃO DOS DADOS (PRINCIPALMENTE DE SUA GRANDEZA)
# VAMOS CRIAR UM HEATMAP
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(12, 10))
paleta = sns.color_palette("light:salmon", as_cmap=True)
sns.heatmap(DF_CORRELACAO.round(1), annot=True, cmap=paleta)

In [None]:
from pyspark.ml.regression import LinearRegression

# DEFININDO AS VERIAVEIS DE TREINO E TESTE PARA 0.7/0.3 E DEIXANDO A ALEATORIEDADE COM UM SEED MAIS ESPECIFICO,
# GARANTINDO QUE TEREMOS O MESMO RESULTADO SEMPRE QUE RODARMOS
TREINO, TESTE = DF_PREP.randomSplit([0.7, 0.3], seed=101)

# REGRESSÃO UTILIZANDO O MÉTODO REGRESSÃO LINEAR

In [None]:
lr = LinearRegression()

MODELO_LR = lr.fit(TREINO)
PREVISAO_LR_TREINO = MODELO_LR.transform(TREINO)


In [None]:
RESUMO = MODELO_LR.summary

# AGORA TEMOS QUE ANALISAR O AJUSTE DO MODELO AOS DADOS (R2)
AJUSTE = RESUMO.r2

# E TAMBÉM O ERRO ASSOCIADO (RMSE) - QUANTO MENOR ESTA METRICA, MELHOR
ERRO = RESUMO.rootMeanSquaredError

In [None]:
# AGORA AVALIAREMOS (EVALUATE) COM O TESTE
RESUMO_TESTE = MODELO_LR.evaluate(TESTE)

AJUSTE_TESTE = RESUMO_TESTE.r2
ERRO_TESTE = RESUMO_TESTE.rootMeanSquaredError

# REGRESSÃO UTILIZANDO O MÉTODO DE ARVORE DE DECISÃO

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

# ALTURA ARBITRARIA PARA ARVORE "maxDepth"
ARVORE = DecisionTreeRegressor(seed=101, maxDepth=7)

MODELO_ARVORE = ARVORE.fit(TREINO)
PREVISAO_ARVORE_TREINO = MODELO_ARVORE.transform(TREINO)
PREVISAO_ARVORE_TREINO.show()

In [None]:
# AGORA PRECISAMOS AVALIAR O MODELO
# DESSA VEZ VAMOS UTILIZAR UM OBJ DA LIB DO PYSPARK
from pyspark.ml.evaluation import RegressionEvaluator

EVALUATOR = RegressionEvaluator()
print(EVALUATOR.evaluate(PREVISAO_ARVORE_TREINO, {EVALUATOR.merticName: "r2"}))
print(EVALUATOR.evaluate(PREVISAO_ARVORE_TREINO, {EVALUATOR.merticName: "rmse"}))

In [None]:
# AVALIANDO O TESTE PARA A ARVORE
PREVISAO_ARVORE_TESTE = MODELO_ARVORE.transform(TESTE)

print(EVALUATOR.evaluate(PREVISAO_ARVORE_TESTE, {EVALUATOR.merticName: "r2"}))
print(EVALUATOR.evaluate(PREVISAO_ARVORE_TESTE, {EVALUATOR.merticName: "rmse"}))

# REGRESSÃO UTILIZANDO O MÉTODO RANDOM FOREST

In [None]:
from pyspark.ml.regression import RandomForestRegressor

RFR = RandomForestRegressor(seed=101, maxDepth=7, numTrees=10)
MODELO_RFR = RFR.fit(TREINO)
PREVISAO_RFR_TREINO = MODELO_RFR.transform(TREINO)

In [None]:
print(EVALUATOR.evaluate(PREVISAO_RFR_TREINO, {EVALUATOR.merticName: "r2"}))
print(EVALUATOR.evaluate(PREVISAO_RFR_TREINO, {EVALUATOR.merticName: "rmse"}))

In [None]:
# AVALIANDO TESTES DA RANDOM FOREST REGRESSION
PREVISAO_RFR_TESTE = MODELO_RFR.transform(TESTE)

print(EVALUATOR.evaluate(PREVISAO_RFR_TESTE, {EVALUATOR.merticName: "r2"}))
print(EVALUATOR.evaluate(PREVISAO_RFR_TESTE, {EVALUATOR.merticName: "rmse"}))

# TÉCNICAS DE OTIMIZAÇÃO: CROSS VALIDATION

In [None]:
# PARA O MÉTODO DE ARVORE DE DECISÃO

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

#########################################################################
###       O CROSS VALIDATOR NOS PERMITE FAZER DIVERSAS SEPARAÇÕES     ###
###       DE TREINO/TESTE PARA AVALIAR/TREINAR MELHOR O MODELO        ###
###      E O PARAMGRIDBUILDER É RESPONSAVEL NOS DEIXA AVALIAR MAIS    ###
###    PARAMETROS DE UMA VEZ PARA ENTENDERMOS QUAL O MELHOR PARAMETRO ###
###                         PARA OS MODELOS                           ###
#########################################################################

DTR = DecisionTreeRegressor()
EVALUATOR = RegressionEvaluator()

GRID = ParamGridBuilder().addGrid(DTR.maxDepth, [2, 5, 10]).addGrid(DTR.maxBins, [10, 32, 45]).build()
DTR_CV = CrossValidator(estimator=DTR, estimatorParamMaps=GRID, evaluator=EVALUATOR, numFolds=3, seed=101) 

In [None]:
MODELO_DTR_CV = DTR_CV.fit(TREINO)
PREVISOES_DTR_CV_TESTE = MODELO_DTR_CV.transform(TESTE)

print(EVALUATOR.evaluate(PREVISOES_DTR_CV_TESTE, {EVALUATOR.merticName: "r2"}))
print(EVALUATOR.evaluate(PREVISOES_DTR_CV_TESTE, {EVALUATOR.merticName: "rmse"}))