<a href="https://colab.research.google.com/github/giankev/Ancient-to-Modern-Italian-Automatic-Translation/blob/main/dataset_comunication.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install sionna tensorflow

Collecting sionna
  Downloading sionna-1.2.1-py3-none-any.whl.metadata (6.3 kB)
Collecting sionna-rt==1.2.1 (from sionna)
  Downloading sionna_rt-1.2.1-py3-none-any.whl.metadata (4.8 kB)
Collecting numpy<2.0,>=1.26 (from sionna)
  Downloading numpy-1.26.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
Collecting mitsuba==3.7.1 (from sionna-rt==1.2.1->sionna)
  Downloading mitsuba-3.7.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (6.9 kB)
Collecting drjit==1.2.0 (from sionna-rt==1.2.1->sionna)
  Downloading drjit-1.2.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.1 kB)
Collecting ipywidgets>=8.1.5 (from sionna-rt==1.2.1->sionna)
  Downloading ipywidgets-8.1.8-py3-none-any.whl.metadata (2.4 kB)
Collecting pythreejs>=2.4.2 (from sionna-rt==1.2.1->sionna)
  Downloading pythreejs-2.4.2-py3-none-

In [17]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Any, List
import numpy as np
import tensorflow as tf

# -----------------------
# Sionna imports (robust across versions)
# -----------------------
try:
    import sionna
    from sionna.phy.mapping import Constellation, Mapper, Demapper
    from sionna.phy.channel import AWGN
    from sionna.phy.fec.ldpc import LDPC5GEncoder, LDPC5GDecoder
except Exception:  # fallback legacy (Sionna < 1.0)
    import sionna  # type: ignore
    from sionna.mapping import Constellation, Mapper, Demapper  # type: ignore
    from sionna.channel import AWGN  # type: ignore
    from sionna.fec.ldpc import LDPC5GEncoder, LDPC5GDecoder  # type: ignore


@dataclass(frozen=True)
class SimConfig:
    """
    Configuration for the single-carrier, symbol-rate dataset generator:
    16-QAM + LDPC + pilot-aided phase recovery + AWGN.

    Per sample:
      - info_bits u (k)
      - LDPC encode -> coded_bits c (n)
      - 16-QAM mapper -> x_data (L symbols), with n = L*m
      - Known pilot (P symbols) prepended: x_tx = [x_pilot, x_data]
      - Channel: constant phase phi + AWGN
      - Baseline RX (no neural network):
          estimate phi_hat from the (known) pilots -> de-rotation
          demapper (LLRs on the coded bits)        -> LDPC decoder

    Goal: a coherent, receiver-like baseline that does not rely on a
    genie-aided (true) phi.

    The instance is frozen: fields cannot be reassigned after construction.
    """

    # Dataset size (number of generated samples N)
    num_examples: int = 10000

    # Modulation
    seq_length: int = 128            # L (data symbols per sample)
    bits_per_symbol: int = 4         # m=4 -> 16-QAM

    # LDPC
    k: int = 256                     # info bits; n = L*m (e.g. 512)

    # Pilot-aided phase recovery
    pilot_len: int = 16              # P pilot symbols (NOT coded), known at RX
    pilot_seed: int = 999            # deterministic pilot sequence

    # Channel: per-sample Eb/N0 is drawn uniformly between these bounds (dB)
    ebn0_db_min: float = 0.0
    ebn0_db_max: float = 10.0

    # Phase offset (constant per sample), drawn uniformly between these bounds (rad)
    phase_min: float = -np.pi
    phase_max: float = np.pi

    # Baseline demapper/decoder
    demap_method: str = "app"        # "app" or "maxlog"
    dec_num_iter: int = 20           # decoder iterations (passed as num_iter)
    cn_update: str = "minsum"        # check-node update rule, forwarded as LDPC5GDecoder(cn_update=...)

    # Reproducibility: seed of the generator used for bits, phase and Eb/N0
    seed: int = 46

    # DL tensor format
    channels_last: bool = True       # True -> [N, L, 2], False -> [N, 2, L]

    # Dtype (real and complex TensorFlow dtypes used for intermediate tensors)
    tf_rdtype: tf.dtypes.DType = tf.float32
    tf_cdtype: tf.dtypes.DType = tf.complex64


