In [2]:
"""
Random Forest –¥–ª—è 0.1—Å –∞—É–¥–∏–æ —Å –ø–æ–¥–±–æ—Ä–æ–º –≥–∏–ø–µ—Ä–ø–∞—Ä–∞–º–µ—Ç—Ä–æ–≤ —á–µ—Ä–µ–∑ Optuna
- –£—Å—Ç–æ–π—á–∏–≤–æ–µ —á—Ç–µ–Ω–∏–µ WAV (soundfile‚Üílibrosa‚Üíscipy)
- –§–∏–∫—Å–∞—Ü–∏—è –¥–ª–∏–Ω—ã 0.1—Å –æ—Ç —Ñ–∞–∫—Ç–∏—á–µ—Å–∫–æ–≥–æ SR
- –ü—Ä–∏–∑–Ω–∞–∫–∏ MFCC+Delta –∏ —Å–ø–µ–∫—Ç—Ä–∞–ª—å–Ω—ã–µ —Å—Ç–∞—Ç–∏—Å—Ç–∏–∫–∏ (–∫–æ—Ä–æ—Ç–∫–∏–µ –æ–∫–Ω–∞)
- –ü–∞—Ä–∞–ª–ª–µ–ª—å–Ω–∞—è –æ–±—Ä–∞–±–æ—Ç–∫–∞
- –ü–æ–ª–Ω—ã–µ –æ—Ç—á—ë—Ç—ã (train –∏ test)
- Optuna: n_trials=30, StratifiedKFold(n_splits=3)
"""

import os
import sys
import numpy as np
import librosa
import soundfile as sf
from scipy.io import wavfile as scipy_wav
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, f1_score, accuracy_score
from joblib import Parallel, delayed
from tqdm import tqdm
import optuna
import warnings

warnings.filterwarnings('ignore')
optuna.logging.set_verbosity(optuna.logging.WARNING)

# ===================== –ù–ê–°–¢–†–û–ô–ö–ò =====================
ROOT_DATA_DIR = "raw_audio"
DATASETS = [
    os.path.join(ROOT_DATA_DIR, "asphalt_dry"),
    os.path.join(ROOT_DATA_DIR, "asphalt_wet"),
    os.path.join(ROOT_DATA_DIR, "cobblestones_dry"),
    os.path.join(ROOT_DATA_DIR, "cobblestones_wet"),
]
POSSIBLE_PAV_TYPES = {"asphalt", "cobblestones"}

TARGET_SR = None
SEGMENT_SECONDS = 0.1
ENERGY_THR = 1e-8

N_MFCC = 13
N_FFT = 512
HOP_LENGTH = 256
N_MELS = 40

N_JOBS = -1
MAX_FILES_PER_FOLDER = None
PRINT_FIRST_ERRORS = 10
USE_TQDM = True

N_TRIALS = 30
N_CV_SPLITS = 3

# ===================== –ê–£–î–ò–û –ò –ü–†–ò–ó–ù–ê–ö–ò =====================

def parse_label(folder_name: str):
    fl = folder_name.lower()
    for p in POSSIBLE_PAV_TYPES:
        if p in fl:
            return p
    return None


def safe_load(file_path: str, target_sr: int | None = TARGET_SR):
    try:
        y, sr = sf.read(file_path, always_2d=False)
        if y is None or (hasattr(y, '__len__') and len(y) == 0):
            raise ValueError('empty (soundfile)')
        if hasattr(y, 'ndim') and y.ndim > 1:
            y = y.mean(axis=1)
        y = y.astype(np.float32)
        if target_sr and sr != target_sr:
            y = librosa.resample(y, orig_sr=sr, target_sr=target_sr, res_type='kaiser_fast')
            sr = target_sr
        return y, sr, None
    except Exception as e1:
        last_err = f"sf:{type(e1).__name__}:{e1}"
    try:
        y, sr = librosa.load(file_path, sr=target_sr, mono=True, res_type='kaiser_fast')
        if y is None or len(y) == 0:
            raise ValueError('empty (librosa)')
        return y.astype(np.float32), sr, None
    except Exception as e2:
        last_err = last_err + f"|lb:{type(e2).__name__}:{e2}"
    try:
        sr0, y = scipy_wav.read(file_path)
        if y is None or len(y) == 0:
            raise ValueError('empty (scipy)')
        y = y.astype(np.float32)
        if y.ndim > 1:
            y = y.mean(axis=1)
        if target_sr and sr0 != target_sr:
            y = librosa.resample(y, orig_sr=sr0, target_sr=target_sr, res_type='kaiser_fast')
            sr0 = target_sr
        return y.astype(np.float32), sr0, None
    except Exception as e3:
        last_err = last_err + f"|sp:{type(e3).__name__}:{e3}"
    return None, None, last_err


