# Demonstração: Detecção de Fraude Neuromórfica

**Descrição:** Tutorial Interativo sobre o mecanismo biológico de aprendizado STDP (Plasticidade Dependente do Tempo de Spike) usado em redes neurais neuromórficas. Demonstra como os neurônios aprendem correlações temporais automaticamente.

**Autor:** Mauro Risonho de Paula Assumpção.
**Data de Criação:** 5 Dezembro 2025.
**Licença:** MIT License.
**Desenvolvimento:** Desenvolvimento Humano + Assistido por IA (Claude Sonnet 4.5, Gemini 3 Pro Preview).

---

Este notebook Demonstra o pipeline completo de detecção de fraude usando Spiking Neural Networks (SNNs).

In [None]:
import sys
import os
from pathlib import Path

# Adicionar src ao path
# Notebook é em: /portfolio/01_fraud_neuromorphic/notebooks/
# src é em: /portfolio/01_fraud_neuromorphic/src/
notebook_dir = Path.cwd()

# Verificar se estamos no diretório raiz do projeto ou no diretório notebooks
if (notebook_dir / 'portfolio' / '01_fraud_neuromorphic' / 'src').exists():
    # Estamos na raiz do projeto (Projeto-Neuromorfico-X)
    src_path = notebook_dir / 'portfolio' / '01_fraud_neuromorphic' / 'src'
elif (notebook_dir.parent / 'src').exists():
    # Estamos em notebooks/
    src_path = notebook_dir.parent / 'src'
elif (notebook_dir / 'src').exists():
    # Estamos em 01_fraud_neuromorphic/
    src_path = notebook_dir / 'src'
else:
    src_path = None

if src_path and src_path.exists():
    if str(src_path) not in sys.path:
        sys.path.insert(0, str(src_path))
        print(f" Diretório src adicionado: {src_path}")
else:
    print(f"⚠️ Diretório src não encontrado!")
    print(f"  Notebook dir: {notebook_dir}")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import time
from tqdm.auto import tqdm
import brian2

# Configurar Brian2 para usar numpy (evita errors de compilação C++ e problemas com SymPy/Cython)
brian2.prefs.codegen.target = "numpy"

# Nossos módulos
try:
    from principal import FraudDetectionPipeline, generate_synthetic_transactions
    from encoders import RateEncoder, TemporalEncoder, PopulationEncoder, TransactionEncoder
    from models_snn import FraudSNN, demonstrate_lif_neuron  # type: ignore[attr-defined]
    print(" Imports dos project modules completed!")
except ImportError as e:
    print(f" error ao importar módulos: {e}")
    print(f"  sys.path: {sys.path[:3]}")

# Configuração de visualization
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
%matplotlib inline

## 2. Geração de dados Sintéticos

Vamos criar a conjunto de dados sintético de transações bancárias com padrões realistas.

In [None]:
# Gerar 1000 transactions (5% fraudes)
print("Gerando transações sintéticas...")
df = generate_synthetic_transactions(n=1000, fraud_ratio=0.05)

print(f"\n Dataset generated:")
print(f"total de transações: {len(df)}")
print(f"Transações legítimas: {np.sum(df['is_fraud'] == 0)}")
print(f"Transações fraudulentas: {np.sum(df['is_fraud'] == 1)}")
print(f"taxa de fraude: {df['is_fraud'].mean():.2%}")

# Mostrar primeiras linhas
df.head(10)

In [None]:
# Visualizar distribuição de valores
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# distribuição de valores por classe
df[df['is_fraud'] == 0]['amount'].hist(bins=50, alpha=0.7, label='Legítimas', ax=axes[0], color='green')
df[df['is_fraud'] == 1]['amount'].hist(bins=30, alpha=0.7, label='Fraudulentas', ax=axes[0], color='red')
axes[0].set_xlabel('valor da Transação ($)')
axes[0].set_ylabel('frequency')
axes[0].set_title('distribuição de valores por classe')
axes[0].legend()

# frequência diária por classe
df.boxplot(column='daily_frequency', by='is_fraud', ax=axes[1])
axes[1].set_xlabel('class (0=Legítima, 1=Fraude)')
axes[1].set_ylabel('Daily Frequency')
axes[1].set_title('frequência de Transações por classe')
plt.suptitle('')

