In [1]:
# Importando as bibliotecas
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, cast, lower, isnan, when, count, to_timestamp
from pyspark.sql.types import StringType
from sqlalchemy import create_engine

## Carregando os dados
Nesta etapa, irei carregar os dados.

In [2]:
# Iniciando sessão no Spark
spark = SparkSession.builder.appName('Cluster').getOrCreate()


23/05/11 16:47:25 WARN Utils: Your hostname, daniel-VJFE43F11X-XXXXXX resolves to a loopback address: 127.0.1.1; using 192.168.0.157 instead (on interface wlo1)
23/05/11 16:47:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/11 16:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Lendos os dados csv
customers = spark.read.csv("../data/raw/olist_customers_dataset.csv", header=True)
geolocalization = spark.read.csv("../data/raw/olist_geolocation_dataset.csv", header=True)
order_items = spark.read.csv("../data/raw/olist_order_items_dataset.csv", header=True)
order_payments = spark.read.csv("../data/raw/olist_order_payments_dataset.csv", header=True)
order_reviews = spark.read.csv("../data/raw/olist_order_reviews_dataset.csv", header=True)
orders = spark.read.csv("../data/raw/olist_orders_dataset.csv", header=True)
products = spark.read.csv("../data/raw/olist_products_dataset.csv", header=True)
sellers = spark.read.csv("../data/raw/olist_sellers_dataset.csv", header=True)
product_category = spark.read.csv("../data/raw/product_category_name_translation.csv", header=True)

## Transformando os dados
Nesta etapa, irei realizar uma série de transformações nos dados. Antes disso, irei unir as tabelas conforme o indicado no diagrama de relacionamento, e só após isso, realizar  as transformações.

### Juntando os dados

In [4]:
# Criando tabelas temporárias
customers.createOrReplaceTempView("customers")
geolocalization.createOrReplaceTempView('geolocalization')
order_items.createOrReplaceTempView('order_items')
order_payments.createOrReplaceTempView('order_payments')
order_reviews.createOrReplaceTempView('order_reviews')
orders.createOrReplaceTempView('orders')
products.createOrReplaceTempView('products')
sellers.createOrReplaceTempView('sellers')
product_category.createOrReplaceTempView('product_category')

In [5]:
# Unindo as tabelas
orders_full = spark.sql("WITH geo_sellers AS  \
                        ( \
                        SELECT s.seller_zip_code_prefix, s.seller_city, s.seller_id,\
                        s.seller_state, g.geolocation_lat, g.geolocation_lng \
                        FROM sellers s  \
                        FULL OUTER JOIN geolocalization g  \
                        ON s.seller_zip_code_prefix = g.geolocation_zip_code_prefix \
                        ),  \
                        item_info AS \
                        ( \
                        SELECT oi.order_id, oi.order_item_id, oi.product_id, \
                        oi.seller_id, oi.shipping_limit_date, oi.price, \
                        oi.freight_value, p.product_category_name, \
                        p.product_name_lenght, p.product_description_lenght,  \
                        p.product_photos_qty, p.product_weight_g,  \
                        p.product_length_cm, p.product_height_cm, p.product_width_cm  \
                        FROM order_items oi  \
                        LEFT JOIN products p  \
                        ON oi.product_id = p.product_id \
                        )  \
                        SELECT o.order_id, o.customer_id, ii.product_id,  \
                        o.order_status, o.order_estimated_delivery_date, \
                        o.order_purchase_timestamp, o.order_approved_at,  \
                        o.order_delivered_carrier_date, o.order_delivered_customer_date,  \
                        or.review_score, or.review_comment_title, or.review_comment_message,  \
                        or.review_creation_date, or.review_answer_timestamp,  \
                        op.payment_sequential, op.payment_type, op.payment_installments,  \
                        op.payment_value, c.customer_city, c.customer_state,  \
                        gs.seller_city, gs.seller_state, ii.shipping_limit_date, ii.price, \
                        ii.freight_value, ii.product_category_name, \
                        ii.product_name_lenght, ii.product_description_lenght,  \
                        ii.product_photos_qty, ii.product_weight_g,  \
                        ii.product_length_cm, ii.product_height_cm, ii.product_width_cm  \
                        FROM orders o \
                        LEFT JOIN order_reviews or  \
                        ON o.order_id = or.order_id  \
                        LEFT JOIN order_payments op  \
                        ON o.order_id = op.order_id  \
                        LEFT JOIN customers c  \
                        ON o.customer_id = c.customer_id  \
                        LEFT JOIN item_info ii   \
                        ON o.order_id = ii.order_id  \
                        LEFT JOIN geo_sellers gs  \
                        ON ii.seller_id = gs.seller_id")