def expected_samples(sr: int) -> int:
    return int(round(sr * SEGMENT_SECONDS))


def fix_length_0p1sec(y: np.ndarray, sr: int) -> np.ndarray:
    exp = expected_samples(sr)
    n = len(y)
    if n > exp:
        y = y[:exp]
    elif n < exp:
        y = np.pad(y, (0, exp - n), mode='constant')
    return y


def energy_gate(y: np.ndarray, thr: float = ENERGY_THR) -> bool:
    return (np.mean(y**2) >= thr)


def extract_features(y: np.ndarray, sr: int) -> np.ndarray:
    m = np.max(np.abs(y))
    if m > 0:
        y = y / (m + 1e-9)
    feats = []
    mfcc = librosa.feature.mfcc(
        y=y, sr=sr, n_mfcc=N_MFCC,
        n_fft=N_FFT, hop_length=HOP_LENGTH, n_mels=N_MELS
    )
    mfcc_d = librosa.feature.delta(mfcc, order=1)
    for feat in [mfcc, mfcc_d]:
        feats.extend(np.mean(feat, axis=1).tolist())
        feats.extend(np.std(feat, axis=1).tolist())
    sc = librosa.feature.spectral_centroid(y=y, sr=sr, n_fft=N_FFT, hop_length=HOP_LENGTH, center=True)[0]
    zcr = librosa.feature.zero_crossing_rate(y, frame_length=N_FFT, hop_length=HOP_LENGTH, center=True)[0]
    rms = librosa.feature.rms(y=y, frame_length=N_FFT, hop_length=HOP_LENGTH, center=True)[0]
    for arr in [sc, zcr, rms]:
        feats.append(float(np.mean(arr)))
        feats.append(float(np.std(arr)))
    v = np.array(feats, dtype=np.float32)
    v = np.nan_to_num(v, nan=0.0, posinf=0.0, neginf=0.0)
    return v


def process_file(file_path: str, label: str):
    y, sr, err = safe_load(file_path)
    if y is None:
        return None, f"{os.path.basename(file_path)} -> load_failed: {err}"
    y = fix_length_0p1sec(y, sr)
    if not energy_gate(y):
        return None, f"{os.path.basename(file_path)} -> too_silent"
    try:
        v = extract_features(y, sr)
        if not np.isfinite(v).all():
            return None, f"{os.path.basename(file_path)} -> nan_or_inf_in_features"
        return (v, label), None
    except Exception as e:
        return None, f"{os.path.basename(file_path)} -> feature_error: {type(e).__name__}: {e}"


