# ETL: Silver to Gold

Como já explicado anteriormente, o ETL é o processo de extração, transformação e carga de dados. Nesse notebook, vamos focar na transformação e carga dos dados do nível Silver para o nível Gold do nosso Data Warehouse. Dessa forma , estaremos preparando os dados para análises, possibilitando a utilização de ferramentas de BI para gerar os relatórios.

## 1. Configuração inicial

Essa célula importa todas as bibliotecas necessárias, define os caminhos para os arquivos de configuração (`.env`) e DDL (`gold_ddl.sql`), e cria a função `get_connection()` para se conectar ao PostgreSQL.

In [7]:
import pandas as pd
import numpy as np
import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values
from dotenv import load_dotenv
import os
from pathlib import Path

dotenv_path = Path('../../.env')
ddl_file_path = Path('../../data_layer/gold/gold_ddl.sql')
load_dotenv(dotenv_path=dotenv_path, override=True)

def get_connection():
    try:
        return psycopg2.connect(
            host=os.getenv('DB_HOST','localhost'),
            database=os.getenv('POSTGRES_DB','postgres'),
            user=os.getenv('POSTGRES_USER','postgres'),
            password=os.getenv('POSTGRES_PASSWORD','postgres'),
            port=os.getenv('DB_PORT', 5432)
        )
    except psycopg2.Error as e:
        print(f"Falha ao conectar ao banco de dados: {e}")
        raise

print("Configuração inicial concluída.")

Configuração inicial concluída.


## 2. Execução do DDL

Essa célula garante que nosso *Data Warehouse* (o *schema* `dw`) exista e esteja no estado correto. Efetuamos a leitura do arquivo `gold_ddl.sql` e o executamos. Dentro do DDL, usamos comando como `DROP TABLE IF EXISTS ...` e `CREATE TABLE IF NOT EXISTS` para garantir que o notebook possa ser executado várias vezes sem erros.

Algumas observações importantes sobre o DDL:

1. **Modelo Star Schema:** A arquitetura é composta por uma tabela fato_metricas_crpt (contendo as métricas numéricas, como valor de capitalização de mercado) e três dimensões de contexto (dim_crpt, dim_data, dim_hora).

2. **Chave Natural:** Ao analisar a camada silver, é possível observar que a coluna symbol estava corrompida (ex: 'USD' para todas as moedas), o que não condiz com a descrição das colunas no dataset. Dessa forma, optamos por utilizar a coluna name como chave natural. Assim, para um determinado snapshot (data e hora), cada criptomoeda é unicamente identificada pelo seu nome.


In [10]:
print(f"Executando DDL...")
try:
    with open(ddl_file_path, 'r') as f:
        ddl_script = f.read()
    
    with get_connection() as conn, conn.cursor() as cur:
        cur.execute(ddl_script)
        conn.commit()
    
    print("Schema 'dw' e tabelas criadas com sucesso.")
except Exception as e:
    print(f"Erro ao executar DDL: {e}")
    raise

Executando DDL...
Schema 'dw' e tabelas criadas com sucesso.


## 3. Extract

Na etapa de extração, lemos todos os dados do nosso Data Lakehouse da Camada Silver (`public.currencies_data`) e passamos para um DataFrame do Pandas (`df_silver`). Dessa forma, realizamos a extração completa dos dados, preparando-os para as etapas subsequentes com os dados em memória.

In [11]:
print("Extraindo dados da tabela 'public.currencies_data'...")
df_silver = None
try:
    with get_connection() as conn:
        df_silver = pd.read_sql_query("SELECT * FROM public.currencies_data", conn)
    
    if df_silver is None or df_silver.empty:
        raise Exception("A tabela Silver 'public.currencies_data' está vazia ou não foi carregada.")
    
    print(f"{len(df_silver)} linhas extraídas da Silver.")

except Exception as e:
    print(f"Erro ao extrair dados da Silver: {e}")
    raise

Extraindo dados da tabela 'public.currencies_data'...
20225 linhas extraídas da Silver.


  df_silver = pd.read_sql_query("SELECT * FROM public.currencies_data", conn)


## 4. Transform

Para iniciar a etapa de transformação, criamos três DataFrames do Panda que vão compor as nossas dimensões no Data Warehouse:

1.  **`df_dim_data`**: Extrai as datas únicas de `last_updated` e cria atributos (ano, mês, trimestre).
2.  **`df_dim_hora`**: Gera programaticamente os 86.400 segundos de um dia.
3.  **`df_dim_crpt`**: Extrai as moedas únicas (baseado no `name`), trata os valores `Infinity` (convertendo para `NaN`) e renomeia as colunas.

