### Importando o Framework PySpark

In [1]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

### Criando  Sessão Spark

In [2]:
spark = SparkSession.builder.appName('Avaliação imóveis - SP').getOrCreate()

### Lendo arquivo CSV

In [62]:
df = spark.read.csv(
    'Preços Imóveis - Sao paulo.csv',
    sep=';',
    header=True
)

In [63]:
# Exibindo o DataFrame
df.show()

+--------------------+------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|Número|             Bairro|   Cidade| Área|Quartos|Banheiros|Vagas|    Preço|Aluguel|
+--------------------+------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|     Avenida Itacira|   255|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  False|
|Rua Aurelia Perez...|    42| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  False|
|      Jardim Morumbi|     0|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  False|
|  Rua Tobias Barreto|   195|              Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  False|
|    Rua Graham Bell |     0|        Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  False|
| Rua Francisco Paulo|    31|  Cidade Mãe do Céu|São Paulo|145.0|      4|        4|    2| 850000.0|  False|
|         Rua Tapaji |     0

### Ordenando e Filtrando o df 

In [57]:
# Analisando os quartos, de forma descendente(maior para o menor), do bairro Jardim Paulista
df.orderBy(col('Quartos').desc()).filter(col('Bairro') == 'Jardim Paulista').show()

+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|         Bairro|   Cidade| Área|Quartos|Banheiros|Vagas|    Preço|Aluguel|
+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|Rua Conselheiro T...|Jardim Paulista|São Paulo|420.0|      5|        7|    5|4500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|390.0|      5|        6|    6|5500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|390.0|      5|        6|    6|5500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|380.0|      5|        5|    8|4990000.0|  false|
|Rua Doutor João P...|Jardim Paulista|São Paulo|457.0|      5|        6|    3|6500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|520.0|      5|        5|    4|3196250.0|  false|
|Alameda Joaquim E...|Jardim Paulista|São Paulo|400.0|      5|        5|    4|6000000.0|  false|
|      Alameda Franca|Jardim P

Usando a linguagem SQL:

In [56]:
# Registrando o DataFrame como uma visualização temporária
df.createOrReplaceTempView("df")

# Executando a consulta SQL
spark.sql("""
    SELECT *
    FROM df
    WHERE Bairro = "Jardim Paulista"
    ORDER BY Quartos DESC
""").show()

+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|         Bairro|   Cidade| Área|Quartos|Banheiros|Vagas|    Preço|Aluguel|
+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|Rua Conselheiro T...|Jardim Paulista|São Paulo|420.0|      5|        7|    5|4500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|390.0|      5|        6|    6|5500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|390.0|      5|        6|    6|5500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|380.0|      5|        5|    8|4990000.0|  false|
|Rua Doutor João P...|Jardim Paulista|São Paulo|457.0|      5|        6|    3|6500000.0|  false|
|     Jardim Paulista|Jardim Paulista|São Paulo|520.0|      5|        5|    4|3196250.0|  false|
|Alameda Joaquim E...|Jardim Paulista|São Paulo|400.0|      5|        5|    4|6000000.0|  false|
|      Alameda Franca|Jardim P

### Removendo coluna "Número"

In [5]:
# Eliminando a coluna Número
df = df.drop('Número')

### Verificando e alterando o tipo das colunas 

In [6]:
# Exibindo o esquema do DataFrame
df.printSchema()

root
 |-- Rua: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cidade: string (nullable = true)
 |-- Área: string (nullable = true)
 |-- Quartos: string (nullable = true)
 |-- Banheiros: string (nullable = true)
 |-- Vagas: string (nullable = true)
 |-- Preço: string (nullable = true)
 |-- Aluguel: string (nullable = true)



In [7]:
# Realizando a conversão de tipos das colunas no DataFrame
df = df.withColumn(
    'Área', col('Área').cast('float')
).withColumn(
    'Quartos', col('Quartos').cast('int')
).withColumn(
    'Banheiros', col('Banheiros').cast('int')
).withColumn(
    'Vagas', col('Vagas').cast('int')
).withColumn(
    'Preço', col('Preço').cast('float')
).withColumn(
    'Aluguel', col('Aluguel').cast('boolean')
)

In [8]:
# Exibindo o esquema do DataFrame, ápos as adequaçãoes nas colunas
df.printSchema()

root
 |-- Rua: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cidade: string (nullable = true)
 |-- Área: float (nullable = true)
 |-- Quartos: integer (nullable = true)
 |-- Banheiros: integer (nullable = true)
 |-- Vagas: integer (nullable = true)
 |-- Preço: float (nullable = true)
 |-- Aluguel: boolean (nullable = true)



In [9]:
# Exibindo o DataFrame
df.show()

