In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import librosa
import librosa.display
from tqdm import tqdm

mel_dir = '../data_4gr/mel_image'
chroma_dir = '../data_4gr/chroma_image'
mfcc_dir = '../data_4gr/mfcc_image'

## MFCC, Chroma, Mel-spectrogram from ICBHI Audio

In [18]:
import os
import librosa
import numpy as np
import random
import tensorflow as tf
from tqdm import tqdm
import pandas as pd
import cv2

# Seed 설정
seed_value = 1
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)
tf.compat.v1.set_random_seed(seed_value)
tf.keras.utils.set_random_seed(seed_value)

# 설정값
sample_rate = 16000
desired_length = 5
n_mels = 64
nfft = 2048
hop = 512
f_max = 4000

# 파일 경로
folds_file = '../ICBHI_Dataset/patient_list_foldwise.txt'
data_dir = '../ICBHI_Dataset/audio_and_txt_files/'

def Extract_Annotation_Data(file_name, data_dir):
    tokens = file_name.split('_')
    recording_info = pd.DataFrame(data=[tokens], columns=['Patient Number', 'Recording index', 'Chest location', 'Acquisition mode', 'Recording equipment'])
    recording_annotations = pd.read_csv(os.path.join(data_dir, file_name + '.txt'), names=['Start', 'End', 'Crackles', 'Wheezes'], delimiter='\t')
    return recording_info, recording_annotations

def get_annotations(data_dir):
    filenames = [s.split('.')[0] for s in os.listdir(data_dir) if '.txt' in s]
    i_list = []
    rec_annotations_dict = {}
    for s in filenames:
        i, a = Extract_Annotation_Data(s, data_dir)
        i_list.append(i)
        rec_annotations_dict[s] = a
    return filenames, rec_annotations_dict

def slice_data(start, end, raw_data, sample_rate):
    max_ind = len(raw_data)
    start_ind = min(int(start * sample_rate), max_ind)
    end_ind = min(int(end * sample_rate), max_ind)
    return raw_data[start_ind:end_ind]

def get_label(crackle, wheeze):
    if crackle == 0 and wheeze == 0:
        return 0
    elif crackle == 1 and wheeze == 0:
        return 1
    elif crackle == 0 and wheeze == 1:
        return 2
    else:
        return 3

def get_sound_samples(recording_annotations, file_name, data_dir, sample_rate):
    sample_data = [file_name]
    data, rate = librosa.load(os.path.join(data_dir, file_name + '.wav'), sr=sample_rate)
    for i in range(len(recording_annotations.index)):
        row = recording_annotations.loc[i]
        start = row['Start']
        end = row['End']
        crackles = row['Crackles']
        wheezes = row['Wheezes']
        audio_chunk = slice_data(start, end, data, rate)
        sample_data.append((audio_chunk, start, end, get_label(crackles, wheezes)))
    return sample_data

def create_mel_raw(current_window, sample_rate, n_mels=128, f_min=50, f_max=4000, nfft=2048, hop=512, resz=1):
    S = librosa.feature.melspectrogram(y=current_window, sr=sample_rate, n_mels=n_mels, fmin=f_min, fmax=f_max, n_fft=nfft, hop_length=hop)
    S = librosa.power_to_db(S, ref=np.max)
    S = (S - S.min()) / (S.max() - S.min())
    S *= 255
    S = cv2.applyColorMap(S.astype(np.uint8), cv2.COLORMAP_JET)
    S = cv2.resize(S, (224, 224))  # Resize to a fixed size
    return S

def create_chroma(current_window, sample_rate):
    chroma = librosa.feature.chroma_stft(y=current_window, sr=sample_rate)
    chroma = (chroma - chroma.min()) / (chroma.max() - chroma.min())
    chroma *= 255
    chroma = cv2.applyColorMap(chroma.astype(np.uint8), cv2.COLORMAP_JET)
    chroma = cv2.resize(chroma, (224, 224))  # Resize to a fixed size
    return chroma

