In [4]:
!pip -q install numpy pandas soundfile librosa tensorflow tqdm

In [11]:
# Mount Google Drive at /content/drive; the cells below read the dataset zip and
# the previously trained model artifacts from /content/drive/MyDrive.
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
from pathlib import Path

# Locations on the mounted Google Drive.
DRIVE_ROOT = Path("/content/drive/MyDrive")
DRIVE_SNEEZE = DRIVE_ROOT.joinpath("sneeze_models")  # model folder kept on Drive
ZIP_PATH = DRIVE_ROOT.joinpath("raw_data.zip")       # dataset archive; adjust to the current zip file name

# Report both paths first so the cell output shows everything before any failure.
for _label, _path in (("DRIVE_SNEEZE exists:", DRIVE_SNEEZE), ("ZIP exists:", ZIP_PATH)):
    print(_label, _path.exists(), _path)

# Mandatory checks: stop here if either path is wrong.
assert DRIVE_SNEEZE.exists(), "드라이브의 sneeze_model 폴더 경로가 틀렸습니다. DRIVE_SNEEZE를 수정하십시오."
assert ZIP_PATH.exists(), "zip 경로가 틀렸습니다. ZIP_PATH를 수정하십시오."


DRIVE_SNEEZE exists: True /content/drive/MyDrive/sneeze_models
ZIP exists: True /content/drive/MyDrive/raw_data.zip


In [14]:
import shutil, os

WORK = Path("/content/work")

# zip 풀기
!unzip -q "{ZIP_PATH}" -d "{WORK}"

print("WORK exists:", WORK.exists())
print("WORK contents:")
for p in sorted(WORK.iterdir()):
    print(" -", p, "(dir)" if p.is_dir() else "(file)")


WORK exists: True
WORK contents:
 - /content/work/MS-SNSD-NOISE (dir)
 - /content/work/esc-50 (dir)
 - /content/work/recordings (dir)
 - /content/work/sneeze (dir)


In [18]:
from pathlib import Path

# Folder on Drive that holds the previously trained artifacts.
DRIVE_DIR = Path("/content/drive/MyDrive/sneeze_models")
assert DRIVE_DIR.exists(), f"not found: {DRIVE_DIR}"

# Source files that are copied into the working directory by the next cell.
STATS_SRC = DRIVE_DIR.joinpath("norm_stats.npz")
TFLITE_SRC = DRIVE_DIR.joinpath("v3_1_model.tflite")
KERAS_SRC = DRIVE_DIR.joinpath("v3_1_model.keras")
THR_SRC = DRIVE_DIR.joinpath("v3_1_threshold.txt")

# Print the status of every file before asserting, so all four are visible at once.
for _label, _src in (
    ("STATS:", STATS_SRC),
    ("TFLITE:", TFLITE_SRC),
    ("KERAS:", KERAS_SRC),
    ("THR:", THR_SRC),
):
    print(_label, _src.exists(), _src)

for _src, _missing_msg in (
    (STATS_SRC, "norm_stats.npz 없음"),
    (TFLITE_SRC, "v3_1_model.tflite 없음"),
    (KERAS_SRC, "v3_1_model.keras 없음"),
    (THR_SRC, "v3_1_threshold.txt 없음"),
):
    assert _src.exists(), _missing_msg


STATS: True /content/drive/MyDrive/sneeze_models/norm_stats.npz
TFLITE: True /content/drive/MyDrive/sneeze_models/v3_1_model.tflite
KERAS: True /content/drive/MyDrive/sneeze_models/v3_1_model.keras
THR: True /content/drive/MyDrive/sneeze_models/v3_1_threshold.txt


In [19]:
import shutil
from pathlib import Path

WORK = Path("/content/work")
OUT_DIR = WORK / "out"
OUT_DIR.mkdir(parents=True, exist_ok=True)

shutil.copy2(STATS_SRC, OUT_DIR / "norm_stats.npz")
shutil.copy2(TFLITE_SRC, OUT_DIR / "v3_1_model.tflite")
shutil.copy2(KERAS_SRC, OUT_DIR / "v3_1_model.keras")
shutil.copy2(THR_SRC, OUT_DIR / "v3_1_threshold.txt")

