In [None]:
# Path configuration: Kaggle input datasets and the working-directory outputs
SOURCE_DIR = "/kaggle/input/train-val"
TEST_DIR = "/kaggle/input/testik-20"
AUDIO_SEGMENTS_DIR = "/kaggle/working/audio_segments"
DATASET_DIR = "/kaggle/working/dataset"
TEST_OUTPUT_DIR = "/kaggle/working/test"
METADATA_CSV = "/kaggle/working/segments_metadata.csv"
TEST_METADATA_CSV = "/kaggle/working/test_labels.csv"

# Make sure every output directory exists before any processing step writes to it
for _working_dir in (AUDIO_SEGMENTS_DIR, DATASET_DIR, TEST_OUTPUT_DIR):
    os.makedirs(_working_dir, exist_ok=True)

print("Папки успешно созданы!")

In [None]:
# Processing parameters

# Audio
# NOTE(review): 41100 Hz is an unusual rate — possibly a typo for 44100; confirm before changing.
TARGET_SAMPLE_RATE = 41100  # sample rate every file is loaded at
DURATION_SEC = 1.0          # length of one segment, in seconds
NUM_SEGMENTS = 3            # default number of segments cut from each recording
AUGMENT_AUDIO = True        # passed as the `augment` flag of preprocess_audio

# Spectrogram
HOP_LENGTH = 512            # hop length used for the mel spectrogram and its rendering
TARGET_SIZE = (150, 150)    # size the spectrogram images are resized to

In [None]:
# Функция для добавления гауссовского шума
def add_gaussian_noise(y, min_amplitude=0.001, max_amplitude=0.015, p=0.5):
    """Randomly add zero-mean Gaussian noise to a signal.

    Args:
        y: Input signal (sequence of samples).
        min_amplitude: Lower bound for the noise standard deviation.
        max_amplitude: Upper bound for the noise standard deviation.
        p: Probability of applying the noise.

    Returns:
        ``y`` itself when no noise is applied; otherwise ``y`` plus a noise
        vector of ``len(y)`` samples whose standard deviation is drawn
        uniformly from ``[min_amplitude, max_amplitude]``.
    """
    # Guard clause: leave the signal untouched with probability 1 - p.
    if not random.random() < p:
        return y

    sigma = random.uniform(min_amplitude, max_amplitude)
    return y + np.random.normal(loc=0, scale=sigma, size=len(y))

In [None]:
# Общие функции обработки
def remove_dc_offset(y):
    """Return the signal with its mean value (DC component) subtracted."""
    dc_level = np.mean(y)
    return y - dc_level

def trim_silence(y, sr, top_db=40):
    """Trim the signal with ``librosa.effects.trim`` and reject short results.

    Args:
        y: Audio samples.
        sr: Sample rate, used to convert the trimmed length to seconds.
        top_db: Threshold forwarded to ``librosa.effects.trim``.

    Returns:
        The trimmed signal, or ``None`` when the input is empty or less than
        0.5 seconds remain after trimming.
    """
    if len(y) == 0:
        return None

    trimmed = librosa.effects.trim(y, top_db=top_db)[0]
    if len(trimmed) / sr >= 0.5:
        return trimmed
    return None

def normalize_audio(y):
    """Normalize the signal via ``librosa.util.normalize`` with its default settings."""
    normalized = librosa.util.normalize(y)
    return normalized

def pad_or_truncate(y, target_length):
    """Force the signal to ``target_length`` samples.

    Shorter signals are zero-padded at the end; signals that are long enough
    are cut to their first ``target_length`` samples.
    """
    missing = target_length - len(y)
    if missing <= 0:
        return y[:target_length]
    return np.pad(y, (0, missing), 'constant')

def preprocess_audio(y, sr, augment=False, truncate=False):
    """Clean a raw recording before it is split into segments.

    Steps: remove the DC offset, trim silence, normalize, make sure the signal
    is at least one segment (``DURATION_SEC``) long, and optionally add
    Gaussian noise followed by a second normalization.

    Previously the signal was always cut to exactly ``DURATION_SEC`` here.
    ``segment_audio`` then received a signal of exactly one segment length,
    computed a hop of 0 and returned identical copies of the same segment.
    Longer recordings are now kept intact so the segments can cover different
    parts of the audio.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.
        augment: When true, apply ``add_gaussian_noise`` and re-normalize.
        truncate: When true, restore the old behaviour of cutting the signal
            to exactly ``DURATION_SEC`` seconds.

    Returns:
        The processed signal, or ``None`` when ``trim_silence`` rejects it.
    """
    y = remove_dc_offset(y)
    y = trim_silence(y, sr)
    if y is None:
        return None
    y = normalize_audio(y)

    # Pad short signals up to one segment; only cut long ones on request.
    min_length = int(DURATION_SEC * sr)
    if truncate or len(y) < min_length:
        y = pad_or_truncate(y, min_length)

    if augment:
        y = add_gaussian_noise(y)
        y = normalize_audio(y)

    return y

