# ¿Qué hace este Notebook?

Aquí se realizan las pruebas que generan las métricas de rendimiento cuantitativas para distintos canales LoRa. Permite elegir canales tanto simulados como reales y modificar parámetros de LoRa, del sistema o de los SDR.

> Cada celda tiene una razón de ser explicada arriba.

### 1. Cambia la dirección de referencia del Notebook para poder importar nuestro Source Code

In [None]:
#------------------- Change working directory to project root -------------------#
# `os` is imported directly: it is not part of pathlib's public API, so
# `from pathlib import os` only works while pathlib happens to re-export it.
import os
from pathlib import Path

# Climb from the current directory until a folder containing "src" is found.
cur = Path().resolve()
while not (cur / "src").is_dir():
    if cur == cur.parent:  # reached the filesystem root without finding "src"
        raise RuntimeError("No 'src' dir")
    cur = cur.parent

# Make the project root the working directory so `src.*` imports resolve.
os.chdir(cur)
print(f"[INFO] Changed working directory to project root: {cur}")

### 2. Importa Librerías Internas y Externas

In [None]:
# -------------------------------------- External Libraries --------------------------------------
import numpy as np
from datetime import datetime
from typing import Callable
import matplotlib.pyplot as plt
import random
from collections import defaultdict
import time
# ----------------------------------------- Local Imports ----------------------------------------
from src.core                  import LoRaPhyParams, LoRaFrameParams
from src.mod                   import LoRaModulator
from src.demod                 import LoRaDemodulator
from src.sync                  import DechirpBasedSynchronizer
from src.core.vpn_utils        import VPNKeepAlive

import src.core.sdr_utils as sdr_utils
import src.core.snr_utils as snr_utils
import src.core.perf_metrics_utils as perf_utils


### 3. Define Constantes o Variables con poco cambio

In [None]:
SIMULATIONS_BUFFER_SIZE = 2**20             # Buffer size used when simulating SDR transmissions (for real channels the buffer size comes from the LoRa profile)
DEFAULT_MAX_TRANSCEIVE_ATTEMPTS = 2_000     # Default maximum number of channel executions per SNR point (the minimum number of attempts is derived from this value)
AUTO_DELETE_TMP_LOG = False                 # Delete the temporary log file once the test finishes
MOD_BACKEND = "numpy"                       # Backend name handed to LoRaModulator
DEMOD_BACKEND = "cupy"                      # Backend name handed to the synchronizer and the demodulator


### 4. Genera decoradores para clasificar los distintos canales de transmisión a evaluar

In [None]:
# Decorator that tags a channel function with its identifier

def channel_fn_identifier(id_:str):
    """
    Build a decorator that stores ``id_`` on the decorated function as ``channel_id``.

    The function is returned unwrapped, so calling it behaves exactly as before.
    """
    def _attach_id(target: Callable):
        setattr(target, "channel_id", id_)
        return target

    return _attach_id

# Decorator that records whether the channel output must be synchronized
def needs_sync(sync: bool):
    """
    Build a decorator that stores ``sync`` on the decorated function as ``needs_sync``.

    The function is returned unwrapped, so calling it behaves exactly as before.
    """
    def _attach_flag(target: Callable):
        setattr(target, "needs_sync", sync)
        return target

    return _attach_flag

# Decorator that records whether the channel requires a VPN connection
def needs_vpn(vpn: bool):
    """
    Build a decorator that stores ``vpn`` on the decorated function as ``needs_vpn``.

    The function is returned unwrapped, so calling it behaves exactly as before.
    """
    def _attach_flag(target: Callable):
        setattr(target, "needs_vpn", vpn)
        return target

    return _attach_flag

### 5. Define cómo son los canales simulados
- AWGN o AWGN + Frecuencia Selectiva
- Con o sin Sincronización de Trama

In [None]:
# Simulated SDR channel functions

