# **Alura Challenge Data Science - Week 1 - Transforming Data with PySpark**

In [1]:
# pip install pyspark

In [2]:
# Importando Bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType, FloatType, StringType

## **Iniciando sessão spark**

In [3]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Challenge Data Science 2") \
    .getOrCreate()

In [4]:
spark

## **ETL PROCESS (EXTRACT, TRANSFORM, LOAD)**

### **Extract**

In [5]:
import zipfile

In [6]:
# Extraindo o arquivo zipado
zipfile.ZipFile('data/semana-1.zip', 'r').extractall('data')

In [7]:
# Lendo o arquivo zipado e Criando o DataFrame Spark
path = 'data/dataset_bruto.json'
imoveis_df = spark.read.option("multiLine","true").json(path)
imoveis_df.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

In [8]:
print('O DataFrame possui {} colunas'.format(imoveis_df.count()))

O DataFrame possui 89083 colunas


### **Transform**

#### *Dealing with array and dictionary columns*

For our analysis only the information inside the column `"anuncio"` are relevant, therefore we will just select it.

In [9]:
anuncio_df = imoveis_df.select("anuncio.*")
anuncio_df.show()

+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|     caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    0|        []|     [16]|      [0]|                  []|{Centro, 20061003...|47d553e0-79f2-4a4...|    [0]|   [0]|       Usado|      Outros|  Comercial| [1]|[{260, 107, Venda...|
|    0|        []|     [14]|      [0]|                  []|{Centro, 20051040...|b6ffbae1-17f6-487...|    [0]|    []|       Usado|      Outros|  Comercial| [0]|[{260, 107, Venda...|
|    0|    [1026]|   [1026]|      [0]|                  []|{Maria da Graça, ...|1fb030a5-9e3e-4

In [10]:
anuncio_df.printSchema()   

root
 |-- andar: long (nullable = true)
 |-- area_total: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- area_util: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- banheiros: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- suites: array (nullable = true)
 |    |-- element: long (c

Transforming **`array`** columns into **`string`** columns

In [11]:
anuncio_df = anuncio_df\
            .withColumn("quartos",anuncio_df.quartos[0])\
            .withColumn("suites",anuncio_df.suites[0])\
            .withColumn("banheiros",anuncio_df.banheiros[0])\
            .withColumn("area_total",anuncio_df.area_total[0])\
            .withColumn("area_util",anuncio_df.area_util[0])\
            .withColumn("vaga",anuncio_df.vaga[0])

anuncio_df.show()

+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|     caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    0|      null|       16|        0|                  []|{Centro, 20061003...|47d553e0-79f2-4a4...|      0|     0|       Usado|      Outros|  Comercial|   1|[{260, 107, Venda...|
|    0|      null|       14|        0|                  []|{Centro, 20051040...|b6ffbae1-17f6-487...|      0|  null|       Usado|      Outros|  Comercial|   0|[{260, 107, Venda...|
|    0|      1026|     1026|        0|                  []|{Maria da Graça, ...|1fb030a5-9e3e-4

Extracting the **bairro and zona** from the **`endereço`** column. And extracting the **condominio, iptu, tipo e valor** from the **`valores`** column.

In [35]:
final_df = anuncio_df\
                    .withColumn('bairro', anuncio_df['endereco']['bairro'])\
                    .withColumn('zona', anuncio_df['endereco']['zona'])\
                    .withColumn('condominio',anuncio_df['valores']['condominio'][0])\
                    .withColumn('iptu',anuncio_df['valores']['iptu'][0])\
                    .withColumn('tipo',anuncio_df['valores']['tipo'][0])\
                    .withColumn('valor',anuncio_df['valores']['valor'][0])\
                    .drop('endereco', 'valores')

final_df.toPandas()

Unnamed: 0,andar,area_total,area_util,banheiros,caracteristicas,id,quartos,suites,tipo_anuncio,tipo_unidade,tipo_uso,vaga,bairro,zona,condominio,iptu,tipo,valor
0,0,,16,0.0,[],47d553e0-79f2-4a46-9390-5a3c962740c2,0.0,0.0,Usado,Outros,Comercial,1.0,Centro,Zona Central,260,107,Venda,10000
1,0,,14,0.0,[],b6ffbae1-17f6-4870-9950-e998ac1c8d6a,0.0,,Usado,Outros,Comercial,0.0,Centro,Zona Central,260,107,Venda,10000
2,0,1026,1026,0.0,[],1fb030a5-9e3e-4a1d-93cd-cd2d8a215e11,0.0,,Usado,Outros,Comercial,0.0,Maria da Graça,Zona Norte,,1613,Venda,10000
3,0,120,120,0.0,"[Portão eletrônico, Condomínio fechado]",ac6cb348-69d6-45af-9589-dc34099370d8,0.0,,Usado,Outros,Residencial,,Campo Grande,Zona Oeste,80,,Venda,10000
4,0,3,3,0.0,[],e032b908-ef42-4d4a-8125-eba4792bacbe,0.0,,Usado,Outros,Residencial,,São Cristóvão,Zona Norte,0,0,Venda,5000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
89078,0,,219,4.0,[],10f8eb1a-89f8-4ba8-aae0-078a3661353f,4.0,4.0,Lançamento,Outros,Residencial,1.0,Barra da Tijuca,Zona Oeste,0,0,Venda,3900000
89079,0,,200,3.0,[],dca4c35b-ae00-486e-aeea-60b03957dfae,3.0,3.0,Lançamento,Apartamento,Residencial,1.0,Leblon,Zona Sul,0,0,Venda,4220000
89080,0,,130,3.0,[],c2cee5ec-1d04-4792-a9d0-7bf50824223d,2.0,2.0,Lançamento,Apartamento,Residencial,1.0,Lagoa,Zona Sul,0,0,Venda,4311297
89081,0,268,268,5.0,[],7f700a8e-b171-46ea-993b-461162f7afba,4.0,4.0,Lançamento,Apartamento,Residencial,4.0,Engenho de Dentro,Zona Norte,0,0,Venda,4520008


In [36]:
final_df.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: string (nullable = true)
 |-- area_util: string (nullable = true)
 |-- banheiros: long (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- quartos: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: long (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- condominio: string (nullable = true)
 |-- iptu: string (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: string (nullable = true)



#### *Changing columns type*

Some column types are not consistent. Therefore, we need to tweak them for later use.

string --> float:

- area_total
- area_util
- condominio
- iptu
- valor

array --> string:

- caracteristicas

In [37]:
final_df = final_df\
            .withColumn('area_total', final_df['area_total'].cast(FloatType()))\
            .withColumn('area_util', final_df['area_util'].cast(FloatType()))\
            .withColumn('condominio', final_df['condominio'].cast(FloatType()))\
            .withColumn('iptu', final_df['iptu'].cast(FloatType()))\
            .withColumn('valor', final_df['valor'].cast(FloatType()))

final_df.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: float (nullable = true)
 |-- area_util: float (nullable = true)
 |-- banheiros: long (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- quartos: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: long (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- condominio: float (nullable = true)
 |-- iptu: float (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: float (nullable = true)



#### *Filtering the DF*

We are going to filter the DF by:

- tipo_uso = **`Residencial`**;
- tipo_unidade = **`Apartamento`**;
- tipo_anuncio = **`Usado`**.
- tipo = **`Venda`**

But first lets understand the impact of these filter by creating a frequency table

In [38]:
final_df\
    .select('tipo_uso')\
    .groupBy('tipo_uso')\
    .count()\
    .orderBy('tipo_uso', 'count', ascending=False)\
    .show()

+-----------+-----+
|   tipo_uso|count|
+-----------+-----+
|Residencial|84541|
|  Comercial| 4542|
+-----------+-----+



In [39]:
final_df\
    .select('tipo_uso', 'tipo_unidade')\
    .groupBy('tipo_uso', 'tipo_unidade')\
    .count()\
    .orderBy('tipo_uso', 'tipo_unidade', 'count', ascending=[False, True, False])\
    .show()

+-----------+------------+-----+
|   tipo_uso|tipo_unidade|count|
+-----------+------------+-----+
|Residencial| Apartamento|66797|
|Residencial|        Casa|10227|
|Residencial|      Outros| 7517|
|  Comercial| Apartamento|    4|
|  Comercial|        Casa|   92|
|  Comercial|      Outros| 4446|
+-----------+------------+-----+



In [40]:
final_df\
    .select('tipo_uso', 'tipo_unidade', 'tipo_anuncio')\
    .groupBy('tipo_uso', 'tipo_unidade', 'tipo_anuncio')\
    .count()\
    .orderBy('tipo_uso', 'tipo_unidade', 'tipo_anuncio', 'count', ascending=[False, True, False, False])\
    .show()

+-----------+------------+------------+-----+
|   tipo_uso|tipo_unidade|tipo_anuncio|count|
+-----------+------------+------------+-----+
|Residencial| Apartamento|       Usado|66562|
|Residencial| Apartamento|  Lançamento|  235|
|Residencial|        Casa|       Usado|10224|
|Residencial|        Casa|  Lançamento|    3|
|Residencial|      Outros|       Usado| 7502|
|Residencial|      Outros|  Lançamento|   15|
|  Comercial| Apartamento|       Usado|    4|
|  Comercial|        Casa|       Usado|   92|
|  Comercial|      Outros|       Usado| 4443|
|  Comercial|      Outros|  Lançamento|    3|
+-----------+------------+------------+-----+



In [41]:
final_df\
    .select('tipo_uso', 'tipo_unidade', 'tipo_anuncio', 'tipo')\
    .groupBy('tipo_uso', 'tipo_unidade', 'tipo_anuncio', 'tipo')\
    .count()\
    .orderBy('tipo_uso', 'tipo_unidade', 'tipo_anuncio', 'tipo', 'count', ascending=[False, True, False, False ,False])\
    .show()

+-----------+------------+------------+-------+-----+
|   tipo_uso|tipo_unidade|tipo_anuncio|   tipo|count|
+-----------+------------+------------+-------+-----+
|Residencial| Apartamento|       Usado|  Venda|66348|
|Residencial| Apartamento|       Usado|Aluguel|  214|
|Residencial| Apartamento|  Lançamento|  Venda|  235|
|Residencial|        Casa|       Usado|  Venda|10161|
|Residencial|        Casa|       Usado|Aluguel|   63|
|Residencial|        Casa|  Lançamento|  Venda|    3|
|Residencial|      Outros|       Usado|  Venda| 7402|
|Residencial|      Outros|       Usado|Aluguel|  100|
|Residencial|      Outros|  Lançamento|  Venda|   15|
|  Comercial| Apartamento|       Usado|  Venda|    4|
|  Comercial|        Casa|       Usado|  Venda|   87|
|  Comercial|        Casa|       Usado|Aluguel|    5|
|  Comercial|      Outros|       Usado|  Venda| 4014|
|  Comercial|      Outros|       Usado|Aluguel|  429|
|  Comercial|      Outros|  Lançamento|  Venda|    3|
+-----------+------------+--

In [42]:
filtered_final_df = final_df\
                        .filter((final_df['tipo_uso'] == 'Residencial') &
                                (final_df['tipo_unidade'] == 'Apartamento') & 
                                (final_df['tipo_anuncio'] == 'Usado') &
                                (final_df['tipo'] == 'Venda'))

In [43]:
filtered_final_df\
    .select('tipo_uso', 'tipo_unidade', 'tipo_anuncio', 'tipo')\
    .groupBy('tipo_uso', 'tipo_unidade', 'tipo_anuncio', 'tipo')\
    .count()\
    .orderBy('tipo_uso', 'tipo_unidade', 'tipo_anuncio', 'count', ascending=[False, True, False, False])\
    .show()

+-----------+------------+------------+-----+
|   tipo_uso|tipo_unidade|tipo_anuncio|count|
+-----------+------------+------------+-----+
|Residencial| Apartamento|       Usado|66348|
+-----------+------------+------------+-----+



In [44]:
filtered_final_df.show()

+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+----------+----------+----+-----+-------+
|andar|area_total|area_util|banheiros|     caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|              bairro|      zona|condominio|iptu| tipo|  valor|
+-----+----------+---------+---------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+----------+----------+----+-----+-------+
|    3|      43.0|     43.0|        1|[Academia, Churra...|d2e3a3aa-09b5-45a...|      2|  null|       Usado| Apartamento|Residencial|   1|           Paciência|Zona Oeste|     245.0|null|Venda|15000.0|
|    2|      42.0|     42.0|        1|[Churrasqueira, P...|085bab2c-87ad-452...|      2|  null|       Usado| Apartamento|Residencial|   1|           Paciência|Zona Oeste|       0.0| 0.0|Venda|1500

In [45]:
filtered_final_df.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: float (nullable = true)
 |-- area_util: float (nullable = true)
 |-- banheiros: long (nullable = true)
 |-- caracteristicas: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- quartos: long (nullable = true)
 |-- suites: long (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: long (nullable = true)
 |-- bairro: string (nullable = true)
 |-- zona: string (nullable = true)
 |-- condominio: float (nullable = true)
 |-- iptu: float (nullable = true)
 |-- tipo: string (nullable = true)
 |-- valor: float (nullable = true)



#### *Changing columns name*

In [46]:
dict_columns = {
        'andar' : 'floor',
        'area_total' : 'total_area',
        'area_util' : 'useful_area',
        'banheiros': 'toilets',
        'caracteristicas' : 'features',
        'quartos' : 'bedrooms',
        'suites' : 'suites',
        'tipo_anuncio' : 'ad_type',
        'tipo_unidade' : 'unit_type',
        'tipo_uso' : 'use_type',
        'vaga' : 'car_slot',
        'bairro' : 'district',
        'zona' : 'zone',
        'condomino' : 'condominium',
        'iptu' : 'iptu',
        'tipo' : 'type',
        'valor' : 'value'
}

In [47]:
for key, value in dict_columns.items():
    filtered_final_df = filtered_final_df.withColumnRenamed(key, value)

In [48]:
filtered_final_df.show(2)

+-----+----------+-----------+-------+--------------------+--------------------+--------+------+-------+-----------+-----------+--------+---------+----------+----------+----+-----+-------+
|floor|total_area|useful_area|toilets|            features|                  id|bedrooms|suites|ad_type|  unit_type|   use_type|car_slot| district|      zone|condominio|iptu| type|  value|
+-----+----------+-----------+-------+--------------------+--------------------+--------+------+-------+-----------+-----------+--------+---------+----------+----------+----+-----+-------+
|    3|      43.0|       43.0|      1|[Academia, Churra...|d2e3a3aa-09b5-45a...|       2|  null|  Usado|Apartamento|Residencial|       1|Paciência|Zona Oeste|     245.0|null|Venda|15000.0|
|    2|      42.0|       42.0|      1|[Churrasqueira, P...|085bab2c-87ad-452...|       2|  null|  Usado|Apartamento|Residencial|       1|Paciência|Zona Oeste|       0.0| 0.0|Venda|15000.0|
+-----+----------+-----------+-------+-----------------

### **Load**

Saving the **`filtered_final_df`** in csv and in parquet. Then comparing reading performance for both.

*Parquet*

In [49]:
filtered_final_df.write.parquet(
    path = 'data/parquet',
    mode = 'overwrite'
)

CSV

In [51]:
# we need to transforma the features column DataType, which is an array, to String to be able to save de DataFrame as csv
filtered_final_df_csv = filtered_final_df.withColumn('features', filtered_final_df['features'].cast(StringType()))

In [53]:
filtered_final_df_csv.write.csv(
    path='data/csv',
    mode='overwrite',
    sep=';',
    header=True
)

Comparing performance

In [54]:
%%time

path = 'data/csv/part-00000-4340dfbc-acf6-4ad3-8cc2-6ec617ebd94a-c000.csv'
data = spark.read.csv(path, sep = ';', inferSchema=True)

CPU times: user 4.71 ms, sys: 7.62 ms, total: 12.3 ms
Wall time: 3.09 s


In [55]:
%%time

path = 'data/parquet/part-00000-8bc019cf-aa0b-4e9e-98c7-a3ac5f08fe01-c000.snappy.parquet'
data = spark.read.parquet(path)

CPU times: user 3.14 ms, sys: 3.56 ms, total: 6.7 ms
Wall time: 418 ms


As we can exist a difference in performance between parquet and csv files. Parquet have a better aggregation performance than csv files.