In [1]:
# Bibliotecas essenciais para processamento distribuído com PySpark
# Funções específicas para transformação de dados e criação de IDs
# Definição de tipos de dados para garantir integridade do schema

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, monotonically_increasing_id, year, month, dayofmonth, date_format, col, count, when, isnan, expr, lower
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, DoubleType
import os

In [2]:
# Inicialização do Spark com configuração local para processamento dos dados de vendas

spark = SparkSession.builder.appName("Modelagem Dimensional - Vendas").master("local[*]").getOrCreate()

### Definição do schema e carregamento dos dados brutos

In [3]:
# Schema explícito para evitar inferências incorretas de tipos
# Campos definidos como não-nullable (False) para garantir integridade

schema = StructType([
    StructField("nome_cliente", StringType(), False),
    StructField("cidade", StringType(), False),
    StructField("estado", StringType(), False), 
    StructField("nome_produto", StringType(), False),
    StructField("categoria", StringType(), False),
    StructField("fabricante", StringType(), False),
    StructField("data", DateType(), False),
    StructField("qtd_vendida", IntegerType(), False),
    StructField("valor_total", DoubleType(), False)
])

In [4]:
# Carregamento dos dados brutos com schema predefinido

df_bruto = spark.read.csv(os.path.join("dados","entrada","dados_brutos.csv"), header=True, schema=schema)

In [5]:
df_bruto.show(n=5)
df_bruto.printSchema()

+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
| nome_cliente|        cidade|estado|nome_produto|categoria|fabricante|      data|qtd_vendida|valor_total|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
|Lucas Pereira|  Porto Alegre|    RS|  Detergente|  Limpeza|       Ypê|2024-01-26|          6|       90.0|
|Lucas Pereira|  Porto Alegre|    RS|      Feijão| Alimento|   Kicaldo|2024-01-14|         10|      240.0|
| Ana Oliveira|Rio de Janeiro|    RJ|Refrigerante|   Bebida| Coca-Cola|2024-01-15|          3|      150.0|
| Pedro Santos|      Curitiba|    PR|      Feijão| Alimento|   Kicaldo|2024-01-28|          4|      152.0|
| Pedro Santos|      Curitiba|    PR|       Arroz| Alimento|     Camil|2024-01-24|          3|       87.0|
+-------------+--------------+------+------------+---------+----------+----------+-----------+-----------+
only showing top 5 rows

root
 |-- no

In [6]:
# Renomeação de colunas para seguir padrão dimensional com prefixo 'nome_'
# Importante para clareza e consistência no modelo dimensional

colunas_renomeacao = {
    "cidade": "nome_cidade",
    "estado": "nome_estado",
    "categoria": "nome_categoria",
    "fabricante": "nome_fabricante"
}

for nome_antigo, nome_novo in colunas_renomeacao.items():
    df_bruto = df_bruto.withColumnRenamed(nome_antigo, nome_novo)

In [7]:
# Separação de colunas por tipo para facilitar análises específicas

colunas_numericas = [nome for nome, tipo in df_bruto.dtypes if tipo in ('int', 'double', 'float', 'long', 'short', 'decimal')]
colunas_categoricas = [nome for nome, tipo in df_bruto.dtypes if tipo in ('string', 'boolean')]

print("Colunas numéricas:", colunas_numericas)
print("Colunas categóricas:", colunas_categoricas)

Colunas numéricas: ['qtd_vendida', 'valor_total']
Colunas categóricas: ['nome_cliente', 'nome_cidade', 'nome_estado', 'nome_produto', 'nome_categoria', 'nome_fabricante']


### Verificação das estatísticas descritivas, inconsistências e valores ausentes

In [8]:
# Estatísticas descritivas dos dados numéricos para validação inicial

df_bruto[colunas_numericas].describe().show()

+-------+------------------+------------------+
|summary|       qtd_vendida|       valor_total|
+-------+------------------+------------------+
|  count|               100|               100|
|   mean|              5.58|            148.44|
| stddev|2.8645517215886613|108.93277697272707|
|    min|                 1|               9.0|
|    max|                10|             470.0|
+-------+------------------+------------------+



In [9]:
# Função robusta para detectar valores nulos conforme o tipo de dado
# Cálculo de percentual de nulos para avaliação de completude
# Retorno booleano para facilitar integração com pipelines de validação

def validar_qualidade_dados(df, nome_coluna):
    """Valida a qualidade dos dados em uma coluna"""
    coluna_dtype = dict(df.dtypes)[nome_coluna]
    
    if coluna_dtype in ['float', 'double', 'decimal', 'int', 'bigint', 'smallint', 'tinyint']:
        condicao = col(nome_coluna).isNull() | isnan(col(nome_coluna)) | (col(nome_coluna) == "")
    else:
        condicao = col(nome_coluna).isNull() | (col(nome_coluna) == "")
    
    contagem_nulos = df.filter(condicao).count()
    contagem_total = df.count()
    percentual_nulos = (contagem_nulos / contagem_total) * 100 if contagem_total > 0 else 0
    
    print(f"Coluna {nome_coluna}: {contagem_nulos} valores nulos ({percentual_nulos:.2f}%)")
    return contagem_nulos == 0

