In [None]:
import pandas as pd
import numpy as np
import ast
import torch
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ReduceLROnPlateau
import matplotlib.pyplot as plt

In [None]:
# 구글 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive')

# 데이터 불러오기
df = pd.read_csv('/content/drive/My Drive/soundAI/df_concat.csv')

Mounted at /content/drive


In [None]:
# 문자열을 실제 텐서로 변환하는 함수
def convert_to_tensor(tensor_str):
    tensor_list = ast.literal_eval(tensor_str)
    tensor = torch.tensor(tensor_list)
    return tensor

# 'feature' 열의 문자열을 실제 텐서로 변환하여 다시 저장
df['feature'] = df['feature'].apply(convert_to_tensor)

In [None]:
# 노이즈 추가 함수 정의
def add_noise(tensor, noise_level=0.01):
    noise = np.random.normal(0, noise_level, tensor.shape)
    return tensor + noise


# 시간 축 이동 함수 정의
def time_shift(tensor, shift_max=2):
    shift = np.random.randint(-shift_max, shift_max)
    return np.roll(tensor, shift, axis=1)

# 시간 축 스케일링 함수 정의
def time_stretch(tensor, stretch_factor=0.2):
    length = tensor.shape[1]
    stretched_length = int(length * (1 + stretch_factor))
    stretched_tensor = np.zeros((tensor.shape[0], stretched_length))
    for i in range(tensor.shape[0]):
        stretched_tensor[i] = np.interp(np.linspace(0, length, stretched_length), np.arange(length), tensor[i])
    return stretched_tensor

    # 데이터 증강 함수
def augment_data(df, num_augmentations=10, noise_level=0.01):
    augmented_features = []
    augmented_labels = []

    for _, row in df.iterrows():
        feature = np.array(row['feature'])
        label = row['label']

        augmented_features.append(feature)  # 원본 데이터 추가
        augmented_labels.append(label)

        for _ in range(num_augmentations - 1):
            augmented_feature = add_noise(feature, noise_level)
            augmented_feature = time_shift(augmented_feature)
            augmented_feature = time_stretch(augmented_feature)
            augmented_features.append(augmented_feature)
            augmented_labels.append(label)

    augmented_df = pd.DataFrame({
        'feature': augmented_features,
        'label': augmented_labels
    })
    return augmented_df

# 데이터 증강 실행
augmented_df = augment_data(df)

In [None]:
# 패딩 함수 정의
def pad_feature(feature, target_shape=(1025, 130), pad_value=-1):
    current_shape = feature.shape
    padded_feature = np.full(target_shape, pad_value)
    if current_shape[1] > target_shape[1]:
        padded_feature[:, :] = feature[:, :target_shape[1]]
    else:
        padded_feature[:, :current_shape[1]] = feature
    return padded_feature

# 각 행마다 패딩 적용
augmented_df['feature'] = augmented_df['feature'].apply(lambda x: pad_feature(np.array(x)))
df['feature'] = df['feature'].apply(lambda x: pad_feature(np.array(x)))

In [None]:
# 레이블 매핑
unique_labels = np.sort(augmented_df['label'].unique())
label_mapping = {old_label: new_label for new_label, old_label in enumerate(unique_labels)}

# 매핑을 적용하여 레이블 변환
augmented_df['label'] = augmented_df['label'].map(label_mapping)
df['label'] = df['label'].map(label_mapping)

# 데이터 및 레이블 준비
X_train = np.stack(augmented_df['feature'].values)
y_train = augmented_df['label'].values

X_test = np.stack(df['feature'].values)
y_test = df['label'].values

# 데이터 형태 조정 (2D CNN 입력 형태로 맞춤)
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]

# 레이블 원-핫 인코딩
num_classes = len(np.unique(y_train))
y_train_categorical = to_categorical(y_train, num_classes)
y_test_categorical = to_categorical(y_test, num_classes)

