# Análise Generalizada de SAM (Spectral Angle Mapper)

Este notebook permite configurar parâmetros operacionais (RPM, Diâmetro de Falha) e realizar automaticamente:
1.  **Busca de Sinais Reais**: Seleção robusta de múltiplos sinais de referência no dataset.
2.  **Geração de Sinais Sintéticos**: Criação de sinais via métodos FFT e Impulso para as condições especificadas.
3.  **Comparação Quantitativa**: Cálculo da métrica SAM par-a-par e estatística (Média/Desvio).

In [14]:
import sys
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq
from scipy.spatial.distance import cosine
import random

# Adicionar src ao path para importar módulos do projeto
sys.path.append(os.path.abspath('src'))

import segment_and_split_data as ssd
import generate_synthetic_data as gsd
# generate_impulse_data não é usado diretamente pois definimos localmente
import bearing_utils as bu

## 1. Configuração de Parâmetros
Defina aqui as condições operacionais para a análise.

In [18]:
# --- Parâmetros de Entrada ---
TARGET_RPM = 1750          # Rotação do eixo (Ex: 1730, 1750, 1772, 1797)
TARGET_DIAMETER_MM = 0.5   # Diâmetro da falha alvo em mm (aprox 0.021")
NUM_SAMPLES = 20            # Número de amostras reais para usar na média do SAM

# --- Constantes do Sistema ---
FS = 12000                 # Frequência de Amostragem (Hz)
N_POINTS = 4096            # Tamanho do segmento

## 2. Funções Auxiliares
Funções para cálculo do SAM, espectro e filtragem de dados.

In [19]:
def calcular_sam_graus(fft_ref, fft_alvo):
    """Calcula o ângulo SAM (graus) entre dois espectros de magnitude."""
    if np.iscomplexobj(fft_ref): fft_ref = np.abs(fft_ref)
    if np.iscomplexobj(fft_alvo): fft_alvo = np.abs(fft_alvo)
    
    min_len = min(len(fft_ref), len(fft_alvo))
    dist = cosine(fft_ref[:min_len], fft_alvo[:min_len])
    sim = np.clip(1.0 - dist, -1.0, 1.0)
    return np.degrees(np.arccos(sim))

