# In [None]:
import os
import numpy as np
import pandas as pd
import librosa
import random
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# === Data augmentation methods ===
def add_noise(signal, noise_level=0.005):
    """Return ``signal`` with scaled standard-normal noise added.

    One noise sample is drawn per element (``len(signal)`` samples from
    ``np.random.randn``), multiplied by ``noise_level`` and added to the
    input. A new array is returned; the input is not modified.
    """
    n_samples = len(signal)
    scaled_noise = noise_level * np.random.randn(n_samples)
    return signal + scaled_noise

def random_gain(signal, min_gain=0.8, max_gain=1.2):
    """Scale ``signal`` by a single random factor.

    The factor is drawn once per call from ``np.random.uniform`` over
    ``[min_gain, max_gain)`` and applied to every sample.
    """
    factor = np.random.uniform(low=min_gain, high=max_gain)
    scaled = signal * factor
    return scaled

def safe_time_warp(signal, rate_range=(0.8, 1.2), target_len=44100):
    """Time-stretch ``signal`` by a random rate and return ``target_len`` samples.

    A stretch rate is drawn uniformly from ``rate_range`` and passed to
    ``librosa.effects.time_stretch``. The result is truncated, or zero-padded
    on the right, to exactly ``target_len`` samples.

    If anything in the stretch fails, the error is printed and the *unwarped*
    signal is returned instead, also fitted to ``target_len`` so that callers
    stacking the results always receive equal-length arrays. When the input
    already has ``target_len`` samples the very same object is returned.

    Args:
        signal: 1-D sequence of samples.
        rate_range: ``(low, high)`` bounds for the random stretch rate.
        target_len: Number of samples in the returned array.

    Returns:
        A 1-D array with ``target_len`` samples.
    """
    def _fit(samples):
        # Truncate or right-pad with zeros to exactly target_len samples.
        if len(samples) > target_len:
            return samples[:target_len]
        return np.pad(samples, (0, target_len - len(samples)), mode='constant')

    try:
        rate = np.random.uniform(rate_range[0], rate_range[1])
        return _fit(librosa.effects.time_stretch(signal, rate=rate))
    except Exception as e:
        # Deliberate best-effort: augmentation must never abort data collection.
        print(f"time_warp 失敗：{e}")
        if len(signal) == target_len:
            return signal
        return _fit(signal)

