In [2]:
from pymongo import MongoClient
import pandas as pd
from sqlalchemy import create_engine, text, inspect
import logging
import os

# Connection strings come exclusively from the environment so that no
# credentials are stored in the notebook itself.
MONGO_URI = os.getenv('MONGO_URI')
POSTGRES_URI = os.getenv('POSTGRES_URI')

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Fail early with a readable message instead of an opaque create_engine error.
if not POSTGRES_URI:
    raise RuntimeError("POSTGRES_URI environment variable is not set.")
if not MONGO_URI:
    # MongoClient(None) does not fail: it silently uses the driver's default host.
    logging.warning("MONGO_URI is not set; MongoClient will use its default host.")

# MongoDB: source of the raw documents.
client = MongoClient(MONGO_URI)
db = client["raw_data"]

# PostgreSQL: destination of the dimensional model.
engine = create_engine(POSTGRES_URI)

# Connection test: open and immediately release one connection.
try:
    with engine.connect():
        logging.info("Conectado ao PostgreSQL com sucesso!")
except Exception as e:
    # logging.exception records the message at ERROR level together with the
    # traceback; the exception is passed through a %s placeholder so it is
    # actually rendered.
    logging.exception("Erro: %s", e)


2026-02-16 22:38:42,639 - INFO - Conectado ao PostgreSQL com sucesso!


In [3]:
def converter_data_hibrida(coluna):
    """Convert a column holding dates in mixed representations to naive datetimes.

    Each value is handled according to what it looks like:

    * Numeric values (or numeric strings) strictly between 0 and 4102444800
      (2100-01-01 UTC) are read as Unix timestamps in seconds.
    * Non-numeric values are parsed as date strings with ``dayfirst=True``
      (DD/MM order); unparseable strings become ``NaT``.
    * Numeric values outside the accepted range become ``NaT``.

    Args:
        coluna: pandas Series with the raw date values.

    Returns:
        A Series with the same index as ``coluna`` holding timezone-naive
        datetimes (values are converted to UTC before the timezone is dropped).
    """
    num_vals = pd.to_numeric(coluna, errors='coerce')
    mask_unix = (num_vals > 0) & (num_vals < 4102444800)

    # Blank out everything that is not a plausible Unix timestamp instead of
    # subsetting, so the result keeps the full index and needs no reindex.
    d_num = pd.to_datetime(num_vals.where(mask_unix), unit='s', utc=True)

    # NOTE(review): how a column mixing several string formats is parsed
    # depends on the installed pandas version -- confirm against real data.
    d_str = pd.to_datetime(coluna, errors='coerce', dayfirst=True, utc=True)
    # The string parser reads bare numbers as nanoseconds since the epoch,
    # which would turn out-of-range numerics into bogus 1970-era dates.
    # Keep its output only for values that are not numeric at all.
    d_str = d_str.where(num_vals.isna())

    return d_num.fillna(d_str).dt.tz_localize(None)

def limpar_tipos_complexos(df):
    """Stringify columns that contain lists or dicts so they can be stored in SQL.

    A column is converted when at least one of its cells is a ``dict`` or a
    ``list``. Every non-null cell of such a column is replaced by its ``str()``
    representation; null cells are left as missing values so they are not
    turned into the literal text ``'None'`` / ``'nan'``.

    Args:
        df: DataFrame to clean. It is modified in place.

    Returns:
        The same DataFrame object, to allow call chaining.
    """
    for col in df.columns:
        serie = df[col]
        if serie.apply(lambda x: isinstance(x, (dict, list))).any():
            # where() keeps the stringified value only where the original cell
            # was not null; the remaining cells become NaN.
            df[col] = serie.astype(str).where(serie.notna())
    return df


In [4]:
# Start from a clean slate: drop every table the inspector reports.
# WARNING: this is destructive for the whole target database, not only for
# the tables created by this notebook.
with engine.begin() as conn:
    # Let the dialect quote identifiers (it also escapes embedded quotes)
    # instead of wrapping the name in double quotes by hand.
    quote_ident = engine.dialect.identifier_preparer.quote
    # Inspect through the open connection so listing and dropping happen in
    # the same transaction rather than on a second pooled connection.
    for tbl in inspect(conn).get_table_names():
        conn.execute(text(f'DROP TABLE IF EXISTS {quote_ident(tbl)} CASCADE;'))
    logging.info("Banco de dados resetado.")