def create_mfcc(current_window, sample_rate):
    mfcc = librosa.feature.mfcc(y=current_window, sr=sample_rate, n_mfcc=13)
    mfcc = (mfcc - mfcc.min()) / (mfcc.max() - mfcc.min())
    mfcc *= 255
    mfcc = cv2.applyColorMap(mfcc.astype(np.uint8), cv2.COLORMAP_JET)
    mfcc = cv2.resize(mfcc, (224, 224))  # Resize to a fixed size
    return mfcc

def save_image(array, save_path):
    array = (array - np.min(array)) / (np.max(array) - np.min(array)) * 255
    img = Image.fromarray(array.astype(np.uint8))
    img.save(save_path)



In [19]:
filenames, rec_annotations_dict = get_annotations(data_dir)

cycle_list = []
classwise_cycle_list = [[], [], [], []]
for idx, file_name in tqdm(enumerate(filenames), desc="Extracting Individual Cycles"):
    data = get_sound_samples(rec_annotations_dict[file_name], file_name, data_dir, sample_rate)
    cycles_with_labels = [(d[0], d[3], file_name, cycle_idx, 0) for cycle_idx, d in enumerate(data[1:])]
    cycle_list.extend(cycles_with_labels)
    for cycle_idx, d in enumerate(cycles_with_labels):
        classwise_cycle_list[d[1]].append(d)

# 데이터 증강
scale = 1
aug_nos = scale * len(classwise_cycle_list[0]) - len(classwise_cycle_list[0])
for idx in range(aug_nos):
    i = random.randint(0, len(classwise_cycle_list[0]) - 1)
    j = random.randint(0, len(classwise_cycle_list[0]) - 1)
    normal_i = classwise_cycle_list[0][i]
    normal_j = classwise_cycle_list[0][j]
    new_sample = np.concatenate([normal_i[0], normal_j[0]])
    cycle_list.append((new_sample, 0, normal_i[2] + '-' + normal_j[2], idx, 1))

# 증강 함수 추가 (crackle, wheeze, both)
def augment_class(class_index, classwise_cycle_list, scale):
    aug_nos = scale * len(classwise_cycle_list[0]) - len(classwise_cycle_list[class_index])
    for idx in range(aug_nos):
        aug_prob = random.random()
        if aug_prob < 0.6:
            i = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            j = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            sample_i = classwise_cycle_list[class_index][i]
            sample_j = classwise_cycle_list[class_index][j]
        elif aug_prob >= 0.6 and aug_prob < 0.8:
            i = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            j = random.randint(0, len(classwise_cycle_list[0]) - 1)
            sample_i = classwise_cycle_list[class_index][i]
            sample_j = classwise_cycle_list[0][j]
        else:
            i = random.randint(0, len(classwise_cycle_list[0]) - 1)
            j = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            sample_i = classwise_cycle_list[0][i]
            sample_j = classwise_cycle_list[class_index][j]
        new_sample = np.concatenate([sample_i[0], sample_j[0]])
        cycle_list.append((new_sample, class_index, sample_i[2] + '-' + sample_j[2], idx, 1))

augment_class(1, classwise_cycle_list, scale)
augment_class(2, classwise_cycle_list, scale)
augment_class(3, classwise_cycle_list, scale)

# 오디오 데이터 정렬
desiredLength = 8
output = []
for idx, sample in enumerate(cycle_list):
    output_buffer_length = int(desiredLength * sample_rate)
    soundclip = sample[0].copy()
    n_samples = len(soundclip)
    if n_samples < output_buffer_length:
        t = output_buffer_length // n_samples
        d = output_buffer_length % n_samples
        d = soundclip[:d]
        repeat_sample = np.concatenate((np.tile(soundclip, t), d))
        copy_repeat_sample = repeat_sample.copy()
        output.append((copy_repeat_sample, sample[1]))
    else:
        copy_repeat_sample = soundclip[:output_buffer_length]
        output.append((copy_repeat_sample, sample[1]))
audio_data = output



Extracting Individual Cycles: 920it [00:10, 84.03it/s] 


