# 🫀 PPG Peak Annotation for Fine-tuning

This notebook is for annotating systolic peaks in the collected PPG signal.

## Workflow:
1. Fetch sessions from Supabase
2. Select a session to annotate
3. Detect peaks automatically
4. Correct manually (add/remove peaks)
5. Save annotations to CSV

## 1. Imports and Configuration

**IMPORTANT**: Run the cells below to install and enable the interactive backend!

In [31]:
# Install ipympl into the kernel's own environment (only needs to run once)
import sys
!{sys.executable} -m pip install ipympl -q

In [32]:
# Enable the interactive backend
# Try: %matplotlib widget (provided by ipympl), %matplotlib notebook, or %matplotlib tk
%matplotlib notebook

In [33]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
from scipy.signal import find_peaks
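from sqlalchemy import create_engine
from sqlalchemy.engine import URL
import os
from datetime import datetime

# Supabase configuration
# The password is read from an environment variable instead of being stored
# in the notebook. SUPABASE_DB_PASSWORD is a name assumed here.
USER = 'postgres'
PASSWORD = os.environ['SUPABASE_DB_PASSWORD']
HOST = 'db.pthfxmypcxqjfstqwokf.supabase.co'
PORT = 5432
DBNAME = 'postgres'

# URL.create takes the password as-is, so special characters need no manual escaping
url_conexao = URL.create('postgresql', username=USER, password=PASSWORD,
                         host=HOST, port=PORT, database=DBNAME)
engine = create_engine(url_conexao)

print("✅ Connection configured!")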

✅ Connection configured!
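When a connection URL is assembled as a plain string, reserved characters in the password (such as `@` or `/`) have to be percent-encoded first. The sketch below shows one way to do that with the standard library. The credentials and host are made up for illustration and are not the project's real values.

```python
from urllib.parse import quote_plus, unquote_plus

# Made-up credentials for illustration only.
user = "postgres"
password = "kx@jj5/g"
host = "db.example.supabase.co"  # hypothetical host

# Percent-encode the password so '@' and '/' cannot be mistaken for URL delimiters.
encoded_password = quote_plus(password)
url = f"postgresql://{user}:{encoded_password}@{host}:5432/postgres"
print(url)
```

Only the password is encoded here; the remaining components contain no reserved characters in this example.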


## 2. Load Sessions from Supabase

In [34]:
# Load all sessions, newest first
query = "SELECT * FROM hrv_sessions ORDER BY created_at DESC"
df = pd.read_sql(query, engine)

print(f"📊 {len(df)} sessions found")
print("\nLatest 10 sessions:")
df[['id', 'created_at', 'device_id', 'user_name', 'sampling_rate_hz']].head(10)

📊 46 sessions found

Latest 10 sessions:


| | id | created_at | device_id | user_name | sampling_rate_hz |
|---|---|---|---|---|---|
| 0 | 55cc94e7-0472-4c39-a7e4-6af74f82ecb9 | 2026-01-31 13:20:56.868823+00:00 | ESP32-S3-v18-Hybrid | Douglas | 756 |
| 1 | 7596e2bb-ff08-4181-9d65-3cfbf5033471 | 2026-01-31 13:17:08.853109+00:00 | ESP32-S3-v18-Hybrid | Douglas | 756 |
| 2 | 1f341831-3889-4dfe-8b97-c6ba2c14c1d5 | 2026-01-31 13:14:01.501299+00:00 | ESP32-S3-v18-Hybrid | Douglas | 757 |
| 3 | 72398e86-d724-4aba-adb5-482c159743ba | 2026-01-31 13:11:50.172076+00:00 | ESP32-S3-v18-Hybrid | Douglas | 757 |
| 4 | 5630a07a-dcb5-4b10-9007-d7bc1544aed1 | 2026-01-31 13:08:13.590132+00:00 | ESP32-S3-v18-Hybrid | Douglas | 756 |
| 5 | fda369b0-d62b-471c-b7c9-60b1685886fe | 2026-01-31 12:05:58.824464+00:00 | ESP32-S3-v17-800Hz-PSRAM | Douglas | 757 |
| 6 | 4e6aeba9-1656-4544-885f-1dfc18cc0c6b | 2026-01-31 09:05:38.256203+00:00 | ESP32-S3-v17-800Hz-PSRAM | Douglas | 756 |
| 7 | 474d2398-733c-46c7-ac20-868f3a294308 | 2026-01-31 09:03:52.180332+00:00 | ESP32-S3-v17-800Hz-PSRAM | Douglas | 757 |
| 8 | a0545541-42c1-484b-9377-faa45d1ef542 | 2026-01-31 09:02:14.306501+00:00 | ESP32-S3-v17-800Hz-PSRAM | Douglas | 757 |
| 9 | b142a849-1a67-4473-842b-8bf8e5920623 | 2026-01-31 08:57:21.528928+00:00 | ESP32-S3-v17-800Hz-PSRAM | Douglas | 757 |


## 3. Select a Session to Annotate

In [35]:
# Choose the session index (0 = most recent)
SESSAO_INDEX = 0  # Change this to annotate a different session

sessao = df.iloc[SESSAO_INDEX]
print("📝 Selected session:")
print(f"   ID: {sessao['id']}")
print(f"   Device: {sessao['device_id']}")
print(f"   User: {sessao['user_name']}")
print(f"   Created: {sessao['created_at']}")
print(f"   Sampling Rate: {sessao['sampling_rate_hz']} Hz")
print(f"   IR samples: {len(sessao['ir_waveform'])}")

📝 Selected session:
   ID: 55cc94e7-0472-4c39-a7e4-6af74f82ecb9
   Device: ESP32-S3-v18-Hybrid
   User: Douglas
   Created: 2026-01-31 13:20:56.868823+00:00
   Sampling Rate: 756 Hz
   IR samples: 45419


## 4. Signal Preprocessing

In [36]:
def preprocess_ppg(signal):
    """
    Preprocess the PPG signal:
    1. Convert to a NumPy array
    2. Normalize to the range 0 to 1
    3. Invert (so systolic peaks point upward)
    """
    signal = np.asarray(signal, dtype=float)

    # Normalize
    sig_min = np.min(signal)
    sig_max = np.max(signal)
    if sig_max == sig_min:
        # A flat signal would cause a division by zero below
        raise ValueError("Flat signal: cannot normalize")
    sig_norm = (signal - sig_min) / (sig_max - sig_min)

    # Invert (the sensor signal is inverted)
    sig_inverted = 1.0 - sig_norm

    return sig_inverted

# Preprocess
ir_signal = preprocess_ppg(sessao['ir_waveform'])
sampling_rate = sessao['sampling_rate_hz']

print(f"✅ Preprocessed signal: {len(ir_signal)} samples")
print(f"   Duration: {len(ir_signal) / sampling_rate:.1f} seconds")

✅ Preprocessed signal: 45419 samples
   Duration: 60.1 seconds
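To make the normalize-and-invert step concrete, here is a small worked example on made-up sensor counts. The helper `normalize_and_invert` is a hypothetical stand-in for the notebook's preprocessing and returns zeros for a flat signal, which is one possible way to handle that case.

```python
import numpy as np

def normalize_and_invert(raw):
    """Min-max normalize to [0, 1], then flip so the lowest raw value becomes 1."""
    raw = np.asarray(raw, dtype=float)
    span = raw.max() - raw.min()
    if span == 0:
        # A flat signal has no range to normalize; return zeros instead of dividing by zero.
        return np.zeros_like(raw)
    return 1.0 - (raw - raw.min()) / span

raw = [1000, 1200, 1100, 900, 1000]  # made-up sensor counts
out = normalize_and_invert(raw)
print(out)
```

The smallest raw value (900, at index 3) maps to 1.0 and the largest (1200, at index 1) maps to 0.0, so troughs in the raw trace become peaks in the processed one.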


## 5. Automatic Peak Detection

In [37]:
def detect_peaks_auto(signal, fs, min_hr=40, max_hr=200):
    """
    Detect peaks automatically using scipy.signal.find_peaks

    Args:
        signal: PPG signal (already inverted, peaks pointing up)
        fs: Sampling rate in Hz
        min_hr: Minimum heart rate in BPM (currently unused; find_peaks has
            no maximum-distance constraint)
        max_hr: Maximum heart rate in BPM

    Returns:
        Array of peak indices
    """
    # Minimum distance between peaks, in samples:
    # shortest time between beats = 60 / max_hr seconds
    min_distance = max(1, int(fs * 60 / max_hr))

    # Detect peaks
    peaks, _ = find_peaks(
        signal,
        distance=min_distance,
        height=0.3,  # Minimum height (signal is normalized to 0-1)
        prominence=0.1  # Minimum prominence
    )

    return peaks

# Detect peaks
peaks_auto = detect_peaks_auto(ir_signal, sampling_rate)
print(f"🔍 {len(peaks_auto)} peaks detected automatically")

# Compute mean HR
if len(peaks_auto) > 1:
    rr_intervals = np.diff(peaks_auto) / sampling_rate * 1000  # in ms
    hr_mean = 60000 / np.mean(rr_intervals)
    print(f"❤️ Estimated mean HR: {hr_mean:.1f} BPM")

🔍 58 peaks detected automatically
❤️ Estimated mean HR: 57.3 BPM
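The distance constraint and the heart-rate estimate can be checked on a synthetic trace. The sketch below uses a 1 Hz sine as a stand-in for a 60 BPM PPG signal; the sine and its 10-second duration are assumptions chosen only to make the arithmetic easy to follow.

```python
import numpy as np
from scipy.signal import find_peaks

fs = 756       # Hz, the rate reported for the selected session
max_hr = 200   # BPM
min_distance = int(fs * 60 / max_hr)  # 226 samples, about 0.3 s

# Synthetic stand-in for a PPG trace: a 1 Hz sine (60 BPM) scaled to [0, 1].
duration_s = 10
t = np.arange(duration_s * fs) / fs
synthetic = 0.5 + 0.5 * np.sin(2 * np.pi * 1.0 * t)

peaks, _ = find_peaks(synthetic, distance=min_distance, height=0.3, prominence=0.1)

# Convert peak spacing (samples) to RR intervals (ms), then to mean heart rate (BPM).
rr_ms = np.diff(peaks) / fs * 1000
hr_mean = 60000 / rr_ms.mean()
print(len(peaks), hr_mean)
```

With one beat per second, consecutive peaks are 756 samples apart, which corresponds to an RR interval of 1000 ms and a mean heart rate of 60 BPM.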


## 6. Peak Visualization

In [38]:
def plot_peaks(signal, peaks, fs, window_start=0, window_size=10):
    """
    Plot the signal with the peaks marked

    Args:
        signal: PPG signal
        peaks: Peak indices
        fs: Sampling rate in Hz
        window_start: Window start in seconds
        window_size: Window length in seconds
    """
    # Convert to samples
    start_sample = int(window_start * fs)
    end_sample = int((window_start + window_size) * fs)
    end_sample = min(end_sample, len(signal))

    # Extract the window
    sig_window = signal[start_sample:end_sample]
    time = np.arange(len(sig_window)) / fs + window_start

    # Keep only the peaks inside the window
    peaks_in_window = peaks[(peaks >= start_sample) & (peaks < end_sample)]
    peak_times = peaks_in_window / fs
    peak_values = signal[peaks_in_window]

    # Plot
    fig, ax = plt.subplots(figsize=(15, 5))
    ax.plot(time, sig_window, 'g-', linewidth=0.8, label='PPG')
    ax.scatter(peak_times, peak_values, c='red', s=50, zorder=5, label='Peaks')

    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Amplitude (normalized)')
    ax.set_title(f'PPG with Detected Peaks ({window_start}s - {window_start + window_size}s)')
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return fig

# Show the first 10-second window
fig = plot_peaks(ir_signal, peaks_auto, sampling_rate, window_start=0, window_size=10)


## 7. Manual Annotation Interface

In [None]:
class PeakAnnotator:
    """
    Interactive interface for correcting peak annotations

    Left click: add a peak
    Right click: remove the nearest peak
    """

    def __init__(self, signal, peaks_initial, fs, window_size=10):
        self.signal = signal
        self.peaks = list(peaks_initial)
        self.fs = fs
        self.window_size = window_size
        self.current_window = 0
        self.total_windows = int(np.ceil(len(signal) / fs / window_size))

        # Set up the figure
        self.fig, self.ax = plt.subplots(figsize=(15, 6))
        plt.subplots_adjust(bottom=0.2)

        # Navigation buttons
        ax_prev = plt.axes([0.2, 0.05, 0.1, 0.05])
        ax_next = plt.axes([0.4, 0.05, 0.1, 0.05])
        ax_save = plt.axes([0.6, 0.05, 0.1, 0.05])

        self.btn_prev = Button(ax_prev, '← Previous')
        self.btn_next = Button(ax_next, 'Next →')
        self.btn_save = Button(ax_save, 'Save')

        self.btn_prev.on_clicked(self.prev_window)
        self.btn_next.on_clicked(self.next_window)
        self.btn_save.on_clicked(self.save_annotations)

        # Connect click events
        self.cid = self.fig.canvas.mpl_connect('button_press_event', self.on_click)

        self.update_plot()
        
    def update_plot(self):
        self.ax.clear()

        # Compute the window bounds in samples
        start_sample = int(self.current_window * self.window_size * self.fs)
        end_sample = int((self.current_window + 1) * self.window_size * self.fs)
        end_sample = min(end_sample, len(self.signal))

        # Extract the data
        sig_window = self.signal[start_sample:end_sample]
        time = np.arange(len(sig_window)) / self.fs + self.current_window * self.window_size

        # Keep only the peaks inside the window
        peaks_in_window = [p for p in self.peaks if start_sample <= p < end_sample]
        peak_times = [p / self.fs for p in peaks_in_window]
        peak_values = [self.signal[p] for p in peaks_in_window]

        # Plot
        self.ax.plot(time, sig_window, 'g-', linewidth=0.8)
        self.ax.scatter(peak_times, peak_values, c='red', s=80, zorder=5, marker='v')

        self.ax.set_xlabel('Time (s)')
        self.ax.set_ylabel('Amplitude')
        self.ax.set_title(
            f'Window {self.current_window + 1}/{self.total_windows} | '
            f'{len(self.peaks)} peaks | '
            f'Click: Left=Add, Right=Remove'
        )
        self.ax.grid(True, alpha=0.3)

        self.fig.canvas.draw()
    
    def on_click(self, event):
        # Ignore clicks outside the signal axes (e.g. on the buttons)
        if event.inaxes != self.ax:
            return

        # Convert the clicked time to a sample index
        sample_clicked = int(event.xdata * self.fs)

        if event.button == 1:  # Left click: add
            # Snap to the local maximum near the click
            window = 50  # ±50 samples
            start = max(0, sample_clicked - window)
            end = min(len(self.signal), sample_clicked + window)
            local_max = int(start + np.argmax(self.signal[start:end]))

            if local_max not in self.peaks:
                self.peaks.append(local_max)
                self.peaks.sort()
                print(f"➕ Peak added at {local_max / self.fs:.2f}s")

        elif event.button == 3:  # Right click: remove
            if self.peaks:
                # Find the nearest peak
                distances = [abs(p - sample_clicked) for p in self.peaks]
                closest_idx = int(np.argmin(distances))

                if distances[closest_idx] < 100 * self.fs / 1000:  # Within 100 ms
                    removed = self.peaks.pop(closest_idx)
                    print(f"➖ Peak removed at {removed / self.fs:.2f}s")

        self.update_plot()
    
    def prev_window(self, event):
        if self.current_window > 0:
            self.current_window -= 1
            self.update_plot()
    
    def next_window(self, event):
        if self.current_window < self.total_windows - 1:
            self.current_window += 1
            self.update_plot()
    
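    def save_annotations(self, event):
        # Stop listening for clicks and close the figure; the peaks stay in self.peaks
        self.fig.canvas.mpl_disconnect(self.cid)
        plt.close(self.fig)
        print(f"💾 Annotation finished with {len(self.peaks)} peaks")

    def get_peaks(self):
        # dtype=int keeps the result usable as an index array even when empty
        return np.array(sorted(self.peaks), dtype=int)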
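
The window arithmetic used by the annotator can be checked in isolation. The sketch below reproduces it for the sample count and rate reported for the selected session (45419 samples at 756 Hz); `window_bounds` is a hypothetical helper written for this illustration.

```python
import numpy as np

def window_bounds(n_samples, fs, window_size_s, window_index):
    """Return the (start, end) sample range of one window, clipped to the signal length."""
    start = int(window_index * window_size_s * fs)
    end = min(int((window_index + 1) * window_size_s * fs), n_samples)
    return start, end

n_samples, fs, window_size_s = 45419, 756, 10
total_windows = int(np.ceil(n_samples / fs / window_size_s))
bounds = [window_bounds(n_samples, fs, window_size_s, i) for i in range(total_windows)]
print(total_windows, bounds[-1])
```

Because the recording is slightly longer than 60 seconds, rounding up yields seven windows, and the last one covers only the final 59 samples.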

## 8. Start Interactive Annotation

Run the cell below to start annotating:
- **Left click**: Add a peak (snaps to the local maximum)
- **Right click**: Remove the nearest peak
- **Buttons**: Move between windows and save
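
The two click behaviours listed above can be sketched as plain functions, which makes them easy to test without opening a figure. The function names and the toy signal are assumptions for this illustration; the 50-sample snap window and 100 ms removal tolerance mirror the values used in the annotator class.

```python
import numpy as np

def snap_to_local_max(signal, clicked_sample, half_window=50):
    """Return the index of the highest sample in a window around the click."""
    start = max(0, clicked_sample - half_window)
    end = min(len(signal), clicked_sample + half_window)
    return int(start + np.argmax(signal[start:end]))

def nearest_peak_within(peaks, clicked_sample, fs, tolerance_ms=100):
    """Return the position in `peaks` of the closest peak, or None if it is too far away."""
    if len(peaks) == 0:
        return None
    distances = np.abs(np.asarray(peaks) - clicked_sample)
    closest = int(np.argmin(distances))
    if distances[closest] < tolerance_ms * fs / 1000:
        return closest
    return None

fs = 756
signal = np.zeros(1000)
signal[420] = 1.0  # toy signal with a single spike

peaks = [snap_to_local_max(signal, clicked_sample=400)]
near = nearest_peak_within(peaks, 430, fs)  # 10 samples away, inside the tolerance
far = nearest_peak_within(peaks, 600, fs)   # 180 samples away, outside the tolerance
print(peaks, near, far)
```

At 756 Hz the 100 ms tolerance is about 75 samples, so a right click 10 samples from the peak selects it while one 180 samples away does nothing.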

In [None]:
# Start the annotator
annotator = PeakAnnotator(ir_signal, peaks_auto, sampling_rate, window_size=10)
plt.show()

## 9. Save Annotations

In [None]:
# Get the final peaks after annotation
peaks_final = annotator.get_peaks()

print("📊 Annotation summary:")
print(f"   Automatic peaks: {len(peaks_auto)}")
print(f"   Final peaks: {len(peaks_final)}")

# Compute metrics
if len(peaks_final) > 1:
    rr_intervals = np.diff(peaks_final) / sampling_rate * 1000  # in ms
    hr_mean = 60000 / np.mean(rr_intervals)
    hr_std = np.std(60000 / rr_intervals)
    print(f"   Mean HR: {hr_mean:.1f} ± {hr_std:.1f} BPM")
    print(f"   SDNN: {np.std(rr_intervals):.1f} ms")
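
As a worked example of these metrics, the sketch below computes RR intervals, mean heart rate, and SDNN from a handful of made-up peak indices. Note that `np.std` defaults to the population standard deviation (`ddof=0`); passing `ddof=1` gives the sample standard deviation, and which of the two is intended for SDNN here is left open.

```python
import numpy as np

fs = 756  # Hz
# Made-up peak indices, spaced 756, 680 and 832 samples apart.
peaks = np.array([0, 756, 1436, 2268])

rr_ms = np.diff(peaks) / fs * 1000       # RR intervals in milliseconds
hr_mean = 60000 / rr_ms.mean()           # mean heart rate in BPM
sdnn_population = np.std(rr_ms)          # ddof=0, as in the cell above
sdnn_sample = np.std(rr_ms, ddof=1)      # sample standard deviation
print(rr_ms, hr_mean, sdnn_population, sdnn_sample)
```

The three intervals average exactly 1000 ms, so the mean heart rate is 60 BPM; the two standard deviations differ noticeably because only three intervals are available.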

In [None]:
# Save annotations to CSV
def save_annotations(session_id, peaks, signal, sampling_rate, output_dir='./annotations'):
    """
    Save the peak annotations to a CSV file
    """
    os.makedirs(output_dir, exist_ok=True)
    peaks = np.asarray(peaks, dtype=int)

    # Build a DataFrame with the annotations
    df_annotations = pd.DataFrame({
        'peak_index': peaks,
        'peak_time_s': peaks / sampling_rate,
        'peak_value': signal[peaks]
    })

    # Compute RR intervals (the first peak has no preceding interval, hence NaN)
    if len(peaks) > 1:
        rr_intervals = np.diff(peaks) / sampling_rate * 1000
        df_annotations['rr_interval_ms'] = np.concatenate([[np.nan], rr_intervals])

    # File name
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = os.path.join(output_dir, f'peaks_{session_id[:8]}_{timestamp}.csv')

    # Save
    df_annotations.to_csv(filename, index=False)
    print(f"✅ Annotations saved to: {filename}")

    return filename

# Save
output_file = save_annotations(
    session_id=str(sessao['id']),
    peaks=peaks_final,
    signal=ir_signal,
    sampling_rate=sampling_rate
)
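
To confirm what ends up in the CSV, the sketch below writes a small annotation table with the same four columns to a temporary directory and reads it back. The peak indices, the placeholder signal, and the file name are made up for this illustration.

```python
import os
import tempfile

import numpy as np
import pandas as pd

fs = 756
peaks = np.array([189, 945, 1701])    # made-up peak indices
signal = np.linspace(0.0, 1.0, 2000)  # placeholder signal

table = pd.DataFrame({
    "peak_index": peaks,
    "peak_time_s": peaks / fs,
    "peak_value": signal[peaks],
})
# The first peak has no predecessor, so its RR interval is left as NaN.
table["rr_interval_ms"] = np.concatenate([[np.nan], np.diff(peaks) / fs * 1000])

# Write to a temporary directory and read the file back.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "peaks_example.csv")
    table.to_csv(path, index=False)
    loaded = pd.read_csv(path)

print(loaded)
```

The NaN in the first row survives the round trip as an empty CSV field, so downstream code should expect one missing RR value per file.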

## 10. Final Visualization

In [None]:
# Final plot with all annotated peaks
def plot_full_session(signal, peaks, fs, title="Full Session"):
    """
    Plot the full session with all peaks
    """
    time = np.arange(len(signal)) / fs
    peak_times = peaks / fs
    peak_values = signal[peaks]

    fig, ax = plt.subplots(figsize=(20, 5))
    ax.plot(time, signal, 'g-', linewidth=0.5, alpha=0.7)
    ax.scatter(peak_times, peak_values, c='red', s=20, zorder=5)

    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Amplitude')
    ax.set_title(f'{title} | {len(peaks)} annotated peaks')
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    return fig

fig_final = plot_full_session(ir_signal, peaks_final, sampling_rate,
                               title=f"Session {sessao['device_id']} - {sessao['user_name']}")