In [15]:
import sys
from datetime import datetime
import pandas as pd
import json

print("="*80)
print("DATA TRANSFORMATION - CAMADA TRUSTED")
print("="*80)
print(f"\nIniciado em: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Ambiente: {sys.platform}")
print(f"Python: {sys.version.split()}")


DATA TRANSFORMATION - CAMADA TRUSTED

Iniciado em: 2026-01-10 18:53:28
Ambiente: win32
Python: ['3.14.0', '(tags/v3.14.0:ebf955d,', 'Oct', '7', '2025,', '10:15:03)', '[MSC', 'v.1944', '64', 'bit', '(AMD64)]']


In [16]:
# ===== CARREGAR DADOS RAW =====
print("\n" + "="*80)
print("CARREGANDO DADOS RAW")
print("="*80)

customers_raw = '../data/raw/customers_raw.csv'
products_raw = '../data/raw/products_raw.json'
sales_raw = '../data/raw/sales_raw.csv'

try:
    customers_raw = pd.read_csv(customers_raw)
    print(f"customers_raw: {len(customers_raw):,} registros carregados")
except FileNotFoundError:
    print(f"Arquivo não encontrado: {customers_raw}")
    customers_raw = None

try:
    with open(products_raw, 'r') as f:
        products_raw = pd.DataFrame(json.load(f))
    print(f"products_raw: {len(products_raw):,} registros carregados")
except FileNotFoundError:
    print(f"Arquivo não encontrado: {products_raw}")
    products_raw = None

try:
    sales_raw = pd.read_csv(sales_raw)
    print(f"sales_raw: {len(sales_raw):,} registros carregados")
except FileNotFoundError:
    print(f"Arquivo não encontrado: {sales_raw}")
    sales_raw = None





CARREGANDO DADOS RAW
customers_raw: 5,000 registros carregados
products_raw: 10,000 registros carregados
sales_raw: 120,000 registros carregados


In [17]:

# ===== TRANSFORMAÇÃO: PRODUCTS =====
print("\n" + "="*80)
print("TRANSFORMAÇÃO: PRODUCTS_RAW → PRODUCTS_TRUSTED")
print("="*80)


products_trusted = products_raw.copy()

# Tipo de dados
products_trusted['load_capacity'] = pd.to_numeric(products_trusted['load_capacity'], errors='coerce')
products_trusted['max_speed'] = pd.to_numeric(products_trusted['max_speed'], errors='coerce')
products_trusted['temperature_limit'] = pd.to_numeric(products_trusted['temperature_limit'], errors='coerce')
products_trusted['unit_cost'] = pd.to_numeric(products_trusted['unit_cost'], errors='coerce')
products_trusted['list_price'] = pd.to_numeric(products_trusted['list_price'], errors='coerce')

print("\n-  Tipos de dados padronizados")

# FIX CRÍTICO: Margem de lucro negativa
problematic_count = (products_trusted['unit_cost'] >= products_trusted['list_price']).sum()
print(f"\n-  Detectados {problematic_count} produtos com margem negativa")

# Corrigir: set list_price = unit_cost * 1.25 (margem mínima 25%)
products_trusted.loc[
    products_trusted['unit_cost'] >= products_trusted['list_price'],
    'list_price'
] = products_trusted.loc[
    products_trusted['unit_cost'] >= products_trusted['list_price'],
    'unit_cost'
] * 1.25

print(f"Corrigidos com margem mínima de 25%")

# Adicionar campo de descrição técnica
def generate_technical_description(row):
    return f"""{row['product_name']} é um rolamento do tipo {row['bearing_type']} em {row['material']}, fabricado pela {row['manufacturer']} (modelo {row['model']}). 
    Capacidade de carga: {row['load_capacity']:.0f}N, velocidade máxima: {row['max_speed']:,.0f} RPM, 
    limite de temperatura: {row['temperature_limit']}°C. 
    Indicado para resolver problemas de {row['problem_type']}. 
    Preço: R$ {row['list_price']:.2f}, custo: R$ {row['unit_cost']:.2f}."""

products_trusted['technical_description'] = products_trusted.apply(
    generate_technical_description, axis=1
)

print("- Campo technical_description adicionado")


# Extrair features técnicas como TAGS categóricas para ML
def extract_technical_features(row):

    features = []
    
    # Tipo de rolamento
    features.append(f"tipo_{row['bearing_type'].lower().replace(' ', '_')}")
    
    # Material
    features.append(f"material_{row['material'].lower().replace(' ', '_')}")
    
    # Categoria de carga
    if row['load_capacity'] < 10000:
        features.append("carga_leve")
    elif row['load_capacity'] < 30000:
        features.append("carga_media")
    else:
        features.append("carga_pesada")
    
    # Categoria de velocidade
    if row['max_speed'] < 5000:
        features.append("velocidade_baixa")
    elif row['max_speed'] < 10000:
        features.append("velocidade_media")
    else:
        features.append("velocidade_alta")
    
    # Resistência a temperatura
    if row['temperature_limit'] < 120:
        features.append("temp_padrao")
    elif row['temperature_limit'] < 180:
        features.append("temp_elevada")
    else:
        features.append("temp_extrema")
    
    # Problema que resolve
    features.append(f"resolve_{row['problem_type'].lower().replace(' ', '_')}")
    
    return features

