In [None]:
# Mount Google Drive at /content/drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# 필요한 라이브러리 import
import random
import pandas as pd
import numpy as np
import os
from tqdm.auto import tqdm

# 데이터 준비
import librosa
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from collections import defaultdict

# 시각화
import seaborn as sns
import matplotlib.pyplot as plt
from collections import Counter

# 모델 생성
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Embedding, Bidirectional
from keras.models import load_model

In [None]:
# GPU 사용 준비
%tensorflow_version 2.x
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
  raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

In [None]:
# Base Google Drive folder; the CSV, wav and model paths in this notebook are built from it
main_path = "/content/drive/MyDrive/Colab Notebooks/BD/"

# 1. most common 데이터 선정

In [None]:
# The five emotion columns whose values are tallied for every row
columns = ['1번 감정', '2번 감정', '3번 감정', '4번 감정', '5번 감정']

df = pd.read_csv(f"{main_path}/5차년도_2차.csv", encoding='cp949')


def _majority_emotion(row):
  """Return the emotion that appears most often in `columns` for one row.

  Ties are resolved in favour of the emotion listed first in the tally,
  because `max` keeps the first key that reaches the highest count.
  """
  tally = dict.fromkeys(
      ('Angry', 'Disgust', 'Fear', 'Happiness', 'Neutral', 'Sadness', 'Surprise'), 0)
  for column in columns:
    tally[row[column].title()] += 1
  return max(tally, key=tally.get)


# Most common emotion label of every row, in row order
label = [_majority_emotion(row) for _, row in df.iterrows()]

wav_id = df['wav_id']
speech = df['발화문']
most_common = pd.DataFrame({'wav_id': wav_id, 'speech': speech, 'emotion': label})

# NOTE(review): this writes under data/, while the loading cell further down reads
# 5차년도_2차_most_common.csv from most_common/ - confirm the file is copied in between.
most_common.to_csv(f"{main_path}/data/5차년도_2차_most_common.csv", index = False)

# 2. 분포 살피기 - 데이터 선정

In [None]:
df_path_4 = f"{main_path}most_common/4차년도_most_common.csv" # path to the 4th-year data
df_path_5 = f"{main_path}most_common/5차년도_most_common.csv" # path to the 5th-year data
df_path_5_2 = f"{main_path}most_common/5차년도_2차_most_common.csv" # path to the 5th-year, 2nd-round data

In [None]:
# Load the three most-common-label CSV files, in the same order as their path variables
df_4, df_5, df_5_2 = (
    pd.read_csv(csv_path) for csv_path in (df_path_4, df_path_5, df_path_5_2)
)

In [None]:
# Visualize the emotion distribution of the three datasets side by side
order = ["Angry","Sadness","Disgust","Fear","Happiness","Surprise", "Neutral"]
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(20,5))
for axis, frame, title in zip(axes, (df_4, df_5, df_5_2), ("4th", "5th", "5th_2nd")):
    sns.countplot(x="emotion", data=frame, ax=axis, order=order).set_title(title)

# Print each bar's height slightly above the bar
for ax in axes:
    for bar in ax.patches:
        bar_height = bar.get_height()
        bar_center = bar.get_x() + bar.get_width() / 2.
        ax.annotate(f'{bar_height}', (bar_center, bar_height),
                    ha='center', va='center', fontsize=10, color='black', xytext=(0, 5),
                    textcoords='offset points')

plt.show()

# 3. Train Test 데이터셋 분류
 - 테스트 데이터셋이 모든 감정을 150개씩 갖도록 한다 (나머지는 학습 데이터셋으로 사용)

In [None]:
# Split the 5th-year 2nd-round frame into one DataFrame per emotion
(angry_df, sadness_df, disgust_df, fear_df,
 happiness_df, surprise_df, neutral_df) = (
    df_5_2[df_5_2["emotion"] == emotion_name]
    for emotion_name in ("Angry", "Sadness", "Disgust", "Fear", "Happiness", "Surprise", "Neutral")
)
df_list = [angry_df, sadness_df, disgust_df, fear_df, happiness_df, surprise_df, neutral_df]

