In [1]:
# -*- coding: utf-8 -*-
# Sine Detector: CNN (classificazione) + DSP (f0 precisa) + HNR check + Widget Tkinter
# Requisiti: pip install librosa tensorflow

import os, glob, sys, threading
import numpy as np
import librosa
import tensorflow as tf
from tensorflow.keras import layers, models

In [2]:
# ===================== Config =====================
SR_CNN = 16000
DUR_CNN = 0.5
N_SAMPLES = int(SR_CNN * DUR_CNN)
F_MIN, F_MAX = 200.0, 4000.0
DEFAULT_CLS_THRESH = 0.50
DEFAULT_F0_WINDOW = 1.00

In [3]:
# ===================== Generatori sintetici =====================
def gen_sine(freq, sr=SR_CNN, dur=DUR_CNN):
    t = np.arange(int(sr*dur)) / sr
    phase = np.random.rand() * 2*np.pi
    y = 0.9 * np.sin(2*np.pi*freq*t + phase)
    y += 0.005*np.random.randn(len(y))
    return y.astype(np.float32)

def gen_other():
    t = np.arange(N_SAMPLES) / SR_CNN
    typ = np.random.choice(["noise","square","saw"])
    if typ == "noise":
        y = 0.5*np.random.randn(N_SAMPLES)
    elif typ == "square":
        f = np.random.uniform(F_MIN, F_MAX)
        y = 0.7*np.sign(np.sin(2*np.pi*f*t))
    else:
        f = np.random.uniform(F_MIN, F_MAX)
        y = 0.5*(2*((t*f) % 1) - 1)
    y += 0.01*np.random.randn(len(y))
    return y.astype(np.float32)

In [4]:
# ===================== Feature: log-mel =====================
def to_logmel(y, sr=SR_CNN, n_mels=64):
    S = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=1024, hop_length=256, n_mels=n_mels,
        fmin=50, fmax=sr//2
    )
    S_db = librosa.power_to_db(S, ref=np.max)
    S_min, S_max = -80.0, 0.0
    return ((S_db - S_min) / (S_max - S_min)).astype(np.float32)

def center_crop_or_pad(y, target_len):
    n = len(y)
    if n == target_len:
        return y
    if n > target_len:
        start = (n - target_len) // 2
        return y[start:start+target_len]
    pad_left = (target_len - n) // 2
    pad_right = target_len - n - pad_left
    return np.pad(y, (pad_left, pad_right))

In [5]:
# ===================== Batch sintetico =====================
def make_batch(batch_size=64, n_mels=64):
    X, y_cls = [], []
    for _ in range(batch_size):
        if np.random.rand() < 0.5:
            f = np.random.uniform(F_MIN, F_MAX)
            y = gen_sine(f)
            y_cls.append(1.0)
        else:
            y = gen_other()
            y_cls.append(0.0)
        X.append(to_logmel(y, sr=SR_CNN, n_mels=n_mels))
    X = np.expand_dims(np.array(X), -1)
    y_cls = np.array(y_cls, dtype=np.float32)
    return X, y_cls

In [6]:
# ===================== Modello CNN =====================
def build_classifier(n_mels=64):
    inp = layers.Input(shape=(n_mels, None, 1))
    x = layers.Conv2D(16, 3, activation="relu", padding="same")(inp)
    x = layers.MaxPool2D(2)(x)
    x = layers.Conv2D(32, 3, activation="relu", padding="same")(x)
    x = layers.MaxPool2D(2)(x)
    x = layers.Conv2D(64, 3, activation="relu", padding="same")(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation="relu")(x)
    out = layers.Dense(1, activation="sigmoid", name="cls")(x)
    model = models.Model(inp, out)
    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    return model

def quick_train(model, steps=200, batch_size=64, n_mels=64):
    for _ in range(steps):
        Xb, yb = make_batch(batch_size=batch_size, n_mels=n_mels)
        model.train_on_batch(Xb, yb)

In [7]:
# ===================== I/O file =====================
def load_for_cnn(path, target_sr=SR_CNN, dur_sec=DUR_CNN):
    y, sr = librosa.load(path, sr=target_sr, mono=True)
    return center_crop_or_pad(y, int(target_sr*dur_sec))

def load_native(path):
    y, sr = librosa.load(path, sr=None, mono=True)
    return y.astype(np.float32), int(sr)