def complex_to_2ch_real(x: tf.Tensor, channels_last: bool) -> tf.Tensor:
    """Split a complex tensor into stacked real (I) and imaginary (Q) channels.

    Args:
        x: Complex tensor, expected as [N, L].
        channels_last: If True the I/Q pair becomes the last axis ([N, L, 2]);
            otherwise it is inserted at axis 1 ([N, 2, L]).

    Returns:
        Real tensor holding the I component first and the Q component second
        along the new axis.
    """
    stack_axis = -1 if channels_last else 1
    return tf.stack([tf.math.real(x), tf.math.imag(x)], axis=stack_axis)


def generate_dataset(cfg: SimConfig) -> Dict[str, Any]:
    """
    Generate the full dataset plus a conventional (no neural network) baseline
    receiver with pilot-aided phase recovery.

    Pipeline: random info bits -> LDPC encode -> QAM map -> prepend a known
    pilot -> constant phase rotation + AWGN -> pilot-based phase estimate ->
    de-rotation -> demapper (LLRs) -> LDPC decode.

    Baseline metrics (global average over the dataset):
      - ber_pre_coded  : BER on the coded bits (hard decision on the LLRs)
      - ber_post_info  : BER on the info bits after the LDPC decoder
      - bler_post_info : BLER on the info bits after the LDPC decoder

    Args:
        cfg: Simulation configuration (sizes, code parameters, Eb/N0 and phase
            ranges, seeds, dtypes, tensor layout).

    Returns:
        A dict of NumPy arrays ("info_bits", "coded_bits", "pilot_bits",
        "iq_clean", "iq_phase_shifted", "iq_noisy", "iq_corrected",
        "phase_offset", "phase_hat0", "ebn0_db", "no", "llr0",
        "hard_coded_bits0", "info_bits_hat0") plus two nested dicts:
        "metrics" (the three float metrics above) and "meta" (the parameters
        used for the run). The IQ arrays contain the DATA symbols only.

    Raises:
        ValueError: If k does not satisfy 0 < k < n, with n = L*m.
    """
    # NOTE: this generator is stateful; the order of the rng.uniform calls
    # below determines the generated dataset for a given seed.
    rng = tf.random.Generator.from_seed(cfg.seed)

    N = cfg.num_examples
    L = cfg.seq_length
    m = cfg.bits_per_symbol
    P = cfg.pilot_len
    n = L * m
    k = cfg.k

    if not (0 < k < n):
        raise ValueError(f"Invalid k={k}. Must satisfy 0 < k < n={n} (n=L*m).")

    # Code rate
    R = float(k) / float(n)

    # Sionna blocks
    constellation = Constellation("qam", num_bits_per_symbol=m)
    mapper = Mapper(constellation=constellation)
    demapper = Demapper(cfg.demap_method, constellation=constellation)
    awgn = AWGN()

    encoder = LDPC5GEncoder(k=k, n=n, num_bits_per_symbol=m)
    decoder = LDPC5GDecoder(
        encoder=encoder,
        num_iter=cfg.dec_num_iter,
        return_infobits=True,
        hard_out=True,
        cn_update=cfg.cn_update,     # check-node update rule taken from the config
    )

    # -----------------------
    # 0) Known pilot (identical for every example), reproducible
    # -----------------------
    # A dedicated generator keeps the pilot independent of cfg.seed.
    pilot_rng = tf.random.Generator.from_seed(cfg.pilot_seed)
    pilot_bits = pilot_rng.uniform([P * m], minval=0, maxval=2, dtype=tf.int32)     # [P*m]
    x_pilot = tf.cast(mapper(tf.reshape(pilot_bits, [1, P*m])), cfg.tf_cdtype)      # [1,P]
    x_pilot = tf.tile(x_pilot, [N, 1])                                              # [N,P]

    # -----------------------
    # 1) TX: info bits -> LDPC -> coded bits -> data symbols
    # -----------------------
    info_bits = rng.uniform([N, k], minval=0, maxval=2, dtype=tf.int32)
    info_bits_f = tf.cast(info_bits, cfg.tf_rdtype)

    coded_bits_f = encoder(info_bits_f)                 # float {0,1} [N,n]
    coded_bits = tf.cast(coded_bits_f, tf.int32)        # int {0,1}   [N,n]

    x_data = tf.cast(mapper(coded_bits), cfg.tf_cdtype) # [N,L]
    x_tx = tf.concat([x_pilot, x_data], axis=1)         # [N,P+L]

    # -----------------------
    # 2) Channel: constant phase + AWGN
    # -----------------------
    # One phase per example, broadcast over all P+L symbols of that example.
    phi = rng.uniform([N, 1], minval=cfg.phase_min, maxval=cfg.phase_max, dtype=cfg.tf_rdtype)  # [N,1]
    rot = tf.cast(tf.complex(tf.math.cos(phi), tf.math.sin(phi)), cfg.tf_cdtype)                # [N,1]
    x_tx_phase = x_tx * rot                                                                     # [N,P+L]
    x_data_phase = x_data * rot                                                                 # [N,L] (output only)

    # Eb/N0 per sample
    ebn0_db = rng.uniform([N], minval=cfg.ebn0_db_min, maxval=cfg.ebn0_db_max, dtype=cfg.tf_rdtype)
    ebn0_lin = tf.pow(tf.constant(10.0, cfg.tf_rdtype), ebn0_db / 10.0)

    # Eb/N0 -> no (includes the pilot overhead (P+L)/L):
    #   no = Es * overhead / (Eb/N0 * m * R)
    # where Es is the measured mean symbol energy of each transmitted frame.
    overhead = tf.constant((P + L) / L, dtype=cfg.tf_rdtype)
    es = tf.reduce_mean(tf.abs(x_tx_phase)**2, axis=1, keepdims=True)     # [N,1]
    no = es / (tf.reshape(ebn0_lin, [-1, 1]) * (m * R)) * overhead        # [N,1]

    y_rx = tf.cast(awgn(x_tx_phase, no), cfg.tf_cdtype)                   # [N,P+L]

    # -----------------------
    # 3) Baseline RX: pilot-based phase estimate + de-rotation + demapper + decoder
    # -----------------------
    y_pilot = y_rx[:, :P]  # [N,P]

    # ML phase estimate (constant phase): phi_hat = angle(sum(y_pilot * conj(x_pilot)))
    z = tf.reduce_sum(y_pilot * tf.math.conj(x_pilot), axis=1, keepdims=True)  # [N,1]
    phi_hat0 = tf.cast(tf.math.angle(z), cfg.tf_rdtype)                        # [N,1]

    # Multiply by exp(-j*phi_hat) to undo the estimated rotation.
    rot_inv = tf.cast(tf.complex(tf.math.cos(-phi_hat0), tf.math.sin(-phi_hat0)), cfg.tf_cdtype)  # [N,1]
    y_corr = y_rx * rot_inv                                                                           # [N,P+L]

    # DATA symbols only for the demapper/decoder
    y_data_raw = y_rx[:, P:]     # [N,L] (not corrected)
    y_data_corr = y_corr[:, P:]  # [N,L] (corrected)

    llr0 = tf.cast(demapper(y_data_corr, no), cfg.tf_rdtype)   # [N,n] LLRs on the coded bits
    # Positive LLR is decided as bit 1 (assumes the demapper's LLR sign
    # convention favours 1 for positive values -- TODO confirm against Sionna docs).
    hard_coded_bits0 = tf.cast(llr0 > 0.0, tf.uint8)

    info_hat_f = decoder(llr0)                                 # hard_out=True -> float {0,1}
    info_bits_hat0 = tf.cast(info_hat_f > 0.5, tf.uint8)

    # -----------------------
    # 4) IQ tensors for deep learning (DATA only)
    # -----------------------
    iq_clean = complex_to_2ch_real(x_data, cfg.channels_last)
    iq_phase = complex_to_2ch_real(x_data_phase, cfg.channels_last)
    iq_noisy = complex_to_2ch_real(y_data_raw, cfg.channels_last)
    iq_corr = complex_to_2ch_real(y_data_corr, cfg.channels_last)

    # -----------------------
    # 5) Baseline metrics (global average)
    # -----------------------
    coded_bits_u8 = tf.cast(coded_bits, tf.uint8)
    info_bits_u8 = tf.cast(info_bits, tf.uint8)

    ber_pre = tf.reduce_mean(tf.cast(hard_coded_bits0 != coded_bits_u8, cfg.tf_rdtype))
    ber_post = tf.reduce_mean(tf.cast(info_bits_hat0 != info_bits_u8, cfg.tf_rdtype))
    # A block is in error when at least one of its info bits is wrong.
    bler_post = tf.reduce_mean(tf.cast(tf.reduce_any(info_bits_hat0 != info_bits_u8, axis=1), cfg.tf_rdtype))

    return {
        "info_bits": info_bits.numpy().astype(np.uint8),
        "coded_bits": coded_bits.numpy().astype(np.uint8),
        "pilot_bits": pilot_bits.numpy().astype(np.uint8),

        "iq_clean": iq_clean.numpy().astype(np.float32),
        "iq_phase_shifted": iq_phase.numpy().astype(np.float32),
        "iq_noisy": iq_noisy.numpy().astype(np.float32),
        "iq_corrected": iq_corr.numpy().astype(np.float32),

        "phase_offset": phi.numpy().astype(np.float32),
        "phase_hat0": phi_hat0.numpy().astype(np.float32),
        "ebn0_db": ebn0_db.numpy().astype(np.float32),
        "no": no.numpy().astype(np.float32),

        "llr0": llr0.numpy().astype(np.float32),
        "hard_coded_bits0": hard_coded_bits0.numpy().astype(np.uint8),
        "info_bits_hat0": info_bits_hat0.numpy().astype(np.uint8),

        "metrics": {
            "ber_pre_coded": float(ber_pre.numpy()),
            "ber_post_info": float(ber_post.numpy()),
            "bler_post_info": float(bler_post.numpy()),
        },
        "meta": {
            "num_examples": cfg.num_examples,
            "seq_length_data": L,
            "pilot_len": P,
            "bits_per_symbol": m,
            "k": k,
            "n": int(n),
            "coderate_R": float(R),
            "demap_method": cfg.demap_method,
            "dec_num_iter": cfg.dec_num_iter,
            "cn_update": cfg.cn_update,
            "seed": cfg.seed,
            "pilot_seed": cfg.pilot_seed,
            "channels_last": cfg.channels_last,
            "sionna_version": getattr(sionna, "__version__", "unknown"),
            "channel": "single-carrier; const phase + AWGN; pilot-aided phase correction",
            "ebn0_note": "Eb/N0 w.r.t. info bits; pilot overhead via factor (P+L)/L",
        },
    }