plt.tight_layout()
plt.show()

print("\n padrões observados:")
print(" - Fraudes tendem a ter valores mais altos")
print(" - Fraudes têm maior frequência de transações")

## 3. Encoding de Disparos

Demonstrar como features de transactions são convertidas em disparos temporal.

In [None]:
# Exemplo de Rate Encoding
print("=== CODIFICAÇÃO POR TAXA ===")
print("Codifica values contínuos how frequency de disparos\n")

rate_encoder = RateEncoder(min_rate=1, max_rate=100, duração=0.1)

# Testarar com diferentes valores
test_amounts = [100, 500, 1000, 5000, 10000]

fig, axes = plt.subplots(len(test_amounts), 1, figsize=(12, 10))

for idx, amount em enumerate(test_amounts):
    disparo_times = rate_encoder.encode(amount, min_val=0, max_val=10000)
    
    # Visualizar
    if disparo_times:
        axes[idx].eventplot([spike_times], linewidths=2, colors='blue')
        axes[idx].set_xlim(0, 0.1)
        axes[idx].set_ylim(0.5, 1.5)
        axes[idx].set_ylabel(f'${amount}')
        axes[idx].set_title(f'value: ${amount} → {len(spike_times)} disparos')
        axes[idx].grid(True, alpha=0.3)
    
axes[-1].set_xlabel('time (segundos)')
plt.suptitle('Rate Encoding: value → frequency de Disparos', fontsize=14, y=1.0)
plt.tight_layout()
plt.show()

print("\n Note: values larger generate more disparos (larger frequency)")

In [None]:
# Exemplo de Population Encoding (Geolocation)
print("=== CODIFICAÇÃO POR POPULAÇÃO ===")
print("Codifica values usando multiple neurônios com receptive fields\n")

pop_encoder = PopulationEncoder(n_neurônios=20, min_val=-1, max_val=1, sigma=0.15)

# Testarar com diferentes localizações
test_locations = [-0.8, -0.3, 0.0, 0.4, 0.9]

fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Plot 1: Activation dos neurônios
for loc em test_locations:
    activations = np.exp(-((pop_encoder.centers - loc) ** 2) / (2 * pop_encoder.sigma ** 2))
    axes[0].plot(pop_encoder.centers, activations, marker='o', label=f'Location = {loc:.1f}', alpha=0.7)

axes[0].set_xlabel('Centro do neurônio')
axes[0].set_ylabel('Activation')
axes[0].set_title('Activation da Population de neurônios por Location')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Plot 2: Raster plot de disparos
spike_data = []
for idx, loc em enumerate(test_locations):
    encoding = pop_encoder.encode(loc, duração=0.1)
    if len(encoding.spike_times) > 0:
        para t, n em zip(encoding.spike_times, encoding.neuron_indices):
            disparo_data.append([t, n + idx * 25]) # Offset para visualization

if disparo_data:
    disparo_array = np.array(spike_data)
    axes[1].scatter(spike_array[:, 0], disparo_array[:, 1], marker='|', s=100, alpha=0.6)

axes[1].set_xlabel('time (segundos)')
axes[1].set_ylabel('neurônio + Deslocamento')
axes[1].set_title('Disparos Gerados por Population de neurônios')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("\n Note: Each location activates um group different de neurônios")

## 4. Arquitetura da SNN

Visualizar e entender a arquitetura da Spiking Neural Network.

In [None]:
# Demonstration de neurônio LIF individual
print("=== NEURÔNIO LEAKY INTEGRATE-AND-FIRE ===")
print("Demonstration do behavior de um neurônio LIF\n")

lif_data = demonstrate_lif_neuron()  # type: ignore[name-defined]

fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)

# Plot 1: Corrente de entrada
axes[0].plot(lif_data['time'], lif_data['input'], color='blue', linewidth=2)
axes[0].set_ylabel('Corrente de entrada (I)')
axes[0].set_title('Estímulo de entrada (Corrente de Passo)')
axes[0].grid(True, alpha=0.3)