In [None]:
def random_split(df, n=150, random_state=42):
  """Randomly split `df` into `n` sampled rows and the remaining rows.

  Args:
    df: DataFrame to split.
    n: Number of rows to sample (default 150, the value this notebook uses).
    random_state: Seed passed to `DataFrame.sample` so the split is repeatable.

  Returns:
    (random_rows, remaining_rows): the sampled rows, and `df` with the
    sampled index labels dropped.

  Raises:
    ValueError: raised by `DataFrame.sample` when `df` has fewer than `n` rows.
  """
  # Randomly pick n rows
  random_rows = df.sample(n=n, random_state=random_state)

  # Everything that was not picked; rows are removed by index label, so this
  # assumes the index labels of `df` are unique.
  remaining_rows = df.drop(random_rows.index)
  return random_rows, remaining_rows

In [None]:
# Build the train / test frames: for every emotion, the randomly sampled rows
# go to the test set and the remaining rows go to the train set.
# The pieces are collected first and concatenated once, instead of growing a
# DataFrame from an empty frame inside the loop.
train_parts = []
test_parts = []
for emotion_df in df_list:
  random_rows, remaining_rows = random_split(emotion_df)
  train_parts.append(remaining_rows)
  test_parts.append(random_rows)

train_df = pd.concat(train_parts)
test_df = pd.concat(test_parts)

In [None]:
# Visualize the emotion distribution of the train and test splits
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20,5))
for axis, frame, title in zip(axes, (train_df, test_df),
                              ("Distribution of train", "Distribution of test")):
    sns.countplot(x="emotion", data=frame, ax=axis, order=order).set_title(title)

# Print each bar's height slightly above the bar
for ax in axes:
    for bar in ax.patches:
        bar_height = bar.get_height()
        bar_center = bar.get_x() + bar.get_width() / 2.
        ax.annotate(f'{bar_height}', (bar_center, bar_height),
                    ha='center', va='center', fontsize=10, color='black', xytext=(0, 5),
                    textcoords='offset points')

plt.show()

# 4. 특성추출 - mfcc

## 데이터셋 특성추출 - mfcc

In [None]:
wav_path = f"{main_path}data_5차2/" # folder containing the wav files

In [None]:
def extract_features_pick(df):
    """Compute an MFCC sequence for every `wav_id` listed in `df`.

    Each file `{wav_path}{wav_id}.wav` is loaded at 48 kHz and turned into
    13 MFCCs with a hop of 1% and an FFT window of 2% of the sample rate.

    Args:
        df: DataFrame with a 'wav_id' column.

    Returns:
        (mfcc_data, mfcc_len_list): a list of transposed MFCC arrays
        (one row per frame, 13 columns) and the list of their frame counts,
        both in the row order of `df`.
        If any wav file is missing, a warning is printed and `([], [])` is
        returned. Processing stops at the first missing file so features can
        never end up misaligned with the rows of `df`, and the return value
        always unpacks into two names.
    """
    mfcc_data = []
    mfcc_len_list = []
    for name in tqdm(df['wav_id']):
        audio_path = f"{wav_path}{name}.wav"
        if not os.path.isfile(audio_path):
            print(f"Warning: File not found for {name}")
            return [], []

        y, sr = librosa.load(audio_path, sr=48000)
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13, hop_length=int(sr*0.01), n_fft=int(sr*0.02))
        frames = mfccs.T  # frames first: (n_frames, n_mfcc)
        mfcc_data.append(frames)
        mfcc_len_list.append(len(frames))
    return mfcc_data, mfcc_len_list

