In [1]:
# imports and settings

import os
import time
import pickle
import warnings
import pandas as pd
import networkx as nx
from networkx.algorithms.coloring import greedy_color
import matplotlib.pyplot as plt
from copy import deepcopy

import numpy as np
from numpy import linalg as LA
from numpy import histogram2d

from scipy import signal
from scipy.fft import fft, fftfreq, fftshift
from scipy.signal import find_peaks, butter, filtfilt, welch, get_window
from scipy.ndimage import gaussian_filter
from scipy.io import wavfile
from scipy.stats import wasserstein_distance_nd

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics.pairwise import rbf_kernel, polynomial_kernel, linear_kernel

import utils as ut
%load_ext autoreload
%autoreload 2

# Silence every warning category for the rest of the session.
# NOTE(review): this blanket filter also hides NumPy RuntimeWarnings
# (e.g. "invalid value encountered in sqrt"), so numerical problems in the
# cells below will pass silently.
warnings.simplefilter("ignore")

print("Imports complete.")

Imports complete.
Settings: height=800, width=1400, font_size=16
Imports complete.


In [2]:
# --- Helper Functions ---

def awgn(signal, snr_linear):
    """
    Add white Gaussian noise to a signal at a given measured SNR.

    Python implementation of MATLAB's awgn(sig, snr, 'measured', 'linear'):
    the signal power is measured from the input itself and real-valued
    Gaussian noise with power ``P_signal / snr_linear`` is added.

    Parameters:
        signal: The input samples (scalar, sequence or array of any shape).
            The generated noise is always real-valued, so a complex input
            only receives noise on its real part.
        snr_linear: The Signal-to-Noise ratio (linear scale, not dB).
            Must be positive. The value is not validated: a negative ratio
            makes the noise power negative and the output NaN, and a dB
            value must be converted first with ``10 ** (snr_db / 10)``.

    Returns:
        ``signal + noise`` where ``noise`` has the same shape as ``signal``.
        The noise is drawn from NumPy's global random state, so call
        ``np.random.seed`` beforehand for reproducible results.
    """
    # NOTE: the parameter name shadows the `scipy.signal` module imported at
    # the top of the notebook (inside this function only); it is kept so that
    # keyword callers are not broken.

    # Measure signal power (mean over every sample, whatever the shape)
    sig_power = np.mean(np.abs(signal) ** 2)

    # SNR_linear = P_signal / P_noise  =>  P_noise = P_signal / SNR_linear
    noise_power = sig_power / snr_linear

    # Draw one noise sample per input sample. Using the full shape (rather
    # than len(signal)) keeps multi-dimensional inputs working; for 1-D input
    # the random draw is identical to randn(len(signal)).
    noise = np.sqrt(noise_power) * np.random.randn(*np.shape(signal))

    return signal + noise



In [3]:
# Placeholders for your custom detection functions
# You will need to replace the logic inside these with your actual python implementations
def detect_itamar(rx_signal, threshold):
    """Placeholder for the DetectItamar.m port.

    Both arguments are currently ignored and the function always reports
    "no detection" (0).
    """
    # TODO: Paste logic from DetectItamar.m here
    no_detection = 0
    return no_detection

def detect_demon(rx_signal, threshold):
    """Placeholder for the DetectDemon.m port.

    Both arguments are currently ignored and the function always reports
    "no detection" (0).
    """
    # TODO: Paste logic from DetectDemon.m here
    no_detection = 0
    return no_detection