# Plot 2: Potencial de membrana e disparos
axes[1].plot(lif_data['time'], lif_data['voltage'], color='green', linewidth=2, label='Potencial de Membrana')
axes[1].axhline(-50, color='red', linestyle='--', label='Threshold (-50mV)', alpha=0.7)
axes[1].axhline(-70, color='gray', linestyle='--', label='Resting (-70mV)', alpha=0.5)

# Marcar disparos
for disparo_time em lif_data['disparos']:
    axes[1].axvline(spike_time, color='red', alpha=0.3, linewidth=1)

axes[1].set_xlabel('time (ms)')
axes[1].set_ylabel('Voltagem (mV)')
axes[1].set_title(f'Potencial de Membrana (total de {len(lif_data["disparos"])} disparos)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\n Análise:")
print(f"  - Disparos detectados: {len(lif_data['disparos'])}")
print(f"  - frequência média: {len(lif_data['disparos']) / (lif_data['time'][-1] / 1000):.1f} Hz")
print(f"  - ISI médio: {np.mean(np.diff(lif_data['disparos'])):.2f} ms" if len(lif_data['disparos']) > 1 else "")

In [None]:
# Estatísticas da arquitetura SNN
print("=== ARQUITETURA DA SNN ===")

snn = FraudSNN(input_size=256, hidden_sizes=[128, 64], output_size=2)
stats = snn.get_network_stats()  # type: ignore[attr-defined]

print(f"\n Estrutura da network:")
print(f"  Input Layer: {stats['layers']['input']} neurônios")
print(f"  Hidden Layer 1: {stats['layers']['hidden'][0]} neurônios (LIF)")
print(f"  Hidden Layer 2: {stats['layers']['hidden'][1]} neurônios (LIF)")
print(f"  Output Layer: {stats['layers']['output']} neurônios")
print(f"\n  total de neurônios: {stats['total_neurônios']}")
print(f"  total de sinapses: {stats['total_synapses']}")

print(f"\n pesos Sinápticos:")
print(f"  average: {stats['pesos']['mean']:.4f}")
print(f"  deviation pattern: {stats['pesos']['std']:.4f}")
print(f"  Min: {stats['pesos']['min']:.4f}")
print(f"  Max: {stats['pesos']['max']:.4f}")

# Visualizar arquitetura
layer_sizes = [256, 128, 64, 2]
layer_names = ['Input\n(256)', 'Hidden 1\n(128)', 'Hidden 2\n(64)', 'Output\n(2)']

fig, ax = plt.subplots(figsize=(14, 6))

# Desenhar layers
x_positions = np.linspace(0, 10, len(layer_sizes))
max_size = max(layer_sizes)

for i, (size, name, x) em enumerate(zip(layer_sizes, layer_names, x_positions)):
    y_positions = np.linspace(0, max_size, size)
    
    # Limitar visualização para layers large
    display_neurônios = min(size, 20)
    y_display = np.linspace(0, max_size, display_neurônios)
    
    ax.scatter([x] * display_neurônios, y_display, s=100, alpha=0.7, 
               color=f'C{i}', label=name, zorder=3)
    
    # Conectar com próximo layer
    if i < len(layer_sizes) - 1:
        next_x = x_positions[i + 1]
        next_size = min(layer_sizes[i + 1], 20)
        next_y = np.linspace(0, max_size, next_size)
        
        # Desenhar algumas connections (amostra)
        para y1 em y_display[::3]:
            para y2 em next_y[::3]:
                ax.plot([x, next_x], [y1, y2], 'k-', alpha=0.05, linewidth=0.5, zorder=1)

ax.set_xlim(-1, 11)
ax.set_ylim(-20, max_size + 20)
ax.set_xticks(x_positions)
ax.set_xticklabels(layer_names)
ax.set_yticks([])
ax.set_title('Arquitetura da Spiking Neural Network para Detection de Fraude', fontsize=14)
ax.spines['left'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)

plt.tight_layout()
plt.show()

## 5. Pipeline Completo

Executar o pipeline de ponta a ponta: treinar e avaliar.

In [None]:
# Inicializar pipeline
print("Inicializando pipeline de detection de fraude...")
pipeline = FraudDetectionPipeline()

# Split train/test - usar dados smaller para demo fast
train_size = int(0.8 * len(df))
train_data = df[:train_size].copy()
test_data = df[train_size:].copy()

# Para demo, use apenas a pequeno subset do treino
# Reduzido para 20 amostras para execution rápido em modo numpy (without compilação C++)
train_subset_size = min(20, len(train_data))
train_subset = train_data.sample(n=train_subset_size, random_state=42)

print(f"\n Divisão dos data:")
print(f" Treino (subset para demo): {len(train_subset)} transactions")
print(f" Test: {len(test_data)} transactions")
print(f"\n Note: Using reduced subset para Demonstration fast")

print("\n⏳ Starting Training com STDP...")
print("(Usando poucos epochs e data reduzidos para demo fast)\n")

# Reduzir drasticamente para demo
epochs = 2
print(f" Training: {epochs} epochs com {len(train_subset)} transactions")

# Treinaring fast
start_time = time.time()
print("Treinando...")
pipeline.train(train_subset, epochs=epochs)
training_time = time.time() - start_time

print(f"\n Training concluído em {training_time:.1f}s")
print(f" tempo médio por epoch: {training_time/epochs:.2f}s")
print(f" rate: {len(train_subset) * epochs / training_time:.1f} transactions/de acordo com")

In [None]:
# Avaliar no set de Testar
print(" Avaliando model no set de Test...")
print(f"total de {len(test_data)} transactions\n")

# Avaliação com tempo estimado
start_eval = time.time()
metrics = pipeline.evaluate(test_data)
eval_time = time.time() - start_eval

print(f"\n Evaluation concluída em {eval_time:.2f}s")
print(f" speed: {len(test_data)/eval_time:.1f} transactions/de acordo com")


In [None]:
# Visualizar matriz de confusão
from sklearn.metrics import confusion_matrix
import numpy as np

# Converter explicitamente para numpy arrays
y_true = test_data['is_fraud'].to_numpy()
y_pred = []

print(" Gerando predictions para matriz de confusão...")
for _, row em tqdm(test_data.iterrows(), total=len(test_data), desc="Predictions", unit="txn"):
    result = pipeline.predict(row.to_dict())
    y_pred.append(int(result['is_fraud']))

# Converter y_pred para numpy array
y_pred = np.array(y_pred, dtype=int)

cm = confusion_matrix(y_true, y_pred)

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=['Legítima', 'Fraude'],
            yticklabels=['Legítima', 'Fraude'],
            cbar_kws={'label': 'Contagem'})