In [None]:
# Extract the MFCC sequences (and their frame counts) for both splits
train_mfcc_data, train_mfcc_len = extract_features_pick(train_df)
test_mfcc_data, test_mfcc_len = extract_features_pick(test_df)
# Wrap the per-file sequences in object-dtype ndarrays
train_mfcc_data_np, test_mfcc_data_np = (
    np.array(sequence_list, dtype=object)
    for sequence_list in (train_mfcc_data, test_mfcc_data)
)

In [None]:
# Check the longest MFCC sequence of each split
for length_list in (train_mfcc_len, test_mfcc_len):
    print(max(length_list))

## 레이블처리 - 원 핫 인코딩

In [None]:
# Emotion label column of each split
train_labels, test_labels = train_df["emotion"], test_df["emotion"]

In [None]:
# One-hot encoding.
# Both splits are encoded against one shared, sorted class list so that column i
# means the same emotion in train and test even if a class is absent from one split.
label_classes = sorted(pd.concat([train_labels, test_labels]).dropna().unique())
train_labels_encoding = pd.get_dummies(pd.Categorical(train_labels, categories=label_classes)).values
test_labels_encoding = pd.get_dummies(pd.Categorical(test_labels, categories=label_classes)).values

# Save the encodings
np.save("train_labels_encoding_new", train_labels_encoding)
np.save("test_labels_encoding_new", test_labels_encoding)

# 5. 모델생성 (LSTM)

In [None]:
def build_lstm(X_train, num_classes=7, lstm_units=128):
  """Build and compile a two-layer bidirectional LSTM classifier.

  Args:
    X_train: Array whose `shape[1]` and `shape[2]` define the model's input
      shape (presumably time steps and features per step - confirm with caller).
    num_classes: Size of the softmax output layer (default 7, the value this
      notebook uses).
    lstm_units: Units of each LSTM layer (default 128).

  Returns:
    The model, compiled with the Adam optimizer, categorical cross-entropy
    loss and the accuracy metric.
  """
  with tf.device('/device:GPU:0'):
    # Build the LSTM model: the first layer returns the full sequence so that
    # the second LSTM layer can consume it.
    model = Sequential()
    model.add(Bidirectional(LSTM(lstm_units, return_sequences=True), input_shape=(X_train.shape[1], X_train.shape[2])))
    model.add(Bidirectional(LSTM(lstm_units, return_sequences=False)))
    model.add(Dense(num_classes, activation='softmax'))

    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# 6. 데이터 증가
- pitch shift를 진행

In [None]:
# Hypothetical dataset example: class names, per-class sample counts, and a
# per-class index (used by the augmentation loop to locate each class's rows)
sentiments = ['Sadness', 'Angry', 'Fear', 'Neutral', 'Disgust', 'Happiness', 'Suprise']
data_per_sentiment = {'Angry': 2169, 'Sadness': 3290, 'Disgust': 1466, 'Fear': 1018, 'Happiness': 3900, 'Suprise': 693, 'Neutral': 5788}
index_per_sentiment = {'Angry': 2168, 'Sadness': 5458, 'Disgust': 6924, 'Fear': 7942, 'Happiness': 11842, 'Suprise': 12535, 'Neutral': 18323}

# Number of samples every class should reach
target_data_count = 5788

# How many augmented samples each class still needs (never negative)
augmentation_counts = defaultdict(
    int,
    {name: max(0, target_data_count - data_per_sentiment[name]) for name in sentiments},
)

# Print the number of augmentations required per class
print("Augmentation Counts:")
for sentiment, count in augmentation_counts.items():
    print(f"{sentiment}: {count}")