In [6]:
# Checando o número de linhas
orders_full.count()

                                                                                

17094341

In [7]:
# Checando a quantidade de colunas
len(orders_full.columns)

33

In [8]:
# Buscando e dropando duplicatas
orders_full = orders_full.drop_duplicates()

In [9]:
# Checando a quantidade de linhas novamente
orders_full.count()

23/05/11 16:47:42 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

108500

In [10]:
# Salvando os dados
orders_full.write.format('parquet').mode('overwrite').save('../data/interim/orders_joined')

23/05/11 16:48:21 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
                                                                                

Agora com a junção já feita, vamos passar para a fase de tratamento dos dados.
Como a junção foi feita selecionando as colunas, não há colunas duplicadas no dataset.

### Tratando os dados
Nesta etapa, passarei de coluna a coluna tratando o dado conforme necessário.

In [11]:
# Checando as colunas
orders_full.columns

['order_id',
 'customer_id',
 'product_id',
 'order_status',
 'order_estimated_delivery_date',
 'order_purchase_timestamp',
 'order_approved_at',
 'order_delivered_carrier_date',
 'order_delivered_customer_date',
 'review_score',
 'review_comment_title',
 'review_comment_message',
 'review_creation_date',
 'review_answer_timestamp',
 'payment_sequential',
 'payment_type',
 'payment_installments',
 'payment_value',
 'customer_city',
 'customer_state',
 'seller_city',
 'seller_state',
 'shipping_limit_date',
 'price',
 'freight_value',
 'product_category_name',
 'product_name_lenght',
 'product_description_lenght',
 'product_photos_qty',
 'product_weight_g',
 'product_length_cm',
 'product_height_cm',
 'product_width_cm']

In [12]:
# Checando o schema
orders_full.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- review_score: string (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)
 |-- payment_sequential: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: string (nullable = true)
 |-- payment_value: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)


#### order_Status


In [13]:
# Checando os valores
orders_full.select('order_status').distinct().collect()

[Row(order_status='shipped'),
 Row(order_status='canceled'),
 Row(order_status='invoiced'),
 Row(order_status='created'),
 Row(order_status='delivered'),
 Row(order_status='unavailable'),
 Row(order_status='processing'),
 Row(order_status='approved')]

A coluna em questão não demonstra ter erros, onde os valores estão na mesma case
 e sem inconsistências. Além disso, o tipo do dado é **string**, o que 
 corresponde com os valores da coluna.

### order_estimated_delivery_date

In [14]:
orders_full.select('order_estimated_delivery_date').show()



+-----------------------------+
|order_estimated_delivery_date|
+-----------------------------+
|          2018-07-16 00:00:00|
|          2018-01-17 00:00:00|
|          2017-06-27 00:00:00|
|          2017-12-15 00:00:00|
|          2018-05-15 00:00:00|
|          2017-10-19 00:00:00|
|          2018-05-23 00:00:00|
|          2018-08-20 00:00:00|
|          2018-02-09 00:00:00|
|          2018-08-23 00:00:00|
|          2018-04-16 00:00:00|
|          2017-12-21 00:00:00|
|          2017-06-02 00:00:00|
|          2017-07-21 00:00:00|
|          2018-02-21 00:00:00|
|          2017-11-03 00:00:00|
|          2018-04-19 00:00:00|
|          2018-03-22 00:00:00|
|          2017-12-18 00:00:00|
|          2018-03-12 00:00:00|
+-----------------------------+
only showing top 20 rows



                                                                                