def load_dataset():
    X, y = [], []
    for d in DATASETS:
        folder = os.path.basename(d)
        print(f"–û–±—Ä–∞–±–æ—Ç–∫–∞: {d}")
        if not os.path.exists(d):
            print("  ‚ö†Ô∏è  –ü–∞–ø–∫–∞ –Ω–µ –Ω–∞–π–¥–µ–Ω–∞, –ø—Ä–æ–ø—É—â–µ–Ω–∞")
            continue
        label = parse_label(folder)
        if label is None:
            print("  ‚ö†Ô∏è  –ù–µ —É–¥–∞–ª–æ—Å—å –æ–ø—Ä–µ–¥–µ–ª–∏—Ç—å –º–µ—Ç–∫—É, –ø—Ä–æ–ø—É—â–µ–Ω–∞")
            continue
        files = [f for f in os.listdir(d) if f.lower().endswith('.wav')]
        if len(files) == 0:
            print("  ‚ö†Ô∏è  –ù–µ—Ç .wav/.WAV —Ñ–∞–π–ª–æ–≤")
            continue
        if MAX_FILES_PER_FOLDER is not None:
            files = files[:MAX_FILES_PER_FOLDER]
        print(f"  üìÅ –§–∞–π–ª–æ–≤ –¥–ª—è –æ–±—Ä–∞–±–æ—Ç–∫–∏: {len(files)}")
        print(f"  üè∑Ô∏è  –ú–µ—Ç–∫–∞: {label}")
        paths = [os.path.join(d, f) for f in files]

        iterator = paths
        if USE_TQDM:
            iterator = tqdm(paths, desc=f"  {folder}", ncols=80, leave=False, mininterval=0.2)

        results = Parallel(n_jobs=N_JOBS)(
            delayed(process_file)(p, label) for p in iterator
        )

        ok = 0
        errs_local = []
        for res, err in results:
            if err is None:
                v, lbl = res
                X.append(v)
                y.append(lbl)
                ok += 1
            else:
                if len(errs_local) < PRINT_FIRST_ERRORS:
                    errs_local.append(err)

        print(f"  ‚úÖ –£—Å–ø–µ—à–Ω–æ –æ–±—Ä–∞–±–æ—Ç–∞–Ω–æ: {ok}/{len(paths)}")
        if errs_local:
            print("  –ü—Ä–∏–º–µ—Ä—ã –ø—Ä–∏—á–∏–Ω –æ—Ç–∫–∞–∑–∞ (–ø–µ—Ä–≤—ã–µ):")
            for e in errs_local:
                print("   -", e)

    return np.array(X, dtype=np.float32), np.array(y)

# ===================== OPTUNA =====================

def rf_optuna_search(X_tr, y_tr, n_trials=N_TRIALS, n_splits=N_CV_SPLITS):
    print(f"–ó–∞–ø—É—Å–∫ Optuna: trials={n_trials}, CV folds={n_splits}‚Ä¶")

    def objective(trial):
        n_estimators = trial.suggest_int('n_estimators', 200, 800)
        max_depth = trial.suggest_int('max_depth', 8, 40)
        min_samples_split = trial.suggest_int('min_samples_split', 2, 20)
        min_samples_leaf = trial.suggest_int('min_samples_leaf', 1, 8)
        max_features = trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
        bootstrap = trial.suggest_categorical('bootstrap', [True, False])

        model = RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            min_samples_split=min_samples_split,
            min_samples_leaf=min_samples_leaf,
            max_features=max_features,
            bootstrap=bootstrap,
            class_weight='balanced_subsample',
            n_jobs=-1,
            random_state=42
        )

        cv = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
        scores = cross_val_score(model, X_tr, y_tr, cv=cv, scoring='f1_weighted', n_jobs=-1)
        return scores.mean()

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    print(f"–õ—É—á—à–∏–µ –ø–∞—Ä–∞–º–µ—Ç—Ä—ã: {study.best_params}")
    print(f"–õ—É—á—à–∏–π F1 (CV): {study.best_value:.4f}")
    return study.best_params, study.best_value

# ===================== –û–ë–£–ß–ï–ù–ò–ï –ò –û–¶–ï–ù–ö–ê =====================

