<a href="https://colab.research.google.com/github/ALXAVIER-DEV/Ciencia_de_Dados/blob/main/Spark_regressao.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark


In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session running in local mode for this notebook.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Regressao com Spark")
    .getOrCreate()
)

# Left as the last expression so the notebook displays the session.
spark

In [None]:
# Mount Google Drive under /content/drive so the data file can be read by path.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Read imoveis.json from the mounted Drive into a Spark DataFrame.
dados = spark.read.json('/content/drive/MyDrive/Arquivos/imoveis.json')

In [None]:
dados

In [None]:
dados.show(truncate=False)

In [None]:
dados.count()

In [None]:
dados.printSchema()

In [None]:
# Show the customer id next to every top-level field of the nested `listing`
# struct, without truncating cell contents.
(
    dados
    .select('ident.customerID', 'listing.*')
    .show(truncate=False)
)

In [None]:
# Preview the nested structs flattened into plain columns.
(
    dados
    .select(
        "ident.customerID",
        "listing.types.*",
        "listing.features.*",
        "listing.address.*",
        "listing.prices.price",
        "listing.prices.tax.*",
    )
    .show(truncate=False)
)

In [None]:
# Same flattened preview, minus the columns that will not be used.
(
    dados
    .select(
        "ident.customerID",
        "listing.types.*",
        "listing.features.*",
        "listing.address.*",
        "listing.prices.price",
        "listing.prices.tax.*",
    )
    .drop("city", "location", "totalAreas")
    .show(truncate=False)
)


In [None]:
# Working dataset: the nested structs flattened into plain columns, without
# the `city`, `location` and `totalAreas` columns.
dataset = (
    dados
    .select(
        "ident.customerID",
        "listing.types.*",
        "listing.features.*",
        "listing.address.*",
        "listing.prices.price",
        "listing.prices.tax.*",
    )
    .drop("city", "location", "totalAreas")
)


In [None]:
dataset.printSchema()


In [None]:
from pyspark.sql.types import IntegerType, DoubleType

In [None]:
# Dry run: print the schema that results from the casts, leaving `dataset`
# itself untouched.
(
    dataset
    .withColumn('usableAreas', dataset['usableAreas'].cast(IntegerType()))
    .withColumn('price', dataset['price'].cast(DoubleType()))
    .withColumn('condo', dataset['condo'].cast(DoubleType()))
    .withColumn('iptu', dataset['iptu'].cast(DoubleType()))
    .printSchema()
)

In [None]:
# Apply the casts for real: area as integer, monetary columns as double.
dataset = (
    dataset
    .withColumn('usableAreas', dataset['usableAreas'].cast(IntegerType()))
    .withColumn('price', dataset['price'].cast(DoubleType()))
    .withColumn('condo', dataset['condo'].cast(DoubleType()))
    .withColumn('iptu', dataset['iptu'].cast(DoubleType()))
)

In [None]:
dataset.show()

In [None]:
# Number of rows for each distinct `usage` value.
(
    dataset
    .select('usage')
    .groupBy('usage')
    .count()
    .show()
)

In [None]:
# Builds a DataFrame restricted to rows whose `usage` equals "Residencial".
# NOTE(review): the result is neither assigned back to `dataset` nor shown, so
# the following cells still work on every usage value — confirm whether
# `dataset = ...` was intended here.
dataset\
    .select('*')\
    .where('usage=="Residencial"')

In [None]:
# Number of rows for each distinct `unit` value.
(
    dataset
    .select('unit')
    .groupBy('unit')
    .count()
    .show()
)

In [None]:
# Number of rows for each distinct `zone` value.
(
    dataset
    .select('zone')
    .groupBy('zone')
    .count()
    .show()
)

In [None]:
from pyspark.sql import functions as f

In [None]:
# For every column, count the rows whose value is NaN or null.
(
    dataset
    .select([
        f.count(f.when(f.isnan(coluna) | f.isnull(coluna), True)).alias(coluna)
        for coluna in dataset.columns
    ])
    .show()
)

In [None]:
# Preview the dataset with missing values replaced by 0 (nothing is saved).
(
    dataset
    .select('*')
    .na
    .fill(0)
    .show()
)

In [None]:
# Replace missing values with 0 and keep the result as the working dataset.
# `fillna` is the DataFrame-level alias of `na.fill`.
dataset = dataset.select('*').fillna(0)

In [None]:
# Re-check the per-column NaN/null counts now that the fill has been applied.
(
    dataset
    .select([
        f.count(f.when(f.isnan(coluna) | f.isnull(coluna), True)).alias(coluna)
        for coluna in dataset.columns
    ])
    .show()
)

In [None]:
# Row count per `zone` value, checked again before filtering on it.
(
    dataset
    .select('zone')
    .groupBy('zone')
    .count()
    .show()
)


In [None]:
# Keep only the rows whose `zone` is not the empty string.
# `filter` is the same operation as `where`.
dataset = dataset.filter(f.col('zone') != '')

In [None]:
dataset.show()

### Preparando os Dados