print("Copied to:", OUT_DIR)
!ls -lah /content/work/out


Copied to: /content/work/out
total 332K
drwxr-xr-x 3 root root 4.0K Feb 15 16:01 .
drwxr-xr-x 7 root root 4.0K Feb 15 15:59 ..
drwxr-xr-x 2 root root 4.0K Feb 15 15:59 hard_negs_v3
-rw------- 1 root root  912 Feb 15 06:32 norm_stats.npz
-rw------- 1 root root  17K Feb 15 06:32 sneeze_ds_cnn_dynamic.tflite
-rw------- 1 root root 132K Feb 15 06:32 sneeze_ds_cnn.keras
-rw------- 1 root root   19 Feb 15 06:32 threshold.txt
-rw------- 1 root root 131K Feb 15 06:32 v3_1_model.keras
-rw------- 1 root root  21K Feb 15 06:32 v3_1_model.tflite
-rw------- 1 root root   18 Feb 15 06:32 v3_1_threshold.txt


In [20]:
from pathlib import Path
WORK = Path("/content/work")
OUT_DIR = WORK.joinpath("out")

# Local copies of the artifacts inside OUT_DIR.
# NOTE(review): v3_1_model.keras is marked here as the fine-tuning start point,
# but the next cell rebinds V3_KERAS / V3_TFLITE to the sneeze_ds_cnn.* files and
# training loads that one -- confirm which checkpoint is intended.
V3_STATS = OUT_DIR.joinpath("norm_stats.npz")
V3_KERAS = OUT_DIR.joinpath("v3_1_model.keras")   # fine-tuning start point
V3_TFLITE = OUT_DIR.joinpath("v3_1_model.tflite")

for _label, _artifact in (
    ("V3_STATS:", V3_STATS),
    ("V3_KERAS:", V3_KERAS),
    ("V3_TFLITE:", V3_TFLITE),
):
    print(_label, _artifact.exists(), _artifact)

for _artifact in (V3_STATS, V3_KERAS, V3_TFLITE):
    assert _artifact.exists()


V3_STATS: True /content/work/out/norm_stats.npz
V3_KERAS: True /content/work/out/v3_1_model.keras
V3_TFLITE: True /content/work/out/v3_1_model.tflite


In [21]:
# Dataset folders extracted from the zip, plus the output folder.
ESC50_DIR = WORK.joinpath("esc-50")
MS_SNSD_DIR = WORK.joinpath("MS-SNSD-NOISE")
SNEEZE_DIR = WORK.joinpath("sneeze")
REC_DIR = WORK.joinpath("recordings")
OUT_DIR = WORK.joinpath("out")

# v3 baseline (starting point for continued training).
V3_KERAS = OUT_DIR.joinpath("sneeze_ds_cnn.keras")
V3_TFLITE = OUT_DIR.joinpath("sneeze_ds_cnn_dynamic.tflite")
V3_STATS = OUT_DIR.joinpath("norm_stats.npz")

# New negative recording added for v3.2.
YAHO_WAV = REC_DIR.joinpath("yaho.wav")

# Existence report for every path, in the same order as they are defined above.
for _label, _path in (
    ("ESC50_DIR:", ESC50_DIR),
    ("MS_SNSD_DIR:", MS_SNSD_DIR),
    ("SNEEZE_DIR:", SNEEZE_DIR),
    ("REC_DIR:", REC_DIR),
    ("OUT_DIR:", OUT_DIR),
    ("V3_KERAS:", V3_KERAS),
    ("V3_TFLITE:", V3_TFLITE),
    ("V3_STATS:", V3_STATS),
    ("YAHO_WAV:", YAHO_WAV),
):
    print(_label, _path.exists(), _path)

# Only the stats file and the new recording are mandatory at this point.
assert V3_STATS.exists(), "norm_stats.npz가 없습니다. 드라이브 out 복사가 제대로 됐는지 확인하십시오."
assert YAHO_WAV.exists(), "recordings/yaho.wav가 zip에 포함되지 않았습니다. zip 구성 확인 필요."