In [8]:
# ===================== DSP: f0 + HNR =====================
def estimate_f0_acf(y, sr, fmin=50.0, fmax=8000.0):
    y = librosa.util.normalize(y.astype(np.float32))
    y = y - np.mean(y)
    corr = librosa.autocorrelate(y)
    min_lag = max(1, int(sr / fmax))
    max_lag = min(len(corr)-1, int(sr / fmin))
    region = corr[min_lag:max_lag]
    k = int(np.argmax(region)) + min_lag
    f0 = sr / k if k > 0 else None
    return float(f0) if (f0 and np.isfinite(f0)) else None

def estimate_f0_fft(y, sr):
    N = len(y)
    if N < 1024: return None
    win = np.hanning(N)
    n_fft = 1 << int(np.ceil(np.log2(4*N)))
    Y = np.fft.rfft(win * y, n=n_fft)
    mag = np.abs(Y)
    k = int(np.argmax(mag[1:])) + 1
    f0 = (k * sr) / n_fft
    return float(f0) if np.isfinite(f0) else None

def estimate_f0_precise_segment(y, sr, window_sec=DEFAULT_F0_WINDOW):
    seg = center_crop_or_pad(y, int(sr*window_sec))
    f0 = estimate_f0_acf(seg, sr) or estimate_f0_fft(seg, sr)
    return f0

def compute_hnr(y, sr, f0=None):
    if f0 is None:
        return None
    S = np.abs(librosa.stft(y, n_fft=2048, hop_length=512))
    freqs = librosa.fft_frequencies(sr=sr, n_fft=2048)
    k0 = np.argmin(np.abs(freqs - f0))
    fund_energy = np.sum(S[k0-1:k0+2, :])
    total_energy = np.sum(S)
    noise_energy = total_energy - fund_energy
    if noise_energy <= 0: return 100.0
    return 10 * np.log10(fund_energy / noise_energy)

In [13]:
# ===================== Inference =====================
HNR_THRESH_DB = 20.0  # soglia consigliata: >20 dB = sinusoide "pura

def infer_file(path, model, cls_thresh=DEFAULT_CLS_THRESH, n_mels=64,
               f0_window_sec=DEFAULT_F0_WINDOW, hnr_thresh_db=HNR_THRESH_DB):
    # 1) CNN (16 kHz) → probabilità di sinusoide
    y_cnn = load_for_cnn(path, target_sr=SR_CNN, dur_sec=DUR_CNN)
    M = to_logmel(y_cnn, sr=SR_CNN, n_mels=n_mels)
    Xm = np.expand_dims(np.expand_dims(M, -1), 0)
    prob_sine = float(model.predict(Xm, verbose=0)[0,0])

    # 2) Se la CNN dice "probabile sinusoide", stimo f0 e HNR sul file nativo
    f0, hnr = None, None
    if prob_sine >= cls_thresh:
        y_nat, sr_nat = load_native(path)
        f0 = estimate_f0_precise_segment(y_nat, sr_nat, window_sec=f0_window_sec)
        if f0:
            hnr = compute_hnr(y_nat, sr_nat, f0=f0)

    # 3) Decisione finale: serve sia CNN che HNR sopra soglia
    passes_hnr = (hnr is not None) and (hnr >= hnr_thresh_db)
    is_sine_final = (prob_sine >= cls_thresh) and passes_hnr

    in_range = (f0 is not None) and (F_MIN <= f0 <= F_MAX)

    return {
        "file": path,
        "prob_sine": prob_sine,
        "freq_hz": f0,
        "hnr_db": hnr,
        "is_sine": is_sine_final,           # <- decisione filtrata da HNR
        "cnn_pass": (prob_sine >= cls_thresh),
        "hnr_pass": passes_hnr,
        "hnr_thresh_db": hnr_thresh_db,
        "in_range": in_range,
        "range_min": F_MIN,
        "range_max": F_MAX,
    }


