In [1]:
import os

os.environ["SPARK_HOME"] = "C:\spark"

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

# To display scroll:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [4]:
spark = SparkSession.builder.master('local[*]').appName("Week1").getOrCreate()

In [5]:
spark

In [6]:
path=os.path.abspath('C:\\Users\\Pichau\\Downloads\\semana-1\\dataset_bruto.json')

In [7]:
df = spark.read.json(path)

In [8]:
df.show(5)

+--------------------+--------------------+--------------------+
|             anuncio|             imagens|             usuario|
+--------------------+--------------------+--------------------+
|{0, [], [16], [0]...|[{39d6282a-71f3-4...|{9d44563d-3405-4e...|
|{0, [], [14], [0]...|[{23d2b3ab-45b0-4...|{36245be7-70fe-40...|
|{0, [1026], [1026...|[{1da65baa-368b-4...|{9dc415d8-1397-4d...|
|{0, [120], [120],...|[{79b542c6-49b4-4...|{9911a2df-f299-4a...|
|{0, [3], [3], [0]...|[{e2bc497b-6510-4...|{240a7aab-12e5-40...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [9]:
type(df)

pyspark.sql.dataframe.DataFrame

In [10]:
df.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

# Create anuncio DF

In [11]:
df.select("anuncio").schema.fields[0].dataType.fields[0].name

'andar'

In [12]:
df.select("anuncio").schema.fields[0].dataType.fields

[StructField('andar', LongType(), True),
 StructField('area_total', ArrayType(StringType(), True), True),
 StructField('area_util', ArrayType(StringType(), True), True),
 StructField('banheiros', ArrayType(LongType(), True), True),
 StructField('caracteristicas', ArrayType(StringType(), True), True),
 StructField('endereco', StructType([StructField('bairro', StringType(), True), StructField('cep', StringType(), True), StructField('cidade', StringType(), True), StructField('estado', StringType(), True), StructField('latitude', DoubleType(), True), StructField('longitude', DoubleType(), True), StructField('pais', StringType(), True), StructField('rua', StringType(), True), StructField('zona', StringType(), True)]), True),
 StructField('id', StringType(), True),
 StructField('quartos', ArrayType(LongType(), True), True),
 StructField('suites', ArrayType(LongType(), True), True),
 StructField('tipo_anuncio', StringType(), True),
 StructField('tipo_unidade', StringType(), True),
 StructFiel

In [13]:
anuncio = df.select(
    *[f.col("anuncio").getField(field.name).alias(field.name) for field in df.select("anuncio").schema.fields[0].dataType.fields]
)

In [14]:
anuncio.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- area_util: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- banheiros: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- suites: array (nullable = true)
 |    |-- element: long (c

In [15]:
anuncio.show(15, False)

+-----+----------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------+------------------------------------+-------+------+------------+------------+-----------+----+----------------------------+
|andar|area_total|area_util|banheiros|caracteristicas                                                                                                                |endereco                                                                                                                     |id                                  |quartos|suites|tipo_anuncio|tipo_unidade|tipo_uso   |vaga|valores                     |
+-----+----------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------+

# Data analysis

In [16]:
array_columns = ['area_total','area_util','banheiros','caracteristicas','quartos','suites','vaga','valores']

In [17]:
max_area_total_items = anuncio.select(
    *[f.max(f.size(f.col(colName))).alias('total_'+colName) for colName in array_columns]
)

In [18]:
max_area_total_items.show()

+----------------+---------------+---------------+---------------------+-------------+------------+----------+-------------+
|total_area_total|total_area_util|total_banheiros|total_caracteristicas|total_quartos|total_suites|total_vaga|total_valores|
+----------------+---------------+---------------+---------------------+-------------+------------+----------+-------------+
|               1|              2|              2|                   10|            2|           2|         2|            2|
+----------------+---------------+---------------+---------------------+-------------+------------+----------+-------------+



In [19]:
anuncio.select('tipo_uso').groupBy('tipo_uso').count()\
    .withColumnRenamed('count','total')\
    .orderBy('total')\
    .show()

+-----------+-----+
|   tipo_uso|total|
+-----------+-----+
|  Comercial| 4542|
|Residencial|84541|
+-----------+-----+



In [20]:
anuncio.select('tipo_unidade').groupBy('tipo_unidade').count()\
    .withColumnRenamed('count','total')\
    .orderBy('total')\
    .show()

+------------+-----+
|tipo_unidade|total|
+------------+-----+
|        Casa|10319|
|      Outros|11963|
| Apartamento|66801|
+------------+-----+



In [21]:
anuncio.select('tipo_anuncio').groupBy('tipo_anuncio').count()\
    .withColumnRenamed('count','total')\
    .orderBy('total')\
    .show()

+------------+-----+
|tipo_anuncio|total|
+------------+-----+
|  Lançamento|  256|
|       Usado|88827|
+------------+-----+



In [22]:
anuncio.count()

89083

# Filtering

In [23]:
anuncio_filtered = anuncio.where("tipo_uso=='Residencial' AND tipo_unidade=='Apartamento' AND tipo_anuncio=='Usado'")

In [24]:
anuncio_filtered.count()

66562

In [25]:
anuncio_filtered.show(5)

+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|     caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    3|      [43]|     [43]|      [1]|[Academia, Churra...|{Paciência, 23585...|d2e3a3aa-09b5-45a...|    [2]|    []|       Usado| Apartamento|Residencial| [1]|[{245, null, Vend...|
|    2|      [42]|     [42]|      [1]|[Churrasqueira, P...|{Paciência, 23585...|085bab2c-87ad-452...|    [2]|    []|       Usado| Apartamento|Residencial| [1]|[{0, 0, Venda, 15...|
|    1|      [41]|     [41]|      [1]|[Portaria 24h, Co...|{Guaratiba, 23036...|18d22cbe-1b86-4

# Column transformation

In [26]:
anuncio_filtered.columns

['andar',
 'area_total',
 'area_util',
 'banheiros',
 'caracteristicas',
 'endereco',
 'id',
 'quartos',
 'suites',
 'tipo_anuncio',
 'tipo_unidade',
 'tipo_uso',
 'vaga',
 'valores']

In [27]:
anuncio_filtered = anuncio_filtered.select(
     *[f.when(f.col(colName).getItem(0).isNull(), 0).otherwise(f.col(colName).getItem(0)).alias(colName)\
       if colName in ['quartos','suites','banheiros','vagas','area_total','area_util']\
       else f.col(colName)\
       for colName in anuncio_filtered.columns]
)

In [28]:
anuncio_filtered.show(5)

+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|     caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    3|        43|       43|        1|[Academia, Churra...|{Paciência, 23585...|d2e3a3aa-09b5-45a...|      2|     0|       Usado| Apartamento|Residencial| [1]|[{245, null, Vend...|
|    2|        42|       42|        1|[Churrasqueira, P...|{Paciência, 23585...|085bab2c-87ad-452...|      2|     0|       Usado| Apartamento|Residencial| [1]|[{0, 0, Venda, 15...|
|    1|        41|       41|        1|[Portaria 24h, Co...|{Guaratiba, 23036...|18d22cbe-1b86-4

In [29]:
anuncio_filtered.select("endereco").show(5, False)

+-----------------------------------------------------------------------------------------------------------------------+
|endereco                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------+
|{Paciência, 23585430, Rio de Janeiro, Rio de Janeiro, -22.919851, -43.634034, BR, Estrada de Santa Eugênia, Zona Oeste}|
|{Paciência, 23585430, Rio de Janeiro, Rio de Janeiro, -22.928108, -43.635375, BR, Estrada de Santa Eugênia, Zona Oeste}|
|{Guaratiba, 23036060, Rio de Janeiro, Rio de Janeiro, -22.948756, -43.582824, BR, Estrada Cabuçu de Baixo, Zona Oeste} |
|{Cosmos, 23066271, Rio de Janeiro, Rio de Janeiro, -22.888194, -43.629602, BR, Estrada da Paciência, Zona Oeste}       |
|{Guaratiba, 23036060, Rio de Janeiro, Rio de Janeiro, -22.948291, -43.582205, BR, Estrada Cabuçu de Baixo, Zona Oeste} |
+-----------------------

In [30]:
anuncio_filtered.select("endereco").printSchema()

root
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)



In [31]:
anuncio_filtered = anuncio_filtered\
    .withColumn('bairro', f.col('endereco.bairro'))\
    .withColumn('zona', f.col('endereco.zona'))\
    .drop('endereco')

In [32]:
anuncio_filtered.show(5)

+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+---------+----------+
|andar|area_total|area_util|banheiros|     caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|   bairro|      zona|
+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+---------+----------+
|    3|        43|       43|        1|[Academia, Churra...|d2e3a3aa-09b5-45a...|      2|     0|       Usado| Apartamento|Residencial| [1]|[{245, null, Vend...|Paciência|Zona Oeste|
|    2|        42|       42|        1|[Churrasqueira, P...|085bab2c-87ad-452...|      2|     0|       Usado| Apartamento|Residencial| [1]|[{0, 0, Venda, 15...|Paciência|Zona Oeste|
|    1|        41|       41|        1|[Portaria 24h, Co...|18d22cbe-1b86-476...|      2|     0|

In [33]:
anuncio_filtered.select("valores").printSchema()

root
 |-- valores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- condominio: string (nullable = true)
 |    |    |-- iptu: string (nullable = true)
 |    |    |-- tipo: string (nullable = true)
 |    |    |-- valor: string (nullable = true)



In [34]:
anuncio_filtered = anuncio_filtered\
    .withColumn('condominio', f.col("valores").getItem('condominio'))\
    .withColumn('iptu', f.col("valores").getItem('iptu'))\
    .withColumn('tipo', f.col("valores").getItem('tipo'))\
    .withColumn('valor', f.col("valores").getItem('valor'))\
    .drop('valores')

In [35]:
anuncio_filtered.show(5)

+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+---------+----------+----------+------+-------+-------+
|andar|area_total|area_util|banheiros|     caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|   bairro|      zona|condominio|  iptu|   tipo|  valor|
+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+---------+----------+----------+------+-------+-------+
|    3|        43|       43|        1|[Academia, Churra...|d2e3a3aa-09b5-45a...|      2|     0|       Usado| Apartamento|Residencial| [1]|Paciência|Zona Oeste|     [245]|[null]|[Venda]|[15000]|
|    2|        42|       42|        1|[Churrasqueira, P...|085bab2c-87ad-452...|      2|     0|       Usado| Apartamento|Residencial| [1]|Paciência|Zona Oeste|       [0]|   [0]|[Venda]|[15000]|
|    1|        41|       41|  

In [36]:
anuncio_filtered.select('tipo').distinct().collect()

[Row(tipo=['Venda', 'Aluguel']),
 Row(tipo=['Aluguel', 'Venda']),
 Row(tipo=['Venda'])]

In [37]:
anuncio_filtered.filter(f.array_contains(f.col('tipo'), 'Venda') & f.array_contains(f.col('tipo'),'Aluguel')).show(5)

+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+------------+------------+----------+------------+----------------+-------------+
|andar|area_total|area_util|banheiros|     caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|      bairro|        zona|condominio|        iptu|            tipo|        valor|
+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+------------+------------+----------+------------+----------------+-------------+
|    3|        48|       48|        1|[Churrasqueira, C...|c425f585-53e3-44b...|      2|     0|       Usado| Apartamento|Residencial| [0]|  Santa Cruz|  Zona Oeste|[350, 350]|    [20, 20]|[Venda, Aluguel]| [49000, 700]|
|    3|         0|       46|        1|[Churrasqueira, C...|df481ce5-65d0-4c1...|      2|     0|       Usado| Apartamento

In [38]:
# https://stackoverflow.com/questions/59235308/explode-two-pyspark-arrays-and-keep-elements-from-same-positions/59238352#59238352
transform_expr = "transform(tipo, (tipoVal, i) -> struct(tipoVal as tipo, iptu[i] as iptu, valor[i] as valor, condominio[i] as condominio))"

In [39]:
exploded_anuncio = anuncio_filtered.withColumn("merged_arrays", f.explode(f.expr(transform_expr))) \
    .withColumn("tipo", f.col("merged_arrays.tipo")) \
    .withColumn("iptu", f.col("merged_arrays.iptu"))\
    .withColumn("condominio", f.col("merged_arrays.condominio"))\
    .withColumn("valor", f.col("merged_arrays.valor"))\
    .drop('merged_arrays')

In [40]:
exploded_anuncio.count()

67183

In [41]:
exploded_anuncio.filter('tipo == "Aluguel"').count()

621

In [42]:
anuncios_venda = exploded_anuncio.filter('tipo == "Venda"')

In [43]:
anuncios_venda.count()

66562

In [44]:
anuncios_venda.show(5)

+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+---------+----------+----------+----+-----+-----+
|andar|area_total|area_util|banheiros|     caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|   bairro|      zona|condominio|iptu| tipo|valor|
+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+---------+----------+----------+----+-----+-----+
|    3|        43|       43|        1|[Academia, Churra...|d2e3a3aa-09b5-45a...|      2|     0|       Usado| Apartamento|Residencial| [1]|Paciência|Zona Oeste|       245|null|Venda|15000|
|    2|        42|       42|        1|[Churrasqueira, P...|085bab2c-87ad-452...|      2|     0|       Usado| Apartamento|Residencial| [1]|Paciência|Zona Oeste|         0|   0|Venda|15000|
|    1|        41|       41|        1|[Portaria 24h, Co...|1

# Saving

In [83]:
anuncios_venda.write.parquet('C:\\Users\\Pichau\\Downloads\\semana-1\\anuncios_venda.parquet')

In [88]:
anuncios_venda\
    .withColumn("caracteristicas", f.col("caracteristicas").cast("string"))\
    .withColumn("vaga", f.col("vaga").cast("string"))\
    .write.options(header=True).csv('C:\\Users\\Pichau\\Downloads\\semana-1\\anuncios_venda.csv')

# Testing

In [84]:
anuncios_parquet = spark.read.parquet('C:\\Users\\Pichau\\Downloads\\semana-1\\anuncios_venda.parquet')

In [101]:
%%time
anuncios_parquet\
    .groupBy('tipo_unidade')\
    .agg(
        f.avg("andar").alias('andar'),
        f.avg("quartos")
    ).orderBy('andar').show()

+------------+------------------+------------------+
|tipo_unidade|             andar|      avg(quartos)|
+------------+------------------+------------------+
| Apartamento|2.3374598119046905|2.6121811243652533|
+------------+------------------+------------------+

CPU times: total: 0 ns
Wall time: 73 ms


In [96]:
anuncios_csv = spark.read.options(header=True).csv('C:\\Users\\Pichau\\Downloads\\semana-1\\anuncios_venda.csv')

In [100]:
%%time
anuncios_csv\
    .groupBy('tipo_unidade')\
    .agg(
        f.avg("andar").alias('andar'),
        f.avg("quartos")
    ).orderBy('andar').show()

+------------+------------------+------------------+
|tipo_unidade|             andar|      avg(quartos)|
+------------+------------------+------------------+
| Apartamento|2.3374598119046905|2.6121811243652533|
+------------+------------------+------------------+

CPU times: total: 0 ns
Wall time: 108 ms