2026-02-16 22:38:42,713 - INFO - Banco de dados resetado.


In [5]:
# Flatten the nested user documents; nested keys such as "address.city"
# become "address_city" after the rename below.
u_raw = pd.json_normalize(list(db["users"].find())).drop(columns=['_id'], errors='ignore')
u_raw.columns = [c.replace('.', '_') for c in u_raw.columns]

# DIM_LOCATIONS: one row per distinct (city, state, stateCode, country) tuple,
# keyed by a surrogate location_id taken from the row position.
geo_cols = ['address_city', 'address_state', 'address_stateCode', 'address_country']
df_locs = u_raw[geo_cols].drop_duplicates().reset_index(drop=True)
df_locs['location_id'] = df_locs.index
df_locs['state_fmt'] = df_locs['address_state'] + " (" + df_locs['address_stateCode'] + ")"
# how='left' guarantees no user row can be dropped by the join, and
# validate='many_to_one' fails loudly if the dimension keys are not unique
# (which would otherwise duplicate users).
u_raw = u_raw.merge(df_locs, on=geo_cols, how='left', validate='many_to_one')
dim_locations = df_locs[['location_id', 'address_city', 'state_fmt', 'address_country']].rename(
    columns={'address_city': 'city', 'state_fmt': 'state', 'address_country': 'country'}
)

# DIM_COMPANY: distinct combinations of all company_* columns, keyed by a
# surrogate company_id.
comp_cols = [c for c in u_raw.columns if c.startswith('company_')]
dim_company = u_raw[comp_cols].drop_duplicates().reset_index(drop=True)
dim_company['company_id'] = dim_company.index
u_raw = u_raw.merge(dim_company, on=comp_cols, how='left', validate='many_to_one')

# DIM_FINANCE: bank_* and crypto_* columns, one row per user (user_id = users.id).
fin_cols = [c for c in u_raw.columns if c.startswith('bank_') or c.startswith('crypto_')]
dim_finance = u_raw[['id'] + fin_cols].copy().rename(columns={'id': 'user_id'})

# DIM_USERS: personal attributes plus the surrogate keys added above.
# NOTE(review): 'password' (and the bank_/crypto_ columns in dim_finance) are
# copied as-is into the analytics database -- confirm this is intended, or
# hash/drop them before loading.
keep_u = ['id', 'firstName', 'lastName', 'maidenName', 'age', 'gender', 'email', 'phone', 
          'username', 'password', 'birthDate', 'role', 'cpf', 'cnpj', 'address_address', 
          'address_postalCode', 'address_coordinates_lat', 'address_coordinates_lng',
          'location_id', 'company_id']
# Explicit copy: the frame is modified in place later (complex-type cleanup),
# which must not act on a slice of u_raw.
dim_users = u_raw[keep_u].copy()


In [6]:
# Pull the product documents and flatten nested fields into columns.
p_raw = pd.json_normalize(list(db["products"].find()))
p_raw = p_raw.drop(columns=['_id'], errors='ignore')

# Normalised brand text: missing values become '', everything else is a
# whitespace-stripped string.
p_raw['brand_raw'] = p_raw['brand'].fillna('').astype(str).str.strip()

# Distinct non-empty brands in alphabetical order; position 0 is reserved for
# the placeholder used when a product has no brand.
u_brands = sorted(set(p_raw['brand_raw'].unique()) - {''})
brand_names = ['NÃ£o Informado'] + u_brands
dim_brands = pd.DataFrame({
    'brand_id': range(len(brand_names)),
    'brand_name': brand_names
})

# Brand name -> brand_id lookup; names without an entry (the empty string)
# resolve to the placeholder id 0.
b_map = dict(zip(dim_brands['brand_name'], dim_brands['brand_id']))
p_raw['brand_id'] = p_raw['brand_raw'].map(lambda name: b_map.get(name, 0))

# Products keep only the brand_id reference; dotted column names produced by
# the flattening are rewritten with underscores.
dim_products = (
    p_raw
    .drop(columns=['brand', 'brand_raw'])
    .rename(columns=lambda c: c.replace('.', '_'))
)


In [7]:
carts_raw = list(db["carts"].find())
df_c_base = pd.DataFrame(carts_raw)

