In [1]:
import librosa

from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import random
import PIL
from PIL import Image
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

import torch
import torchmetrics
import os
import warnings

warnings.filterwarnings('ignore')
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

import torch

print("PyTorch 버전:", torch.__version__)
print("CUDA 버전:", torch.version.cuda)
print("CUDA 사용 가능:", torch.cuda.is_available())
print("cuDNN 사용 가능:", torch.backends.cudnn.enabled)

if torch.cuda.is_available():
    print("GPU 수:", torch.cuda.device_count())
    for i in range(torch.cuda.device_count()):
        print(f"GPU {i} 이름:", torch.cuda.get_device_name(i))
else:
    print("GPU를 찾을 수 없습니다. CPU를 사용합니다.")

print(device)
print("cuDNN 버전:", torch.backends.cudnn.version())

class Config:
    SR = 32000
    #SR = 16000
    N_MFCC = 13
    # Dataset
    ROOT_FOLDER = './'
    # Training
    N_CLASSES = 2
    BATCH_SIZE = 96
    N_EPOCHS = 40
    LR = 3e-4
    # Others
    SEED = 42
    
CONFIG = Config()

def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CONFIG.SEED) # Seed 고정

df = pd.read_csv('./train.csv')
train, val, _, _ = train_test_split(df, df['label'], test_size=0.2, random_state=CONFIG.SEED)

PyTorch 버전: 1.12.1+cu116
CUDA 버전: 11.6
CUDA 사용 가능: True
cuDNN 사용 가능: True
GPU 수: 1
GPU 0 이름: NVIDIA GeForce RTX 2060
cuda
cuDNN 버전: 8302


In [2]:
# 길이 확인
mfcc_lengths = []
for _, row in df.iterrows():
    y, sr = librosa.load(row['path'], sr=CONFIG.SR)
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=CONFIG.N_MFCC)
    mfcc_lengths.append(mfcc.shape[1])
max_len = max(mfcc_lengths)
print(f"Max length: {max_len}")

KeyboardInterrupt: 

In [2]:
# 데이터 증강 함수들
def add_noise(y, noise_factor=0.005):
    noise = np.random.randn(len(y))
    augmented_data = y + noise_factor * noise
    return augmented_data

def change_pitch(y, sr=32000, pitch_factor=2.0):
    return librosa.effects.pitch_shift(y, sr=32000, n_steps=pitch_factor)

def stretch_time(y, rate=1.1):
    return librosa.effects.time_stretch(y, rate=1.1)

In [3]:
# mfcc 추출
def get_mfcc_feature(df, sr=22050, n_mfcc=13, train_mode=True):
    features = []
    labels = []
    for _, row in tqdm(df.iterrows(), total=df.shape[0]):
        # librosa 패키지를 사용하여 wav 파일 load
        y, sr = librosa.load(row['path'], sr=sr)

        # 데이터 증강 적용
        if train_mode:
            augmentations = [add_noise, change_pitch, stretch_time]
            augmentation = random.choice(augmentations)
            if augmentation == change_pitch:
                y = augmentation(y, sr)
            else:
                y = augmentation(y)

        # librosa 패키지를 사용하여 mfcc 추출
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
        features.append(mfcc)
        
        if train_mode:
            label = row['label']
            label_vector = np.zeros(CONFIG.N_CLASSES, dtype=float)
            label_vector[0 if label == 'fake' else 1] = 1
            labels.append(label_vector)

    if train_mode:
        return features, labels
    return features

# MFCC 추출
train_mfcc, train_labels = get_mfcc_feature(train, train_mode=True)
val_mfcc, val_labels = get_mfcc_feature(val, train_mode=True)

100%|████████████████████████████████████████████████████████████████████████████| 44350/44350 [29:34<00:00, 24.99it/s]
100%|████████████████████████████████████████████████████████████████████████████| 11088/11088 [07:27<00:00, 24.78it/s]


In [4]:
# padding 을 통해 정규화 ,,평균값으로 정규화를 안해도 이 방식으로 가능
max_len = 1209

def pad_mfcc(mfcc, max_len):
    current_len = mfcc.shape[1]
    pad_width = max_len - current_len
    
    if pad_width < 0:
        # 현재 길이가 최대 길이보다 긴 경우, 최대 길이로 자르기
        mfcc = mfcc[:, :max_len]
    else:
        # 제로 패딩 적용 (constant 모드로 0으로 패딩)
        mfcc = np.pad(mfcc, ((0, 0), (0, pad_width)), mode='constant')
    
    return mfcc
    