ESC50_DIR: True /content/work/esc-50
MS_SNSD_DIR: True /content/work/MS-SNSD-NOISE
SNEEZE_DIR: True /content/work/sneeze
REC_DIR: True /content/work/recordings
OUT_DIR: True /content/work/out
V3_KERAS: True /content/work/out/sneeze_ds_cnn.keras
V3_TFLITE: True /content/work/out/sneeze_ds_cnn_dynamic.tflite
V3_STATS: True /content/work/out/norm_stats.npz
YAHO_WAV: True /content/work/recordings/yaho.wav


In [22]:
import numpy as np

OUT_DIR = WORK / "out"
STATS_PATH = OUT_DIR / "norm_stats.npz"
assert STATS_PATH.exists(), f"not found: {STATS_PATH}"

# allow_pickle=False: only the two numeric arrays "mu" and "sd" are read, and
# refusing pickled objects avoids executing arbitrary code from a tampered .npz.
# The context manager closes the archive handle once both arrays are in memory.
with np.load(str(STATS_PATH), allow_pickle=False) as st:
    mu = st["mu"].astype(np.float32).reshape(-1)   # flattened to 1-D
    sdv = st["sd"].astype(np.float32).reshape(-1)  # flattened to 1-D

# Both vectors are broadcast against the same feature axis later on.
assert mu.shape == sdv.shape, f"mu/sd shape mismatch: {mu.shape} vs {sdv.shape}"

print("stats loaded:", STATS_PATH, "mu:", mu.shape, "sd:", sdv.shape)


stats loaded: /content/work/out/norm_stats.npz mu: (64,) sd: (64,)


## 3) 노트북 셀: 네거티브 샘플링을 “새 파일에 편향”시키기

In [23]:
import random, librosa
import numpy as np

# Audio format shared by every loader and clip helper in this notebook.
SR = 16000  # sample rate in Hz requested from librosa.load
CLIP_SECONDS = 2.0  # duration of every training clip, in seconds
CLIP_SAMPLES = int(SR * CLIP_SECONDS)  # clip length in samples (32000)

def rms(x, eps=1e-8):
    """Return the root-mean-square of ``x`` as a Python float.

    ``x`` is converted to a float32 array first. ``eps`` is added to the mean
    square before the square root is taken.
    """
    samples = np.asarray(x, dtype=np.float32)
    mean_square = np.mean(np.square(samples))
    return float(np.sqrt(mean_square + eps))

def load_wav_mono_16k(path):
    """Load an audio file with librosa as mono at ``SR`` Hz and return it as float32."""
    audio, _loaded_sr = librosa.load(str(path), sr=SR, mono=True)
    audio = audio.astype(np.float32)
    return audio

def fix_2s(y):
    """Return ``y`` as a float32 array of exactly ``CLIP_SAMPLES`` samples.

    Longer input is truncated to its first ``CLIP_SAMPLES`` samples; shorter
    input is zero-padded at the end.
    """
    shortfall = CLIP_SAMPLES - len(y)
    if shortfall > 0:
        return np.pad(y.astype(np.float32), (0, shortfall))
    return y[:CLIP_SAMPLES].astype(np.float32)

def clip_from_long(y, start_sec):
    """Cut one fixed-length clip out of ``y`` starting at ``start_sec`` seconds.

    The slice is passed through ``fix_2s``, so a slice that runs past the end
    of ``y`` is zero-padded to the full clip length.
    """
    first = int(start_sec * SR)
    return fix_2s(y[first:first + CLIP_SAMPLES])

def sample_neg_from_long(path, n_samples, min_rms=0.003):
    """Draw up to ``n_samples`` random fixed-length clips from one recording.

    Start positions are drawn uniformly with ``random.uniform``. A clip whose
    RMS is below ``min_rms`` is rejected. At most ``n_samples * 40`` draws are
    made, so fewer than ``n_samples`` clips may be returned. A recording
    shorter than one clip yields an empty list.
    """
    audio = load_wav_mono_16k(path)
    if len(audio) < CLIP_SAMPLES:
        return []

    # Latest start (in seconds) that still leaves room for a full clip.
    latest_start = max(0.0, len(audio) / SR - CLIP_SECONDS)

    clips = []
    for _attempt in range(n_samples * 40):
        if len(clips) >= n_samples:
            break
        candidate = clip_from_long(audio, random.uniform(0, latest_start))
        if rms(candidate) < min_rms:
            continue  # too quiet, draw again
        clips.append(candidate)
    return clips