## Variáveis Dummy

In [None]:
# Preview of the dummy encoding for `unit`: one row per customerID and one
# column per pivoted `unit` value, holding 1 where it applies and 0 otherwise
# (the nulls left by the pivot are filled with 0).
(
    dataset
    .groupBy('customerID')
    .pivot('unit')
    .agg(f.lit(1))
    .na
    .fill(0)
    .show()
)

In [None]:
# Dummy variables for `unit`, keyed by customerID: 1 in the column of the
# listing's unit type, 0 in the others.
unit = (
    dataset
    .groupBy('customerID')
    .pivot('unit')
    .agg(f.lit(1))
    .na
    .fill(0)
)

In [None]:
# Preview of the dummy encoding for `zone`: one row per customerID and one
# 0/1 column per pivoted `zone` value.
(
    dataset
    .groupBy('customerID')
    .pivot('zone')
    .agg(f.lit(1))
    .na
    .fill(0)
    .show()
)

In [None]:
# Dummy variables for `zone`, keyed by customerID: 1 in the column of the
# listing's zone, 0 in the others.
zone = (
    dataset
    .groupBy('customerID')
    .pivot('zone')
    .agg(f.lit(1))
    .na
    .fill(0)
)

In [None]:
# Attach both sets of dummy columns to the dataset through inner joins on
# customerID.
dataset = (
    dataset
    .join(unit, 'customerID', how='inner')
    .join(zone, 'customerID', how='inner')
)

In [None]:
dataset.show()

## Vetorização

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Rename the target column `price` to `label`.
# NOTE(review): the estimators below are created without an explicit label
# column, so they presumably rely on this default name — confirm.
dataset = dataset.withColumnRenamed('price','label')

In [None]:
# Names of the input columns that will be assembled into the feature vector.
X = [
    # Listing attributes
    'bathrooms',
    'bedrooms',
    'floors',
    'parkingSpaces',
    'suites',
    'unitFloor',
    'unitsOnTheFloor',
    'usableAreas',
    'condo',
    'iptu',
    # Presumably the dummy columns produced by the pivot on `unit` — these
    # names must match the values present in the data; verify.
    'Apartamento',
    'Casa',
    'Outros',
    # Presumably the dummy columns produced by the pivot on `zone` — verify.
    'Zona Central',
    'Zona Norte',
    'Zona Oeste',
    'Zona Sul'
]

In [None]:
# Transformer that packs the columns listed in X into a single vector column
# named 'features'.
assembler = VectorAssembler(outputCol='features', inputCols=X)

# Left as the last expression so the notebook displays the transformer.
assembler

In [None]:
dataset.show()

In [None]:
# Build the feature vector and keep only the two columns the models use.
dataset_prep = assembler.transform(dataset).select('features', 'label')

In [None]:
dataset_prep.show(10, truncate=False)

In [None]:
from pyspark.ml.stat import Correlation
import pandas as pd


In [None]:
# Compute the correlation matrix of the 'features' vector and bring the
# resulting rows to the driver (inspected in the next cell).
correlacao = Correlation.corr(dataset_prep, 'features').collect()

In [None]:
correlacao

In [None]:
# Recompute the correlation and keep only the first field of the first
# collected row, which holds the correlation matrix itself.
correlacao = Correlation.corr(dataset_prep, 'features').collect()[0][0]

In [None]:
correlacao

In [None]:
correlacao.toArray()

In [None]:
# Wrap the matrix in a pandas DataFrame labelled on both axes with the
# feature names, in the same order used to assemble the vector.
dataframe_correlacao = pd.DataFrame(correlacao.toArray(), columns=X, index=X)

In [None]:
dataframe_correlacao

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Annotated heatmap of the correlations, rounded to one decimal place.
plt.figure(figsize=(12, 10))
paleta = sns.color_palette("light:salmon", as_cmap=True)
sns.heatmap(data=dataframe_correlacao.round(1), cmap=paleta, annot=True)

## Ajuste e Previsão

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
# Random split with weights 0.7 (training) / 0.3 (test); the seed is fixed so
# the split can be reproduced.
treino, teste = dataset_prep.randomSplit([0.7, 0.3], seed=101)

In [None]:
treino.count()

In [None]:
teste.count()

In [None]:
lr = LinearRegression()

In [None]:
modelo_lr = lr.fit(treino)

In [None]:
previsoes_lr_treino = modelo_lr.transform(treino)

In [None]:
previsoes_lr_treino.show()

## Métricas

In [None]:
resumo_treino = modelo_lr.summary

In [None]:
resumo_treino.r2

In [None]:
resumo_treino.rootMeanSquaredError

In [None]:
# Evaluate the fitted linear model on the test split.
resumo_teste = modelo_lr.evaluate(teste)

In [None]:
resumo_teste.r2

In [None]:
resumo_teste.rootMeanSquaredError

### Tabela Resumo Regressão Linear