In [25]:
class WelchDetector:
    """
    Detector that thresholds the spectrum returned by
    ``ut.calc_welch_from_spectrogram`` for the spectrogram of a signal.

    An entry of that spectrum is flagged when it is at least
    ``mean(pxx) + threshold * std(pxx)``.
    """

    def __init__(self, fs, nperseg, overlap, window, dc, crop_freq, norm_size):
        """
        Store the analysis settings.

        Parameters:
            fs, nperseg, overlap, window, dc, crop_freq: Forwarded to
                ``ut.calc_spectrogram`` as ``fs``, ``nperseg``,
                ``percent_overlap``, ``window``, ``remove_dc`` and ``crop_freq``.
            norm_size: Forwarded to ``ut.calc_welch_from_spectrogram`` as
                ``normalization_window_size``.
        """
        self.fs, self.nperseg, self.overlap = fs, nperseg, overlap
        self.window, self.dc, self.crop_freq = window, dc, crop_freq
        self.norm_size = norm_size

    def detect(self, signal, threshold, return_feature=False):
        """
        Run the detector on one signal.

        Parameters:
            signal: Samples handed to ``ut.calc_spectrogram``.
            threshold: Number of standard deviations above the mean of the
                spectrum that an entry must reach to be flagged.
            return_feature: When True, also return the intermediate data.

        Returns:
            1 if at least one entry reaches the cutoff, otherwise 0. With
            ``return_feature=True`` the result is ``(flag, (pxx, F, detections))``
            where ``detections`` holds the flagged indices of ``pxx``.
        """
        spectrogram_settings = dict(
            nperseg=self.nperseg,
            percent_overlap=self.overlap,
            window=self.window,
            remove_dc=self.dc,
            crop_freq=self.crop_freq,
        )
        # Only the frequency axis and the magnitude spectrogram are needed here
        F, _, Sxx, _ = ut.calc_spectrogram(signal, self.fs, **spectrogram_settings)
        pxx = ut.calc_welch_from_spectrogram(Sxx, normalization_window_size=self.norm_size)

        cutoff = np.mean(pxx) + threshold * np.std(pxx)
        above_cutoff = pxx >= cutoff
        detections = np.where(above_cutoff)[0]

        decision = 0
        if len(detections) > 0:
            decision = 1

        if not return_feature:
            return decision
        return decision, (pxx, F, detections)

In [26]:
class S2GDetector:
    """
    Detector that thresholds the feature vector ``K`` returned by
    ``ut.get_all_Ks`` for the phasogram of a signal.

    An entry of ``K`` is flagged when it is at least
    ``mean(K) + (threshold_offset + threshold) * std(K)``.
    """

    def __init__(self, fs, nperseg, overlap, window, dc, crop_freq, quantization_levels, threshold_offset=2):
        """
        Store the analysis settings.

        Parameters:
            fs, nperseg, overlap, window, dc, crop_freq: Forwarded to
                ``ut.calc_spectrogram`` as ``fs``, ``nperseg``,
                ``percent_overlap``, ``window``, ``remove_dc`` and ``crop_freq``.
            quantization_levels: Forwarded to ``ut.get_all_Ks`` as ``n_levels``.
            threshold_offset: Constant number of standard deviations added to
                the ``threshold`` passed to ``detect``. Defaults to 2, the
                value that used to be hard-coded; pass 0 to use the same
                ``mean + threshold * std`` rule as ``WelchDetector``.
        """
        self.fs = fs
        self.nperseg = nperseg
        self.overlap = overlap
        self.window = window
        self.dc = dc
        self.crop_freq = crop_freq
        self.quantization_levels = quantization_levels
        self.threshold_offset = threshold_offset

    def detect(self, signal, threshold, return_feature=False):
        """
        Run the detector on one signal.

        Parameters:
            signal: Samples handed to ``ut.calc_spectrogram``.
            threshold: Number of standard deviations (on top of
                ``threshold_offset``) above the mean of ``K`` that an entry
                must reach to be flagged.
            return_feature: When True, also return the intermediate data.

        Returns:
            1 if at least one entry reaches the cutoff, otherwise 0. With
            ``return_feature=True`` the result is ``(flag, (K, F, detections))``
            where ``detections`` holds the flagged indices of ``K``.
        """
        # Only the frequency axis and the phasogram are used by this detector
        F, _, _, phasogram = ut.calc_spectrogram(
            signal,
            self.fs,
            nperseg=self.nperseg,
            percent_overlap=self.overlap,
            window=self.window,
            remove_dc=self.dc,
            crop_freq=self.crop_freq,
        )
        K = ut.get_all_Ks(phasogram, F, n_levels=self.quantization_levels)

        th = np.mean(K) + (self.threshold_offset + threshold) * np.std(K)
        detections = np.where(K >= th)[0]
        detected = 1 if len(detections) > 0 else 0

        if return_feature:
            return detected, (K, F, detections)
        return detected

