In [None]:
# import basic packages
import os
import numpy as np
import wget
import sys
import gdown
import zipfile
import librosa
import pandas as pd
# The notebook can only drive a single GPU, so expose just device 0 to CUDA.
os.environ.update(CUDA_VISIBLE_DEVICES="0")

In [None]:
# Build the workspace and download the needed files

def create_path(path):
    """Create the directory ``path`` if nothing exists at that location yet.

    Missing parent directories are created as well, and the call is safe to
    repeat (or to race with another process creating the same directory).

    Args:
        path: Directory to create.
    """
    if not os.path.exists(path):
        # makedirs (unlike mkdir) also builds missing parents; exist_ok covers
        # the case where the directory appears between the check and the call.
        os.makedirs(path, exist_ok=True)

# Directory layout of the experiment workspace; every child is derived from
# its parent so the whole tree moves with `workspace`.
workspace = "./workspace_ADS_v3"
dataset_path = os.path.join(workspace, "mfg_robot")
checkpoint_path = os.path.join(workspace, "ckpt")
mfg_raw_path = os.path.join(dataset_path, "raw")
master_path = os.path.join(mfg_raw_path, "MFG-master")

# Create the directories parents-first, in the order they are defined above.
for _directory in (workspace, dataset_path, checkpoint_path, mfg_raw_path, master_path):
    create_path(_directory)




In [None]:
# Process Manufacturing Dataset – Resampling Audio Files
import subprocess

meta_path = os.path.join(mfg_raw_path, 'MFG-master', 'meta', 'meta.csv')
audio_path = os.path.join(mfg_raw_path, 'MFG-master', 'CurrentRotationTorque')
resample_path = os.path.join(dataset_path, 'resample')
savedata_path = os.path.join(dataset_path, 'mfg-data.npy')
create_path(resample_path)


# Load the meta you generated earlier. It is addressed through the
# workspace-relative `meta_path` instead of a user-specific absolute path, so
# the notebook runs on any machine as long as it is started from the project root.
new_meta = pd.read_csv(meta_path)

# Write it back without the pandas index so the loader below sees plain CSV rows.
new_meta.to_csv(meta_path, index=False)

# ndmin=2 keeps `meta` two-dimensional even if the file has a single data row.
meta = np.loadtxt(meta_path, delimiter=',', dtype='str', skiprows=1, ndmin=2)
audio_list = os.listdir(audio_path)

print("-------------Resample MFG-------------")
failed_files = []
for f in audio_list:
    full_f = os.path.join(audio_path, f)
    resample_f = os.path.join(resample_path, f)
    if os.path.exists(resample_f):
        # already resampled in an earlier run
        continue
    # Pass the arguments as a list (no shell) so file names containing spaces
    # or shell metacharacters are handled correctly.
    result = subprocess.run(['sox', '-V1', full_f, '-r', '32000', resample_f])
    if result.returncode != 0:
        failed_files.append(f)
        # Drop any partial output; otherwise the existence check above would
        # skip this file on the next run.
        if os.path.exists(resample_f):
            os.remove(resample_f)
if failed_files:
    raise RuntimeError("sox failed to resample: " + ", ".join(failed_files))
print("-------------Resample Success-------------")


In [None]:
# Reload the metadata rows from meta.csv (header line skipped, all fields as strings).
# ndmin=2 keeps the array two-dimensional even when the file holds a single
# data row; without it that row collapses to 1-D and len(meta) would count
# columns instead of samples.
meta = np.loadtxt(meta_path, delimiter=',', dtype='str', skiprows=1, ndmin=2)
print(f"Loaded {len(meta)} samples from meta.csv")


In [None]:
print("-------------Build Dataset-------------")
NUM_FOLDS = 5  # number of fold buckets the samples are distributed over
output_dict = [[] for _ in range(NUM_FOLDS)]
for label in meta:
    # meta.csv columns used here: file name, fold number, class target
    name, fold, target = label[0], label[1], label[2]

    # Folds are stored 1-based in meta.csv. Validate explicitly: a fold of 0
    # would otherwise become index -1 and silently land in the last bucket.
    fold_index = int(fold) - 1
    if not 0 <= fold_index < NUM_FOLDS:
        raise ValueError(
            f"{name}: fold {fold!r} is outside the expected range 1..{NUM_FOLDS}"
        )

    # sr=None keeps the sample rate of the file on disk (no resampling here).
    y, _ = librosa.load(os.path.join(resample_path, name), sr = None)
    output_dict[fold_index].append(
        {
            "name": name,
            "target": int(target),
            "waveform": y
        }
    )