In [20]:
# mel-spectrogram, chroma, MFCC 이미지 생성 및 저장
mel_img = []
chroma_img = []
mfcc_img = []
for index in tqdm(range(len(audio_data)), desc="Saving Images"):
    audio = audio_data[index][0]
    label = audio_data[index][1]
    mel_image = create_mel_raw(audio, sample_rate, f_max=f_max, n_mels=n_mels, nfft=nfft, hop=hop, resz=1)
    chroma_image = create_chroma(audio, sample_rate)
    mfcc_image = create_mfcc(audio, sample_rate)
    mel_img.append((mel_image, label))
    chroma_img.append((chroma_image, label))
    mfcc_img.append((mfcc_image, label))

destination_dir = '../data_4gr'
mel_dir = os.path.join(destination_dir, 'mel_image')
chroma_dir = os.path.join(destination_dir, 'chroma_image')
mfcc_dir = os.path.join(destination_dir, 'mfcc_image')

os.makedirs(mel_dir, exist_ok=True)
os.makedirs(chroma_dir, exist_ok=True)
os.makedirs(mfcc_dir, exist_ok=True)

# Create the four folders for the labels in each image type directory
for label in ['normal', 'crackle', 'wheeze', 'both']:
    os.makedirs(os.path.join(mel_dir, label), exist_ok=True)
    os.makedirs(os.path.join(chroma_dir, label), exist_ok=True)
    os.makedirs(os.path.join(mfcc_dir, label), exist_ok=True)

for i in tqdm(range(len(mel_img)), desc="Saving Images in dir"):
    input_mel = mel_img[i][0]
    input_chroma = chroma_img[i][0]
    input_mfcc = mfcc_img[i][0]
    label = mel_img[i][1]
    
    if label == 0:
        mel_save_path = os.path.join(mel_dir, 'normal', 'image_'+str(i)+'.jpg')
        chroma_save_path = os.path.join(chroma_dir, 'normal', 'image_'+str(i)+'.jpg')
        mfcc_save_path = os.path.join(mfcc_dir, 'normal', 'image_'+str(i)+'.jpg')
    elif label == 1:
        mel_save_path = os.path.join(mel_dir, 'crackle', 'image_'+str(i)+'.jpg')
        chroma_save_path = os.path.join(chroma_dir, 'crackle', 'image_'+str(i)+'.jpg')
        mfcc_save_path = os.path.join(mfcc_dir, 'crackle', 'image_'+str(i)+'.jpg')
    elif label == 2:
        mel_save_path = os.path.join(mel_dir, 'wheeze', 'image_'+str(i)+'.jpg')
        chroma_save_path = os.path.join(chroma_dir, 'wheeze', 'image_'+str(i)+'.jpg')
        mfcc_save_path = os.path.join(mfcc_dir, 'wheeze', 'image_'+str(i)+'.jpg')
    else:
        mel_save_path = os.path.join(mel_dir, 'both', 'image_'+str(i)+'.jpg')
        chroma_save_path = os.path.join(chroma_dir, 'both', 'image_'+str(i)+'.jpg')
        mfcc_save_path = os.path.join(mfcc_dir, 'both', 'image_'+str(i)+'.jpg')
    
    cv2.imwrite(mel_save_path, cv2.cvtColor(input_mel, cv2.COLOR_RGB2BGR))
    cv2.imwrite(chroma_save_path, cv2.cvtColor(input_chroma, cv2.COLOR_RGB2BGR))
    cv2.imwrite(mfcc_save_path, cv2.cvtColor(input_mfcc, cv2.COLOR_RGB2BGR))

print('Done')


  return pitch_tuning(
Saving Images:   0%|          | 3/14568 [00:00<19:05, 12.71it/s]

Saving Images: 100%|██████████| 14568/14568 [18:02<00:00, 13.46it/s]
Saving Images in dir: 100%|██████████| 14568/14568 [02:31<00:00, 95.85it/s] 

Done





## ICBHI image 분산, 표준편차

In [11]:
from torchvision import datasets, transforms
import torch

dataset = datasets.ImageFolder(root='../data_4gr/mel_image', transform=transforms.ToTensor())
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=False, num_workers=4)

mean = 0.0
std = 0.0
nb_samples = 0

for data in loader:
    batch = data[0]
    batch_samples = batch.size(0)
    batch = batch.view(batch_samples, batch.size(1), -1)
    mean += batch.mean(2).sum(0)
    std += batch.std(2).sum(0)
    nb_samples += batch_samples