Depois, criamos o DataFrame da Fato (`df_fato`), que contém as métricas numéricas associadas a cada criptomoeda em um determinado snapshot (data e hora). Para isso, fazemos *merge* com as dimensões para obter os *surrogate keys* (SKs) correspondentes.

### 4.1 Transform das Dimensões

In [12]:
print("Transformando dw.dim_dta...")
df_dim_dta = pd.DataFrame(pd.to_datetime(df_silver['last_updated']).dt.date.unique(), columns=['dte_cpt'])
df_dim_dta = df_dim_dta.dropna()
df_dim_dta['srk_dta'] = df_dim_dta['dte_cpt'].apply(lambda x: int(x.strftime('%Y%m%d')))
df_dim_dta['num_dia'] = df_dim_dta['dte_cpt'].apply(lambda x: x.day)
df_dim_dta['num_mes'] = df_dim_dta['dte_cpt'].apply(lambda x: x.month)
df_dim_dta['num_ano'] = df_dim_dta['dte_cpt'].apply(lambda x: x.year)
df_dim_dta['nom_mes'] = df_dim_dta['dte_cpt'].apply(lambda x: x.strftime('%B'))
df_dim_dta['nom_sem'] = df_dim_dta['dte_cpt'].apply(lambda x: x.strftime('%A'))
df_dim_dta['num_tri'] = (df_dim_dta['num_mes'] - 1) // 3 + 1
df_dim_dta['num_sem'] = (df_dim_dta['num_mes'] - 1) // 6 + 1
df_dim_dta['flg_fds'] = df_dim_dta['dte_cpt'].apply(lambda x: x.weekday() >= 5)

df_dim_dta = df_dim_dta[['srk_dta', 'dte_cpt', 'num_dia', 'num_mes', 'num_ano', 'nom_mes', 'nom_sem', 'num_tri', 'num_sem', 'flg_fds']]

print("Transformando dw.dim_hra...")
def get_periodo_dia(h):
    return ('Manhã' if 6 <= h < 12 else 'Tarde' if 12 <= h < 18 else 'Noite' if 18 <= h < 24 else 'Madrugada')
times = pd.date_range('00:00:00', '23:59:59', freq='S').time
df_dim_hra = pd.DataFrame(times, columns=['hre_cpt'])
df_dim_hra['srk_hra'] = df_dim_hra['hre_cpt'].apply(lambda x: x.hour * 10000 + x.minute * 100 + x.second)
df_dim_hra['num_hra'] = [t.hour for t in times]
df_dim_hra['num_min'] = [t.minute for t in times]
df_dim_hra['num_seg'] = [t.second for t in times]
df_dim_hra['nom_per'] = df_dim_hra['num_hra'].apply(get_periodo_dia)
df_dim_hra = df_dim_hra[['srk_hra', 'hre_cpt', 'num_hra', 'num_min', 'num_seg', 'nom_per']]

print("Transformando dw.dim_crp...")
cols = ['name', 'symbol', 'max_supply', 'is_active', 'date_added']
df_dim_crp = df_silver[cols].drop_duplicates(subset=['name']).dropna(subset=['name']).copy()

df_dim_crp.rename(columns={
    'name': 'nky_nom', 'symbol': 'cod_sym', 'max_supply': 'vlr_max_sup',
    'is_active': 'flg_atv', 'date_added': 'dte_add'
}, inplace=True)

df_dim_crp['vlr_max_sup'] = df_dim_crp['vlr_max_sup'].replace([np.inf, -np.inf], np.nan)
df_dim_crp['dte_add'] = pd.to_datetime(df_dim_crp['dte_add']).dt.date
df_dim_crp = df_dim_crp[['nky_nom', 'cod_sym', 'vlr_max_sup', 'flg_atv', 'dte_add']]

print(f"✅ Preparado: {len(df_dim_dta)} datas, {len(df_dim_hra)} horas, {len(df_dim_crp)} criptos únicas.")

Transformando dw.dim_dta...
Transformando dw.dim_hra...
Transformando dw.dim_crp...
✅ Preparado: 1 datas, 86400 horas, 9193 criptos únicas.


  times = pd.date_range('00:00:00', '23:59:59', freq='S').time


## 5. Load das Dimensões

Para realizar a load das dimensões, carregamos os 3 DataFrames nas suas tabelas `dw.*`. Além disso, adicionamos um registro 'Desconhecido' (com SK = -1 ou 'N/A') em cada dimensão para garantir a integridade referencial, caso a Fato tenha uma chave que não foi encontrada.

Armazenamos essas chaves 'desconhecidas' na variável `unknowns`.

