# Criando sessão Spark

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("Semana1") \
    .getOrCreate()

In [5]:
spark

# Lendo arquivo

In [6]:
dataframe = spark.read.json('D:\Projetos\ChallengeDataScience2\ChallengeDataScience2\Semana1\dataset\dataset_bruto.json')

In [7]:
dataframe.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------

# Explorando dados

In [8]:
dataframe.count()

89083

In [9]:
len(dataframe.columns)

3

In [10]:
dataframe.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

In [11]:
dataframe.select('anuncio').show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|anuncio                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [12]:
import pyspark.sql.functions as f

In [13]:
dataframe.select('anuncio.area_total').where(f.size(f.col('anuncio.area_total')) > 1).show(truncate=False)

+----------+
|area_total|
+----------+
+----------+



# Separando dados para análise

In [14]:
df_anuncio = dataframe.select('anuncio')
df_anuncio.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|anuncio                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
df_anuncio.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

In [16]:
df_anuncio = df_anuncio.select('anuncio.*')
df_anuncio.show(truncate=False)

+-----+----------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+------------------------------------+-------+------+------------+------------+-----------+----+----------------------------+
|andar|area_total|area_util|banheiros|caracteristicas                                                                                                                |endereco                                                                                                                     |id                                  |quartos|suites|tipo_anuncio|tipo_unidade|tipo_uso   |vaga|valores                     |
+-----+----------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------+

