<a href="https://colab.research.google.com/github/Marcus-Son/Classification_and_Calibration_of_Dysarthric_Speech/blob/yaejoon/%EB%94%A5%EB%9F%AC%EB%8B%9D_%EC%A0%84%EC%B2%98%EB%A6%AC_yaejoon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pydub

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import librosa
import matplotlib.pyplot as plt
from pydub import AudioSegment
import torch
import pkg_resources

# Report the versions of the main libraries so that a run can be reproduced.
_library_versions = (
    ("TensorFlow", tf.__version__),
    ("Numpy", np.__version__),
    ("Pandas", pd.__version__),
    ("Librosa", librosa.__version__),
    ("Matplotlib", plt.matplotlib.__version__),
    ("PyTorch", torch.__version__),
)
for _library, _version in _library_versions:
    print(f"{_library} version:", _version)

# The pydub version is read from the installed package metadata.
pydub_version = pkg_resources.get_distribution("pydub").version
print("PyDub version:", pydub_version)

In [None]:
import pandas as pd
import numpy as np
from pydub.utils import db_to_float
import itertools
from pydub import AudioSegment
import os

import IPython.display as ipd
from pydub import AudioSegment
import torch
import librosa # 음성데이터 분석 라이브러리
from IPython.display import Audio # 음성데이터 재생을 위해 사용하는 라이브러리

# 1. 데이터 불러오기

In [None]:
# Dataset layout on the mounted Google Drive.
# An earlier layout kept the same tree under
# "/content/gdrive/My Drive/deeplearning_2024/팀프로젝트/data".
_DATA_ROOT = "/content/gdrive/My Drive/data"
_LANGUAGE_GROUP = "언어+뇌신경장애"  # language + neurological disorder
_LISTEN_GROUP = "청각+뇌신경장애"  # hearing + neurological disorder

# Top-level folders: source audio, labeling data, results.
wav_folder = f"{_DATA_ROOT}/원천데이터"
label_folder = f"{_DATA_ROOT}/라벨링데이터"
result_folder = f"{_DATA_ROOT}/결과"

# Per-group sub-folders of the three folders above.
wav_folder_language = f"{wav_folder}/{_LANGUAGE_GROUP}"
label_folder_language = f"{label_folder}/{_LANGUAGE_GROUP}"
result_folder_language = f"{result_folder}/{_LANGUAGE_GROUP}"

wav_folder_listen = f"{wav_folder}/{_LISTEN_GROUP}"
label_folder_listen = f"{label_folder}/{_LISTEN_GROUP}"
result_folder_listen = f"{result_folder}/{_LISTEN_GROUP}"

In [None]:
# Mount Google Drive at /content/gdrive; the dataset folder paths used in this
# notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

# 2. 데이터 전처리

## 2-1. 함수 생성

Code reference: "[음성 인식 모델 프로젝트] 음성 데이터 침묵구간 - 비침묵구간 분리하기," RecCode, last modified Jan 28, 2024, accessed May 19, 2024, https://ysg2997.tistory.com/52.

