<a href="https://colab.research.google.com/github/Heveraldob12/Alura_Challenge_Spark/blob/main/Semana_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Instalando Spark


In [1]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark
     

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

In [5]:
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, DoubleType

## Base de dados - Vista inicial

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
path_dados_file='/content/drive/MyDrive/Challenge/anuncio/part-00000-bb8f1917-41a9-4bee-bb1f-fde25f0d323e-c000.snappy.parquet'
anuncio = spark.read.parquet(path_dados_file)

In [8]:
anuncio.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- valor: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- iptu: string (nullable = true)
 |-- condominio: string (nullable = true)



In [9]:
anuncio.show()

+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+----------+-----+-----+----+----------+
|andar|area_total|area_util|banheiros|     caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|              bairro|      zona|valor| tipo|iptu|condominio|
+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+----------+-----+-----+----+----------+
|    3|        43|     [43]|        1|[Academia, Churra...|d2e3a3aa-09b5-45a...|      2|  null|       Usado| Apartamento|Residencial|   1|           Paciência|Zona Oeste|15000|Venda|null|       245|
|    2|        42|     [42]|        1|[Churrasqueira, P...|085bab2c-87ad-452...|      2|  null|       Usado| Apartamento|Residencial|   1|           Paciência|Zona Oeste|15000|Venda|   0|         0|
|    

# Dummy Caracteristicas e Zona

In [10]:
anuncio= anuncio.filter(anuncio['zona'] != '' )

In [11]:
print(anuncio.select('zona').distinct().collect())


[Row(zona='Zona Norte'), Row(zona='Zona Oeste'), Row(zona='Zona Central'), Row(zona='Zona Sul')]


In [12]:
zona = anuncio.groupBy('id').pivot('zona').agg(f.lit(1)).na.fill(0)
anuncio= anuncio.join(zona, 'id', how='left').drop('zona')

In [13]:
caracteristicas = anuncio.select(anuncio['id'],f.explode(anuncio['caracteristicas']))
caracteristicas.groupBy('col').count().show()
caracteristicas = caracteristicas.groupBy('id').pivot('col').agg(f.lit(1)).na.fill(0)

+------------------+-----+
|               col|count|
+------------------+-----+
|Condomínio fechado|34930|
|        Playground|31805|
| Portão eletrônico|29303|
|           Piscina|33170|
|Animais permitidos|30589|
|      Portaria 24h|30342|
|          Elevador|42929|
|          Academia|27602|
|   Salão de festas|33530|
|     Churrasqueira|31672|
+------------------+-----+



In [14]:
anuncio=anuncio.join(caracteristicas, 'id', how='left').drop('caracteristicas')

In [15]:
anuncio.show()

+--------------------+-----+----------+---------+---------+-------+------+------------+------------+-----------+----+--------------------+--------+-----+-----+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|                  id|andar|area_total|area_util|banheiros|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|              bairro|   valor| tipo| iptu|condominio|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+--------------------+-----+----------+---------+---------+-------+------+------------+------------+-----------+----+--------------------+--------+-----+-----+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+------

## Tratamento e Remoção de colunas

In [16]:
anuncio = anuncio.withColumn("area_util", anuncio["area_util"][0].cast("double"))

In [17]:
def str_para_double(lista,data=anuncio):
  for i in lista:
    data = data.withColumn(i, f.col(i).cast("double"))
  return data

In [18]:
lista=[ "condominio", "iptu" , "valor"]

In [19]:
anuncio = str_para_double(lista)

In [20]:
anuncio.printSchema()

root
 |-- id: string (nullable = true)
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: double (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- bairro: string (nullable = true)
 |-- valor: double (nullable = true)
 |-- tipo: string (nullable = true)
 |-- iptu: double (nullable = true)
 |-- condominio: double (nullable = true)
 |-- Zona Central: integer (nullable = true)
 |-- Zona Norte: integer (nullable = true)
 |-- Zona Oeste: integer (nullable = true)
 |-- Zona Sul: integer (nullable = true)
 |-- Academia: integer (nullable = true)
 |-- Animais permitidos: integer (nullable = true)
 |-- Churrasqueira: integer (nullable = true)
 |-- Condomínio fechado: integer (nullable = true)
 |-- E

In [21]:
print(anuncio.select('tipo_uso').distinct().collect())
print(anuncio.select('tipo_unidade').distinct().collect())
print(anuncio.select('tipo_anuncio').distinct().collect())
print(anuncio.select('tipo').distinct().collect())

[Row(tipo_uso='Comercial'), Row(tipo_uso='Residencial')]
[Row(tipo_unidade='Apartamento')]
[Row(tipo_anuncio='Usado'), Row(tipo_anuncio='Lançamento')]
[Row(tipo='Venda')]


In [22]:
anuncio.groupBy('bairro').count().show()
#Seria inviavel transformá-las em variáveis binarias o melhor seria remover essa coluna

+-------------------+-----+
|             bairro|count|
+-------------------+-----+
|             Cocotá|   16|
|              Gávea|  600|
|       Tomás Coelho|   37|
|            Ipanema| 2110|
|           Realengo|   46|
|      Gardênia Azul|    7|
|      Bento Ribeiro|   33|
|              Rocha|   38|
|Vicente de Carvalho|   87|
|         Manguinhos|    1|
|        Jacarepaguá| 4441|
|           Botafogo| 3521|
|               Leme|  233|
|       Campo Grande|  638|
|       Padre Miguel|   15|
|           Flamengo| 1372|
|       Santo Cristo|  358|
|  Engenho da Rainha|   25|
| Pedra de Guaratiba|   12|
|            Piedade|  103|
+-------------------+-----+
only showing top 20 rows



In [23]:
lista_remocao=('id','tipo_unidade','tipo','tipo_anuncio','tipo_uso','bairro')
anuncio=anuncio.drop(*lista_remocao)

## Removendo dados Nulos

Visualizando a quantidade de dados nulos


In [24]:
anuncio.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in anuncio.columns]).show()

+-----+----------+---------+---------+-------+------+----+-----+----+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|andar|area_total|area_util|banheiros|quartos|suites|vaga|valor|iptu|condominio|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+-----+----------+---------+---------+-------+------+----+-----+----+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|    0|      9409|        0|        0|      0|  5542|3005|   22|7152|      2347|           0|         0|         0|       0|   12895|             12895|        12895|             12895|   12895|  12895|     12895|    

In [25]:
'''Entre area_total e area_util a area_total tem mais dados nulos
   e não é necessario ter essas duas colunas, ou seja, a area_total irei remover'''
anuncio =  anuncio.drop('area_total')

  

In [26]:
anuncio=anuncio.na.fill(0)

In [27]:
anuncio.select([f.count(f.when(f.isnull(c), 1)).alias(c) for c in anuncio.columns]).show()


+-----+---------+---------+-------+------+----+-----+----+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|andar|area_util|banheiros|quartos|suites|vaga|valor|iptu|condominio|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+-----+---------+---------+-------+------+----+-----+----+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|    0|        0|        0|      0|     0|   0|    0|   0|         0|           0|         0|         0|       0|       0|                 0|            0|                 0|       0|      0|         0|           0|                0|              0|


In [47]:
anuncio=anuncio.withColumnRenamed('Zona Central',"Zona_Central")\
    .withColumnRenamed('Zona Norte',"Zona_Norte")\
    .withColumnRenamed('Zona Oeste',"Zona_Oeste")\
    .withColumnRenamed('Zona Sul',"Zona_Sul")\
    .withColumnRenamed('Animais permitidos','Animais_permitidos')\
    .withColumnRenamed('Condomínio fechado','Condomínio_fechado')\
    .withColumnRenamed( 'Portaria 24h', 'Portaria_24h')\
    .withColumnRenamed( 'Portão eletrônico', 'Portão_eletrônico')\
    .withColumnRenamed('Salão de festas','Salão_festas')\
    .withColumnRenamed("salary","salary_amount")

In [28]:
anuncio.show()

+-----+---------+---------+-------+------+----+---------+-------+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|andar|area_util|banheiros|quartos|suites|vaga|    valor|   iptu|condominio|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+-----+---------+---------+-------+------+----+---------+-------+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|    0|    116.0|        2|      3|     1|   1|3793260.0|  100.0|     100.0|           0|         0|         1|       0|       1|                 1|            1|                 1|       1|      1|         1|           1|       

# Modelos

## Imports e Gráficos 

In [29]:
import plotly.express as px
from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.feature import StandardScaler
# from pyspark.ml.feature import PCA
from pyspark.ml.regression import GBTRegressor,LinearRegression

In [30]:
fig = px.imshow(anuncio.toPandas().corr(), text_auto=True)
fig.show()
# Muitas colunas no futuro irei remover-las para ter uma anasile melhor desse grafico

In [31]:
X = anuncio.columns
X.remove('valor')
X

['andar',
 'area_util',
 'banheiros',
 'quartos',
 'suites',
 'vaga',
 'iptu',
 'condominio',
 'Zona Central',
 'Zona Norte',
 'Zona Oeste',
 'Zona Sul',
 'Academia',
 'Animais permitidos',
 'Churrasqueira',
 'Condomínio fechado',
 'Elevador',
 'Piscina',
 'Playground',
 'Portaria 24h',
 'Portão eletrônico',
 'Salão de festas']

In [32]:
anuncio=anuncio.withColumnRenamed("valor","label")

In [33]:
anuncio_vector=VectorAssembler(inputCols=X, outputCol='features').transform(anuncio).select(['features', 'label'])

In [34]:
anuncio_vector.show(truncate=False, n=5)

+-----------------------------------------------------------------------------------------------+---------+
|features                                                                                       |label    |
+-----------------------------------------------------------------------------------------------+---------+
|[0.0,116.0,2.0,3.0,1.0,1.0,100.0,100.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]|3793260.0|
|(22,[1,2,3,4,5,6,7,10,14,17],[143.0,4.0,4.0,3.0,1.0,2000.0,3948.0,1.0,1.0,1.0])                |1600000.0|
|[7.0,236.0,4.0,5.0,2.0,3.0,20.0,20.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]  |1535000.0|
|(22,[1,2,3,4,5,6,7,11,14,16],[215.0,3.0,3.0,1.0,2.0,4500.0,1685.0,1.0,1.0,1.0])                |2279540.0|
|[3.0,64.0,1.0,2.0,2.0,1.0,80.0,784.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]  |380000.0 |
+-----------------------------------------------------------------------------------------------+---------+
only showing top 5 rows



In [35]:
treino, teste = anuncio_vector.randomSplit([0.7, 0.3], seed=101)

In [36]:
gbt = GBTRegressor(maxDepth=10,maxBins=32,lossType='squared', seed=101)
lr= LinearRegression(regParam=0.0, solver="normal")

In [37]:
gbt=gbt.fit(treino)
lr= lr.fit(treino)

In [38]:
previsoes_gbt_treino = gbt.transform(treino)
previsoes_gbt_teste = gbt.transform(teste)

previsoes_lr_treino = lr.transform(treino)
previsoes_lr_teste = lr.transform(teste)


## Avaliando o Modelo

In [39]:
from pyspark.ml.evaluation import RegressionEvaluator

In [40]:
evaluator = RegressionEvaluator()

In [41]:
def sumario_regressao(nome,treino,teste):
  print(nome)
  print("="*40)
  print("Dados de Treino")
  print("="*40)
  print("Métricas")
  print("-"*40)
  print(f"R²: {evaluator.evaluate(treino, {evaluator.metricName : 'r2'})}")
  print(f"RMSE: {evaluator.evaluate(treino, {evaluator.metricName : 'rmse'})}")
  print("")
  print("="*40)
  print("Dados de Teste")
  print("="*40)
  print("Métricas")
  print("-"*40)
  print(f"R²: {evaluator.evaluate(teste, {evaluator.metricName : 'r2'})}")
  print(f"RMSE: {evaluator.evaluate(teste, {evaluator.metricName : 'rmse'})}")

In [42]:
sumario_regressao('Gradient-boosted tree regression',previsoes_gbt_treino, previsoes_gbt_teste)

Gradient-boosted tree regression
Dados de Treino
Métricas
----------------------------------------
R²: 0.8176473433252782
RMSE: 623194.9909585245

Dados de Teste
Métricas
----------------------------------------
R²: 0.7884268396572107
RMSE: 685077.3053739488


In [43]:
sumario_regressao('Linear regression',previsoes_lr_treino,previsoes_lr_teste )

Linear regression
Dados de Treino
Métricas
----------------------------------------
R²: 0.6637170222224695
RMSE: 846292.22063621

Dados de Teste
Métricas
----------------------------------------
R²: 0.6662043067221687
RMSE: 860497.5906847531


# Modelo escolhido vai ser o Gradient-boosted tree regression

## Final Semana 2
---
### OBS.: Por causa da volta as aulas da faculdade vou organizar os notebooks só no final da semana 4

In [48]:
anuncio.write.parquet(
    path='/content/drive/MyDrive/Challenge/anuncio_2',
    mode='overwrite'
)