In [None]:
# =============================================================================
# Proyecto Integrador Comunicaciones Digitales
# "Dise√±o e implementaci√≥n de un transmisor y receptor LoRa PHY - Segunda Parte"
#
# Autores:
# - Saqib Daniel Mohammad Cabrejos
# - Enzo Leonel Laura Surco
# =============================================================================

# =============================================================================
# 1. IMPORTS
# =============================================================================
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import resample

# =============================================================================
# 2. FUNCIONES DE VISUALIZACI√ìN
# =============================================================================
def plot_parte_real(signal, title="la se√±al"):
    plt.figure(figsize=(12, 2))
    plt.plot(np.arange(len(np.real(signal))), np.real(signal), color='blue')
    plt.title("Parte real de " + title)
    plt.xlabel("Muestras")
    plt.ylabel("Amplitud")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def plot_parte_imag(signal, title="la se√±al"):
    plt.figure(figsize=(12, 2))
    plt.plot(np.arange(len(np.imag(signal))), np.imag(signal), color='orange')
    plt.title("Parte imaginaria de " + title)
    plt.xlabel("Muestras")
    plt.ylabel("Amplitud")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def plot_frecuencia_instantanea(signal, title="la se√±al"):
    fase = np.unwrap(np.angle(signal))
    freq_inst = np.diff(fase) / (2 * np.pi)
    plt.figure(figsize=(12, 2))
    plt.plot(np.arange(len(freq_inst)), freq_inst)
    plt.title("Frecuencia instant√°nea de " + title)
    plt.xlabel("Muestras")
    plt.ylabel("Frecuencia (Hz)")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# =============================================================================
# 3. TRANSMISOR
# =============================================================================
def codificador(bits, SF):
    """Convierte bits a s√≠mbolos LoRa."""
    if len(bits) % SF != 0:
        raise ValueError("La longitud de los bits debe ser un m√∫ltiplo del SF")

    num_simbolos = len(bits) // SF
    simbolos = np.zeros(num_simbolos, dtype=int)

    for i in range(num_simbolos):
        bloque = bits[i * SF:(i + 1) * SF]
        simbolos[i] = np.sum(bloque * (2 ** np.arange(SF)[::-1]))

    return simbolos

def waveform_former(simbolos, SF, Bw, T):
    """Genera los waveforms para los s√≠mbolos LoRa."""
    N = 2 ** SF
    oversampling_ratio = int(round(1 / (T * Bw)))
    M = N * oversampling_ratio

    k = np.arange(M)
    argumento_base = (k / oversampling_ratio) * (k / oversampling_ratio) / (2 * N) - 0.5 * (k / oversampling_ratio)
    chirp_base = np.exp(1j * 2 * np.pi * argumento_base)

    waveform = np.zeros((len(simbolos), M), dtype=complex)

    for i, s in enumerate(simbolos):
        shift = int(round(s * oversampling_ratio))
        waveform[i] = np.roll(chirp_base, -shift)

    return waveform / np.sqrt(np.mean(np.abs(waveform)**2))

def up_chirp(SF, Bw, T):
    """Genera un up-chirp base (s√≠mbolo 0)."""
    return waveform_former([0], SF, Bw, T).flatten()

def down_chirp(SF, Bw, T):
    """Genera un down-chirp base (conjugado del up-chirp)."""
    return np.conj(up_chirp(SF, Bw, T)).flatten()

def preambulo_trama(SF, Bw, T):
    """Genera el pre√°mbulo (8 up-chirps)."""
    return np.tile(up_chirp(SF, Bw, T), 8)

def sfd_trama(SF, Bw, T):
    """Genera el SFD (2.25 down-chirps)."""
    chirp = down_chirp(SF, Bw, T)
    return np.concatenate((chirp, chirp, chirp[:int(0.25 * len(chirp))]))