products_trusted['technical_features'] = products_trusted.apply(extract_technical_features, axis=1)

print("Features técnicas (tags) extraídas")
print(f"\nExemplo: {products_trusted.iloc[0]['technical_features']}")


# Esse campo será usado para gerar embeddings (na Fase 5)
products_trusted['llm_product_description'] = products_trusted['technical_description']

print("Campo llm_product_description criado a partir de technical_description")
print(f"\nExemplo:\n{products_trusted.iloc[0]['llm_product_description'][:200]}...")


# ===== VALIDAÇÕES =====
# Validar
assert (products_trusted['unit_cost'] < products_trusted['list_price']).all(), \
    "Ainda existem produtos com unit_cost >= list_price!"

assert products_trusted.isnull().sum().sum() == 0, \
    "Existem valores nulos!"

print("- Validações passaram")


TRANSFORMAÇÃO: PRODUCTS_RAW → PRODUCTS_TRUSTED

-  Tipos de dados padronizados

-  Detectados 359 produtos com margem negativa
Corrigidos com margem mínima de 25%
- Campo technical_description adicionado
Features técnicas (tags) extraídas

Exemplo: ['tipo_autocompensador', 'material_aço', 'carga_media', 'velocidade_alta', 'temp_padrao', 'resolve_vibração']
Campo llm_product_description criado a partir de technical_description

Exemplo:
Rolamento Industrial 1 é um rolamento do tipo Autocompensador em Aço, fabricado pela SKF (modelo MD-859). 
    Capacidade de carga: 11549N, velocidade máxima: 13,066 RPM, 
    limite de temperatura: 1...
- Validações passaram


In [18]:

# ===== TRANSFORMAÇÃO: CUSTOMERS =====
print("\n" + "="*80)
print("TRANSFORMAÇÃO: CUSTOMERS_RAW → CUSTOMERS_TRUSTED")
print("="*80)

customers_trusted = customers_raw.copy()

# Parse data columns
customers_trusted['relationship_start_date'] = pd.to_datetime(customers_trusted['relationship_start_date'])
customers_trusted['last_updated'] = pd.to_datetime(customers_trusted['last_updated'])

# Numeric types
customers_trusted['annual_revenue_estimated'] = pd.to_numeric(
    customers_trusted['annual_revenue_estimated'], errors='coerce'
)
customers_trusted['maintenance_budget_annual'] = pd.to_numeric(
    customers_trusted['maintenance_budget_annual'], errors='coerce'
)
customers_trusted['downtime_cost_per_hour'] = pd.to_numeric(
    customers_trusted['downtime_cost_per_hour'], errors='coerce'
)

# Coluna adicionada a fim de resolver issue nos testes de qualidade de dados
customers_trusted['country'] = 'Brasil'  # valor padrão para o dataset

print("- Tipos de dados padronizados")

# Mapeamento de indústria para problemas esperados
industry_problems = {
    'Mineração': ['Vibração', 'Desgaste'],
    'Siderurgia': ['Superaquecimento', 'Vibração'],
    'Alimentos': ['Contaminação', 'Desgaste'],
    'Automotiva': ['Vibração', 'Superaquecimento'],
    'Papel e Celulose': ['Desgaste', 'Contaminação'],
    'Química': ['Contaminação', 'Superaquecimento'],
    'Cimento': ['Desgaste', 'Vibração'],
    'Energia': ['Superaquecimento', 'Vibração'],
}

# Mapeia indústria → lista de problemas
customers_trusted['expected_problems'] = customers_trusted['industry'].map(industry_problems)

print("- Mapeamento de problemas esperados por indústria")

# Problema padrão se a indústria não estiver no dicionário ou estiver nula
default_problems = ['Vibração', 'Desgaste']

def fix_expected_problems(val):
    if isinstance(val, list) and len(val) > 0:
        return val
    return default_problems

customers_trusted['expected_problems'] = customers_trusted['expected_problems'].apply(
    fix_expected_problems
)

assert customers_trusted.isnull().sum().sum() == 0, "Existem valores nulos!"

# Garantir que expected_problems é sempre uma lista
def normalize_expected_problems(val):
    """Normaliza expected_problems para sempre retornar uma lista"""
    if isinstance(val, list):
        return val
    elif isinstance(val, str):
        try:
            import ast
            return ast.literal_eval(val)
        except:
            # Se for string simples, encapsular em lista
            return [val] if val else ['Uso Geral']
    else:
        return ['Uso Geral']