In [None]:
# Augmentation helper
def pitch_shift_mfcc(mfcc, sr, semitone_shift):
    """Pitch-shift a signal and return the MFCCs of the shifted signal.

    Args:
        mfcc: Despite its name, this value is passed to
            `librosa.effects.pitch_shift` as the input signal.
            NOTE(review): callers should pass the audio time series,
            not an MFCC matrix - confirm.
        sr: Sampling rate of the signal.
        semitone_shift: Number of semitones to shift by (may be fractional).

    Returns:
        13 MFCCs of the pitch-shifted signal, as returned by
        `librosa.feature.mfcc` (not transposed).
    """
    # sr / y are passed by keyword: recent librosa releases reject them positionally
    y_pitch_shifted = librosa.effects.pitch_shift(mfcc, sr=sr, n_steps=semitone_shift)
    mfcc_pitch_shifted = librosa.feature.mfcc(y=y_pitch_shifted, sr=sr, n_mfcc=13)
    return mfcc_pitch_shifted

In [None]:
augmented_mfcc_list = []
augmented_y_label_list = []

# wav ids in the same row order as train_mfcc_data_np / train_labels_encoding
# (both were produced by walking train_df row by row)
train_wav_ids = train_df['wav_id'].to_numpy()

# Apply augmentation class by class
for sentiment in tqdm(sentiments):
    current_count = data_per_sentiment[sentiment]
    remaining_count = augmentation_counts[sentiment]
    index = index_per_sentiment[sentiment]           # last row index of this class
    first_index = index - current_count + 1          # first row index of this class

    # Only classes that are short of the target get augmented samples
    while remaining_count > 0:

        # Randomly pick a training sample of this class.
        # randint's upper bound is exclusive, hence index + 1 to include the last row.
        selected_index = np.random.randint(first_index, index + 1)
        selected_y_label = train_labels_encoding[selected_index]

        # Pitch shifting works on audio, not on MFCC matrices: reload the
        # waveform, shift it by a random amount, then extract MFCCs with the
        # same settings as extract_features_pick so that original and
        # augmented features are comparable.
        audio_path = f"{wav_path}{train_wav_ids[selected_index]}.wav"
        y, sr = librosa.load(audio_path, sr=48000)
        semitone_shift = np.random.uniform(low=-2, high=2)  # random pitch change
        y_shifted = librosa.effects.pitch_shift(y, sr=sr, n_steps=semitone_shift)
        augmented_mfcc = librosa.feature.mfcc(
            y=y_shifted, sr=sr, n_mfcc=13, hop_length=int(sr*0.01), n_fft=int(sr*0.02)
        ).T

        # Store the augmented MFCC sequence together with its label
        augmented_mfcc_list.append(augmented_mfcc)
        augmented_y_label_list.append(selected_y_label)

        remaining_count -= 1

# Convert the list to a NumPy array
augmented_mfcc_np = np.array(augmented_mfcc_list, dtype=object)

In [None]:
# Save the augmented features and their labels
for file_stem, payload in (("augmented_mfcc_np", augmented_mfcc_np),
                           ("augmented_mfcc_np_label", augmented_y_label_list)):
    np.save(file_stem, payload)

In [None]:
# Merge the original training data with the augmented data
final_X_train_np = np.concatenate([train_mfcc_data_np, augmented_mfcc_np], axis=0)
final_y_one_hot_np = np.concatenate([train_labels_encoding, augmented_y_label_list], axis=0)

# Count how many samples each class index has after merging
augmented_label_argmax = final_y_one_hot_np.argmax(axis=1)
unique_values, counts = np.unique(augmented_label_argmax, return_counts=True)

for class_index, class_total in zip(unique_values, counts):
    print(f"{class_index}: {class_total} times")

In [None]:
# Save the merged (original + augmented) features, then the merged labels
for file_stem, payload in (('final_X_train', final_X_train_np),
                           ('final_y_one_hot', final_y_one_hot_np)):
    np.save(file_stem, payload)

# 7. 훈련 및 예측
- 데이터 증가 전 후를 나누어 진행한다

In [None]:
# Model evaluation helper
def evaluate_model(model, test_features_padded, test_label):
  """Evaluate `model` on the given test data and print loss and accuracy.

  `model.evaluate` is expected to return exactly two values, which are
  printed as the test loss and the test accuracy. Nothing is returned.
  """
  loss_value, accuracy_value = model.evaluate(test_features_padded, test_label)
  for caption, value in (("Test Loss:", loss_value), ("Test Accuracy:", accuracy_value)):
    print(caption, value)