# Seed the `random` module: every random crop and file pick below draws from it,
# so without a seed the negative set differs on each run.
SAMPLING_SEED = 42
random.seed(SAMPLING_SEED)

# The negative budget is derived from the number of positive files.
SNEEZE_DIR = WORK / "sneeze"
pos_files = sorted(SNEEZE_DIR.rglob("*.wav"))
POS_N = len(pos_files)
# An empty positive set would give a zero budget and fail much later with an
# unrelated-looking error, so stop here instead.
assert POS_N > 0, f"no positive .wav files found under {SNEEZE_DIR}"
NEG_TARGET = POS_N * 4

# Share of the negative budget taken from the new recording.
NEG_FROM_YAHO = int(NEG_TARGET * 0.55)  # 55% from yaho.wav
NEG_FROM_OTHERS = NEG_TARGET - NEG_FROM_YAHO

print("POS_N:", POS_N, "NEG_TARGET:", NEG_TARGET)
print("NEG_FROM_YAHO:", NEG_FROM_YAHO, "NEG_FROM_OTHERS:", NEG_FROM_OTHERS)

neg_yaho = sample_neg_from_long(YAHO_WAV, NEG_FROM_YAHO, min_rms=0.003)
print("neg_yaho:", len(neg_yaho))
# sample_neg_from_long gives up after a bounded number of draws, so it can
# return fewer clips than requested; make that visible.
if len(neg_yaho) < NEG_FROM_YAHO:
    print("WARNING: yaho.wav yielded only", len(neg_yaho), "of", NEG_FROM_YAHO, "requested clips")

# Remaining negative sources; yaho.wav is excluded from the recordings list by file name.
ESC50_DIR = WORK / "esc-50"
MS_SNSD_DIR = WORK / "MS-SNSD-NOISE"
REC_FILES = sorted(p for p in (WORK / "recordings").rglob("*.wav") if p.name != YAHO_WAV.name)
esc_files = sorted(ESC50_DIR.rglob("*.wav"))
ms_files = sorted(MS_SNSD_DIR.rglob("*.wav"))

def sample_neg_from_short(path):
    """Load one file and return it trimmed or zero-padded to the fixed clip length."""
    return fix_2s(load_wav_mono_16k(path))

neg_other = []