# fact_sales: one row per cart, without the nested product list and the
# MongoDB _id; transaction_date is normalised to a naive datetime.
fact_sales = df_c_base.drop(columns=['products', '_id'], errors='ignore')
fact_sales['transaction_date'] = converter_data_hibrida(fact_sales['transaction_date'])

# fact_sales_items: one row per (cart, product) pair.
# explode() turns an empty product list into NaN; such carts have no items,
# so they are removed here -- json_normalize cannot process a NaN record.
df_exp = df_c_base.explode('products').dropna(subset=['products'])
# A plain list of dicts is passed so the result always gets a fresh 0..n-1
# index, matching the reset index of the cart columns below.
items_norm = pd.json_normalize(df_exp['products'].tolist())
fact_sales_items = pd.concat([
    df_exp[['id', 'userId']].rename(columns={'id': 'cart_id'}).reset_index(drop=True),
    items_norm.rename(columns={'id': 'product_id'}).reset_index(drop=True)
], axis=1)


  d_str = pd.to_datetime(coluna, errors='coerce', dayfirst=True, utc=True)


In [8]:
# Target table name -> DataFrame, listed in the order the tables are written.
tabelas = dict(
    dim_users=dim_users,
    dim_locations=dim_locations,
    dim_company=dim_company,
    dim_finance=dim_finance,
    dim_products=dim_products,
    dim_brands=dim_brands,
    fact_sales=fact_sales,
    fact_sales_items=fact_sales_items,
)

# Each frame has its list/dict columns stringified and then replaces any
# existing table of the same name; the DataFrame index is not written.
for nome, df in tabelas.items():
    df_sql = limpar_tipos_complexos(df)
    df_sql.to_sql(nome, engine, if_exists='replace', index=False)
    logging.info(f"{nome} carregada.")


2026-02-16 22:38:42,951 - INFO - dim_users carregada.
2026-02-16 22:38:42,964 - INFO - dim_locations carregada.
2026-02-16 22:38:42,978 - INFO - dim_company carregada.
2026-02-16 22:38:42,998 - INFO - dim_finance carregada.
2026-02-16 22:38:43,054 - INFO - dim_products carregada.
2026-02-16 22:38:43,062 - INFO - dim_brands carregada.
2026-02-16 22:38:43,091 - INFO - fact_sales carregada.
2026-02-16 22:38:43,213 - INFO - fact_sales_items carregada.


In [9]:
# (table, primary-key column), applied in this order.
primary_keys = [
    ('dim_users', 'id'),
    ('dim_products', 'id'),
    ('dim_locations', 'location_id'),
    ('dim_brands', 'brand_id'),
    ('dim_company', 'company_id'),
    ('fact_sales', 'id'),
]

# (child table, constraint name, child column, parent table, parent column),
# applied in this order after all primary keys exist. "userId" keeps its
# double quotes because the column name is mixed-case.
foreign_keys = [
    ('dim_finance', 'fk_finance_user', 'user_id', 'dim_users', 'id'),
    ('fact_sales_items', 'fk_items_user', '"userId"', 'dim_users', 'id'),
    ('dim_users', 'fk_user_location', 'location_id', 'dim_locations', 'location_id'),
    ('dim_users', 'fk_user_company', 'company_id', 'dim_company', 'company_id'),
    ('dim_products', 'fk_product_brand', 'brand_id', 'dim_brands', 'brand_id'),
    ('fact_sales_items', 'fk_items_sale', 'cart_id', 'fact_sales', 'id'),
    ('fact_sales_items', 'fk_items_product', 'product_id', 'dim_products', 'id'),
]

# All constraints are added inside a single transaction.
with engine.begin() as conn:
    # PKs
    for pk_table, pk_column in primary_keys:
        conn.execute(text(f'ALTER TABLE {pk_table} ADD PRIMARY KEY ({pk_column});'))

    # FKs
    for fk_table, fk_name, fk_column, ref_table, ref_column in foreign_keys:
        conn.execute(text(
            f'ALTER TABLE {fk_table} ADD CONSTRAINT {fk_name} '
            f'FOREIGN KEY ({fk_column}) REFERENCES {ref_table} ({ref_column});'
        ))

logging.info("ğŸ”— Integridade referencial estabelecida com sucesso!")


2026-02-16 22:38:41,944 - INFO - ğŸ”— Integridade referencial estabelecida com sucesso!