def wrap_angle_rad(a: float) -> float:
    """Wrap an angle in radians into the half-open interval (-pi, pi].

    Args:
        a: Angle in radians.

    Returns:
        The equivalent angle in (-pi, pi] as a Python float.
    """
    wrapped = float(np.angle(np.exp(1j * a)))
    # exp(-i*pi) carries a tiny negative imaginary part in floating point, so
    # np.angle can return exactly -pi; fold that onto +pi to keep the lower
    # bound open as documented.
    return float(np.pi) if wrapped <= -np.pi else wrapped

# -----------------------
# TEST: baseline per vari Eb/N0 (curve)
# -----------------------
if __name__ == "__main__":
    # Local import: only this sweep needs it. dataclasses.replace copies a
    # frozen dataclass with selected fields overridden, without reaching into
    # the instance __dict__.
    from dataclasses import replace

    base_cfg = SimConfig(
        num_examples=4000,      # examples per Eb/N0 point (increase for a more stable BLER)
        seq_length=128,
        bits_per_symbol=4,
        k=256,                  # n=512 => R=1/2
        pilot_len=16,
        demap_method="app",
        dec_num_iter=20,
        cn_update="minsum",
        seed=46,
        channels_last=True,
    )

    ebn0_grid_db = list(range(0, 11))  # 0..10 dB

    print("Curve baseline vs Eb/N0 (post-decoder):")
    for idx, e in enumerate(ebn0_grid_db):
        # Collapse the Eb/N0 sampling range to the single value e so the whole
        # run is evaluated at one operating point.
        cfg_e = replace(
            base_cfg,
            ebn0_db_min=float(e),
            ebn0_db_max=float(e),
            seed=base_cfg.seed + 1000 + idx,  # different seed per point
        )
        ds_e = generate_dataset(cfg_e)
        ber = ds_e["metrics"]["ber_post_info"]
        bler = ds_e["metrics"]["bler_post_info"]
        print(f"  Eb/N0={e:2d} dB -> BER={ber:.3e}, BLER={bler:.3e}")

