In [6]:
import numpy as np
import matplotlib.pyplot as plt

from pathlib import Path
from typing import Optional

from ipywidgets import (
    interact,
    IntSlider,
    FloatSlider,
    Checkbox,
    Button,
    Text,
    HBox,
    VBox,
    Output,
    Layout,
    Dropdown,
)
from IPython.display import display

from scipy.interpolate import interp1d

%matplotlib inline


In [7]:


def load_iq_file(path: Path, normalize: bool = True) -> np.ndarray:
    """Считывает int16-последовательность I/Q и формирует комплексный массив."""
    raw = np.fromfile(path, dtype=np.int16)
    if raw.size < 2:
        raise ValueError("Файл должен содержать минимум одну пару I/Q.")
    if raw.size % 2:
        raw = raw[:-1]  # отбрасываем последний непарный отсчёт
    i_samples = raw[0::2].astype(np.float32)
    q_samples = raw[1::2].astype(np.float32)
    iq = i_samples + 1j * q_samples
    if normalize:
        peak = np.max(np.abs(iq))
        if peak > 0:
            iq = iq / peak
    return iq.astype(np.complex64)


def apply_integer_delay(signal: np.ndarray, delay: int) -> np.ndarray:
    if signal.size == 0 or delay == 0:
        return signal
    if delay > 0:
        if delay >= signal.size:
            return np.array([], dtype=signal.dtype)
        return signal[delay:]
    # отрицательная задержка: продвигаем сигнал вперёд, заполняя начало нулями
    shift = abs(delay)
    padded = np.pad(signal, (shift, 0), mode="constant")
    return padded[: signal.size]


def apply_frequency_correction(signal: np.ndarray, freq_offset: float) -> np.ndarray:
    if signal.size == 0 or abs(freq_offset) < 1e-12:
        return signal
    n = np.arange(signal.size, dtype=np.float32)
    correction = np.exp(-1j * 2 * np.pi * freq_offset * n)
    return signal * correction


def apply_fractional_delay(signal: np.ndarray, timing_error: float) -> np.ndarray:
    """Компенсирует дробное смещение с помощью кубической интерполяции (локальный многочлен Лагранжа)."""
    if signal.size == 0 or abs(timing_error) < 1e-6:
        return signal
    n = np.arange(signal.size, dtype=np.float32)
    interpolator = interp1d(
        n,
        signal,
        kind="cubic",
        bounds_error=False,
        fill_value="extrapolate",
        assume_sorted=True,
    )
    shifted = interpolator(n + timing_error)
    return shifted.astype(signal.dtype)


def design_rrc(beta: float, sps: int, span: int) -> np.ndarray:
    """Формирует импульсную характеристику RRC-фильтра с заданным roll-off."""
    if sps <= 0:
        raise ValueError("sps должен быть положительным")
    span = max(2, span)
    t = np.arange(-span * sps, span * sps + 1, dtype=np.float64) / sps
    h = np.zeros_like(t)
    for idx, time in enumerate(t):
        if np.isclose(time, 0.0):
            h[idx] = 1 - beta + 4 * beta / np.pi
        elif beta != 0 and np.isclose(abs(time), 1 / (4 * beta)):
            h[idx] = (
                beta
                / np.sqrt(2)
                * (
                    (1 + 2 / np.pi) * np.sin(np.pi / (4 * beta))
                    + (1 - 2 / np.pi) * np.cos(np.pi / (4 * beta))
                )
            )
        else:
            numerator = (
                np.sin(np.pi * time * (1 - beta))
                + 4 * beta * time * np.cos(np.pi * time * (1 + beta))
            )
            denominator = np.pi * time * (1 - (4 * beta * time) ** 2)
            h[idx] = numerator / denominator
    h = h / np.linalg.norm(h)
    return h.astype(np.float32)


def moving_average_kernel(length: int) -> np.ndarray:
    if length <= 1:
        return np.array([1.0], dtype=np.float32)
    kernel = np.ones(length, dtype=np.float32)
    return kernel / length


