# PySpark - Fundamentals and applications

The goal of this *notebook* is to present the fundamentals of the *Python* API for *Spark*. We will go through the main commands and explain what actions, transformations and *lazy* execution are, as well as the DAGs that *Spark* generates (this last topic only superficially).

All of this is done in a *Databricks* environment, which offers some features that make day-to-day work easier.

After this brief introduction to the concepts, we will simulate their use on a business problem, to help consolidate everything covered.

## Important points about this interface

The sidebar lets us:

- create *notebooks*, through the *create* option;
- import data, also through the *create* option. The import uses a graphical interface that is very simple to use (worth trying);
- create a *cluster*, through the *compute* option. Every *notebook* must be attached to a *cluster* in order to run.

## Data used

The data used is available on *Kaggle*:

- *Download* link: [Dataset](https://www.kaggle.com/olistbr/brazilian-ecommerce?select=olist_order_payments_dataset.csv)

In this project we will use the following tables:

- *olist_customers_dataset.csv*;
- *olist_order_payments_dataset.csv*;
- *olist_orders_dataset.csv*.

### Data context

"*This dataset was generously provided by Olist, the largest department store in Brazilian marketplaces. Olist connects small businesses from all over Brazil to channels without hassle and with a single contract. Those merchants are able to sell their products through the Olist Store and ship them directly to the customers using Olist logistics partners. After a customer purchases the product from Olist Store a seller gets notified to fulfill that order. Once the customer receives the product, or the estimated delivery date is due, the customer gets a satisfaction survey by email where he can give a note for the purchase experience and write down some comments.*"

#### Relationship between the tables

![databases](https://i.imgur.com/HRhd2Y0.png)

## Useful references

- Documentation: [PySpark documentation](https://spark.apache.org/docs/latest/api/python/index.html);
- Creating tables: [Creating tables in Databricks](https://docs.databricks.com/data/tables.html).

In [0]:
import pyspark.sql.types as T         # Native PySpark data types
import pyspark.sql.functions as F     # Native Spark functions for data manipulation
from pyspark.sql.window import Window # Class used to define windows

# Reading data imported into our file system

In [0]:
# For now we will not go into detail about what this data means
df = table("orders")

display(df)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00


As we can see, *Databricks* provides two very useful commands for reading and presenting data:

- **table**: reads a table registered in the workspace catalog (here, the `orders` table created when the data was imported) and returns it as a *DataFrame*;
- **display**: presents the data in a styled format.

# Checking the schema (the data type of each column)

In [0]:
df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)



# Basic description of the dataset

In [0]:
display(df.describe())

summary,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
count,99441,99441,99441,99441,99281,97658,96476,99441
mean,,,,,,,,
stddev,,,,,,,,
min,00010242fe8c5a6d1ba2dd792cb16214,00012a2ce6f8dcda20d059ce98491703,approved,2016-09-04 21:15:19,2016-09-15 12:16:38,2016-10-08 10:34:01,2016-10-11 13:46:32,2016-09-30 00:00:00
max,fffe41c64501cc87c801fd61db3f6244,ffffe8b65bbe3087b653a978c870db99,unavailable,2018-10-17 17:30:18,2018-09-03 17:40:06,2018-09-11 19:48:28,2018-10-17 13:22:46,2018-11-12 00:00:00


In [0]:
n_rows = df.count()

print(f"Number of rows in this dataframe: {n_rows}.")

Number of rows in this dataframe: 99441.


# Selecting columns

In [0]:
df.select("order_id", "customer_id")

Out[6]: DataFrame[order_id: string, customer_id: string]

### Wait, where is the table?

Here we reach an interesting point about *Spark*: it uses **lazy evaluation**. This means it only computes and presents results when we explicitly ask for them through an action (which tells *Spark* to run the computation and return the results to the *driver*).

Besides actions, *Spark* also has transformations (which never produce output by themselves; they only define a new dataset).

#### For a transformation to actually run, we need an action.

This is because *Spark* internally builds a *DAG* (directed acyclic graph) with all the transformations we apply to our dataset. This allows the *query* optimization engine to work, choosing the most appropriate physical execution plan for our *query* according to the steps we defined.

##### Some examples of actions:

- count;
- show.


##### Some examples of transformations:

- select;
- filter;
- groupby.
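As a rough mental model of the distinction above, the plain-Python sketch below mimics lazy evaluation: transformations only record a step, and an action runs the recorded steps. `LazyFrame` and its sample rows are hypothetical names invented for this illustration. This is not how *Spark* is implemented, and it does no optimization or distribution.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: stores rows plus a list of pending steps."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)

    # Transformations: record the step and return a NEW object; nothing runs yet.
    def select(self, *columns):
        step = lambda rows: [{c: r[c] for c in columns} for r in rows]
        return LazyFrame(self._rows, self._steps + (step,))

    def filter(self, predicate):
        step = lambda rows: [r for r in rows if predicate(r)]
        return LazyFrame(self._rows, self._steps + (step,))

    # Actions: run every recorded step, in order, and return a concrete result.
    def collect(self):
        rows = self._rows
        for step in self._steps:
            rows = step(rows)
        return rows

    def count(self):
        return len(self.collect())


# Made-up sample rows, loosely shaped like the orders table.
orders = LazyFrame([
    {"order_id": "a1", "order_status": "delivered"},
    {"order_id": "b2", "order_status": "invoiced"},
    {"order_id": "c3", "order_status": "delivered"},
])

# Two transformations: only a plan is built here.
plan = orders.filter(lambda r: r["order_status"] == "delivered").select("order_id")

# One action: only now are the rows actually processed.
n_delivered = plan.count()
```

Until `count()` is called, `plan` holds only two pending steps and no row has been touched. This is analogous to `df.select(...)` returning a *DataFrame* description instead of a table.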

In [0]:
# display triggers an action internally (similar to show), which is why we see the result of our select transformation
display(df.select("order_id", "customer_id"))

order_id,customer_id
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d


We can see that the *.select* method is used to select columns from the dataset. We can also notice how close the *PySpark* syntax is to *SQL* syntax.

In [0]:
# We can also use functions inside .select and give the result an alias
display(df.select(F.countDistinct(F.col("order_status")).alias("count_distinct_order_status")))

count_distinct_order_status
8


# Filtering rows

In [0]:
display(df.filter(F.col("order_status") == "invoiced"))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
0760a852e4e9d89eb77bf631eaaf1c84,d2a79636084590b7465af8ab374a8cf5,invoiced,2018-08-03 17:44:42,2018-08-07 06:15:14,,,2018-08-21 00:00:00
38b7efdf33dd5561f4f5d4f6e07b0414,021e84751ba0ead75b6d314a6ead88d9,invoiced,2017-08-01 18:17:41,2017-08-01 18:32:30,,,2017-08-28 00:00:00
51b0dccc8596ce37a930dff2d63a10a2,31bf1057b00f14804278590bbac18b1b,invoiced,2017-05-05 22:34:48,2017-05-05 22:45:12,,,2017-06-06 00:00:00
5504eaa5a86eb25fa666cf2e6b96c701,a30be38e1ed0ffb39b318bf1d15e98d9,invoiced,2017-11-29 08:33:06,2017-11-29 08:56:23,,,2017-12-20 00:00:00
c3a6dc9afef5794ea4a867b5a18e0ad1,0c305f688ce08dc9402a2164fd6f29d6,invoiced,2017-11-25 14:10:20,2017-11-25 17:32:57,,,2017-12-19 00:00:00
566c5bff1142c9d6003a83fd2e8ad1e5,892022d497c9598ce4a313f64ceaa87e,invoiced,2017-11-23 18:03:23,2017-11-23 18:25:18,,,2017-12-18 00:00:00
b4c9f083bce61caf47fb38f9ba058bce,6ce60503e530d8a67926e9b8e3fdb635,invoiced,2017-08-05 16:00:56,2017-08-05 16:15:16,,,2017-08-30 00:00:00
f029d5966aa03c87e17e2482f86202b1,ece9fb6661a1dc0cd93f5f8c9cf13b83,invoiced,2017-10-23 21:44:36,2017-10-23 21:56:09,,,2017-11-14 00:00:00
6a828279d73f18174dd3b9c653d5213d,c994647aae9a45b8ae6eb75b77354ed3,invoiced,2017-06-25 17:54:26,2017-06-25 18:05:22,,,2017-07-14 00:00:00


Some people use the *.where* method instead of *.filter*. Both do the same thing, since *.where* is defined as an *alias* for *.filter*.

##### Now that we have covered some commands, let's look at some variations the syntax allows:

In [0]:
# Some people use the following syntax (the condition written as a SQL expression string):
display(df.filter('order_status == "invoiced"'))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
0760a852e4e9d89eb77bf631eaaf1c84,d2a79636084590b7465af8ab374a8cf5,invoiced,2018-08-03 17:44:42,2018-08-07 06:15:14,,,2018-08-21 00:00:00
38b7efdf33dd5561f4f5d4f6e07b0414,021e84751ba0ead75b6d314a6ead88d9,invoiced,2017-08-01 18:17:41,2017-08-01 18:32:30,,,2017-08-28 00:00:00
51b0dccc8596ce37a930dff2d63a10a2,31bf1057b00f14804278590bbac18b1b,invoiced,2017-05-05 22:34:48,2017-05-05 22:45:12,,,2017-06-06 00:00:00
5504eaa5a86eb25fa666cf2e6b96c701,a30be38e1ed0ffb39b318bf1d15e98d9,invoiced,2017-11-29 08:33:06,2017-11-29 08:56:23,,,2017-12-20 00:00:00
c3a6dc9afef5794ea4a867b5a18e0ad1,0c305f688ce08dc9402a2164fd6f29d6,invoiced,2017-11-25 14:10:20,2017-11-25 17:32:57,,,2017-12-19 00:00:00
566c5bff1142c9d6003a83fd2e8ad1e5,892022d497c9598ce4a313f64ceaa87e,invoiced,2017-11-23 18:03:23,2017-11-23 18:25:18,,,2017-12-18 00:00:00
b4c9f083bce61caf47fb38f9ba058bce,6ce60503e530d8a67926e9b8e3fdb635,invoiced,2017-08-05 16:00:56,2017-08-05 16:15:16,,,2017-08-30 00:00:00
f029d5966aa03c87e17e2482f86202b1,ece9fb6661a1dc0cd93f5f8c9cf13b83,invoiced,2017-10-23 21:44:36,2017-10-23 21:56:09,,,2017-11-14 00:00:00
6a828279d73f18174dd3b9c653d5213d,c994647aae9a45b8ae6eb75b77354ed3,invoiced,2017-06-25 17:54:26,2017-06-25 18:05:22,,,2017-07-14 00:00:00


### We can also use SQL code, in two ways:

In [0]:
%sql

SELECT * FROM orders

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00


In [0]:
# We can also run SQL code this way
spark.sql("SELECT * FROM orders")

Out[6]: DataFrame[order_id: string, customer_id: string, order_status: string, order_purchase_timestamp: string, order_approved_at: string, order_delivered_carrier_date: string, order_delivered_customer_date: string, order_estimated_delivery_date: string]

Depending on how you load your data, the *DataFrame* will first need to be registered as a *view*:

```
df.createOrReplaceTempView("test")
```

After that, your *queries* can be run normally.

**Note:** the lifetime of this *view* is tied to your *SparkSession* (which we will not cover in detail in this tutorial).

# Creating columns

To create columns we use the *.withColumn* method. It takes two parameters: the name of the new column and its content.

In [0]:
# Creating a column named test and filling it with the LITERAL 1 (important to note)
display(df.withColumn("test", F.lit(1)))

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,test
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,1
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00,1
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00,1
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00,1
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00,1
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00,1
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00,1
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00,1
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00,1
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00,1


To try out the native *PySpark* functions, let's create a column holding the year in which the purchase order was created.

In [0]:
df = df.withColumn("order_purchase_year", F.year(F.col("order_purchase_timestamp")))

display(df)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_purchase_year
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,2017
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00,2018
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00,2018
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00,2017
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00,2018
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00,2017
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00,2017
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00,2017
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00,2017
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00,2017


In [0]:
# We can also perform other operations, such as the difference in days between two dates
df = df.withColumn("diff_in_days_between_purchase_and_approved", F.datediff(F.col("order_approved_at"), F.col("order_purchase_timestamp")))

display(df)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_purchase_year,diff_in_days_between_purchase_and_approved
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,2017,0
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00,2018,2
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00,2018,0
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00,2017,0
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00,2018,0
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00,2017,0
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00,2017,2
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00,2017,0
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00,2017,2
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00,2017,0
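One detail visible in the output above: the second row was approved roughly 31 hours after the purchase, yet the difference is reported as 2. This is consistent with `F.datediff` comparing calendar dates and ignoring the time of day. The plain-Python sketch below reproduces that behaviour for the same pair of timestamps; `calendar_day_diff` is a hypothetical helper written for this illustration, not a *PySpark* function.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def calendar_day_diff(end: str, start: str) -> int:
    """Difference between the DATE parts only; the time of day is dropped."""
    end_date = datetime.strptime(end, FMT).date()
    start_date = datetime.strptime(start, FMT).date()
    return (end_date - start_date).days


# Timestamps taken from the second row of the output above.
purchase = "2018-07-24 20:41:37"
approved = "2018-07-26 03:24:27"

# Calendar-day difference: July 24 -> July 26.
day_diff = calendar_day_diff(approved, purchase)

# Actual elapsed time between the two timestamps, in hours.
elapsed_hours = (
    datetime.strptime(approved, FMT) - datetime.strptime(purchase, FMT)
).total_seconds() / 3600
```

If the elapsed time matters more than the number of calendar days crossed, the difference has to be computed from the full timestamps rather than from the dates.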


All native *PySpark* functions live in the *functions* module; it is worth exploring.

## Shall we play a little with *Spark*'s *query* optimization?

To do so, let's simulate a simple set of transformations:

- We create a *test* column and assign it the value 10;
- Then we add 20 to the value in the column;
- Finally, we add 30 to the value in the column.

In [0]:
temp_df = table("orders")

temp_df = temp_df.withColumn("test", F.lit(10))
temp_df = temp_df.withColumn("test", F.col("test") + F.lit(20))
temp_df = temp_df.withColumn("test", F.col("test") + F.lit(30))

temp_df.explain(mode="extended")

== Parsed Logical Plan ==
'Project [order_id#1231, customer_id#1232, order_status#1233, order_purchase_timestamp#1234, order_approved_at#1235, order_delivered_carrier_date#1236, order_delivered_customer_date#1237, order_estimated_delivery_date#1238, ('test + 30) AS test#2368]
+- Project [order_id#1231, customer_id#1232, order_status#1233, order_purchase_timestamp#1234, order_approved_at#1235, order_delivered_carrier_date#1236, order_delivered_customer_date#1237, order_estimated_delivery_date#1238, (test#2348 + 20) AS test#2358]
   +- Project [order_id#1231, customer_id#1232, order_status#1233, order_purchase_timestamp#1234, order_approved_at#1235, order_delivered_carrier_date#1236, order_delivered_customer_date#1237, order_estimated_delivery_date#1238, 10 AS test#2348]
      +- SubqueryAlias spark_catalog.default.orders
         +- Relation[order_id#1231,customer_id#1232,order_status#1233,order_purchase_timestamp#1234,order_approved_at#1235,order_delivered_carrier_date#1236,order_deliv

Our logical plan in detail (the logical plan is the *query* you wrote):

**Note: it must be read from the bottom up.**

```
== Analyzed Logical Plan ==
Project [(test#4207 + 30) AS test#4217]
+- Project [(test#4197 + 20) AS test#4207]
   +- Project [ 10 AS test#4197]
```


Our physical plan (optimized):

```
*(1) Project [60 AS test#4217]
```

The optimizer replaces our three operations with a single, simpler one (which in this case is basically assigning the value 60 to the *test* column).
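The simplification from three projections to `60 AS test` can be pictured as constant folding over an expression tree. The sketch below is a minimal plain-Python illustration of that idea only; the tuple encoding (`"lit"`, `"col"`, `"add"`) and the `fold` function are assumptions made for this example and say nothing about how *Spark*'s optimizer is actually written.

```python
def fold(expr):
    """Recursively replace additions of two literals with a single literal."""
    kind = expr[0]
    if kind == "add":
        left, right = fold(expr[1]), fold(expr[2])
        if left[0] == "lit" and right[0] == "lit":
            return ("lit", left[1] + right[1])
        return ("add", left, right)
    return expr  # literals and column references are left untouched


# test = 10, then test + 20, then test + 30, with every reference to "test"
# already replaced by the expression that defined it.
test_expr = ("add", ("add", ("lit", 10), ("lit", 20)), ("lit", 30))
folded = fold(test_expr)

# An addition involving a real column cannot be fully folded:
# only the literal-only sub-expression is simplified.
mixed = fold(("add", ("col", "price"), ("add", ("lit", 1), ("lit", 2))))
```

Here `folded` ends up as the single literal 60, mirroring the optimized plan, while `mixed` keeps the column reference and only collapses `1 + 2`.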

### Working with GroupBy

In [0]:
# We can count the number of rows in each group this way
display(df.groupby("order_status").count())

order_status,count
shipped,1107
canceled,625
invoiced,314
created,5
delivered,96478
unavailable,609
processing,301
approved,2


In [0]:
# Another way to write the code above is:
display(df.groupby("order_status").agg(F.count(F.col("*"))))

order_status,count(1)
shipped,1107
canceled,625
invoiced,314
created,5
delivered,96478
unavailable,609
processing,301
approved,2


In [0]:
# Renaming the column that holds the count
display(df.groupby("order_status").agg(F.count(F.col("*")).alias("count")))

order_status,count
shipped,1107
canceled,625
invoiced,314
created,5
delivered,96478
unavailable,609
processing,301
approved,2


Using *groupby* with the *.agg* method is much more general and works in many situations. Note the use of the *.alias* method to rename the column.

# Joins

*Joins* are extremely important for anyone working with databases. Fortunately, *PySpark* provides a concise and elegant way to work with this kind of operation.

The method we should use is *.join*.

This method lets us define:

- The columns used for the *join*, through the *on* parameter (this parameter also accepts logical conditions, so it can be used to build conditional *joins*);
- The type of *join* (*left*, *right*, etc.), through the *how* parameter.

For this example we will load the *customers* table.

- We will load this table the classic way (without the *table* function).

In [0]:
# Note that we pass the "header" option. To change the separator we would use the "delimiter" option with the required delimiter (the default is ",")
customers = spark.read.option("header", True).csv("/FileStore/tables/customers_database.csv")

display(customers)

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP
879864dab9bc3047522c92c82e1212b8,4c93744516667ad3b8f1fb645a3116a4,89254,jaragua do sul,SC
fd826e7cf63160e536e0908c76c3f441,addec96d2e059c80c30fe6871d30d177,4534,sao paulo,SP
5e274e7a0c3809e14aba7ad5aae0d407,57b2a98a409812fe9618067b6b8ebe4f,35182,timoteo,MG
5adf08e34b2e993982a47070956c5c65,1175e95fb47ddff9de6b2b06188f7e0d,81560,curitiba,PR
4b7139f34592b3a31687243a302fa75b,9afe194fb833f79e300e37e580171f22,30575,belo horizonte,MG


In [0]:
# This join is just an example
df_with_customers = customers.join(df, on="customer_id", how="left")

display(df_with_customers)

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,order_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,order_purchase_year,diff_in_days_between_purchase_and_approved
e3c7e245a96d7fa339fe6c16f8da4e90,79051ee5ee98c4bd6982e67e2e79dbcb,7847,franco da rocha,SP,8d9c47d02bd99ae8d7dd151ededa9281,delivered,2017-10-15 17:12:19,2017-10-15 17:56:24,2017-10-17 21:05:06,2017-10-23 22:04:54,2017-11-01 00:00:00,2017,0.0
a56b03f5e6015f1a502b9810309b98b7,b6cbe1a8674ee23e9fb086e3c61677b8,41308,salvador,BA,4b8f5652792df716a6e936ae0c78a935,delivered,2017-06-26 12:05:33,2017-07-01 21:33:24,2017-06-26 13:05:19,2017-07-12 19:47:28,2017-08-28 00:00:00,2017,5.0
d0615859a639a94c1fe472eba57d4a7c,9072b46e3b68961565477d90b24092e9,12900,braganca paulista,SP,d94486278633cac5f50e370851d24de6,delivered,2018-03-15 18:07:49,2018-03-15 18:28:40,2018-03-16 18:55:44,2018-03-19 16:52:07,2018-03-27 00:00:00,2018,0.0
c0fe0fbc24994167dce810f83cb96890,839bbfd4ff93b592c82a44dcaa9515c3,30330,belo horizonte,MG,d2adc517116e216add8b6f5b45c3b84e,delivered,2017-06-24 13:14:16,2017-06-24 13:25:20,2017-07-11 18:44:44,2017-07-19 18:59:36,2017-08-15 00:00:00,2017,0.0
5b5f4957a69d537a2aeadfa7dd2d09d9,bb03ed8d9549898e869f0e6774064096,71505,brasilia,DF,d178d8bdd78765955d2260fc1d639052,delivered,2017-06-23 22:56:47,2017-06-23 23:05:20,2017-06-27 10:33:13,2017-07-06 12:47:40,2017-07-21 00:00:00,2017,0.0
41b200d1ce8675f154c91c2da887bcee,9b05b38d7d9ef19a89815c217b30fd37,2114,sao paulo,SP,9dfd8f2ac479e34a16d68ff827759a23,invoiced,2018-05-15 15:55:01,2018-05-17 02:15:28,,,2018-06-01 00:00:00,2018,2.0
456c1e01c8ed3b83aa8fc564119bc81a,4ac6ec83ece04605a854d931c8e8d8c7,59030,natal,RN,01b1800f51715ab76f67caf732206437,delivered,2018-07-18 19:05:46,2018-07-18 19:23:14,2018-07-19 09:06:00,2018-07-27 19:26:24,2018-08-08 00:00:00,2018,0.0
8baeca32aac79a831b81f1f8af9fd6d8,59cd6345c0d1920b6a42f672173a78fd,13950,lindoia,SP,1184c1bbcfb488132ee2bca2c07cd743,delivered,2018-08-16 11:35:15,2018-08-16 11:45:09,2018-08-17 08:38:00,2018-08-21 20:07:33,2018-08-27 00:00:00,2018,0.0
a8004a3d658be3bb26c0ad71671ef73f,c1ccdcdeb2daa19091d2aefb8f1a2bd5,5143,sao paulo,SP,68e3cd57e173ebad230c2879e6c78401,delivered,2018-04-07 20:56:45,2018-04-10 04:15:40,2018-04-10 20:06:22,2018-04-11 20:48:32,2018-04-19 00:00:00,2018,3.0
860ac166573be76ffb00c3e483892094,919bf2617845a0f73acc0361cba58267,30535,belo horizonte,MG,ab77be4cf9bf132ca4d82049e786e528,delivered,2017-09-16 21:10:54,2017-09-19 03:43:56,2017-09-20 16:09:21,2017-09-29 21:27:36,2017-10-06 00:00:00,2017,3.0


As a reminder, it is good practice to always select the columns we are going to use with the *.select* method.
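To make the semantics of `how="left"` concrete, here is a small plain-Python sketch of a left join on a single key. The `left_join` helper and the sample rows are hypothetical and exist only for this illustration. It mirrors the behaviour of the join (every left row is kept, unmatched rows get empty right-side columns), not *Spark*'s execution strategy.

```python
def left_join(left_rows, right_rows, on):
    """Keep every left row; attach matching right rows, or None when there is no match."""
    # Index the right side by the join key so each lookup is cheap.
    index = {}
    for row in right_rows:
        index.setdefault(row[on], []).append(row)

    # Columns that come only from the right side (filled with None when unmatched).
    right_columns = {c for row in right_rows for c in row if c != on}

    joined = []
    for row in left_rows:
        matches = index.get(row[on])
        if matches:
            # One output row per matching right row.
            joined.extend({**row, **match} for match in matches)
        else:
            joined.append({**row, **{c: None for c in right_columns}})
    return joined


# Made-up data: customer c1 has two orders, customer c2 has none.
customers = [
    {"customer_id": "c1", "customer_state": "SP"},
    {"customer_id": "c2", "customer_state": "MG"},
]
orders = [
    {"customer_id": "c1", "order_id": "o1"},
    {"customer_id": "c1", "order_id": "o2"},
]

joined = left_join(customers, orders, on="customer_id")
```

The result has three rows: two for `c1` (one per order) and one for `c2` with `order_id` set to `None`, which is how a left join can both multiply and preserve rows from the left table.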

# Window

Another extremely useful feature is *Window*. It lets us create windows, partitioned by columns, over which results are computed. Unlike the *.groupby* method, which returns one aggregated row per group, a *Window* returns a value (the result of some operation) on every row of the *dataframe*.

In the example below we use a window to determine the first purchase date of each user and store it in a column called *first_purchase_date*. This value is useful for determining how many days a given purchase order took to happen relative to the first purchase order (of a given user).

Another way to do this would be to combine a *.groupby* with a *join*.

Windows are useful for things such as:

- Time difference (TF - TI) between events;
- Finding the first (or n-th) event in a series of events;
- Computing moving averages, etc.
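The difference between the two approaches described above (one row per group versus one value on every row) can be sketched in plain Python before looking at the *PySpark* version. The customer IDs and rows below are made up for the illustration. Timestamps are kept as strings, as in the table schema, which works here because this format sorts chronologically.

```python
# Made-up rows: customer u1 has two orders, customer u2 has one.
rows = [
    {"customer_unique_id": "u1", "order_purchase_timestamp": "2017-06-22 20:05:39"},
    {"customer_unique_id": "u1", "order_purchase_timestamp": "2017-11-19 14:34:44"},
    {"customer_unique_id": "u2", "order_purchase_timestamp": "2018-03-24 14:44:41"},
]

# groupby-style result: ONE entry per customer (the earliest timestamp).
first_by_customer = {}
for row in rows:
    key = row["customer_unique_id"]
    ts = row["order_purchase_timestamp"]
    if key not in first_by_customer or ts < first_by_customer[key]:
        first_by_customer[key] = ts

# window-style result: SAME number of rows as the input, each one carrying
# the aggregate computed over its own partition (its customer).
windowed = [
    {**row, "first_purchase_date": first_by_customer[row["customer_unique_id"]]}
    for row in rows
]
```

`first_by_customer` has two entries while `windowed` still has three rows; the second order of `u1` keeps its own timestamp and gains the earlier `first_purchase_date` next to it.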

In [0]:
# Determining the first purchase date for each customer_unique_id (the result is repeated on every row of that customer)
window = Window.partitionBy("customer_unique_id")

df_with_customers = df_with_customers.withColumn("first_purchase_date", F.min(F.col("order_purchase_timestamp")).over(window))

display(df_with_customers.select("customer_unique_id", "order_purchase_timestamp", "first_purchase_date"))

customer_unique_id,order_purchase_timestamp,first_purchase_date
0005e1862207bf6ccc02e4228effd9a0,2017-03-04 23:32:12,2017-03-04 23:32:12
0006fdc98a402fceb4eb0ee528f6a8d4,2017-07-18 09:23:10,2017-07-18 09:23:10
00090324bbad0e9342388303bb71ba0a,2018-03-24 14:44:41,2018-03-24 14:44:41
000c8bdb58a29e7115cfc257230fb21b,2017-12-12 22:53:35,2017-12-12 22:53:35
00115fc7123b5310cf6d3a3aa932699e,2017-01-21 21:58:35,2017-01-21 21:58:35
001a3a8e11d76c9a366c31a4aa2cc529,2018-05-21 01:24:25,2018-05-21 01:24:25
001f3c4211216384d5fe59b041ce1461,2017-03-19 18:34:36,2017-03-19 18:34:36
0023557a94bef0038066b5d1b3dc763e,2018-03-28 19:53:14,2018-03-28 19:53:14
002aba8c1af80acacef6e011f9f23262,2018-02-27 12:47:48,2018-02-27 12:47:48
002b4cd83fabaffaa475f78ea5ef3e08,2017-02-01 13:53:16,2017-02-01 13:53:16


## Performing the same operation with groupby

In [0]:
first_purchase_date = df_with_customers.groupby("customer_unique_id").agg(F.min(F.col("order_purchase_timestamp")).alias("first_purchase_date"))

display(df_with_customers.join(first_purchase_date, on="customer_unique_id", how="left").select("customer_unique_id", "order_purchase_timestamp", "first_purchase_date"))

customer_unique_id,order_purchase_timestamp,first_purchase_date
861eff4711a542e4b93843c6dd7febb0,2017-05-16 15:05:35,2017-05-16 15:05:35
e607ede0e63436308660236f5a52da5e,2017-08-13 10:03:36,2017-08-13 10:03:36
28da048f094c0c9cbbc5412bcf41b6b0,2017-04-04 12:48:23,2017-04-04 12:48:23
212c759d8c4f2d4d9d6fd4c7de0afbb3,2017-09-01 12:05:54,2017-09-01 12:05:54
f96176e892232662d1c1c5896a94e035,2017-08-19 10:08:37,2017-08-19 10:08:37
000c8bdb58a29e7115cfc257230fb21b,2017-12-12 22:53:35,2017-12-12 22:53:35
9ccbb5f759db041b2db8359d71c0547f,2017-12-11 20:03:10,2017-12-11 20:03:10
4b384b778ebc0449d0244902bfce7beb,2017-11-19 14:34:44,2017-06-22 20:05:39
4df43d4c7d3a093a519dbfe0b9dcc0d6,2018-07-08 19:25:52,2018-07-08 19:25:52
4be1583defacacfea129170626a62569,2017-09-22 23:09:49,2017-09-22 23:09:49
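
Both approaches produce the same column. As a plain-Python sketch of what Spark computes (no cluster needed), the snippet below takes the minimum timestamp per `customer_unique_id` and attaches it to every row of that customer. The rows are made-up illustrative values and the helper name `add_first_purchase_date` is hypothetical.

```python
from datetime import datetime

# Illustrative rows (made-up values): (customer_unique_id, order_purchase_timestamp)
rows = [
    ("customer_a", datetime(2017, 11, 19, 14, 34, 44)),
    ("customer_a", datetime(2017, 6, 22, 20, 5, 39)),
    ("customer_b", datetime(2018, 7, 8, 19, 25, 52)),
]

def add_first_purchase_date(rows):
    """Attach the per-customer minimum timestamp to every row."""
    # Step 1 (the "groupby + min"): one minimum per customer.
    first_purchase = {}
    for customer, ts in rows:
        if customer not in first_purchase or ts < first_purchase[customer]:
            first_purchase[customer] = ts
    # Step 2 (the "join"): propagate that minimum back to each original row.
    return [(customer, ts, first_purchase[customer]) for customer, ts in rows]

result = add_first_purchase_date(rows)
for row in result:
    print(row)
```

The window version reaches the same result in a single step, without materializing the intermediate per-customer table.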


# Hands on

Now that we know the main methods of this API, let's apply them to solve some business problems.

## The business area asked us the following questions:

1 - Number of orders grouped by YEAR/MONTH/STATUS (they need a .CSV file containing all this information);

2 - Number of users per state (so they can identify where to focus their *marketing* efforts);

3 - Besides the number, the business area needs the *ranking* of each state (which one is first, second, etc.) in terms of number of users;

4 - Number of users who had more than three orders;

5 - For the users who had at least three orders, how many days it took to reach the third order, counted from the first order;

6 - What is the average yearly amount received per payment type.

## What we need to know about the dataset:

- *customer_id*: customer ID;
- *customer_unique_id*: unique ID of each customer (one such ID may cover several *customer_id* values);
- *order_purchase_timestamp*: *timestamp* of the order date;
- *customer_state*: customer's state;
- *payment_type*: payment type.

In [0]:
orders = table("orders")
customers = table("customers")
payments = table("payments")

In [0]:
display(orders)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00


In [0]:
display(customers)

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP
879864dab9bc3047522c92c82e1212b8,4c93744516667ad3b8f1fb645a3116a4,89254,jaragua do sul,SC
fd826e7cf63160e536e0908c76c3f441,addec96d2e059c80c30fe6871d30d177,4534,sao paulo,SP
5e274e7a0c3809e14aba7ad5aae0d407,57b2a98a409812fe9618067b6b8ebe4f,35182,timoteo,MG
5adf08e34b2e993982a47070956c5c65,1175e95fb47ddff9de6b2b06188f7e0d,81560,curitiba,PR
4b7139f34592b3a31687243a302fa75b,9afe194fb833f79e300e37e580171f22,30575,belo horizonte,MG


In [0]:
display(payments)

order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45
298fcdf1f73eb413e4d26d01b25bc1cd,1,credit_card,2,96.12
771ee386b001f06208a7419e4fc1bbd7,1,credit_card,1,81.16
3d7239c394a212faae122962df514ac7,1,credit_card,3,51.84
1f78449c87a54faf9e96e88ba1491fa9,1,credit_card,6,341.09
0573b5e23cbd798006520e1d5b4c6714,1,boleto,1,51.95


### First question: Number of orders grouped by YEAR/MONTH/STATUS (they need a .CSV file containing all this information)

In [0]:
answer_1 = orders.groupby(F.date_format(F.col("order_purchase_timestamp"), format="y-M").alias("year_month"), "order_status").count()
display(answer_1)

year_month,order_status,count
2017-3,delivered,2546
2018-6,delivered,6099
2017-6,delivered,3135
2017-12,canceled,11
2017-5,canceled,29
2017-8,processing,18
2018-6,canceled,18
2017-12,shipped,57
2017-9,unavailable,38
2018-8,invoiced,23
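
One detail in this output: the `y-M` pattern produces unpadded months (`2017-3`), so sorting the `year_month` strings does not give chronological order. A zero-padded pattern such as `yyyy-MM` would avoid that; whether the business area wants padded months is an assumption. The plain-Python sketch below only illustrates the string-ordering effect.

```python
# Unpadded labels, as produced by the "y-M" pattern in the output above.
unpadded = ["2017-3", "2017-12", "2018-6"]
# Zero-padded equivalents, as a "yyyy-MM" pattern would produce.
padded = ["2017-03", "2017-12", "2018-06"]

# Strings are compared character by character, so "2017-12" sorts before "2017-3".
sorted_unpadded = sorted(unpadded)
sorted_padded = sorted(padded)

print(sorted_unpadded)
print(sorted_padded)
```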


In [0]:
#repartition(1) is used so that a single file is saved (by default the data is partitioned, so several pieces are generated by a write; this is a more advanced topic that deserves its own material)
answer_1.repartition(1).write.csv("/FileStore/tables/answer_1")

### Second question: Number of customers per state (so they can identify where to focus their marketing efforts)

In [0]:
display(customers)

customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP
879864dab9bc3047522c92c82e1212b8,4c93744516667ad3b8f1fb645a3116a4,89254,jaragua do sul,SC
fd826e7cf63160e536e0908c76c3f441,addec96d2e059c80c30fe6871d30d177,4534,sao paulo,SP
5e274e7a0c3809e14aba7ad5aae0d407,57b2a98a409812fe9618067b6b8ebe4f,35182,timoteo,MG
5adf08e34b2e993982a47070956c5c65,1175e95fb47ddff9de6b2b06188f7e0d,81560,curitiba,PR
4b7139f34592b3a31687243a302fa75b,9afe194fb833f79e300e37e580171f22,30575,belo horizonte,MG


In [0]:
answer_2 = customers.groupby("customer_state").count()
display(answer_2)

customer_state,count
SC,3637
RO,253
PI,495
AM,148
RR,46
GO,2020
TO,280
MT,907
SP,41746
ES,2033


### Third question: Besides the number, the business area needs the ranking of each state (which one is first, second, etc.)

In [0]:
#We will use the data from the previous answer

#We need to define a window (with no partition, which means the window is our entire dataset) and order it from largest to smallest
#Without a partition Spark moves all rows to a single partition, which is fine for a small table like this one
window = Window().orderBy(F.col("count").desc())
#Then we apply F.rank() over this window
answer_3 = answer_2.withColumn("rank", F.rank().over(window))

display(answer_3)

customer_state,count,rank
SP,41746,1
RJ,12852,2
MG,11635,3
RS,5466,4
PR,5045,5
SC,3637,6
BA,3380,7
DF,2140,8
ES,2033,9
GO,2020,10
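
`F.rank()` leaves gaps after ties. No ties show up in the ten rows above, so the difference from `F.dense_rank()` and `F.row_number()` is not visible here. The plain-Python sketch below mimics the three behaviours on made-up counts (the state labels and values are illustrative only, and `rank_rows` is a hypothetical helper).

```python
# Made-up (state, count) pairs containing a tie.
counts = [("A", 10), ("B", 10), ("C", 7), ("D", 3)]

def rank_rows(rows):
    """Return (state, count, rank, dense_rank, row_number), ordered by count desc."""
    ordered = sorted(rows, key=lambda r: r[1], reverse=True)
    out = []
    rank = dense = 0
    previous = None
    for position, (state, count) in enumerate(ordered, start=1):
        if count != previous:
            rank = position      # rank jumps to the row position after a tie
            dense += 1           # dense_rank only ever increases by one
            previous = count
        out.append((state, count, rank, dense, position))
    return out

ranked = rank_rows(counts)
for row in ranked:
    print(row)
```

In Spark the order of tied rows under `F.row_number()` is not guaranteed, so `F.rank()` or `F.dense_rank()` is usually the safer choice when ties are possible.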


### Fourth question: Number of users who had more than three orders

In [0]:
display(orders)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00
76c6e866289321a7c93b82b54852dc33,f54a9f0e6b351c431402b8461ea51999,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00
e69bfb5eb88e0ed6a785585b27e16dbf,31ad1d1b63eb9962463f764d4e6e0c9d,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00


In [0]:
orders_with_customers = orders.join(customers, on="customer_id", how="left")

display(orders_with_customers)

customer_id,order_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
9ef432eb6251297304e76186b10a928d,e481f51cbdc54678b7cc49136f2d6af7,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,7c396fd4830fd04220f754e42b4e5bff,3149,sao paulo,SP
b0830fb4747a6c6d20dea0b8c802d7ef,53cdb2fc8bc7dce0b6741e2150273451,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00,af07308b275d755c9edb36a90c618231,47813,barreiras,BA
41ce2a54c0b03bf3443c3d931a367089,47770eb9100c2d0c44946d9cf07ec65d,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00,3a653a41f6f9fc3d2a113cf8398680e8,75265,vianopolis,GO
f88197465ea7920adcdbec7375364d82,949d5b44dbf5de918fe9c16f97b45f8a,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00,7c142cf63193a1473d2e66489a9ae977,59296,sao goncalo do amarante,RN
8ab97904e6daea8866dbdbc4fb7aad2c,ad21c59c0840e6cb83a9ceb5573f8159,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00,72632f0f9dd73dfee390c9b22eb56dd6,9195,santo andre,SP
503740e9ca751ccdda7ba28e9ab8f608,a4591c265e18cb1dcee52889e2d8acc3,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00,80bb27c7c16e8f973207a5086ab329e2,86320,congonhinhas,PR
ed0271e0b7da060a393796590e7b737a,136cce7faa42fdb2cefd53fdc79a6098,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00,36edbb3fb164b1f16485364b6fb04c73,98900,santa rosa,RS
9bdf08b4b3b52b5526ff42d37d47f222,6514b8ad8028c9f2cc2374ded245783f,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00,932afa1e708222e5821dac9cd5db4cae,26525,nilopolis,RJ
f54a9f0e6b351c431402b8461ea51999,76c6e866289321a7c93b82b54852dc33,delivered,2017-01-23 18:29:09,2017-01-25 02:50:47,2017-01-26 14:16:31,2017-02-02 14:08:10,2017-03-06 00:00:00,39382392765b6dc74812866ee5ee92a7,99655,faxinalzinho,RS
31ad1d1b63eb9962463f764d4e6e0c9d,e69bfb5eb88e0ed6a785585b27e16dbf,delivered,2017-07-29 11:55:02,2017-07-29 12:05:32,2017-08-10 19:45:24,2017-08-16 17:14:30,2017-08-23 00:00:00,299905e3934e9e181bfb2e164dd4b4f8,18075,sorocaba,SP


In [0]:
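#Chaining operations
#Note: the filter below keeps customers with three or more orders (>= 3); use > 3 for strictly more than three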
answer_4 = (orders_with_customers
            .groupby("customer_unique_id").count()
            .filter(F.col("count") >= 3)
           )

print(f"Quantidade de clientes: {answer_4.count()}.")

Quantidade de clientes: 252.
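
The join matters because, per the column descriptions above, one `customer_unique_id` can cover several `customer_id` values, so counting by `customer_id` alone could miss repeat buyers. A plain-Python sketch with made-up IDs, which also shows how `>= 3` (used above) differs from a strict `> 3`:

```python
from collections import Counter

# Made-up data: each order carries a customer_id ...
order_customer_ids = ["c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"]
# ... and the customers table maps it to a customer_unique_id.
unique_id_of = {"c1": "u1", "c2": "u1", "c3": "u1",
                "c4": "u2", "c5": "u2", "c6": "u2", "c7": "u2",
                "c8": "u3"}

# Equivalent of the join followed by groupby("customer_unique_id").count().
orders_per_user = Counter(unique_id_of[cid] for cid in order_customer_ids)

at_least_three = sum(1 for n in orders_per_user.values() if n >= 3)
more_than_three = sum(1 for n in orders_per_user.values() if n > 3)

print(at_least_three, more_than_three)
```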


### Fifth question: For the users who had at least three orders, how many days it took to reach the third order, counted from the first order

In [0]:
window = Window().partitionBy("customer_unique_id").orderBy(F.col("order_purchase_timestamp"))

#Defining each customer's first purchase date
answer_5 = (orders_with_customers
            .withColumn("first_order_purchase_timestamp", 
                        F.first(F.col("order_purchase_timestamp")).over(window))
            .withColumn("order_rank", F.row_number().over(window))
            .filter(F.col("order_rank") == 3)
            .withColumn("date_diff", F.datediff(F.col("order_purchase_timestamp"), F.col("first_order_purchase_timestamp")))
            .select("customer_unique_id", "order_purchase_timestamp", "first_order_purchase_timestamp", "date_diff")
           )

display(answer_5)

customer_unique_id,order_purchase_timestamp,first_order_purchase_timestamp,date_diff
0e4cb268bd62da7db135af6349b4fc2a,2017-11-24 20:39:01,2017-11-24 13:15:25,0
182053495bc94c2f41090ce8c41be266,2018-08-19 12:58:22,2018-04-11 14:58:15,130
1da09dd64e235e7c2f29a4faff33535c,2018-01-11 11:16:49,2017-05-10 14:04:15,246
2e43e031f10de28e557c35ef668f9396,2017-01-26 13:15:42,2017-01-26 13:15:41,0
310647380793836bfa5b7b6b3f518423,2017-10-18 13:34:48,2017-04-18 09:46:40,183
320e13a9e22b4e4cda8c17ed1c140ce0,2017-12-05 22:17:41,2017-11-27 13:42:45,8
33176de67c05eeed870fd49f234387a0,2018-06-04 17:03:57,2018-02-22 09:03:57,102
39104edef5e46d7b8c61848cc95e6d97,2017-10-01 10:11:40,2017-07-12 16:04:41,81
3e43e6105506432c953e165fb2acf44c,2017-12-01 22:37:41,2017-09-18 18:53:15,74
4e1cce07cd5937c69dacac3c8b13d965,2018-07-30 16:04:45,2017-11-25 15:48:03,247
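
Note that `F.datediff` counts calendar days rather than elapsed 24-hour periods, which is why a third order placed a few hours after the first shows `date_diff = 0` above. The plain-Python sketch below reproduces that rule. The helper `calendar_day_diff` is a hypothetical name; the first two pairs come from the output above and the last pair is made up.

```python
from datetime import datetime

def calendar_day_diff(end, start):
    """Days between the two dates, ignoring the time of day."""
    return (end.date() - start.date()).days

pairs = [
    (datetime(2017, 11, 24, 20, 39, 1), datetime(2017, 11, 24, 13, 15, 25)),
    (datetime(2018, 8, 19, 12, 58, 22), datetime(2018, 4, 11, 14, 58, 15)),
    # Made-up pair: two minutes apart, but across midnight.
    (datetime(2018, 1, 1, 0, 1, 0), datetime(2017, 12, 31, 23, 59, 0)),
]

diffs = [calendar_day_diff(end, start) for end, start in pairs]
print(diffs)
```

If elapsed time matters more than calendar days, subtracting the timestamps directly and converting to hours or days is one alternative.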


### Sixth question: What is the average yearly amount received per payment type

In [0]:
display(payments)

order_id,payment_sequential,payment_type,payment_installments,payment_value
b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45
298fcdf1f73eb413e4d26d01b25bc1cd,1,credit_card,2,96.12
771ee386b001f06208a7419e4fc1bbd7,1,credit_card,1,81.16
3d7239c394a212faae122962df514ac7,1,credit_card,3,51.84
1f78449c87a54faf9e96e88ba1491fa9,1,credit_card,6,341.09
0573b5e23cbd798006520e1d5b4c6714,1,boleto,1,51.95


In [0]:
orders_with_payment = orders.join(payments, on="order_id", how="left")

display(orders_with_payment)

order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,payment_sequential,payment_type,payment_installments,payment_value
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,2,voucher,1,18.59
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,3,voucher,1,2.0
e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18 00:00:00,1,credit_card,1,18.12
53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13 00:00:00,1,boleto,1,141.46
47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04 00:00:00,1,credit_card,3,179.12
949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15 00:00:00,1,credit_card,1,72.2
ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26 00:00:00,1,credit_card,1,28.62
a4591c265e18cb1dcee52889e2d8acc3,503740e9ca751ccdda7ba28e9ab8f608,delivered,2017-07-09 21:57:05,2017-07-09 22:10:13,2017-07-11 14:58:04,2017-07-26 10:57:55,2017-08-01 00:00:00,1,credit_card,6,175.26
136cce7faa42fdb2cefd53fdc79a6098,ed0271e0b7da060a393796590e7b737a,invoiced,2017-04-11 12:22:08,2017-04-13 13:25:17,,,2017-05-09 00:00:00,1,credit_card,1,65.95
6514b8ad8028c9f2cc2374ded245783f,9bdf08b4b3b52b5526ff42d37d47f222,delivered,2017-05-16 13:10:30,2017-05-16 13:22:11,2017-05-22 10:07:46,2017-05-26 12:55:51,2017-06-07 00:00:00,1,credit_card,3,75.16


In [0]:
answer_6 = (orders_with_payment
            .groupby(F.year(F.col("order_purchase_timestamp")), "payment_type")
            .agg(F.sum(F.col("payment_value"))
                 .alias("sum_payment_type"))
            .groupby("payment_type")
            .agg(F.avg(F.col("sum_payment_type"))
                 .alias("mean_total_payment_received_per_year"))
           )

display(answer_6)

payment_type,mean_total_payment_received_per_year
,
boleto,956453.756666668
not_defined,0.0
credit_card,4180694.730000008
voucher,126478.95666666668
debit_card,72663.26333333337
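
The calculation above is a two-step aggregation: first the total received per year and payment type, then the average of those yearly totals per payment type. A plain-Python sketch with made-up values (not taken from the dataset):

```python
from collections import defaultdict

# Made-up payments: (year, payment_type, payment_value)
payments_sample = [
    (2017, "boleto", 100.0),
    (2017, "boleto", 50.0),
    (2018, "boleto", 250.0),
    (2017, "voucher", 20.0),
]

# Step 1: sum per (year, payment_type).
yearly_totals = defaultdict(float)
for year, payment_type, value in payments_sample:
    yearly_totals[(year, payment_type)] += value

# Step 2: average the yearly totals per payment_type.
totals_by_type = defaultdict(list)
for (year, payment_type), total in yearly_totals.items():
    totals_by_type[payment_type].append(total)

mean_per_year = {ptype: sum(t) / len(t) for ptype, t in totals_by_type.items()}
print(mean_per_year)
```

The average is taken only over the years in which a payment type appears; a type absent in a given year does not contribute a zero for that year.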


#### Finally, let's talk about how to create custom functions to operate on our data

These functions are called *user defined functions* (UDF) and are useful for building custom logic, since we have the whole *Python* arsenal at our disposal.

But always use them with caution: *Spark* treats these functions as "black boxes", meaning it cannot apply its query optimization when it encounters this kind of operation.

In addition, this kind of function adds considerable *overhead*, because the data must be serialized in both directions of the communication between *Python* and the JVM.

This is an interesting topic, but fully understanding the drawbacks requires more knowledge of Spark's internals.

What you should keep in mind is:

## Only use UDFs when what you need is not implemented natively!

In [0]:
@F.udf(returnType=T.FloatType())
def multiply_by_two(data):
  return 2*data

In [0]:
#Note the data type conversion (some people perform this conversion by passing the string "float" to the cast method)
display(payments.withColumn("test", multiply_by_two(F.col("payment_value").cast(T.FloatType()))))

order_id,payment_sequential,payment_type,payment_installments,payment_value,test
b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33,198.66
a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39,48.78
25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71,131.42
ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78,215.56
42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45,256.9
298fcdf1f73eb413e4d26d01b25bc1cd,1,credit_card,2,96.12,192.24
771ee386b001f06208a7419e4fc1bbd7,1,credit_card,1,81.16,162.32
3d7239c394a212faae122962df514ac7,1,credit_card,3,51.84,103.68
1f78449c87a54faf9e96e88ba1491fa9,1,credit_card,6,341.09,682.18
0573b5e23cbd798006520e1d5b4c6714,1,boleto,1,51.95,103.9


We can also write the UDF as a lambda function:

```
multiply_by_two = F.udf(lambda x: 2*x, returnType=T.FloatType())
```
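
One thing the UDFs above do not handle is null input: Spark hands nulls to a Python UDF as `None`, and `2 * None` raises a `TypeError`. A minimal null-safe sketch of the function body follows, shown as plain Python so it runs without a cluster. The name `multiply_by_two_safe` is hypothetical; in the notebook it would sit under the same `@F.udf(returnType=T.FloatType())` decorator.

```python
def multiply_by_two_safe(data):
    """Double the value, passing nulls (None) through unchanged."""
    if data is None:
        return None
    return 2 * data

print(multiply_by_two_safe(99.33))
print(multiply_by_two_safe(None))
```

For this particular operation a native expression such as `F.col("payment_value") * 2` avoids the UDF altogether, in line with the rule above.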