> # MVP CONCLUSÃO DE SPRINT - CRIAÇÃO DE UM PIPELINE DE DADOS COM DELTA LIVE TABLES 
Lucas de Oliveira Noronha

### O notebook "vendas" prepara e organiza dados de vendas em um pipeline DLT. Inicia configurando o ambiente e copiando os dados necessários para o DBFS. Em seguida, define uma tabela de fatos de vendas e tres dimensões (clientes, categorias e filial) para análise, garantindo a qualidade dos dados com expectativas DLT.

A base de dados é uma amostra de tres meses de vendas de uma empresa

paths
https://raw.githubusercontent.com/LDONoronha/data_engineering/main/vendas.csv

METADOS:

- COD_CLIENTE;
int;
PK CLIENTE

- DEPARTAMENTO;
string;
Categoria ou familia do produto

- MODO_PAGAMENTO;
string;
TIPO DE PAGAMENTO DO CLIENTE

- COD_FILIAL;
int;
LOJA

- UF;
string;
ESTADO

- QTDE;
int;
QUANTIDADE DE ITENS COMPRADOS

- VALOR_TOTAL;
double;
VALOR DE RECEITA DE VENDAS

- DATA;
timestamp;
DATA DE COMPRA

- MES_ANO;
string;
DATA DE COMPRA FORMATO RESUMIDO



In [0]:
# Esta célula importa o módulo dlt, utilizado para definir tabelas e transformações em pipelines Delta Live Tables (DLT), e todas as funções do módulo pyspark.sql.functions, que são usadas para manipulação de colunas e dados em DataFrames do PySpark.

import dlt
from pyspark.sql.functions import *

In [0]:
#Esta célula configura o ambiente, definindo variáveis de ambiente para o caminho do volume no catálogo Unity, a URL de download do conjunto de dados e o nome do arquivo. Em seguida, copia o arquivo CSV de vendas da URL especificada para o caminho do volume no Databricks File System (DBFS).

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/lucasdeoliveira/default/fatovendas/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://raw.githubusercontent.com/LDONoronha/data_engineering/main/vendas.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "vendas.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

True

In [0]:
#Esta célula define uma tabela DLT chamada fato_vendas, contendo dados de vendas de um período de 3 meses. A função lê o arquivo CSV de vendas do caminho especificado, utilizando o Spark para inferir o esquema das colunas automaticamente e considerando a primeira linha como cabeçalho.

@dlt.table(
  comment="Base de dados com histórico de 3 meses de vendas de uma empresa."
)
def fato_vendas():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  return df

Name,Type
COD_CLIENTE,int
DEPARTAMENTO,string
MODO_PAGAMENTO,string
COD_FILIAL,int
UF,string
QTDE,int
VALOR_TOTAL,double
DATA,timestamp
MES_ANO,string


In [0]:
# Esta célula cria uma tabela DLT chamada dim_clientes, contendo uma lista única de códigos de clientes. Utiliza a expectativa dlt.expect para garantir que a coluna COD_CLIENTE não contenha valores nulos. A função seleciona a coluna COD_CLIENTE da tabela fato_vendas e remove duplicatas.

@dlt.table(
  comment="Base de Cliente."
)
@dlt.expect("valid_COD_CLIENTE", "COD_CLIENTE IS NOT NULL")
def dim_cliente():
  return (
    dlt.read("fato_vendas")
      .select("COD_CLIENTE")
      .distinct()
  )

Name,Type
COD_CLIENTE,int


In [0]:
# Esta célula define outra tabela DLT, dim_categoria, contendo categorias únicas de produtos vendidos. Assim como na célula anterior, uma expectativa é definida para garantir que a coluna DEPARTAMENTO não tenha valores nulos. A função seleciona a coluna DEPARTAMENTO da tabela fato_vendas e aplica o método distinct() para remover duplicatas.

@dlt.table(
  comment="Categoria dos produtos vendidos."
)
@dlt.expect("valid_DEPARTAMENTO", "DEPARTAMENTO IS NOT NULL")
def dim_categoria():
  return (
    dlt.read("fato_vendas")
      .select("DEPARTAMENTO")
      .distinct()
  )

Name,Type
DEPARTAMENTO,string


In [0]:
# Esta célula define outra tabela DLT, dim_filial, contendo filiais únicas (lojas de vendasd) por estados. Assim como na célula anterior, uma expectativa é definida para garantir que a coluna COD_FILIAL não tenha valores nulos. A função seleciona as colunas UF e cod_filial da tabela fato_vendas e aplica o método distinct() para remover duplicatas.

@dlt.table(
  comment="Região e loja de compra."
)
@dlt.expect("valid_COD_FILIAL", "COD_FILIAL IS NOT NULL")
def dim_filial():
  return (
    dlt.read("fato_vendas")
      .select("UF","COD_FILIAL")
      .distinct()
  )