In [None]:
# Report R² and RMSE of the linear regression for the training and test data.
print('Linear Regression')
for _titulo, _resumo in (
    ("Dados de Treino", resumo_treino),
    ("Dados de Teste", resumo_teste),
):
    print("")
    print("="*30)
    print(_titulo)
    print("="*30)
    print("R²: %f" % _resumo.r2)
    print("RMSE: %f" % _resumo.rootMeanSquaredError)


## Aula 3 - Decision Tree

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

In [None]:
# Decision tree regressor with a fixed seed and the tree depth capped at 7.
dtr = DecisionTreeRegressor(seed=101, maxDepth=7)

In [None]:
modelo_dtr = dtr.fit(treino)

In [None]:
previsoes_dtr_treino = modelo_dtr.transform(treino)

In [None]:
previsoes_dtr_treino.show()

## Métricas

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator


In [None]:
# Evaluator with its default settings; the metric is chosen per call.
evaluator = RegressionEvaluator()

# R² first, then RMSE, of the decision tree on the training predictions.
for _metrica in ("r2", "rmse"):
    print(evaluator.evaluate(previsoes_dtr_treino, {evaluator.metricName: _metrica}))


In [None]:
previsoes_dtr_teste = modelo_dtr.transform(teste)

In [None]:
previsoes_dtr_teste.show()

In [None]:
# Report R² and RMSE of the decision tree for the training and test data.
print('Decision Tree Regression')
for _posicao, (_titulo, _previsoes) in enumerate((
    ("Dados de Treino", previsoes_dtr_treino),
    ("Dados de Teste", previsoes_dtr_teste),
)):
    if _posicao > 0:
        print("")  # blank line only between the two sections
    print("="*30)
    print(_titulo)
    print("="*30)
    print("R²: %f" % evaluator.evaluate(_previsoes, {evaluator.metricName: "r2"}))
    print("RMSE: %f" % evaluator.evaluate(_previsoes, {evaluator.metricName: "rmse"}))


## Random Forest Regressor

In [None]:
from pyspark.ml.regression import RandomForestRegressor

In [None]:
# Random forest regressor: 10 trees, each capped at depth 7, with a fixed seed.
rfr = RandomForestRegressor(seed=101, maxDepth=7, numTrees=10)

In [None]:
modelo_rfr = rfr.fit(treino)

In [None]:
previsoes_rfr_treino = modelo_rfr.transform(treino)

In [None]:
previsoes_rfr_treino.show()

### Métricas

In [None]:
# R² first, then RMSE, of the random forest on the training predictions.
for _metrica in ("r2", "rmse"):
    print(evaluator.evaluate(previsoes_rfr_treino, {evaluator.metricName: _metrica}))

In [None]:
previsoes_rfr_teste = modelo_rfr.transform(teste)

In [None]:
previsoes_rfr_teste.show()

In [None]:
# Report R² and RMSE of the random forest for the training and test data.
print('Random Forest Regression')
for _posicao, (_titulo, _previsoes) in enumerate((
    ("Dados de Treino", previsoes_rfr_treino),
    ("Dados de Teste", previsoes_rfr_teste),
)):
    if _posicao > 0:
        print("")  # blank line only between the two sections
    print("="*30)
    print(_titulo)
    print("="*30)
    print("R²: %f" % evaluator.evaluate(_previsoes, {evaluator.metricName: "r2"}))
    print("RMSE: %f" % evaluator.evaluate(_previsoes, {evaluator.metricName: "rmse"}))


### Árvore de Decisão com Cross Validation

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
dtr = DecisionTreeRegressor()

In [None]:
# Hyper-parameter grid for the cross validation: every combination of the
# listed tree depths and bin counts.
grid = (
    ParamGridBuilder()
    .addGrid(dtr.maxDepth, [2, 5, 10])
    .addGrid(dtr.maxBins, [10, 32, 45])
    .build()
)

In [None]:
evaluator = RegressionEvaluator()

In [None]:
# 3-fold cross validation of the decision tree over the parameter grid,
# scored with `evaluator`; the seed is fixed so the result can be reproduced.
dtr_cv = CrossValidator(
    estimator=dtr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    seed=101,
)

In [None]:
# Run the cross-validated grid search on the training split.
modelo_dtr_cv = dtr_cv.fit(treino)

In [None]:
# Predict on the test split with the model returned by the cross validation.
previsoes_dtr_cv_teste = modelo_dtr_cv.transform(teste)

In [None]:
# Compare the test-set R² and RMSE of the decision tree fitted without and
# with cross validation.
print('Decision Tree Regression')
for _posicao, (_titulo, _previsoes) in enumerate((
    ("Sem Cross Validation", previsoes_dtr_teste),
    ("Com Cross Validation", previsoes_dtr_cv_teste),
)):
    if _posicao > 0:
        print("")  # blank line only between the two sections
    print("="*30)
    print(_titulo)
    print("="*30)
    print("R²: %f" % evaluator.evaluate(_previsoes, {evaluator.metricName: "r2"}))
    print("RMSE: %f" % evaluator.evaluate(_previsoes, {evaluator.metricName: "rmse"}))