+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|             Bairro|   Cidade| Área|Quartos|Banheiros|Vagas|    Preço|Aluguel|
+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|     Avenida Itacira|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  false|
|Rua Aurelia Perez...| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  false|
|      Jardim Morumbi|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  false|
|  Rua Tobias Barreto|              Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  false|
|    Rua Graham Bell |        Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  false|
| Rua Francisco Paulo|  Cidade Mãe do Céu|São Paulo|145.0|      4|        4|    2| 850000.0|  false|
|         Rua Tapaji |        Vila Alpina|São Paulo|114.0|      3|        3|    2| 585000.0

### Pipeline de Codificação de Variáveis Categóricas

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [11]:
# Criando StringIndexer para as colunas 'Bairro' e 'Rua'
rua_indexer = StringIndexer(inputCol="Rua", outputCol="Rua_Index")
bairro_indexer = StringIndexer(inputCol="Bairro", outputCol="Bairro_Index")

# Criando OneHotEncoder para os índices numéricos das colunas 'Bairro' e 'Rua'
rua_encoder = OneHotEncoder(inputCol="Rua_Index", outputCol="Rua_Encoded")
bairro_encoder = OneHotEncoder(inputCol="Bairro_Index", outputCol="Bairro_Encoded")

# Criando um pipeline para encadear os transformadores
pipeline = Pipeline(stages=[bairro_indexer, rua_indexer, bairro_encoder, rua_encoder])

# Aplicando o pipeline aos dados
df_transformed = pipeline.fit(df).transform(df)


In [29]:
df_transformed.show()

+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+------------+---------+-----------------+-------------------+
|                 Rua|             Bairro|   Cidade| Área|Quartos|Banheiros|Vagas|    Preço|Aluguel|Bairro_Index|Rua_Index|   Bairro_Encoded|        Rua_Encoded|
+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+------------+---------+-----------------+-------------------+
|     Avenida Itacira|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  false|         4.0|   1323.0|  (280,[4],[1.0])|(2505,[1323],[1.0])|
|Rua Aurelia Perez...| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  false|        85.0|   1488.0| (280,[85],[1.0])|(2505,[1488],[1.0])|
|      Jardim Morumbi|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  false|        36.0|     32.0| (280,[36],[1.0])|  (2505,[32],[1.0])|
|  Rua Tobias Barreto|      

In [13]:
df_transformed.select(['Rua_Encoded', 'Bairro_Encoded']).show()

+-------------------+-----------------+
|        Rua_Encoded|   Bairro_Encoded|
+-------------------+-----------------+
|(2505,[1323],[1.0])|  (280,[4],[1.0])|
|(2505,[1488],[1.0])| (280,[85],[1.0])|
|  (2505,[32],[1.0])| (280,[36],[1.0])|
|(2505,[1211],[1.0])| (280,[34],[1.0])|
| (2505,[230],[1.0])| (280,[10],[1.0])|
|(2505,[1835],[1.0])|(280,[204],[1.0])|
|(2505,[2375],[1.0])|(280,[152],[1.0])|
|(2505,[2429],[1.0])| (280,[19],[1.0])|
|(2505,[2396],[1.0])|(280,[270],[1.0])|
| (2505,[805],[1.0])| (280,[37],[1.0])|
|(2505,[1984],[1.0])|(280,[236],[1.0])|
|(2505,[1879],[1.0])| (280,[10],[1.0])|
|(2505,[1303],[1.0])| (280,[26],[1.0])|
| (2505,[595],[1.0])| (280,[41],[1.0])|
|(2505,[1947],[1.0])|(280,[114],[1.0])|
| (2505,[456],[1.0])|  (280,[3],[1.0])|
|(2505,[1989],[1.0])| (280,[95],[1.0])|
|(2505,[2349],[1.0])|(280,[126],[1.0])|
|(2505,[1429],[1.0])|(280,[105],[1.0])|
| (2505,[573],[1.0])|(280,[124],[1.0])|
+-------------------+-----------------+
only showing top 20 rows



### Prevendo dados com Machine Learning

In [14]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

# Selecionando as colunas relevantes para a regressão e criando a feature vector
assembler = VectorAssembler(
    inputCols=['Rua_Encoded', 'Bairro_Encoded', 'Área','Quartos','Banheiros', 'Vagas'], # colunas relevantes
    outputCol='features')

In [15]:
# Transformando os dados usando o vetor de características
df_analise = assembler.transform(df_transformed)

In [16]:
# Dividindo os dados em conjuntos de treinamento e teste
train, test = df_analise.randomSplit([0.8, 0.2], seed=42)

#### Regressão Linear

In [17]:
# Inicializando o modelo de regressão linear
lr = LinearRegression(featuresCol="features", labelCol="Preço")

