<a href="https://colab.research.google.com/github/biamouras/alura_pyspark_challenge/blob/main/Notebooks/01_tratamento_dados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Insightplaces - Tratamento dos dados

A Insightplaces, plataforma fantasia de imóveis da Alura, está com dificuldades para definir os valores do imóveis e gostaria de fazer de modo automatizado. Ainda, a empresa percebeu que o recomendador atual não está sendo efetivo para gerar novos cliques entre os usuários.

Nosso projeto será trabalhar com os dados disponibilizados de imóveis no Rio de Janeiro para ajudar a empresa a lidar com estes dois problemas usando regressão para a definição dos valores e a análise de agrupamentos através de Machine Learning. 

Nesta primeira tarefa, iremos tratar os dados disponibilizados.

## Carregando os dados

In [1]:
from os import environ, listdir

In [2]:
# instalar as dependências para utilizar Spark no Colab
instala = True

# verifica se já está instalado 
for f in listdir('/content'):
  if f.find('spark') != -1:
    instala = False
    break 

if instala:
  !apt-get update -qq
  !apt-get install openjdk-8-jdk-headless -qq > /dev/null

  !wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
  !tar xf spark-3.1.2-bin-hadoop2.7.tgz
  !pip install -q findspark

environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

import findspark
findspark.init()

In [3]:
# iniciando o pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050')\
    .getOrCreate()

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# biblioteca para abrir o arquivo compactado
import zipfile

In [6]:
# caminho do arquivo 
dir_dados = '/content/drive/MyDrive/Colab Notebooks/alura_challenge_insightplaces/dados/'
name = 'semana-1'

# descompressão do arquivo
zipfile.ZipFile(dir_dados + name +'.zip', 'r').extractall(dir_dados)
# identificação do arquivo descomprimido
files = listdir(dir_dados)

path = [f for f in files if '.zip' not in f][0]
# leitura do arquivo
dados = spark.read.json(dir_dados + path)
print(f'O arquivo "{path}" tem {dados.count()} observações')
dados.show(5, truncate = False)

O arquivo "dataset_bruto.json" tem 89083 observações
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------+
|anuncio          

Já observamos que a base de dados contém colunas agrupadas dentro da coluna `anuncio` e também da coluna `imagens`. 

In [7]:
dados.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

As colunas serão separadas e selecionadas, pois vamos apenas trabalhar com os dados em `anuncio`

## Desaninhando colunas dos tipos `Array`

In [8]:
import pyspark.sql.functions as f
import pyspark.sql.types as types

In [9]:
# abrindo colunas em anuncio
dados = dados\
     .select('anuncio.*')