In [None]:
# Find the silent sections of an audio segment.
def detect_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1, silence_has_gap=True):
    """Return the silent ranges of ``audio_segment`` as ``[start, end]`` pairs.

    A window of ``min_silence_len`` is slid over the segment in steps of
    ``seek_step``; a window is silent when its RMS is at or below the
    threshold. Silent windows are then merged into ranges.

    Positions use the same unit as slicing ``audio_segment`` (milliseconds
    for a pydub ``AudioSegment``).

    Args:
        audio_segment: Object supporting ``len()``, slicing, ``.rms`` on the
            slices and ``.max_possible_amplitude``.
        min_silence_len: Width of the analysis window; shorter segments
            yield no silence at all.
        silence_thresh: Threshold in dB relative to the maximum possible
            amplitude.
        seek_step: Distance between successive window starts.
        silence_has_gap: When true (default), two silent windows start
            separate ranges only if there is real non-silent audio between
            them. When false, everything from the first silent window to
            the end of the last one is returned as a single range.

    Returns:
        A list of ``[start, end]`` lists, in increasing order; empty when no
        window is silent.
    """
    seg_len = len(audio_segment)

    # A segment shorter than the window cannot contain a silent window.
    if seg_len < min_silence_len:
        return []

    # Convert the dB threshold to an absolute RMS value for comparison.
    rms_limit = db_to_float(silence_thresh) * audio_segment.max_possible_amplitude

    last_slice_start = seg_len - min_silence_len
    slice_starts = range(0, last_slice_start + 1, seek_step)

    # Make sure the tail of the audio is examined even when seek_step does
    # not land exactly on the last possible window start.
    if last_slice_start % seek_step:
        slice_starts = itertools.chain(slice_starts, [last_slice_start])

    silence_starts = [
        i for i in slice_starts
        if audio_segment[i:i + min_silence_len].rms <= rms_limit
    ]

    if not silence_starts:
        return []

    silent_ranges = []
    current_range_start = prev_i = silence_starts[0]

    for silence_start_i in silence_starts[1:]:
        continuous = (silence_start_i == prev_i + seek_step)

        # Two silent windows that overlap or touch belong to the same silent
        # stretch; only a start beyond the end of the previous window leaves
        # audible audio in between. Without this check a short blip splits
        # one silence into overlapping ranges, which later produce inverted
        # (start > end) non-silent ranges.
        gap_between_windows = silence_start_i > (prev_i + min_silence_len)

        if silence_has_gap and not continuous and gap_between_windows:
            silent_ranges.append([current_range_start,
                                  prev_i + min_silence_len])
            current_range_start = silence_start_i
        prev_i = silence_start_i

    silent_ranges.append([current_range_start,
                          prev_i + min_silence_len])

    return silent_ranges

In [None]:
# Find the spoken (non-silent) sections via detect_silence.
def detect_nonsilent(audio_segment, min_silence_len=1000, silence_thresh=-16, seek_step=1):
    """Return the ``[start, end]`` ranges of ``audio_segment`` that are not silent.

    The result is the complement of the ranges reported by
    ``detect_silence`` for the same arguments: the whole segment when no
    silence is found, and an empty list when the segment is one single
    silence from start to end.
    """
    quiet = detect_silence(audio_segment, min_silence_len, silence_thresh, seek_step)
    total_len = len(audio_segment)

    # No silence anywhere: everything is speech.
    if not quiet:
        return [[0, total_len]]

    # One silence covering the whole segment: nothing is speech.
    first_start, first_end = quiet[0]
    if first_start == 0 and first_end == total_len:
        return []

    # Each spoken range runs from the end of the previous silence (0 for the
    # first one) to the start of the next silence.
    spoken_starts = [0] + [end for _, end in quiet]
    spoken = [[begin, start] for begin, (start, _) in zip(spoken_starts, quiet)]

    # Audio after the last silence, if any.
    if quiet[-1][1] != total_len:
        spoken.append([spoken_starts[-1], total_len])

    # Drop the empty leading range produced when the audio starts with silence.
    if spoken[0] == [0, 0]:
        spoken.pop(0)

    return spoken