In [13]:
print("Carregando dimensões no schema dw...")
unknowns = {}
try:
    with get_connection() as conn, conn.cursor() as cur:
        execute_values(cur, sql.SQL("""
            INSERT INTO dw.dim_dta (srk_dta, dte_cpt, num_dia, num_mes, num_ano, nom_mes, nom_sem, num_tri, num_sem, flg_fds)
            VALUES %s ON CONFLICT (srk_dta) DO NOTHING
        """), df_dim_dta.values.tolist())

        cur.execute("INSERT INTO dw.dim_dta (srk_dta, dte_cpt, num_dia, num_mes, num_ano) VALUES (-1, '1900-01-01', 1, 1, 1900) ON CONFLICT (srk_dta) DO NOTHING RETURNING srk_dta")
        unknowns['data'] = (cur.fetchone() or [-1])[0]

        cur.execute("SELECT COUNT(*) FROM dw.dim_hra")
        if cur.fetchone()[0] < 86400:
            execute_values(cur, sql.SQL("""
                INSERT INTO dw.dim_hra (srk_hra, hre_cpt, num_hra, num_min, num_seg, nom_per)
                VALUES %s ON CONFLICT (srk_hra) DO NOTHING
            """), df_dim_hra.values.tolist(), page_size=5000)

        cur.execute("INSERT INTO dw.dim_hra (srk_hra, hre_cpt, num_hra, num_min, num_seg) VALUES (-1, '00:00:00', 0, 0, 0) ON CONFLICT (srk_hra) DO NOTHING RETURNING srk_hra")
        unknowns['hora'] = (cur.fetchone() or [-1])[0]

        execute_values(cur, sql.SQL("""
            INSERT INTO dw.dim_crp (nky_nom, cod_sym, vlr_max_sup, flg_atv, dte_add)
            VALUES %s ON CONFLICT (nky_nom) DO UPDATE SET
                cod_sym = EXCLUDED.cod_sym,
                vlr_max_sup = EXCLUDED.vlr_max_sup,
                flg_atv = EXCLUDED.flg_atv,
                dte_add = EXCLUDED.dte_add
        """), df_dim_crp.where(pd.notna(df_dim_crp), None).values.tolist())

        cur.execute("INSERT INTO dw.dim_crp (nky_nom) VALUES ('N/A') ON CONFLICT (nky_nom) DO NOTHING RETURNING srk_crp")
        res_crp = cur.fetchone()
        unknowns['crpt'] = res_crp[0] if res_crp else cur.execute("SELECT srk_crp FROM dw.dim_crp WHERE nky_nom = 'N/A'") or cur.fetchone()[0]
        
        conn.commit()
    
    print(f"✅ Dimensões carregadas. Unknowns: {unknowns}")

except Exception as e:
    print(f"❌ Erro ao carregar dimensões: {e}")
    raise

Carregando dimensões no schema dw...
✅ Dimensões carregadas. Unknowns: {'data': -1, 'hora': -1, 'crpt': 9194}


## 6. Transform e Load da Tabela Fato

Agora, já com as dimensões carregadas no Data Warehouse, podemos transformar e carregar a tabela `fato_metricas_crpt`. Para isso, seguimos os seguintes passos:

1. Lemos as surrogate keys das dimensões do Data Warehouse para DataFrames.
2. Fazemos *merge* entre o DataFrame da Fato e os DataFrames das dimensões para obter as SKs.
3. Renomeamos as colunas da Silver para padronizar com o que foi definido na camada Gold. Além disso tratamos valores `Infinity` e `NaN`, e selecionamos a ordem final das colunas.
4. Executamos `TRUNCATE` na Fato para garantir que quaiquer dados antigos sejam removidos. Em seguida, usamos `execute_values` para carregar o DataFrame direto na tabela `dw.fato_metricas_crpt`.

