In [20]:
from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, min, max, day, year, dayofweek, month

# Modelagem dos dados

Simulando uma estrutura no qual no qual os dados brutos (fonte de dados) estão em ``data/raw`` e os dados tratados são salvos em ``data/processed``, que pode ser um cloud storage, por exemplo.

```
data
├───processed
│   └── cliente
│       ├── clientes_YYYYmmmdd.csv
│       └── clientes.csv
│   └── datas
│       ├── datas_YYYYmmmdd.csv
│       └── datas.csv
│   └── produtos
│       ├── produtos_YYYYmmmdd.csv
│       └── produtos.csv
│   └── vendas_fato.csv
│       ├── vendas_fato_YYYYmmmdd.csv
│       └── vendas_fato.csv
└───raw
    └─── Dados brutos.csv
```

## Etapas do notebook

1. Configurações iniciais para execução do notebook e leitura dos dados
2. Construção das tabelas de dimensão
    - Clientes
    - Produtos
    - Datas
3. Construção da tabela fato


## 1. Preparação dos dados

Lendo o DataFrame com dados brutos

In [2]:
today = date.today()

In [3]:
spark = SparkSession.builder.appName("Desafio Gaudium").getOrCreate()

In [4]:
df = spark.read.csv('../data/raw/Dados brutos.csv', header=True, inferSchema=True)

In [5]:
df.printSchema()

root
 |-- nome_cliente: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- nome_produto: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- data: date (nullable = true)
 |-- qtd_vendida: integer (nullable = true)
 |-- valor_total: integer (nullable = true)



In [6]:
df.show(5)

+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
| nome_cliente|        cidade|estado|nome_produto|categoria|fabricante|      data|qtd_vendida|valor_total|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
|Lucas Pereira|  Porto Alegre|    RS|  Detergente|  Limpeza|       Ypê|2024-01-26|          6|         90|
|Lucas Pereira|  Porto Alegre|    RS|      Feijão| Alimento|   Kicaldo|2024-01-14|         10|        240|
| Ana Oliveira|Rio de Janeiro|    RJ|Refrigerante|   Bebida| Coca-Cola|2024-01-15|          3|        150|
| Pedro Santos|      Curitiba|    PR|      Feijão| Alimento|   Kicaldo|2024-01-28|          4|        152|
| Pedro Santos|      Curitiba|    PR|       Arroz| Alimento|     Camil|2024-01-24|          3|         87|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
only showing top 5 rows



## 2. Construção das tabelas de dimensões

### Dimensão cliente

In [7]:
clientes_df = (
    df.select(["nome_cliente","cidade","estado"])
    .drop_duplicates(subset=["nome_cliente","cidade","estado"])
    .withColumn("id_cliente", hash("nome_cliente","cidade","estado"))
)

In [8]:
clientes_df.show(5)

+-------------+--------------+------+-----------+
| nome_cliente|        cidade|estado| id_cliente|
+-------------+--------------+------+-----------+
| Pedro Santos|      Curitiba|    PR| -192200573|
|   João Silva|     São Paulo|    SP| -864470709|
|Lucas Pereira|  Porto Alegre|    RS| 1375153528|
| Ana Oliveira|Rio de Janeiro|    RJ|  865087950|
|  Maria Souza|Belo Horizonte|    MG|-1678988022|
+-------------+--------------+------+-----------+



In [9]:
clientes_pd_df = clientes_df.toPandas()

clientes_pd_df.to_csv(f"../data/processed/clientes/clientes_{today.strftime("%Y%m%d")}.csv", index=False)
clientes_pd_df.to_csv("../data/processed/clientes/clientes.csv", index=False)

### Dimensão produtos

Premissa para geração de tabelas:
- O nome de produto e fabricante são suficientes para determinar um produto único. Ou seja, não espera que exista um produto Detergente da Ypê com categoria Limpeza e outra com categoria Alimento/Bebida

In [11]:
produtos_df = (
    df.select(["nome_produto","categoria","fabricante"])
    .drop_duplicates(subset=["nome_produto","fabricante"])
    .withColumn("id_produto", hash("nome_produto", "fabricante"))
)