In [None]:
# Use the two detection functions to split the audio into final speech and
# silence intervals and return them.
def create_json(audio_file):
    """Tag ``audio_file`` as a list of silence ('침묵') and speech ('비침묵') intervals.

    Non-silent ranges are detected with a 70-unit window and a -32.64 dB
    threshold. Consecutive ranges are merged into one speech interval unless
    the pause between them is at least 20000 units long, in which case the
    pause is emitted as a silence interval of its own.

    Args:
        audio_file: Audio object accepted by ``detect_nonsilent`` (supports
            ``len()`` and slicing; presumably a pydub ``AudioSegment`` whose
            positions are in milliseconds).

    Returns:
        A list of dicts ``{'start', 'end', 'tag'}`` where start/end are the
        detected positions divided by 1000 (seconds, if positions are
        milliseconds). Audio that is silent throughout yields a single
        '침묵' interval spanning the whole file.
    """
    min_silence_length = 70   # analysis window passed to detect_nonsilent
    long_pause = 20000        # pauses at least this long separate two speech intervals
    margin = 200              # extra audio kept on both sides of a long pause

    total_len = len(audio_file)
    intervals = detect_nonsilent(audio_file,
                                 min_silence_len=min_silence_length,
                                 silence_thresh=-32.64)

    # detect_nonsilent returns [] when the whole file is silent; indexing
    # intervals[0] below would raise IndexError in that case.
    if not intervals:
        return [{'start': 0, 'end': total_len / 1000, 'tag': '침묵'}]

    intervals_jsons = []

    # Leading silence before the first spoken range.
    if intervals[0][0] != 0:
        intervals_jsons.append({'start': 0, 'end': intervals[0][0] / 1000, 'tag': '침묵'})

    non_silence_start = intervals[0][0]
    before_silence_start = intervals[0][1]

    for range_start, range_end in intervals:
        if (range_start - before_silence_start) >= long_pause:
            # Close the current speech interval (with a small trailing
            # margin), start the next one slightly before the new range,
            # and record the pause itself as silence.
            intervals_jsons.append({'start': non_silence_start / 1000,
                                    'end': (before_silence_start + margin) / 1000,
                                    'tag': '비침묵'})
            non_silence_start = range_start - margin
            intervals_jsons.append({'start': before_silence_start / 1000,
                                    'end': range_start / 1000,
                                    'tag': '침묵'})
        before_silence_start = range_end

    # The last speech interval always extends to the end of the file.
    if non_silence_start != total_len:
        intervals_jsons.append({'start': non_silence_start / 1000,
                                'end': total_len / 1000,
                                'tag': '비침묵'})

    return intervals_jsons


#########################################################
def match_target_amplitude(sound, target_dBFS):
    """Return ``sound`` with gain applied so its loudness becomes ``target_dBFS``.

    Args:
        sound: Object exposing ``dBFS`` (current loudness) and
            ``apply_gain(delta_db)`` (e.g. a pydub ``AudioSegment``).
        target_dBFS: Desired loudness in dBFS.

    Returns:
        The result of ``sound.apply_gain``. A sound whose ``dBFS`` is
        negative infinity is returned unchanged.
    """
    current_dBFS = sound.dBFS

    # A -inf level (pydub reports this for all-zero audio) would require an
    # infinite gain; there is nothing to amplify, so return the input as is.
    if current_dBFS == float("-inf"):
        return sound

    return sound.apply_gain(target_dBFS - current_dBFS)


#########################################################
def split_on_silence(audio_segment, min_silence_len=1000, silence_thresh=-16, keep_silence=100,
                     seek_step=1):
    """Cut ``audio_segment`` into its non-silent chunks.

    Args:
        audio_segment: Audio object supporting ``len()`` and slicing.
        min_silence_len, silence_thresh, seek_step: Passed unchanged to
            ``detect_nonsilent``.
        keep_silence: Amount of surrounding audio kept on each side of a
            chunk. ``True`` keeps everything (the full segment length is
            used as padding), ``False`` keeps nothing.

    Returns:
        A list of slices of ``audio_segment``, one per non-silent range.
    """
    total_len = len(audio_segment)

    # Booleans are shorthand for "all of it" / "none of it".
    if isinstance(keep_silence, bool):
        keep_silence = total_len if keep_silence else 0

    spoken = detect_nonsilent(audio_segment, min_silence_len, silence_thresh, seek_step)
    padded = [[begin - keep_silence, finish + keep_silence] for begin, finish in spoken]

    # Where the padding of two neighbouring ranges overlaps, meet in the
    # middle so that no audio ends up in both chunks.
    for left, right in zip(padded, padded[1:]):
        if right[0] < left[1]:
            midpoint = (left[1] + right[0]) // 2
            left[1] = midpoint
            right[0] = midpoint

    # Clamp to the segment bounds while slicing.
    chunks = []
    for begin, finish in padded:
        chunks.append(audio_segment[max(begin, 0):min(finish, total_len)])
    return chunks