def get_mag_spectrum(sig):
    """Calcula o espectro unilateral de magnitude normalizado."""
    yf = fft(sig)
    return 2.0/len(sig) * np.abs(yf[0:len(sig)//2])

def criar_resposta_impulso(taxa_amostral, tipo_falha, damping, duracao_pulso, num_modos=6):
    """Cria resposta ao impulso somando modos vibracionais naturais (Bearing Utils)."""
    # Obter frequências naturais
    df_nat = bu.get_bearing_natural_frequencies()
    
    if tipo_falha == 'Pista Externa':
        df_race = df_nat[df_nat['Race'] == 'Outer']
    else: 
        df_race = df_nat[df_nat['Race'] == 'Inner']
    
    modos = df_race.head(num_modos)
    
    n_pontos = int(duracao_pulso * taxa_amostral)
    t_pulse = np.linspace(0, duracao_pulso, n_pontos, endpoint=False)
    pulso_total = np.zeros(n_pontos)
    
    for _, modo in modos.iterrows():
        fn = modo['Freq_Hz']
        mass = modo['Mass_kg']
        wn = 2 * np.pi * fn
        
        if wn > 0:
            recept = 1.0 / (mass * wn**2)
            A = damping * wn
            wd = wn * np.sqrt(1 - damping**2)
            
            pulso_modo = recept * np.exp(-A * t_pulse) * np.sin(wd * t_pulse)
            pulso_total += pulso_modo
            
    return pulso_total

def gerar_sinal_impulso_completo(fs, duration_points, defect_freq_hz, tipo_falha_str):
    """Gera o sinal completo via convolução do trem de impulsos com a resposta."""
    duration_sec = duration_points / fs
    t_total = np.linspace(0, duration_sec, duration_points, endpoint=False)
    
    # 1. Trem de Impulsos
    trem = np.zeros(duration_points)
    periodo_s = 1.0 / defect_freq_hz
    ts = 1.0 / fs
    
    for t_imp in np.arange(0, duration_sec, periodo_s):
        idx = int(t_imp / ts)
        if idx < duration_points:
            trem[idx] = 1.0
            
    # 2. Resposta ao Impulso
    resp_imp = criar_resposta_impulso(
        taxa_amostral=fs,
        tipo_falha=tipo_falha_str,
        damping=0.1,
        duracao_pulso=0.02,
        num_modos=6
    )
    if np.max(np.abs(resp_imp)) > 0:
        resp_imp /= np.max(np.abs(resp_imp))
        
    # 3. Convolução
    sinal_conv = np.convolve(trem, resp_imp, mode='same')
    return sinal_conv

def get_real_signals_metadata_batch(rpm, type_code, target_dia_mm, num_samples):
    cwru_map = {0.1778: '0.007"', 0.3556: '0.014"', 0.5334: '0.021"', 0.7112: '0.028"'}
    closest_mm = min(cwru_map.keys(), key=lambda k: abs(k - target_dia_mm))
    target_dia_str = cwru_map[closest_mm]
    type_map = {'OR': 'Pista Externa', 'IR': 'Pista Interna', 'B': 'Esfera', 'Normal': 'Normal'}
    target_type = type_map.get(type_code, type_code)
    
    candidates = []
    for chave, df in ssd.dicionario_teste.items():
        if df.empty: continue
        try:
            row_rpm = int(df['rotacao_rpm'].iloc[0])
            row_type = str(df['tipo_falha'].iloc[0])
            row_dia = str(df['diametro_falha'].iloc[0]).replace('"', '').strip()
            target_dia_clean = target_dia_str.replace('"', '').strip()
            
            if abs(row_rpm - rpm) < 50:
                if row_type == target_type:
                    if target_type == 'Normal' or row_dia == target_dia_clean:
                        candidates.append((chave, df['amplitude'].values))
        except: continue
    if len(candidates) > num_samples:
        selected = random.sample(candidates, num_samples)
    else:
        selected = candidates
    return selected

def get_normal_signal(rpm):
    normals = get_real_signals_metadata_batch(rpm, 'Normal', 0, 1)
    if normals:
        return normals[0][1]
    return np.random.normal(0, 0.01, N_POINTS)

## 3. Execução da Análise
Geração e Comparação para todas as falhas.

In [20]:
# Buscar Ruído de Fundo (Normal)
sinal_normal = get_normal_signal(TARGET_RPM)

# Ajustar tamanho
if len(sinal_normal) > N_POINTS:
    sinal_normal = sinal_normal[:N_POINTS]
elif len(sinal_normal) < N_POINTS:
    sinal_normal = np.pad(sinal_normal, (0, N_POINTS - len(sinal_normal)))

# Lista de Falhas
falhas = [
    ('Pista Externa', 'OR', 'outer'),
    ('Pista Interna', 'IR', 'inner'),
    ('Esfera',       'B',  'ball')
]

print(f"\n{'Tipo Falha':<15} | {'Nº Refs':<7} | {'SAM FFT (Média)':<20} | {'SAM Impulso (Média)':<20}")
print("-"*90)

for nome, type_code, gen_type in falhas:
    
    # 1. Obter Sinais Reais de Referência
    refs = get_real_signals_metadata_batch(TARGET_RPM, type_code, TARGET_DIAMETER_MM, NUM_SAMPLES)
    
    if not refs:
        print(f"{nome:<15} | {'0':<7} | {'Sem Dados Reais':<20} | {'-':<20}")
        continue
        
    # 2. Calcular Frequência de Falha (para Impulso)
    tandon_coeffs = bu.calculate_tandon_coefficients(TARGET_DIAMETER_MM, TARGET_RPM, gen_type)
    char_freq = tandon_coeffs['frequencies']['defect_freq_hz']
    
    # --- Método A: FFT Model (USANDO CALCULAR_ESPECTRO + SYNTHESIZE) ---
    # K ajustado conforme sugestão: Ball=0.05, Inner=0.1, Outer=0.008
    
    try:
        spec_df = None
        
        if type_code == 'OR':
            spec_df = bu.calcular_espectro_outer_race(TARGET_DIAMETER_MM, TARGET_RPM, K=0.008)
        elif type_code == 'IR':
            spec_df = bu.calcular_espectro_inner_completo(TARGET_DIAMETER_MM, TARGET_RPM, K=0.1)
        elif type_code == 'B':
            spec_df = bu.calcular_espectro_ball_completo(TARGET_DIAMETER_MM, TARGET_RPM, K=0.05)
            
        if spec_df is not None:
            sig_fft_pure = gsd.synthesize_time_signal(spec_df, duration=N_POINTS/FS, fs=FS)
            
            # Ajuste de tamanho
            if len(sig_fft_pure) > N_POINTS:
                sig_fft_pure = sig_fft_pure[:N_POINTS]
            elif len(sig_fft_pure) < N_POINTS:
                sig_fft_pure = np.pad(sig_fft_pure, (0, N_POINTS - len(sig_fft_pure)))
                
            sig_fft_final = sinal_normal + sig_fft_pure
        else:
             sig_fft_final = np.zeros(N_POINTS)
             
    except Exception as e:
        print(f"Erro FFT Gen: {e}")
        sig_fft_final = np.zeros(N_POINTS)
    
    # --- Método B: Impulse Model ---
    try:
        sig_imp_pure = gerar_sinal_impulso_completo(
            fs=FS,
            duration_points=N_POINTS,
            defect_freq_hz=char_freq,
            tipo_falha_str=nome 
        )
        sig_imp_final = sinal_normal + sig_imp_pure
    except Exception as e:
        print(f"Erro Impulso: {e}")
        sig_imp_final = np.zeros(N_POINTS)

    # 3. Calcular SAM e Médias
    sam_fft_vals = []
    sam_imp_vals = []
    
    spec_syn_fft = get_mag_spectrum(sig_fft_final)
    spec_syn_imp = get_mag_spectrum(sig_imp_final)
    
    for _, sig_real in refs:
        spec_real = get_mag_spectrum(sig_real)
        sam_fft_vals.append(calcular_sam_graus(spec_real, spec_syn_fft))
        sam_imp_vals.append(calcular_sam_graus(spec_real, spec_syn_imp))
        
    res_fft = f"{np.mean(sam_fft_vals):.2f} +/- {np.std(sam_fft_vals):.1f}°"
    res_imp = f"{np.mean(sam_imp_vals):.2f} +/- {np.std(sam_imp_vals):.1f}°"
    
    print(f"{nome:<15} | {len(refs):<7} | {res_fft:<20} | {res_imp:<20}") 


Tipo Falha      | Nº Refs | SAM FFT (Média)      | SAM Impulso (Média) 
------------------------------------------------------------------------------------------
Pista Externa   | 20      | 84.24 +/- 2.3°       | 82.23 +/- 0.9°      
Pista Interna   | 20      | 82.47 +/- 2.4°       | 77.87 +/- 2.0°      
Esfera          | 20      | 77.43 +/- 2.5°       | 74.21 +/- 1.2°      
