In [None]:
# Mount Google Drive at /content/drive so later cells can read files stored there.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import torch
import librosa
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from transformers import Wav2Vec2Model, Wav2Vec2Processor
from tqdm.notebook import tqdm
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import zipfile
# Device selection: use CUDA when it is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pretrained processor and model
model_name = "facebook/wav2vec2-large-960h-lv60-self"
processor = Wav2Vec2Processor.from_pretrained(model_name)
# The model weights are requested in float16, which is why later cells cast inputs with .half().
# NOTE(review): float16 is requested even when `device` is the CPU — confirm that path is supported.
model = Wav2Vec2Model.from_pretrained(model_name, torch_dtype=torch.float16).to(device)

# 오디오 데이터셋 클래스
class AudioDataset(Dataset):
    """Fixed-length audio clips loaded eagerly from a ``<directory>/<label>/<file>`` tree.

    Each immediate sub-directory of ``directory`` is treated as one class and
    its name is used as the label. Every ``.mp3``/``.wav`` file (extension
    matched case-insensitively) is decoded with ``librosa.load`` at ``sr`` and
    then truncated, or zero-padded at the end, to exactly ``target_length``
    samples. All decoded clips are kept in memory.

    Args:
        directory: Root folder containing one sub-folder per class.
        processor: Callable applied to each clip in ``__getitem__``; its result
            must expose ``input_values``.
        sr: Sampling rate passed to ``librosa.load`` and to ``processor``.
        target_length: Number of samples every clip is cut or padded to.

    Attributes:
        audio_data: List of 1-D arrays, each ``target_length`` samples long.
        audio_labels: Folder name (class label) of each clip, same order as
            ``audio_data``.
        label_to_index: Mapping from label name to integer index, assigned in
            sorted label order.
        indexed_labels: Integer class index of each clip.
    """

    # Accepted file extensions, compared against the lower-cased file name.
    AUDIO_EXTENSIONS = ('.mp3', '.wav')

    def __init__(self, directory, processor, sr=16000, target_length=16000*4):
        self.directory = directory
        self.processor = processor
        self.sr = sr
        self.target_length = target_length
        self.audio_labels = []
        self.audio_data = []

        # os.listdir returns entries in arbitrary order; sort so that the
        # sample order is reproducible across runs and machines.
        folder_list = sorted(os.listdir(directory))
        for label in tqdm(folder_list, desc="Processing folders", leave=True):
            label_dir = os.path.join(directory, label)
            if not os.path.isdir(label_dir):
                # Stray files next to the class folders are not classes.
                continue

            file_list = sorted(os.listdir(label_dir))
            for filename in tqdm(file_list, desc=f"Loading files in {label}", leave=False, mininterval=1):
                # Lower-case first so that e.g. "clip.WAV" is not silently skipped.
                if not filename.lower().endswith(self.AUDIO_EXTENSIONS):
                    continue

                file_path = os.path.join(label_dir, filename)
                audio, _ = librosa.load(file_path, sr=sr)

                self.audio_data.append(self._fit_length(audio, target_length))
                self.audio_labels.append(label)

        # Indices follow the sorted label names, so the mapping does not depend
        # on the order in which folders were visited.
        self.label_to_index = {label: idx for idx, label in enumerate(sorted(set(self.audio_labels)))}
        self.indexed_labels = [self.label_to_index[label] for label in self.audio_labels]

    @staticmethod
    def _fit_length(audio, target_length):
        """Return ``audio`` truncated or right-padded with zeros to ``target_length`` samples."""
        if len(audio) > target_length:
            return audio[:target_length]
        if len(audio) < target_length:
            return np.pad(audio, (0, target_length - len(audio)), mode='constant')
        return audio

    def __len__(self):
        """Number of clips that were loaded."""
        return len(self.audio_data)

    def __getitem__(self, idx):
        """Return ``(input_values, label_index)`` for the clip at position ``idx``."""
        audio = self.audio_data[idx]
        label = self.indexed_labels[idx]
        # The processor output carries a leading batch axis of size 1; drop it
        # so the DataLoader can stack items into a batch itself.
        inputs = self.processor(audio, sampling_rate=self.sr, return_tensors="pt", padding=True).input_values.squeeze(0)
        return inputs, torch.tensor(label)

Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-large-960h-lv60-self and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Open the ZIP archive and extract its contents
zip_file_path = '/content/drive/MyDrive/sound.zip'
unzip_dir = '/content/'
# Make sure the extraction target exists before unpacking.
os.makedirs(unzip_dir, exist_ok=True)
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    # Extract every archive member under unzip_dir.
    # NOTE(review): the next cell reads /content/sound/train and /content/sound/test, so the
    # archive presumably contains a top-level sound/ folder — confirm.
    zip_ref.extractall(unzip_dir)