## 2-2. 전처리 자동화 (wav, txt파일 생성)

Code reference: "[음성 인식 모델 프로젝트] 음성 데이터 침묵구간 - 비침묵구간 분리하기," RecCode, last modified Jan 28, 2024, accessed May 19, 2024, https://ysg2997.tistory.com/52.

In [None]:
# Bind every data folder to a variable, resolved against the working directory.
# NOTE: os.path.join discards PATH when the folder is an absolute path, which
# is the case for the Drive folders configured earlier in this notebook.
PATH = os.getcwd()

LABEL, AUDIO = (
    os.path.join(PATH, folder) for folder in (label_folder, wav_folder)
)
LABEL_language, AUDIO_language = (
    os.path.join(PATH, folder) for folder in (label_folder_language, wav_folder_language)
)
LABEL_listen, AUDIO_listen = (
    os.path.join(PATH, folder) for folder in (label_folder_listen, wav_folder_listen)
)

# The result folders have to be created by hand.
OUTPUT, OUTPUT_language, OUTPUT_listen = (
    os.path.join(PATH, folder)
    for folder in (result_folder, result_folder_language, result_folder_listen)
)

# Spectrogram images go into a "spectrogram" sub-folder of each result folder.
OUTPUT_language_spec, OUTPUT_listen_spec = (
    os.path.join(result_root, "spectrogram")
    for result_root in (OUTPUT_language, OUTPUT_listen)
)

In [None]:
# Notebook cell output: show the resolved spectrogram folder of the language group.
OUTPUT_language_spec

In [None]:
# Sanity check: number of entries in every label/audio folder.
# A None entry prints the blank line separating the groups.
_folder_report = (
    ("라벨 폴더 폴더 개수: ", LABEL),
    ("음성 폴더 폴더 개수: ", AUDIO),
    None,
    ("언어+뇌신경장애 라벨 폴더 파일 개수: ", LABEL_language),
    ("언어+뇌신경장애 음성 폴더 파일 개수: ", AUDIO_language),
    None,
    ("청각+뇌신경장애 라벨 폴더 파일 개수: ", LABEL_listen),
    ("청각+뇌신경장애 음성 폴더 파일 개수: ", AUDIO_listen),
)
for _entry in _folder_report:
    if _entry is None:
        print()
        continue
    _caption, _folder = _entry
    print(_caption, len(os.listdir(_folder)))

In [None]:
# import os
# import pandas as pd
# from pydub import AudioSegment
# from pydub.utils import make_chunks, mediainfo
# from scipy.io.wavfile import write

# # 결과 폴더 생성
# for folder in [OUTPUT, OUTPUT_language, OUTPUT_listen]:
#     os.makedirs(folder, exist_ok=True)

# # 라벨 및 오디오 폴더 설정 및 처리
# for folder_type, RESULT_FOLDER in [("language", OUTPUT_language), ("listen", OUTPUT_listen)]:
#     if folder_type == "language":
#         LABEL_FOLDER = LABEL_language
#         AUDIO_FOLDER = AUDIO_language
#     elif folder_type == "listen":
#         LABEL_FOLDER = LABEL_listen
#         AUDIO_FOLDER = AUDIO_listen

#     # 라벨 파일과 오디오 파일 리스트
#     label_files = sorted(os.listdir(LABEL_FOLDER))
#     audio_files = sorted(os.listdir(AUDIO_FOLDER))

#     # 공통된 파일 개수만 처리
#     num_files = min(len(label_files), len(audio_files))

