In [9]:
your_project_root = 'lab_new'

In [10]:
! pip install librosa

Defaulting to user installation because normal site-packages is not writeable
Collecting librosa
  Downloading librosa-0.11.0-py3-none-any.whl (260 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m260.7/260.7 KB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting soxr>=0.3.2
  Downloading soxr-1.0.0-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (242 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.3/242.3 KB[0m [31m12.3 MB/s[0m eta [36m0:00:00[0m
Collecting soundfile>=0.12.1
  Downloading soundfile-0.13.1-py2.py3-none-manylinux_2_28_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting audioread>=2.1.9
  Downloading audioread-3.1.0-py3-none-any.whl (23 kB)
Collecting msgpack>=1.0
  Downloading msgpack-1.1.2-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (406

In [13]:
import os
import tarfile
import urllib.request
from pathlib import Path
import random
import numpy as np
import librosa
from tqdm import tqdm

In [16]:
root_path = Path(your_project_root).expanduser()
data_root = root_path / "data"
full_data_root = data_root / "full"
sc_version = "speech_commands_v0.02"
sc_url = "https://storage.googleapis.com/download.tensorflow.org/data/speech_commands_v0.02.tar.gz"

In [None]:
LABEL_NAMES = [
    "yes", "no", "up", "down", "left",
    "right", "on", "off", "stop", "go",
    "silence", "unknown",
]
TARGET_WORDS = LABEL_NAMES[:10]   
SILENCE_LABEL = "silence"
UNKNOWN_LABEL = "unknown"

In [None]:
SR = 16000         # sampling rate
DURATION = 1.0     # duration
N_MFCC = 49        # MFCC dimension
N_FRAMES = 10      # time frames (1, 10, 49)

MAX_UNKNOWN_PER_SPLIT = 3000
N_UNKNOWN_TRAIN = MAX_UNKNOWN_PER_SPLIT
N_UNKNOWN_VAL = 400
N_UNKNOWN_TEST = 400

N_SILENCE_TRAIN = 3000
N_SILENCE_VAL = 400
N_SILENCE_TEST = 400

OUT_NPZ = data_root / "kws_12cls_mfcc_10x49.npz"

In [17]:
def download_and_extract_speech_commands():
    data_root.mkdir(parents=True, exist_ok=True)
    tar_path = data_root / f"{sc_version}.tar.gz"
    sc_root = data_root / sc_version

    if sc_root.exists():
        print(f"[INFO] data tar exists：{sc_root}")
        return sc_root

    if not tar_path.exists():
        print(f"[INFO] downloading Speech Commands v0.02 tar to：{tar_path}")
        urllib.request.urlretrieve(sc_url, tar_path)
        print(f"[INFO] download completed.")

    print(f"[INFO] extracting to：{full_data_root}")
    with tarfile.open(tar_path, "r:gz") as tar:
        tar.extractall(path=full_data_root)
    print(f"[INFO] extraction completed：{sc_root}")

    return sc_root

download_and_extract_speech_commands()

[INFO] extracting to：lab_new/data/full
[INFO] extraction completed：lab_new/data/speech_commands_v0.02


PosixPath('lab_new/data/speech_commands_v0.02')

In [19]:
def read_list_file(list_path: Path):
    s = set()
    with open(list_path, "r") as f:
        for line in f:
            line = line.strip()
            if line:
                s.add(line)
    return s


def extract_mfcc_10x49_from_array(y, sr=SR, duration=DURATION,
                                  n_mfcc=N_MFCC, n_frames=N_FRAMES):
    desired_len = int(sr * duration)
    if len(y) < desired_len:
        y = np.pad(y, (0, desired_len - len(y)))
    else:
        y = y[:desired_len]

    mfcc = librosa.feature.mfcc(
        y=y,
        sr=sr,
        n_mfcc=n_mfcc,
        n_fft=1024,
        hop_length=160
    )  # shape: (49, T)

    T = mfcc.shape[1]
    if T < n_frames:
        mfcc = np.pad(mfcc, ((0, 0), (0, n_frames - T)), mode="edge")
        T = mfcc.shape[1]

    idx = np.linspace(0, T - 1, n_frames).astype(int)
    feat = mfcc[:, idx]          # (49, 10)
    feat = feat.T.astype(np.float32)  # (10, 49)

    feat = feat[np.newaxis, ...]
    return feat


def extract_mfcc_10x49_from_file(wav_path: Path):
    y, sr = librosa.load(wav_path, sr=SR)
    return extract_mfcc_10x49_from_array(y, sr=sr)


In [None]:
val_list = read_list_file(full_data_root / "validation_list.txt")
test_list = read_list_file(full_data_root / "testing_list.txt")


label_to_idx = {name: i for i, name in enumerate(LABEL_NAMES)}
print("[INFO] label to index mapping:")
for k, v in label_to_idx.items():
    print(f"  {k:>8s} -> {v}")




[INFO] label to index mapping:
       yes -> 0
        no -> 1
        up -> 2
      down -> 3
      left -> 4
     right -> 5
        on -> 6
       off -> 7
      stop -> 8
        go -> 9
   silence -> 10
   unknown -> 11


In [21]:
X_train, y_train = [], []
X_val,   y_val   = [], []
X_test,  y_test  = [], []

In [22]:
print("\n[INFO] dealing with target words ...")
for word in TARGET_WORDS:
    word_dir = full_data_root / word
    if not word_dir.exists():
        print(f"[WARN] path does not exist, skipping: {word_dir}")
        continue

    wav_files = sorted(word_dir.glob("*.wav"))
    print(f"[INFO] {word}: {len(wav_files)} samples found")

    for wav_path in tqdm(wav_files, desc=f"{word:>5s}", ncols=80):
        rel_path = wav_path.relative_to(full_data_root).as_posix()

        if rel_path in val_list:
            subset = "val"
        elif rel_path in test_list:
            subset = "test"
        else:
            subset = "train"

        feat = extract_mfcc_10x49_from_file(wav_path)
        label_idx = label_to_idx[word]

        if subset == "train":
            X_train.append(feat)
            y_train.append(label_idx)
        elif subset == "val":
            X_val.append(feat)
            y_val.append(label_idx)
        else:
            X_test.append(feat)
            y_test.append(label_idx)


[INFO] dealing with target words ...
[INFO] yes: 4044 samples found


  yes: 100%|███████████████████████████████| 4044/4044 [00:27<00:00, 148.66it/s]


[INFO] no: 3941 samples found


   no: 100%|███████████████████████████████| 3941/3941 [00:16<00:00, 236.74it/s]


[INFO] up: 3723 samples found


   up: 100%|███████████████████████████████| 3723/3723 [00:14<00:00, 263.59it/s]


[INFO] down: 3917 samples found


 down: 100%|███████████████████████████████| 3917/3917 [00:17<00:00, 218.90it/s]


[INFO] left: 3801 samples found


 left: 100%|███████████████████████████████| 3801/3801 [00:16<00:00, 228.95it/s]


[INFO] right: 3778 samples found


right: 100%|███████████████████████████████| 3778/3778 [00:16<00:00, 235.28it/s]


[INFO] on: 3845 samples found


   on: 100%|███████████████████████████████| 3845/3845 [00:17<00:00, 220.77it/s]


[INFO] off: 3745 samples found


  off: 100%|███████████████████████████████| 3745/3745 [00:16<00:00, 220.92it/s]


[INFO] stop: 3872 samples found


 stop: 100%|███████████████████████████████| 3872/3872 [00:17<00:00, 225.03it/s]


[INFO] go: 3880 samples found


   go: 100%|███████████████████████████████| 3880/3880 [00:17<00:00, 224.64it/s]


In [23]:
print("\n[INFO] gathering unknown class samples ...")
all_dirs = [d for d in full_data_root.iterdir() if d.is_dir()]
unknown_dirs = [
    d for d in all_dirs
    if d.name not in TARGET_WORDS
    and d.name not in ["_background_noise_"]
    and not d.name.startswith(".")
]

unknown_train_files, unknown_val_files, unknown_test_files = [], [], []
for d in unknown_dirs:
    wav_files = sorted(d.glob("*.wav"))
    for wav_path in wav_files:
        rel_path = wav_path.relative_to(full_data_root).as_posix()
        if rel_path in val_list:
            unknown_val_files.append(wav_path)
        elif rel_path in test_list:
            unknown_test_files.append(wav_path)
        else:
            unknown_train_files.append(wav_path)

print(f"[INFO] unknown ori num: train={len(unknown_train_files)}, "
        f"val={len(unknown_val_files)}, test={len(unknown_test_files)}")

def sample_files(file_list, max_n):
    if len(file_list) > max_n:
        return random.sample(file_list, max_n)
    return file_list

unknown_train_files = sample_files(unknown_train_files, N_UNKNOWN_TRAIN)
unknown_val_files   = sample_files(unknown_val_files,   N_UNKNOWN_VAL)
unknown_test_files  = sample_files(unknown_test_files,  N_UNKNOWN_TEST)

print(f"[INFO] unknown sampled: train={len(unknown_train_files)}, "
        f"val={len(unknown_val_files)}, test={len(unknown_test_files)}")

unk_idx = label_to_idx[UNKNOWN_LABEL]

for split_name, file_list in [
    ("train", unknown_train_files),
    ("val",   unknown_val_files),
    ("test",  unknown_test_files),
]:
    print(f"[INFO] extracting unknown {split_name} features, total {len(file_list)}")
    for wav_path in tqdm(file_list, desc=f"unk-{split_name}", ncols=80):
        feat = extract_mfcc_10x49_from_file(wav_path)
        if split_name == "train":
            X_train.append(feat)
            y_train.append(unk_idx)
        elif split_name == "val":
            X_val.append(feat)
            y_val.append(unk_idx)
        else:
            X_test.append(feat)
            y_test.append(unk_idx)


[INFO] gathering unknown class samples ...
[INFO] unknown ori num: train=54074, val=6278, test=6931
[INFO] unknown sampled: train=3000, val=400, test=400
[INFO] extracting unknown train features, total 3000


unk-train: 100%|███████████████████████████| 3000/3000 [00:13<00:00, 229.05it/s]


[INFO] extracting unknown val features, total 400


unk-val: 100%|███████████████████████████████| 400/400 [00:01<00:00, 201.32it/s]


[INFO] extracting unknown test features, total 400


unk-test: 100%|██████████████████████████████| 400/400 [00:03<00:00, 126.49it/s]


In [24]:
print("\n[INFO] generating silence samples ...")
noise_dir = full_data_root / "_background_noise_"
noise_files = sorted(noise_dir.glob("*.wav"))
if not noise_files:
    print("[WARN] not found _background_noise_/*.wav，silence class will be empty")
    N_sil_train = N_sil_val = N_sil_test = 0
else:
    noise_clips = []
    for nf in noise_files:
        y, sr = librosa.load(nf, sr=SR)
        noise_clips.append(y)

    def gen_silence_samples(n_samples, desc):
        feats = []
        desired_len = int(SR * DURATION)
        for _ in tqdm(range(n_samples), desc=desc, ncols=80):
            y = random.choice(noise_clips)
            if len(y) > desired_len:
                start = random.randint(0, len(y) - desired_len)
                y_seg = y[start:start + desired_len]
            else:
                y_seg = np.pad(y, (0, desired_len - len(y)))
            feat = extract_mfcc_10x49_from_array(y_seg, sr=SR)
            feats.append(feat)
        return feats

    sil_idx = label_to_idx[SILENCE_LABEL]

    sil_train = gen_silence_samples(N_SILENCE_TRAIN, "sil-train")
    sil_val   = gen_silence_samples(N_SILENCE_VAL,   "sil-val")
    sil_test  = gen_silence_samples(N_SILENCE_TEST,  "sil-test")

    for feat in sil_train:
        X_train.append(feat)
        y_train.append(sil_idx)
    for feat in sil_val:
        X_val.append(feat)
        y_val.append(sil_idx)
    for feat in sil_test:
        X_test.append(feat)
        y_test.append(sil_idx)


[INFO] generating silence samples ...


sil-train: 100%|███████████████████████████| 3000/3000 [00:13<00:00, 223.53it/s]
sil-val: 100%|███████████████████████████████| 400/400 [00:01<00:00, 209.94it/s]
sil-test: 100%|██████████████████████████████| 400/400 [00:01<00:00, 261.81it/s]


In [25]:
out_npz = OUT_NPZ
X_train = np.stack(X_train, axis=0).astype(np.float32)
X_val   = np.stack(X_val,   axis=0).astype(np.float32)
X_test  = np.stack(X_test,  axis=0).astype(np.float32)

y_train = np.array(y_train, dtype=np.int64)
y_val   = np.array(y_val,   dtype=np.int64)
y_test  = np.array(y_test,  dtype=np.int64)

print("\n[INFO] dataset shapes:")
print("  X_train:", X_train.shape, "y_train:", y_train.shape)
print("  X_val  :", X_val.shape,   "y_val  :", y_val.shape)
print("  X_test :", X_test.shape,  "y_test :", y_test.shape)

np.savez_compressed(
    out_npz,
    X_train=X_train,
    y_train=y_train,
    X_valid=X_val,
    y_valid=y_val,
    X_test=X_test,
    y_test=y_test,
    label_names=np.array(LABEL_NAMES)
)
print(f"[DONE] preprocessde data saved to: {out_npz}")


[INFO] dataset shapes:
  X_train: (36769, 1, 10, 49) y_train: (36769,)
  X_val  : (4503, 1, 10, 49) y_val  : (4503,)
  X_test : (4874, 1, 10, 49) y_test : (4874,)
[DONE] preprocessde data saved to: lab_new/data/kws_12cls_mfcc_10x49.npz


In [26]:
def inspect_npz(npz_path: Path):
    print(f"\n[INFO] checking :{npz_path}")
    data = np.load(npz_path, allow_pickle=True)

    X_train = data["X_train"]
    y_train = data["y_train"]
    X_val   = data["X_valid"]
    y_val   = data["y_valid"]
    X_test  = data["X_test"]
    y_test  = data["y_test"]
    label_names = data["label_names"]

    print("\n[INFO] label_names:", label_names)
    print("\n[INFO] shape checking:")
    print("  X_train:", X_train.shape, "y_train:", y_train.shape)
    print("  X_val  :", X_val.shape,   "y_val  :", y_val.shape)
    print("  X_test :", X_test.shape,  "y_test :", y_test.shape)

    # compute label distribution
    def print_label_stats(name, y):
        unique, counts = np.unique(y, return_counts=True)
        print(f"\n  {name} label distribution:")
        for u, c in zip(unique, counts):
            lbl = label_names[u] if u < len(label_names) else "NA"
            print(f"    idx={u:2d} ({lbl:8s}): {c:6d}")

    print_label_stats("Train", y_train)
    print_label_stats("Val",   y_val)
    print_label_stats("Test",  y_test)






In [27]:
inspect_npz(OUT_NPZ) 


[INFO] checking :lab_new/data/kws_12cls_mfcc_10x49.npz

[INFO] label_names: ['yes' 'no' 'up' 'down' 'left' 'right' 'on' 'off' 'stop' 'go' 'silence'
 'unknown']

[INFO] shape checking:
  X_train: (36769, 1, 10, 49) y_train: (36769,)
  X_val  : (4503, 1, 10, 49) y_val  : (4503,)
  X_test : (4874, 1, 10, 49) y_test : (4874,)

  Train label distribution:
    idx= 0 (yes     ):   3228
    idx= 1 (no      ):   3130
    idx= 2 (up      ):   2948
    idx= 3 (down    ):   3134
    idx= 4 (left    ):   3037
    idx= 5 (right   ):   3019
    idx= 6 (on      ):   3086
    idx= 7 (off     ):   2970
    idx= 8 (stop    ):   3111
    idx= 9 (go      ):   3106
    idx=10 (silence ):   3000
    idx=11 (unknown ):   3000

  Val label distribution:
    idx= 0 (yes     ):    397
    idx= 1 (no      ):    406
    idx= 2 (up      ):    350
    idx= 3 (down    ):    377
    idx= 4 (left    ):    352
    idx= 5 (right   ):    363
    idx= 6 (on      ):    363
    idx= 7 (off     ):    373
    idx= 8 (stop   