dados.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- area_util: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- banheiros: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- suites: array (nullable = true)
 |    |-- element: long (c

In [10]:
# abrindo colunas do tipo array
for col, t_col in dados.dtypes:
  # verifica se é ArrayType
  if t_col.startswith('array<')  :
    # explode a coluna
    dados = dados\
         .withColumn(col, f.explode(col).alias(col))
    t_col = dict(dados.dtypes)[col]

dados.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: string (nullable = true)
 |-- area_util: string (nullable = true)
 |-- banheiros: long (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: long (nullable = true)
 |-- valores: struct (nullable = true)
 |    |-- condominio: string (nullable = true)
 |    |-- iptu

In [11]:
dados.show(5)

+-----+----------+---------+---------+------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|   caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    3|        43|       43|        1|     Churrasqueira|{Cosmos, 23066271...|a2e6d7a5-0ff0-484...|      2|     0|       Usado| Apartamento|Residencial|   0|{285, null, Venda...|
|    3|        43|       43|        1|           Piscina|{Cosmos, 23066271...|a2e6d7a5-0ff0-484...|      2|     0|       Usado| Apartamento|Residencial|   0|{285, null, Venda...|
|    3|        43|       43|        1|        Playground|{Cosmos, 23066271...|a2e6d7a5-0ff0-484...|      

## Filtro da base

A _Insightplaces_ tem interesse, por enquanto, apenas no mercado **residencial** e com o foco em **apartamentos usados** e apenas para **venda**.

In [12]:
dados.createOrReplaceTempView('dfView')
apartamentos = spark.sql(
    '''
    SELECT *
    FROM dfView
    WHERE tipo_uso = 'Residencial'
    AND tipo_unidade = 'Apartamento'
    AND tipo_anuncio = 'Usado'
    AND valores.tipo = 'Venda'
    '''
)

## Seleção das variáveis

Temos algumas variáveis que estão dentro do Struct `endereço` e apenas `bairro`, `zona`, `latitude` e `longitude` são de interesse para a análise.

In [13]:
apartamentos.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: string (nullable = true)
 |-- area_util: string (nullable = true)
 |-- banheiros: long (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: long (nullable = true)
 |-- valores: struct (nullable = true)
 |    |-- condominio: string (nullable = true)
 |    |-- iptu

In [14]:
# seleção das variáveis dentro do struct
apartamentos = apartamentos\
               .withColumn('bairro', apartamentos['endereco.bairro'])\
               .withColumn('zona', apartamentos['endereco.zona'])\
               .withColumn('latitude', apartamentos['endereco.latitude'])\
               .withColumn('longitude', apartamentos['endereco.longitude'])\
               .drop('endereco')

Temos também a variável `valores` como `StructType`. Todas as informações são de interesse, então vamos abrir esta variável.

In [15]:
apartamentos = apartamentos\
              .select('*', 'valores.*')\
              .drop('valores')

In [16]:
apartamentos.show(5)

+-----+----------+---------+---------+------------------+--------------------+-------+------+------------+------------+-----------+----+------+----------+----------+----------+----------+----+-----+-----+
|andar|area_total|area_util|banheiros|   caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|bairro|      zona|  latitude| longitude|condominio|iptu| tipo|valor|
+-----+----------+---------+---------+------------------+--------------------+-------+------+------------+------------+-----------+----+------+----------+----------+----------+----------+----+-----+-----+
|    3|        43|       43|        1|     Churrasqueira|a2e6d7a5-0ff0-484...|      2|     0|       Usado| Apartamento|Residencial|   0|Cosmos|Zona Oeste|-22.888194|-43.629602|       285|null|Venda|20000|
|    3|        43|       43|        1|           Piscina|a2e6d7a5-0ff0-484...|      2|     0|       Usado| Apartamento|Residencial|   0|Cosmos|Zona Oeste|-22.888194|-43.629602|    

As variáveis usadas para o filtro também não serão mais necessárias.

In [17]:
apartamentos = apartamentos\
              .drop('tipo', 'tipo_unidade', 'tipo_anuncio')

## Ajustando os tipos de dados

### Para `ShortType`

Algumas colunas estão como `long` e podem ocupar muito espaço na memória. Dificilmente 'andar', 'banheiros', 'quartos', 'suites' e 'vaga' estarão entre -9223372036854775808 e 9223372036854775807. É mais realista trabalharmos com a amplitude entre -32868 a 32767.


In [18]:
# colunas para ShortType
short_cols = ['andar', 'banheiros', 'quartos', 'suites', 'vaga']

apartamentos\
  .select(short_cols)\
  .summary()\
  .show()

+-------+------------------+------------------+------------------+-----------------+------------------+
|summary|             andar|         banheiros|           quartos|           suites|              vaga|
+-------+------------------+------------------+------------------+-----------------+------------------+
|  count|            256378|            256378|            256378|           256378|            256378|
|   mean| 3.586524584792767| 2.612716379720569| 2.684867656351169|1.355151378043358|1.5252790801082776|
| stddev|20.518669867683673|1.3280423628287443|0.8914500936128742|1.079796411918633|1.3724415837796045|
|    min|                 0|                 1|                 0|                0|                 0|
|    25%|                 0|                 2|                 2|                1|                 1|
|    50%|                 1|                 2|                 3|                1|                 1|
|    75%|                 5|                 3|                 

In [19]:
for col in short_cols:
  apartamentos = apartamentos\
                .withColumn(col, apartamentos[col].cast(types.ShortType()))

### Para `DoubleType`

As colunas `area_total`, `area_util` e `valor` estão registradas como `string` e deveriam ser `double`.

In [20]:
# colunas para DoubleType
double_cols = ['area_total', 'area_util', 'condominio', 'iptu', 'valor']

apartamentos\
  .select(double_cols)\
  .summary()\
  .show()

+-------+------------------+------------------+-----------------+------------------+------------------+
|summary|        area_total|         area_util|       condominio|              iptu|             valor|
+-------+------------------+------------------+-----------------+------------------+------------------+
|  count|            256378|            256378|           252001|            245967|            256378|
|   mean| 1779.026347814555|119.32255107692548|5865.844290300435| 5992.105766220672|1349805.8185647754|
| stddev|124793.63253120104| 90.92271772107559| 113503.924571155|182018.99399502264|1553606.5824942489|
|    min|                 0|                10|                0|                 0|            100000|
|    25%|              70.0|              69.0|            380.0|              31.0|          489000.0|
|    50%|              90.0|              90.0|            850.0|             294.0|          810000.0|
|    75%|             142.0|             140.0|           1430.0

In [21]:
for col in double_cols:
  apartamentos = apartamentos\
          .withColumn(col, apartamentos[col].cast(types.DoubleType()))

apartamentos.printSchema()

root
 |-- andar: short (nullable = true)
 |-- area_total: double (nullable = true)
 |-- area_util: double (nullable = true)
 |-- banheiros: short (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: short (nullable = true)
 |-- suites: short (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: short (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- condominio: double (nullable = true)
 |-- iptu: double (nullable = true)
 |-- valor: double (nullable = true)



## Salvando arquivo

In [22]:
apartamentos.write.parquet(dir_dados+'parquet')

In [24]:
apartamentos.write.csv(dir_dados+'csv',
                       sep = ';',
                       header = True)

### Teste de eficiência: parquet x csv

In [25]:
%%time
# leitura do csv
aptos = spark.read.csv(dir_dados + 'csv',
                       sep = ';',
                       header = True)
aptos.printSchema()

root
 |-- andar: string (nullable = true)
 |-- area_total: string (nullable = true)
 |-- area_util: string (nullable = true)
 |-- banheiros: string (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: string (nullable = true)
 |-- suites: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- condominio: string (nullable = true)
 |-- iptu: string (nullable = true)
 |-- valor: string (nullable = true)

CPU times: user 10.3 ms, sys: 964 µs, total: 11.3 ms
Wall time: 626 ms


In [26]:
%%time
# leitura do parquet
aptos = spark.read.parquet(dir_dados + 'parquet')
aptos.printSchema()

root
 |-- andar: short (nullable = true)
 |-- area_total: double (nullable = true)
 |-- area_util: double (nullable = true)
 |-- banheiros: short (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: short (nullable = true)
 |-- suites: short (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: short (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- condominio: double (nullable = true)
 |-- iptu: double (nullable = true)
 |-- valor: double (nullable = true)

CPU times: user 3.06 ms, sys: 1.84 ms, total: 4.91 ms
Wall time: 241 ms


A leitura do Parquet foi 2 vezes mais rápida que do csv. Apesar de inicialmente não ser perceptivo para o usuário, teremos um ganho importante no processamentos dos dados.

In [27]:
spark.stop()