# Processamento de Dados: Camada Raw para Silver
**Disciplina:** Sistemas de Banco de Dados 2  
**Semestre:** 2025/2  
**Professor:** Thiago Luiz de Souza Gomes  
**Grupo 15**

**Integrantes:**
* Caio Ferreira Duarte (231026901)
* Laryssa Felix Ribeiro Lopes (231026840)
* Luísa de Souza Ferreira (232014807)
* Henrique Fontenelle Galvão Passos (231030771)
* Marjorie Mitzi Cavalcante Rodrigues (231039140)

---

## Contextualização e Objetivos
O objetivo deste script é extrair os dados da One Big Table(`aviao`) localizada no schema `silver` do banco de dados, transformá-los em tabelas de fatos e dimensões por meio da modelagem dimensional no formato star schema e, por fim, carregá-los no schema `dw`.

# 1. Preparação de Bibliotecas e conexões externas

In [12]:
import pandas as pd
import sqlparse
from sqlalchemy import create_engine, text
import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)

# Configurações de conexão
DB_URI = "postgresql://admin:admin@localhost:5432/db_aviao"
ARQUIVO_DDL = '../Data Layer/gold/ddl.sql'
engine = create_engine(DB_URI)

print("Bibliotecas importadas e conexão configurada!")

Bibliotecas importadas e conexão configurada!


# 2. Executa DDL

O trecho de código abre o arquivo de DDL (ARQUIVO_DDL) em modo leitura com codificação UTF-8 e utiliza o sqlparse para limpar o conteúdo, removendo comentários e deixando o SQL mais organizado; 

em seguida, a string limpa é dividida em comandos SQL individuais (commands) e, dentro de um contexto transacional (engine.begin()), cada comando é executado no banco usando conn.execute(text(cmd)), garantindo que todas as instruções sejam aplicadas corretamente.

Ao final o DDL é executado e as tabelas são criadas no schema dw.

In [13]:
# Abre o arquivo DDL (arquivo com instruções SQL de criação de tabelas)
with open(ARQUIVO_DDL, 'r', encoding='utf-8') as f:
    # Lê o conteúdo do arquivo e remove comentários usando sqlparse
    sql_ddl_clean = sqlparse.format(f.read(), strip_comments=True)

# Divide o SQL em comandos individuais e remove espaços em branco
commands = [cmd.strip() for cmd in sqlparse.split(sql_ddl_clean) if cmd.strip()]

# Inicia uma transação com o banco de dados
with engine.begin() as conn:
    # Executa cada comando SQL individualmente
    for cmd in commands:
        conn.execute(text(cmd))
        
print("DDL executado! Tabelas criadas no schema dw")


DDL executado! Tabelas criadas no schema dw


# 2. Extração dos dados da Silver

Utiliza do pandas para extrair os dados da One Big Table da camada Silver e armazena em um DataFrame

In [14]:
df_obt = pd.read_sql("SELECT * FROM silver.aviao", engine)
print(f"{len(df_obt):,} registros extraídos da silver.aviao")
df_obt.head()

87,951 registros extraídos da silver.aviao


Unnamed: 0,event_id,investigation_type,accident_number,event_date,publication_date,location,country,latitude,longitude,airport_code,...,air_carrier,broad_phase_of_flight,report_status,total_fatal_injuries,total_serious_injuries,total_minor_injuries,total_uninjured,injury_severity,weather_condition,created_at
0,20001218X45444,Accident,SEA87LA080,1948-10-24,,"MOOSE CREEK, ID",United States,,,,...,,Cruise,Probable Cause,2,0,0,0,Fatal,UNK,2026-01-19 12:35:00.806
1,20001218X45447,Accident,LAX94LA336,1962-07-19,1996-09-19,"BRIDGEPORT, CA",United States,,,,...,,Unknown,Probable Cause,4,0,0,0,Fatal,UNK,2026-01-19 12:35:00.806
2,20061025X01555,Accident,NYC07LA005,1974-08-30,2007-02-26,"Saltville, VA",United States,36.922223,-81.878056,,...,,Cruise,Probable Cause,3,0,0,0,Fatal,IMC,2026-01-19 12:35:00.806
3,20001218X45448,Accident,LAX96LA321,1977-06-19,2000-09-12,"EUREKA, CA",United States,,,,...,,Cruise,Probable Cause,2,0,0,0,Fatal,IMC,2026-01-19 12:35:00.806
4,20041105X01764,Accident,CHI79FA064,1979-08-02,1980-04-16,"Canton, OH",United States,,,,...,,Approach,Probable Cause,1,2,0,0,Fatal,VMC,2026-01-19 12:35:00.806