# === Feature extraction methods ===
def extract_features(signal, sr=22050, method='logmel', delta=False, zscore=False):
    """Convert a 1-D signal into a fixed-height time-frequency feature map.

    Args:
        signal: 1-D array of samples.
        sr: Sampling rate passed to librosa (used by 'logmel' and 'mfcc').
        method: One of
            'logmel' - 128-band mel spectrogram, ``log(x + 1e-6)``, time axis
                       fixed to 256 frames;
            'mfcc'   - 20 MFCCs (time axis is NOT fixed to 256 frames);
            'stft'   - ``log1p`` of the STFT magnitude (n_fft=1024, hop 512),
                       time axis fixed to 256 frames.
        delta: If true, first- and second-order deltas are stacked below the
            base feature along the row axis.
        zscore: If true, every row is standardised over time and the result
            is clipped to [-3, 3].

    Returns:
        Array of shape ``(128, n_frames, 1)``. Rows are cropped or zero-padded
        to 128 after all other processing.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    if method == 'logmel':
        feat = librosa.feature.melspectrogram(y=signal, sr=sr, n_mels=128)
        feat = np.log(feat + 1e-6)  # small offset avoids log(0)
        feat = librosa.util.fix_length(feat, size=256, axis=1)
    elif method == 'mfcc':
        feat = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=20)
    elif method == 'stft':
        D = librosa.stft(signal, n_fft=1024, hop_length=512)
        feat = np.abs(D)
        feat = np.log1p(feat)
        feat = librosa.util.fix_length(feat, size=256, axis=1)
    else:
        raise ValueError("不支援的特徵提取方法")

    if delta:
        # NOTE(review): the stacked rows are cropped to 128 further down, so
        # for 'logmel' (already 128 rows) and 'stft' (n_fft=1024 gives more
        # than 128 rows) the delta rows are discarded entirely. Keeping them
        # would change the output shape, so this is left as-is -- confirm
        # the intended layout.
        delta1 = librosa.feature.delta(feat)
        delta2 = librosa.feature.delta(feat, order=2)
        feat = np.vstack([feat, delta1, delta2])

    if zscore:
        # Standardise each row across time first, THEN clamp outliers to
        # +/-3 standard deviations. Clipping the raw log-scale values before
        # scaling would saturate most of the spectrogram.
        scaler = StandardScaler()
        feat = scaler.fit_transform(feat.T).T
        feat = np.clip(feat, -3.0, 3.0)

    # Force a fixed number of rows so every sample stacks into one array.
    if feat.shape[0] > 128:
        feat = feat[:128, :]
    elif feat.shape[0] < 128:
        pad_width = 128 - feat.shape[0]
        feat = np.pad(feat, ((0, pad_width), (0, 0)), mode='constant')

    # Trailing singleton axis (channel dimension).
    return feat[..., np.newaxis]

# === Collect dataset directory paths ===
def get_data_sources(base_dir="../dataset"):
    """Map every class label to the directory its recordings are read from.

    Expected layout under ``base_dir``:

    * ``normal/normal`` -> label ``'normal'``
    * ``<fault>/<fault>/<sub-folder>`` for ``imbalance``,
      ``horizontal-misalignment`` and ``vertical-misalignment`` -> label
      ``<fault>``; the alphabetically FIRST sub-folder is used.
    * ``<align>/<align>/<fault>/<sub-folder>`` for ``overhang`` and
      ``underhang`` -> label ``'<align>_<fault>'``; the alphabetically LAST
      sub-folder is used.

    Directory listings are sorted and restricted to directories, so the
    chosen folders and the label order (which later defines the integer label
    ids) are the same on every machine, and stray files such as ``.DS_Store``
    are ignored.

    NOTE(review): only one sub-folder per label is kept, and the two groups
    pick different ends of the listing (first vs. last). This mirrors the
    original loops -- confirm whether all sub-folders were meant to be used.

    Args:
        base_dir: Root directory of the dataset.

    Returns:
        dict mapping label name to directory path, in insertion order.

    Raises:
        OSError: If one of the expected directories does not exist.
    """
    def _subdirs(path):
        # Sorted names of the sub-directories directly inside ``path``.
        return sorted(
            name for name in os.listdir(path)
            if os.path.isdir(os.path.join(path, name))
        )

    data_sources = {'normal': os.path.join(base_dir, 'normal/normal')}

    for fault_type in ['imbalance', 'horizontal-misalignment', 'vertical-misalignment']:
        fault_path = os.path.join(base_dir, fault_type, fault_type)
        candidates = _subdirs(fault_path)
        if candidates:
            data_sources[fault_type] = os.path.join(fault_path, candidates[0])

    for align in ['overhang', 'underhang']:
        align_path = os.path.join(base_dir, align, align)
        for fault_type in _subdirs(align_path):
            fault_dir = os.path.join(align_path, fault_type)
            weights = _subdirs(fault_dir)
            if weights:
                # The original loop overwrote the entry on every iteration,
                # i.e. the last listed folder won.
                data_sources[f"{align}_{fault_type}"] = os.path.join(fault_dir, weights[-1])

    return data_sources

# === Collect file paths and split them into train/test file lists ===
def split_file_paths(data_sources, test_size=0.2, random_state=42):
    """Split the CSV files of every label into train and test lists.

    Each label's directory is walked recursively for ``.csv`` files and the
    files are split per label with ``train_test_split``. Splitting by file
    (before segmentation) keeps all segments of one recording on the same
    side of the split.

    The file list is sorted before splitting: ``os.walk`` yields entries in
    arbitrary order, which would otherwise make the split differ between
    machines even with a fixed ``random_state``.

    Args:
        data_sources: Mapping of label name -> root directory.
        test_size: Forwarded to ``train_test_split``.
        random_state: Forwarded to ``train_test_split``.

    Returns:
        ``(train_files, test_files, label_map)`` where the file lists hold
        ``(path, label_id)`` tuples and ``label_map`` maps label name to its
        integer id (the position of the label in ``data_sources``).

    Raises:
        ValueError: If a label has no ``.csv`` files, or too few for
            ``train_test_split`` to produce a split.
    """
    train_files = []
    test_files = []
    label_map = {label: idx for idx, label in enumerate(data_sources)}

    for label, root_path in data_sources.items():
        files = []
        for dirpath, _, filenames in os.walk(root_path):
            for file in filenames:
                if file.endswith(".csv"):
                    files.append(os.path.join(dirpath, file))
        files.sort()  # deterministic input order -> reproducible split

        if not files:
            # Fail early with a message that names the offending label/path.
            raise ValueError(
                f"No .csv files found for label {label!r} under {root_path!r}"
            )

        train, test = train_test_split(files, test_size=test_size, random_state=random_state)
        label_id = label_map[label]
        train_files.extend((f, label_id) for f in train)
        test_files.extend((f, label_id) for f in test)

    return train_files, test_files, label_map

# === Collect signal segments from the given file list ===
def _segment_variants(segment, augment, segment_len):
    """Return ``segment`` followed by its augmented copies when ``augment`` is true."""
    if not augment:
        return [segment]
    return [
        segment,
        add_noise(segment),
        random_gain(segment),
        # target_len must follow segment_len, otherwise the warped copy has a
        # different length from its siblings whenever segment_len != 44100.
        safe_time_warp(segment, target_len=segment_len),
    ]


def collect_segments(file_label_pairs, segment_len=44100, sr=22050, augment=False):
    """Cut every file into fixed-length segments, optionally augmented.

    Each CSV is read without a header and column index 7 is taken as the
    signal (presumably the channel of interest -- verify against the dataset
    description). The signal is cut into consecutive, non-overlapping windows
    of ``segment_len`` samples; a trailing partial window is zero-padded to
    full length. With ``augment`` enabled, each segment is followed by a
    noisy, a gain-scaled and a time-warped copy.

    A file that fails to load or process is reported on stdout and skipped;
    segments appended before the failure are kept.

    Args:
        file_label_pairs: Iterable of ``(csv_path, label_id)`` tuples.
        segment_len: Number of samples per segment.
        sr: Currently unused; kept for interface compatibility.
        augment: Whether to add the augmented copies.

    Returns:
        ``(X, y)`` as NumPy arrays: the stacked segments and their labels.
    """
    X = []
    y = []
    for file_path, label in file_label_pairs:
        try:
            data = pd.read_csv(file_path, header=None)
            signal = data.iloc[:, 7].values.astype(np.float32)

            # Full, non-overlapping windows.
            segments = [
                signal[start:start + segment_len]
                for start in range(0, len(signal) - segment_len + 1, segment_len)
            ]

            # Trailing partial window, zero-padded to segment_len.
            remainder = len(signal) % segment_len
            if remainder != 0:
                tail = signal[len(signal) - remainder:]
                segments.append(
                    np.pad(tail, (0, segment_len - remainder), mode='constant')
                )

            for segment in segments:
                for sig in _segment_variants(segment, augment, segment_len):
                    X.append(sig)
                    y.append(label)

        except Exception as e:
            # Best-effort: one bad file must not abort the whole collection.
            print(f"錯誤：{file_path} - {e}")

    return np.array(X), np.array(y)

# === Main script ===
# Build the label -> directory map, split by file, then cut into segments.
# Only the training set is augmented; the test set stays untouched.
data_sources = get_data_sources()
train_files, test_files, label_map = split_file_paths(data_sources)
X_train_raw, y_train = collect_segments(train_files, augment=True)
X_test_raw, y_test = collect_segments(test_files, augment=False)

# === Feature extraction ===
# Train and test must use identical feature settings, so share them.
feature_kwargs = dict(method='logmel', delta=True, zscore=True)
X_train_feat = np.array([
    extract_features(x, **feature_kwargs)
    for x in X_train_raw
])
X_test_feat = np.array([
    extract_features(x, **feature_kwargs)
    for x in X_test_raw
])

# === Save ===
# Store the label names (ordered by label id) next to the data; without them
# the integer labels in ``y`` cannot be mapped back to class names later.
label_names = np.array(sorted(label_map, key=label_map.get))
np.savez_compressed("train_data.npz", X=X_train_feat, y=y_train, label_names=label_names)
np.savez_compressed("test_data.npz", X=X_test_feat, y=y_test, label_names=label_names)