# Começando o Trabalho
---

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

## Acessando o [Spark UI](https://spark.apache.org/docs/3.1.2/web-ui.html) (Google Colab)

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master('local[*]')\
        .appName('Inicaiando com Spark')\
        .config('spark.ui.port', '4050')\
        .getOrCreate()


[Site ngrok](https://ngrok.com)

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [None]:
get_ipython().system_raw('./ngrok authtoken 2I50FVEk3zjCs3XJ9ib258aiWja_89AFvZtiDwr75gH8ebGEn')
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


In [None]:
spark

## Carregamento de dados

### Dados De Venda de Imoveis
#### Receita Federal

> [Imoveis](https://caelum-online-public.s3.amazonaws.com/challenge-spark/semana-1.zip)
> 
---
[property SparkSession.read](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.read.html)

[DataFrameReader.csv(*args)](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html)


### Montando nosso drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Carregando os dados das empresas

In [None]:
import zipfile

In [None]:
path = '/content/drive/MyDrive/Colab Notebooks/Imercao PySpark/dataset_bruto.json'
data = spark.read.json(path)

In [None]:
data.count()

89083

# Manipulando os Dados
---

## Operações básicas

In [None]:
data.limit(5).toPandas()

Unnamed: 0,anuncio,imagens,usuario
0,"(0, [], [16], [0], [], (Centro, 20061003, Rio ...","[(39d6282a-71f3-47bc-94aa-909351ecd881, https:...","(9d44563d-3405-4e84-9381-35b7cf40a9a4, Frank)"
1,"(0, [], [14], [0], [], (Centro, 20051040, Rio ...","[(23d2b3ab-45b0-47f2-a27f-ffbe066c2c38, https:...","(36245be7-70fe-40cd-84d4-70a57dc8f976, Caroline)"
2,"(0, [1026], [1026], [0], [], (Maria da Graça, ...","[(1da65baa-368b-490f-bef7-1834b9a428c1, https:...","(9dc415d8-1397-4d8d-8425-b8310fad309d, Oliver)"
3,"(0, [120], [120], [0], [Portão eletrônico, Con...","[(79b542c6-49b4-4f06-80b5-ef4fcdf98f76, https:...","(9911a2df-f299-4a73-a384-281417abaf69, Matthew)"
4,"(0, [3], [3], [0], [], (São Cristóvão, 2093167...","[(e2bc497b-6510-4731-8942-97b3d3cdab5e, https:...","(240a7aab-12e5-4069-9a2c-9dca4c4f9c68, Jude)"


## Analisando os dados

[Data Types](https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.sql.html#data-types)

In [None]:
data.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-


#**Transformar cada campo da coluna "anuncio" em uma coluna separada**

Nossa base de dados possui 3 campos principais:

  * anuncio
  * imagens
  * usuario
  
Para nossa análise, apenas as informações do campo "anuncio" serão relevantes. 

Por isso, vamos focar em analisar as colunas desse campo.

In [None]:
data = data.drop('imagens','usuario')
data.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

In [None]:
data = data.select('anuncio.*')

In [None]:
data.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- area_util: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- banheiros: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- suites: array (nullable = true)
 |    |-- element: long (c

#**Filtrar a base de dados**

O time de Data Science solicitou que fizéssemos alguns filtros nas colunas tipo_uso, tipo_unidade e tipo_anuncio da nossa base de dados:

* tipo_uso: **Residencial**;
* tipo_unidade: **Apartamento**;
* tipo_anuncio: **Usado**.

In [None]:
data.groupBy('tipo_uso','tipo_unidade','tipo_anuncio').count().orderBy('tipo_uso','tipo_unidade','tipo_anuncio').show()

+-----------+------------+------------+-----+
|   tipo_uso|tipo_unidade|tipo_anuncio|count|
+-----------+------------+------------+-----+
|  Comercial| Apartamento|       Usado|    4|
|  Comercial|        Casa|       Usado|   92|
|  Comercial|      Outros|  Lançamento|    3|
|  Comercial|      Outros|       Usado| 4443|
|Residencial| Apartamento|  Lançamento|  235|
|Residencial| Apartamento|       Usado|66562|
|Residencial|        Casa|  Lançamento|    3|
|Residencial|        Casa|       Usado|10224|
|Residencial|      Outros|  Lançamento|   15|
|Residencial|      Outros|       Usado| 7502|
+-----------+------------+------------+-----+



In [None]:
data_rau = data.filter((data['tipo_uso'] == 'Residencial') & (data['tipo_unidade'] == 'Apartamento') & (data['tipo_anuncio'] == 'Usado'))

In [None]:
data_rau.select('*').show(5)

+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|     caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    3|      [43]|     [43]|      [1]|[Academia, Churra...|{Paciência, 23585...|d2e3a3aa-09b5-45a...|    [2]|    []|       Usado| Apartamento|Residencial| [1]|[{245, null, Vend...|
|    2|      [42]|     [42]|      [1]|[Churrasqueira, P...|{Paciência, 23585...|085bab2c-87ad-452...|    [2]|    []|       Usado| Apartamento|Residencial| [1]|[{0, 0, Venda, 15...|
|    1|      [41]|     [41]|      [1]|[Portaria 24h, Co...|{Guaratiba, 23036...|18d22cbe-1b86-4

# **Transformar as colunas dos cômodos dos imóveis de listas para inteiros**

In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as f

In [None]:
cols = ['quartos','suites','banheiros','vaga','area_total','area_util']
for c in cols:
  data_rau = data_rau.withColumn(c,data_rau[c][0].cast(IntegerType()))
data_rau.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- valores: array (nullable = true)
 

# **Tratamento de informações sobre localização**

A equipe de ciência de dados nos solicitou que apenas as informações sobre bairro e zona da cidade fossem extraídas.

Então, vamos analisar a coluna endereco e transformar apenas as informações sobre bairro e zona em colunas de nosso DataFrame.

In [None]:
data_rau = data_rau.withColumn('bairro',data_rau['endereco.bairro'])
data_rau.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- valores: array (nullable = true)
 

In [None]:
data_rau = data_rau.withColumn('zona',data_rau['endereco.zona'])
data_rau = data_rau.drop('endereco')
data_rau.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- valores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- condominio: string (nullable = true)
 |    |    |-- iptu: string (nullable = true)
 |    |    |-- tipo: string (nullable = true)
 |    |    |-- valor: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)



# **Transformar cada campo da coluna "valores" em uma coluna separada**

In [None]:
cols = ['condominio','iptu','tipo','valor']
for c in cols:
  if c != 'tipo':
    data_rau = data_rau.withColumn(c,data_rau[f'valores.{c}'][0].cast(IntegerType()))
  else:
    data_rau = data_rau.withColumn(c,data_rau[f'valores.{c}'][0])
data_rau = data_rau.drop('valores')
data_rau.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- condominio: integer (nullable = true)
 |-- iptu: integer (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: integer (nullable = true)



# **Tratamento para a coluna de valores**

A InsightPlaces permite que o(a) anunciante crie um anúncio com duas opções de valor. Assim, o(a) cliente pode criar um anúncio que mostre tanto o valor de venda do imóvel quanto o seu valor de locação, juntamente com os valores de taxa de condomínio (quando houver) e taxa de IPTU. Estes valores são diferenciados pelo campo tipo que pode assumir os valores Venda e Aluguel.

Como se trata de um estudo sobre o preço de venda dos imóveis, o time de cientistas de dados solicitou apenas as informações do tipo VENDA.

Selecione apenas os valores de Venda.

In [None]:
data_rau = data_rau.filter(data_rau['tipo'] == 'Venda')

# **Salvar os dados no formato parquet**

In [None]:
data_rau.write.parquet(
    path="/content/drive/MyDrive/Colab Notebooks/Imercao PySpark/parquet",
    mode = "overwrite"
    )