# Stored as an object array because each entry is a dict holding a waveform.
output_arr = np.array(output_dict, dtype=object)
np.save(savedata_path, output_arr)
print("-------------Success-------------")


In [None]:
# Read the saved folds back (object array, hence allow_pickle) and report how
# many samples each fold holds.
full_dataset = np.load(savedata_path, allow_pickle=True)
fold_sizes = list(map(len, full_dataset))
print(fold_sizes)   # should sum to 89, e.g. [18, 18, 18, 18, 17]


In [None]:
# Load the model package
import torch
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
import warnings

from utils import create_folder, dump_config, process_idc
import mfg_config as config
from sed_model import SEDWrapper, Ensemble_SEDWrapper
from data_generator import ESC_Dataset
from model.htsat import HTSAT_Swin_Transformer



In [None]:
# Data Preparation
class data_prep(pl.LightningDataModule):
    """Lightning data module serving the training and evaluation datasets.

    All loaders are unshuffled. When more than one device is used, each loader
    gets an unshuffled ``DistributedSampler``; the global ``config.batch_size``
    is divided evenly across devices.

    Args:
        train_dataset: Dataset used by ``train_dataloader``.
        eval_dataset: Dataset used by both ``val_dataloader`` and ``test_dataloader``.
        device_num: Number of devices (e.g. ``torch.cuda.device_count()``);
            ``0`` (CPU only) is treated like a single device for batch sizing.
    """

    def __init__(self, train_dataset, eval_dataset, device_num):
        super().__init__()
        self.train_dataset = train_dataset
        self.eval_dataset = eval_dataset
        self.device_num = device_num

    def _build_loader(self, dataset):
        """Return an unshuffled DataLoader over ``dataset`` with the shared settings."""
        sampler = DistributedSampler(dataset, shuffle = False) if self.device_num > 1 else None
        return DataLoader(
            dataset = dataset,
            num_workers = config.num_workers,
            # max(1, ...) avoids a ZeroDivisionError when no GPU is visible
            # and device_num is therefore 0.
            batch_size = config.batch_size // max(1, self.device_num),
            shuffle = False,
            sampler = sampler
        )

    def train_dataloader(self):
        """Loader over the training dataset."""
        return self._build_loader(self.train_dataset)

    def val_dataloader(self):
        """Loader over the evaluation dataset, used for validation."""
        return self._build_loader(self.eval_dataset)

    def test_dataloader(self):
        """Loader over the evaluation dataset, used for testing."""
        return self._build_loader(self.eval_dataset)
    