def generar_trama(SF, simbolos, Bw, T):
    """Genera una trama LoRa completa con ruido inicial."""
    ruido_len = int(np.random.random() * 3 * 2**SF)

    # ‚úÖ CORREGIDO: Componentes de ruido independientes
    ruido_real = np.random.randn(ruido_len) * np.sqrt(0.001)
    ruido_imag = np.random.randn(ruido_len) * np.sqrt(0.001)
    ruido_complejo = ruido_real + 1j * ruido_imag

    preambulo = preambulo_trama(SF, Bw, T)
    sfd = sfd_trama(SF, Bw, T)
    waveforms_data = waveform_former(simbolos, SF, Bw, T)

    trama_tx = np.concatenate([ruido_complejo, preambulo, sfd, waveforms_data.flatten()])
    return trama_tx

# =============================================================================
# 4. RECEPTOR
# =============================================================================
def resample_signal_rx(signal, osf_target=2):
    """Resamplea la se√±al al factor de sobremuestreo objetivo."""
    return resample(signal, len(signal) * osf_target)

def dechirp(signal, x, SF, Bw, T, osf=2, direction='down', zero_padding_factor=4):
    """
    Aplica dechirping a un s√≠mbolo con CPA.

    Par√°metros:
    - signal: Se√±al remuestreada con OSF
    - x: √çndice de inicio del s√≠mbolo
    - SF: Spreading Factor
    - Bw: Ancho de banda
    - T: Per√≠odo de muestreo (para compatibilidad)
    - osf: Oversampling factor
    - direction: 'down' para demodular up-chirps, 'up' para down-chirps
    - zero_padding_factor: Factor de zero-padding para la FFT
    """
    M = 2**SF
    sample_num = osf * M
    bin_num = M * zero_padding_factor
    fft_len = sample_num * zero_padding_factor

    if x + sample_num > len(signal):
        return (0, 0, np.zeros(bin_num))

    # Extraer el s√≠mbolo
    simbolo_rx = signal[x:x + sample_num]

    # Generar chirp de referencia
    if direction == 'down':
        chirp_ref_base = down_chirp(SF, Bw, T)
    else:
        chirp_ref_base = up_chirp(SF, Bw, T)

    # Resamplear el chirp de referencia si es necesario
    if osf > 1:
        chirp_ref = resample(chirp_ref_base, len(chirp_ref_base) * osf)
    else:
        chirp_ref = chirp_ref_base

    # Aplicar dechirping
    dechirped = simbolo_rx * chirp_ref

    # FFT con zero-padding
    ft = np.fft.fft(dechirped, n=fft_len)

    # CPA: Sumar ambos segmentos de la FFT
    ft_ = np.abs(ft[:bin_num]) + np.abs(ft[fft_len - bin_num:])

    max_idx = np.argmax(ft_)
    max_val = ft_[max_idx]

    return max_val, max_idx, ft_

def detect(signal, SF, Bw, T, osf=2, zero_padding_factor=4, preamble_len=8, start_idx=0):
    """Detecta el pre√°mbulo usando ventana deslizante."""
    M = 2**SF
    sample_num = osf * M
    bin_num = M * zero_padding_factor

    ii = start_idx
    pk_bin_list = []

    while ii < len(signal) - sample_num * preamble_len:
        if len(pk_bin_list) == (preamble_len - 1):
            x = ii - round(pk_bin_list[-1] / (zero_padding_factor / 2))
            return x

        pk0 = dechirp(signal, ii, SF, Bw, T, osf=osf, direction='down',
                      zero_padding_factor=zero_padding_factor)

        if len(pk_bin_list) > 0:
            bin_diff = (pk_bin_list[-1] - pk0[1]) % bin_num
            if bin_diff > bin_num / 2:
                bin_diff = bin_num - bin_diff
            if bin_diff <= zero_padding_factor:
                pk_bin_list.append(pk0[1])
            else:
                pk_bin_list = [pk0[1]]
        else:
            pk_bin_list = [pk0[1]]

        ii += sample_num

    return -1