#     for i in range(num_files):
#         # 스크립트 로드 (메타데이터, DataFrame)
#         print(f'파일: {i}')
#         label_file_path = os.path.join(LABEL_FOLDER, label_files[i])
#         meta = pd.read_json(label_file_path, orient='columns')
#         sampling_rate = meta['Meta_info']['SamplingRate']
#         meta = pd.DataFrame(meta['Transcript'][0].split("."))[:-1]

#         # 오디오 로드
#         audio_file_path = os.path.join(AUDIO_FOLDER, audio_files[i])

#         # 침묵 및 비침묵 분리
#         sound = AudioSegment.from_file(audio_file_path, "wav")
#         normalized_sound = match_target_amplitude(sound, -20.0)
#         json_data = create_json(normalized_sound)

#         # 음성/텍스트 세그먼트 로드 (DataFrame)
#         audio_df = pd.DataFrame(json_data)
#         df = pd.concat([audio_df[audio_df['tag'] == '비침묵'].reset_index(drop=True).drop('tag', axis=1), meta], axis=1)
#         df.columns = ['start', 'end', 'text']

#         # 세그먼트용 오디오 로드
#         audio = AudioSegment.from_file(audio_file_path)

#         # 문장별 데이터 저장 -> Nan 값은 제외
#         df_without_nan = df.dropna()

#         for j, row in df_without_nan.iterrows():
#             start_time_sec = row['start']  # 시작 시간(초 단위)
#             end_time_sec = row['end']  # 종료 시간(초 단위)

#             # 시작 시간과 종료 시간을 밀리초 단위로 변환
#             start_time_ms = start_time_sec * 1000
#             end_time_ms = (end_time_sec + 1.5) * 1000

#             # 오디오 세그먼트 추출
#             output = audio[int(start_time_ms):int(end_time_ms)]

#             # 세그먼트 길이 확인
#             clip_length = len(output) / 1000  # 밀리초에서 초로 변환
#             if clip_length > 12:
#                 continue  # 12초를 초과하는 세그먼트는 저장하지 않음

#             # 세그먼트 오디오 파일 저장
#             output.export(os.path.join(RESULT_FOLDER, f"output_{folder_type}{i}_{j}.wav"), format="wav")

#             # 텍스트 파일 저장
#             text = row['text']
#             with open(os.path.join(RESULT_FOLDER, f"output_{folder_type}{i}_{j}.txt"), 'w') as f:
#                 f.write(text)
#                 f.write("\n")
#                 f.write(str(sampling_rate))

# Check the results: list what each result folder currently contains.
for _caption, _folder in (
    ("언어 결과 파일:", OUTPUT_language),
    ("청각 결과 파일:", OUTPUT_listen),
):
    print(_caption, os.listdir(_folder))

## 2-3. 이미지로 전처리

In [None]:
import os
import librosa
import librosa.display
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2 as cv

# Build a log-mel spectrogram from a raw signal.
def compute_log_mel_spectrogram(signal, sample_rate, n_mels=128):
    """Return the log-scaled mel spectrogram of ``signal``.

    The STFT uses a 20 ms window (``0.02 * sample_rate`` samples) and a hop
    of half that window. Its power spectrum is mapped onto ``n_mels`` mel
    bands and converted to dB relative to the spectrogram's maximum.

    Args:
        signal: Audio samples as accepted by ``librosa.stft``.
        sample_rate: Sampling rate of ``signal`` in Hz.
        n_mels: Number of mel bands.

    Returns:
        The array produced by ``librosa.power_to_db``.
    """
    window = int(0.02 * sample_rate)  # 20 ms expressed in samples
    step = int(window / 2)            # successive windows overlap by half

    power = np.abs(librosa.stft(signal, n_fft=window, hop_length=step)) ** 2
    mel_power = librosa.feature.melspectrogram(S=power, sr=sample_rate, n_mels=n_mels)
    return librosa.power_to_db(mel_power, ref=np.max)


