# Full Pipeline

This script is intended to demonstrate the full processing pipeline.


In [None]:
from pluto_sdr_pr.ioutils import SdriqSampleIO
from pluto_sdr_pr.processing import clean, fast_ambiguity
from pluto_sdr_pr.signal import ENodeB, MultiSignalStream
from scipy.constants import c

import matplotlib.pyplot as plt
import numpy as np

import os

In [None]:
# Recorded IQ captures that are opened below.
reference_file_path = "data/pluto_a_ref.2021-08-13T17_56_09_570.sdriq"
surveillance_file_path = "data/pluto_b_surv.2021-08-13T17_56_12_704.sdriq"

# Processing parameters shared by the following cells.
num_frames = 40
cpi = 0.5  # seconds
max_distance = 20e3  # presumably meters (it is divided by c later) -- confirm
max_velocity = 1000  # meters / second

# Passed to matplotlib as the figure size of the PSS correlation plot.
din_dimensions = (11.69, 8.27)

# Sample sources, opened in the same order as before: reference first.
reference_io = SdriqSampleIO(reference_file_path)
surveillance_io = SdriqSampleIO(surveillance_file_path)

enb = ENodeB(num_resource_blocks=25)

In [None]:
# Run start_synchronized over both sample sources. Only the cell id and the
# PSS correlation results are used; the third return value is discarded.
mss = MultiSignalStream()
sync_result = mss.start_synchronized(
    reference_io,
    surveillance_io,
    enb=enb,
    pss_only=True,
    num_frames=num_frames,
)
cell_id, pss_correlations, _ = sync_result
print(f"Cell ID: {cell_id}")

In [None]:
# Plot every PSS correlation trace: one sub-figure row per correlation result
# and one axes per row of its `magnitudes` array.
fig = plt.figure(constrained_layout=True, figsize=din_dimensions)
fig.suptitle("PSS Korrelationsergebnisse", fontsize=16, fontweight="bold")
subfigs = fig.subfigures(len(pss_correlations), 1)
row_titles = ["Referenzkanal", "Überwachungskanal"]

# Every axes shares the same time span on the x-axis.
x_max = num_frames * ENodeB.T_FRAME

for row, (subfig, corr_result) in enumerate(zip(subfigs, pss_correlations)):
    subfig.suptitle(f"{row_titles[row]}", fontsize=14)
    axs = subfig.subplots(1, corr_result.magnitudes.shape[0])

    for col, (ax, corr) in enumerate(zip(axs, corr_result.magnitudes)):
        # Scale by the stored peak magnitude before plotting on a 0..1 axis.
        peak = corr_result.max_magnitude[corr_result.max_peak_index[1]]
        norm_corr = corr / peak
        time_axis = np.arange(norm_corr.shape[0]) / corr_result.sample_rate

        # The second column is drawn in red, every other column in blue.
        line_color = "tab:red" if col == 1 else "tab:blue"
        ax.plot(time_axis, norm_corr, line_color)

        ax.set_title(f"Sequenz {col + 1}")
        ax.set_xlabel("Zeit [s]")
        ax.set_ylabel("Korrelation [norm]")

        ax.set_xticks(np.linspace(0, x_max, num_frames // 10 + 1))
        ax.set_xlim(0, x_max)
        ax.set_yticks(np.linspace(0, 1, 6))
        ax.set_ylim(0, 1.05)
        ax.grid()

# Write the figure as SVG, creating the target directory if needed.
output_dir = "docu/images/generated"
os.makedirs(output_dir, exist_ok=True)
svg_path = os.path.join(output_dir, "pss_correlation_results.svg")
fig.savefig(svg_path)

In [None]:
# Radio parameters are taken from the reference recording.
sample_rate = reference_io.sample_rate
center_freq = reference_io.center_frequency

# Size of the ambiguity computation, derived from the configured limits.
# int() truncates towards zero.
delay_bins = int(max_distance / c * sample_rate)
freq_bins = int(max_velocity * cpi * center_freq / c)

# Request cpi * sample_rate samples; the result unpacks into the reference
# and surveillance sample sets.
samples_per_cpi = int(cpi * sample_rate)
ref_samples, surv_samples = mss.read(samples_per_cpi)

# Ambiguity result before cleaning, plus its magnitude for plotting.
amb_pre_clean = fast_ambiguity(
    delay_bins,
    freq_bins,
    ref_samples,
    surv_samples,
)
amb_abs_pre_clean = np.abs(amb_pre_clean)

In [None]:
def _ambfun(ref, surv):
    """Call fast_ambiguity with the notebook-level delay_bins and freq_bins."""
    return fast_ambiguity(delay_bins, freq_bins, ref, surv)


# clean() receives the same ambiguity callable that is used for the
# post-clean evaluation below; its second return value is discarded.
surv_samples_cleaned, _ = clean(ref_samples, surv_samples, ambfun=_ambfun)

# Ambiguity result after cleaning, plus its magnitude for plotting.
amb_post_clean = _ambfun(ref_samples, surv_samples_cleaned)
amb_abs_post_clean = np.abs(amb_post_clean)

In [None]:
def _plot_ambiguity(magnitude, grid_x, grid_y, interpolation):
    """Draw one ambiguity magnitude array as an image next to a 3D surface.

    magnitude is transposed for both plots, grid_x / grid_y are the meshgrid
    coordinates for the surface, and interpolation is forwarded to imshow.
    Returns the created figure and its 3D axes.
    """
    figure = plt.figure(figsize=(din_dimensions[0], 4))

    # The image occupies the first two of three columns.
    heatmap = figure.add_subplot(1, 3, (1, 2))
    heatmap.imshow(
        magnitude.T,
        interpolation=interpolation,
        aspect="auto",
        cmap="twilight",
    )
    heatmap.set_yticks(np.linspace(0, freq_bins, 9, endpoint=True))
    # Tick labels are the tick positions shifted by half the bin count.
    heatmap.set_yticklabels(
        [f"{tick - freq_bins // 2:.0f}" for tick in heatmap.get_yticks()]
    )
    heatmap.set_xlabel("Entfernung")
    heatmap.set_ylabel("Doppler")

    # The surface in the third column is scaled by the array's own maximum.
    surface = figure.add_subplot(1, 3, 3, projection="3d")
    surface.plot_surface(
        grid_x,
        grid_y,
        magnitude.T / magnitude.max(),
        cmap="inferno",
    )
    surface.set_xlabel("Entfernung")
    surface.set_ylabel("Doppler")

    figure.tight_layout()
    return figure, surface


# Index grids matching the transposed arrays used by the surface plots.
x = np.arange(amb_pre_clean.shape[0])
y = np.arange(amb_pre_clean.shape[1])
X, Y = np.meshgrid(x, y)

# Before cleaning ("nearest") and after cleaning ("antialiased").
# NOTE(review): the two figures use different interpolation modes -- confirm
# this is intentional.
fig, ax = _plot_ambiguity(amb_abs_pre_clean, X, Y, "nearest")
fig, ax = _plot_ambiguity(amb_abs_post_clean, X, Y, "antialiased")


# NOTE(review): this takes the maximum of amb_post_clean itself rather than of
# amb_abs_post_clean. If the array is complex, NumPy compares by real part
# first, so this is not the peak magnitude -- confirm which value is wanted.
np.max(amb_post_clean)