def segment_audio(y, sr, num_segments=NUM_SEGMENTS):
    """Cut ``num_segments`` windows of ``DURATION_SEC`` seconds out of a signal.

    Window starts are spread evenly from the beginning of the signal to the
    last position where a full window still fits. If the signal is exactly one
    window long, every segment is the same window.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.
        num_segments: Number of segments to return.

    Returns:
        A list of ``num_segments`` arrays, each ``int(sr * DURATION_SEC)``
        samples long (zero-padded at the end when the signal is too short).
    """
    seg_len = int(sr * DURATION_SEC)

    # Clamp at 0: for a signal shorter than one window the difference is
    # negative, which used to produce negative start indices and therefore
    # slices taken from the wrong part of the signal.
    if num_segments > 1:
        hop = max(0, len(y) - seg_len) // (num_segments - 1)
    else:
        hop = 0

    segments = []
    for i in range(num_segments):
        start = i * hop
        segment = y[start:start + seg_len]
        if len(segment) < seg_len:
            segment = np.pad(segment, (0, seg_len - len(segment)))
        segments.append(segment)
    return segments

In [None]:
# Мел-спектограммы
def extract_log_mel_spectrogram(y, sr, n_mels=128):
    """Compute a log-mel spectrogram clipped to the range [-50, 0] dB.

    Args:
        y: Audio samples.
        sr: Sample rate of ``y``.
        n_mels: Number of mel bands.

    Returns:
        The mel power spectrogram converted to dB relative to its maximum
        (``ref=np.max``) and clipped to ``[-50.0, 0.0]``.
    """
    # Clipping bounds in dB
    db_floor, db_ceiling = -50.0, 0.0

    power_spec = librosa.feature.melspectrogram(
        y=y,
        sr=sr,
        n_fft=2048,
        hop_length=HOP_LENGTH,
        n_mels=n_mels,
    )
    db_spec = librosa.power_to_db(power_spec, ref=np.max)
    return np.clip(db_spec, db_floor, db_ceiling)

def save_spectrogram(spectrogram, filepath, sr=TARGET_SAMPLE_RATE):
    """Render a spectrogram to an image file of size ``TARGET_SIZE``.

    The spectrogram is drawn with matplotlib into a temporary PNG next to
    ``filepath``, read back with OpenCV, resized and written to ``filepath``.

    Args:
        spectrogram: 2-D spectrogram array passed to ``librosa.display.specshow``.
        filepath: Destination image path.
        sr: Sample rate forwarded to ``specshow``.

    Raises:
        OSError: If the rendered image cannot be read back or the final image
            cannot be written. Previously both failures were silent, leaving
            callers to record metadata for an image that does not exist.
    """
    temp_path = filepath + ".temp.png"

    try:
        # Render the spectrogram; always close the figure, even on errors,
        # so figures do not pile up when many files are processed.
        fig = plt.figure(figsize=(2, 2), dpi=128)
        try:
            librosa.display.specshow(
                spectrogram,
                sr=sr,
                hop_length=HOP_LENGTH,
                x_axis='time',
                y_axis='mel',
                cmap='magma'
            )
            plt.axis('off')
            plt.tight_layout(pad=0)
            plt.savefig(temp_path, bbox_inches='tight', pad_inches=0)
        finally:
            plt.close(fig)

        # Load the rendered image and rescale it to the target size
        img = cv2.imread(temp_path)
        if img is None:
            raise OSError(f"Could not read rendered spectrogram: {temp_path}")
        resized_img = cv2.resize(img, TARGET_SIZE, interpolation=cv2.INTER_LINEAR)
        if not cv2.imwrite(filepath, resized_img):
            raise OSError(f"Could not write spectrogram image: {filepath}")
    finally:
        # Remove the temporary file on success and on failure alike
        if os.path.exists(temp_path):
            os.remove(temp_path)