In [None]:
# Count correct and incorrect predictions
def count_correct_incorrect(model, test_label, test_features_padded=None):
  """Count, per class index, how many test samples were predicted correctly.

  Args:
    model: Object with a `predict` method returning per-class scores.
    test_label: One-hot encoded true labels (argmax over axis 1 is the class index).
    test_features_padded: Features to predict on. When omitted, the
      notebook-level `test_mfcc_padded` is used, which keeps existing
      two-argument calls working.

  Returns:
    Counter mapping a class index to its number of correct predictions.
    A second Counter, keyed by the true class of each wrong prediction,
    is printed but not returned.
  """
  if test_features_padded is None:
    test_features_padded = test_mfcc_padded

  predicted_labels = np.argmax(model.predict(test_features_padded), axis=1)
  answer_labels = np.argmax(test_label, axis=1)

  is_correct = predicted_labels == answer_labels
  correct_counts = Counter(predicted_labels[is_correct])
  incorrect_answer_counts = Counter(answer_labels[~is_correct])

  print("Correct Counts:", correct_counts)
  print("Incorrect Answer Counts:", incorrect_answer_counts)
  return correct_counts

## 증가 전 데이터 예측

In [None]:
# Pad both splits to a common sequence length of 5897 frames
train_mfcc_padded, test_mfcc_padded = (
    pad_sequences(sequences, maxlen=5897, dtype='float32')
    for sequences in (train_mfcc_data_np, test_mfcc_data_np)
)

In [None]:
# train_mfcc_padded is already padded to 5897 frames by the previous cell, so it
# is used as-is (padding it a second time only copied the whole array).

# Split into train / validation
X_train, X_val, y_train, y_val = train_test_split(train_mfcc_padded, train_labels_encoding, test_size=0.2, random_state=42)

# Train the model
model = build_lstm(X_train)
with tf.device('/device:GPU:0'):
  model.fit(X_train, y_train, epochs=40, batch_size=16, validation_data=(X_val, y_val))

# Save the model
model.save(f"{main_path}models/lstm_epoch40_basic.h5")

# Evaluate on the test set
evaluate_model(model,test_mfcc_padded,test_labels_encoding)

In [None]:
# Per-class counts of correct test predictions for the current `model`
basic_correct = count_correct_incorrect(model,test_labels_encoding)

In [None]:
predictions = model.predict(test_mfcc_padded)
predicted_labels = np.argmax(predictions, axis=1)

answer_labels = np.argmax(test_labels_encoding, axis=1)

# Positions of the misclassified test samples
indices = [i for i, (pred, ans) in enumerate(zip(predicted_labels, answer_labels)) if pred != ans]

# Predicted vs. true label, for the misclassified samples only
comparison = pd.DataFrame({
    'Predicted': [predicted_labels[i] for i in indices],
    'Answer': [answer_labels[i] for i in indices],
})

# Distribution of all predicted labels
unique_values, counts = np.unique(predicted_labels, return_counts=True)

# Draw the bar chart (title fixed: "augmentation", not "argumentation")
plt.bar(unique_values, counts, alpha=0.7)
plt.xlabel('Predicted Value')
plt.ylabel('Count')
plt.title('Distribution of Predicted Values before augmentation')

# Show the count above each bar
for i, count in enumerate(counts):
    plt.text(unique_values[i], count + 0.1, str(count), ha='center', va='bottom')

plt.show()

In [None]:
# Visualize, for every true label, how its misclassified samples were predicted
grouped_data = comparison.groupby('Answer')['Predicted'].value_counts().unstack(fill_value=0)

grouped_data.plot(kind='bar', stacked=True)
plt.xlabel('Answer Label')
plt.ylabel('Count')
# Title fixed: "augmentation", not "argumentation"
plt.title('Count of Predicted Labels for Each Answer Label before augmentation')
plt.legend(title='Predicted Label')
plt.show()