# Part 1: random crops from the other recordings (about half of the budget,
# split evenly across files, at least one crop per file).
if REC_FILES:
    per_file = max(1, (NEG_FROM_OTHERS // 2) // len(REC_FILES))
    for p in REC_FILES:
        neg_other.extend(sample_neg_from_long(p, per_file, min_rms=0.003))

# Part 2: whole files from ESC-50 and MS-SNSD, limited by how many files exist.
need = max(0, NEG_FROM_OTHERS - len(neg_other))

take_esc = min(len(esc_files), need // 2)
take_ms = min(len(ms_files), need - take_esc)

for p in random.sample(esc_files, take_esc):
    neg_other.append(sample_neg_from_short(p))
for p in random.sample(ms_files, take_ms):
    neg_other.append(sample_neg_from_short(p))

# Part 3: if still short, top up with more crops from the recordings.
# A recording that yields nothing (shorter than one clip, or every draw rejected
# as too quiet) is dropped from the pool; otherwise this loop would never end
# when no recording can produce a clip.
refill_pool = list(REC_FILES)
while len(neg_other) < NEG_FROM_OTHERS and refill_pool:
    p = random.choice(refill_pool)
    extra = sample_neg_from_long(p, 30, min_rms=0.003)
    if not extra:
        refill_pool.remove(p)
        continue
    neg_other.extend(extra)
    neg_other = neg_other[:NEG_FROM_OTHERS]

print("neg_other:", len(neg_other))
if len(neg_other) < NEG_FROM_OTHERS:
    print("WARNING: only", len(neg_other), "of", NEG_FROM_OTHERS, "requested clips could be collected")


POS_N: 968 NEG_TARGET: 3872
NEG_FROM_YAHO: 2129 NEG_FROM_OTHERS: 1743
neg_yaho: 2129
neg_other: 1743


## 4) 노트북 셀: 포지티브 로드 (2초 고정)

In [24]:
# Every positive file becomes one fixed-length clip.
pos_audio = [fix_2s(load_wav_mono_16k(wav_path)) for wav_path in pos_files]

# Negatives: crops from yaho.wav first, then the clips from the other sources.
neg_audio = [*neg_yaho, *neg_other]

print("pos_audio:", len(pos_audio), "neg_audio:", len(neg_audio))


pos_audio: 968 neg_audio: 3872


## 5) 노트북 셀: 특징(log-mel) 추출 + v3 stats로 정규화

In [25]:
import librosa
import numpy as np

# Log-mel feature settings used by logmel() and normalize_rms() below.
N_MELS = 64  # number of mel bands; the loaded mu / sd vectors have this length
N_FFT  = 400  # FFT size in samples (25 ms at SR = 16000)
HOP    = 160  # hop length in samples (10 ms at SR = 16000)
TARGET_RMS = 0.1  # RMS level each clip is scaled to before feature extraction

def normalize_rms(x, target=TARGET_RMS):
    """Scale ``x`` so its RMS equals ``target``, then clip to [-1, 1] as float32.

    A signal whose RMS is not above 1e-6 is left unscaled and only clipped.
    """
    level = rms(x)
    scaled = x * (target / (level + 1e-8)) if level > 1e-6 else x
    return np.clip(scaled, -1.0, 1.0).astype(np.float32)

def logmel(y):
    """Return the natural-log mel power spectrogram of ``y``, transposed, as float32.

    Uses librosa.feature.melspectrogram with the notebook's SR / N_FFT / HOP /
    N_MELS settings; 1e-6 is added before the log.
    """
    mel_power = librosa.feature.melspectrogram(
        y=y,
        sr=SR,
        n_fft=N_FFT,
        hop_length=HOP,
        n_mels=N_MELS,
        power=2.0,
    )
    log_power = np.log(mel_power + 1e-6)
    return log_power.T.astype(np.float32)

def feats_list(audio_list):
    """Turn a list of clips into one float32 array of standardized log-mel features.

    Each clip is length-fixed, RMS-normalized, converted with ``logmel`` and
    standardized with the loaded ``mu`` / ``sdv`` statistics.
    """
    def _features(clip):
        leveled = normalize_rms(fix_2s(clip))
        return (logmel(leveled) - mu) / (sdv + 1e-6)

    return np.array([_features(clip) for clip in audio_list], dtype=np.float32)

X_pos = feats_list(pos_audio)
X_neg = feats_list(neg_audio)

# Positives first, then negatives; a trailing channel axis is appended.
X = np.expand_dims(np.concatenate([X_pos, X_neg], axis=0), axis=-1)
# Labels follow the same order: 1 for every positive, 0 for every negative.
y = np.concatenate([
    np.ones(len(X_pos), dtype=np.int64),
    np.zeros(len(X_neg), dtype=np.int64),
])

print("X:", X.shape, "y:", y.shape, "pos:", int(y.sum()), "neg:", int((y==0).sum()))


X: (4840, 201, 64, 1) y: (4840,) pos: 968 neg: 3872


## 6) 노트북 셀: 분할 + v3에서 이어학습해서 v3.2 만들기

In [26]:
from sklearn.model_selection import train_test_split
import tensorflow as tf

SEED = 42
# Seed Python's random, NumPy and TensorFlow together so batch shuffling and
# any stochastic layers behave the same on every run.
tf.keras.utils.set_random_seed(SEED)

# Stratified 70 / 15 / 15 split. Only the indices are split; labels are taken
# from y afterwards, so the label outputs of train_test_split are not needed.
# NOTE(review): many negatives are random crops of the same long recordings, so
# crops that land in different splits may overlap in time and make val/test
# scores optimistic -- consider a split grouped by source file.
idx = np.arange(len(y))
idx_train, idx_tmp = train_test_split(idx, test_size=0.30, random_state=SEED, stratify=y)
idx_val, idx_test = train_test_split(idx_tmp, test_size=0.50, random_state=SEED, stratify=y[idx_tmp])

X_train, X_val, X_test = X[idx_train], X[idx_val], X[idx_test]
y_train, y_val, y_test = y[idx_train], y[idx_val], y[idx_test]

print("train:", X_train.shape, "pos_rate:", float(y_train.mean()))
print("val:", X_val.shape, "pos_rate:", float(y_val.mean()))
print("test:", X_test.shape, "pos_rate:", float(y_test.mean()))

# Checkpoint to continue training from (file name taken from the out/ listing).
V3_KERAS = OUT_DIR / "sneeze_ds_cnn.keras"
assert V3_KERAS.exists(), f"not found: {V3_KERAS}"

model = tf.keras.models.load_model(str(V3_KERAS))
print("loaded:", V3_KERAS)

# Weight the positive class by the negative/positive ratio of the training split.
pos = int((y_train==1).sum())
neg = int((y_train==0).sum())
class_weight = {0: 1.0, 1: neg / (pos + 1e-6)}
print("class_weight:", class_weight)

model.compile(
    optimizer=tf.keras.optimizers.Adam(5e-4),
    loss="binary_crossentropy",
    metrics=[tf.keras.metrics.AUC(name="auc"), tf.keras.metrics.Precision(name="precision"), tf.keras.metrics.Recall(name="recall")],
)

# Both callbacks watch validation AUC: stop after 4 epochs without improvement
# (restoring the best weights) and halve the learning rate after 2.
cb = [
    tf.keras.callbacks.EarlyStopping(monitor="val_auc", mode="max", patience=4, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(monitor="val_auc", mode="max", factor=0.5, patience=2, min_lr=1e-5),
]

history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=15,
    batch_size=64,
    class_weight=class_weight,
    callbacks=cb,
    verbose=1
)


train: (3388, 201, 64, 1) pos_rate: 0.2001180637544274
val: (726, 201, 64, 1) pos_rate: 0.19972451790633608
test: (726, 201, 64, 1) pos_rate: 0.19972451790633608
loaded: /content/work/out/sneeze_ds_cnn.keras
class_weight: {0: 1.0, 1: 3.997050141597271}
Epoch 1/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m56s[0m 959ms/step - auc: 0.9370 - loss: 0.5065 - precision: 0.6560 - recall: 0.8489 - val_auc: 0.9804 - val_loss: 0.2333 - val_precision: 0.6931 - val_recall: 0.9655 - learning_rate: 5.0000e-04
Epoch 2/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m83s[0m 982ms/step - auc: 0.9824 - loss: 0.2400 - precision: 0.8619 - recall: 0.9582 - val_auc: 0.9848 - val_loss: 0.2534 - val_precision: 0.6847 - val_recall: 0.9586 - learning_rate: 5.0000e-04
Epoch 3/15
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m51s[0m 957ms/step - auc: 0.9789 - loss: 0.2631 - precision: 0.8768 - recall: 0.9376 - val_auc: 0.9903 - val_loss: 0.1348 - val_precision: 0.8726 - val_r

## 7) 노트북 셀: threshold 재선정 + 저장(v3.2)

In [27]:
from sklearn.metrics import roc_curve, roc_auc_score, confusion_matrix, classification_report
import numpy as np

val_prob = model.predict(X_val, verbose=0).reshape(-1)
test_prob = model.predict(X_test, verbose=0).reshape(-1)

print("val auc:", roc_auc_score(y_val, val_prob))
print("test auc:", roc_auc_score(y_test, test_prob))

fpr, tpr, thr = roc_curve(y_val, val_prob)

# Pick the validation threshold with the highest TPR whose FPR stays within
# target_fpr. Index 0 of roc_curve's output is a sentinel ("predict nothing":
# fpr = tpr = 0, threshold inf or max score + 1). It always satisfies the FPR
# limit, so it is skipped here; otherwise the 0.5 fallback could never trigger
# and an unusable threshold could be written to the file.
target_fpr = 0.01
cand_fpr, cand_tpr, cand_thr = fpr[1:], tpr[1:], thr[1:]
admissible = cand_fpr <= target_fpr
if admissible.any():
    # argmax returns the first maximum, i.e. the highest threshold among ties.
    best_thr = cand_thr[admissible][np.argmax(cand_tpr[admissible])]
else:
    best_thr = 0.5

print("chosen threshold:", float(best_thr))

# The threshold is chosen on validation data and evaluated on the test split.
test_pred = (test_prob >= best_thr).astype(int)
cm = confusion_matrix(y_test, test_pred)
print("confusion_matrix:\n", cm)
print(classification_report(y_test, test_pred, digits=4))

SAVE_KERAS  = OUT_DIR / "v3_2_model.keras"
SAVE_TFLITE = OUT_DIR / "v3_2_model.tflite"
SAVE_THR    = OUT_DIR / "v3_2_threshold.txt"

model.save(str(SAVE_KERAS))

# NOTE(review): no converter.optimizations are set, so this is a plain
# conversion; the earlier artifact is named "..._dynamic.tflite" -- confirm
# whether v3.2 should be quantized the same way.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()
SAVE_TFLITE.write_bytes(tflite_model)

SAVE_THR.write_text(str(float(best_thr)))

print("saved:", SAVE_KERAS)
print("saved:", SAVE_TFLITE)
print("saved:", SAVE_THR)


val auc: 0.9977802836963617
test auc: 0.993673215027598
chosen threshold: 0.3950975239276886
confusion_matrix:
 [[571  10]
 [ 11 134]]
              precision    recall  f1-score   support

           0     0.9811    0.9828    0.9819       581
           1     0.9306    0.9241    0.9273       145

    accuracy                         0.9711       726
   macro avg     0.9558    0.9535    0.9546       726
weighted avg     0.9710    0.9711    0.9710       726

Saved artifact at '/tmp/tmpuybbpmje'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 201, 64, 1), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 1), dtype=tf.float32, name=None)
Captures:
  138536561976400: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138536561976592: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138536561978320: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138536561978128: TensorSpec(shape=(), dtype=

In [30]:
# List the output folder to confirm the v3.2 artifacts were written.
!ls /content/work/out

hard_negs_v3		      threshold.txt	  v3_2_model.keras
norm_stats.npz		      v3_1_model.keras	  v3_2_model.tflite
sneeze_ds_cnn_dynamic.tflite  v3_1_model.tflite   v3_2_threshold.txt
sneeze_ds_cnn.keras	      v3_1_threshold.txt


In [28]:
!cp -r /content/work/out /content/drive/MyDrive/sneeze_models_v3.2/

In [29]:
import numpy as np

val_prob = model.predict(X_val, verbose=0).reshape(-1)

# Validation scores split by true label.
pos = val_prob[y_val == 1]
neg = val_prob[y_val == 0]

print("VAL pos count:", len(pos), "neg count:", len(neg))

# One summary line per class: mean, std, min, 10th percentile, median, 90th percentile, max.
for _header, _scores in (
    ("POS  mean/std/min/p10/median/p90/max:", pos),
    ("NEG  mean/std/min/p10/median/p90/max:", neg),
):
    print(
        _header,
        float(_scores.mean()),
        float(_scores.std()),
        float(_scores.min()),
        float(np.percentile(_scores, 10)),
        float(np.median(_scores)),
        float(np.percentile(_scores, 90)),
        float(_scores.max()),
    )


VAL pos count: 145 neg count: 581
POS  mean/std/min/p10/median/p90/max: 0.9120925664901733 0.16790898144245148 0.14783266186714172 0.7078515291213989 0.9860939979553223 0.9974110722541809 0.9995041489601135
NEG  mean/std/min/p10/median/p90/max: 0.029235580936074257 0.08051810413599014 3.2762800401542336e-05 0.0015809355536475778 0.009090131148695946 0.06179283931851387 0.9890066385269165