def sync(signal, x_sfd, SF, Bw, T, osf=2, zero_padding_factor=4):
    """Sincronizaci√≥n fina usando el SFD."""
    M = 2**SF
    sample_num = osf * M
    bin_num = M * zero_padding_factor

    found = False
    x = x_sfd

    # Buscar transici√≥n up->down (inicio del SFD)
    while x < len(signal) - sample_num:
        up_peak = dechirp(signal, x, SF, Bw, T, osf=osf, direction='down',
                          zero_padding_factor=zero_padding_factor)
        down_peak = dechirp(signal, x, SF, Bw, T, osf=osf, direction='up',
                            zero_padding_factor=zero_padding_factor)

        if abs(down_peak[0]) > abs(up_peak[0]):
            found = True
            break
        x += sample_num

    if not found:
        return -1, 0, 0

    # Alineaci√≥n fina de ventana
    pkd = dechirp(signal, x, SF, Bw, T, osf=osf, direction='up',
                  zero_padding_factor=zero_padding_factor)
    bin_idx = pkd[1]

    if bin_idx >= bin_num / osf:
        offset_bins = bin_idx - bin_num
    else:
        offset_bins = bin_idx

    to = round(offset_bins / (zero_padding_factor * osf))
    x = x + to

    # Estimaci√≥n de CFO
    pku = dechirp(signal, x - 2 * sample_num, SF, Bw, T, osf=osf, direction='down',
                  zero_padding_factor=zero_padding_factor)
    preamble_bin = pku[1]

    if preamble_bin > bin_num / osf:
        cfo_val = preamble_bin - bin_num
    else:
        cfo_val = preamble_bin

    cfo_hz = (cfo_val * Bw) / bin_num

    # Determinar posici√≥n del payload
    pku_check = dechirp(signal, x - sample_num, SF, Bw, T, osf=osf, direction='down',
                        zero_padding_factor=zero_padding_factor)
    pkd_check = dechirp(signal, x - sample_num, SF, Bw, T, osf=osf, direction='up',
                        zero_padding_factor=zero_padding_factor)

    if abs(pkd_check[0]) > abs(pku_check[0]):
        x_payload = x + round(1.25 * sample_num)
    else:
        x_payload = x + round(2.25 * sample_num)

    return x_payload, cfo_hz, preamble_bin

def dynamic_compensation(symbols, cfo_hz, SF, Bw, rf_freq=915e6):
    """Compensa el SFO bas√°ndose en el CFO."""
    M = 2**SF
    data = np.array(symbols)
    sfo_drift_per_symbol = M * cfo_hz / rf_freq
    indices = np.arange(1, len(data) + 1) + 1
    sfo_drift = indices * sfo_drift_per_symbol
    symbols_compensated = np.mod(data - sfo_drift, M)
    return symbols_compensated

def demodulate(signal, SF, Bw, T, payload_len, osf=2, zero_padding_factor=4,
               preamble_len=8, rf_freq=915e6):
    """Demodula una se√±al LoRa completa."""
    M = 2**SF
    signal_resampled = resample_signal_rx(signal, osf_target=osf)
    sample_num = osf * M
    bin_num = M * zero_padding_factor

    x = 0
    simbolos = []
    cfo_hz = 0

    while x < len(signal_resampled):
        # Detectar pre√°mbulo
        x = detect(signal_resampled, SF, Bw, T, osf=osf,
                   zero_padding_factor=zero_padding_factor,
                   preamble_len=preamble_len, start_idx=x)

        if x < 0:
            break

        # Sincronizaci√≥n fina
        x_payload, cfo_hz, preamble_bin = sync(signal_resampled, x, SF, Bw, T,
                                                osf=osf,
                                                zero_padding_factor=zero_padding_factor)

        if x_payload < 0:
            return [], 0

        if x_payload + payload_len * sample_num > len(signal_resampled):
            print("No hay suficiente se√±al para el payload")
            return simbolos, cfo_hz

        # Demodular payload
        symbols = []
        for ii in range(payload_len):
            pk = dechirp(signal_resampled, x_payload + ii * sample_num,
                         SF, Bw, T, osf=osf, direction='down',
                         zero_padding_factor=zero_padding_factor)
            symbol_raw = (pk[1] + bin_num - preamble_bin) / zero_padding_factor
            symbols.append(np.mod(symbol_raw, M))

        # Compensaci√≥n SFO
        symbols_compensated = dynamic_compensation(symbols, cfo_hz, SF, Bw, rf_freq)
        symbols_final = np.mod(np.round(symbols_compensated), M).astype(int)
        simbolos.append(symbols_final)

        x = x_payload + payload_len * sample_num

    return simbolos, cfo_hz