In [12]:
produtos_df.show()

+------------+---------+----------+-----------+
|nome_produto|categoria|fabricante| id_produto|
+------------+---------+----------+-----------+
|       Arroz| Alimento|     Camil|-1114310623|
|  Detergente|  Limpeza|       Ypê|  711337539|
|      Feijão| Alimento|   Kicaldo| -585386349|
|Refrigerante|   Bebida| Coca-Cola|  665068878|
| Sabão em pó|  Limpeza|       OMO|-2084300233|
+------------+---------+----------+-----------+



In [13]:
produtos_pd_df = produtos_df.toPandas()

produtos_pd_df.to_csv(f"../data/processed/produtos/produtos_{today.strftime("%Y%m%d")}.csv", index=False)
produtos_pd_df.to_csv("../data/processed/produtos/produtos.csv", index=False)

### Dimensão de data

In [None]:
ini_date = df.select(min("data")).collect()[0][0]
end_date = df.select(max("data")).collect()[0][0]

data_df = spark.sql(
    f"""
    SELECT explode(sequence(to_date('{ini_date.strftime("%Y-%m-%d")}'), to_date('{end_date.strftime("%Y-%m-%d")}'), interval 1 day)) as data
    """
)

data_df = (
    data_df.withColumn("dia", day('data'))
        .withColumn("mes", month('data'),)
        .withColumn('ano', year('data'))
        .withColumn('dia_da_semana',dayofweek('data')) 
)

In [22]:
data_df.printSchema()

root
 |-- data: date (nullable = false)
 |-- dia: integer (nullable = false)
 |-- mes: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- dayofweek: integer (nullable = false)



In [15]:
data_pd_df = data_df.toPandas()

data_pd_df.to_csv(f"../data/processed/datas/datas_{today.strftime("%Y%m%d")}.csv", index=False)
data_pd_df.to_csv("../data/processed/datas/datas.csv", index=False)

## 3. Construção a tabela vendas fato

In [16]:
df.show(5)

+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
| nome_cliente|        cidade|estado|nome_produto|categoria|fabricante|      data|qtd_vendida|valor_total|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
|Lucas Pereira|  Porto Alegre|    RS|  Detergente|  Limpeza|       Ypê|2024-01-26|          6|         90|
|Lucas Pereira|  Porto Alegre|    RS|      Feijão| Alimento|   Kicaldo|2024-01-14|         10|        240|
| Ana Oliveira|Rio de Janeiro|    RJ|Refrigerante|   Bebida| Coca-Cola|2024-01-15|          3|        150|
| Pedro Santos|      Curitiba|    PR|      Feijão| Alimento|   Kicaldo|2024-01-28|          4|        152|
| Pedro Santos|      Curitiba|    PR|       Arroz| Alimento|     Camil|2024-01-24|          3|         87|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
only showing top 5 rows



In [17]:
vendas_fato = (
    df.withColumn("id_cliente", hash("nome_cliente","cidade","estado"))
        .withColumn("id_produto", hash("nome_produto", "fabricante"))
        .select(["data","id_cliente","id_produto","qtd_vendida","valor_total"])
    )

In [18]:
vendas_fato.show(5)

+----------+----------+-----------+-----------+-----------+
|      data|id_cliente| id_produto|qtd_vendida|valor_total|
+----------+----------+-----------+-----------+-----------+
|2024-01-26|1375153528|  711337539|          6|         90|
|2024-01-14|1375153528| -585386349|         10|        240|
|2024-01-15| 865087950|  665068878|          3|        150|
|2024-01-28|-192200573| -585386349|          4|        152|
|2024-01-24|-192200573|-1114310623|          3|         87|
+----------+----------+-----------+-----------+-----------+
only showing top 5 rows



In [19]:
vendas_fato_pd_df = vendas_fato.toPandas()

vendas_fato_pd_df.to_csv(f"../data/processed/vendas_fato/vendas_fato_{today.strftime("%Y%m%d")}.csv", index=False)
vendas_fato_pd_df.to_csv("../data/processed/vendas_fato/vendas_fato.csv", index=False)