In [10]:
# --- Simulation Setup ---

# Parameters
snr_vec = np.linspace(10, 5, 50)  # linear SNR ratios handed to awgn, swept from 10 down to 5
fc = 10e3       # tone frequency
fs = 96e3       # sampling rate
ts = 20         # signal duration (ts * fs samples)
num_sim = 100   # noise realisations per SNR value
num_th = 10     # number of threshold values swept per detector

# Seed NumPy's global RNG so the noise drawn by awgn is reproducible
RNG_SEED = 0
np.random.seed(RNG_SEED)

# Detectors
welch_detector = WelchDetector(fs=fs, nperseg=16000, overlap=0., window='hanning', dc=20, crop_freq=4000, norm_size=5)
s2g_detector = S2GDetector(fs=fs, nperseg=16000, overlap=0., window='hanning', dc=20, crop_freq=4000, quantization_levels=10)

# Threshold vectors
th_itamar = np.linspace(0.1, 0.9, num_th)
th_demon = np.linspace(0.1, 0.9, num_th)

# Time and Signal
# Note: cast num_points to int for Python indexing
num_points = int(ts * fs)
# endpoint=False keeps the sample spacing at exactly 1/fs; including the
# endpoint would stretch it to ts / (num_points - 1) and shift the tone
# slightly away from fc when the signal is analysed with sampling rate fs.
t = np.linspace(0, ts, num_points, endpoint=False)
sig = np.sqrt(2) * np.sin(2 * np.pi * t * fc)  # sqrt(2) amplitude -> unit average power

# Pre-allocate storage arrays
# Shape is (SNR steps, Simulations, Threshold steps)
detect_vec_s2g = np.zeros((len(snr_vec), num_sim, num_th))
detect_vec_welch = np.zeros((len(snr_vec), num_sim, num_th))

In [13]:
# # --- Main Simulation Code ---

# # Loops
# print("Starting simulation...")

# for snr_ind, current_snr in enumerate(snr_vec):
#     # Optional: Print progress
#     # print(f"Processing SNR: {current_snr:.2f} (Index {snr_ind})")
    
#     for sim_ind in range(num_sim):
#         # Generate Noisy Signal (using 'measured' and 'linear' equivalent)
#         rx = awgn(sig, current_snr)
        
#         for th_ind in range(num_th):
#             # Run detection functions
#             detect_vec_s2g[snr_ind, sim_ind, th_ind] = s2g_detector.detect(rx, th_itamar[th_ind])
#             detect_vec_welch[snr_ind, sim_ind, th_ind] = welch_detector.detect(rx, th_demon[th_ind])

# print("Simulation complete.")

Starting simulation...
Simulation complete.


In [None]:
# pickle.dump(detect_vec_s2g, open("detect_vec_s2g.pkl", "wb"))
# pickle.dump(detect_vec_welch, open("detect_vec_welch.pkl", "wb"))

In [15]:
# Load the cached simulation results.
# NOTE: pickle can execute arbitrary code while loading; only load files that
# were written by this notebook, never files from an untrusted source.
# The context managers make sure both file handles are closed again.
with open("detect_vec_s2g.pkl", "rb") as pkl_file:
    detect_vec_s2g = pickle.load(pkl_file)
with open("detect_vec_welch.pkl", "rb") as pkl_file:
    detect_vec_welch = pickle.load(pkl_file)

In [17]:
# Display the shapes of both loaded result arrays
np.shape(detect_vec_s2g), np.shape(detect_vec_welch)

((50, 100, 10), (50, 100, 10))

In [19]:
# Average over axis 1 (the simulation axis), leaving one value per
# (SNR step, threshold step) pair for each detector
d_avg_s2g = detect_vec_s2g.mean(axis=1)
d_avg_welch = detect_vec_welch.mean(axis=1)

(d_avg_s2g.shape, d_avg_welch.shape)

((50, 10), (50, 10))

In [22]:
# Side-by-side heatmaps of the averaged detection results, one per detector
fig = make_subplots(rows=1, cols=2, subplot_titles=("Demon Detector", "S2G Detector"), horizontal_spacing=0.1, shared_xaxes=True, shared_yaxes=True)