def decodificador(simbolos_codificados, SF):
    """Convierte s√≠mbolos a bits."""
    num_bits = len(simbolos_codificados) * SF
    bits_decodificados = np.zeros(num_bits, dtype=int)

    for i, simbolo in enumerate(simbolos_codificados):
        for h in range(SF-1, -1, -1):
            bits_decodificados[i * SF + (SF - 1 - h)] = (int(simbolo) >> h) & 1

    return bits_decodificados

print("‚úÖ Sistema LoRa cargado correctamente (TX + RX)")

‚úÖ Sistema LoRa cargado correctamente (TX + RX)


In [None]:
# =============================================================================
# PRUEBA DEL SISTEMA COMPLETO
# =============================================================================

# Par√°metros
SF_test = 7
Bw_test = 125e3
T_test = 1 / Bw_test

# Generar datos
num_bits_test = SF_test * 5
bits_tx = np.random.randint(0, 2, num_bits_test)
simbolos_tx = codificador(bits_tx, SF_test)

print("="*60)
print("PRUEBA DEL SISTEMA LoRa")
print("="*60)
print(f"\nüì° TRANSMISOR:")
print(f"   SF = {SF_test}, BW = {Bw_test/1e3} kHz")
print(f"   Bits TX: {bits_tx}")
print(f"   S√≠mbolos TX: {simbolos_tx}")

# Generar trama
trama_tx = generar_trama(SF_test, simbolos_tx, Bw_test, T_test)
print(f"   Longitud trama: {len(trama_tx)} muestras")

# Demodular
print(f"\nüìª RECEPTOR:")
simbolos_rx_list, cfo = demodulate(trama_tx, SF_test, Bw_test, T_test,
                                    payload_len=len(simbolos_tx))

if len(simbolos_rx_list) > 0:
    simbolos_rx = simbolos_rx_list[0]
    bits_rx = decodificador(simbolos_rx, SF_test)

    print(f"   S√≠mbolos RX: {simbolos_rx}")
    print(f"   Bits RX: {bits_rx}")
    print(f"   CFO: {cfo:.2f} Hz")

    errores = np.sum(bits_tx != bits_rx)
    print(f"\nüìä RESULTADO:")
    print(f"   Errores: {errores}/{len(bits_tx)}")
    print(f"   {'‚úÖ Transmisi√≥n exitosa!' if errores == 0 else '‚ùå Errores detectados'}")
else:
    print("   ‚ùå No se detect√≥ el paquete")
print("="*60)

PRUEBA DEL SISTEMA LoRa

üì° TRANSMISOR:
   SF = 7, BW = 125.0 kHz
   Bits TX: [0 0 0 1 0 0 1 1 1 1 0 1 0 0 0 0 1 0 1 1 0 0 0 0 1 0 0 1 1 1 1 0 1 1 0]
   S√≠mbolos TX: [  9 116  22   9 118]
   Longitud trama: 2267 muestras

üìª RECEPTOR:
   S√≠mbolos RX: [  9 116  22   9 118]
   Bits RX: [0 0 0 1 0 0 1 1 1 1 0 1 0 0 0 0 1 0 1 1 0 0 0 0 1 0 0 1 1 1 1 0 1 1 0]
   CFO: 0.00 Hz

üìä RESULTADO:
   Errores: 0/35
   ‚úÖ Transmisi√≥n exitosa!