def pad_to_square(spectrogram, target_size=128, padding_value=-80):
    """Right-pad a 2-D array along axis 1 until it is ``target_size`` columns wide.

    Only the second axis is padded; the first axis is never touched. An
    input that already has at least ``target_size`` columns is returned
    as is (the same object, not a copy).

    Args:
        spectrogram: 2-D numpy array.
        target_size: Minimum number of columns of the result.
        padding_value: Constant written into the added columns.
    """
    missing_columns = target_size - spectrogram.shape[1]
    if missing_columns <= 0:
        return spectrogram

    return np.pad(
        spectrogram,
        ((0, 0), (0, missing_columns)),
        mode='constant',
        constant_values=padding_value,
    )

def split_and_save_spectrogram(log_mel_spec, base_filename, target_size=128, output_folder='./', sampling_rate=48000):
    """Cut a spectrogram into square tiles and save each tile as a PNG image.

    The spectrogram is split along axis 1 into consecutive tiles of
    ``target_size`` columns. A final partial tile is right-padded with
    ``pad_to_square``. Tiles are written to
    ``<output_folder>/<base_filename>_<index>.png``.

    Args:
        log_mel_spec: 2-D array whose first axis has length ``target_size``.
        base_filename: File name stem shared by all tiles.
        target_size: Tile height and width.
        output_folder: Folder the images are written to.
        sampling_rate: Sampling rate forwarded to ``save_spectrogram``.

    Raises:
        AssertionError: If the spectrogram height differs from ``target_size``.
    """
    # Ensure the input spectrogram height matches the target size.
    assert log_mel_spec.shape[0] == target_size, f"Spectrogram height should be {target_size}"

    total_columns = log_mel_spec.shape[1]
    # Ceiling division: a width that is an exact multiple of target_size must
    # not produce an extra tile consisting of padding only.
    num_segments = -(-total_columns // target_size)

    for j in range(num_segments):
        segment = log_mel_spec[:, j * target_size:(j + 1) * target_size]
        segment = pad_to_square(segment, target_size)  # no-op for full-width tiles
        segment_filename = os.path.join(output_folder, f"{base_filename}_{j}.png")
        # Forward the caller's sampling rate instead of silently using
        # save_spectrogram's default.
        save_spectrogram(segment, segment_filename, sampling_rate=sampling_rate)


# Save a spectrogram as an image file.
def save_spectrogram(spectrogram, filename, sampling_rate=48000):
    """Render ``spectrogram`` as a borderless grayscale image and save it.

    The figure is 1.28 x 1.28 inches saved at 100 dpi with axes and margins
    removed. NOTE(review): ``bbox_inches='tight'`` may change the final
    pixel size — confirm the saved images are 128 x 128.

    Args:
        spectrogram: 2-D array drawn with ``librosa.display.specshow``.
        filename: Destination path; its folder must already exist.
        sampling_rate: Sampling rate given to ``specshow`` for the mel axis.
    """
    fig = plt.figure(figsize=(1.28, 1.28))
    try:
        # Remove the margins so the image contains only the spectrogram.
        plt.subplots_adjust(left=0, right=1, top=1, bottom=0, wspace=0, hspace=0)
        librosa.display.specshow(spectrogram, x_axis='time', y_axis='mel', sr=sampling_rate, cmap='gray')
        plt.axis('off')
        plt.savefig(filename, bbox_inches='tight', pad_inches=0, dpi=100)
    finally:
        # Always release the figure, even when drawing or saving fails;
        # otherwise figures pile up across thousands of calls.
        plt.close(fig)

# Records collected for building a DataFrame later; process_wav_files appends
# one dict per processed WAV file.
data = []

def process_wav_files(folder_path, label, output_folder, target_size=128):
    """Convert every WAV file in ``folder_path`` into spectrogram tiles.

    Each ``<name>.wav`` needs a sibling ``<name>.txt`` whose second line is
    the sampling rate. The audio is loaded at that rate, trimmed of
    leading/trailing silence (``top_db=60``), turned into a log-mel
    spectrogram and saved as tiles in ``output_folder``. A record
    ``{'file_name', 'log_mel_spec', 'label'}`` is appended to the
    module-level ``data`` list.

    Files without a usable ``.txt`` are reported with a warning and skipped.

    Args:
        folder_path: Folder containing the ``.wav``/``.txt`` pairs.
        label: Class label stored with every record.
        output_folder: Folder for the tile images; created if missing.
        target_size: Tile size passed to ``split_and_save_spectrogram``.
    """
    # savefig fails if the destination folder does not exist yet.
    if output_folder:
        os.makedirs(output_folder, exist_ok=True)

    for file_name in os.listdir(folder_path):
        if not file_name.endswith(".wav"):
            continue

        base_filename = os.path.splitext(file_name)[0]
        file_path = os.path.join(folder_path, file_name)
        # Swap only the file extension; str.replace would also rewrite any
        # ".wav" occurring earlier in the path.
        txt_path = os.path.join(folder_path, base_filename + '.txt')

        if not os.path.exists(txt_path):
            print(f"Warning: {txt_path} does not exist.")
            continue

        with open(txt_path, 'r') as f:
            lines = f.readlines()

        # A truncated or malformed text file should not abort the whole run.
        try:
            sample_rate = int(lines[1].strip())
        except (IndexError, ValueError):
            print(f"Warning: {txt_path} has no valid sampling rate on its second line.")
            continue

        frame = int(0.02 * sample_rate)  # 20 ms frames for silence trimming
        signal, sr = librosa.load(file_path, sr=sample_rate)
        signal, _ = librosa.effects.trim(signal, top_db=60, frame_length=frame, hop_length=frame)
        log_mel_spec = compute_log_mel_spectrogram(signal, sr)
        split_and_save_spectrogram(log_mel_spec, base_filename, target_size, output_folder, sampling_rate=sr)
        data.append({'file_name': file_name, 'log_mel_spec': log_mel_spec, 'label': label})

In [None]:
# Notebook cell output: show the folder the language-group spectrograms are written to.
OUTPUT_language_spec

In [None]:
# # 폴더 내의 WAV 파일 처리
# process_wav_files(OUTPUT_language, 0, OUTPUT_language_spec)

In [None]:
# data[0]['log_mel_spec']

In [None]:
# process_wav_files(OUTPUT_listen, 1, OUTPUT_listen_spec)

# '''
# # 데이터프레임 생성
# df = pd.DataFrame(data)

# # 데이터프레임 확인
# print(df.head())
# '''

# 3. CNN 모델링 테스트 ( 간이 실험 )

In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing import image_dataset_from_directory

import os
import matplotlib.pyplot as plt

In [None]:
# Mount Google Drive at /content/gdrive so the image folder used for training
# is reachable.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing import image_dataset_from_directory

# Folder holding the image data; its sub-folder names become the class labels.
# data_dir = '/content/gdrive/My Drive/deeplearning_2024/팀프로젝트/data/결과'
data_dir = "/content/gdrive/My Drive/data/결과"

# Options shared by both subsets. The seed and validation_split are the same
# for the training and the validation call.
_loader_options = dict(
    labels='inferred',         # use the folder names as labels
    label_mode='categorical',  # labels are one-hot encoded
    color_mode='grayscale',    # load images as single-channel
    batch_size=32,
    image_size=(28, 28),       # resize every image
    shuffle=True,
    validation_split=0.2,      # fraction reserved for validation
    seed=123,
)

# Build the datasets.
train_dataset = image_dataset_from_directory(
    data_dir, subset='training', **_loader_options
)
validation_dataset = image_dataset_from_directory(
    data_dir, subset='validation', **_loader_options
)

class_names = train_dataset.class_names

In [None]:
import matplotlib.pyplot as plt
import numpy as np
# class_names = train_dataset.class_names
# Fetch the first batch of the training set.
image_batch, label_batch = next(iter(train_dataset))

# Class index -> caption shown above each preview image.
_captions = {0: "language", 1: "listen"}

# Show the first nine images of the batch with their labels in a 3x3 grid.
plt.figure(figsize=(10, 10))
for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    pixels = image_batch[i].numpy().astype("uint8")
    plt.imshow(pixels, cmap='gray')
    label = np.argmax(label_batch[i].numpy())  # one-hot label -> class index
    plt.title(_captions.get(int(label), "unknown"))
    plt.axis("off")
plt.show()

In [None]:
from sklearn.model_selection import train_test_split

# Merge the two directory-based subsets back into one dataset.
full_dataset = train_dataset.concatenate(validation_dataset)

# Collect every image and label into Python lists.
# NOTE: despite its name, image_paths holds the image arrays, not file paths.
image_paths = []
labels = []

for image, label in full_dataset.unbatch():
    image_paths.append(image.numpy())
    labels.append(label.numpy())

# Convert the lists to arrays.
image_paths = np.array(image_paths)
labels = np.array(labels)

# Split into 60% training, 20% validation and 20% test:
# first hold out 20% for testing, then 25% of the remaining 80% for validation.
X_temp, X_test, y_temp, y_test = train_test_split(image_paths, labels, test_size=0.2, random_state=123)
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.25, random_state=123)

