<a href="https://colab.research.google.com/github/brferraz/sparktoknow/blob/main/Notebooks/ds_imoveis_rj.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Utilizando o Spark no Google Colab

In [None]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

## Acessando o SparkUI

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

### Adicionando o ngrok

In [None]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

In [None]:
get_ipython().system_raw('./ngrok http 4050 &')

In [None]:
!curl -s http://localhost:4040/api/tunnels

## Base de dados

### Montando o drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### Carregando dados dos imóveis

In [None]:
import zipfile
zipfile.ZipFile('/content/drive/MyDrive/desafio-ml/semana-1.zip','r').extractall('/content/drive/MyDrive/desafio-ml')

In [None]:
path = '/content/drive/MyDrive/desafio-ml/*.json'
imoveis = spark.read.json(path)

In [None]:
imoveis.printSchema()

### Pegando somente os anúncios

Convertendo de Struct para colunas

In [None]:
anuncios = imoveis.select('anuncio.*')

In [None]:
flt = anuncios.toDF('andar','area_total', 'area_util', 'banheiros',\
                    'caracteristicas', 'endereco', 'id', 'quartos',\
                    'suites', 'tipo_anuncio', 'tipo_unidade', \
                    'tipo_uso', 'vaga', 'valores')

### Filtrando os dados

In [None]:
flt.createOrReplaceTempView("dataframe")

1.  Tipo de uso: Residencial

In [None]:
residencialDF = spark.sql("SELECT * FROM dataframe WHERE tipo_uso LIKE 'Residencial'")

In [None]:
residencialDF.count()

2. Tipo unidade: Apartamento

In [None]:
apartamentoDF = spark.sql("SELECT * FROM dataframe WHERE tipo_unidade LIKE 'Apartamento'")

In [None]:
apartamentoDF.count()

3. Tipo anúncio: Usado

In [None]:
usadoDF = spark.sql("SELECT * FROM dataframe WHERE tipo_anuncio LIKE 'Usado'")

In [None]:
usadoDF.count()

##### Filtrando os anúncios

In [None]:
filtradoDF = spark.sql(
    "SELECT * FROM dataframe WHERE tipo_uso LIKE 'Residencial'\
    AND tipo_unidade LIKE 'Apartamento'\
    AND tipo_anuncio LIKE 'Usado'"
    )

In [None]:
filtradoDF.count()

#### Transformando os dados das colunas de **Lista** para **Inteiro**

In [None]:
flt = flt.withColumn('quartos', flt['quartos'][0])

In [None]:
flt = flt.withColumn('suites', flt['suites'][0])

In [None]:
flt = flt.withColumn('banheiros', flt['banheiros'][0])

In [None]:
flt = flt.withColumn('vaga', flt['vaga'][0])

In [None]:
flt.printSchema()

#### Transformando a coluna de endereço em colunas de bairro e zona

In [None]:
flt = flt.select('andar','area_total', 'area_util', 'banheiros',\
                    'caracteristicas', 'endereco.bairro', 'endereco.zona', 'id', 'quartos',\
                    'suites', 'tipo_anuncio', 'tipo_unidade', \
                    'tipo_uso', 'vaga', 'valores')

In [None]:
flt.toDF('andar','area_total', 'area_util', 'banheiros',\
                    'caracteristicas', 'endereco_bairro', 'endereco_zona', 'id', 'quartos',\
                    'suites', 'tipo_anuncio', 'tipo_unidade', \
                    'tipo_uso', 'vaga', 'valores')

In [None]:
flt.printSchema()

#### Transformando os elementos de valores em colunas do dataframe