In [None]:
# Prepare the datasets and build the data loaders
train_directory = '/content/sound/train'
test_directory = '/content/sound/test'

# Each AudioDataset builds its own label -> index mapping from the labels it loads.
# NOTE(review): train and test must therefore contain the same class folders for the
# integer labels to agree between the two datasets.
train_dataset = AudioDataset(train_directory, processor)
test_dataset = AudioDataset(test_directory, processor)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=100, shuffle=False)

Processing folders:   0%|          | 0/2 [00:00<?, ?it/s]

Loading files in not_hornet:   0%|          | 0/985 [00:00<?, ?it/s]

Loading files in hornet:   0%|          | 0/243 [00:00<?, ?it/s]

Processing folders:   0%|          | 0/2 [00:00<?, ?it/s]

Loading files in not_hornet:   0%|          | 0/251 [00:00<?, ?it/s]

Loading files in hornet:   0%|          | 0/67 [00:00<?, ?it/s]

In [None]:
# 분류기 정의 및 설정
class AudioClassifier(torch.nn.Module):
    """Single linear layer that maps a feature vector to per-class scores."""

    def __init__(self, feature_dim, num_classes):
        super().__init__()
        # The attribute name `fc` determines the state_dict keys (fc.weight / fc.bias).
        self.fc = torch.nn.Linear(in_features=feature_dim, out_features=num_classes)

    def forward(self, x):
        """Return the raw linear-layer output for ``x`` (no activation applied)."""
        return self.fc(x)

# Number of target classes, read from the label mapping built by the training dataset.
num_new_classes = len(train_dataset.label_to_index)
# Size the classifier input from the backbone's configured hidden width instead of a
# hard-coded 1024, so the head stays correct if model_name is changed to another checkpoint.
classifier = AudioClassifier(model.config.hidden_size, num_new_classes).to(device)

# Loss function and optimizer (only the classifier's parameters are handed to Adam)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)



In [None]:
# NOTE(review): this repeats the optimizer construction from the previous cell; running it
# replaces `optimizer` with a fresh Adam instance over the same classifier parameters.
optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)

In [None]:
import torch
from sklearn.metrics import confusion_matrix

def _pooled_logits(model, classifier, inputs):
    """Run the backbone without gradients and classify its mean-pooled hidden states."""
    with torch.no_grad():
        # Inputs are cast to half precision before the backbone call
        # (the notebook loads the backbone with torch_dtype=torch.float16).
        features = model(inputs.half()).last_hidden_state
    # Average over dim=1 and convert back to float32 for the classifier head.
    return classifier(features.mean(dim=1).float())