# Convert the arrays back to tf.data.Dataset objects.
# Shuffle BEFORE batching: shuffling after batch() only reorders whole batches,
# so every epoch would train on the same fixed groups of samples.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, y_train))
    .shuffle(buffer_size=len(X_train))
    .batch(32)
)
validation_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(32)
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)

# Check the shapes of one training batch. The label batch gets its own name
# so the full `labels` array above is not overwritten.
for images, batch_labels in train_dataset.take(1):
    print(images.shape)
    print(batch_labels.shape)

In [None]:
# # 배열을 다시 텐서플로 데이터셋으로 변환
# import tensorflow as tf

# train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).batch(32).shuffle(buffer_size=len(X_train))
# validation_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(32).shuffle(buffer_size=len(X_val))
# test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)

# class_names = dataset.class_names

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, BatchNormalization, AveragePooling2D, Flatten, Dense
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras import optimizers

# Build the model: three Conv -> BatchNorm -> AveragePooling stages with
# 8, 16 and 32 filters, followed by a dense classifier head.
_layers = []
for _stage, _filters in enumerate((8, 16, 32)):
    # Only the first layer declares the input shape; the images are
    # grayscale, so the channel count is 1.
    _conv_options = {'input_shape': (28, 28, 1)} if _stage == 0 else {}
    _layers += [
        Conv2D(_filters, kernel_size=(3, 3), activation='relu', **_conv_options),
        BatchNormalization(),
        AveragePooling2D(pool_size=(2, 2)),
    ]

_layers += [
    Flatten(),
    Dense(512, activation='relu'),
    Dense(128, activation='relu'),
    Dense(128, activation='relu'),
    Dense(2, activation='softmax'),  # one output node per class
]

model = Sequential(_layers)

model.summary()

# Compile the model.
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.0001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Train the model.
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=50,  # number of training epochs
)

# Evaluate on the held-out test set.
test_loss, test_accuracy = model.evaluate(test_dataset)
print(f"Test accuracy: {test_accuracy * 100:.2f}%")

In [None]:
# Check the results: pull the per-epoch metric curves out of the training history.
acc, val_acc, loss, val_loss = (
    history.history[metric]
    for metric in ('accuracy', 'val_accuracy', 'loss', 'val_loss')
)

In [None]:
# Derive the x-axis from the recorded history rather than hard-coding the
# epoch count: the plot then stays valid when `epochs` is changed or when
# training ends early.
epochs_range = range(len(acc))

plt.figure(figsize=(16, 8))

# Left panel: accuracy per epoch.
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

# Right panel: loss per epoch.
plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()