Curve baseline vs Eb/N0 (post-decoder):
  Eb/N0= 0 dB -> BER=2.514e-01, BLER=1.000e+00
  Eb/N0= 1 dB -> BER=2.333e-01, BLER=1.000e+00
  Eb/N0= 2 dB -> BER=2.108e-01, BLER=1.000e+00
  Eb/N0= 3 dB -> BER=1.762e-01, BLER=9.800e-01
  Eb/N0= 4 dB -> BER=8.808e-02, BLER=6.472e-01
  Eb/N0= 5 dB -> BER=1.052e-02, BLER=9.625e-02
  Eb/N0= 6 dB -> BER=4.980e-04, BLER=4.500e-03
  Eb/N0= 7 dB -> BER=0.000e+00, BLER=0.000e+00
  Eb/N0= 8 dB -> BER=0.000e+00, BLER=0.000e+00
  Eb/N0= 9 dB -> BER=0.000e+00, BLER=0.000e+00
  Eb/N0=10 dB -> BER=0.000e+00, BLER=0.000e+00


In [24]:
def wrap_angle_rad(a: float) -> float:
    """Wrap an angle in radians into the half-open interval (-pi, pi].

    Args:
        a: Angle in radians.

    Returns:
        The equivalent angle in (-pi, pi] as a Python float.
    """
    wrapped = float(np.angle(np.exp(1j * a)))
    # exp(-i*pi) carries a tiny negative imaginary part in floating point, so
    # np.angle can return exactly -pi; fold that onto +pi to keep the lower
    # bound open as documented.
    return float(np.pi) if wrapped <= -np.pi else wrapped