In [None]:
# Set the workspace
device_num = torch.cuda.device_count()
print("device_num:", device_num)
# device_count() is 0 on a machine without a visible GPU; size the batch as for
# a single device in that case instead of dividing by zero.
print("each batch size:", config.batch_size // max(1, device_num))

# The file was written with dtype=object, so it has to be loaded with allow_pickle.
full_dataset = np.load(os.path.join(config.dataset_path, "mfg-data.npy"), allow_pickle = True)

# set exp folder
exp_dir = os.path.join(config.workspace, "results", config.exp_name)
checkpoint_dir = os.path.join(exp_dir, "checkpoint")
if not config.debug:
    # Folders and the config dump are only written for non-debug runs.
    create_folder(os.path.join(config.workspace, "results"))
    create_folder(exp_dir)
    create_folder(checkpoint_dir)
    dump_config(config, os.path.join(exp_dir, config.exp_name), False)

print("Using ESC Dataset in data_generator.py")
# Both datasets wrap the same folds; eval_mode selects the training or the
# evaluation view.
dataset = ESC_Dataset(
    dataset = full_dataset,
    config = config,
    eval_mode = False
)
eval_dataset = ESC_Dataset(
    dataset = full_dataset,
    config = config,
    eval_mode = True
)

audioset_data = data_prep(dataset, eval_dataset, device_num)
# Keep the 20 checkpoints with the highest monitored "acc" value.
checkpoint_callback = ModelCheckpoint(
    monitor = "acc",
    filename='l-{epoch:d}-{acc:.3f}',
    save_top_k = 20,
    mode = "max"
)




In [None]:
# Set the Trainer
trainer = pl.Trainer(
    deterministic=False,
    default_root_dir = checkpoint_dir,
    gpus = device_num, 
    val_check_interval = 1.0,
    max_epochs = config.max_epoch,
    auto_lr_find = True,    
    sync_batchnorm = True,
    callbacks = [checkpoint_callback],
    # distributed data parallel only when more than one device is available
    accelerator = "ddp" if device_num > 1 else None,
    num_sanity_val_steps = 0,
    resume_from_checkpoint = None, 
    # the data module installs its own DistributedSampler, so Lightning must not replace it
    replace_sampler_ddp = False,
    log_every_n_steps=1, 
    gradient_clip_val=1.0
)

# Backbone network, configured entirely from the htsat_* settings in config.
sed_model = HTSAT_Swin_Transformer(
    spec_size=config.htsat_spec_size,
    patch_size=config.htsat_patch_size,
    in_chans=1,
    num_classes=config.classes_num,
    window_size=config.htsat_window_size,
    config = config,
    depths = config.htsat_depth,
    embed_dim = config.htsat_dim,
    patch_stride=config.htsat_stride,
    num_heads=config.htsat_num_head
)

# Lightning wrapper around the backbone.
model = SEDWrapper(
    sed_model = sed_model, 
    config = config,
    dataset = dataset
)

if config.resume_checkpoint is not None:
    print("Load Checkpoint from ", config.resume_checkpoint)
    ckpt = torch.load(config.resume_checkpoint, map_location="cpu")
    # Drop the classification head and the tscam_conv layer from the checkpoint
    # (finetune on the esc and spv2 dataset); strict=False below then leaves
    # those layers at their fresh initialisation.
    # pop(..., None) tolerates checkpoints that do not contain one of the keys.
    for dropped_key in (
        "sed_model.head.weight",
        "sed_model.head.bias",
        "sed_model.tscam_conv.weight",
        "sed_model.tscam_conv.bias",
    ):
        ckpt["state_dict"].pop(dropped_key, None)
    model.load_state_dict(ckpt["state_dict"], strict=False)



In [None]:
# Train using the data module built above. At this point in the notebook no
# standalone `train_loader` / `val_loader` exists yet, and wrapping the train
# loader in a list would make Lightning treat it as several loaders.
# NOTE: the data module's loaders use config.num_workers worker processes.
trainer.fit(
    model,
    datamodule=audioset_data,
)

In [None]:
# Report the training-set size and the number of batches per epoch.
# The batch count is derived from the dataset length and the per-device batch
# size rather than from `train_loader`, which is only created in a later cell.
train_sample_count = len(audioset_data.train_dataset)
per_device_batch_size = config.batch_size // max(1, device_num)
print("Train samples:", train_sample_count)

# Ceiling division: a final, partially filled batch still counts as a batch.
print("Train batches:", -(-train_sample_count // per_device_batch_size))



In [None]:
from torch.utils.data import DataLoader

# make absolutely sure both loaders use num_workers=0:
# (max(1, ...) keeps the division valid when no GPU is visible and device_num is 0)
per_device_batch_size = config.batch_size // max(1, device_num)

train_loader = DataLoader(
    audioset_data.train_dataset,
    batch_size=per_device_batch_size,
    shuffle=False,
    num_workers=0
)

print("Train batches:", len(train_loader))

val_loader = DataLoader(
    audioset_data.eval_dataset,
    batch_size=per_device_batch_size,
    shuffle=False,
    num_workers=0
)

# now pass them explicitly. They are given positionally (model, train, val):
# `val_dataloader=` is not a parameter of Trainer.fit, and the keyword for the
# training loader was renamed between Lightning releases.
# (trainer.fit(model, datamodule=audioset_data) would use config.num_workers instead.)
trainer.fit(model, train_loader, val_loader)

In [None]:
# infer the single data to check the result
# get a model you saved
# The path is built relative to `workspace` so it does not depend on one
# user's home directory. The version folder and checkpoint file name belong to
# one specific training run; adjust them to the run you want to evaluate.
model_path = os.path.join(
    workspace,
    "results",
    "exp_htsat_mfg",
    "checkpoint",
    "lightning_logs",
    "version_9",
    "checkpoints",
    "l-epoch=3-acc=1.000.ckpt",
)

# get the groundtruth: file name -> target label (kept as the string read from meta.csv)
# ndmin=2 keeps `meta` two-dimensional even if the file has a single data row.
meta = np.loadtxt(meta_path, delimiter=',', dtype='str', skiprows=1, ndmin=2)
gd = {label[0]: label[2] for label in meta}

import librosa
import torch

# class Audio_Classification:
#     def __init__(self, model_path, config):
#         # Device
#         self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 
#         # Build model
#         self.sed_model = HTSAT_Swin_Transformer(
#             spec_size=config.htsat_spec_size,
#             patch_size=config.htsat_patch_size,
#             in_chans=1,
#             num_classes=config.classes_num,
#             window_size=config.htsat_window_size,
#             config=config,
#             depths=config.htsat_depth,
#             embed_dim=config.htsat_dim,
#             patch_stride=config.htsat_stride,
#             num_heads=config.htsat_num_head
#         )
# 
#         # Load checkpoint
#         ckpt = torch.load(model_path, map_location='cpu')
#         # strip Lightning prefixes if needed
#         state_dict = {k.replace('sed_model.', ''): v for k, v in ckpt['state_dict'].items()}
#         self.sed_model.load_state_dict(state_dict, strict=False)
# 
#         # Move to device and eval mode
#         self.sed_model.to(self.device)
#         self.sed_model.eval()
# 
#         # Fixed-length settings (must match training)
#         self.SR         = 32000
#         self.TARGET_SEC = 5
#         self.fixed_len  = self.SR * self.TARGET_SEC
# 
#     def predict(self, audiofile):
#         # 1) Load & resample
#         waveform, sr = librosa.load(audiofile, sr=self.SR)
# 
#         # 2) Pad or truncate to fixed length
#         waveform = librosa.util.fix_length(waveform, size=self.fixed_len)
# 
#         # 3) To tensor & add batch + channel dims: [1,1,T]
#         x = torch.from_numpy(waveform).float().to(self.device)
#         x = x.unsqueeze(0).unsqueeze(0)
# 
#         # 4) Forward pass
#         with torch.no_grad():
#             output_dict = self.sed_model(x, None, True)
#             post = output_dict['clipwise_output'][0].cpu().numpy()
#             pred_label = int(np.argmax(post))
#             pred_prob  = float(np.max(post))
# 
#         return pred_label, pred_prob

from typing import Tuple


class Audio_Classification:
    """Single-file inference helper around a trained HTS-AT checkpoint.

    Builds an ``HTSAT_Swin_Transformer`` from ``config``, loads the weights
    stored under ``state_dict`` in the checkpoint, and classifies one audio
    file at a time with ``predict``.

    Args:
        model_path: Path to the checkpoint file to load.
        config: Configuration object providing the ``htsat_*`` settings and
            ``classes_num`` used to build the model.
        sample_rate: Rate (Hz) every input file is resampled to; must match training.
        target_sec: Clip length in seconds every input is padded/truncated to;
            must match training.
    """

    def __init__(self, model_path, config, sample_rate=32000, target_sec=5):
        import warnings

        # Device selection: GPU when available, CPU otherwise
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Build the HTS-AT model
        self.sed_model = HTSAT_Swin_Transformer(
            spec_size    = config.htsat_spec_size,
            patch_size   = config.htsat_patch_size,
            in_chans     = 1,
            num_classes  = config.classes_num,
            window_size  = config.htsat_window_size,
            config       = config,
            depths       = config.htsat_depth,
            embed_dim    = config.htsat_dim,
            patch_stride = config.htsat_stride,
            num_heads    = config.htsat_num_head
        )

        # Load checkpoint weights. Only a leading 'sed_model.' is removed from
        # each key; the same text appearing later in a key is left untouched.
        ckpt = torch.load(model_path, map_location='cpu')
        prefix = 'sed_model.'
        state_dict = {
            (k[len(prefix):] if k.startswith(prefix) else k): v
            for k, v in ckpt['state_dict'].items()
        }
        load_result = self.sed_model.load_state_dict(state_dict, strict=False)
        # strict=False would otherwise hide a checkpoint that does not fit the
        # model, leaving layers at their random initialisation without notice.
        if load_result.missing_keys or load_result.unexpected_keys:
            warnings.warn(
                "Checkpoint does not fully match the model: "
                f"missing keys {list(load_result.missing_keys)}, "
                f"unexpected keys {list(load_result.unexpected_keys)}"
            )

        # Disable STFT centering/padding to avoid the NotImplementedError.
        # Still best-effort, but no longer silent when the attribute path is absent.
        try:
            self.sed_model.spectrogram_extractor.stft.center = False
        except AttributeError:
            warnings.warn(
                "sed_model.spectrogram_extractor.stft not found; "
                "STFT centering was left unchanged"
            )

        # Move model to device and set eval mode
        self.sed_model.to(self.device)
        self.sed_model.eval()

        # Fixed-length audio settings (must match training)
        self.SR         = sample_rate
        self.TARGET_SEC = target_sec
        self.fixed_len  = int(self.SR * self.TARGET_SEC)

    def predict(self, audiofile: str) -> Tuple[int, float]:
        """Classify a single audio file.

        The file is loaded and resampled to ``self.SR``, padded or truncated
        to ``self.fixed_len`` samples, and passed through the model.

        Args:
            audiofile: Path to the audio file.

        Returns:
            ``(label, score)``: the index of the largest entry of the model's
            ``clipwise_output`` for this clip and that entry's value (used as
            the prediction probability).
        """
        # a) Load & resample
        waveform, _ = librosa.load(audiofile, sr=self.SR)

        # b) Pad or truncate to exactly fixed_len
        waveform = librosa.util.fix_length(waveform, size=self.fixed_len)

        # c) To tensor & add batch dimension -> [1, T]
        x = torch.from_numpy(waveform).float().to(self.device).unsqueeze(0)

        # d) Forward pass without gradient tracking
        with torch.no_grad():
            output_dict = self.sed_model(x, None, True)
            post = output_dict['clipwise_output'][0].cpu().numpy()

        return int(np.argmax(post)), float(np.max(post))

In [None]:
# Inference
Audiocls = Audio_Classification(model_path, config)

# Pick any clip listed in meta.csv. The raw file is read from `audio_path`
# (workspace-relative), and the same name is used for the ground-truth lookup
# so the prediction and the label always refer to the same clip.
sample_name = "cycle_026.wav"
pred_label, pred_prob = Audiocls.predict(os.path.join(audio_path, sample_name))

print('Audiocls predict output: ', pred_label, pred_prob, gd[sample_name])

In [None]:
# `audio_path` is the directory holding the clips, so a concrete file inside it
# has to be selected; its ground-truth label is looked up under the same name.
sample_name = "cycle_026.wav"
true_label = int(gd[sample_name])

clf = Audio_Classification(model_path, config)
label, prob = clf.predict(os.path.join(audio_path, sample_name))
print("Pred:", label, "prob:", prob, "GT:", true_label)


In [None]:
import os

# 1) Instantiate classifier
clf = Audio_Classification(model_path, config)

# 2) Point to a specific .wav file (relative to the workspace, via `audio_path`)
test_name = "cycle_026.wav"
test_file = os.path.join(audio_path, test_name)

# 3) Run prediction
pred_label, pred_prob = clf.predict(test_file)

# 4) Compare to ground truth of the SAME clip that was predicted
true_label = int(gd[test_name])
print(f"Predicted: {pred_label} (p={pred_prob:.3f}),  True: {true_label}")