def pad_mfcc_list(mfcc_list, max_len):
    padded_features = []
    for mfcc in mfcc_list:
        mfcc_padded = pad_mfcc(mfcc, max_len)
        # 채널 차원 추가
        mfcc_padded = np.expand_dims(mfcc_padded, axis=0)  # (1, n_mfcc, max_len) 형태로 변환
        padded_features.append(mfcc_padded)
    return padded_features

In [5]:
# CustomDataset 정의
class CustomDataset(Dataset):
    def __init__(self, mfcc_features, labels):
        self.mfcc_features = mfcc_features
        self.labels = labels

    def __len__(self):
        return len(self.mfcc_features)

    def __getitem__(self, idx):
        mfcc = self.mfcc_features[idx]
        mfcc = mfcc.squeeze(0)  # 불필요한 차원 제거
        if self.labels is not None:
            label = self.labels[idx]
            return mfcc, label
        return mfcc

# 패딩 적용
train_mfcc_padded = pad_mfcc_list(train_mfcc, max_len)
val_mfcc_padded = pad_mfcc_list(val_mfcc, max_len)

# Numpy 배열로 변환 및 채널 차원 추가
train_mfcc_padded = np.array(train_mfcc_padded)  # 이미 채널 차원이 추가된 상태
val_mfcc_padded = np.array(val_mfcc_padded)      # 이미 채널 차원이 추가된 상태
train_labels = np.array(train_labels)
val_labels = np.array(val_labels)

# CustomDataset 및 DataLoader 준비
train_dataset = CustomDataset(train_mfcc_padded, train_labels)
val_dataset = CustomDataset(val_mfcc_padded, val_labels)

train_loader = DataLoader(
    train_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=True
)
val_loader = DataLoader(
    val_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=False
)