def to_complex_iq(iq_2ch: np.ndarray) -> np.ndarray:
    """Convert a two-channel real I/Q array into a 1-D complex array.

    Args:
        iq_2ch: 2-D array laid out as [L, 2] (channels last) or [2, L]
            (channels first).

    Returns:
        Complex array of length L equal to I + 1j*Q.

    Raises:
        ValueError: If the input is not 2-D, or if neither the last nor the
            first axis has size 2.
    """
    samples = np.asarray(iq_2ch)
    if samples.ndim != 2:
        raise ValueError(f"Expected 2D IQ array, got {samples.shape}")

    # The channels-last test comes first, so a [2, 2] input is read as [L, 2].
    if samples.shape[-1] == 2:
        pairs = samples
    elif samples.shape[0] == 2:
        # Bring channels-first data into [L, 2] so one expression serves both.
        pairs = samples.T
    else:
        raise ValueError(f"Unrecognized IQ shape: {samples.shape}")

    return pairs[:, 0] + 1j * pairs[:, 1]


# --- Generate a small batch of blocks (uses the SimConfig and pilot-aided
# --- generate_dataset defined earlier) and print per-block diagnostics.
cfg3 = SimConfig(
    num_examples=10,
    seq_length=128,
    bits_per_symbol=4,
    k=256,
    pilot_len=16,
    ebn0_db_min=4.0,   # a range would also work; fixed here for the example
    ebn0_db_max=4.0,
    dec_num_iter=5,
    cn_update="minsum",
    seed=1234,
    channels_last=True
)

