# Deploy `.pt` Single-Window Playground
This notebook is a standalone guide for your friend to run the deploy model in both full-audio and single-window modes, then batch-evaluate and export results.

In [None]:
from pathlib import Path
import glob
import json

import torch
import torchaudio
import pandas as pd

# ---- Parameters ----
DEPLOY_PT = Path("checkpoints/audio/deploy_single_label.pt")
AUDIO_PATH = None  # Optional override; if None, first .flac from data_root is used
START_SEC = 5.0

assert DEPLOY_PT.exists(), f"Deploy model not found: {DEPLOY_PT}"
cfg_path = DEPLOY_PT.parent / "config.json"
assert cfg_path.exists(), f"Missing checkpoint-side config: {cfg_path}"

cfg = json.load(open(cfg_path))
feature_cfg = cfg["audio"]["feature_params"]
TARGET_SR = int(feature_cfg["sampling_rate"])
CHUNK_SEC = float(feature_cfg["chunk_length_in_s"])
CHUNK_SAMPLES = int(TARGET_SR * CHUNK_SEC)
DATA_ROOT = Path(cfg["data_root"])

if AUDIO_PATH is None:
    all_files = sorted(glob.glob(str(DATA_ROOT / "**" / "*.flac"), recursive=True))
    assert all_files, f"No .flac files found under {DATA_ROOT}"
    AUDIO_PATH = Path(all_files[0])
else:
    AUDIO_PATH = Path(AUDIO_PATH)

assert AUDIO_PATH.exists(), f"Audio file not found: {AUDIO_PATH}"

deploy_model = torch.jit.load(str(DEPLOY_PT), map_location="cpu")
waveform, sr = torchaudio.load(str(AUDIO_PATH))
print(f"Loaded model: {DEPLOY_PT}")
print(f"Loaded audio: {AUDIO_PATH}")
print(f"Sample rate: {sr} | shape: {tuple(waveform.shape)}")
print(f"Config-driven window: chunk_sec={CHUNK_SEC} | chunk_samples={CHUNK_SAMPLES} | target_sr={TARGET_SR}")

Loaded model: checkpoints/audio/deploy_single_label.pt
Loaded audio: /data1/malto/therness/data/Hackathon/defect-weld/burnthrough_weld_10_12-04-22_butt_joint/12-04-22-0161-02/12-04-22-0161-02.flac
Sample rate: 16000 | shape: (1, 608000)


In [None]:
def decode_label(label_id: int) -> str:
    return "defect" if int(label_id) == 1 else "good_weld"

def run_deploy(wave: torch.Tensor):
    out = deploy_model(wave)
    label_id = int(out["label"].item())
    p_defect = float(out["p_defect"].item())
    return {
        "label_id": label_id,
        "label": decode_label(label_id),
        "p_defect": p_defect,
    }

def prepare_audio_for_windows(wave: torch.Tensor, sample_rate: int, target_sr: int | None = None):
    if target_sr is None:
        target_sr = TARGET_SR
    if sample_rate != target_sr:
        wave = torchaudio.functional.resample(wave, sample_rate, target_sr)
        sample_rate = target_sr
    if wave.shape[0] > 1:
        wave = wave.mean(dim=0, keepdim=True)
    return wave, sample_rate

def extract_window(wave: torch.Tensor, sample_rate: int, start_sec: float, chunk_samples: int | None = None):
    if chunk_samples is None:
        chunk_samples = CHUNK_SAMPLES
    start_sample = int(start_sec * sample_rate)
    end_sample = start_sample + chunk_samples
    if start_sample < 0:
        raise ValueError("start_sec must be >= 0")
    if end_sample > wave.shape[-1]:
        raise ValueError(
            f"Window [{start_sample}:{end_sample}] exceeds audio length {wave.shape[-1]}"
        )
    win = wave[:, start_sample:end_sample]
    if win.shape != (1, chunk_samples):
        raise ValueError(f"Unexpected window shape {tuple(win.shape)}")
    return win, start_sample, end_sample

In [3]:
# Mode A: full audio -> single label
wave_full, sr_full = prepare_audio_for_windows(waveform, sr, target_sr=16000)
mode_a = run_deploy(wave_full)
print("Mode A (full audio):", mode_a)

Mode A (full audio): {'label_id': 1, 'label': 'defect', 'p_defect': 0.77490234375}


In [4]:
# Mode B: one exact window at START_SEC -> single label
wave_base, sr_base = prepare_audio_for_windows(waveform, sr, target_sr=16000)
window, start_samp, end_samp = extract_window(
    wave=wave_base,
    sample_rate=sr_base,
    start_sec=START_SEC,
    chunk_samples=CHUNK_SAMPLES,
 )
mode_b = run_deploy(window)
print(f"Mode B (single window) | start_sec={START_SEC} | samples=[{start_samp}:{end_samp}] | shape={tuple(window.shape)}")
print(mode_b)

Mode B (single window) | start_sec=5.0 | samples=[80000:96000] | shape=(1, 16000)
{'label_id': 0, 'label': 'good_weld', 'p_defect': 0.16552846133708954}


In [None]:
# Batch inference over many files and time offsets
cfg = json.load(open("checkpoints/audio/config.json"))
data_root = cfg["data_root"]
files = sorted(glob.glob(str(Path(data_root) / "**" / "*.flac"), recursive=True))

offsets_sec = [0.0, 2.0, 5.0, 10.0]
records = []

for f in files[:200]:  # change/remove limit as needed
    wave_f, sr_f = torchaudio.load(f)
    wave_f, sr_f = prepare_audio_for_windows(wave_f, sr_f, target_sr=16000)

    for off in offsets_sec:
        try:
            win, s0, s1 = extract_window(wave_f, sr_f, off, CHUNK_SAMPLES)
            pred = run_deploy(win)
            records.append({
                "file": f,
                "start_sec": off,
                "start_sample": s0,
                "end_sample": s1,
                "label_id": pred["label_id"],
                "label": pred["label"],
                "p_defect": pred["p_defect"],
            })
        except Exception as e:
            records.append({
                "file": f,
                "start_sec": off,
                "error": str(e),
            })

print(f"Collected records: {len(records)}")
records[:3]

In [None]:
# Save per-window results to CSV + JSON
df = pd.DataFrame(records)
csv_path = Path("checkpoints/audio/deploy_window_scan_results.csv")
json_path = Path("checkpoints/audio/deploy_window_scan_results.json")
csv_path.parent.mkdir(parents=True, exist_ok=True)

df.to_csv(csv_path, index=False)
with open(json_path, "w") as f:
    json.dump(records, f, indent=2)

print(f"Saved CSV: {csv_path}")
print(f"Saved JSON: {json_path}")
print(df.head(5))