# 3. Transformação das Dimensões

Função para organizar as colunas da One Big Table em novas tabelas dimensões.

Receberá a One Big Table e um mapeamento de colunas, este mapeamento tem nome das colunas na Silver como chave e nome das colunas no DW como valor.

Por fim a função retorna um dataframe da nova tabela



In [15]:
def prep_dim(df, mapping):
    """Filtra colunas da Silver, remove duplicatas e renomeia para o DW."""
    return df[list(mapping.keys())].drop_duplicates().rename(columns=mapping)

Agora para cada uma das dimensões(tabelas) que desejamos ter no DW, aplicaremos na função `prep_dim` a df_obt, que é de onde sairão os dados, e o mapeamento das colunas referente á aquela dimensão a ser obtida.

In [16]:

# 1. Dimensão Temporal
dim_time = prep_dim(df_obt, {
    # nome da coluna na OBT:  nome da coluna no DW
    "event_date": "evt_dat", 
    "publication_date": "pub_dat"
})

# 2. Dimensão Severidade
dim_severity = prep_dim(df_obt, {
    "injury_severity": "inj_sev",
    "investigation_type": "inv_typ",
    "report_status": "rpt_sta"
})

# 3. Dimensão Clima
dim_weather = prep_dim(df_obt, {
    "weather_condition": "wth_con"
})

# 4. Dimensão Fase de Voo
dim_flight_phase = prep_dim(df_obt, {
    "broad_phase_of_flight": "brd_phs_off_flt"
})

# 5. Dimensão Geográfica
dim_geograph = prep_dim(df_obt, {
    "country": "ctr",
    "latitude": "lat",
    "longitude": "lon", 
    "airport_code": "apt_cod",
    "airport_name": "apt_nam",
    "location": "loc"
})

# 6. Dimensão Aeronave
dim_aircraft = prep_dim(df_obt, {
    "aircraft_category": "arc_cat", 
    "make": "mak", 
    "model": "mod", 
    "registration_number": "reg_num", 
    "engine_type": "eng_typ", 
    "number_of_engines": "num_off_eng", 
    "amateur_built": "ama_blt", 
    "aircraft_damage": "arc_dam"
})

# 7. Dimensão Operação
dim_operation = prep_dim(df_obt, {
    "purpose_of_flight": "prp_off_flt", 
    "schedule": "sch", 
    "air_carrier": "air_car", 
    "far_description": "far_dsc"
})

print(f"dim_tim: {len(dim_time):,} registros únicos")
print(f"dim_sev: {len(dim_severity):,} registros únicos")
print(f"dim_wth: {len(dim_weather):,} registros únicos")
print(f"dim_flt_phs: {len(dim_flight_phase):,} registros únicos")
print(f"dim_geo: {len(dim_geograph):,} registros únicos")
print(f"dim_arc: {len(dim_aircraft):,} registros únicos")
print(f"dim_opt: {len(dim_operation):,} registros únicos")


dim_tim: 49,823 registros únicos
dim_sev: 10,667 registros únicos
dim_wth: 5 registros únicos
dim_flt_phs: 13 registros únicos
dim_geo: 60,245 registros únicos
dim_arc: 86,058 registros únicos
dim_opt: 14,656 registros únicos


# 4. Carregamento das Dimensões
O trecho de código a seguir realiza a carga dos DataFrames das dimensões para o banco de dados utilizando o método to_sql do pandas.