def train_and_evaluate(X: np.ndarray, y: np.ndarray):
    if len(X) == 0:
        print("‚ùå –ù–µ —É–¥–∞–ª–æ—Å—å –∏–∑–≤–ª–µ—á—å –ø—Ä–∏–∑–Ω–∞–∫–∏ –Ω–∏ –∏–∑ –æ–¥–Ω–æ–≥–æ —Ñ–∞–π–ª–∞")
        sys.exit(1)

    print(f"–í—Å–µ–≥–æ —Å–µ–≥–º–µ–Ω—Ç–æ–≤: {len(X)} | –†–∞–∑–º–µ—Ä –ø—Ä–∏–∑–Ω–∞–∫–∞: {X.shape[1]}")
    le = LabelEncoder()
    y_enc = le.fit_transform(y)
    X_tr, X_te, y_tr, y_te = train_test_split(X, y_enc, test_size=0.2, stratify=y_enc, random_state=42)

    # –û–±—è–∑–∞—Ç–µ–ª—å–Ω—ã–π Optuna-–ø–æ–¥–±–æ—Ä
    best_params, best_cv = rf_optuna_search(X_tr, y_tr, n_trials=N_TRIALS, n_splits=N_CV_SPLITS)

    # –§–∏–Ω–∞–ª—å–Ω–∞—è –º–æ–¥–µ–ª—å
    model = RandomForestClassifier(
        **best_params,
        class_weight='balanced_subsample',
        n_jobs=-1,
        random_state=42
    )

    print("–û–±—É—á–µ–Ω–∏–µ —Ñ–∏–Ω–∞–ª—å–Ω–æ–π –º–æ–¥–µ–ª–∏ Random Forest‚Ä¶")
    model.fit(X_tr, y_tr)

    # –ü—Ä–æ–≥–Ω–æ–∑—ã –∏ –æ—Ç—á—ë—Ç—ã (TRAIN/TEST)
    y_tr_pred = model.predict(X_tr)
    y_te_pred = model.predict(X_te)

    print("================= RANDOM FOREST ‚Äî –û–¢–ß–Å–¢–´ =================")
    print("–û—Ç—á—ë—Ç (TRAIN):")
    print(classification_report(y_tr, y_tr_pred, target_names=le.classes_))
    print("–û—Ç—á—ë—Ç (TEST):")
    print(classification_report(y_te, y_te_pred, target_names=le.classes_))

    f1_tr = f1_score(y_tr, y_tr_pred, average='weighted')
    f1_te = f1_score(y_te, y_te_pred, average='weighted')
    acc_tr = accuracy_score(y_tr, y_tr_pred)
    acc_te = accuracy_score(y_te, y_te_pred)

    print("–ò–¢–û–ì–û:")
    print(f"Accuracy: train={acc_tr:.4f} | test={acc_te:.4f}")
    print(f"F1-weighted: train={f1_tr:.4f} | test={f1_te:.4f}")
    print(f"–†–∞–∑–Ω–∏—Ü–∞ (train-test) –ø–æ F1: {f1_tr - f1_te:.4f}")


def main():
    if not os.path.exists(ROOT_DATA_DIR):
        print("‚ùå –£–∫–∞–∂–∏—Ç–µ –∫–æ—Ä—Ä–µ–∫—Ç–Ω—ã–π ROOT_DATA_DIR")
        print("   –¢–µ–∫—É—â–µ–µ –∑–Ω–∞—á–µ–Ω–∏–µ:", ROOT_DATA_DIR)
        sys.exit(1)
    print("–°—Ç–∞—Ä—Ç –æ–±—Ä–∞–±–æ—Ç–∫–∏ –∞—É–¥–∏–æ-—Å–µ–≥–º–µ–Ω—Ç–æ–≤ 0.1—Å‚Ä¶")
    X, y = load_dataset()
    print("–ü–µ—Ä–µ—Ö–æ–¥–∏–º –∫ –æ–±—É—á–µ–Ω–∏—é –∏ –æ—Ü–µ–Ω–∫–µ Random Forest (—Å Optuna)‚Ä¶")
    train_and_evaluate(X, y)
    print("–ì–æ—Ç–æ–≤–æ.")

if __name__ == '__main__':
    main()


