<a href="https://colab.research.google.com/github/AllanRodrigo/Data_Science_Alura/blob/main/Notebooks/Semana_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Instalando Dependências para rodar no Colab**

In [2]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [3]:
!pip install -q findspark

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [5]:
import findspark
findspark.init()

# **Configuração do Spark IU**

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [7]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [8]:
get_ipython().system_raw('./ngrok authtoken ')
get_ipython().system_raw('./ngrok http 4050 &')

In [9]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[{"name":"command_line (http)","uri":"/api/tunnels/command_line%20%28http%29","public_url":"http://9d4c-35-245-57-22.ngrok.io","proto":"http","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}},{"name":"command_line","uri":"/api/tunnels/command_line","public_url":"https://9d4c-35-245-57-22.ngrok.io","proto":"https","config":{"addr":"http://localhost:4050","inspect":true},"metrics":{"conns":{"count":0,"gauge":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0},"http":{"count":0,"rate1":0,"rate5":0,"rate15":0,"p50":0,"p90":0,"p95":0,"p99":0}}}],"uri":"/api/tunnels"}


In [10]:
spark

# **Montando os dados de origem**

In [11]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [12]:
import zipfile

zipfile.ZipFile('/content/drive/MyDrive/curso-spark/semana-1.zip', 'r').extractall('/content/drive/MyDrive/curso-spark/semana_1')

In [13]:
path = '/content/drive/MyDrive/curso-spark/semana_1'
casas = spark.read.json(path)

Analise a quantidade de linhas e colunas;

In [14]:
len(casas.columns)

3

In [15]:
casas.count()

89083

Avalie a estrutura da base de dados;

In [16]:
casas.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

Transformar cada campo da coluna "anuncio" em uma coluna separada

In [37]:
from pyspark.sql.types import DoubleType, StringType, IntegerType
from pyspark.sql import functions as f

In [18]:
anuncios = casas.select("anuncio.*")

In [19]:
anuncios.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- area_util: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- banheiros: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- suites: array (nullable = true)
 |    |-- element: long (c

Filtrar a base de dados

In [31]:
anuncios.select('id', 'tipo_uso')\
        .groupBy('tipo_uso')\
        .agg(
            f.count("id").alias("frequencia")
        )\
        .orderBy('tipo_uso', ascending=True)\
        .show()

+-----------+----------+
|   tipo_uso|frequencia|
+-----------+----------+
|  Comercial|      4542|
|Residencial|     84541|
+-----------+----------+



In [32]:
anuncios.select('id', 'tipo_unidade')\
        .groupBy('tipo_unidade')\
        .agg(
            f.count("id").alias("frequencia")
        )\
        .orderBy('tipo_unidade', ascending=True)\
        .show()

+------------+----------+
|tipo_unidade|frequencia|
+------------+----------+
| Apartamento|     66801|
|        Casa|     10319|
|      Outros|     11963|
+------------+----------+



In [33]:
anuncios.select('id', 'tipo_anuncio')\
        .groupBy('tipo_anuncio')\
        .agg(
            f.count("id").alias("frequencia")
        )\
        .orderBy('tipo_anuncio', ascending=True)\
        .show()

+------------+----------+
|tipo_anuncio|frequencia|
+------------+----------+
|  Lançamento|       256|
|       Usado|     88827|
+------------+----------+



In [30]:
anuncios.where(anuncios.tipo_uso == 'Residencial')\
        .where(anuncios.tipo_unidade == 'Apartamento')\
        .where(anuncios.tipo_anuncio == 'Usado')\
        .show(5, False)

+-----+----------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+------------------------------------+-------+------+------------+------------+-----------+----+---------------------------+
|andar|area_total|area_util|banheiros|caracteristicas                                                                                                                |endereco                                                                                                               |id                                  |quartos|suites|tipo_anuncio|tipo_unidade|tipo_uso   |vaga|valores                    |
+-----+----------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------+--------------

Transformar as colunas dos cômodos dos imóveis de listas para inteiros

In [73]:
anuncios2 = anuncios.withColumn('quartos', anuncios['quartos'][0].cast(IntegerType()))\
                   .withColumn('suites', anuncios['suites'][0].cast(IntegerType()))\
                   .withColumn('banheiros', anuncios['banheiros'][0].cast(IntegerType()))\
                   .withColumn('vaga', anuncios['vaga'][0].cast(IntegerType()))\
                   .withColumn('area_total', anuncios['area_total'][0].cast(IntegerType()))\
                   .withColumn('area_util', anuncios['area_util'][0].cast(IntegerType()))

anuncios2.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- valores: array (nullable = true)
 

Tratamento de informações sobre localização

In [74]:
anuncios2 = anuncios2.select("andar", "area_total", "area_util", "banheiros", "caracteristicas", \
                             "endereco.bairro", "endereco.zona", "id", "quartos", "suites", \
                             "tipo_anuncio", "tipo_unidade", "tipo_uso", "vaga", "valores")

anuncios2.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- valores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- condominio: string (nullable = true)
 |    |    |-- iptu: string (nullable = true)
 |    |    |-- tipo: string (nullable = true)
 |    |    |-- valor: string (nullable = true)



Transformar cada campo da coluna "valores" em uma coluna separada

```
# Aproveite para converter a coluna Caracteristicas que não encontrei o item de tratamento para a mesma, porém precisamos tratar se quiser no futuro salvar em CSV
```

In [75]:
anuncios2 = anuncios2.withColumn('condominio', anuncios2['valores'][0]['condominio'])\
                     .withColumn('iptu', anuncios2['valores'][0]['iptu'])\
                     .withColumn('tipo', anuncios2['valores'][0]['tipo'])\
                     .withColumn('valor', anuncios2['valores'][0]['valor'])\
                     .withColumn('caracteristicas', anuncios2['caracteristicas'][0])

anuncios2 = anuncios2.drop("valores")

anuncios2.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- condominio: string (nullable = true)
 |-- iptu: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: string (nullable = true)



In [76]:
anuncios2.show(5)

+-----+----------+---------+---------+-----------------+--------------+------------+--------------------+-------+------+------------+------------+-----------+----+----------+----+-----+-----+
|andar|area_total|area_util|banheiros|  caracteristicas|        bairro|        zona|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|condominio|iptu| tipo|valor|
+-----+----------+---------+---------+-----------------+--------------+------------+--------------------+-------+------+------------+------------+-----------+----+----------+----+-----+-----+
|    0|      null|       16|        0|             null|        Centro|Zona Central|47d553e0-79f2-4a4...|      0|     0|       Usado|      Outros|  Comercial|   1|       260| 107|Venda|10000|
|    0|      null|       14|        0|             null|        Centro|Zona Central|b6ffbae1-17f6-487...|      0|  null|       Usado|      Outros|  Comercial|   0|       260| 107|Venda|10000|
|    0|      1026|     1026|        0|  

Tratamento para a coluna de valores

In [77]:
valores_venda = anuncios.select(f.explode("valores").alias("valores")).select("valores.*").where("tipo == 'Venda'")

valores_venda.show(5)

+----------+----+-----+-----+
|condominio|iptu| tipo|valor|
+----------+----+-----+-----+
|       260| 107|Venda|10000|
|       260| 107|Venda|10000|
|      null|1613|Venda|10000|
|        80|null|Venda|10000|
|         0|   0|Venda| 5000|
+----------+----+-----+-----+
only showing top 5 rows



In [78]:
anuncios2 = anuncios2.where(anuncios2.tipo == 'Venda')

anuncios2.show(5)

+-----+----------+---------+---------+-----------------+--------------+------------+--------------------+-------+------+------------+------------+-----------+----+----------+----+-----+-----+
|andar|area_total|area_util|banheiros|  caracteristicas|        bairro|        zona|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|condominio|iptu| tipo|valor|
+-----+----------+---------+---------+-----------------+--------------+------------+--------------------+-------+------+------------+------------+-----------+----+----------+----+-----+-----+
|    0|      null|       16|        0|             null|        Centro|Zona Central|47d553e0-79f2-4a4...|      0|     0|       Usado|      Outros|  Comercial|   1|       260| 107|Venda|10000|
|    0|      null|       14|        0|             null|        Centro|Zona Central|b6ffbae1-17f6-487...|      0|  null|       Usado|      Outros|  Comercial|   0|       260| 107|Venda|10000|
|    0|      1026|     1026|        0|  

Salvar os dados no formato parquet

In [79]:
anuncios2.write.parquet(
    path='/content/drive/MyDrive/curso-spark/anuncios/parquet',
    mode='overwrite'
)

teste_leitura = spark.read.parquet(
    '/content/drive/MyDrive/curso-spark/anuncios/parquet'
)

teste_leitura.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- condominio: string (nullable = true)
 |-- iptu: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: string (nullable = true)



Salvar o arquivo no formato csv

In [80]:
anuncios2.write.csv(path='/content/drive/MyDrive/curso-spark/anuncios/csv',
                    mode='overwrite',
                    sep=';',
                    header=True)

teste_leitura = spark.read.csv(path='/content/drive/MyDrive/curso-spark/anuncios/csv',
                    sep=';',
                    inferSchema=True,
                    header=True)

teste_leitura.printSchema()

root
 |-- andar: integer (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- condominio: integer (nullable = true)
 |-- iptu: integer (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: integer (nullable = true)



Comparar o desempenho da leitura

In [81]:
%%time

spark.read.parquet(
    '/content/drive/MyDrive/curso-spark/anuncios/parquet'
)

CPU times: user 1.78 ms, sys: 689 µs, total: 2.47 ms
Wall time: 98.1 ms


DataFrame[andar: bigint, area_total: int, area_util: int, banheiros: int, caracteristicas: string, bairro: string, zona: string, id: string, quartos: int, suites: int, tipo_anuncio: string, tipo_unidade: string, tipo_uso: string, vaga: int, condominio: string, iptu: string, tipo: string, valor: string]

In [82]:
%%time

spark.read.csv(path='/content/drive/MyDrive/curso-spark/anuncios/csv',
                    sep=';',
                    inferSchema=True,
                    header=True)

CPU times: user 9.23 ms, sys: 0 ns, total: 9.23 ms
Wall time: 677 ms


DataFrame[andar: int, area_total: int, area_util: int, banheiros: int, caracteristicas: string, bairro: string, zona: string, id: string, quartos: int, suites: int, tipo_anuncio: string, tipo_unidade: string, tipo_uso: string, vaga: int, condominio: int, iptu: int, tipo: string, valor: int]