# Aplicar normalização
customers_trusted['expected_problems'] = customers_trusted['expected_problems'].apply(
    normalize_expected_problems
)

print("- Validações passaram")



TRANSFORMAÇÃO: CUSTOMERS_RAW → CUSTOMERS_TRUSTED
- Tipos de dados padronizados
- Mapeamento de problemas esperados por indústria
- Validações passaram


In [19]:

# ===== TRANSFORMAÇÃO: SALES =====
print("\n" + "="*80)
print("TRANSFORMAÇÃO: SALES_RAW → SALES_TRUSTED")
print("="*80)

sales_trusted = sales_raw.copy()

# Parse dates
sales_trusted['sale_date'] = pd.to_datetime(sales_trusted['sale_date'])
sales_trusted['last_updated'] = pd.to_datetime(sales_trusted['last_updated'])

# Numeric types
sales_trusted['quantity'] = pd.to_numeric(sales_trusted['quantity'], errors='coerce').astype('int64')
sales_trusted['unit_price'] = pd.to_numeric(sales_trusted['unit_price'], errors='coerce')
sales_trusted['total_price'] = pd.to_numeric(sales_trusted['total_price'], errors='coerce')


# Agrupamento cumulativo por cliente (rastreia ordem de compra)
# Coluna adicionada para resolver issue nos testes de qualidade de dados
sales_trusted['subsession'] = sales_trusted.groupby('customer_id').cumcount() + 1

print("- Tipos de dados padronizados")

# Validar cálculo de total_price
tolerance = 0.01
mismatches = (
    abs(sales_trusted['total_price'] - (sales_trusted['quantity'] * sales_trusted['unit_price'])) 
    > tolerance
).sum()

if mismatches > 0:
    print(f"!  {mismatches} registros com total_price inconsistente")
    # Corrigir
    sales_trusted['total_price'] = sales_trusted['quantity'] * sales_trusted['unit_price']
    print(f"- Corrigidos")
else:
    print("- Total_price validado (100% correto)")

assert sales_trusted.isnull().sum().sum() == 0, "Existem valores nulos!"

print("- Validações passaram")



TRANSFORMAÇÃO: SALES_RAW → SALES_TRUSTED
- Tipos de dados padronizados
- Total_price validado (100% correto)
- Validações passaram


In [20]:

# ===== SALVAR TRUSTED LAYER =====
print("\n" + "="*80)
print("SALVANDO TRUSTED LAYER")
print("="*80)

import os
os.makedirs('../data/trusted', exist_ok=True)

products_trusted.to_parquet('../data/trusted/products_trusted.parquet', index=False)
customers_trusted.to_parquet('../data/trusted/customers_trusted.parquet', index=False)
sales_trusted.to_parquet('../data/trusted/sales_trusted.parquet', index=False)

print("products_trusted.parquet")
print("customers_trusted.parquet")
print("sales_trusted.parquet")


SALVANDO TRUSTED LAYER
products_trusted.parquet
customers_trusted.parquet
sales_trusted.parquet


In [21]:

# ===== RELATÓRIO FINAL =====
print("\n" + "="*80)
print("RELATÓRIO FINAL - PHASE 2")
print("="*80)

print(f"""
RESUMO DAS TRANSFORMAÇÕES:

Products:
  • Registros: {len(products_trusted):,}
  • Campos: {len(products_trusted.columns)}
  • Nulos: {products_trusted.isnull().sum().sum()}
  • Margens corrigidas: {problematic_count}
  • Descrições técnicas: Adicionadas

Customers:
  • Registros: {len(customers_trusted):,}
  • Campos: {len(customers_trusted.columns)}
  • Nulos: {customers_trusted.isnull().sum().sum()}
  • Mapeamento de problemas por indústria: Adicionado

Sales:
  • Registros: {len(sales_trusted):,}
  • Campos: {len(sales_trusted.columns)}
  • Nulos: {sales_trusted.isnull().sum().sum()}
  • Total price validado: {(sales_trusted['quantity'] * sales_trusted['unit_price']).round(2).equals(sales_trusted['total_price'].round(2))}
""")


RELATÓRIO FINAL - PHASE 2

RESUMO DAS TRANSFORMAÇÕES:

Products:
  • Registros: 10,000
  • Campos: 17
  • Nulos: 0
  • Margens corrigidas: 359
  • Descrições técnicas: Adicionadas

Customers:
  • Registros: 5,000
  • Campos: 15
  • Nulos: 0
  • Mapeamento de problemas por indústria: Adicionado

Sales:
  • Registros: 120,000
  • Campos: 15
  • Nulos: 0
  • Total price validado: True