mean /= nb_samples
std /= nb_samples

print(f'Mel-spectrogram Mean: {mean}, Std: {std}')

Mean: tensor([0.7520, 0.4380, 0.1649]), Std: tensor([0.2788, 0.3732, 0.2812])


In [12]:
from torchvision import datasets, transforms
import torch

dataset = datasets.ImageFolder(root='../data_4gr/chroma_image', transform=transforms.ToTensor())
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=False, num_workers=4)

mean = 0.0
std = 0.0
nb_samples = 0

for data in loader:
    batch = data[0]
    batch_samples = batch.size(0)
    batch = batch.view(batch_samples, batch.size(1), -1)
    mean += batch.mean(2).sum(0)
    std += batch.std(2).sum(0)
    nb_samples += batch_samples

mean /= nb_samples
std /= nb_samples

print(f'Chroma Mean: {mean}, Std: {std}')

Chroma Mean: tensor([0.2995, 0.4973, 0.6374]), Std: tensor([0.3043, 0.3505, 0.2868])


In [13]:
from torchvision import datasets, transforms
import torch

dataset = datasets.ImageFolder(root='../data_4gr/mfcc_image', transform=transforms.ToTensor())
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=False, num_workers=4)

mean = 0.0
std = 0.0
nb_samples = 0

for data in loader:
    batch = data[0]
    batch_samples = batch.size(0)
    batch = batch.view(batch_samples, batch.size(1), -1)
    mean += batch.mean(2).sum(0)
    std += batch.std(2).sum(0)
    nb_samples += batch_samples

mean /= nb_samples
std /= nb_samples

print(f'MFCC Mean: {mean}, Std: {std}')

MFCC Mean: tensor([0.0714, 0.4974, 0.8972]), Std: tensor([0.1645, 0.2059, 0.2028])


In [14]:
from torchvision import datasets, transforms
import torch

dataset = datasets.ImageFolder(root='../data_4gr/mel_image_old', transform=transforms.ToTensor())
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=False, num_workers=4)

mean = 0.0
std = 0.0
nb_samples = 0

for data in loader:
    batch = data[0]
    batch_samples = batch.size(0)
    batch = batch.view(batch_samples, batch.size(1), -1)
    mean += batch.mean(2).sum(0)
    std += batch.std(2).sum(0)
    nb_samples += batch_samples

mean /= nb_samples
std /= nb_samples

print(f'Mel-spectrogram old Mean: {mean}, Std: {std}')

Mel-spectrogram old Mean: tensor([0.3416, 0.1199, 0.3481]), Std: tensor([0.2769, 0.1272, 0.1512])


## RDLINet
- remove baseline wandering
- pitch shifting
- add noise
- normalize signal
- sample rate 4000
- nfft 1024

In [3]:
import os
import librosa
import numpy as np
import random
import tensorflow as tf
from tqdm import tqdm
import pandas as pd
import cv2
import cmapy

# Seed 설정
seed_value = 1
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)
tf.compat.v1.set_random_seed(seed_value)
tf.keras.utils.set_random_seed(seed_value)

# 설정값
sample_rate = 16000 #4000
desired_length = 5
n_mels = 64
nfft = 2048 #1024
hop = 512
f_max = 4000

# 파일 경로
folds_file = '../ICBHI_Dataset/patient_list_foldwise.txt'
data_dir = '../ICBHI_Dataset/audio_and_txt_files/'
mel_dir = '../data_4gr/mel_image_rdlinet'
chroma_dir = '../data_4gr/chroma_image_rdlinet'
mfcc_dir = '../data_4gr/mfcc_image_rdlinet'

def Extract_Annotation_Data(file_name, data_dir):
    tokens = file_name.split('_')
    recording_info = pd.DataFrame(data=[tokens], columns=['Patient Number', 'Recording index', 'Chest location', 'Acquisition mode', 'Recording equipment'])
    recording_annotations = pd.read_csv(os.path.join(data_dir, file_name + '.txt'), names=['Start', 'End', 'Crackles', 'Wheezes'], delimiter='\t')
    return recording_info, recording_annotations