ax.set_xlabel('Prediction')
ax.set_ylabel('Real')
ax.set_title('Matriz de Confusão - Detecção de Fraude Neuromórfica')
plt.tight_layout()
plt.show()

print(f"\n accuracy: {metrics['accuracy']:.2%}")
print(f"  precision: {metrics['precision']:.2%}")
print(f"  Recall: {metrics['recall']:.2%}")
print(f"  F1-Score: {metrics['f1_score']:.2%}")

## 6. Exemplos de Prediction Individual

Testarar com transactions específicas.

In [None]:
# Exemplo 1: Transaction legítima típica
print("=" * 60)
print("Exemplo 1: Transaction Legítima")
print("=" * 60)

legitimate_txn = {
 'id': 'demo_001',
 'amount': 85.50,
 'timestamp': time.time(),
 'merchant_category': 'groceries',
 'location': (-23.5505, -46.6333), # São Paulo
 'device_id': 'device_regular_001',
 'daily_frequency': 3
}

result = pipeline.predict(legitimate_txn)

print(f"\nTransaction:")
print(f" value: ${legitimate_txn['amount']:.2f}")
print(f" Categoria: {legitimate_txn['merchant_category']}")
print(f" Location: São Paulo")

print(f"\n result da Análise:")
print(f" Fraude detectada: {' YES' if result['is_fraud'] else ' NO'}")
print(f" Confiança: {result['confidence']:.2%}")
print(f" Score Legítima: {result['legitimate_score']:.2f} Hz")
print(f" Score Fraude: {result['fraud_score']:.2f} Hz")
print(f" latência: {result['latência_ms']:.2f}ms")
print(f" Disparos generated: {result['n_disparos_generated']}")