# Treinando o modelo com os dados de treinamento
lr_model = lr.fit(train)

In [18]:
# Fazendo previsões nos dados de teste
predictions = lr_model.transform(test)

# Visualizando as previsões
predictions.select("prediction", "Preço", "features").show()

+------------------+---------+--------------------+
|        prediction|    Preço|            features|
+------------------+---------+--------------------+
|3219409.3518080194|3750000.0|(2789,[1282,2548,...|
| 2600073.224621462|2600000.0|(2789,[704,2513,2...|
|2515177.2036139993|2600000.0|(2789,[274,2513,2...|
|3554106.4258927023|6000000.0|(2789,[1285,2513,...|
|1299991.9729953518|1300000.0|(2789,[708,2597,2...|
|4188542.9776866827|3390000.0|(2789,[709,2509,2...|
|2133488.1164470986|1690000.0|(2789,[188,2520,2...|
| 1655920.551840134|1745000.0|(2789,[276,2520,2...|
| 3598254.575193813|3500000.0|(2789,[414,2643,2...|
|1620476.8356334576|1300000.0|(2789,[711,2509,2...|
| 1821287.330127682|1730000.0|(2789,[711,2509,2...|
| 2300029.367929453|2300000.0|(2789,[712,2509,2...|
|1323580.0163435908|1300000.0|(2789,[277,2509,2...|
| 2458916.488860454|3480000.0|(2789,[1290,2548,...|
|1418444.2445708348|1390000.0|(2789,[189,2509,2...|
|1229302.2098989028|1130000.0|(2789,[278,2537,2...|
|1245838.957

In [19]:
# Avaliando o modelo usando o evaluator padrão
evaluator = RegressionEvaluator(labelCol="Preço", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 721441


In [20]:
df_analise.show()

+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+------------+---------+-----------------+-------------------+--------------------+
|                 Rua|             Bairro|   Cidade| Área|Quartos|Banheiros|Vagas|    Preço|Aluguel|Bairro_Index|Rua_Index|   Bairro_Encoded|        Rua_Encoded|            features|
+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+------------+---------+-----------------+-------------------+--------------------+
|     Avenida Itacira|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  false|         4.0|   1323.0|  (280,[4],[1.0])|(2505,[1323],[1.0])|(2789,[1323,2509,...|
|Rua Aurelia Perez...| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  false|        85.0|   1488.0| (280,[85],[1.0])|(2505,[1488],[1.0])|(2789,[1488,2590,...|
|      Jardim Morumbi|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 68

#### Random Forest Regressor 

In [21]:
from pyspark.ml.regression import RandomForestRegressor

In [22]:
# Inicializando o modelo
rfr = RandomForestRegressor(featuresCol="features", labelCol="Preço")

# Treinando o modelo com os dados de treinamento
rfr_model = rfr.fit(train)

In [23]:
# Fazendo previsões nos dados de teste
predictions_rfr = rfr_model.transform(test)

# Visualizando as previsões
predictions_rfr.select("prediction", "Preço", "features").show()

+------------------+---------+--------------------+
|        prediction|    Preço|            features|
+------------------+---------+--------------------+
|3238508.9360938966|3750000.0|(2789,[1282,2548,...|
|1349428.7712903605|2600000.0|(2789,[704,2513,2...|
|1314717.8216158529|2600000.0|(2789,[274,2513,2...|
| 3346206.213227139|6000000.0|(2789,[1285,2513,...|
|2318152.7360799895|1300000.0|(2789,[708,2597,2...|
|3310015.2638948266|3390000.0|(2789,[709,2509,2...|
|1558538.4701223401|1690000.0|(2789,[188,2520,2...|
|  1597934.42350948|1745000.0|(2789,[276,2520,2...|
|  2585645.34740905|3500000.0|(2789,[414,2643,2...|
|1807591.3939342846|1300000.0|(2789,[711,2509,2...|
| 2078939.595358966|1730000.0|(2789,[711,2509,2...|
|2959115.8700761166|2300000.0|(2789,[712,2509,2...|
|1497497.5521425654|1300000.0|(2789,[277,2509,2...|
|2926159.8782787872|3480000.0|(2789,[1290,2548,...|
|1460251.3371345825|1390000.0|(2789,[189,2509,2...|
|1513093.7048938654|1130000.0|(2789,[278,2537,2...|
| 871835.172

In [24]:
# Avaliando o modelo usando o evaluator padrão
evaluator = RegressionEvaluator(labelCol="Preço", predictionCol="prediction", metricName="rmse")
rmse_rfr = evaluator.evaluate(predictions_rfr)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_rfr)

Root Mean Squared Error (RMSE) on test data = 936613


In [25]:
rmse > rmse_rfr

False