In [None]:
# Обработка тренировочных
def process_files(input_folder, output_folder, class_label, num_segments=NUM_SEGMENTS):
    """Preprocess every audio file of one class and export segments and spectrograms.

    For each ``.wav``/``.mp3``/``.flac`` file in ``input_folder`` the audio is
    loaded at ``TARGET_SAMPLE_RATE``, preprocessed and split into segments.
    Each segment is written as a WAV file to ``output_folder`` and as a
    spectrogram PNG to ``DATASET_DIR/<class_label>/``.

    Args:
        input_folder: Folder containing the source recordings.
        output_folder: Folder that receives the segment WAV files.
        class_label: Label stored in the metadata and used as image sub-folder.
        num_segments: Number of segments per recording.

    Returns:
        A DataFrame with one row per exported segment. The columns are fixed,
        so the frame keeps its schema (including ``class``) even when no file
        could be processed.
    """
    columns = [
        'segment_number',
        'segment_filename',
        'spectrogram_filename',
        'original_filename',
        'patient_id',
        'class',
        'duration_sec',
    ]
    os.makedirs(output_folder, exist_ok=True)
    image_folder = os.path.join(DATASET_DIR, str(class_label))
    metadata = []

    # Sorted so the metadata order does not depend on the file system.
    for file_name in sorted(os.listdir(input_folder)):
        if not file_name.lower().endswith(('.wav', '.mp3', '.flac')):
            continue
        input_path = os.path.join(input_folder, file_name)
        try:
            y, sr = librosa.load(input_path, sr=TARGET_SAMPLE_RATE)
            y = preprocess_audio(y, sr, augment=AUGMENT_AUDIO)
            if y is None:
                print(f"Skipping {file_name}: too short after trimming.")
                continue

            segments = segment_audio(y, sr, num_segments)
            patient_id = Path(file_name).stem
            # Created once per file instead of once per segment.
            os.makedirs(image_folder, exist_ok=True)

            for i, segment in enumerate(segments):
                segment_stem = f"{patient_id}_segment_{i+1:03d}"
                segment_name = f"{segment_stem}.wav"
                image_name = f"{segment_stem}.png"

                sf.write(os.path.join(output_folder, segment_name), segment, sr)

                # Build and save the spectrogram image for this segment
                logmel = extract_log_mel_spectrogram(segment, sr)
                save_spectrogram(logmel, os.path.join(image_folder, image_name))

                metadata.append({
                    'segment_number': i+1,
                    'segment_filename': segment_name,
                    'spectrogram_filename': image_name,
                    'original_filename': file_name,
                    'patient_id': patient_id,
                    'class': class_label,
                    'duration_sec': DURATION_SEC
                })
        except Exception as e:
            # Best effort: report the failing file and continue with the next one.
            print(f"Error processing {file_name}: {e}")
    return pd.DataFrame(metadata, columns=columns)

def main_segmentation():
    """Process the training/validation classes and write the segment metadata CSV.

    The ``PD`` (label 1) and ``HC`` (label 0) sub-folders of ``SOURCE_DIR`` are
    processed with ``process_files``; the combined metadata is saved to
    ``METADATA_CSV`` and a short summary is printed.

    Raises:
        FileNotFoundError: If none of the class folders exists.
    """
    input_folders = {
        os.path.join(SOURCE_DIR, 'PD'): 1,
        os.path.join(SOURCE_DIR, 'HC'): 0
    }
    all_metadata = []

    for folder, class_label in input_folders.items():
        # Skip a missing class folder with a message, as the test pipeline
        # does, instead of crashing inside os.listdir.
        if not os.path.isdir(folder):
            print(f"Missing input folder: {folder}")
            continue
        output_folder = os.path.join(AUDIO_SEGMENTS_DIR, os.path.basename(folder))
        df = process_files(folder, output_folder, class_label)
        all_metadata.append(df)

    if not all_metadata:
        # pd.concat([]) would fail with an unhelpful "No objects to concatenate".
        raise FileNotFoundError(f"No class folders found in: {SOURCE_DIR}")

    final_df = pd.concat(all_metadata, ignore_index=True)
    final_df.to_csv(METADATA_CSV, index=False)

    print(f"Metadata saved to: {METADATA_CSV}")
    # The column is absent when no segment at all was produced.
    if 'class' in final_df.columns:
        print(f"Class distribution:\n{final_df['class'].value_counts()}")
    print(f"Total segments: {len(final_df)}")