–°—Ç–∞—Ä—Ç –æ–±—Ä–∞–±–æ—Ç–∫–∏ –∞—É–¥–∏–æ-—Å–µ–≥–º–µ–Ω—Ç–æ–≤ 0.1—Å‚Ä¶
–û–±—Ä–∞–±–æ—Ç–∫–∞: raw_audio/asphalt_dry
  üìÅ –§–∞–π–ª–æ–≤ –¥–ª—è –æ–±—Ä–∞–±–æ—Ç–∫–∏: 19736
  üè∑Ô∏è  –ú–µ—Ç–∫–∞: asphalt


                                                                                

  ‚úÖ –£—Å–ø–µ—à–Ω–æ –æ–±—Ä–∞–±–æ—Ç–∞–Ω–æ: 19736/19736
–û–±—Ä–∞–±–æ—Ç–∫–∞: raw_audio/asphalt_wet
  üìÅ –§–∞–π–ª–æ–≤ –¥–ª—è –æ–±—Ä–∞–±–æ—Ç–∫–∏: 19846
  üè∑Ô∏è  –ú–µ—Ç–∫–∞: asphalt


                                                                                

  ‚úÖ –£—Å–ø–µ—à–Ω–æ –æ–±—Ä–∞–±–æ—Ç–∞–Ω–æ: 19846/19846
–û–±—Ä–∞–±–æ—Ç–∫–∞: raw_audio/cobblestones_dry
  üìÅ –§–∞–π–ª–æ–≤ –¥–ª—è –æ–±—Ä–∞–±–æ—Ç–∫–∏: 15012
  üè∑Ô∏è  –ú–µ—Ç–∫–∞: cobblestones


                                                                                

  ‚úÖ –£—Å–ø–µ—à–Ω–æ –æ–±—Ä–∞–±–æ—Ç–∞–Ω–æ: 15012/15012
–û–±—Ä–∞–±–æ—Ç–∫–∞: raw_audio/cobblestones_wet
  üìÅ –§–∞–π–ª–æ–≤ –¥–ª—è –æ–±—Ä–∞–±–æ—Ç–∫–∏: 5000
  üè∑Ô∏è  –ú–µ—Ç–∫–∞: cobblestones


                                                                                

  ‚úÖ –£—Å–ø–µ—à–Ω–æ –æ–±—Ä–∞–±–æ—Ç–∞–Ω–æ: 5000/5000
–ü–µ—Ä–µ—Ö–æ–¥–∏–º –∫ –æ–±—É—á–µ–Ω–∏—é –∏ –æ—Ü–µ–Ω–∫–µ Random Forest (—Å Optuna)‚Ä¶
–í—Å–µ–≥–æ —Å–µ–≥–º–µ–Ω—Ç–æ–≤: 59594 | –†–∞–∑–º–µ—Ä –ø—Ä–∏–∑–Ω–∞–∫–∞: 58
–ó–∞–ø—É—Å–∫ Optuna: trials=30, CV folds=3‚Ä¶


  0%|          | 0/30 [00:00<?, ?it/s]

[W 2025-10-19 23:33:04,453] Trial 6 failed with parameters: {'n_estimators': 750, 'max_depth': 25, 'min_samples_split': 8, 'min_samples_leaf': 1, 'max_features': 'sqrt', 'bootstrap': False} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/home/wld-linux/miniconda3/envs/tf/lib/python3.10/site-packages/optuna/study/_optimize.py", line 201, in _run_trial
    value_or_values = func(trial)
  File "/tmp/ipykernel_14072/1336751704.py", line 237, in objective
    scores = cross_val_score(model, X_tr, y_tr, cv=cv, scoring='f1_weighted', n_jobs=-1)
  File "/home/wld-linux/miniconda3/envs/tf/lib/python3.10/site-packages/sklearn/utils/_param_validation.py", line 218, in wrapper
    return func(*args, **kwargs)
  File "/home/wld-linux/miniconda3/envs/tf/lib/python3.10/site-packages/sklearn/model_selection/_validation.py", line 677, in cross_val_score
    cv_results = cross_validate(
  File "/home/wld-linux/miniconda3/envs/tf/lib/python3.10/site-packag

KeyboardInterrupt: 