Onde cada DataFrame é inserido na tabela correspondente dentro do esquema dw por meio da conexão engine; o parâmetro if_exists='append' garante que, se a tabela já existir, os novos registros sejam adicionados sem apagar os dados anteriores, isso vale principalmente para não apagar as colunas sem dados, e index=False evita que o índice do DataFrame vire uma coluna no banco, mantendo apenas os dados relevantes.

In [17]:
dim_time.to_sql('dim_tim', engine, schema='dw', if_exists='append', index=False)
dim_severity.to_sql('dim_sev', engine, schema='dw', if_exists='append', index=False)
dim_weather.to_sql('dim_wth', engine, schema='dw', if_exists='append', index=False)
dim_flight_phase.to_sql('dim_flt_phs', engine, schema='dw', if_exists='append', index=False)
dim_geograph.to_sql('dim_geo', engine, schema='dw', if_exists='append', index=False)
dim_aircraft.to_sql('dim_arc', engine, schema='dw', if_exists='append', index=False)
dim_operation.to_sql('dim_opt', engine, schema='dw', if_exists='append', index=False)

print("\nTODAS AS DIMENSÕES CARREGADAS NO BANCO!")



TODAS AS DIMENSÕES CARREGADAS NO BANCO!


# 5. Transformação da Fato

O código a seguir faz leitura das tabelas dimensionais já carregadas no banco de dados com os dados já preenchdios e com SRKs, e armazena em váriaveis do tipo dataframe, assim permitindo que os dados posteriormente sejam utilizados para encontrar as Foreign Keys da tabela fato.

In [18]:
dim_time_db = pd.read_sql("SELECT * FROM dw.dim_tim", engine)
dim_severity_db = pd.read_sql("SELECT * FROM dw.dim_sev", engine)
dim_weather_db = pd.read_sql("SELECT * FROM dw.dim_wth", engine)
dim_flight_phase_db = pd.read_sql("SELECT * FROM dw.dim_flt_phs", engine)
dim_geograph_db = pd.read_sql("SELECT * FROM dw.dim_geo", engine)
dim_aircraft_db = pd.read_sql("SELECT * FROM dw.dim_arc", engine)
dim_operation_db = pd.read_sql("SELECT * FROM dw.dim_opt", engine)

print("Dimensões lidas do banco com surrogate keys!")


Dimensões lidas do banco com surrogate keys!


Na célula abaixo, copiamos o dataframe da One Big Table e renomeamos as colunas necessárias para realizar os merges com as dimensões.
O objetivo é obter as SRKs (surrogate keys) de cada tabela dimensional, que serão utilizadas como FKs na tabela fato.
O resultado é armazenado em df_fact_prep.

In [19]:
# Preparar df_obt para os merges (renomear colunas para mnemônicos)
df_fact_prep = df_obt.copy()

# Renomear colunas para facilitar os merges
df_fact_prep = df_fact_prep.rename(columns={
    "event_date": "evt_dat",
    "publication_date": "pub_dat",
    "injury_severity": "inj_sev",
    "investigation_type": "inv_typ",
    "report_status": "rpt_sta",
    "weather_condition": "wth_con",
    "broad_phase_of_flight": "brd_phs_off_flt",
    "country": "ctr",
    "latitude": "lat",
    "longitude": "lon",
    "airport_code": "apt_cod",
    "airport_name": "apt_nam",
    "location": "loc",
    "aircraft_category": "arc_cat",
    "make": "mak",
    "model": "mod",
    "registration_number": "reg_num",
    "engine_type": "eng_typ",
    "number_of_engines": "num_off_eng",
    "amateur_built": "ama_blt",
    "aircraft_damage": "arc_dam",
    "purpose_of_flight": "prp_off_flt",
    "schedule": "sch",
    "air_carrier": "air_car",
    "far_description": "far_dsc"
})

print("Dataframe preparado para merges!")

Dataframe preparado para merges!


O código a seguir realiza uma série de `merge` (junções) entre o DataFrame de fatos (df_fact_prep) e as tabelas dimensionais já carregadas no banco, 
com o objetivo de trazer as surrogate keys de cada dimensão para o fato e se tornarem as FK da fato.