@channel_fn_identifier("sim_sync_awgn")
@needs_sync(True)
@needs_vpn(False)
def sim_channel_sync_awgn(
    profile:snr_utils.SDRProfile,
    modulated_frame_waveform,
    reference_payload_waveform,
    snr_db:float
    ) -> np.ndarray:
    """
    Simulate an AWGN channel whose output still has to be frame-synchronized.

    Noise is added to the whole frame, the noisy frame is repeated until it covers
    ``SIMULATIONS_BUFFER_SIZE`` samples, circularly shifted by a random offset and
    finally cut down to exactly the buffer size.

    Args:
        profile: Not used by this channel; kept so every channel function shares
            the same signature.
        modulated_frame_waveform: Samples of the complete modulated frame.
        reference_payload_waveform: Payload-only samples, forwarded to
            ``snr_utils.generate_awgn`` (presumably as the reference for the SNR
            computation; confirm in ``snr_utils``).
        snr_db: Target SNR in dB.

    Returns:
        The noisy, randomly shifted buffer of ``SIMULATIONS_BUFFER_SIZE`` samples.
    """

    # Add white Gaussian noise to the modulated frame
    noisy_frame, _, _ = snr_utils.generate_awgn(f"{snr_db}db", modulated_frame_waveform, reference_payload_waveform)

    # Repeat the frame until it is at least as long as one buffer
    buffer_size = SIMULATIONS_BUFFER_SIZE

    if len(noisy_frame) < buffer_size:
        reps = (buffer_size // len(noisy_frame)) + 1
        noisy_frame = np.tile(noisy_frame, reps)

    # Random circular shift: the receiver does not know where the frame starts
    noisy_frame = np.roll(noisy_frame, random.randint(0, len(noisy_frame) - 1))

    # Truncate so the result has exactly the buffer size
    return noisy_frame[:buffer_size]

@channel_fn_identifier("sim_async_awgn")
@needs_sync(False)
@needs_vpn(False)
def sim_channel_async_awgn(
    profile:snr_utils.SDRProfile,
    modulated_frame_waveform,
    reference_payload_waveform,
    snr_db:float
    ) -> np.ndarray:
    """
    Simulate an AWGN channel that needs no synchronization.

    Only the payload waveform is used, so the result is already aligned.

    Args:
        profile: Not used by this channel; kept so every channel function shares
            the same signature.
        modulated_frame_waveform: Not used by this channel (same reason).
        reference_payload_waveform: Payload-only samples that receive the noise.
        snr_db: Target SNR in dB.

    Returns:
        The payload waveform with white Gaussian noise added.
    """
    # Add white Gaussian noise to the payload waveform
    noisy_frame, _, _ = snr_utils.generate_awgn(f"{snr_db}db", reference_payload_waveform)

    return noisy_frame

@channel_fn_identifier("sim_sync_echo")
@needs_sync(True)
@needs_vpn(False)
def sim_channel_sync_echo(
    profile:snr_utils.SDRProfile,
    modulated_frame_waveform,
    reference_payload_waveform,
    snr_db:float
    ) -> np.ndarray:
    """
    Simulate a multipath channel with AWGN whose output must be frame-synchronized.

    Args:
        profile: Not used by this channel; kept so every channel function shares
            the same signature.
        modulated_frame_waveform: Samples of the complete modulated frame.
        reference_payload_waveform: Payload-only samples, forwarded to
            ``snr_utils.generate_awgn`` (presumably as the reference for the SNR
            computation; confirm in ``snr_utils``).
        snr_db: Target SNR in dB.

    Returns:
        The modulated frame after the echo filter, with noise added.
    """

    # Two-tap multipath channel: the direct path keeps 80% of the power and an
    # echo delayed by one sample carries the remaining 20%.
    h = np.zeros(2, dtype=np.complex64)
    h[0] = np.sqrt(0.8)
    h[1] = np.sqrt(0.2)

    # "full" convolution is one sample longer than the input; drop the tail so
    # the output keeps the frame length.
    convolved = np.convolve(modulated_frame_waveform, h, mode="full")
    echoed_waveform = convolved[:len(modulated_frame_waveform)]

    # Add white Gaussian noise to the echoed frame
    noisy_frame, _, _ = snr_utils.generate_awgn(f"{snr_db}db", echoed_waveform, reference_payload_waveform)

    return noisy_frame

@channel_fn_identifier("sim_async_echo")
@needs_sync(False)
@needs_vpn(False)
def sim_channel_async_echo(
    profile:snr_utils.SDRProfile,
    modulated_frame_waveform,
    reference_payload_waveform,
    snr_db:float
    ) -> np.ndarray:
    """
    Simulate a multipath channel with AWGN that needs no synchronization.

    Only the payload waveform is used, so the result is already aligned.

    Args:
        profile: Not used by this channel; kept so every channel function shares
            the same signature.
        modulated_frame_waveform: Not used by this channel (same reason).
        reference_payload_waveform: Payload-only samples sent through the channel.
        snr_db: Target SNR in dB.

    Returns:
        The payload waveform after the echo filter, with noise added.
    """

    # Two-tap multipath channel: the direct path keeps 80% of the power and an
    # echo delayed by one sample carries the remaining 20%.
    h = np.zeros(2, dtype=np.complex64)
    h[0] = np.sqrt(0.8)
    h[1] = np.sqrt(0.2)

    # "full" convolution is one sample longer than the input; drop the tail so
    # the output keeps the payload length.
    convolved = np.convolve(reference_payload_waveform, h, mode="full")
    echoed_waveform = convolved[:len(reference_payload_waveform)]

    # Add white Gaussian noise to the echoed payload
    noisy_frame, _, _ = snr_utils.generate_awgn(f"{snr_db}db", echoed_waveform)

    return noisy_frame
    


### 6. Define cómo son los canales por SDR
- Tiene en cuenta sleeps y otras guardas como la limpieza de los buffers, además de llamar a la librería de Analog Devices

In [None]:
# Real SDR channel functions

@channel_fn_identifier("real_single_sdr")
@needs_sync(True)
@needs_vpn(True)
def real_channel_single_sdr(
    profile: snr_utils.SDRProfile,
    modulated_frame_waveform,
    reference_payload_waveform,
    snr_db: float
) -> np.ndarray:
    """
    Real SDR channel using a single SDR for both TX and RX.

    Args:
        profile: Supplies the TX SDR parameters and the SNR-to-attenuation lookup.
        modulated_frame_waveform: Samples of the complete modulated frame.
        reference_payload_waveform: Not used by this channel; kept so every
            channel function shares the same signature.
        snr_db: Target SNR in dB, translated into a TX attenuation.

    Returns:
        One received buffer, divided by the same factor used to scale the TX samples.

    Raises:
        ValueError: If the profile cannot provide an attenuation for ``snr_db``.
    """
    # Factor applied to TX samples and removed from RX samples
    # (presumably the full scale of a signed 16-bit converter; confirm).
    sdr_norm = (2**15 - 1)

    # NOTE: this is the params object owned by `profile`; the assignments below
    # modify it in place.
    tx_sdr_params: sdr_utils.SDRParams = profile.tx_sdr_params

    needed_tx_atten = profile.get_atten_for_snr(snr_db)
    if needed_tx_atten is None:
        raise ValueError("No se pudo determinar la atenuación necesaria para el SNR dado.")

    tx_sdr_params.tx_attenuation = needed_tx_atten
    tx_sdr_params.tx_cyclic_buffer = True

    txrx_sdr = sdr_utils.init_sdr(tx_sdr_params)
    try:
        txrx_sdr.tx(modulated_frame_waveform * sdr_norm)
        time.sleep(0.5)
        # Throw away the first reads so the buffer that is kept is a fresh one
        for _ in range(2):
            txrx_sdr.rx()
        received_signal = txrx_sdr.rx() / sdr_norm
    finally:
        # Always hand the device to soft_delete_sdr, even when tx/rx raised
        sdr_utils.soft_delete_sdr(txrx_sdr)

    return received_signal

@channel_fn_identifier("real_dual_sdr")
@needs_sync(True)
@needs_vpn(True)
def real_channel_dual_sdr(
    profile: snr_utils.SDRProfile,
    modulated_frame_waveform,
    reference_payload_waveform,
    snr_db: float
) -> np.ndarray:
    """
    Real SDR channel using two separate SDRs for TX and RX.

    Args:
        profile: Supplies the TX/RX SDR parameters, the PHY parameters and the
            SNR-to-attenuation lookup.
        modulated_frame_waveform: Samples of the complete modulated frame.
        reference_payload_waveform: Not used by this channel; kept so every
            channel function shares the same signature.
        snr_db: Target SNR in dB, translated into a TX attenuation.

    Returns:
        One received buffer, divided by the same factor used to scale the TX samples.

    Raises:
        ValueError: If the profile cannot provide an attenuation for ``snr_db``.
    """
    # Factor applied to TX samples and removed from RX samples
    # (presumably the full scale of a signed 16-bit converter; confirm).
    sdr_norm = (2**15 - 1)

    # NOTE: this is the params object owned by `profile`; the assignments below
    # modify it in place.
    tx_sdr_params: sdr_utils.SDRParams = profile.tx_sdr_params
    rx_sdr_params: sdr_utils.SDRParams = profile.rx_sdr_params

    needed_tx_atten = profile.get_atten_for_snr(snr_db)
    if needed_tx_atten is None:
        raise ValueError("No se pudo determinar la atenuación necesaria para el SNR dado.")

    tx_sdr_params.tx_attenuation = needed_tx_atten
    tx_sdr_params.tx_cyclic_buffer = True

    tx_sdr = sdr_utils.init_sdr(tx_sdr_params)
    rx_sdr = None
    try:
        rx_sdr = sdr_utils.init_sdr(rx_sdr_params)

        tx_sdr.tx(modulated_frame_waveform * sdr_norm)
        # Wait sample_duration * rx_buffer_size before reading
        # (presumably the time one RX buffer takes to fill; confirm units).
        time.sleep(profile.phy_params.sample_duration * profile.rx_sdr_params.rx_buffer_size)

        # Throw away the first reads so the buffer that is kept is a fresh one
        for _ in range(2):
            rx_sdr.rx()
        received_signal = rx_sdr.rx() / sdr_norm
    finally:
        # Always hand both devices to soft_delete_sdr (TX first, as before),
        # even when initialisation of the RX device or tx/rx raised.
        sdr_utils.soft_delete_sdr(tx_sdr)
        if rx_sdr is not None:
            sdr_utils.soft_delete_sdr(rx_sdr)

    return received_signal





### 7. Define en qué consiste una transcepción de una trama LoRa
En este caso:
1. Genera payload.
2. Intenta transmitir a través del canal.
3. Determina errores de trama, de símbolo y de bit a partir de lo recibido.

In [None]:
from src.sync import SynchronizationError
class FrameError(Exception):
    """Frame-level failure raised outside the synchronizer; it is counted towards the FER."""

def transceive_lora_frame(
    profile: snr_utils.SDRProfile,
    payload_syms_count:int,
    snr_db:float,
    channel_fn: Callable[[snr_utils.SDRProfile, np.ndarray, np.ndarray, float], np.ndarray],
    modulator: LoRaModulator,
    synchronizer: DechirpBasedSynchronizer,
    demodulator: LoRaDemodulator
) -> dict:
    """
    Transceive one LoRa frame with a random payload and count its errors.

    Args:
        profile: SDR profile handed to ``channel_fn``.
        payload_syms_count: Number of random payload symbols to send.
        snr_db: Target SNR in dB, handed to ``channel_fn``.
        channel_fn: Channel function (simulated or real). It must carry a
            ``needs_sync`` attribute telling whether its output has to go
            through the synchronizer.
        modulator: Builds both the full frame and the payload-only waveform.
        synchronizer: Extracts the aligned payload when the channel needs sync.
        demodulator: Turns the aligned payload back into symbols.

    Returns:
        A dict with:
            - 'fer_err': 0/1 (frame failed?)
            - 'ser_err': int (symbol errors in this frame, ignore if fer_err is 1)
            - 'ber_err': int (bit errors in this frame, ignore if fer_err is 1)

    Raises:
        Exception: Anything raised by the channel function, or any unexpected
            error while synchronizing/demodulating, is printed and re-raised.
            ``SynchronizationError`` and ``FrameError`` are not raised; they are
            reported as a frame error instead.
    """
    samples_per_symbol = modulator.phy_params.samples_per_symbol

    # Prepare what is going to be transmitted
    payload = np.random.randint(0, modulator.phy_params.chips_per_symbol, size=payload_syms_count)
    modulated_frame = modulator.modulate(payload, include_frame=True)
    reference_payload = modulator.modulate(payload, include_frame=False)

    try:
        # Send the frame through the (simulated or real) channel
        received_iq_buffer = channel_fn(profile, modulated_frame, reference_payload, snr_db)
        needs_sync = channel_fn.needs_sync
    except Exception as e:
        print(f"[ERROR] Channel function failed. This shouldn't happen: {e}")
        raise

    # Bound up front so the diagnostic handler below can never fail with an
    # unbound local and hide the original exception.
    aligned_payload = None
    demodulated_symbols = None

    try:
        if needs_sync:
            # Returns the aligned payload or raises SynchronizationError
            aligned_payload = synchronizer.run(received_iq_buffer)
        else:
            aligned_payload = received_iq_buffer

        expected_samples = payload_syms_count * samples_per_symbol
        if len(aligned_payload) != expected_samples:
            raise FrameError("Aligned payload is different to the expected value. Will contribute to FER.")

        demodulated_symbols = demodulator.demodulate(aligned_payload)
        demodulator.backend.clear_memory()  # Free demodulator backend memory (relevant when cupy is used)
        # Objects exposing .get() (e.g. cupy arrays) are converted via that call
        if hasattr(demodulated_symbols, 'get'):
            demodulated_symbols = demodulated_symbols.get()

        # Count errors: differing symbols, and differing bits via XOR popcount
        ser_err = np.sum(demodulated_symbols != np.asarray(payload))
        ber_err = sum(bin(m ^ dm).count('1') for m, dm in zip(payload, demodulated_symbols))

        return {
            'fer_err': 0,
            'ser_err': ser_err,
            'ber_err': ber_err
        }

    except (SynchronizationError, FrameError):
        # The frame could not be recovered: it only counts towards the FER
        return {
            'fer_err': 1,
            'ser_err': 0,
            'ber_err': 0
        }
    except Exception as e:
        print(f"WEIRD ERROR: {e}")
        print(f"payload syms count: {payload_syms_count}")
        print(f"payload length: {len(payload)}")
        if aligned_payload is not None:
            # Samples divided by samples-per-symbol gives the length in symbols
            print(f"aligned payload length in symbols: {len(aligned_payload) / samples_per_symbol}")
        if demodulated_symbols is not None:
            print(f"demodulated symbols length: {len(demodulated_symbols)}")

        raise

### 8. Define cómo se trata el resultado de un conjunto de transcepciones LoRa a un SNR específico
Realiza transcepciones LoRa en bucle hasta que se cumpla algún límite o criterio de confianza estadística para un SNR dado, y devuelve el resultado de evaluar ese punto de la métrica de rendimiento.

In [None]:
def run_snr_point_metric(
    profile: snr_utils.SDRProfile,
    snr_db: float,
    payload_syms_count: int,
    channel_fn: Callable[[snr_utils.SDRProfile, np.ndarray, np.ndarray, float], np.ndarray],
    vpn_guard: VPNKeepAlive,
    modulator: LoRaModulator,
    synchronizer: DechirpBasedSynchronizer,
    demodulator: LoRaDemodulator,
    rel_tol: float,  # Relative tolerance for confidence intervals
    max_transceive_attempts: int,
    tmp_log_path: Path
):
    """
    Transceive frames at one SNR point until the metrics are trusted or a cap is hit.

    Every frame result is appended to the temporary log. The loop stops when
    FER, SER and BER are all trusted and more than
    ``max(100, max_transceive_attempts // 5)`` frames were sent, or as soon as
    ``max_transceive_attempts`` frames were sent.

    Args:
        profile: SDR profile handed to the channel function.
        snr_db: SNR point under test, in dB.
        payload_syms_count: Payload symbols per frame.
        channel_fn: Channel function carrying ``needs_sync`` and ``needs_vpn``.
        vpn_guard: Asked before every frame to reconnect if the channel needs a VPN.
        modulator: Modulator used for every frame.
        synchronizer: Synchronizer used for every frame.
        demodulator: Demodulator used for every frame.
        rel_tol: Relative tolerance passed to ``is_trusted`` for SER and BER;
            FER is checked against twice this value.
        max_transceive_attempts: Maximum number of frames for this SNR point.
        tmp_log_path: File that receives one log entry per frame.

    Returns:
        The ``perf_utils.SNRPointPerformanceMetrics`` built from the final
        BER/SER/FER points. FER is ``None`` for channels without sync; SER and
        BER are ``None`` when no frame was recovered.
    """
    print(f"\n[INFO] --- SNR target {snr_db:+.1f} dB ---")

    MIN_TRANSCEIVE_ATTEMPTS = max(100, max_transceive_attempts // 5)
    print(f"[INFO] Minimum transceive attempts: {MIN_TRANSCEIVE_ATTEMPTS}")

    # Values that do not change between frames
    point_from_count = perf_utils.SinglePerformancePoint.from_event_count
    bits_per_symbol = modulator.phy_params.spreading_factor
    tracks_fer = channel_fn.needs_sync  # FER is only reported for synchronized channels

    frames     = 0
    fer_errs   = 0
    ser_errs   = 0
    ber_errs   = 0
    syms_total = 0

    while True:
        vpn_guard.maybe_reconnect(channel_fn.needs_vpn)

        # Transceive one LoRa frame (or just a payload for channels without sync)
        result = transceive_lora_frame(
            profile=profile,
            payload_syms_count=payload_syms_count,
            snr_db=snr_db,
            channel_fn=channel_fn,
            modulator=modulator,
            synchronizer=synchronizer,
            demodulator=demodulator
        )

        frames     += 1
        fer_errs   += result['fer_err']
        ser_errs   += result['ser_err']
        ber_errs   += result['ber_err']
        # Only recovered frames contribute symbols to the SER/BER denominators
        if result['fer_err'] == 0:
            syms_total += payload_syms_count

        perf_utils.log_frame_result(tmp_log_path, snr_db, result, frames, syms_total)

        # Build the current estimate for every metric
        fer_point = point_from_count(fer_errs, frames) if tracks_fer else None
        ser_point = point_from_count(ser_errs, syms_total) if syms_total else None
        ber_point = point_from_count(ber_errs, syms_total * bits_per_symbol) if syms_total else None

        # A metric without a point never blocks the stop criterion
        enough_frames = frames > MIN_TRANSCEIVE_ATTEMPTS
        fer_ok = (fer_point.is_trusted(2*rel_tol) if fer_point else True) and enough_frames
        ser_ok = (ser_point.is_trusted(rel_tol) if ser_point else True) and enough_frames
        ber_ok = (ber_point.is_trusted(rel_tol) if ber_point else True) and enough_frames

        # Stop criteria
        if fer_ok and ser_ok and ber_ok:
            print("[INFO] Trust criterion met.")
        elif frames >= max_transceive_attempts:
            print("[WARN] Trust criterion not met, but too many transceive attempts made. Moving on.")
        else:
            continue

        print(f"[INFO] SER {ser_point}, BER {ber_point}, FER {fer_point}")
        break

    # Package the results for this SNR point
    return perf_utils.SNRPointPerformanceMetrics(
        snr_db=snr_db,
        ber=ber_point,
        ser=ser_point,
        fer=fer_point
    )



### 9. Realiza el anterior procedimiento en bucle para varios SNR, para armar un perfil de rendimiento a partir de los resultados

In [None]:
# --- Notebook Configuration ------------------------------------------
def run_sdr_quantitative_test(
    profile: snr_utils.SDRProfile,
    snr_points: np.ndarray,
    channel_fn: Callable[[snr_utils.SDRProfile, np.ndarray, np.ndarray, float], np.ndarray],
    max_transceive_attempts: int = DEFAULT_MAX_TRANSCEIVE_ATTEMPTS,
    filename: str= None
):
    """
    Run the quantitative LoRa test over a channel for a list of SNR points.

    For every SNR point a fresh modulator, synchronizer and demodulator are
    built and ``run_snr_point_metric`` is executed. The collected points are
    saved and plotted at the end.

    Args:
        profile: SDR profile describing PHY, frame and SDR parameters.
        snr_points: SNR values (dB) to evaluate, in order.
        channel_fn: Channel function carrying ``needs_vpn`` (and the attributes
            required by ``run_snr_point_metric``).
        max_transceive_attempts: Maximum number of frames per SNR point.
        filename: Name used to save the metrics; ``None`` lets
            ``perf_metrics.save`` decide.
    """

    # How many payload symbols fit in one frame for the buffer in use
    payload_symbol_count = sdr_utils.optimize_payload_symbols(
        buffer_len=profile.rx_sdr_params.rx_buffer_size if not profile.is_simulation() else SIMULATIONS_BUFFER_SIZE,
        preamble_symbols=profile.frame_params.preamble_symbol_count,
        samples_per_symbol=profile.phy_params.samples_per_symbol,
        pad_samples=0
    )

    log_dir = Path("tmp_quant_logs")
    tmp_log_path = perf_utils.create_new_log_file(log_dir)

    vpn_guard = VPNKeepAlive()

    if channel_fn.needs_vpn:
        print(f"[INFO] Channel function {channel_fn.__name__} requires VPN. Connecting...")
        vpn_guard.reconnect()

    results = []
    try:
        for snr_db in snr_points:

            point = run_snr_point_metric(
                profile=profile,
                snr_db=snr_db,
                payload_syms_count=payload_symbol_count,
                channel_fn=channel_fn,
                vpn_guard=vpn_guard,
                modulator=LoRaModulator(profile.phy_params, profile.frame_params, backend=MOD_BACKEND),
                synchronizer=DechirpBasedSynchronizer(profile.phy_params, profile.frame_params, backend=DEMOD_BACKEND, fold_mode=profile.fold_mode, max_sync_candidates=50),
                demodulator=LoRaDemodulator(profile.phy_params, backend=DEMOD_BACKEND, fold_mode=profile.fold_mode),
                rel_tol=0.05,
                max_transceive_attempts=max_transceive_attempts,
                tmp_log_path=tmp_log_path
            )

            results.append(point)
    finally:
        # Disconnect the VPN even when an SNR point aborts with an exception
        if channel_fn.needs_vpn:
            vpn_guard.disconnect()

    perf_metrics = perf_utils.SNRPerformanceMetrics(
        profile=profile,
        snr_points=results,
        channel_function=channel_fn.__name__
        )

    perf_metrics.save(filename=filename)
    print(f"[INFO] Saved performance metrics to {filename if filename else perf_metrics.auto_name()}")

    if AUTO_DELETE_TMP_LOG:
        tmp_log_path.unlink(missing_ok=True)
        print(f"[INFO] Deleted temporary log file: {tmp_log_path}")

    perf_utils.plot_snr_performance(perf_metrics)


In [None]:
# Choose the profile
profile_name = "sf8_500k_2spc_cpa.json"

# Choose the channel
channel_fn = real_channel_dual_sdr

# Load the profile from the folder named after the selected channel, so that
# switching `channel_fn` above also switches the profile folder.
profile = snr_utils.SDRProfile.load(f"{channel_fn.channel_id}/{profile_name}")

# SNR points to evaluate: -10 dB and -8 dB.
# The step must be positive here: with start < stop a negative step yields an
# empty array and no point would be tested at all.
snr_points = np.arange(-10, -6, 2)

# Run the quantitative tests
run_sdr_quantitative_test(
    profile=profile,
    snr_points=snr_points,
    channel_fn=channel_fn,
    max_transceive_attempts=2000,
    filename=f"{channel_fn.channel_id}/perf_{profile_name}" if hasattr(channel_fn, 'channel_id') else None
)