Como podemos ver, os valores dessa coluna se referem a data estimada da entrega, sem hora e nem minutos. Por isso, será transformada em apenas **data** e não **timestamp**.

In [15]:
# Alterando o tipo para data
orders_full = orders_full.withColumn("order_estimated_delivery_date", col("order_estimated_delivery_date").cast('date'))


### order_purchase_timestamp, order_approved_at, order_delivered_carrier_date, order_delivered_customer_date
Aqui 4 colunas serão tratadas juntas, pois demandam o mesmo tratamento, **string** para **timestamp**.

In [16]:
# Criando uma lista com as colunas
colunas = ['order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date']

In [17]:
# Realizando a transformação em cada coluna
for coluna in colunas:
    orders_full = orders_full.withColumn(coluna, to_timestamp(coluna))

### review_score
A princípio essa coluna parece estar com o tipo errado, vamos verificar os valores:

In [18]:
# Checando os valores distintos
orders_full.select('review_score').distinct().show()

+------------+
|review_score|
+------------+
|           3|
|        null|
|           5|
|           1|
|           4|
|           2|
+------------+



In [19]:
# Alterando o tipo para data
orders_full = orders_full.withColumn("review_score", col("review_score").cast('int'))


### review_comment_title

In [20]:
# Checando os valores distintos
orders_full.select('review_comment_title').show()



+--------------------+
|review_comment_title|
+--------------------+
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                ruim|
|                null|
|                null|
|        Kit muai tai|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



                                                                                

Aqui o formato parece estar correto, mas irei limpar o texto, retirando pontuações e normalizando o texto (mesma case).

In [21]:
# regex para remover pontuações
re_special = "[^\w\s]"

In [22]:
# Criando funções para remover caracteres especiais
func = lambda titulo: re.sub(re_special, ' ', str(titulo).strip())
func_udf = udf(func, StringType())

In [23]:
# Aplicando a função
orders_full = orders_full.withColumn('review_comment_title', func_udf(lower('review_comment_title')))

In [25]:
# Checando os valores distintos
orders_full.select('review_comment_title').show()



+--------------------+
|review_comment_title|
+--------------------+
|                None|
|                None|
|                None|
|                None|
|                None|
|                None|
|                ruim|
|                None|
|                None|
|        kit muai tai|
|                None|
|                None|
|                None|
|                None|
|                None|
|                None|
|                None|
|                None|
|                None|
|                None|
+--------------------+
only showing top 20 rows



                                                                                

### review_comment_message

Como resultado final, nós temos:

root <br>
 |-- order_status: string (nullable = true) <br>
 |-- order_estimated_delivery_date: string (nullable = true) <br>
 |-- order_purchase_timestamp: string (nullable = true) <br>
 |-- order_approved_at: string (nullable = true) <br>
 |-- order_delivered_carrier_date: string (nullable = true) <br>
 |-- order_delivered_customer_date: string (nullable = true) <br>
 |-- review_score: string (nullable = true) <br>
 |-- review_comment_title: string (nullable = true) <br>
 |-- review_comment_message: string (nullable = true) <br>
 |-- review_creation_date: string (nullable = true) <br>
 |-- review_answer_timestamp: string (nullable = true) <br>
 |-- payment_sequential: string (nullable = true) <br> 
 |-- payment_type: string (nullable = true) <br>
 |-- payment_installments: string (nullable = true) <br>
 |-- payment_value: string (nullable = true) <br>
 |-- customer_city: string (nullable = true) <br>
 |-- customer_state: string (nullable = true)  <br>
 |-- seller_city: string (nullable = true) <br>
 |-- seller_state: string (nullable = true) <br>
 |-- geolocation_lat: string (nullable = true) <br>
 |-- geolocation_lng: string (nullable = true) <br>
 |-- shipping_limit_date: string (nullable = true) <br>
 |-- price: string (nullable = true) <br>
 |-- freight_value: string (nullable = true) <br>
 |-- product_category_name: string (nullable = true) <br>
 |-- product_name_lenght: string (nullable = true) <br>
 |-- product_description_lenght: string (nullable = true) <br>
 |-- product_photos_qty: string (nullable = true) <br>
 |-- product_weight_g: string (nullable = true) <br>
 |-- product_length_cm: string (nullable = true) <br>
 |-- product_height_cm: string (nullable = true) <br>
 |-- product_width_cm: string (nullable = true) <br>

 É perceptível que todos as colunas estão no formato de **string**, onde as 
 seguintes colunas demandam mudança:
 - order_estimated_delivery_date: STR --> DATE
 - order_purchase_timestamp: STR --> DATE
 - order_purchase_timestamp: STR --> DATE
 - order_approved_at: STR --> DATE
 - order_delivered_carrier_date: STR --> DATE
 - order_delivered_customer_date: STR --> DATE



