In [1]:
# Mount Google Drive at /content/drive; the dataset paths used later in this
# notebook live under that mount point.
from google.colab import drive

drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m767.4 kB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=cb98e1e414a3e2c28dcde4c953813a960914596f390b2c050030b2de66c8ee22
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, running locally with
# the 'local[*]' master.
spark = (
  SparkSession.builder
  .master('local[*]')
  .appName("2° Semana")
  .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark

# Leitura dos Dados

In [None]:
# Load the source dataset from the Parquet directory stored on Google Drive.
dataset = spark.read.format("parquet").load(
  "/content/drive/MyDrive/Colab Notebooks/ChallengeDataScience2°Ed/data/parquet"
)

In [None]:
# Preview the first 10 rows, truncating long cell values.
dataset.show(n=10, truncate=True)

+--------------------+-----+----------+---------+---------+------------------+-------+------+------------+------------+-----------+----+--------------------+------------+----------+------+-----+--------+
|                  id|andar|area_total|area_util|banheiros|   caracteristicas|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|              bairro|        zona|condominio|  iptu| tipo|   valor|
+--------------------+-----+----------+---------+---------+------------------+-------+------+------------+------------+-----------+----+--------------------+------------+----------+------+-----+--------+
|dde8b6b7-111e-4ad...|    0|        10|       10|        0|              NULL|   NULL|  NULL|       Usado|      Outros|  Comercial|   1|              Centro|Zona Central|      NULL|  NULL|Venda| 20000.0|
|03a386b6-7ab8-4ef...|    0|        43|       43|        0|     Churrasqueira|      2|  NULL|       Usado| Apartamento|Residencial|   1|            Realengo|  Zona Oeste|     285.0|  N

# Tratamento dos Dados

In [None]:
from pyspark.sql import functions as f
from pyspark.sql.types import DoubleType

In [None]:
# Column removal: "tipo" and "bairro" are not used from here on.
dataset = dataset.drop("tipo", "bairro")

In [None]:
# Null count for "area_total": one row per null flag (true/false) with its count.
(
  dataset
  .select(f.col("area_total").isNull().alias("null_count"))
  .groupBy("null_count")
  .count()
  .show()
)

+----------+-----+
|null_count|count|
+----------+-----+
|      true|12573|
|     false|75699|
+----------+-----+



In [None]:
# Null count for "area_util": one row per null flag (true/false) with its count.
(
  dataset
  .select(f.col("area_util").isNull().alias("null_count"))
  .groupBy("null_count")
  .count()
  .show()
)

+----------+-----+
|null_count|count|
+----------+-----+
|      true|   13|
|     false|88259|
+----------+-----+



In [None]:
# Column type conversion: cast both area columns to double.
dataset = dataset.withColumns(
  {
    area_column: f.col(area_column).cast(DoubleType())
    for area_column in ("area_total", "area_util")
  }
)

In [None]:
# Fill null values with 0.
# A numeric fill value is only applied to numeric columns, so string columns
# (e.g. "caracteristicas") still contain nulls after this step.
dataset = dataset.na.fill(0)

In [None]:
# Null verification: for every column, show how many rows are null vs. non-null.
for column_name in dataset.columns:
  (
    dataset
    .select(f.isnull(column_name).alias(column_name))
    .groupBy(column_name)
    .count()
    .show()
  )

+-----+-----+
|   id|count|
+-----+-----+
|false|88272|
+-----+-----+

+-----+-----+
|andar|count|
+-----+-----+
|false|88272|
+-----+-----+

+----------+-----+
|area_total|count|
+----------+-----+
|     false|88272|
+----------+-----+

+---------+-----+
|area_util|count|
+---------+-----+
|    false|88272|
+---------+-----+

+---------+-----+
|banheiros|count|
+---------+-----+
|    false|88272|
+---------+-----+

+---------------+-----+
|caracteristicas|count|
+---------------+-----+
|           true|21526|
|          false|66746|
+---------------+-----+

+-------+-----+
|quartos|count|
+-------+-----+
|  false|88272|
+-------+-----+

+------+-----+
|suites|count|
+------+-----+
| false|88272|
+------+-----+

+------------+-----+
|tipo_anuncio|count|
+------------+-----+
|       false|88272|
+------------+-----+

+------------+-----+
|tipo_unidade|count|
+------------+-----+
|       false|88272|
+------------+-----+

+--------+-----+
|tipo_uso|count|
+--------+-----+
|   false|88272

In [None]:
# Preview the first 10 rows after the null/type treatment.
dataset.show(n=10)

+--------------------+-----+----------+---------+---------+------------------+-------+------+------------+------------+-----------+----+------------+----------+------+--------+
|                  id|andar|area_total|area_util|banheiros|   caracteristicas|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|        zona|condominio|  iptu|   valor|
+--------------------+-----+----------+---------+---------+------------------+-------+------+------------+------------+-----------+----+------------+----------+------+--------+
|dde8b6b7-111e-4ad...|    0|      10.0|     10.0|        0|              NULL|      0|     0|       Usado|      Outros|  Comercial|   1|Zona Central|       0.0|   0.0| 20000.0|
|03a386b6-7ab8-4ef...|    0|      43.0|     43.0|        0|     Churrasqueira|      2|     0|       Usado| Apartamento|Residencial|   1|  Zona Oeste|     285.0|   0.0| 22999.0|
|2f9ade9b-9ab4-4cc...|    2|      13.0|     13.0|        2|              NULL|      0|     0|       Usado|      Out

## Aplicando variáveis Dummy às variáveis Categóricas

In [None]:
def lower_columns_names(dataframe):
  """Return ``dataframe`` with every column renamed to its lowercase form.

  Args:
    dataframe: object exposing ``columns`` (iterable of column names) and
      ``withColumnsRenamed(mapping)``, such as a Spark DataFrame.

  Returns:
    Whatever ``dataframe.withColumnsRenamed`` returns for the
    ``{original_name: original_name.lower()}`` mapping.
  """
  lowered_names = {name: name.lower() for name in dataframe.columns}
  return dataframe.withColumnsRenamed(lowered_names)

In [None]:
# Dummy-encode "caracteristicas": one row per id and one column per distinct
# value, holding 1 where the id has that value and 0 otherwise.
# The "null" column is dropped (presumably the pivot column generated for
# missing values), then all column names are lowercased.
caracteristicas = lower_columns_names(
  dataset
  .groupBy("id")
  .pivot("caracteristicas")
  .agg(f.lit(1))
  .na.fill(0)
  .drop("null")
)

In [None]:
# Dummy-encode "tipo_anuncio" (1 where the id has the value, 0 otherwise).
# The "Lançamento" column is dropped (presumably used as the reference
# category), then all column names are lowercased.
tipo_anuncio = lower_columns_names(
  dataset
  .groupBy("id")
  .pivot("tipo_anuncio")
  .agg(f.lit(1))
  .na.fill(0)
  .drop("Lançamento")
)

In [None]:
# Dummy-encode "tipo_unidade" (1 where the id has the value, 0 otherwise).
# The "Outros" column is dropped (presumably used as the reference category),
# then all column names are lowercased.
tipo_unidade = lower_columns_names(
  dataset
  .groupBy("id")
  .pivot("tipo_unidade")
  .agg(f.lit(1))
  .na.fill(0)
  .drop("Outros")
)

In [None]:
# Dummy-encode "tipo_uso" (1 where the id has the value, 0 otherwise).
# The "Comercial" column is dropped (presumably used as the reference
# category), then all column names are lowercased.
tipo_uso = lower_columns_names(
  dataset
  .groupBy("id")
  .pivot("tipo_uso")
  .agg(f.lit(1))
  .na.fill(0)
  .drop("Comercial")
)

In [None]:
# Dummy-encode "zona" (1 where the id has the value, 0 otherwise).
# The column named "" (empty string) is dropped — presumably rows whose zone
# is blank — then all column names are lowercased.
zona = lower_columns_names(
  dataset
  .groupBy("id")
  .pivot("zona")
  .agg(f.lit(1))
  .na.fill(0)
  .drop("")
)

In [None]:
# Replace the categorical columns with their dummy-encoded counterparts:
# drop the originals, then inner-join each dummy table back on "id".
treated_dataset = (
  dataset
  .drop("caracteristicas", "tipo_anuncio", "tipo_unidade", "tipo_uso", "zona")
  .join(caracteristicas, "id", "inner")
  .join(tipo_anuncio, "id", "inner")
  .join(tipo_unidade, "id", "inner")
  .join(tipo_uso, "id", "inner")
  .join(zona, "id", "inner")
)

In [None]:
# Preview the treated dataset (20 rows, truncated cells — the defaults of show()).
treated_dataset.show(n=20, truncate=True)

+--------------------+-----+----------+---------+---------+-------+------+----+----------+------+---------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+-----+-----------+----+-----------+------------+----------+----------+--------+
|                  id|andar|area_total|area_util|banheiros|quartos|suites|vaga|condominio|  iptu|    valor|academia|animais permitidos|churrasqueira|condomínio fechado|elevador|piscina|playground|portaria 24h|portão eletrônico|salão de festas|usado|apartamento|casa|residencial|zona central|zona norte|zona oeste|zona sul|
+--------------------+-----+----------+---------+---------+-------+------+----+----------+------+---------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+-----+-----------+----+-----------+------------+----------+----------+--------+
|00012605-9cae-45b...|    0|   

In [None]:
# Persist the treated dataset as Parquet on Google Drive, replacing any
# previous export at the same path.
(
  treated_dataset.write
  .mode("overwrite")
  .parquet("/content/drive/MyDrive/Colab Notebooks/ChallengeDataScience2°Ed/data/treated_dataset_parquet")
)

# Criação dos modelos de Machine Learning

## Vetorização dos Dados

In [4]:
from pyspark.sql import functions as f
from pyspark.ml.feature import VectorAssembler

In [5]:
# Reload the treated dataset that was exported to Parquet earlier in this notebook.
dataset = spark.read.format("parquet").load(
  "/content/drive/MyDrive/Colab Notebooks/ChallengeDataScience2°Ed/data/treated_dataset_parquet"
)

In [6]:
# Every column except the identifier ("id") and the target ("valor") is a model feature.
features = [name for name in dataset.columns if name not in ("id", "valor")]

In [7]:
# Combine all feature columns into a single vector column named "features".
vecAssembler = VectorAssembler(outputCol="features", inputCols=features)
dataset = vecAssembler.transform(dataset)

In [8]:
# Copy the target column "valor" into a "label" column, then preview 5 rows
# without truncating cell contents.
dataset = dataset.withColumn("label", dataset["valor"])
dataset.show(n=5, truncate=False)

+------------------------------------+-----+----------+---------+---------+-------+------+----+----------+------+---------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+-----+-----------+----+-----------+------------+----------+----------+--------+----------------------------------------------------------------------------------------------------------+---------+
|id                                  |andar|area_total|area_util|banheiros|quartos|suites|vaga|condominio|iptu  |valor    |academia|animais permitidos|churrasqueira|condomínio fechado|elevador|piscina|playground|portaria 24h|portão eletrônico|salão de festas|usado|apartamento|casa|residencial|zona central|zona norte|zona oeste|zona sul|features                                                                                                  |label    |
+------------------------------------+-----+----------+---------+---------+-------+-----

In [9]:
# Keep only the two columns used for modelling: the feature vector and the label.
train_dataset = dataset.select(["features", "label"])

In [10]:
# Preview 5 rows of the modelling dataset without truncating the vectors.
train_dataset.show(n=5, truncate=False)

+----------------------------------------------------------------------------------------------------------+---------+
|features                                                                                                  |label    |
+----------------------------------------------------------------------------------------------------------+---------+
|(27,[0,1,2,3,4,7,8,9,19,20,22,23],[2.0,35.0,35.0,2.0,1.0,100.0,100.0,1.0,1.0,1.0,1.0,1.0])                |245000.0 |
|(27,[0,1,2,3,4,5,6,7,8,13,19,20,22,25],[5.0,169.0,169.0,5.0,4.0,2.0,2.0,998.0,2600.0,1.0,1.0,1.0,1.0,1.0])|955000.0 |
|(27,[0,1,2,3,4,5,6,7,8,14,19,21,22,25],[1.0,360.0,360.0,1.0,4.0,4.0,3.0,800.0,1300.0,1.0,1.0,1.0,1.0,1.0])|3790000.0|
|(27,[0,1,2,3,4,5,6,7,8,9,19,20,22,25],[1.0,82.0,82.0,1.0,2.0,1.0,1.0,736.0,998.0,1.0,1.0,1.0,1.0,1.0])    |280000.0 |
|(27,[1,2,4,5,6,7,8,9,19,20,22,25],[50.0,50.0,2.0,1.0,1.0,504.0,50.0,1.0,1.0,1.0,1.0,1.0])                 |249900.0 |
+-----------------------------------------------

In [11]:
# Train/test split (weights 0.7 / 0.3, fixed seed).
# NOTE: despite the names, these are NOT features/labels:
#   x = training data
#   y = test data
x, y = train_dataset.randomSplit(weights=[0.7, 0.3], seed=101)
print(
  f"Tamanho de X: {x.count()}",
  f"Tamanho de Y: {y.count()}",
  sep="\n",
)

Tamanho de X: 61681
Tamanho de Y: 26591


## Criação dos modelos de Regressão

### Decision Tree

In [12]:
from pyspark.ml.regression import DecisionTreeRegressor

In [13]:
# Decision tree regressor (depth capped at 7, fixed seed) fitted on the
# training split `x`. The feature/label column names are spelled out; they
# match the estimator's defaults.
dtr = DecisionTreeRegressor(
  featuresCol="features",
  labelCol="label",
  maxDepth=7,
  seed=101,
)
dtr_model = dtr.fit(x)

In [14]:
# Score the test split `y` with the fitted decision tree.
dtr_predictions = dtr_model.transform(dataset=y)

In [15]:
# Preview the first 10 decision-tree predictions next to their labels.
dtr_predictions.show(n=10)

+--------------------+--------+------------------+
|            features|   label|        prediction|
+--------------------+--------+------------------+
|(27,[0,1,2,3,4,5,...|440000.0|347688.96192560176|
|(27,[0,1,2,3,4,5,...|305000.0|347688.96192560176|
|(27,[0,1,2,3,4,5,...|819000.0| 646291.3934772762|
|(27,[0,1,2,3,4,5,...|390000.0| 646291.3934772762|
|(27,[0,1,2,3,4,5,...|289000.0|347688.96192560176|
|(27,[0,1,2,3,4,5,...|310000.0|347688.96192560176|
|(27,[0,1,2,3,4,5,...|270000.0|347688.96192560176|
|(27,[0,1,2,3,4,5,...|280989.0|347688.96192560176|
|(27,[0,1,2,3,4,5,...|407000.0|347688.96192560176|
|(27,[0,1,2,3,4,5,...|634000.0| 646291.3934772762|
+--------------------+--------+------------------+
only showing top 10 rows



### Random Forest

In [16]:
from pyspark.ml.regression import RandomForestRegressor

In [17]:
# Random forest regressor (10 trees, depth capped at 7, fixed seed) fitted on
# the training split `x`. The feature/label column names are spelled out; they
# match the estimator's defaults.
rfr = RandomForestRegressor(
  featuresCol="features",
  labelCol="label",
  numTrees=10,
  maxDepth=7,
  seed=101,
)
rfr_model = rfr.fit(x)

In [18]:
# Score the test split `y` with the fitted random forest.
rfr_predictions = rfr_model.transform(dataset=y)

In [19]:
# Preview the first 10 random-forest predictions next to their labels.
rfr_predictions.show(n=10)

+--------------------+--------+------------------+
|            features|   label|        prediction|
+--------------------+--------+------------------+
|(27,[0,1,2,3,4,5,...|440000.0| 665950.6445506474|
|(27,[0,1,2,3,4,5,...|305000.0|387578.44432714937|
|(27,[0,1,2,3,4,5,...|819000.0| 567717.5509366416|
|(27,[0,1,2,3,4,5,...|390000.0| 567717.5509366416|
|(27,[0,1,2,3,4,5,...|289000.0|387509.61873282277|
|(27,[0,1,2,3,4,5,...|310000.0| 397809.6825423495|
|(27,[0,1,2,3,4,5,...|270000.0|387578.44432714937|
|(27,[0,1,2,3,4,5,...|280989.0|387509.61873282277|
|(27,[0,1,2,3,4,5,...|407000.0| 397809.6825423495|
|(27,[0,1,2,3,4,5,...|634000.0| 567717.5509366416|
+--------------------+--------+------------------+
only showing top 10 rows



### Métricas dos Modelos

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

In [21]:
# Shared evaluator comparing the "prediction" column against "label"; the
# metric itself is chosen per call through the `params` override.
evaluator = RegressionEvaluator(
  predictionCol="prediction",
  labelCol="label",
)

In [30]:
# R2 and RMSE of the decision tree on the test predictions.
dtr_metrics = {
  metric: evaluator.evaluate(dataset=dtr_predictions, params={evaluator.metricName: metric})
  for metric in ("r2", "rmse")
}

# R2 and RMSE of the random forest on the test predictions.
rfr_metrics = {
  metric: evaluator.evaluate(dataset=rfr_predictions, params={evaluator.metricName: metric})
  for metric in ("r2", "rmse")
}

In [60]:
def print_model_metrics(model_name: str, metrics: dict):
  """Print a model's metrics in a tidy, line-per-metric layout.

  Args:
    model_name: label printed on the first line, prefixed with "- ".
    metrics: mapping of metric name to value; each entry is printed as
      "<NAME IN UPPERCASE>: <value>" in the mapping's iteration order.
  """
  print(f"- {model_name}")
  for metric_name, metric_value in metrics.items():
    print(f"{metric_name.upper()}: {metric_value}")

In [63]:
# Metrics report: banner, then the decision tree and random forest results
# separated by a blank line.
print(f"{'=' * 10} Métricas {'=' * 10}")
print_model_metrics("Decision Tree", dtr_metrics)
print()
print_model_metrics("Random Forest", rfr_metrics)

- Decision Tree
R2: 0.7436994752752304
RMSE: 847992.893927737

- Random Forest
R2: 0.7425610917358508
RMSE: 849874.0286092127


### Gradient-Boosted Tree - Extra



In [65]:
from pyspark.ml.regression import GBTRegressor

In [66]:
# Gradient-boosted trees regressor (10 iterations, depth capped at 7, at least
# 10 instances per node, fixed seed) fitted on the training split `x`. The
# feature/label column names are spelled out; they match the estimator's defaults.
gbtr = GBTRegressor(
  featuresCol="features",
  labelCol="label",
  maxIter=10,
  maxDepth=7,
  minInstancesPerNode=10,
  seed=101,
)
gbtr_model = gbtr.fit(x)

In [68]:
# Score the test split `y` with the fitted gradient-boosted model.
gbtr_predictions = gbtr_model.transform(dataset=y)

In [69]:
# Preview the first 5 gradient-boosted predictions next to their labels.
gbtr_predictions.show(n=5)

+--------------------+--------+------------------+
|            features|   label|        prediction|
+--------------------+--------+------------------+
|(27,[0,1,2,3,4,5,...|440000.0|326170.36585539946|
|(27,[0,1,2,3,4,5,...|305000.0|346523.48582350236|
|(27,[0,1,2,3,4,5,...|819000.0| 622998.2155701588|
|(27,[0,1,2,3,4,5,...|390000.0| 622998.2155701588|
|(27,[0,1,2,3,4,5,...|289000.0|337612.29466524924|
+--------------------+--------+------------------+
only showing top 5 rows



In [74]:
# GBT metrics: R2 and RMSE on the test predictions, followed by the report.
gbtr_metrics = {
  metric: evaluator.evaluate(dataset=gbtr_predictions, params={evaluator.metricName: metric})
  for metric in ("r2", "rmse")
}

print(f"{'=' * 10} Métricas {'=' * 10}")
print_model_metrics("Gradient-Boosted Tree", gbtr_metrics)

- Gradient-Boosted Tree
R2: 0.7735415054110923
RMSE: 797098.0987791977
