# Desenvolvendo a Dimensão Produto com 
- scd2


In [4]:
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))


In [18]:
# Importando dados

# importando 

import pandas as pd 
from datetime import datetime
from sqlalchemy import text
from sqlalchemy import create_engine
from dotenv import load_dotenv
from utils.acessobanco import acessobanco

In [19]:
# Cria a conexão com o banco usando a função acessobanco()
load_dotenv(dotenv_path="/home/danielpedro/Python/Projetos/SuperStorePY/Bancos/acessobanco.env")


True

In [20]:
# Cria a conexão com o banco usando a função acessobanco()
engine = acessobanco()

In [21]:
# Criando a df puxando do banco
df = pd.read_sql_table("orders",engine, schema="stg")

In [22]:
# Verificando os dados
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9994 entries, 0 to 9993
Data columns (total 21 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   id             9994 non-null   object
 1   id_pedido      9994 non-null   object
 2   data_pedido    9994 non-null   object
 3   data_envio     9994 non-null   object
 4   modo_envio     9994 non-null   object
 5   id_cliente     9994 non-null   object
 6   nome_cliente   9994 non-null   object
 7   segmento       9994 non-null   object
 8   pais           9994 non-null   object
 9   cidade         9994 non-null   object
 10  estado         9994 non-null   object
 11  ce             9994 non-null   object
 12  regiao         9994 non-null   object
 13  id_produto     9994 non-null   object
 14  categoria      9994 non-null   object
 15  sub_categoria  9994 non-null   object
 16  nome_produto   9994 non-null   object
 17  valor_venda    9994 non-null   object
 18  quantidade     9994 non-null

In [23]:
# Definido as colunas de dim Produtos
df_DimProduto = df[['id_produto','nome_produto','categoria','sub_categoria']]

In [24]:
# Verificando se tem duplicado e a quantidade
df_DimProduto.duplicated().value_counts()

True     8098
False    1896
Name: count, dtype: int64

In [25]:
# Excluindo os duplicado pelo indice 'id_produto'
df_DimProduto = df_DimProduto.drop_duplicates(subset=['id_produto'])

In [26]:
# Verificando novamente e tem duplicado 
df_DimProduto.duplicated().value_counts()

False    1862
Name: count, dtype: int64

In [27]:
# Ordenando pelo nome_produto
df_DimProduto = df_DimProduto.sort_values(by=['nome_produto'], ascending= True)
df_DimProduto.head()

Unnamed: 0,id_produto,nome_produto,categoria,sub_categoria
708,OFF-PA-10003424,"""While you Were Out"" Message Book, One Form pe...",Office Supplies,Paper
924,OFF-EN-10001137,"#10 Gummed Flap White Envelopes, 100/Box",Office Supplies,Envelopes
1487,OFF-EN-10002312,#10 Self-Seal White Envelopes,Office Supplies,Envelopes
2188,OFF-EN-10004483,"#10 White Business Envelopes,4 1/8 x 9 1/2",Office Supplies,Envelopes
199,OFF-EN-10000461,"#10- 4 1/8"" x 9 1/2"" Recycled Envelopes",Office Supplies,Envelopes


In [28]:
# Passando algumas variaveis para construir o processo de scd2

now = datetime.now()                       # timestamp da carga
SENTINEL = '2199-12-31 00:00:00'           # data que representa "ativo" (não nulo)
schema = 'dw'                              # schema alvo
tmp_table = 'tmp_dim_produto'              # nome da tabela temporária (no schema dw)
dim_table = 'dim_produto_scd2'             # tabela de destino

In [29]:
# Captura a data e hora atual para uso como data_inicio
data_atual = datetime.now()

#  Cria tabela temporaria no schema DW, para poder fazer o controle
with engine.begin() as conexao:
    df_DimProduto.to_sql(
        "tmp_dim_produto",
        conexao,
        schema="dw",
        if_exists="replace",
        index=False,
        method="multi",
        chunksize=5000
    )


In [30]:
# Verificando a quantidade de linhas na temp 
count = pd.read_sql("SELECT COUNT(*) as total_registros FROM dw.tmp_dim_produto", engine).iloc[0,0]
print(f"Total de registros na tabela: {count}")

Total de registros na tabela: 1862


In [31]:
#  Atualiza registros antigos, encerrando a validade dos produtos alterados
encerrar_sql = """
UPDATE dw.dim_produto_scd2 AS d
SET data_fim = :data_atual
FROM dw.tmp_dim_produto AS t
WHERE d.id_produto = t.id_produto
  AND d.data_fim = '2199-12-31'
  AND (
        d.nome_produto IS DISTINCT FROM t.nome_produto OR
        d.categoria IS DISTINCT FROM t.categoria OR
        d.sub_categoria IS DISTINCT FROM t.sub_categoria
      );
"""

with engine.begin() as conexao:
    conexao.execute(text(encerrar_sql), {"data_atual": data_atual})

print(f"Registros antigos encerrados com sucesso! {len(df_DimProduto)} linhas inseridas como versao=1.")


Registros antigos encerrados com sucesso! 1862 linhas inseridas como versao=1.


In [32]:
# Insere novos produtos e novas versões dos produtos alterados
inserir_sql = """
INSERT INTO dw.dim_produto_scd2 (
    id_produto, nome_produto, categoria, sub_categoria, versao, data_inicio, data_fim
)
SELECT 
    t.id_produto,
    t.nome_produto,
    t.categoria,
    t.sub_categoria,
    COALESCE(d.versao, 0) + 1 AS versao,
    :data_atual AS data_inicio,
    '2199-12-31' AS data_fim
FROM dw.tmp_dim_produto AS t
LEFT JOIN dw.dim_produto_scd2 AS d
    ON d.id_produto = t.id_produto
    AND d.data_fim = :data_fim_padrao
WHERE 
    d.id_produto IS NULL
    OR d.nome_produto IS DISTINCT FROM t.nome_produto
    OR d.categoria IS DISTINCT FROM t.categoria
    OR d.sub_categoria IS DISTINCT FROM t.sub_categoria;
"""

# Executa o SQL e conta quantas linhas foram inseridas
with engine.begin() as conexao:
    resultado = conexao.execute(
        text(inserir_sql),
        {"data_atual": data_atual, "data_fim_padrao": "2199-12-31"}
    )


linhas_inseridas = resultado.rowcount
print(f"{linhas_inseridas} produto(s) novo(s) ou versão(ões) inserido(s) com sucesso!")


0 produto(s) novo(s) ou versão(ões) inserido(s) com sucesso!


In [33]:
# Remove a tabela temporária após o processamento
with engine.begin() as conexao:
    conexao.execute(text("DROP TABLE IF EXISTS dw.tmp_dim_produto;"))

print("Tabela temporária removida com sucesso!")


Tabela temporária removida com sucesso!


In [34]:
#Criando dados para verificar o funcionamento do SCD2
# dfteste = {
#     "id_produto": 999
#     ,"nome_produto": 'Caixa de Pizza'
#     ,"categoria": 'Caixa'
#     ,"sub_categoria": 'Caixa de Comida'
# }

In [35]:
#Inserindo dados para verificar o funcionamento do SCD2

# df_DimProduto = pd.concat([df_DimProduto, pd.DataFrame([dfteste])], ignore_index=True)