def estimate_gardner_correction(
    signal: np.ndarray,
    sps: int,
    mu: float = 0.02,
    max_symbols: int = 4000,
) -> float:
    if signal.size < 3 * sps or sps < 2:
        return 0.0
    half = sps / 2.0
    n = np.arange(signal.size, dtype=np.float32)
    interpolator = interp1d(
        n,
        signal,
        kind="linear",
        bounds_error=False,
        fill_value="extrapolate",
        assume_sorted=True,
    )
    symbols_available = int((signal.size - sps) // sps)
    symbols_to_use = int(min(max_symbols, symbols_available))
    if symbols_to_use <= 1:
        return 0.0
    errors = []
    for k in range(1, symbols_to_use):
        t = k * sps
        current = interpolator(t)
        amp = np.abs(current)
        denom = float(amp * amp + 1e-7)
        early = interpolator(t - half)
        late = interpolator(t + half)
        error = np.real((late - early) * np.conj(current)) / denom
        errors.append(error)
    if not errors:
        return 0.0
    avg_error = float(np.mean(errors))
    correction = mu * avg_error
    return float(np.clip(correction, -0.5, 0.5))


def decision_qpsk(symbols: np.ndarray) -> np.ndarray:
    if symbols.size == 0:
        return symbols
    real_part = np.where(symbols.real >= 0, 1.0, -1.0)
    imag_part = np.where(symbols.imag >= 0, 1.0, -1.0)
    decided = real_part + 1j * imag_part
    return decided.astype(np.complex64)


def decision_square_qam(symbols: np.ndarray, order: int) -> np.ndarray:
    m = int(np.sqrt(order))
    if m * m != order:
        raise ValueError("Порядок QAM должен быть квадратом целого числа")
    levels = np.arange(-(m - 1), m, 2, dtype=np.float32)
    def quantize(values: np.ndarray) -> np.ndarray:
        idx = np.abs(values[:, None] - levels[None, :]).argmin(axis=1)
        return levels[idx]
    real_levels = quantize(symbols.real.astype(np.float32))
    imag_levels = quantize(symbols.imag.astype(np.float32))
    return (real_levels + 1j * imag_levels).astype(np.complex64)


def hard_decision(symbols: np.ndarray, scheme: str) -> Optional[np.ndarray]:
    if symbols.size == 0:
        return None
    scheme = scheme.lower()
    if scheme == "qpsk":
        return decision_qpsk(symbols)
    if scheme == "16qam":
        return decision_square_qam(symbols, 16)
    if scheme == "64qam":
        return decision_square_qam(symbols, 64)
    return None


def slice_symbols(signal: np.ndarray, sps: int, max_points: int) -> np.ndarray:
    if sps < 1:
        raise ValueError("Параметр sps должен быть >= 1")
    if signal.size == 0:
        return signal
    symbols = signal[::sps]
    if max_points > 0:
        symbols = symbols[:max_points]
    return symbols


def ensure_complex(signal: Optional[np.ndarray]) -> np.ndarray:
    if signal is None:
        return np.array([], dtype=np.complex64)
    return signal


In [8]:
from typing import Optional, Tuple


_FILTER_DELAY_CACHE: dict[Tuple[str, int, int, float], Tuple[np.ndarray, int]] = {}


def _get_filter_taps(
    filter_type: str,
    sps: int,
    span: int,
    beta: float,
) -> Tuple[Optional[np.ndarray], int]:
    key = (filter_type, int(sps), int(span), float(beta))
    if key in _FILTER_DELAY_CACHE:
        taps, gd = _FILTER_DELAY_CACHE[key]
        return taps.copy(), gd
    taps: Optional[np.ndarray] = None
    if filter_type == "RRC (β=0.35)":
        taps = design_rrc(beta, sps, span)
    elif filter_type == "Moving Average":
        taps = moving_average_kernel(max(2, span * sps))
    if taps is None:
        return None, 0
    group_delay = max((taps.size - 1) // 2, 0)
    _FILTER_DELAY_CACHE[key] = (taps.astype(np.float32), group_delay)
    return taps.copy(), group_delay


def apply_matched_filter(
    signal: np.ndarray,
    sps: int,
    filter_type: str,
    span: int,
    beta: float,
) -> tuple[np.ndarray, int]:
    if signal.size == 0:
        return signal, 0
    taps, group_delay = _get_filter_taps(filter_type, sps, span, beta)
    if taps is None:
        return signal, 0
    filtered = np.convolve(signal, taps, mode="same")
    if group_delay:
        tail = np.zeros(group_delay, dtype=filtered.dtype)
        filtered = np.concatenate((filtered[group_delay:], tail))
    return filtered.astype(signal.dtype), group_delay



In [9]:
iq_signal: Optional[np.ndarray] = None
current_path: Optional[Path] = None


def _noop():
    pass


request_redraw = _noop

file_input = Text(
    description="Файл",
    placeholder="/path/to/record.pcm",
    layout=Layout(width="70%"),
)
load_button = Button(description="Загрузить", button_style="primary", icon="upload")
normalize_checkbox = Checkbox(value=True, description="Нормализовать", indent=False)

status_box = Output(layout=Layout(border="1px solid #ccc", padding="6px"))
plot_box = Output()
info_box = Output()


def on_load_clicked(_):
    global iq_signal, current_path, request_redraw
    with status_box:
        status_box.clear_output()
        try:
            path = Path(file_input.value).expanduser()
            if not path.exists():
                raise FileNotFoundError(f"Файл {path} не найден")
            iq_signal = load_iq_file(path, normalize=normalize_checkbox.value)
            current_path = path
            print(f"Загружено {iq_signal.size} комплексных отсчётов из {path.name}")
            request_redraw()
        except Exception as exc:
            iq_signal = None
            current_path = None
            print(f"Ошибка: {exc}")
            request_redraw()


load_button.on_click(on_load_clicked)

controls_header = HBox([file_input, load_button, normalize_checkbox])
display(VBox([controls_header, status_box]))


VBox(children=(HBox(children=(Text(value='', description='Файл', layout=Layout(width='70%'), placeholder='/pat…

In [10]:
max_points_slider = IntSlider(
    value=10000,
    min=500,
    max=50000,
    step=500,
    description="Точек",
    readout=True,
)
freeze_checkbox = Checkbox(value=False, description="Заморозить", indent=False)

sps_slider = IntSlider(min=1, max=16, value=4, description="SPS")
DELAY_LIMIT = 4096
delay_slider = IntSlider(min=-DELAY_LIMIT, max=DELAY_LIMIT, value=0, description="Delay")
timing_slider = FloatSlider(min=-0.5, max=0.5, step=0.05, value=0.0, description="Timing")
freq_slider = FloatSlider(min=-0.1, max=0.1, step=0.01, value=0.0, description="Freq")
resampling_checkbox = Checkbox(value=False, description="Ресемплинг", indent=False)
freq_checkbox = Checkbox(value=True, description="Частота", indent=False)

filter_checkbox = Checkbox(value=False, description="Фильтр", indent=False)
filter_type_dropdown = Dropdown(
    options=["RRC (β=0.35)", "Moving Average"],
    value="RRC (β=0.35)",
    description="Тип",
    layout=Layout(width="160px"),
)
filter_span_slider = IntSlider(
    min=2,
    max=16,
    value=8,
    step=1,
    description="Span",
)
filter_beta_slider = FloatSlider(
    min=0.1,
    max=0.6,
    step=0.05,
    value=0.35,
    description="β",
    readout_format=".2f",
)

gardner_checkbox = Checkbox(value=False, description="Gardner", indent=False)
gardner_mu_slider = FloatSlider(
    min=0.001,
    max=0.1,
    step=0.001,
    value=0.02,
    description="μ",
    readout_format=".3f",
)

decision_checkbox = Checkbox(value=False, description="Decision", indent=False)
modulation_dropdown = Dropdown(
    options=["QPSK", "16QAM", "64QAM"],
    value="QPSK",
    description="Схема",
    layout=Layout(width="140px"),
)


def update_constellation(
    sps: int,
    delay: int,
    timing_error: float,
    freq_offset: float,
    apply_resampling: bool,
    apply_freq_corr: bool,
    max_points: int,
    freeze_update: bool,
    apply_filter: bool,
    filter_type: str,
    filter_span: int,
    filter_beta: float,
    use_gardner: bool,
    gardner_mu: float,
    use_decision: bool,
    decision_scheme: str,
):
    """Основная функция обновления графика созвездия."""
    global iq_signal

    signal = ensure_complex(iq_signal)
    with info_box:
        info_box.clear_output()

    if signal.size == 0:
        with plot_box:
            plot_box.clear_output()
            print("Загрузите .pcm файл и нажмите \"Загрузить\".")
        with info_box:
            print("Нет данных для отображения.")
        return

    processed = signal
    if apply_freq_corr:
        processed = apply_frequency_correction(processed, freq_offset)
    elif abs(freq_offset) > 1e-6:
        with info_box:
            print("Частотная коррекция выключена — freq_offset не применяется.")

    filter_group_delay = 0
    if apply_filter:
        processed, filter_group_delay = apply_matched_filter(
            processed,
            sps=sps,
            filter_type=filter_type,
            span=filter_span,
            beta=filter_beta,
        )

    processed = apply_integer_delay(processed, delay)

    total_fractional_shift = 0.0
    if apply_resampling:
        processed = apply_fractional_delay(processed, timing_error)
        total_fractional_shift += timing_error
    elif abs(timing_error) > 1e-6:
        with info_box:
            print("Ресемплинг выключен — timing_error игнорируется.")

    gardner_shift = 0.0
    if use_gardner:
        gardner_shift = estimate_gardner_correction(processed, sps, mu=gardner_mu)
        if abs(gardner_shift) > 1e-6:
            processed = apply_fractional_delay(processed, gardner_shift)
            total_fractional_shift += gardner_shift

    symbols = slice_symbols(processed, sps, max_points)
    decisions = None
    decision_rms = None
    if use_decision:
        decisions = hard_decision(symbols, decision_scheme)
        if decisions is None:
            with info_box:
                print(f"Схема {decision_scheme} пока не поддерживается для принятия решений.")
        elif decisions.size:
            diff = symbols - decisions
            decision_rms = float(np.sqrt(np.mean(np.abs(diff) ** 2)))

    with info_box:
        print(
            f"Исходных отсчётов: {signal.size:,}; после обработки: {processed.size:,}; отображаемых точек: {symbols.size:,}",
        )
        if current_path:
            print(f"Файл: {current_path.name}")
        if apply_filter:
            beta_info = f", β={filter_beta:.2f}" if "RRC" in filter_type else ""
            print(
                f"Фильтр: {filter_type}, span={filter_span}{beta_info}, компенс. задержка={filter_group_delay}"
            )
        if use_gardner:
            print(f"Gardner коррекция: {gardner_shift:+.4f} отсчёта")
        if delay != 0:
            print(f"Целая задержка: {delay:+d} отсчёта")
        if total_fractional_shift:
            print(f"Итоговый дробный сдвиг: {total_fractional_shift:+.4f} отсчёта")
        if decision_rms is not None:
            print(f"Среднее отклонение после решений: {decision_rms:.4f}")

    if freeze_update:
        with plot_box:
            plot_box.clear_output()
            print("Автообновление отключено. Снимите чекбокс, чтобы перерисовать график.")
        return

    with plot_box:
        plot_box.clear_output(wait=True)
        fig, ax = plt.subplots(figsize=(6, 6))
        if symbols.size == 0:
            ax.text(0.5, 0.5, "Недостаточно данных", ha="center", va="center")
        else:
            ax.scatter(symbols.real, symbols.imag, s=5, alpha=0.7, label="Отсчёты")
            if decisions is not None and decisions.size:
                ax.scatter(
                    decisions.real,
                    decisions.imag,
                    s=9,
                    alpha=0.4,
                    color="tab:red",
                    label="Решения",
                )
        ax.grid(True, alpha=0.3)
        ax.set_xlabel("In-phase")
        ax.set_ylabel("Quadrature")
        ax.set_aspect("equal", "box")
        ax.set_title("Созвездие")
        if decisions is not None and decisions.size:
            ax.legend(loc="upper right")
        plt.show()


controls_row1 = HBox([sps_slider, delay_slider, max_points_slider])
controls_row2 = HBox([timing_slider, freq_slider])
controls_row3 = HBox([resampling_checkbox, freq_checkbox, freeze_checkbox])
controls_row4 = HBox([filter_checkbox, filter_type_dropdown, filter_span_slider, filter_beta_slider])
controls_row5 = HBox([gardner_checkbox, gardner_mu_slider])
controls_row6 = HBox([decision_checkbox, modulation_dropdown])
controls_panel = VBox(
    [
        controls_row1,
        controls_row2,
        controls_row3,
        controls_row4,
        controls_row5,
        controls_row6,
    ]
)

display(controls_panel, info_box, plot_box)


def trigger_update(change=None):
    update_constellation(
        sps_slider.value,
        delay_slider.value,
        timing_slider.value,
        freq_slider.value,
        resampling_checkbox.value,
        freq_checkbox.value,
        max_points_slider.value,
        freeze_checkbox.value,
        filter_checkbox.value,
        filter_type_dropdown.value,
        filter_span_slider.value,
        filter_beta_slider.value,
        gardner_checkbox.value,
        gardner_mu_slider.value,
        decision_checkbox.value,
        modulation_dropdown.value,
    )


for widget in [
    sps_slider,
    delay_slider,
    timing_slider,
    freq_slider,
    resampling_checkbox,
    freq_checkbox,
    max_points_slider,
    freeze_checkbox,
    filter_checkbox,
    filter_type_dropdown,
    filter_span_slider,
    filter_beta_slider,
    gardner_checkbox,
    gardner_mu_slider,
    decision_checkbox,
    modulation_dropdown,
]:
    widget.observe(trigger_update, names="value")


def _toggle_filter_controls(change):
    filter_type_dropdown.disabled = not filter_checkbox.value
    filter_span_slider.disabled = not filter_checkbox.value
    filter_beta_slider.disabled = not filter_checkbox.value or filter_type_dropdown.value != "RRC (β=0.35)"


def _toggle_decision_controls(change):
    modulation_dropdown.disabled = not decision_checkbox.value


def _toggle_gardner_controls(change):
    gardner_mu_slider.disabled = not gardner_checkbox.value


filter_checkbox.observe(_toggle_filter_controls, names="value")
filter_type_dropdown.observe(_toggle_filter_controls, names="value")
decision_checkbox.observe(_toggle_decision_controls, names="value")
gardner_checkbox.observe(_toggle_gardner_controls, names="value")

_toggle_filter_controls(None)
_toggle_decision_controls(None)
_toggle_gardner_controls(None)


request_redraw = trigger_update
trigger_update()


VBox(children=(HBox(children=(IntSlider(value=4, description='SPS', max=16, min=1), IntSlider(value=0, descrip…

Output()

Output()

TraitError: The 'value' trait of an IntSlider instance expected an int, not the NoneType None.

## Как работают корректировки

- **Частотная коррекция.** Применяется множителем `exp(-j·2π·f_off·n)`, что компенсирует вращение созвездия, вызванное смещением несущей.
- **Целочисленная задержка.** Сдвигает массив отсчётов, чтобы синхронизировать начало символов.
- **Дробная задержка.** При включённом ресемплинге сигнал пересчитывается через кубическую интерполяцию (по сути, локальный многочлен Лагранжа 3-го порядка), позволяя подбирать оптимальный момент съёма выборки.
- **Фильтр соответствия.** Дополнительный RRC или сглаживающий фильтр снижает межсимвольные и шумовые искажения перед выделением символов.
- **Gardner-петля.** Автоматически подстраивает дробный сдвиг, оценивая ошибку синхронизации по среднему значению критерия Гарднера.
- **Декримирование.** Параметр `sps` выбирает каждый `sps`-й отсчёт как потенциальный символ. Ограничение `Точек` защищает от чрезмерной нагрузки при больших массивах данных.
- **Решения по созвездию.** Для QPSK/QAM можно построить жёсткие решения и оценить среднее отклонение от решётки.
- **Заморозка обновления.** Если необходимо быстро переключать параметры без перерисовки, активируется чекбокс «Заморозить обновление». Следующее обновление произойдёт после его отключения.


### Дополнительные идеи для расширения

- добавить автоматический подбор `sps` по максимуму диаграммы автокорреляции;
- реализовать загрузку через `widgets.FileUpload` для работы прямо в браузере;
- встроить фильтры (например, RRC) или эквализаторы для улучшения диаграммы;
- сохранять текущие настройки и графики для последующего анализа.