In [10]:
# Verificação de valores nulos em todas as colunas do dataset

for coluna in df_bruto.columns:
    validar_qualidade_dados(df_bruto, coluna)

Coluna nome_cliente: 0 valores nulos (0.00%)
Coluna nome_cidade: 0 valores nulos (0.00%)
Coluna nome_estado: 0 valores nulos (0.00%)
Coluna nome_produto: 0 valores nulos (0.00%)
Coluna nome_categoria: 0 valores nulos (0.00%)
Coluna nome_fabricante: 0 valores nulos (0.00%)
Coluna data: 0 valores nulos (0.00%)
Coluna qtd_vendida: 0 valores nulos (0.00%)
Coluna valor_total: 0 valores nulos (0.00%)


In [11]:
# Normalização de casing para garantir consistência nas dimensões
# Estratégia fundamental para evitar duplicidades nas dimensões

df_bruto = df_bruto.withColumn("nome_cliente", expr("initcap(nome_cliente)"))
df_bruto = df_bruto.withColumn("nome_cidade", expr("initcap(nome_cidade)"))
df_bruto = df_bruto.withColumn("nome_estado", expr("upper(nome_estado)"))
df_bruto = df_bruto.withColumn("nome_produto", expr("initcap(nome_produto)"))
df_bruto = df_bruto.withColumn("nome_categoria", expr("initcap(nome_categoria)"))
df_bruto = df_bruto.withColumn("nome_fabricante", expr("initcap(nome_fabricante)"))

In [12]:
df_bruto.show(10)

+-------------+--------------+-----------+------------+--------------+---------------+----------+-----------+-----------+
| nome_cliente|   nome_cidade|nome_estado|nome_produto|nome_categoria|nome_fabricante|      data|qtd_vendida|valor_total|
+-------------+--------------+-----------+------------+--------------+---------------+----------+-----------+-----------+
|Lucas Pereira|  Porto Alegre|         RS|  Detergente|       Limpeza|            Ypê|2024-01-26|          6|       90.0|
|Lucas Pereira|  Porto Alegre|         RS|      Feijão|      Alimento|        Kicaldo|2024-01-14|         10|      240.0|
| Ana Oliveira|Rio De Janeiro|         RJ|Refrigerante|        Bebida|      Coca-cola|2024-01-15|          3|      150.0|
| Pedro Santos|      Curitiba|         PR|      Feijão|      Alimento|        Kicaldo|2024-01-28|          4|      152.0|
| Pedro Santos|      Curitiba|         PR|       Arroz|      Alimento|          Camil|2024-01-24|          3|       87.0|
| Pedro Santos|      Cur

### Criação das tabelas dimensões e fato (modelagem dimensional)

#### Dimensão Cliente

In [13]:
# Extração de valores únicos para evitar redundância
# Geração de surrogate key utilizando monotonically_increasing_id()

dim_cliente = df_bruto.select("nome_cliente").distinct()
dim_cliente = dim_cliente.withColumn("id_cliente", monotonically_increasing_id())
print(f"Dimensão Cliente criada: {dim_cliente.count()} registros")
dim_cliente.show(5)

Dimensão Cliente criada: 5 registros
+-------------+----------+
| nome_cliente|id_cliente|
+-------------+----------+
| Ana Oliveira|         0|
| Pedro Santos|         1|
|Lucas Pereira|         2|
|   João Silva|         3|
|  Maria Souza|         4|
+-------------+----------+



#### Dimensão Produto

In [14]:
dim_produto = df_bruto.select("nome_produto", "nome_categoria", "nome_fabricante").distinct()
dim_produto = dim_produto.withColumn("id_produto", monotonically_increasing_id())
print(f"Dimensão Produto criada: {dim_produto.count()} registros")
dim_produto.show(5)

Dimensão Produto criada: 5 registros
+------------+--------------+---------------+----------+
|nome_produto|nome_categoria|nome_fabricante|id_produto|
+------------+--------------+---------------+----------+
| Sabão Em Pó|       Limpeza|            Omo|         0|
|  Detergente|       Limpeza|            Ypê|         1|
|       Arroz|      Alimento|          Camil|         2|
|Refrigerante|        Bebida|      Coca-cola|         3|
|      Feijão|      Alimento|        Kicaldo|         4|
+------------+--------------+---------------+----------+



#### Dimensão Data

In [15]:
# Extração de componentes temporais para facilitar análises por período e sazonalidade
# Estrutura completa para slice and dice temporal