In [18]:
order_reviews.show(15)

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+
|7bc2406110b926393...|73fc7af87114b3971...|           4|                null|                  null| 2018-01-18 00:00:00|    2018-01-18 21:46:59|
|80e641a11e56f04c1...|a548910a1c6147796...|           5|                null|                  null| 2018-03-10 00:00:00|    2018-03-11 03:05:13|
|228ce5500dc1d8e02...|f9e4b658b201a9f2e...|           5|                null|                  null| 2018-02-17 00:00:00|    2018-02-18 14:36:24|
|e64fb393e7b32834b...|658677c97b385a9be...|           5|                null|  Recebi bem antes ...| 2017-04-21 00:00:00|   

In [19]:
order_reviews.select('review_score').distinct().collect()

[Row(review_score='2018-01-27 20:03:09'),
 Row(review_score='2017-07-09 00:33:40'),
 Row(review_score='2017-10-21 01:12:49'),
 Row(review_score='2018-04-16 19:34:18'),
 Row(review_score='2017-03-28 22:38:46'),
 Row(review_score='2018-05-04 02:18:31'),
 Row(review_score='2017-12-14 20:09:47'),
 Row(review_score='2018-01-30 16:02:47'),
 Row(review_score='2018-01-04 17:32:51'),
 Row(review_score=' alias comprei justamente pelo prazo de entrega."'),
 Row(review_score='2018-06-17 20:29:17'),
 Row(review_score='2017-02-10 03:19:21'),
 Row(review_score='2018-05-04 22:37:24'),
 Row(review_score='2017-04-21 00:00:00'),
 Row(review_score='2017-03-19 22:44:11'),
 Row(review_score='2018-08-30 23:20:15'),
 Row(review_score='2018-09-03 11:26:24'),
 Row(review_score=' fica tudo engovinhado gostaria de trocar produto.'),
 Row(review_score='2017-10-08 20:42:50'),
 Row(review_score='2016-10-28 12:21:53'),
 Row(review_score='2018-04-01 00:27:51'),
 Row(review_score='2018-03-14 11:39:53'),
 Row(review_sco

#### Criando novas features

Agora que a tabela já parece estar boa, irei criar uma nova feature a partir da 
coluna ``customer_state``, criando categorias para os estados, onde cada estado 
vai receber a região em que se localiza no Brasil.

In [12]:
# Criando lista de regiões  
estados_norte = ['AC', 'AM', 'RO', 'RR', 'AP', 'TO', 'PA']
estados_nordeste = ['MA', 'PI', 'CE', 'RN', 'PB', 'PE', 'AL', 'BA', 'SE']
estados_centro = ['MT', 'MS', 'DF', 'GO']
estados_sudeste = ['MG', 'SP', 'ES', 'RJ']
estados_sul = ['PR', 'SC', 'RS']

# Criando função 
func = lambda estado: 'NORTE' if estado in estados_norte else ('NORDESTE' if estado in estados_nordeste else ('CENTRO-OESTE' if estado in estados_centro else ('SUDESTE' if estado in estados_sudeste else 'SUL')))
func_udf = udf(func, StringType())

In [13]:
# Aplicando a função
customers = customers.withColumn("Regiao", func_udf("customer_state"))

## Salvando os dados

In [7]:
# Criando a engine de um banco MySql
engine = create_engine('mysql+pymysql://root:123456@localhost:3306')