## 증가 후 데이터 예측

In [None]:
# Pad the merged (original + augmented) training sequences
train_mfcc_padded_arg = pad_sequences(final_X_train_np, maxlen=5897, dtype='float32')

# Split into train / validation
X_train, X_val, y_train, y_val = train_test_split(
    train_mfcc_padded_arg,
    final_y_one_hot_np,
    test_size=0.2,
    random_state=42,
)

# Train a fresh model on the merged data
model = build_lstm(X_train)
with tf.device('/device:GPU:0'):
  model.fit(x=X_train, y=y_train, validation_data=(X_val, y_val), epochs=40, batch_size=16)
  # Save the model
  model.save(f"{main_path}models/lstm_argu.h5")

# Evaluate on the test set
evaluate_model(model, test_mfcc_padded, test_labels_encoding)

In [None]:
# Per-class counts of correct test predictions for the current `model`
argu_correct = count_correct_incorrect(model,test_labels_encoding)

In [None]:
predictions = model.predict(test_mfcc_padded)
predicted_labels = np.argmax(predictions, axis=1)

answer_labels = np.argmax(test_labels_encoding, axis=1)

# Positions of the misclassified test samples
indices = [i for i, (pred, ans) in enumerate(zip(predicted_labels, answer_labels)) if pred != ans]

# Predicted vs. true label, for the misclassified samples only
comparison = pd.DataFrame({
    'Predicted': [predicted_labels[i] for i in indices],
    'Answer': [answer_labels[i] for i in indices],
})

# Distribution of all predicted labels
unique_values, counts = np.unique(predicted_labels, return_counts=True)

# Draw the bar chart (title fixed: "augmentation", not "argumentation")
plt.bar(unique_values, counts, alpha=0.7)
plt.xlabel('Predicted Value')
plt.ylabel('Count')
plt.title('Distribution of Predicted Values after augmentation')

# Show the count above each bar
for i, count in enumerate(counts):
    plt.text(unique_values[i], count + 0.1, str(count), ha='center', va='bottom')

plt.show()

In [None]:
# Visualize, for every true label, how its misclassified samples were predicted
grouped_data = comparison.groupby('Answer')['Predicted'].value_counts().unstack(fill_value=0)

# Stacked bar chart (title fixed: "augmentation", not "argumentation")
grouped_data.plot(kind='bar', stacked=True)
plt.xlabel('Answer Label')
plt.ylabel('Count')
plt.title('Count of Predicted Labels for Each Answer Label after augmentation')
plt.legend(title='Predicted Label')
plt.show()

# 8. 결과정리

In [None]:
data = basic_correct

# Extract the keys (class indices) and values (correct counts)
keys = list(data.keys())
values = list(data.values())

# Draw a line plot and a bar plot of the same counts on one figure
sns.lineplot(x=keys, y=values, marker='o', label='Line Plot')
sns.barplot(x=keys, y=values, alpha=0.7, color='skyblue', label='Bar Plot')


# Add labels and the title (title fixed: "Augmentation", not "Argumentaion")
plt.xlabel('Key')
plt.ylabel('Value')
plt.title('Distribution of Correct (before Augmentation)')


# Show the figure
plt.show()

In [None]:
data = argu_correct

# Extract the keys (class indices) and values (correct counts)
keys = list(data.keys())
values = list(data.values())

# Draw a line plot and a bar plot of the same counts on one figure
sns.lineplot(x=keys, y=values, marker='o', label='Line Plot')
sns.barplot(x=keys, y=values, alpha=0.7, color='skyblue', label='Bar Plot')


# Add labels and the title (title fixed: "Augmentation", not "Argumentaion")
plt.xlabel('Key')
plt.ylabel('Value')
plt.title('Distribution of Correct (after Augmentation)')


# Show the figure
plt.show()