def get_annotations(data_dir):
    filenames = [s.split('.')[0] for s in os.listdir(data_dir) if '.txt' in s]
    i_list = []
    rec_annotations_dict = {}
    for s in filenames:
        i, a = Extract_Annotation_Data(s, data_dir)
        i_list.append(i)
        rec_annotations_dict[s] = a
    return filenames, rec_annotations_dict

def slice_data(start, end, raw_data, sample_rate):
    max_ind = len(raw_data)
    start_ind = min(int(start * sample_rate), max_ind)
    end_ind = min(int(end * sample_rate), max_ind)
    return raw_data[start_ind:end_ind]

def get_label(crackle, wheeze):
    if crackle == 0 and wheeze == 0:
        return 0
    elif crackle == 1 and wheeze == 0:
        return 1
    elif crackle == 0 and wheeze == 1:
        return 2
    else:
        return 3

def get_sound_samples(recording_annotations, file_name, data_dir, sample_rate):
    sample_data = [file_name]
    data, rate = librosa.load(os.path.join(data_dir, file_name + '.wav'), sr=sample_rate)
    for i in range(len(recording_annotations.index)):
        row = recording_annotations.loc[i]
        start = row['Start']
        end = row['End']
        crackles = row['Crackles']
        wheezes = row['Wheezes']
        audio_chunk = slice_data(start, end, data, rate)
        sample_data.append((audio_chunk, start, end, get_label(crackles, wheezes)))
    return sample_data

def remove_baseline_wandering(signal, sample_rate):
    n = len(signal)
    freq = np.fft.fftfreq(n, d=1/sample_rate)
    fft_signal = np.fft.fft(signal)
    fft_signal[np.abs(freq) < 1] = 0  # Remove frequencies below 1 Hz
    filtered_signal = np.fft.ifft(fft_signal)
    return np.real(filtered_signal)

def normalize_signal(signal):
    return signal / np.max(np.abs(signal))

def pitch_shift(signal, sr, n_steps):
    return librosa.effects.pitch_shift(signal, sr=sr, n_steps=n_steps)

def add_noise(signal, snr):
    noise = np.random.normal(0, 1, signal.shape)
    signal_power = np.mean(signal**2)
    noise_power = signal_power / (10**(snr / 10))
    noise = noise * np.sqrt(noise_power)
    return signal + noise


def create_mel_raw(current_window, sample_rate, n_mels=128, f_min=50, f_max=4000, nfft=2048, hop=512, resz=1):
    S = librosa.feature.melspectrogram(y=current_window, sr=sample_rate, n_mels=n_mels, fmin=f_min, fmax=f_max, n_fft=nfft, hop_length=hop)
    S = librosa.power_to_db(S, ref=np.max)
    S = (S - S.min()) / (S.max() - S.min())
    S *= 255
    S = cv2.applyColorMap(S.astype(np.uint8), cmapy.cmap('plasma'))
    S = cv2.resize(S, (224, 224))  # Resize to a fixed size
    S = cv2.flip(S, 0)
    return S

def create_chroma(current_window, sample_rate):
    chroma = librosa.feature.chroma_stft(y=current_window, sr=sample_rate)
    chroma = (chroma - chroma.min()) / (chroma.max() - chroma.min())
    chroma *= 255
    chroma = cv2.applyColorMap(chroma.astype(np.uint8), cmapy.cmap('inferno'))
    chroma = cv2.resize(chroma, (224, 224))  # Resize to a fixed size
    chroma = cv2.flip(chroma, 0)
    return chroma

def create_mfcc(current_window, sample_rate):
    mfcc = librosa.feature.mfcc(y=current_window, sr=sample_rate, n_mfcc=13)
    mfcc = (mfcc - mfcc.min()) / (mfcc.max() - mfcc.min())
    mfcc *= 255
    mfcc = cv2.applyColorMap(mfcc.astype(np.uint8), cmapy.cmap('viridis'))
    mfcc = cv2.resize(mfcc, (224, 224))  # Resize to a fixed size
    mfcc = cv2.flip(mfcc, 0)
    return mfcc

def save_image(array, save_path):
    array = (array - np.min(array)) / (np.max(array) - np.min(array)) * 255
    img = Image.fromarray(array.astype(np.uint8))
    img.save(save_path)