Name,Type
UF,string
COD_FILIAL,int


## Consultar as tabelas criadas

In [0]:
%sql
SELECT * FROM lucasdeoliveira.vendas.fato_vendas

COD_CLIENTE,DEPARTAMENTO,MODO_PAGAMENTO,COD_FILIAL,UF,QTDE,VALOR_TOTAL,DATA,MES_ANO
35001645,PNEU,FATURADO,54,RR,100,140000.0,2024-07-02T00:00:00Z,07/2024
53007667,PNEU,FATURADO,9,AM,700,121100.0,2024-07-05T00:00:00Z,07/2024
2937400,PNEU,FATURADO,31,AM,200,100000.0,2024-06-07T00:00:00Z,06/2024
66804100,PNEU,FATURADO,58,PA,40,86607.2,2024-06-26T00:00:00Z,06/2024
18000195,PNEU,DEPOSITO,18,AM,50,71500.0,2024-07-02T00:00:00Z,07/2024
28003251,PNEU,FATURADO,72,PA,34,70722.04,2024-06-12T00:00:00Z,06/2024
31039974,PNEU,FATURADO,31,AM,40,56800.0,2024-06-06T00:00:00Z,06/2024
31039974,PNEU,FATURADO,31,AM,32,55052.8,2024-06-18T00:00:00Z,06/2024
38000777,PNEU,DEPOSITO,35,RR,28,51256.8,2024-06-07T00:00:00Z,06/2024
60006491,PNEU,FATURADO,46,AP,24,50246.4,2024-06-11T00:00:00Z,06/2024


In [0]:
%sql
SELECT * FROM lucasdeoliveira.vendas.dim_cliente

COD_CLIENTE
55002214
55001979
58005009
72001855
1036659
68005078
61000310
32012490
35001879
40001475


In [0]:
%sql
SELECT * FROM lucasdeoliveira.vendas.dim_filial

UF,COD_FILIAL
PA,58
AM,89
MA,15
AM,41
AM,48
PA,67
PA,68
MT,4
RR,47
AP,40


In [0]:
%sql
SELECT * FROM lucasdeoliveira.vendas.dim_categoria

DEPARTAMENTO
FILTRO
PNEU
LUBRIFICANTES E ADITIVOS
FAROL E ILUMINACAO
AUTOPECAS
ELETRICA


### Realizar análises a partir dos dados

### # Top 10 dos clientes com maior valor de compras em junho de 2024

In [0]:
%sql
SELECT COD_CLIENTE, SUM(VALOR_TOTAL) AS total_compras
FROM lucasdeoliveira.vendas.fato_vendas
WHERE DATA >= '2024-06-01' AND DATA <= '2024-06-30'
GROUP BY COD_CLIENTE
ORDER BY total_compras DESC
LIMIT 10;

COD_CLIENTE,total_compras
58,591500.4299999999
90,432429.38
63,353835.97000000003
40,315186.4199999998
46,313450.3500000002
76,303499.7599999999
49,288297.28999999986
35,254281.03999999975
68,243192.2
55,241168.41999999995


Databricks visualization. Run in Databricks to view.

### # Top 10 dos clientes com maior valor de compras em junho de 2024

In [0]:
%sql
WITH TopClientes AS (
    SELECT COD_CLIENTE, SUM(VALOR_TOTAL) AS total_compras
    FROM lucasdeoliveira.vendas.fato_vendas
    WHERE DATA >= '2024-06-01' AND DATA <= '2024-06-30'
    GROUP BY COD_CLIENTE
    ORDER BY total_compras DESC
    LIMIT 10
)
SELECT tc.COD_CLIENTE, d.DEPARTAMENTO, SUM(fv.VALOR_TOTAL) AS total_compras_departamento
FROM TopClientes tc
JOIN lucasdeoliveira.vendas.fato_vendas fv ON tc.COD_CLIENTE = fv.COD_CLIENTE
JOIN lucasdeoliveira.vendas.dim_categoria d ON fv.DEPARTAMENTO = d.DEPARTAMENTO
GROUP BY tc.COD_CLIENTE, d.DEPARTAMENTO
ORDER BY total_compras_departamento DESC;

COD_CLIENTE,DEPARTAMENTO,total_compras_departamento
58,PNEU,654967.3700000001
90,PNEU,474572.5700000002
49,PNEU,345266.0399999999
76,PNEU,337125.3999999999
40,PNEU,333116.45
46,PNEU,332442.03999999986
35,PNEU,296873.4899999999
63,PNEU,277690.06000000006
68,PNEU,222186.8399999999
63,ELETRICA,218276.47999999995


Databricks visualization. Run in Databricks to view.