In [14]:
print("Preparando dw.fat_mtr...")
try:
    with get_connection() as conn:
        df_dta_lkp = pd.read_sql("SELECT srk_dta, dte_cpt FROM dw.dim_dta", conn)
        df_hra_lkp = pd.read_sql("SELECT srk_hra, hre_cpt FROM dw.dim_hra", conn)
        df_crp_lkp = pd.read_sql("SELECT srk_crp, nky_nom FROM dw.dim_crp", conn)

    df_fato = df_silver.copy()
    df_fato['join_date'] = pd.to_datetime(df_fato['last_updated']).dt.date
    df_fato['join_time'] = pd.to_datetime(df_fato['last_updated']).dt.floor('S').dt.time
    
    df_dta_lkp['dte_cpt'] = pd.to_datetime(df_dta_lkp['dte_cpt']).dt.date
    df_hra_lkp['hre_cpt'] = pd.to_datetime(df_hra_lkp['hre_cpt'], format='%H:%M:%S').dt.time

    df_fato = df_fato.merge(df_crp_lkp, left_on='name', right_on='nky_nom', how='left')\
                     .merge(df_dta_lkp, left_on='join_date', right_on='dte_cpt', how='left')\
                     .merge(df_hra_lkp, left_on='join_time', right_on='hre_cpt', how='left')

    df_fato['srk_crp'] = df_fato['srk_crp'].fillna(unknowns['crpt']).astype(int)
    df_fato['srk_dta'] = df_fato['srk_dta'].fillna(unknowns['data']).astype(int)
    df_fato['srk_hra'] = df_fato['srk_hra'].fillna(unknowns['hora']).astype(int)

    rename_cols = {
        'cmc_rank': 'rnk_cmc', 'price': 'vlr_pre_usd', 'volume_24h': 'vlr_vlm_24h',
        'market_cap': 'vlr_mkt', 'dominance': 'vlr_dom', 'market_pair_count': 'qtd_par',
        'circulating_supply': 'qtd_cir_sup', 'total_supply': 'qtd_tot_sup',
        'fully_dillutted_market_cap': 'vlr_fld_mkt', 'market_cap_by_total_supply': 'vlr_mkt_tot',
        'ytd_price_change_percentage': 'pct_ytd', 'percent_change_1h': 'pct_1hr', 'percent_change_24h': 'pct_24h',
        'percent_change_7d': 'pct_7dd', 'percent_change_30d': 'pct_30d', 'percent_change_60d': 'pct_60d', 'percent_change_90d': 'pct_90d'
    }
    df_fato.rename(columns=rename_cols, inplace=True)
    df_fato['vlr_tvr'] = np.nan
    
    cols_final = [
        'srk_crp', 'srk_dta', 'srk_hra',
        'rnk_cmc', 'vlr_pre_usd', 'vlr_vlm_24h', 'vlr_mkt', 'vlr_dom', 'vlr_tvr',
        'qtd_par', 'qtd_cir_sup', 'qtd_tot_sup', 'vlr_fld_mkt', 'vlr_mkt_tot',
        'pct_ytd', 'pct_1hr', 'pct_24h', 'pct_7dd', 'pct_30d', 'pct_60d', 'pct_90d'
    ]
    
    df_fato_final = df_fato[cols_final].replace([np.inf, -np.inf], np.nan)
    df_fato_list = df_fato_final.where(pd.notna(df_fato_final), None).values.tolist()

    with get_connection() as conn, conn.cursor() as cur:
        print("Limpando Fato (TRUNCATE)...")
        cur.execute("TRUNCATE TABLE dw.fat_mtr RESTART IDENTITY;")
        
        print(f"Inserindo {len(df_fato_list)} registros na Fato...")
        query = sql.SQL("INSERT INTO dw.fat_mtr ({}) VALUES %s").format(sql.SQL(', ').join(map(sql.Identifier, cols_final)))
        execute_values(cur, query, df_fato_list, page_size=1000)
        conn.commit()

    print(f"✅ Fato carregada com sucesso: {len(df_fato_list)} registros.")
except Exception as e:
    print(f"❌ Erro ao carregar fato: {e}")
    raise

Preparando dw.fat_mtr...


  df_dta_lkp = pd.read_sql("SELECT srk_dta, dte_cpt FROM dw.dim_dta", conn)
  df_hra_lkp = pd.read_sql("SELECT srk_hra, hre_cpt FROM dw.dim_hra", conn)
  df_crp_lkp = pd.read_sql("SELECT srk_crp, nky_nom FROM dw.dim_crp", conn)
  df_fato['join_time'] = pd.to_datetime(df_fato['last_updated']).dt.floor('S').dt.time


Limpando Fato (TRUNCATE)...
Inserindo 20225 registros na Fato...
✅ Fato carregada com sucesso: 20225 registros.


## Validação final

Esta seção executa uma contagem de linhas em todas as tabelas para verificar o resultado final. 

In [15]:
try:
    with get_connection() as conn:
        query = """
        SELECT 'dw.dim_crp' AS Tabela, COUNT(*) AS Total FROM dw.dim_crp
        UNION ALL SELECT 'dw.dim_dta', COUNT(*) FROM dw.dim_dta
        UNION ALL SELECT 'dw.dim_hra', COUNT(*) FROM dw.dim_hra
        UNION ALL SELECT 'dw.fat_mtr', COUNT(*) FROM dw.fat_mtr;
        """
        display(pd.read_sql_query(query, conn))
except Exception as e:
    print(f"❌ Erro: {e}")

  display(pd.read_sql_query(query, conn))


Unnamed: 0,tabela,total
0,dw.dim_crp,9194
1,dw.dim_dta,2
2,dw.dim_hra,86401
3,dw.fat_mtr,20225
