# FastDetect

### Utils.py

In [11]:
import plotly.graph_objects as go
import logging
import os
from scipy.optimize import minimize
import math
from math import ceil, floor
import numpy as np
import matplotlib.pyplot as plt
import pickle
from tqdm import tqdm

USE_GPU = os.getenv("USE_GPU", "0") == "1"

if USE_GPU:
    try:
        import cupy as xp
        import cupyx.scipy.fft as xfft
        on_gpu = True

        asnumpy = xp.asnumpy    # CuPy → NumPy
        asarray = xp.asarray    # Python/NumPy → CuPy
    except ImportError:
        # fallback if cupy not installed
        import numpy as xp
        import scipy.fft as xfft
        on_gpu = False

        asnumpy = lambda a: a
        asarray = xp.asarray
else:
    import numpy as xp
    import scipy.fft as xfft
    on_gpu = False

    asnumpy = lambda a: a
    asarray = xp.asarray

logging.basicConfig( format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s', level=logging.INFO )
logger = logging.getLogger(__name__)
file_handler = logging.FileHandler(f'run_250828.log')
file_handler.setLevel(level=logging.INFO)
logger.addHandler(file_handler)

# -------- light helpers (backend-agnostic) --------
def wrap(x):
    return (x + xp.pi) % (2 * xp.pi) - xp.pi

def sqlist(lst):
    return xp.array([item if isinstance(item, (int, float)) else item.item() for item in lst])

def to_device(x):
    """Move/convert Python or NumPy data to the active backend array (CuPy on GPU, NumPy on CPU)."""
    return asarray(x)

def to_host(x):
    """Ensure a NumPy array on host memory (identity on CPU)."""
    return asnumpy(x)

def to_scalar(x):
    """Return a Python scalar if x is 0-d array/array-like; otherwise return x unchanged."""
    # Works for both NumPy and CuPy
    try:
        if getattr(x, "shape", None) == ():
            return x.item()
    except Exception:
        pass
    return x


def myfft(chirp_data, n, plan):
    if USE_GPU:
        return xfft.fftshift(xfft.fft(chirp_data.astype(xp.complex64), n=n, plan=plan))
    else:
        return xfft.fftshift(xfft.fft(chirp_data.astype(xp.complex64), n=n))


def optimize_1dfreq_fast(sig2, tsymbr, freq1, margin):
    def obj1(freq, xdata, ydata):
        return -xp.abs(ydata.dot(xp.exp(xdata * -1j * 2 * xp.pi * freq.item()))).item()
    result = minimize(obj1, freq1.item(), args=(tsymbr, sig2), bounds=[(freq1 - margin, freq1 + margin)]) #!!!
    return result.x[0], - result.fun / xp.sum(xp.abs(sig2))

def to_scalar_list(lst):
    """Map list of numbers/0-d arrays to pure Python scalars."""
    out = []
    for v in lst:
        try:
            if getattr(v, "shape", None) == ():
                out.append(v.item())
            else:
                out.append(v)
        except Exception:
            out.append(v)
    return out

def around(x):
    """Round to nearest integer (works for Python num or 0-d array)."""
    return round(float(to_scalar(x)))

def optimize_1dfreq(sig2, tsymbr, freq, margin):
    def obj1(xdata, ydata, freq):
        return xp.abs(ydata.dot(xp.exp(-1j * 2 * xp.pi * freq * xdata)))

    margin = 500
    val = obj1(tsymbr, sig2, freq)
    for i in range(10):
        xvals = xp.linspace(freq - margin, freq + margin, 1001)
        yvals = [obj1(tsymbr, sig2, f) for f in xvals]
        yvals = sqlist(yvals)
        freq = xvals[xp.argmax(yvals)]
        valnew = xp.max(yvals)
        if valnew < val * (1 - 1e-7):
            pltfig1(xvals, yvals, addvline=(freq,), title=f"{i=} {val=} {valnew=}").show()
        assert valnew >= val * (1 - 1e-7), f"{val=} {valnew=} {i=} {val-valnew=}"
        if abs(valnew - val) < 1e-7: margin /= 4
        val = valnew
    return freq, val / xp.sum(xp.abs(sig2))

def pltfig(datas, title = None, yaxisrange = None, modes = None, marker = None, addvline = None, addhline = None, line_dash = None, fig = None, line=None):
    """
    Plot a figure with the given data and parameters.

    Parameters:
    datas : list of tuples
        Each tuple contains two lists or array-like elements, the data for the x and y axes.
        If only y data is provided, x data will be generated using xp.arange.
    title : str, optional
        The title of the plot (default is None).
    yaxisrange : tuple, optional
        The range for the y-axis as a tuple (min, max) (default is None).
    mode : str, optional
        The mode of the plot (e.g., 'line', 'scatter') (default is None).
    marker : str, optional
        The marker style for the plot (default is None).
    addvline : float, optional
        The x-coordinate for a vertical line (default is None).
    addhline : float, optional
        The y-coordinate for a horizontal line (default is None).
    line_dash : str, optional
        The dash style for the line (default is None).
    fig : matplotlib.figure.Figure, optional
        The figure object to plot on (default is None).
    line : matplotlib.lines.Line2D, optional
        The line object for the plot (default is None).

    Returns:
    None
    """
    if fig is None: fig = go.Figure(layout_title_text=title)
    elif title is not None: fig.update_layout(title_text=title)
    if not all(len(data) == 2 for data in datas): datas = [(xp.arange(len(data)), data) for data in datas]
    if modes is None:
        modes = ['lines' for _ in datas]
    elif isinstance(modes, str):
        modes = [modes for _ in datas]
    for idx, ((xdata, ydata), mode) in enumerate(zip(datas, modes)):
        if line == None and idx == 1: line = dict(dash='dash')
        fig.add_trace(go.Scatter(x=to_scalar_list(xdata), y=to_scalar_list(ydata), mode=mode, marker=marker, line=line))
        assert len(to_scalar_list(xdata)) == len(to_scalar_list(ydata))
    pltfig_hind(addhline, addvline, line_dash, fig, yaxisrange)
    return fig



def pltfig1(xdata, ydata, title = None, yaxisrange = None, mode = None, marker = None, addvline = None, addhline = None, line_dash = None, fig = None, line=None):
    """
    Plot a figure with the given data and parameters.

    Parameters:
    xdata : list or array-like or None
        The data for the x-axis.
        If is None, and only y data is provided, x data will be generated using xp.arange.
    ydata : list or array-like
        The data for the y-axis.
    title : str, optional
        The title of the plot (default is None).
    yaxisrange : tuple, optional
        The range for the y-axis as a tuple (min, max) (default is None).
    mode : str, optional
        The mode of the plot (e.g., 'line', 'scatter') (default is None).
    marker : str, optional
        The marker style for the plot (default is None).
    addvline : float, optional
        The x-coordinate for a vertical line (default is None).
    addhline : float, optional
        The y-coordinate for a horizontal line (default is None).
    line_dash : str, optional
        The dash style for the line (default is None).
    fig : matplotlib.figure.Figure, optional
        The figure object to plot on (default is None).
    line : matplotlib.lines.Line2D, optional
        The line object for the plot (default is None).

    Returns:
    None
    """
    if xdata is None: xdata = xp.arange(len(ydata))
    if fig is None: fig = go.Figure(layout_title_text=title)
    elif title is not None: fig.update_layout(title_text=title)
    if mode is None: mode = 'lines'
    fig.add_trace(go.Scatter(x=to_scalar_list(xdata), y=to_scalar_list(ydata), mode=mode, marker=marker, line=line))
    assert len(to_scalar_list(xdata)) == len(to_scalar_list(ydata))
    pltfig_hind(addhline, addvline, line_dash, fig, yaxisrange)
    return fig

def pltfig_hind(addhline, addvline, line_dash_in, fig, yaxisrange):
    if yaxisrange: fig.update_layout(yaxis=dict(range=yaxisrange), )
    if addvline is not None:
        if line_dash_in is None:
            line_dash = ['dash' for _ in range(len(addvline))]
            line_dash[0] = 'dot'
        elif isinstance(line_dash_in, str):
            line_dash = [line_dash_in for _ in range(len(addvline))]
        else: line_dash = line_dash_in
        for x, ldash in zip(addvline, line_dash): fig.add_vline(x=to_scalar(x), line_dash=ldash)
    if addhline is not None:
        if line_dash_in is None:
            line_dash = ['dash' for _ in range(len(addhline))]
            line_dash[0] = 'dot'
        elif isinstance(line_dash_in, str):
            line_dash = [line_dash_in for _ in range(len(addhline))]
        else: line_dash = line_dash_in
        for y, ldash in zip(addhline, line_dash): fig.add_hline(y=to_scalar(y), line_dash=ldash)


### Config.py

In [3]:
# parser = argparse.ArgumentParser()
# parser.add_argument('--sf', type=int, default=10, help="Set the value of sf")
# args = parser.parse_args(args=[])

class Config:
    sf = 12
    bw = 406250
    sig_freq = 2.4e9
    preamble_len = 240
    payload_len = 70
    guess_f = -40000
    fs = 1e6
    skip_preambles = 8
    code_len = 2

    sfdpos = preamble_len + code_len
    sfdend = sfdpos + 2
    total_len = sfdend + payload_len

    cfo_range = bw // 4
    n_classes = 2 ** sf
    tsig = 2 ** sf / bw * fs  # in samples
    nsamp = around(n_classes * fs / bw)
    nsampf = (n_classes * fs / bw)

    tstandard = xp.linspace(0, nsamp / fs, nsamp + 1)[:-1]
    decode_matrix_a = xp.zeros((n_classes, nsamp), dtype=xp.complex64)
    decode_matrix_b = xp.zeros((n_classes, nsamp), dtype=xp.complex64)

    betai = bw / ((2 ** sf) / bw)
    # wflag = True
    # for code in range(n_classes):
    #     if (code-1)%4!=0 and sf>=11 and wflag:
    #         wflag = False
    #         continue
    #     nsamples = around(nsamp / n_classes * (n_classes - code))
    #     f01 = bw * (-0.5 + code / n_classes)
    #     refchirpc1 = xp.exp(-1j * 2 * xp.pi * (f01 * tstandard + 0.5 * betai * tstandard * tstandard))
    #     f02 = bw * (-1.5 + code / n_classes)
    #     refchirpc2 = xp.exp(-1j * 2 * xp.pi * (f02 * tstandard + 0.5 * betai * tstandard * tstandard))
    #     decode_matrix_a[code, :nsamples] = refchirpc1[:nsamples]
    #     if code > 0: decode_matrix_b[code, nsamples:] = refchirpc2[nsamples:]

    detect_range_pkts = 1000 # !!! TODO
    fft_n = int(fs)
    if USE_GPU:
        plan = xfft.get_fft_plan(xp.zeros(fft_n, dtype=xp.complex64))
        plan2 = xfft.get_fft_plan(xp.zeros(nsamp, dtype=xp.complex64))
    else:
        plan = None
        plan2 = None

### Reader.py

In [4]:

class SlidingComplex64Reader: # TODO reader slow unbuffered
    dtype = xp.complex64
    itemsize = 8  # complex64

    def __init__(self, file_path):
        """
        Initializes the reader for a complex64 binary file.

        Args:
            file_path (str): The path to the binary file.
        """
        self.file_path = file_path

    def get(self, start, length):
        """
        Reads a chunk of data from the file.

        Args:
            start (int): The starting index (in terms of complex64 elements).
            length (int): The number of complex64 elements to read.

        Returns:
            xp.ndarray: An array containing the requested data.
        """
        byte_start = start * self.itemsize
        byte_length = length * self.itemsize

        try:
            with open(self.file_path, 'rb') as f:
                f.seek(byte_start)
                data_bytes = f.read(byte_length)

            if len(data_bytes) != byte_length:
                raise IOError(f"Read fewer bytes than expected. Expected {byte_length}, got {len(data_bytes)}")

            # Convert the bytes to a NumPy/CuPy array
            # Note: The .frombuffer method is efficient as it creates a view of the bytes.
            # We then copy it to the appropriate device (CPU or GPU) if necessary.
            data_array = xp.frombuffer(data_bytes, dtype=self.dtype)
            return data_array

        except FileNotFoundError:
            print(f"Error: The file '{self.file_path}' was not found.")
            return None
        except Exception as e:
            print(f"An error occurred: {e}")
            return None

### Main: Start Running

In [6]:
fstart = -40971.948630148894
tstart =  4240090.873306715
file_path = "data/test_1226"

file_size = os.path.getsize(file_path)
complex64_size = xp.dtype(xp.complex64).itemsize
assert complex64_size == 8
print(f"{file_path=} Size in Number of symbols: {file_size // complex64_size // Config.nsamp}")

reader = SlidingComplex64Reader(file_path)

file_path='data/test_1226' Size in Number of symbols: 1430


### Fitcoef: 

- Polynomial fit unwrapped phase of each symbol, generating 240 quadratic coefs 
    - Does not smooth the coef results
- Compute phase difference between neighboring symbols 
    - Difference between two coefs at the tjump (time estimations from input)
    - Wrap into 2pi
    - Evaluate time difference that cause this (Use BW from input guessf, dt = dphase / estBW, estBW = Bw * (1 + estF / sigF))
    - Linear fit the time differences
    - Return new observations of tjump as a corrected coeft (len=2)


In [7]:
def fitcoef2(coeff: xp.array, coeft: xp.array, reader: SlidingComplex64Reader):
    betai = Config.bw / ((2 ** Config.sf) / Config.bw) * xp.pi # frequency slope to phase 2d slope, *pi
    coeflist = []

    for pidx in range(0, Config.preamble_len):
        
        # compute coef2d_est2: polynomial curve fitting unwrapped phase of symbol pidx
        # time: tstart to tend
        # frequency at tstart: - estbw * 0.5 + estf
        estf = xp.polyval(coeff, pidx)
        estbw = Config.bw * (1 + estf / Config.sig_freq)
        beta1 = betai * (1 + 2 * estf / Config.sig_freq)
        tstart = xp.polyval(coeft, pidx)
        tend = xp.polyval(coeft, pidx + 1)
        beta2 = 2 * xp.pi * (- estbw * 0.5 + estf) - tstart * 2 * beta1
        coef2d_est2 = xp.array([to_scalar(beta1), to_scalar(beta2), 0])

        # align 3rd parameter of coef2d_est2 to observed phase at tstart
        nsymbr_start = math.ceil(tstart * Config.fs + Config.nsamp / 8)
        nsymbr_end = math.ceil(tend * Config.fs - Config.nsamp / 8)
        nsymbr = xp.arange(math.ceil(tstart * Config.fs + Config.nsamp / 8), math.ceil(tend * Config.fs - Config.nsamp / 8))
        tsymbr = nsymbr / Config.fs

        sig0 = reader.get(nsymbr_start, nsymbr_end - nsymbr_start)
        sig1 = sig0 * xp.exp(-1j * xp.polyval(coef2d_est2, tsymbr))
        data0 = myfft(sig1, n=Config.fft_n, plan=Config.plan)
        freq1 = xp.fft.fftshift(xp.fft.fftfreq(Config.fft_n, d=1 / Config.fs))[xp.argmax(xp.abs(data0))]
        freq, valnew = optimize_1dfreq_fast(sig1, tsymbr, freq1, Config.fs / Config.fft_n * 5)
        # freqf, valnew = optimize_1dfreq(sig1, tsymbr, freq1, Config.fs / Config.fft_n * 5)
        # print(f"Initial freq offset: {freq1}, after FFT fit: {freq}, after precise fit: {freqf}, diff: {freqf - freq}")

        # adjust coef2d_est2[1] according to freq difference
        coef2d_est2[1] = 2 * xp.pi * (- estbw * 0.5 + estf + freq) - tstart * 2 * beta1
        sig2 = sig0 * xp.exp(-1j * xp.polyval(coef2d_est2, tsymbr))
        # freq, valnew = optimize_1dfreq(sig2, tsymbr, freq1, Config.fs / Config.fft_n * 5)
        # logger.warning(f"{freq=} should be zero {valnew=}")
        coef2d_est2[2] += xp.angle(sig0.dot(xp.exp(-1j * xp.polyval(coef2d_est2, tsymbr))))
        # print(f"{xp.angle(sig0.dot(xp.exp(-1j * xp.polyval(coef2d_est2, tsymbr))))} should be zero")
        coeflist.append(coef2d_est2)
    return xp.array(coeflist)



In [9]:
def fitcoef4(coeff: xp.array, coeft: xp.array, reader: SlidingComplex64Reader):
    betai = Config.bw / ((2 ** Config.sf) / Config.bw) * xp.pi # frequency slope to phase 2d slope, *pi
    coeflist = fitcoef2(coeff, coeft, reader)
    
    # plot phase difference between consecutive symbols
    phasedifflist = xp.zeros((Config.preamble_len - 1,), dtype=xp.float32)
    for pidx in range(Config.preamble_len - 1):
        tjump = xp.polyval(coeft, pidx + 1)
        phasediff = wrap(xp.polyval(coeflist[pidx + 1], tjump) - xp.polyval(coeflist[pidx], tjump))
        phasedifflist[pidx] = phasediff
    phasedifflist_unwrap = xp.unwrap(xp.array(phasedifflist))
    # pltfig1(range(Config.preamble_len - 1), phasedifflist_unwrap, title="plot phase difference between consecutive symbols").show()

    # fit a line to phase difference to estimate cfo and time drift
    tdifflist = xp.zeros_like(phasedifflist_unwrap)
    for pidx in range(Config.preamble_len - 1):
        estbw = Config.bw * (1 + xp.polyval(coeff, pidx + 0.5) / Config.sig_freq)
        tdifflist[pidx] = phasedifflist_unwrap[pidx] / 2 / xp.pi / estbw # phasediff is caused by mismatched symbol change time, -> bw mismatch. phase = 2pi * bw * dt
    xrange = xp.arange(50, len(tdifflist)) # !!! ignore first 50 points
    tdiff_coef = xp.polyfit(xrange, tdifflist[xrange], 1)
    coeft_new = coeft.copy()
    coeft_new[-2:] += tdiff_coef
    logger.warning(f"{tdiff_coef=} {coeft=} {coeft_new=} cfo ppm from time: {1 - coeft_new[0] / Config.nsampf * Config.fs} cfo: {(1 - coeft_new[0] / Config.nsampf * Config.fs) * Config.sig_freq} unwrapped phasediff so t may shift by margin {1/Config.bw}")

    return coeft_new

In [12]:
tsymblen = 2 ** Config.sf / Config.bw * (1 - fstart / Config.sig_freq)
coeff = xp.array((0, fstart))
coeft = xp.array((tsymblen, tstart / Config.fs))
coeft = fitcoef4(coeff, coeft, reader)
# coeft = fitcoef4(coeff, coeft, reader) # do this to check correctness of fitcoef4