In [None]:
# Exemplo 2: Transaction suspeita (high probability de fraude)
print("=" * 60)
print("Exemplo 2: Transaction Suspeita")
print("=" * 60)

suspicious_txn = {
 'id': 'demo_002',
 'amount': 8500.00, # value high
 'timestamp': time.time(),
 'merchant_category': 'electronics',
 'location': (51.5074, -0.1278), # Londres (location incomum)
 'device_id': 'device_new_unknown', # Dispositivo new
 'daily_frequency': 25 # frequency anormal
}

result = pipeline.predict(suspicious_txn)

print(f"\nTransaction:")
print(f" value: ${suspicious_txn['amount']:.2f}")
print(f" Categoria: {suspicious_txn['merchant_category']}")
print(f" Location: Londres (incomum)")
print(f" Dispositivo: new/Desconhecido")

print(f"\n result da Análise:")
print(f" Fraude detectada: {' YES' if result['is_fraud'] else ' NO'}")
print(f" Confiança: {result['confidence']:.2%}")
print(f" Score Legítima: {result['legitimate_score']:.2f} Hz")
print(f" Score Fraude: {result['fraud_score']:.2f} Hz")
print(f" latência: {result['latência_ms']:.2f}ms")
print(f" Disparos generated: {result['n_disparos_generated']}")

if result['is_fraud']:
 print(f"\n ALERTA: Transaction bloqueada para Analysis manual!")

## 7. Análise de Desempenho

Avaliar latência e throughput do sistema.

In [None]:
# Benchmark de latência
print("=== Benchmark DE latência ===")
n_samples = min(100, len(test_data))
print(f"Testando {n_samples} transactions...\n")

latencies = []
sample_txns = test_data.sample(n=n_samples)

for _, row em tqdm(sample_txns.iterrows(), total=n_samples, desc="Benchmark", unit="txn"):
 start = time.time()
 result = pipeline.predict(row.to_dict())
 latência = (time.time() - start) * 1000 # ms
 latencies.append(latência)

latencies = np.array(latencies)

print(f"\n Estatísticas de latência:")
print(f" average: {latencies.mean():.2f}ms")
print(f" median: {np.median(latencies):.2f}ms")
print(f" Min: {latencies.min():.2f}ms")
print(f" Max: {latencies.max():.2f}ms")
print(f" P95: {np.percentile(latencies, 95):.2f}ms")
print(f" P99: {np.percentile(latencies, 99):.2f}ms")

throughput = 1000 / latencies.mean() # transações por segundo
print(f"\n Vazão estimada: {throughput:.0f} transactions/de acordo com")

# Visualizar distribution
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

axes[0].hist(latencies, bins=30, color='skyblue', edgecolor='black', alpha=0.7)
axes[0].axvline(latencies.mean(), color='red', linestyle='--', label=f'average: {latencies.mean():.2f}ms')
axes[0].axvline(np.percentile(latencies, 95), color='orange', linestyle='--', label=f'P95: {np.percentile(latencies, 95):.2f}ms')
axes[0].set_xlabel('latência (ms)')
axes[0].set_ylabel('frequency')
axes[0].set_title('distribution de latência')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

axes[1].boxplot(latencies, vert=True)
axes[1].set_ylabel('latência (ms)')
axes[1].set_title('Boxplot de latência')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()


## 8. Conclusões

### Vantagens da Abordagem Neuromórfica

1. **Latência ultra-baixa**: Detection em ~10ms
2. **Processamento temporal nativo**: Captura patterns de sequence naturalmente
3. **eficiência energética**: Ideal para deployment em edge devices
4. **Learning biological**: STDP allows adaptation contínua

### Applications em Bancos e Fintechs

- Detection de fraude em tempo real no POS
- Protection de transactions Pix/TED/DOC
- Monitoramento de carteiras digitais
- Análise comportamental em mobile banking

### Next steps

- Deploy em hardware neuromórfico (Intel Loihi, IBM TrueNorth)
- Integration com sistemas legados via API
- Explicabilidade (SHAP para SNNs)
- Federated aprendizado entre institutions

---

**Autor:** Mauro Risonho de Paula Assumpção 
**Projeto:** Computation Neuromórfica para Cybersecurity Banking