Em cada etapa, o merge é feito usando as colunas de ligação (por exemplo, evt_dat e pub_dat para a dimensão de tempo, mais a Primary Key da dimensão correspondente) isso vai no primeiro parametro.
No `on`, colocamos as chaves que farão o merge entre as duas tabelas,
O `how='left'` (LEFT JOIN) preserva todos os registros do fato, mesmo que não haja correspondência em alguma dimensão. Ao final, o DataFrame df_fact contém todas as chaves das dimensões (como dim_time_srk, dim_severity_srk, etc.), que são essenciais para montar a tabela fato no modelo dimensional. 

In [20]:
# Merge com dim_tim
df_fact = df_fact_prep.merge(
    dim_time_db[['srk_tim', 'evt_dat', 'pub_dat']],
    on=['evt_dat', 'pub_dat'],
    how='left'
)

# Merge com dim_sev
df_fact = df_fact.merge(
    dim_severity_db[['srk_sev', 'inj_sev', 'inv_typ', 'rpt_sta']],
    on=['inj_sev', 'inv_typ', 'rpt_sta'],
    how='left'
)

# Merge com dim_wth
df_fact = df_fact.merge(
    dim_weather_db[['srk_wth', 'wth_con']],
    on=['wth_con'],
    how='left'
)

# Merge com dim_flt_phs
df_fact = df_fact.merge(
    dim_flight_phase_db[['srk_flt_phs', 'brd_phs_off_flt']],
    on=['brd_phs_off_flt'],
    how='left'
)

# Merge com dim_geo
df_fact = df_fact.merge(
    dim_geograph_db[['srk_geo', 'ctr', 'lat', 'lon', 'apt_cod', 'apt_nam', 'loc']],
    on=['ctr', 'lat', 'lon', 'apt_cod', 'apt_nam', 'loc'],
    how='left'
)

# Merge com dim_arc
df_fact = df_fact.merge(
    dim_aircraft_db[['srk_arc', 'arc_cat', 'mak', 'mod', 'reg_num', 'eng_typ', 'num_off_eng', 'ama_blt', 'arc_dam']],
    on=['arc_cat', 'mak', 'mod', 'reg_num', 'eng_typ', 'num_off_eng', 'ama_blt', 'arc_dam'],
    how='left'
)

# Merge com dim_opt
df_fact = df_fact.merge(
    dim_operation_db[['srk_opt', 'prp_off_flt', 'sch', 'air_car', 'far_dsc']],
    on=['prp_off_flt', 'sch', 'air_car', 'far_dsc'],
    how='left'
)

print("Merges completados! Surrogate keys obtidos!")


Merges completados! Surrogate keys obtidos!


Cria a tabela fato, após obtidos os SRK (surrogate keys) na célula anterior, fazemos o filtro no dataframe que é resultado de muitos merges para manter somente as colunas que nos interessam para a tabela fato e por fim armazenamos em `fat_acident`

In [21]:
fat_acident = df_fact[[
    'srk_tim',
    'srk_sev',
    'srk_arc',
    'srk_geo',
    'srk_flt_phs',
    'srk_opt',
    'srk_wth',
    'event_id',
    'accident_number',
    'total_fatal_injuries',
    'total_serious_injuries',
    'total_minor_injuries',
    'total_uninjured'
]].rename(columns={
    'event_id': 'evt_ide',
    'accident_number': 'acc_num',
    'total_fatal_injuries': 'tot_fat_inj',
    'total_serious_injuries': 'tot_ser_inj',
    'total_minor_injuries': 'tot_min_inj',
    'total_uninjured': 'tot_uni'
})

print("Tabela fato preparada!")


Tabela fato preparada!


# 6. Carregamento da Fato

Carrega a tabela fato no banco com as devidas SRK passando:
- o dataframe que possue somente as colunas da fato;
- o schema na qual queremos adicioná-lo
- if_exists='append' para não substituir os dados obtidos anteriormente
- index=false para não usar os index do dataframe no banco.

In [22]:
fat_acident.to_sql('fat_acc', engine, schema='dw', if_exists='append', index=False)

print("TABELA FATO CARREGADA NO BANCO!")


TABELA FATO CARREGADA NO BANCO!