# Each detector is plotted against the threshold vector it is swept with in
# the simulation loop (welch -> th_demon, s2g -> th_itamar). Both traces share
# one colour axis so the panels are directly comparable and only a single
# colourbar is drawn instead of two overlapping ones.
fig.add_trace(go.Heatmap(z=d_avg_welch.T, x=snr_vec, y=th_demon, coloraxis="coloraxis"), row=1, col=1)
fig.add_trace(go.Heatmap(z=d_avg_s2g.T, x=snr_vec, y=th_itamar, coloraxis="coloraxis"), row=1, col=2)

fig.update_layout(coloraxis=dict(colorscale='Viridis', colorbar=dict(title=dict(text="Mean detection"))))
fig.update_xaxes(title_text="SNR (linear)")
fig.update_yaxes(title_text="Threshold", row=1, col=1)
fig.show()

In [46]:
# --- Single-realisation inspection: clean tone vs. noisy tone and both detectors ---

# Time axis and clean tone. endpoint=False keeps the sample spacing at exactly
# 1/fs (including the endpoint would stretch it to ts / (num_points - 1)).
num_points = int(ts * fs)
t = np.linspace(0, ts, num_points, endpoint=False)
sig = np.sqrt(2) * np.sin(2 * np.pi * t * fc)

# Spectrogram settings shared by the clean and the noisy signal
spec_params = dict(nperseg=16000, percent_overlap=0., window='hanning', remove_dc=20, crop_freq=20000)
sig_F, sig_T, sig_Sxx, sig_phasogram = ut.calc_spectrogram(sig, fs, **spec_params)
sig_pxx = ut.calc_welch_from_spectrogram(sig_Sxx, normalization_window_size=5)

# awgn expects a *linear* SNR ratio, so the dB value is converted first.
# Passing a negative number straight to awgn would make the noise power
# negative and turn rx into NaN.
# NOTE(review): -1 is interpreted as a value in dB here - confirm the intended SNR.
snr_db = -1
rx = awgn(sig, 10 ** (snr_db / 10))
rx_F, rx_T, rx_Sxx, rx_phasogram = ut.calc_spectrogram(rx, fs, **spec_params)
rx_pxx = ut.calc_welch_from_spectrogram(rx_Sxx, normalization_window_size=5)

# NOTE: this rebinds welch_detector / s2g_detector with a wider crop_freq than
# the instances created in the simulation-setup cell.
welch_detector = WelchDetector(fs=fs, nperseg=16000, overlap=0., window='hanning', dc=20, crop_freq=20000, norm_size=5)
s2g_detector = S2GDetector(fs=fs, nperseg=16000, overlap=0., window='hanning', dc=20, crop_freq=20000, quantization_levels=10)
welch_detection, (pxx, F, welch_detections) = welch_detector.detect(rx, th_demon[5], return_feature=True)
s2g_detection, (K, F, s2g_detections) = s2g_detector.detect(rx, th_itamar[5], return_feature=True)

fig = make_subplots(rows=4, cols=1)
# Rows 1-2: spectrum of the clean and of the noisy signal
fig.add_trace(go.Scatter(x=sig_F, y=sig_pxx, name="clean pxx"), row=1, col=1)
fig.add_trace(go.Scatter(x=rx_F, y=rx_pxx, name="noisy pxx"), row=2, col=1)

# Row 3: Welch detector feature with its detections marked in red
fig.add_trace(go.Scatter(x=F, y=pxx, name="welch feature"), row=3, col=1)
fig.add_trace(go.Scatter(x=F[welch_detections], y=pxx[welch_detections], mode='markers', marker=dict(color='red', size=10), name="welch detections"), row=3, col=1)

# Row 4: S2G detector feature with its detections marked in red
fig.add_trace(go.Scatter(x=F, y=K, name="s2g feature"), row=4, col=1)
fig.add_trace(go.Scatter(x=F[s2g_detections], y=K[s2g_detections], mode='markers', marker=dict(color='red', size=10), name="s2g detections"), row=4, col=1)

# The title reports the SNR that was actually used above
fig.update_layout(height=800, width=1200, title_text=f"Spectral Analysis and Detections, SNR={snr_db}dB")
fig.show()