In [None]:
# Обработка тестовых данных с сегментацией
def process_test_folder(folder, class_label, metadata):
    """Convert the test recordings of one class into segment spectrograms.

    Every ``.wav``/``.mp3``/``.flac`` file in ``folder`` is loaded at
    ``TARGET_SAMPLE_RATE``, preprocessed, split into ``NUM_SEGMENTS`` segments
    and each segment is saved as a PNG in ``TEST_OUTPUT_DIR/<class_label>/``.

    Test data is preprocessed WITHOUT augmentation. The previous version passed
    ``AUGMENT_AUDIO``, which randomly added noise to the evaluation inputs and
    made test results non-deterministic.

    Args:
        folder: Folder containing the test recordings of one class.
        class_label: Label stored in the metadata and used as image sub-folder.
        metadata: List that is extended in place with one dict per segment.
    """
    class_dir = os.path.join(TEST_OUTPUT_DIR, str(class_label))

    # Sorted so the metadata order does not depend on the file system.
    for fname in tqdm(sorted(os.listdir(folder)), desc=os.path.basename(folder)):
        if not fname.lower().endswith(('.wav', '.mp3', '.flac')):
            continue
        fpath = os.path.join(folder, fname)
        try:
            y, sr = librosa.load(fpath, sr=TARGET_SAMPLE_RATE)
            y = preprocess_audio(y, sr, augment=False)
            if y is None:
                print(f"── пропущено (короткий): {fname}")
                continue

            # Test recordings are segmented the same way as the training data
            segments = segment_audio(y, sr, NUM_SEGMENTS)
            base = Path(fname).stem
            # Images go into a per-class sub-folder; created once per file.
            os.makedirs(class_dir, exist_ok=True)

            for i, segment in enumerate(segments):
                img_name = f"{base}_segment_{i+1:03d}.png"
                img_path = os.path.join(class_dir, img_name)

                logmel = extract_log_mel_spectrogram(segment, sr)
                save_spectrogram(logmel, img_path)

                metadata.append({
                    "filename": os.path.join(str(class_label), img_name),
                    "class": class_label,
                    "segment_number": i+1,
                    "original_filename": fname
                })
        except Exception as e:
            # Best effort: report the failing file and continue with the next one.
            print(f"Ошибка {fname}: {e}")

def main_test_processing():
    """Process the test classes and write the test label CSV.

    The ``PD`` (label 1) and ``HC`` (label 0) sub-folders of ``TEST_DIR`` are
    processed with ``process_test_folder``; missing folders are reported and
    skipped. The collected metadata is saved to ``TEST_METADATA_CSV`` and a
    short summary is printed.
    """
    metadata = []
    class_map = {'PD': 1, 'HC': 0}

    for group, label in class_map.items():
        folder = os.path.join(TEST_DIR, group)
        if os.path.isdir(folder):
            process_test_folder(folder, label, metadata)
        else:
            print(f"Нет папки: {folder}")

    # Passing the columns to the constructor keeps the schema even when no
    # segment was produced; selecting them from an empty frame raised KeyError.
    columns = ["filename", "class", "segment_number", "original_filename"]
    df = pd.DataFrame(metadata, columns=columns)
    df.to_csv(TEST_METADATA_CSV, index=False)

    print(f"Спектрограммы сохранены в: {TEST_OUTPUT_DIR}")
    print(f"Метки в файле: {TEST_METADATA_CSV}")
    print(f"Class distribution:\n{df['class'].value_counts()}")
    print(f"Total test segments: {len(df)}")

In [None]:
# Запуск

if __name__ == "__main__":
    # Run the pipelines in order: training data first, then test data.
    # Each entry pairs the banner to print with the stage to execute.
    for banner, run_stage in (
        ("\nОбработка тренировочных данных...", main_segmentation),
        ("\nОбработка тестовых данных...", main_test_processing),
    ):
        print(banner)
        run_stage()


Обработка тренировочных данных...
Metadata saved to: /kaggle/working/segments_metadata.csv
Class distribution:
class
0    111
1    108
Name: count, dtype: int64
Total segments: 219

Обработка тестовых данных...


PD: 100%|██████████| 5/5 [00:01<00:00,  4.96it/s]
HC: 100%|██████████| 5/5 [00:00<00:00,  5.89it/s]

Спектрограммы сохранены в: /kaggle/working/test
Метки в файле: /kaggle/working/test_labels.csv
Class distribution:
class
1    15
0    15
Name: count, dtype: int64
Total test segments: 30