In [17]:
df_anuncio.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- area_util: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- banheiros: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- suites: array (nullable = true)
 |    |-- element: long (c

# Filtrando dados

In [18]:
df_anuncio.groupBy('tipo_uso').count().show(truncate=False)

+-----------+-----+
|tipo_uso   |count|
+-----------+-----+
|Comercial  |4542 |
|Residencial|84541|
+-----------+-----+



In [19]:
df_anuncio.groupBy('tipo_unidade').count().show(truncate=False)

+------------+-----+
|tipo_unidade|count|
+------------+-----+
|Outros      |11963|
|Apartamento |66801|
|Casa        |10319|
+------------+-----+



In [20]:
df_anuncio.groupBy('tipo_anuncio').count().show(truncate=False)

+------------+-----+
|tipo_anuncio|count|
+------------+-----+
|Usado       |88827|
|Lançamento  |256  |
+------------+-----+



In [21]:
df_anuncio.count()

89083

In [22]:
df_anuncio_filter = df_anuncio.where((f.col('tipo_uso') == 'Residencial') & (f.col('tipo_unidade') == 'Apartamento') & (f.col('tipo_anuncio') == 'Usado'))
df_anuncio_filter.show(truncate=False)

+-----+----------+---------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+-------+------+------------+------------+-----------+----+----------------------------+
|andar|area_total|area_util|banheiros|caracteristicas                                                                                                                                   |endereco                                                                                                                             |id                                  |quartos|suites|tipo_anuncio|tipo_unidade|tipo_uso   |vaga|valores                     |
+-----+----------+---------+---------+--------------------------------------------------------------------------

In [23]:
df_anuncio_filter.count()

66562

# Tratando dados

In [24]:
df_anuncio_filter = df_anuncio_filter.withColumn('area_total', f.explode(df_anuncio_filter.area_total)) \
    .withColumn('area_util', f.explode(df_anuncio_filter.area_util)) \
    .withColumn('vaga', f.explode(df_anuncio_filter.vaga)) \
    .withColumn('banheiros', f.explode(df_anuncio_filter.banheiros)) \
    .withColumn('suites', f.explode(df_anuncio_filter.suites)) \
    .withColumn('quartos', f.explode(df_anuncio_filter.quartos))

df_anuncio_filter.show(truncate=False)

+-----+----------+---------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+-------+------+------------+------------+-----------+----+--------------------------------------------------+
|andar|area_total|area_util|banheiros|caracteristicas                                                                                                                                   |endereco                                                                                                                             |id                                  |quartos|suites|tipo_anuncio|tipo_unidade|tipo_uso   |vaga|valores                                           |
+-----+----------+---------+---------+------------------------------

In [25]:
df_anuncio_filter.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: string (nullable = true)
 |-- area_util: string (nullable = true)
 |-- banheiros: long (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: long (nullable = true)
 |-- valores: array (nullable = true)
 |    |-- eleme

In [26]:
df_anuncio_filter = df_anuncio_filter.withColumns({'bairro': df_anuncio_filter.endereco.bairro, 'zona': df_anuncio_filter.endereco.zona}).drop('endereco')
df_anuncio_filter.show(truncate=False)

+-----+----------+---------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+-------+------+------------+------------+-----------+----+--------------------------------------------------+------------------------+----------+
|andar|area_total|area_util|banheiros|caracteristicas                                                                                                                                   |id                                  |quartos|suites|tipo_anuncio|tipo_unidade|tipo_uso   |vaga|valores                                           |bairro                  |zona      |
+-----+----------+---------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+-------+------+------------+------------+-

In [27]:
df_anuncio_filter.select('id', 'valores').show(truncate=False)

+------------------------------------+--------------------------------------------------+
|id                                  |valores                                           |
+------------------------------------+--------------------------------------------------+
|a2e6d7a5-0ff0-484d-b3d8-3a8f15e2d80e|[{285, NULL, Venda, 20000}]                       |
|2e6e5dfb-206c-4968-944b-ea4c3918b50d|[{245, NULL, Venda, 15000}]                       |
|99f8d0f9-95a4-4613-a55d-c949e7a73e90|[{245, NULL, Venda, 15000}]                       |
|df80b0d5-677c-4be7-93c4-ddef578125ac|[{245, NULL, Venda, 19999}]                       |
|111472a2-afa1-4a73-a8b3-3588ffba362c|[{0, 0, Venda, 30000}]                            |
|275c1589-6537-4bf7-9504-74410dbf01fc|[{NULL, NULL, Venda, 30000}]                      |
|ab8f367f-7509-45c7-ad9a-f51f84584376|[{280, 0, Venda, 25000}]                          |
|a8de531f-5e85-4217-aa7d-578b9cc3ee97|[{NULL, 0, Venda, 27000}]                         |
|c2676515-

In [28]:
df_valores = df_anuncio_filter.select('id', f.explode('valores').alias('valores')).select('id', 'valores.*')
df_valores.show(truncate=False)

+------------------------------------+----------+----+-------+-----+
|id                                  |condominio|iptu|tipo   |valor|
+------------------------------------+----------+----+-------+-----+
|a2e6d7a5-0ff0-484d-b3d8-3a8f15e2d80e|285       |NULL|Venda  |20000|
|2e6e5dfb-206c-4968-944b-ea4c3918b50d|245       |NULL|Venda  |15000|
|99f8d0f9-95a4-4613-a55d-c949e7a73e90|245       |NULL|Venda  |15000|
|df80b0d5-677c-4be7-93c4-ddef578125ac|245       |NULL|Venda  |19999|
|111472a2-afa1-4a73-a8b3-3588ffba362c|0         |0   |Venda  |30000|
|275c1589-6537-4bf7-9504-74410dbf01fc|NULL      |NULL|Venda  |30000|
|ab8f367f-7509-45c7-ad9a-f51f84584376|280       |0   |Venda  |25000|
|a8de531f-5e85-4217-aa7d-578b9cc3ee97|NULL      |0   |Venda  |27000|
|c2676515-0633-4375-958e-b7b1644a4108|NULL      |NULL|Venda  |28000|
|01de0250-50cc-4e75-9c01-b886c5059ddc|200       |0   |Venda  |24999|
|57ac572e-34e2-4427-b3fa-d2198c4e9203|NULL      |0   |Venda  |25000|
|ae90e3f7-4b9e-4e68-a28c-7e98b6b3b

In [29]:
df_valores.printSchema()

root
 |-- id: string (nullable = true)
 |-- condominio: string (nullable = true)
 |-- iptu: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: string (nullable = true)



In [30]:
df_anuncio_filter.drop('valores').join(df_valores, 'id', how='inner')

DataFrame[id: string, andar: bigint, area_total: string, area_util: string, banheiros: bigint, caracteristicas: array<string>, quartos: bigint, suites: bigint, tipo_anuncio: string, tipo_unidade: string, tipo_uso: string, vaga: bigint, bairro: string, zona: string, condominio: string, iptu: string, tipo: string, valor: string]

In [31]:
df_anuncio_filter = df_anuncio_filter.drop('valores').join(df_valores, 'id', how='inner')
df_anuncio_filter.show()

+--------------------+-----+----------+---------+---------+--------------------+-------+------+------------+------------+-----------+----+--------------+------------+----------+----+-----+------+
|                  id|andar|area_total|area_util|banheiros|     caracteristicas|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|        bairro|        zona|condominio|iptu| tipo| valor|
+--------------------+-----+----------+---------+---------+--------------------+-------+------+------------+------------+-----------+----+--------------+------------+----------+----+-----+------+
|1fe78d41-b8e0-4d2...|    0|        44|       44|        1|                  []|      2|     0|       Usado| Apartamento|Residencial|   0|         Irajá|  Zona Norte|       170|   0|Venda|110000|
|40a2cbde-946b-42d...|    2|        47|       47|        1|[Condomínio fecha...|      2|     0|       Usado| Apartamento|Residencial|   1|  Campo Grande|  Zona Oeste|       380|NULL|Venda|138000|
|be269ddf-a4a3-4a7..

In [32]:
df_anuncio_filter.groupBy('tipo').count().show()

+-------+-----+
|   tipo|count|
+-------+-----+
|Aluguel|  408|
|  Venda|52159|
+-------+-----+



In [33]:
df_anuncio_filter = df_anuncio_filter.where(f.col('tipo') == 'Venda')
df_anuncio_filter.groupBy('tipo').count().show()

+-----+-----+
| tipo|count|
+-----+-----+
|Venda|52159|
+-----+-----+



# Salvando arquivo

## Parquet

In [34]:
df_anuncio_filter.write.parquet(path='D:\Projetos\ChallengeDataScience2\ChallengeDataScience2\Semana1\dataset\dataset_final_parquet', mode='overwrite')

In [35]:
read_parquet = spark.read.parquet('D:\Projetos\ChallengeDataScience2\ChallengeDataScience2\Semana1\dataset\dataset_final_parquet')

In [39]:
read_parquet.count()

52159

In [40]:
df_anuncio_filter.count()

52159

## CSV

In [43]:
df_anuncio_filter.drop('caracteristicas').write.csv(path='D:\Projetos\ChallengeDataScience2\ChallengeDataScience2\Semana1\dataset\dataset_final_csv', mode='overwrite', header=True)

In [44]:
read_csv = spark.read.csv('D:\Projetos\ChallengeDataScience2\ChallengeDataScience2\Semana1\dataset\dataset_final_csv')

In [45]:
read_csv.count()

52160