In [4]:
import pickle

filenames, rec_annotations_dict = get_annotations(data_dir)

cycle_list = []
classwise_cycle_list = [[], [], [], []]
for idx, file_name in tqdm(enumerate(filenames), desc="Extracting Individual Cycles"):
    data = get_sound_samples(rec_annotations_dict[file_name], file_name, data_dir, sample_rate)
    cycles_with_labels = [(d[0], d[3], file_name, cycle_idx, 0) for cycle_idx, d in enumerate(data[1:])]
    cycle_list.extend(cycles_with_labels)
    for cycle_idx, d in enumerate(cycles_with_labels):
        classwise_cycle_list[d[1]].append(d)

# 데이터 증강
scale = 1
aug_nos = scale * len(classwise_cycle_list[0]) - len(classwise_cycle_list[0])
for idx in range(aug_nos):
    i = random.randint(0, len(classwise_cycle_list[0]) - 1)
    j = random.randint(0, len(classwise_cycle_list[0]) - 1)
    normal_i = classwise_cycle_list[0][i]
    normal_j = classwise_cycle_list[0][j]
    new_sample = np.concatenate([normal_i[0], normal_j[0]])
    cycle_list.append((new_sample, 0, normal_i[2] + '-' + normal_j[2], idx, 1))

# 증강 함수 추가 (crackle, wheeze, both)
def augment_class(class_index, classwise_cycle_list, scale):
    aug_nos = scale * len(classwise_cycle_list[0]) - len(classwise_cycle_list[class_index])
    for idx in range(aug_nos):
        aug_prob = random.random()
        if aug_prob < 0.6:
            i = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            j = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            sample_i = classwise_cycle_list[class_index][i]
            sample_j = classwise_cycle_list[class_index][j]
        elif aug_prob >= 0.6 and aug_prob < 0.8:
            i = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            j = random.randint(0, len(classwise_cycle_list[0]) - 1)
            sample_i = classwise_cycle_list[class_index][i]
            sample_j = classwise_cycle_list[0][j]
        else:
            i = random.randint(0, len(classwise_cycle_list[0]) - 1)
            j = random.randint(0, len(classwise_cycle_list[class_index]) - 1)
            sample_i = classwise_cycle_list[0][i]
            sample_j = classwise_cycle_list[class_index][j]
        new_sample = np.concatenate([sample_i[0], sample_j[0]])
        cycle_list.append((new_sample, class_index, sample_i[2] + '-' + sample_j[2], idx, 1))

augment_class(1, classwise_cycle_list, scale)
augment_class(2, classwise_cycle_list, scale)
augment_class(3, classwise_cycle_list, scale)

# 오디오 데이터 정렬
desiredLength = 8
output = []
for idx, sample in tqdm(enumerate(cycle_list), desc="Audio augmenting"):
    output_buffer_length = int(desiredLength * sample_rate)
    soundclip = sample[0].copy()
    n_samples = len(soundclip)
    if n_samples < output_buffer_length:
        t = output_buffer_length // n_samples
        d = output_buffer_length % n_samples
        d = soundclip[:d]
        repeat_sample = np.concatenate((np.tile(soundclip, t), d))
        copy_repeat_sample = repeat_sample.copy()
        # output.append((copy_repeat_sample, sample[1]))
    else:
        copy_repeat_sample = soundclip[:output_buffer_length]
        # output.append((copy_repeat_sample, sample[1]))
    # print('before sample type: ', type(copy_repeat_sample))
    # Baseline Wandering removal
    # 전처리: 베이스라인 방황 제거 및 정규화
    copy_repeat_sample = remove_baseline_wandering(copy_repeat_sample, sample_rate)
    copy_repeat_sample = normalize_signal(copy_repeat_sample)
    
    # 원본 신호 추가
    output.append((copy_repeat_sample, sample[1]))

    # 증강: 피치 변환 및 노이즈 추가
    augmented_sample = pitch_shift(copy_repeat_sample, sample_rate, -2)
    output.append((augmented_sample, sample[1]))
    # for n_steps in [-2, 1]:
    #     augmented_sample = pitch_shift(copy_repeat_sample, sample_rate, n_steps)
    #     output.append((augmented_sample, sample[1]))

    augmented_sample = add_noise(copy_repeat_sample, 10)
    output.append((augmented_sample, sample[1]))