In [None]:
# ===================== Widget Tkinter =====================
def launch_inference_widget(model, default_thresh=DEFAULT_CLS_THRESH, default_f0_win=DEFAULT_F0_WINDOW):
    import tkinter as tk
    from tkinter import filedialog, ttk
    from tkinter.scrolledtext import ScrolledText

    root = tk.Tk()
    root.title("Sine Detector — Inference (CNN + HNR)")
    root.geometry("820x520")

    controls = ttk.Frame(root, padding=10); controls.pack(fill="x")
    controls.pack(fill="x")

    # Soglia CNN
    ttk.Label(controls, text="Soglia CNN (P sine):").grid(row=0, column=0, sticky="w")
    thresh_var = tk.DoubleVar(value=default_thresh)
    ttk.Scale(controls, from_=0.0, to=1.0, orient="horizontal", variable=thresh_var, length=240)\
        .grid(row=0, column=1, padx=8)
    ttk.Entry(controls, textvariable=thresh_var, width=6).grid(row=0, column=2)

    # Finestra f0
    ttk.Label(controls, text="Finestra f0 (s):").grid(row=0, column=3, sticky="w", padx=(16,0))
    f0win_var = tk.DoubleVar(value=default_f0_win)
    ttk.Scale(controls, from_=0.25, to=2.0, orient="horizontal", variable=f0win_var, length=180)\
        .grid(row=0, column=4, padx=8)
    ttk.Entry(controls, textvariable=f0win_var, width=6).grid(row=0, column=5)
    
    # Soglia HNR
    ttk.Label(controls, text="Soglia HNR (dB):").grid(row=0, column=6, sticky="w", padx=(16,0))
    hnr_var = tk.DoubleVar(value=HNR_THRESH_DB)
    ttk.Scale(controls, from_=0.0, to=40.0, orient="horizontal", variable=hnr_var, length=180)\
        .grid(row=0, column=7, padx=8)
    ttk.Entry(controls, textvariable=hnr_var, width=6).grid(row=0, column=8)

    # Pulsanti
    def choose_files():
        paths = filedialog.askopenfilenames(
            title="Seleziona file WAV",
            filetypes=[("WAV files", "*.wav"), ("Tutti i file", "*.*")]
        )
        if not paths:
            return
        run_inference(paths)

    def clear_output():
        out_text.configure(state="normal")
        out_text.delete("1.0", tk.END)
        out_text.configure(state="disabled")

    ttk.Button(controls, text="Seleziona file WAV…", command=choose_files)\
        .grid(row=0, column=6, padx=12)
    ttk.Button(controls, text="Pulisci output", command=clear_output)\
        .grid(row=0, column=7)

    out_frame = ttk.Frame(root, padding=(10, 0, 10, 10))
    out_frame.pack(fill="both", expand=True)
    out_text = ScrolledText(out_frame, wrap="word", height=22)
    out_text.pack(fill="both", expand=True)
    out_text.configure(state="disabled")

    def log(line: str):
        out_text.configure(state="normal")
        out_text.insert(tk.END, line + "\n")
        out_text.see(tk.END)
        out_text.configure(state="disabled")

    def run_inference(paths):
        def worker():
            thr = float(thresh_var.get())
            f0w = float(f0win_var.get())
            hnr_thr = float(hnr_var.get())
            log(f"[INFO] {len(paths)} file | CNN thr={thr:.2f} | f0 win={f0w:.2f}s | HNR thr={hnr_thr:.1f} dB")
            for p in paths:
                try:
                    res = infer_file(p, model, cls_thresh=thr, n_mels=64,
                                     f0_window_sec=f0w, hnr_thresh_db=hnr_thr)
                    fname = os.path.basename(p)
                    # messaggi
                    if res["is_sine"] and res["freq_hz"] is not None:
                        tag = "IN RANGE" if res["in_range"] else "OUT OF RANGE"
                        log(f"{fname:35s} → SINUSOIDE ✓  P={res['prob_sine']:.2f}, "
                            f"f0≈{res['freq_hz']:.2f} Hz [{tag}]  HNR={res['hnr_db']:.1f} dB")
                    else:
                        # spiega perché bocciato
                        reason = []
                        if not res["cnn_pass"]: reason.append(f"P<{thr:.2f}")
                        if res["hnr_db"] is not None and not res["hnr_pass"]:
                            reason.append(f"HNR<{hnr_thr:.1f}dB ({res['hnr_db']:.1f}dB)")
                        why = " & ".join(reason) if reason else "no f0/HNR"
                        log(f"{fname:35s} → NON sinusoide ✗  ({why})")
                except Exception as e:
                    log(f"{os.path.basename(p):35s} → ERRORE: {e}")
            log("[DONE]")
        threading.Thread(target=worker, daemon=True).start()

In [11]:
model = build_classifier(n_mels=64)
quick_train(model, steps=200, batch_size=64, n_mels=64)

In [12]:
# Avvia widget
launch_inference_widget(model, default_thresh=DEFAULT_CLS_THRESH, default_f0_win=DEFAULT_F0_WINDOW)