ds = generate_dataset(cfg3)

# Derive the block count from the config so the header and the loop can never
# disagree with the number of generated examples.
num_blocks = cfg3.num_examples

print(f"---- METRICHE PER-BLOCCO ({num_blocks} esempi) ----")
for i in range(num_blocks):
    # Per-block channel parameters (constant within a block)
    ebn0_db = float(ds["ebn0_db"][i])
    no = float(ds["no"][i, 0])

    phi = float(ds["phase_offset"][i, 0])
    phi_hat = float(ds["phase_hat0"][i, 0])
    # Wrap the difference so errors across the +/-pi boundary stay small.
    phi_err = wrap_angle_rad(phi_hat - phi)

    # Effective "SNR" (Es/N0) estimated on the data symbols (post-mapper)
    x_clean = to_complex_iq(ds["iq_clean"][i])   # DATA symbols (L)
    Es = float(np.mean(np.abs(x_clean)**2))
    esn0_db = 10.0 * np.log10(Es / no)

    # --- PRE-DECODER: coded bits (hard decision from the baseline LLRs) ---
    c_true = ds["coded_bits"][i].astype(np.uint8)
    c_hat = ds["hard_coded_bits0"][i].astype(np.uint8)
    n_c = c_true.size
    err_c = int(np.sum(c_true != c_hat))
    ber_pre = err_c / n_c

    # --- POST-DECODER: info bits ---
    u_true = ds["info_bits"][i].astype(np.uint8)
    u_hat = ds["info_bits_hat0"][i].astype(np.uint8)
    n_u = u_true.size
    err_u = int(np.sum(u_true != u_hat))
    ber_post = err_u / n_u
    # Block error indicator: 1 when at least one info bit is wrong.
    bler_post = int(err_u > 0)

    print(f"\nBlocco #{i}")
    print(f"  Eb/N0 (info-bit) : {ebn0_db:6.2f} dB")
    print(f"  Es/N0 stimato    : {esn0_db:6.2f} dB   (da Es/no sui DATA symbols)")
    print(f"  no (var compl.)  : {no:.3e}")
    print(f"  phi vero         : {phi:+.3f} rad")
    print(f"  phi stimato      : {phi_hat:+.3f} rad")
    print(f"  errore fase      : {phi_err:+.3f} rad  (wrapped)")

    print(f"  PRE  (coded)     : err={err_c:4d}/{n_c}  BER={ber_pre:.3e}")
    print(f"  POST (info)      : err={err_u:4d}/{n_u}  BER={ber_post:.3e}  BLER={bler_post}")

---- METRICHE PER-BLOCCO (3 esempi) ----

Blocco #0
  Eb/N0 (info-bit) :   4.00 dB
  Es/N0 stimato    :   6.64 dB   (da Es/no sui DATA symbols)
  no (var compl.)  : 2.152e-01
  phi vero         : -0.461 rad
  phi stimato      : -0.393 rad
  errore fase      : +0.068 rad  (wrapped)
  PRE  (coded)     : err=  73/512  BER=1.426e-01
  POST (info)      : err=  31/256  BER=1.211e-01  BLER=1

Blocco #1
  Eb/N0 (info-bit) :   4.00 dB
  Es/N0 stimato    :   6.67 dB   (da Es/no sui DATA symbols)
  no (var compl.)  : 2.314e-01
  phi vero         : -0.236 rad
  phi stimato      : -0.171 rad
  errore fase      : +0.065 rad  (wrapped)
  PRE  (coded)     : err=  65/512  BER=1.270e-01
  POST (info)      : err=  18/256  BER=7.031e-02  BLER=1

Blocco #2
  Eb/N0 (info-bit) :   4.00 dB
  Es/N0 stimato    :   6.66 dB   (da Es/no sui DATA symbols)
  no (var compl.)  : 2.252e-01
  phi vero         : +1.976 rad
  phi stimato      : +2.145 rad
  errore fase      : +0.169 rad  (wrapped)
  PRE  (coded)     : err