In [6]:
class AudioCNN(nn.Module):
    def __init__(self):
        super(AudioCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.25)
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        
        self.conv5 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(512)
        
        self.conv6 = nn.Conv2d(512, 1024, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(1024)
        
        # Assuming the input size after the convolutions and pooling is (1024, 4, 4) if the input size is appropriately large
        self.fc1 = nn.Linear(1024 * (CONFIG.N_MFCC // 64) * (max_len // 64), 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, CONFIG.N_CLASSES)
        
    def forward(self, x):
        x = self.pool(self.bn1(self.conv1(x)))
        x = self.dropout(x)
        
        x = self.pool(self.bn2(self.conv2(x)))
        x = self.dropout(x)
        
        x = self.pool(self.bn3(self.conv3(x)))
        x = self.dropout(x)
        
        x = self.pool(self.bn4(self.conv4(x)))
        x = self.dropout(x)
        
        x = self.pool(self.bn5(self.conv5(x)))
        x = self.dropout(x)
        
        x = self.pool(self.bn6(self.conv6(x)))
        x = self.dropout(x)
        
        x = x.view(-1, 1024 * (CONFIG.N_MFCC // 64) * (max_len // 64))
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.dropout(torch.relu(self.fc2(x)))
        x = self.fc3(x)
        return x


In [20]:
class AudioCNNLSTM(nn.Module):
    def __init__(self, num_classes=1):
        super(AudioCNNLSTM, self).__init__()
        
        # CNN layers
        self.cnn_layers = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # LSTM layer
        self.lstm = nn.LSTM(64 * 3, 128, batch_first=True, bidirectional=True)
        
        # Fully connected layer
        self.fc = nn.Linear(128 * 2, num_classes)

    def forward(self, x):
        # Apply CNN layers
        batch_size = x.size(0)
        x = self.cnn_layers(x)  # (batch_size, 64, 3, 302)
        # print(f"After CNN: {x.shape}")
        
        # Reshape for LSTM layer
        x = x.permute(0, 2, 3, 1)  # (batch_size, 3, 302, 64)
        x = x.reshape(batch_size, 302, 64 * 3)  # (batch_size, 302, 192)
        # print(f"After reshape: {x.shape}")
        
        # Apply LSTM
        x, _ = self.lstm(x)  # (batch_size, 302, 128*2)
        
        # Use the last time step's output for classification
        x = x[:, -1, :]  # (batch_size, 128*2)
        
        # Apply fully connected layer
        x = self.fc(x)  # (batch_size, num_classes)
        return x

In [38]:
class ImprovedAudioCNNLSTM(nn.Module):
    def __init__(self, num_classes=1):
        super(ImprovedAudioCNNLSTM, self).__init__()
        
        self.cnn_layers = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
            nn.Dropout(0.5)  # 드롭아웃 추가
        )

        self.lstm = nn.LSTM(256, 256, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(256 * 2, num_classes)

    def forward(self, x):
        batch_size = x.size(0)
        x = self.cnn_layers(x)
        # print(f"After CNN: {x.shape}")  # CNN 레이어 후 데이터 형태와 크기 출력

        # Calculate the new shape for the LSTM layer
        x = x.permute(0, 2, 3, 1)  # (batch_size, height, width, channels)
        x = x.reshape(batch_size, x.size(1) * x.size(2), x.size(3))  # (batch_size, height*width, channels)
        # print(f"After reshape: {x.shape}")  # Reshape 후 데이터 형태와 크기 출력
        
        x, _ = self.lstm(x)
        # print(f"After LSTM: {x.shape}")  # LSTM 레이어 후 데이터 형태와 크기 출력
        
        x = x[:, -1, :]
        x = self.fc(x)
        return x

In [11]:
from sklearn.metrics import roc_auc_score  # AUC 점수 계산을 위한 임포트
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim

# 모델 학습 함수
def train(model, optimizer, train_loader, val_loader, device):
    model.to(device)
    criterion = nn.BCEWithLogitsLoss().to(device)  # BCEWithLogitsLoss는 시그모이드 활성화 포함

    best_val_score = 0
    best_model = None

    for epoch in range(1, CONFIG.N_EPOCHS + 1):
        model.train()
        train_loss = []
        correct = 0
        total = 0

        for features, labels in tqdm(iter(train_loader), desc=f"Training Epoch {epoch}/{CONFIG.N_EPOCHS}"):
            features = features.unsqueeze(1).float().to(device)  # 채널 차원 추가
            labels = labels.float().to(device)
            
            # 레이블 값이 [0, 1] 범위 내에 있는지 확인
            assert labels.min() >= 0 and labels.max() <= 1, "레이블 값이 [0, 1] 범위를 벗어났습니다."

            optimizer.zero_grad()

            output = model(features)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

            # 정확도 계산
            predicted = (torch.sigmoid(output.squeeze()) > 0.5).float()
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        accuracy = 100 * correct / total
        print(f'Epoch [{epoch}], Train Loss: [{_train_loss:.5f}] Val Loss: [{_val_loss:.5f}] Val AUC: [{_val_score:.5f}] Accuracy: [{accuracy:.2f}%]')

        if best_val_score < _val_score:
            best_val_score = _val_score
            best_model = model

    return best_model

# AUC 계산 함수
def multiLabel_AUC(y_true, y_scores):
    auc_scores = []
    for i in range(y_true.shape[1]):
        auc = roc_auc_score(y_true[:, i], y_scores[:, i])
        auc_scores.append(auc)
    mean_auc_score = np.mean(auc_scores)
    return mean_auc_score

# 모델 검증 함수
def validation(model, criterion, val_loader, device):
    model.eval()
    val_loss, all_labels, all_probs = [], [], []
    correct = 0
    total = 0

    with torch.no_grad():
        for features, labels in tqdm(iter(val_loader), desc="Validating"):
            features = features.unsqueeze(1).float().to(device)  # 채널 차원 추가
            labels = labels.float().to(device)

            # 레이블 값이 [0, 1] 범위 내에 있는지 확인
            assert labels.min() >= 0 and labels.max() <= 1, "레이블 값이 [0, 1] 범위를 벗어났습니다."

            probs = model(features)

            loss = criterion(probs, labels)
            val_loss.append(loss.item())

            all_labels.append(labels.cpu().numpy())
            all_probs.append(probs.cpu().numpy())

            # 정확도 계산
            predicted = (torch.sigmoid(probs.squeeze()) > 0.5).float()
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        _val_loss = np.mean(val_loss)
        accuracy = 100 * correct / total

        all_labels = np.concatenate(all_labels, axis=0)
        all_probs = np.concatenate(all_probs, axis=0)

        # Calculate AUC score
        auc_score = multiLabel_AUC(all_labels, all_probs)

    print(f'Validation Accuracy: {accuracy:.2f}%')
    return _val_loss, auc_score

In [12]:
# 모델 인스턴스화 및 학습
model = AudioCNN()
optimizer = torch.optim.Adam(params=model.parameters(), lr=CONFIG.LR)
criterion = nn.BCEWithLogitsLoss()
best_model = train(model, optimizer, train_loader, val_loader, device)


Training Epoch 1/40:   0%|                                                                     | 0/462 [00:00<?, ?it/s]


RuntimeError: Given input size: (256x1x151). Calculated output size: (256x0x75). Output size is too small

In [39]:
# 모델 인스턴스화 및 학습
model = ImprovedAudioCNNLSTM(num_classes=2)
optimizer = torch.optim.Adam(params=model.parameters(), lr=CONFIG.LR)
criterion = nn.BCEWithLogitsLoss()
best_model = train(model, optimizer, train_loader, val_loader, device)


Training Epoch 1/10:   0%|                                                                     | 0/462 [00:01<?, ?it/s]

KeyboardInterrupt



In [10]:
from tqdm import tqdm
# 학습 루프 전에 레이블 값 확인

# 학습 루프
for epoch in range(CONFIG.N_EPOCHS):
    model.train()
    running_loss = 0.0
    num_batches = len(train_loader)
    correct = 0
    total = 0
    
    # tqdm을 사용하여 진행률 막대 추가
    progress_bar = tqdm(enumerate(train_loader), total=num_batches, desc=f"Epoch {epoch+1}/{CONFIG.N_EPOCHS}")
    
    for i, (features, labels) in progress_bar:
        features = features.unsqueeze(1).float().to(device)  # 채널 차원 추가 및 FloatTensor로 변환
        labels = labels.float().to(device)  # FloatTensor로 변환
        
        optimizer.zero_grad()
        output = model(features)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()

        # 정확도 계산
        predicted = (output.squeeze() > 0.5).float()
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
        accuracy = 100 * correct / total
        
        # 진행률 막대에 손실 값과 정확도 업데이트
        progress_bar.set_postfix(loss=running_loss/(i+1), accuracy=accuracy)
    
    epoch_loss = running_loss / num_batches
    print(f"Epoch [{epoch+1}/{CONFIG.N_EPOCHS}], Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.2f}%")

# 평가 루프 (선택적)
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for features, labels in val_loader:
        features = features.unsqueeze(1).float().to(device)
        labels = labels.float().to(device)
        output = model(features)
        predicted = (output.squeeze() > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    accuracy = 100 * correct / total
    print(f'Validation Accuracy: {accuracy:.2f}%')


Epoch 1/40:   0%|                                                                              | 0/462 [00:01<?, ?it/s]


RuntimeError: Given input size: (256x1x151). Calculated output size: (256x0x75). Output size is too small

In [35]:
test = pd.read_csv('./test.csv')
print(test.head())  # 데이터프레임의 첫 몇 줄을 출력하여 경로가 올바른지 확인
test_mfcc = get_mfcc_feature(test, sr=22050, train_mode=False)
test_dataset = CustomDataset(test_mfcc, None)
test_loader = DataLoader(
    test_dataset,
    batch_size=CONFIG.BATCH_SIZE,
    shuffle=False
)


           id                   path
0  TEST_00000  ./test/TEST_00000.ogg
1  TEST_00001  ./test/TEST_00001.ogg
2  TEST_00002  ./test/TEST_00002.ogg
3  TEST_00003  ./test/TEST_00003.ogg
4  TEST_00004  ./test/TEST_00004.ogg


100%|██████████████████████████████████████████████████████████████████████████| 50000/50000 [1:21:56<00:00, 10.17it/s]


In [None]:
# 패딩 적용
max_len = 1209  # 적절한 max_len 설정
test_mfcc_padded = pad_mfcc_list(test_mfcc, max_len)

# Numpy 배열로 변환
test_mfcc_padded = np.array(test_mfcc_padded)

# CustomDataset 및 DataLoader 정의
test_dataset = CustomDataset(test_mfcc_padded, None)
test_loader = DataLoader(test_dataset, batch_size=CONFIG.BATCH_SIZE, shuffle=False)

# 모델 정의
model = AudioCNNLSTM(num_classes=2).to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=CONFIG.LR)
criterion = nn.BCEWithLogitsLoss()

# 추론 함수
def inference(model, test_loader, device):
    model.to(device)
    model.eval()
    predictions = []
    with torch.no_grad():
        for features in tqdm(test_loader):
            features = features.unsqueeze(1).float().to(device)  # 채널 차원 추가 및 FloatTensor로 변환
            probs = model(features)
            probs = probs.cpu().detach().numpy()
            predictions += probs.tolist()
    return predictions


In [None]:
# 예측 실행
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
preds = inference(model, test_loader, device)

submit = pd.read_csv('./sample_submission.csv')
submit.iloc[:, 1:] = preds
submit.head()
submit.to_csv('./submit_CNNLSTMdeep_dataPlus_1.csv', index=False)

In [23]:
print(f'Train MFCC shape: {train_mfcc_padded.shape}')
print(f'Validation MFCC shape: {val_mfcc_padded.shape}')
print(f'Train labels shape: {train_labels.shape}')
print(f'Validation labels shape: {val_labels.shape}')

Train MFCC shape: (44350, 1, 13, 1209)
Validation MFCC shape: (11088, 1, 13, 1209)
Train labels shape: (44350, 2)
Validation labels shape: (11088, 2)