audio_data = output


# 데이터 저장 디렉토리 설정
save_dir = 'audio_data_parts'
os.makedirs(save_dir, exist_ok=True)

# 데이터 나눠서 저장
part_size = 1000  # 한 파일에 저장할 샘플 수
for i in range(0, len(audio_data), part_size):
    part = audio_data[i:i + part_size]
    part_file = os.path.join(save_dir, f'audio_data_part_{i//part_size}.pkl')
    with open(part_file, 'wb') as f:
        pickle.dump(part, f)
    print(f'{part_file} 저장 완료')

Extracting Individual Cycles: 920it [00:10, 84.27it/s] 
Audio augmenting: 14568it [41:41,  5.82it/s]


audio_data_parts\audio_data_part_0.pkl 저장 완료
audio_data_parts\audio_data_part_1.pkl 저장 완료
audio_data_parts\audio_data_part_2.pkl 저장 완료
audio_data_parts\audio_data_part_3.pkl 저장 완료
audio_data_parts\audio_data_part_4.pkl 저장 완료
audio_data_parts\audio_data_part_5.pkl 저장 완료
audio_data_parts\audio_data_part_6.pkl 저장 완료
audio_data_parts\audio_data_part_7.pkl 저장 완료
audio_data_parts\audio_data_part_8.pkl 저장 완료
audio_data_parts\audio_data_part_9.pkl 저장 완료
audio_data_parts\audio_data_part_10.pkl 저장 완료
audio_data_parts\audio_data_part_11.pkl 저장 완료
audio_data_parts\audio_data_part_12.pkl 저장 완료
audio_data_parts\audio_data_part_13.pkl 저장 완료
audio_data_parts\audio_data_part_14.pkl 저장 완료
audio_data_parts\audio_data_part_15.pkl 저장 완료
audio_data_parts\audio_data_part_16.pkl 저장 완료
audio_data_parts\audio_data_part_17.pkl 저장 완료
audio_data_parts\audio_data_part_18.pkl 저장 완료
audio_data_parts\audio_data_part_19.pkl 저장 완료
audio_data_parts\audio_data_part_20.pkl 저장 완료
audio_data_parts\audio_data_part_21.pkl 저장 완

In [5]:
# 저장된 데이터 파일 리스트
save_dir = 'audio_data_parts'
part_files = sorted(os.listdir(save_dir))

os.makedirs(mel_dir, exist_ok=True)
print(mel_dir)

# 각 파일을 하나씩 불러와서 이미지 생성 및 저장
for part_file in part_files:
    with open(os.path.join(save_dir, part_file), 'rb') as f:
        audio_data_part = pickle.load(f)
    
    print(f'{part_file} 불러오기 완료')

     # 이미지 생성 및 저장
    for index in tqdm(range(len(audio_data_part)), desc=f"Saving Images {part_file}"):
        audio, label = audio_data_part[index]
        mel_image = create_mel_raw(audio, sample_rate, f_max=f_max, n_mels=n_mels, nfft=nfft, hop=hop, resz=1)

        # 이미지 저장
        if label == 0:
            mel_save_path = os.path.join(mel_dir, 'normal', f'image_{part_file}_{index}.jpg')
        elif label == 1:
            mel_save_path = os.path.join(mel_dir, 'crackle', f'image_{part_file}_{index}.jpg')
        elif label == 2:
            mel_save_path = os.path.join(mel_dir, 'wheeze', f'image_{part_file}_{index}.jpg')
        else:
            mel_save_path = os.path.join(mel_dir, 'both', f'image_{part_file}_{index}.jpg')
        
        cv2.imwrite(mel_save_path, cv2.cvtColor(mel_image, cv2.COLOR_RGB2BGR))

audio_data_part_0.pkl 불러오기 완료


Saving Images audio_data_part_0.pkl: 100%|██████████| 1000/1000 [04:22<00:00,  3.80it/s]


audio_data_part_1.pkl 불러오기 완료