def train_and_evaluate(model, classifier, device, train_loader, test_loader, criterion, optimizer, epochs=100):
    """Train ``classifier`` on backbone features and evaluate it after every epoch.

    ``model`` is put in eval mode and only ever called under ``torch.no_grad()``;
    gradients flow through ``classifier`` alone. After each epoch the mean
    per-batch train/test loss and both confusion matrices are printed.

    Args:
        model: Feature extractor whose output exposes ``last_hidden_state``.
        classifier: Module trained on the mean-pooled features.
        device: Device the batches are moved to.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches.
        criterion: Loss called as ``criterion(logits, labels)``.
        optimizer: Optimizer stepping the classifier parameters.
        epochs: Number of passes over ``train_loader``.

    Returns:
        tuple[list[float], list[float]]: ``(train_losses, test_losses)``, one
        entry per epoch. Earlier versions returned nothing, which left the
        loss-plotting cell without the data it reads.
    """
    model.eval()  # the backbone is used for feature extraction only

    train_losses = []
    test_losses = []

    # NOTE(review): the backbone is re-run on every batch in every epoch even though it is
    # never updated; caching the pooled features once would avoid that repeated cost.
    for epoch in range(epochs):
        # Training loop
        total_train_loss = 0
        classifier.train()

        all_train_preds = []
        all_train_labels = []

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            logits = _pooled_logits(model, classifier, inputs)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

            # Keep predictions and ground truth for the training confusion matrix
            preds = logits.argmax(dim=1)
            all_train_preds.extend(preds.cpu().numpy())
            all_train_labels.extend(labels.cpu().numpy())

        train_losses.append(total_train_loss / len(train_loader))

        # Evaluation loop
        total_test_loss = 0
        classifier.eval()

        all_test_preds = []
        all_test_labels = []

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                logits = _pooled_logits(model, classifier, inputs)
                loss = criterion(logits, labels)
                total_test_loss += loss.item()

                # Keep predictions and ground truth for the test confusion matrix
                preds = logits.argmax(dim=1)
                all_test_preds.extend(preds.cpu().numpy())
                all_test_labels.extend(labels.cpu().numpy())

        test_losses.append(total_test_loss / len(test_loader))

        # Print the confusion matrices for this epoch
        train_cm = confusion_matrix(all_train_labels, all_train_preds)
        test_cm = confusion_matrix(all_test_labels, all_test_preds)

        print(f"Epoch: {epoch+1}, Train Loss: {train_losses[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}")
        print(f"Training Confusion Matrix:\n{train_cm}\n")
        print(f"Testing Confusion Matrix:\n{test_cm}\n")

    return train_losses, test_losses


# Run training and keep the per-epoch loss histories; the plotting cell reads
# `train_losses` and `test_losses` as notebook-level variables.
train_losses, test_losses = train_and_evaluate(model, classifier, device, train_loader, test_loader, criterion, optimizer)


In [None]:
# Visualize the results: training vs. test loss per epoch
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_losses, label='Train Loss')
ax.plot(test_losses, label='Test Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
ax.set_title('Training and Test Losses')
plt.show()



In [None]:
# Lists that collect the ground-truth and predicted labels over the whole test set
true_labels = []
predicted_labels = []

# Switch the classifier to eval mode and run inference without tracking gradients.
classifier.eval()
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Same feature path as in training: half-precision backbone call,
        # mean over dim=1, then float32 into the classifier.
        features = model(inputs.half()).last_hidden_state
        logits = classifier(features.mean(dim=1).float())
        predictions = torch.argmax(logits, dim=1)

        true_labels.extend(labels.cpu().numpy())
        predicted_labels.extend(predictions.cpu().numpy())

# Compute the confusion matrix and draw it as an annotated heatmap
cm = confusion_matrix(true_labels, predicted_labels)
fig, ax = plt.subplots(figsize=(8, 8))
sns.heatmap(cm, annot=True, fmt='d', ax=ax, cmap='Blues')
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
ax.set_title('Confusion Matrix')
plt.show()

# Print the classification report
# NOTE(review): classes appear as integer indices here; the index -> name mapping lives in
# test_dataset.label_to_index.
print(classification_report(true_labels, predicted_labels))

In [None]:
# Save the final model: only the classifier head's state_dict is written, not the backbone.
# NOTE(review): /content is outside the mounted Drive path, so this file presumably does not
# persist beyond the Colab runtime — confirm and copy it to Drive if it must be kept.
model_save_path = '/content/model.pth'
torch.save(classifier.state_dict(), model_save_path)

print("Training and evaluation complete.")