## Initialization

Install Java 8, Spark 3.1.2 and the helper packages, then start a local Spark session.

In [None]:
%%shell
# For PySpark
# ref: https://www.alura.com.br/artigos/iniciando-projeto-spark-no-colab
# Installs Java 8, a Spark 3.1.2 (Hadoop 2.7) distribution in the current
# directory (expected to be /content, matching SPARK_HOME in the next cell),
# findspark, and colab-xterm (needed by the "Terminal" section's %load_ext).
apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download/extract only once: re-running the cell would otherwise leave
# spark-3.1.2-bin-hadoop2.7.tgz.1, .2, ... copies behind.
if [ ! -d spark-3.1.2-bin-hadoop2.7 ]; then
  wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz \
    && tar xf spark-3.1.2-bin-hadoop2.7.tgz
fi
pip install -q findspark colab-xterm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting colab-xterm
  Downloading colab_xterm-0.1.2-py3-none-any.whl (115 kB)
[K     |████████████████████████████████| 115 kB 5.0 MB/s 
Installing collected packages: colab-xterm
Successfully installed colab-xterm-0.1.2




In [None]:
import os

# Point findspark/pyspark at the JDK and Spark distribution installed by the
# setup cell; these must be set before findspark.init() runs.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
})

import findspark

findspark.init()

from pyspark.sql import SparkSession

# Local session using all cores; the UI is served on port 4050.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Iniciando com Spark")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:
import os

# Project root containing the ``data/`` folder. It is concatenated directly
# with "data/" when reading, so it must end with "/".
# NOTE(review): the notebook used ``proj_path`` without ever defining it
# (probably from a deleted Drive-mount cell). "/content/" is Colab's working
# directory; override with the PROJ_PATH environment variable. TODO confirm.
proj_path = os.environ.get("PROJ_PATH", "/content/")
filename = "dataset_bruto.json"

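## Terminal (optional)

Opens an interactive terminal inside Colab for manual inspection; nothing later in the notebook depends on it.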

In [None]:
# Optional: opens an interactive terminal widget in Colab.
# Requires the colab-xterm package to be installed (pip install colab-xterm);
# no later cell depends on this, so it can be skipped.
%load_ext colabxterm
%xterm

## Preprocessing



In [None]:
from pyspark.sql import functions as sf
# ``udf`` is defined in ``pyspark.sql.functions``; ``from pyspark.sql import udf``
# bound the ``pyspark.sql.udf`` *module* instead of the decorator/function.
from pyspark.sql.functions import udf
# NOTE(review): wildcard kept in case later cells use Spark type names
# (StringType, StructType, ...); prefer importing the specific types.
from pyspark.sql.types import *

In [None]:
import os

# Read the raw listings; each JSON record may span several lines.
# os.path.join works whether or not proj_path ends with a slash.
raw_path = os.path.join(proj_path, "data", filename)
sdf_raw = spark.read.json(raw_path, multiLine=True)

# Promote the fields of the ``anuncio`` struct to top-level columns.
# ``anuncio.*`` keeps the schema Spark already inferred. The former
# ``.rdd.map(lambda x: x[0]).toDF()`` round-trip re-inferred it in Python,
# which is slower and can fail when a field is null in the sampled rows.
sdf_anuncio = sdf_raw.select("anuncio.*")

### Unpacking features

In [None]:
# Unpacking values 1
# These attributes appear to be array columns. Keep only their first element
# and move them to the front, followed by every other column in its current
# order.
lt_cols = ["quartos", "suites", "banheiros", "vaga", "area_total", "area_util"]
first_elements = [sf.col(col_name)[0].alias(col_name) for col_name in lt_cols]
remaining_cols = [col_name for col_name in sdf_anuncio.columns if col_name not in lt_cols]
sdf_anuncio = sdf_anuncio.select(*first_elements, *remaining_cols)

In [None]:
# Unpacking values 2
# Flatten the nested ``valores`` (first element of the array) and ``endereco``
# structs into top-level columns, appended after the existing ones.
#
# The previous version built separate DataFrames and re-joined them on
# ``monotonically_increasing_id()``. Those ids are unique but not consecutive,
# and they depend on partitioning, so rows of independently derived DataFrames
# are not guaranteed to line up. It also crashed (``x[0][0]``) on null or empty
# ``valores``. Expanding the structs inside one ``select`` keeps every value on
# its own row.


def enumerate_rows(sdf, name="index"):
    """Return ``sdf`` with an extra id column ``name``.

    The ids come from ``monotonically_increasing_id``. They are unique, but
    they are not consecutive and depend on partitioning, so do not use them
    to align rows of different DataFrames.
    """
    return sdf.withColumn(name, sf.monotonically_increasing_id())


# Lazy projections of the nested parts, kept for inspection only.
sdf_valores = sdf_anuncio.select(sf.col("valores")[0].alias("valores")).select("valores.*")
sdf_endereco = sdf_anuncio.select("endereco.*")

sdf_anuncio = (
    sdf_anuncio
    # Element 0 of the ``valores`` array; null when the array is empty/null,
    # which then yields nulls for all of its fields below.
    .withColumn("_valores_0", sf.col("valores")[0])
    .select("*", "_valores_0.*", "endereco.*")
    .drop("_valores_0")
)

In [None]:
# Preview the first 5 rows of the flattened listings (long values are truncated).
sdf_anuncio.show(n=5)

+-------+------+---------+----+----------+---------+-----+--------------------+--------------------+--------------------+------------+------------+-----------+--------------------+----------+----+-----+------+--------------------+--------+--------------+--------------+----------+----------+----+--------------------+------------+
|quartos|suites|banheiros|vaga|area_total|area_util|andar|     caracteristicas|            endereco|                  id|tipo_anuncio|tipo_unidade|   tipo_uso|             valores|condominio|iptu| tipo| valor|              bairro|     cep|        cidade|        estado|  latitude| longitude|pais|                 rua|        zona|
+-------+------+---------+----+----------+---------+-----+--------------------+--------------------+--------------------+------------+------------+-----------+--------------------+----------+----+-----+------+--------------------+--------+--------------+--------------+----------+----------+----+--------------------+------------+
|      

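### Filtering

Keep only used (`Usado`) residential apartments listed for sale (`Venda`), then drop the nested and detailed-address columns.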

In [None]:
# Spark SQL predicates selecting the listing subset to keep; the filtering
# cell combines them with AND.
lt_filters = [
    'tipo_uso == "Residencial"',
    'tipo_unidade == "Apartamento"',
    'tipo_anuncio == "Usado"',
    'tipo == "Venda"',
]

# Columns removed after filtering: the already-flattened nested structs and
# the detailed address fields. The country column is spelled ``pais`` in the
# data. The previous entry ``"país"`` never matched, so ``drop`` silently
# kept the column.
lt_cols_drop = ["valores", "endereco", "cep", "cidade", "estado", "latitude", "longitude", "pais", "rua"]

In [None]:
import warnings

# Keep only rows satisfying every predicate in ``lt_filters``. Each predicate
# is parenthesised so one containing ``or`` cannot change operator precedence.
# With no predicates, all rows are kept.
condition = " and ".join(f"({cond})" for cond in lt_filters)
sdf_anuncio_filt = sdf_anuncio.where(condition) if condition else sdf_anuncio

# ``DataFrame.drop`` is a no-op for names that are not columns, which hides
# typos in ``lt_cols_drop``. Warn about them instead of failing the run.
lt_cols_missing = [c for c in lt_cols_drop if c not in sdf_anuncio_filt.columns]
if lt_cols_missing:
    warnings.warn(f"lt_cols_drop entries not found in the data: {lt_cols_missing}")
sdf_anuncio_filt = sdf_anuncio_filt.drop(*lt_cols_drop)

In [None]:
# Preview the first 5 filtered rows (long values are truncated).
sdf_anuncio_filt.show(n=5)

+-------+------+---------+----+----------+---------+-----+--------------------+--------------------+------------+------------+-----------+----------+----+-----+------+------------------+----+----------+
|quartos|suites|banheiros|vaga|area_total|area_util|andar|     caracteristicas|                  id|tipo_anuncio|tipo_unidade|   tipo_uso|condominio|iptu| tipo| valor|            bairro|pais|      zona|
+-------+------+---------+----+----------+---------+-----+--------------------+--------------------+------------+------------+-----------+----------+----+-----+------+------------------+----+----------+
|      2|  null|        1|   1|        60|       60|    0|[Condomínio fechado]|dc99d9e1-4c63-41f...|       Usado| Apartamento|Residencial|         0|   0|Venda| 15000|Pedra de Guaratiba|  BR|Zona Oeste|
|      2|     0|        1|   1|     16546|       50|    0|[Churrasqueira, P...|5c1ccfd0-5a9a-4d1...|       Usado| Apartamento|Residencial|       360|   0|Venda|125000|             Bangu|  

## Saving file

In [None]:
# Persist the filtered listings as Parquet in the "dataset_tratado" directory.
# mode("overwrite") makes the cell re-runnable: the default save mode raises
# an error when the target path already exists.
sdf_anuncio_filt.write.mode("overwrite").parquet("dataset_tratado")