Saving Images audio_data_part_1.pkl: 100%|██████████| 1000/1000 [03:55<00:00,  4.25it/s]


audio_data_part_10.pkl 불러오기 완료


Saving Images audio_data_part_10.pkl:  52%|█████▏    | 523/1000 [02:24<02:11,  3.62it/s]


KeyboardInterrupt: 

In [3]:
# mel-spectrogram, chroma, MFCC 이미지 생성 및 저장
mel_img = []
chroma_img = []
mfcc_img = []
for index in tqdm(range(len(audio_data)), desc="Saving Images"):
    audio = audio_data[index][0]
    label = audio_data[index][1]
    mel_image = create_mel_raw(audio, sample_rate, f_max=f_max, n_mels=n_mels, nfft=nfft, hop=hop, resz=1)
    # chroma_image = create_chroma(audio, sample_rate)
    # mfcc_image = create_mfcc(audio, sample_rate)
    mel_img.append((mel_image, label))
    # chroma_img.append((chroma_image, label))
    # mfcc_img.append((mfcc_image, label))

# destination_dir = '../data_4gr'
# mel_dir = os.path.join(destination_dir, 'mel_image')
# chroma_dir = os.path.join(destination_dir, 'chroma_image')
# mfcc_dir = os.path.join(destination_dir, 'mfcc_image')

os.makedirs(mel_dir, exist_ok=True)
os.makedirs(chroma_dir, exist_ok=True)
os.makedirs(mfcc_dir, exist_ok=True)

# Create the four folders for the labels in each image type directory
for label in ['normal', 'crackle', 'wheeze', 'both']:
    os.makedirs(os.path.join(mel_dir, label), exist_ok=True)
    # os.makedirs(os.path.join(chroma_dir, label), exist_ok=True)
    # os.makedirs(os.path.join(mfcc_dir, label), exist_ok=True)

for i in tqdm(range(len(mel_img)), desc="Saving Images in dir"):
    input_mel = mel_img[i][0]
    # input_chroma = chroma_img[i][0]
    # input_mfcc = mfcc_img[i][0]
    label = mel_img[i][1]
    
    if label == 0:
        mel_save_path = os.path.join(mel_dir, 'normal', 'image_'+str(i)+'.jpg')
        # chroma_save_path = os.path.join(chroma_dir, 'normal', 'image_'+str(i)+'.jpg')
        # mfcc_save_path = os.path.join(mfcc_dir, 'normal', 'image_'+str(i)+'.jpg')
    elif label == 1:
        mel_save_path = os.path.join(mel_dir, 'crackle', 'image_'+str(i)+'.jpg')
        # chroma_save_path = os.path.join(chroma_dir, 'crackle', 'image_'+str(i)+'.jpg')
        # mfcc_save_path = os.path.join(mfcc_dir, 'crackle', 'image_'+str(i)+'.jpg')
    elif label == 2:
        mel_save_path = os.path.join(mel_dir, 'wheeze', 'image_'+str(i)+'.jpg')
        # chroma_save_path = os.path.join(chroma_dir, 'wheeze', 'image_'+str(i)+'.jpg')
        # mfcc_save_path = os.path.join(mfcc_dir, 'wheeze', 'image_'+str(i)+'.jpg')
    else:
        mel_save_path = os.path.join(mel_dir, 'both', 'image_'+str(i)+'.jpg')
        # chroma_save_path = os.path.join(chroma_dir, 'both', 'image_'+str(i)+'.jpg')
        # mfcc_save_path = os.path.join(mfcc_dir, 'both', 'image_'+str(i)+'.jpg')
    
    cv2.imwrite(mel_save_path, cv2.cvtColor(input_mel, cv2.COLOR_RGB2BGR))
    # cv2.imwrite(chroma_save_path, cv2.cvtColor(input_chroma, cv2.COLOR_RGB2BGR))
    # cv2.imwrite(mfcc_save_path, cv2.cvtColor(input_mfcc, cv2.COLOR_RGB2BGR))

print('Done')

Saving Images:  44%|████▍     | 19169/43704 [54:25<1:09:39,  5.87it/s]


MemoryError: Unable to allocate 1.96 MiB for an array with shape (1025, 251) and data type float64