dim_data = df_bruto.select("data").distinct()
dim_data = dim_data.withColumn("id_data", monotonically_increasing_id()) \
    .withColumn("ano", year("data")) \
    .withColumn("mes", month("data")) \
    .withColumn("dia", dayofmonth("data")) \
    .withColumn("dia_semana", date_format("data", "EEEE"))
print(f"Dimensão Tempo criada: {dim_data.count()} registros")
dim_data.show(5)

Dimensão Tempo criada: 29 registros
+----------+-------+----+---+---+----------+
|      data|id_data| ano|mes|dia|dia_semana|
+----------+-------+----+---+---+----------+
|2024-01-07|      0|2024|  1|  7|    Sunday|
|2024-01-11|      1|2024|  1| 11|  Thursday|
|2024-01-30|      2|2024|  1| 30|   Tuesday|
|2024-01-14|      3|2024|  1| 14|    Sunday|
|2024-01-02|      4|2024|  1|  2|   Tuesday|
+----------+-------+----+---+---+----------+
only showing top 5 rows



#### Dimensão Local

In [16]:
dim_local = df_bruto.select("nome_cidade", "nome_estado").distinct()
dim_local = dim_local.withColumn("id_local", monotonically_increasing_id())
print(f"Dimensão Cidade criada: {dim_local.count()} registros")
dim_local.show(5)

Dimensão Cidade criada: 5 registros
+--------------+-----------+--------+
|   nome_cidade|nome_estado|id_local|
+--------------+-----------+--------+
|      Curitiba|         PR|       0|
|     São Paulo|         SP|       1|
|  Porto Alegre|         RS|       2|
|Belo Horizonte|         MG|       3|
|Rio De Janeiro|         RJ|       4|
+--------------+-----------+--------+



#### Fato Venda

In [17]:
# Abordagem robusta com joins progressivos para garantir integridade referencial
# Uso de chaves substitutas (surrogate keys) de todas as dimensões
# Medidas de negócio: quantidade vendida e valor total
# Granularidade da tabela fato: uma linha para cada item de venda 

df_bruto_com_id = df_bruto.withColumn("id_venda", monotonically_increasing_id())

# 1. Conectar com dimensão Cliente
df_com_cliente = df_bruto_com_id.join(
    dim_cliente.select("nome_cliente", "id_cliente"), 
    on=["nome_cliente"], 
    how="left"
)

# 2. Conectar com dimensão Produto
df_com_produto = df_com_cliente.join(
    dim_produto.select("nome_produto","id_produto", "nome_categoria", "nome_fabricante"),
    on=["nome_produto", "nome_categoria", "nome_fabricante"],
    how="left"
)

# 3. Conectar com dimensão Local
df_com_local = df_com_produto.join(
    dim_local.select("nome_cidade", "nome_estado", "id_local"),
    on=["nome_cidade","nome_estado"],
    how="left"
)

# 4. Conectar com dimensão Tempo
df_com_data = df_com_local.join(
    dim_data.select("data", "id_data"),
    on=["data"],
    how="left"
)

# 5. Criar a tabela fato mantendo apenas as colunas necessárias
fato_vendas = df_com_data.select(
    "id_venda",
    "id_cliente", 
    "id_produto", 
    "id_local",
    "id_data", 
    "qtd_vendida", 
    "valor_total"
)

print(f"Tabela fato Vendas criada: {fato_vendas.count()} registros")
fato_vendas.orderBy("id_venda").show()

Tabela fato Vendas criada: 100 registros
+--------+----------+----------+--------+-------+-----------+-----------+
|id_venda|id_cliente|id_produto|id_local|id_data|qtd_vendida|valor_total|
+--------+----------+----------+--------+-------+-----------+-----------+
|       0|         2|         1|       2|     11|          6|       90.0|
|       1|         2|         4|       2|      3|         10|      240.0|
|       2|         0|         3|       4|     15|          3|      150.0|
|       3|         1|         4|       0|      8|          4|      152.0|
|       4|         1|         2|       0|     14|          3|       87.0|
|       5|         1|         3|       0|     16|          6|      192.0|
|       6|         3|         1|       1|      4|         10|      190.0|
|       7|         2|         4|       2|     19|         10|      260.0|
|       8|         1|         0|       0|     14|          2|       90.0|
|       9|         3|         4|       1|      7|         10|      470.

### Exportação dos resultados

In [18]:
# dim_cliente.write.csv(os.path.join("dados","saida","dim_cliente.csv"), header=True, mode="overwrite")
# dim_produto.write.csv(os.path.join("dados","saida","dim_produto.csv"), header=True, mode="overwrite")
# dim_data.write.csv(os.path.join("dados","saida","dim_data.csv"), header=True, mode="overwrite")
# dim_local.write.csv(os.path.join("dados","saida","dim_local.csv"), header=True, mode="overwrite")

# fato_vendas.write.csv(os.path.join("dados","saida","fato_vendas.csv"), header=True, mode="overwrite")

In [19]:
# Encerrar a sessão
spark.stop()