In [None]:
# 2D CNN 모델 정의
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(1025, 130, 1)),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Conv2D(128, kernel_size=(3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Conv2D(256, kernel_size=(3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Flatten(),
    Dense(512, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')
])

In [None]:
# 모델 컴파일
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Learning rate scheduler 정의
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.001)

# 모델 요약 출력
model.summary()

# 모델 훈련
history = model.fit(X_train, y_train_categorical, validation_data=(X_test, y_test_categorical), epochs=50, batch_size=32, callbacks=[reduce_lr])

# 모델 평가
loss, accuracy = model.evaluate(X_test, y_test_categorical)
print(f'Test accuracy: {accuracy}')

# 정확도와 손실 값을 시각화하는 함수
def plot_history(history):
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Accuracy over Epochs')

    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Loss over Epochs')

    plt.show()

# 정확도와 손실 값 시각화
plot_history(history)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 1023, 128, 32)     320       
                                                                 
 batch_normalization (Batch  (None, 1023, 128, 32)     128       
 Normalization)                                                  
                                                                 
 max_pooling2d (MaxPooling2  (None, 511, 64, 32)       0         
 D)                                                              
                                                                 
 dropout (Dropout)           (None, 511, 64, 32)       0         
                                                                 
 conv2d_1 (Conv2D)           (None, 509, 62, 64)       18496     
                                                                 
 batch_normalization_1 (Bat  (None, 509, 62, 64)       2

**그래프 분석**

1.
훈련 정확도(Train Accuracy):

훈련 정확도는 빠르게 상승하여 거의 100%에 도달했습니다. 이는 모델이 훈련 데이터에 대해 매우 잘 맞춰지고 있음을 나타냅니다.

2. 검증 정확도(Validation Accuracy):

검증 정확도는 초기에는 상승하다가 약 20-30 에포크 사이에서 급격히 하락하고, (30에 끊어야하나) 이후 매우 불안정한 모습을 보입니다.

3. 훈련 손실(Train Loss):

훈련 손실은 거의 0에 가까워지고, 훈련 데이터에 대해 매우 잘 맞춰지고 있음을 나타냅니다.

4. 검증 손실(Validation Loss):

검증 손실은 초반에 낮다가 약 20-30 에포크 이후 급격히 상승하며 매우 불안정한 모습을 보입니다.



---



이 모델은 과적합이 발생하고 있습니다.
이를 해결하기 위해 다음과 같은 조치를 고려할 수 있습니다:

1. 조기 종료(Early Stopping): 검증 손실이 증가하기 시작할 때 훈련을 중지합니다.

2. 데이터 증강(Data Augmentation): 훈련 데이터를 다양하게 만들어
모델이 더 잘 일반화할 수 있도록 합니다.

3. 정규화(Regularization): 드롭아웃(Dropout)이나 가중치 정규화(Weight Regularization) 기법을 사용하여 모델의 복잡도를 줄입니다.

4. 훈련 데이터 양 증가: 더 많은 훈련 데이터를 수집하여 모델이 더 다양한 데이터를 학습할 수 있도록 합니다.
이러한 방법들을 통해 과적합 문제를 완화시킬 수 있습니다.

In [None]:
import pandas as pd
import numpy as np
import ast
import torch
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ReduceLROnPlateau
import matplotlib.pyplot as plt

# 구글 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive')

# 데이터 불러오기
df = pd.read_csv('/content/drive/My Drive/soundAI/df_concat.csv')


# 문자열을 실제 텐서로 변환하는 함수
def convert_to_tensor(tensor_str):
    tensor_list = ast.literal_eval(tensor_str)
    tensor = torch.tensor(tensor_list)
    return tensor

# 'feature' 열의 문자열을 실제 텐서로 변환하여 다시 저장
df['feature'] = df['feature'].apply(convert_to_tensor)

# 노이즈 추가 함수 정의
def add_noise(tensor, noise_level=0.01):
    noise = np.random.normal(0, noise_level, tensor.shape)
    return tensor + noise


# 시간 축 이동 함수 정의
def time_shift(tensor, shift_max=2):
    shift = np.random.randint(-shift_max, shift_max)
    return np.roll(tensor, shift, axis=1)

# 시간 축 스케일링 함수 정의
def time_stretch(tensor, stretch_factor=0.2):
    length = tensor.shape[1]
    stretched_length = int(length * (1 + stretch_factor))
    stretched_tensor = np.zeros((tensor.shape[0], stretched_length))
    for i in range(tensor.shape[0]):
        stretched_tensor[i] = np.interp(np.linspace(0, length, stretched_length), np.arange(length), tensor[i])
    return stretched_tensor

    # 데이터 증강 함수
def augment_data(df, num_augmentations=10, noise_level=0.01):
    augmented_features = []
    augmented_labels = []

    for _, row in df.iterrows():
        feature = np.array(row['feature'])
        label = row['label']

        augmented_features.append(feature)  # 원본 데이터 추가
        augmented_labels.append(label)

        for _ in range(num_augmentations - 1):
            augmented_feature = add_noise(feature, noise_level)
            augmented_feature = time_shift(augmented_feature)
            augmented_feature = time_stretch(augmented_feature)
            augmented_features.append(augmented_feature)
            augmented_labels.append(label)

    augmented_df = pd.DataFrame({
        'feature': augmented_features,
        'label': augmented_labels
    })
    return augmented_df

# 데이터 증강 실행
augmented_df = augment_data(df)

# 패딩 함수 정의
def pad_feature(feature, target_shape=(1025, 130), pad_value=-1):
    current_shape = feature.shape
    padded_feature = np.full(target_shape, pad_value)
    if current_shape[1] > target_shape[1]:
        padded_feature[:, :] = feature[:, :target_shape[1]]
    else:
        padded_feature[:, :current_shape[1]] = feature
    return padded_feature

# 각 행마다 패딩 적용
augmented_df['feature'] = augmented_df['feature'].apply(lambda x: pad_feature(np.array(x)))
df['feature'] = df['feature'].apply(lambda x: pad_feature(np.array(x)))

# 레이블 매핑
unique_labels = np.sort(augmented_df['label'].unique())
label_mapping = {old_label: new_label for new_label, old_label in enumerate(unique_labels)}

# 매핑을 적용하여 레이블 변환
augmented_df['label'] = augmented_df['label'].map(label_mapping)
df['label'] = df['label'].map(label_mapping)

# 데이터 및 레이블 준비
X_train = np.stack(augmented_df['feature'].values)
y_train = augmented_df['label'].values

X_test = np.stack(df['feature'].values)
y_test = df['label'].values

# 데이터 형태 조정 (2D CNN 입력 형태로 맞춤)
X_train = X_train[..., np.newaxis]
X_test = X_test[..., np.newaxis]

# 레이블 원-핫 인코딩
num_classes = len(np.unique(y_train))
y_train_categorical = to_categorical(y_train, num_classes)
y_test_categorical = to_categorical(y_test, num_classes)

# 2D CNN 모델 정의
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(1025, 130, 1)),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Conv2D(128, kernel_size=(3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Conv2D(256, kernel_size=(3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),

    Flatten(),
    Dense(512, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),
    Dense(num_classes, activation='softmax')
])




In [None]:
# 모델 컴파일
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Learning rate scheduler 정의
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=0.001)

# 모델 요약 출력
model.summary()

# 모델 훈련
history = model.fit(X_train, y_train_categorical, validation_data=(X_test, y_test_categorical), epochs=50, batch_size=32, callbacks=[reduce_lr])

# 모델 평가
loss, accuracy = model.evaluate(X_test, y_test_categorical)
print(f'Test accuracy: {accuracy}')

# 정확도와 손실 값을 시각화하는 함수
def plot_history(history):
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Accuracy over Epochs')

    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Loss over Epochs')

    plt.show()

# 정확도와 손실 값 시각화
plot_history(history)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 1023, 128, 32)     320       
                                                                 
 batch_normalization (Batch  (None, 1023, 128, 32)     128       
 Normalization)                                                  
                                                                 
 max_pooling2d (MaxPooling2  (None, 511, 64, 32)       0         
 D)                                                              
                                                                 
 dropout (Dropout)           (None, 511, 64, 32)       0         
                                                                 
 conv2d_1 (Conv2D)           (None, 509, 62, 64)       18496     
                                                                 
 batch_normalization_1 (Bat  (None, 509, 62, 64)       2

KeyboardInterrupt: 