In [None]:
import os
from google.colab import drive
from tqdm import tqdm
import matplotlib.pyplot as plt

In [None]:
dataSetPath = []

# Mount Google Drive so the split listings stored under MyDrive can be read.
drive.mount('/content/drive')

# Text files that list the audio paths of each split, one path per line.
save_path = "/content/drive/MyDrive/"
train_file_path = os.path.join(save_path, "train_paths.txt")
test_file_path = os.path.join(save_path, "test_paths.txt")

# Training split: strip the trailing newline (and surrounding whitespace) of every line.
with open(train_file_path, "r") as train_file:
    train_paths = [entry.strip() for entry in train_file]

# Test split: same treatment.
with open(test_file_path, "r") as test_file:
    test_paths = [entry.strip() for entry in test_file]

In [None]:
# Report the number of paths per split: training count first, test count second.
print(len(train_paths), len(test_paths), sep="\n")

In [None]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader

In [None]:
class WavDataset(Dataset):
    """Dataset of audio files prepared as wav2vec2 inputs with a minimum length.

    Each item is loaded with ``torchaudio.load``, resampled to 16 kHz when the
    file uses a different rate, reduced to a single channel, passed through
    ``processor`` and zero-padded on the right up to ``max_length`` samples.
    Clips longer than ``max_length`` are returned at full length (no truncation).

    The label is derived from the file name: the first character of the second
    ``_``-separated field, where ``'C'`` maps to 0 and ``'P'`` maps to 1.

    Args:
        file_list: Sequence of audio file paths (``/``-separated).
        processor: Callable invoked as
            ``processor(array, sampling_rate=16000, return_tensors="pt")`` whose
            result exposes an ``input_values`` tensor.
        max_length: Minimum number of samples per returned item; shorter inputs
            are zero-padded up to this length.
    """

    # Sample rate every clip is converted to before it reaches the processor.
    TARGET_SAMPLE_RATE = 16000
    # File-name class code -> integer class index.
    LABEL_BY_CODE = {'C': 0, 'P': 1}

    def __init__(self, file_list, processor, max_length=10000):
        self.file_list = file_list
        self.max_length = max_length
        self.processor = processor
        # Not used by __getitem__ (which builds a resampler from each file's real
        # rate); kept so the instance still exposes the attribute it always had.
        self.transform = torchaudio.transforms.Resample(orig_freq=44100, new_freq=16000)  # target sample rate is 16000 Hz

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return ``{'audioinfo': 1-D tensor, 'label': label}`` for item ``idx``."""
        file_path = self.file_list[idx]
        audio_input, sample_rate = torchaudio.load(file_path)

        if sample_rate != self.TARGET_SAMPLE_RATE:
            resampler = torchaudio.transforms.Resample(
                orig_freq=sample_rate, new_freq=self.TARGET_SAMPLE_RATE
            )
            audio_input = resampler(audio_input)

        # Collapse the leading channel axis so the processor always receives one
        # 1-D waveform. A plain squeeze(0) leaves multi-channel audio 2-D, which
        # breaks the padding below, so those files are averaged to mono instead.
        # Assumes torchaudio.load returns (channels, samples) - see its docs.
        if audio_input.dim() > 1:
            if audio_input.size(0) > 1:
                audio_input = audio_input.mean(dim=0)
            else:
                audio_input = audio_input.squeeze(0)

        # Class code = first character of the second '_'-separated name field.
        audio_label = file_path.split('/')[-1].split('_')[1][0]
        # NOTE(review): an unrecognised code is passed through unchanged (as a
        # string), exactly as before; callers that need integer labels must make
        # sure every file name carries 'C' or 'P'.
        label = self.LABEL_BY_CODE.get(audio_label, audio_label)

        input_values = self.processor(
            audio_input.numpy(),
            sampling_rate=self.TARGET_SAMPLE_RATE,
            return_tensors="pt",
        ).input_values
        input_values = input_values.squeeze(0)

        # Right-pad short clips with zeros up to the configured minimum length.
        pad_length = self.max_length - input_values.size(0)
        if pad_length > 0:
            input_values = torch.nn.functional.pad(input_values, (0, pad_length), mode='constant', value=0)
        return {'audioinfo': input_values, 'label': label}

In [None]:
import os
import torch
import torchaudio
from transformers import Wav2Vec2Model, Wav2Vec2Processor
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
from tqdm import tqdm
import matplotlib.pyplot as plt

In [None]:

# Hugging Face checkpoint identifier shared by the processor and the backbone.
model_name = "facebook/wav2vec2-base-960h"
# Processor handed to WavDataset, which calls it on each raw waveform.
processor = Wav2Vec2Processor.from_pretrained(model_name)
# Pretrained wav2vec2 backbone, later wrapped by LSTMClassifier as its feature extractor.
wav2vec2_model = Wav2Vec2Model.from_pretrained(model_name)

In [None]:
# Wrap each split's path list in a dataset that shares the wav2vec2 processor.
train_dataset = WavDataset(file_list=train_paths, processor=processor)
test_dataset = WavDataset(file_list=test_paths, processor=processor)

# One clip per batch, visited in list order for both splits.
# NOTE(review): the training loader is not shuffled - confirm that the order
# stored in train_paths.txt is already randomised.
train_loader = DataLoader(dataset=train_dataset, batch_size=1, shuffle=False)
val_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

In [None]:
# Smoke test: pull exactly one batch from the validation loader, then stop.
# `num` ends at 1 when a batch was produced and stays 0 for an empty loader.
num = 0
for num, batch in enumerate(val_loader, start=1):
    break

In [None]:
class LSTMClassifier(nn.Module):
    """Two-class classifier: frozen base model -> LSTM -> pooling over time -> linear.

    Args:
        base_model: Feature extractor. Must expose ``config.hidden_size`` and,
            when called on the inputs, return an object with a
            ``last_hidden_state`` tensor laid out batch-first.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout_rate: Dropout between LSTM layers (only when ``num_layers > 1``).
        pooling_type: How the LSTM outputs are reduced along dim 1:
            ``'mean'``, ``'max'`` or ``'min'``.

    Raises:
        ValueError: If ``pooling_type`` is not one of the supported values.
    """

    SUPPORTED_POOLING = ('mean', 'max', 'min')

    def __init__(self, base_model, hidden_size=128, num_layers=2, dropout_rate=0.1, pooling_type='min'):
        super(LSTMClassifier, self).__init__()
        # Fail at construction time instead of on the first forward pass.
        if pooling_type not in self.SUPPORTED_POOLING:
            raise ValueError(f"Unsupported pooling type: {pooling_type}")

        self.wav2vec2 = base_model
        self.pooling_type = pooling_type

        self.lstm = nn.LSTM(
            input_size=self.wav2vec2.config.hidden_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout_rate if num_layers > 1 else 0
        )

        # Registered but not applied in forward(). Kept because LayerNorm owns
        # parameters, so dropping it would change the state_dict keys that
        # previously saved checkpoints were written with.
        self.layer_norm = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(dropout_rate)

        self.classifier = nn.Linear(hidden_size, 2)

    def forward(self, input_values):
        """Return logits of shape (batch, 2) for a batch of input sequences."""
        # The base model runs without gradient tracking, so only the LSTM and
        # the linear head receive gradients.
        # NOTE(review): no_grad does not switch the base model to eval mode; if
        # the whole module is put in train(), any dropout/masking inside the
        # base model is presumably still active - confirm this is intended.
        with torch.no_grad():
            features = self.wav2vec2(input_values).last_hidden_state

        lstm_output, _ = self.lstm(features)

        # Reduce along dim 1 (the sequence axis, given batch_first=True).
        if self.pooling_type == 'mean':
            pooled_output = torch.mean(lstm_output, dim=1)
        elif self.pooling_type == 'max':
            pooled_output, _ = torch.max(lstm_output, dim=1)
        elif self.pooling_type == 'min':
            pooled_output, _ = torch.min(lstm_output, dim=1)
        else:
            # Reachable only if pooling_type was reassigned after construction.
            raise ValueError(f"Unsupported pooling type: {self.pooling_type}")

        logits = self.classifier(pooled_output)

        return logits


# Build the classifier on top of the pretrained backbone, using mean pooling.
classifier_model = LSTMClassifier(wav2vec2_model, pooling_type='mean')

# Walk the parameters once, tallying element count and byte size together.
total_params = 0
total_size = 0
for param in classifier_model.parameters():
    total_params += param.numel()
    total_size += param.numel() * param.element_size()

print(f"Total Parameters: {total_params}")

total_size_mb = total_size / (1024 ** 2)  # convert bytes to MB
print(f"Model Size: {total_size_mb:.2f} MB")

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, recall_score, f1_score, precision_score
import numpy as np
import seaborn as sns

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.CrossEntropyLoss()
save_path = "/content/drive/MyDrive/Pretrained"
model_weights_path = os.path.join(save_path, 'Wav2vec-final.pth')

# The model architecture must already be defined before loading.
# map_location lets a checkpoint written on a GPU runtime load on a CPU-only one.
classifier_model.load_state_dict(torch.load(model_weights_path, map_location=device))

classifier_model.to(device)
# Modules start in training mode; switch to eval so dropout is disabled and the
# reported metrics are deterministic.
classifier_model.eval()

all_preds = []
all_labels = []
correct = 0
# Run the whole validation split once, without gradient tracking.
with torch.no_grad():
    for data in tqdm(val_loader):
        input_values = data['audioinfo'].float().to(device)
        labels = data['label'].long().to(device)

        outputs = classifier_model(input_values)
        loss = criterion(outputs, labels)
        # Predicted class = index of the largest logit.
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()

        # Store predictions and labels
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Convert lists to numpy arrays
all_preds = np.array(all_preds)
all_labels = np.array(all_labels)

# Calculate confusion matrix. labels=[0, 1] pins it to 2x2 even when one class
# is missing from both the targets and the predictions, so it always matches
# the two tick labels set below.
cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
print("Confusion Matrix:\n", cm)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='g', cmap='Blues', cbar=False)

plt.xlabel('Predicted labels')
plt.ylabel('True labels')

plt.title('End-to-end Model Confusion Matrix')
# Heatmap cells are centred at 0.5 and 1.5; class 0 is shown as Negative, 1 as Positive.
plt.xticks(ticks=[0.5, 1.5], labels=['Negative', 'Positive'])
plt.yticks(ticks=[0.5, 1.5], labels=['Negative', 'Positive'], rotation=0)

plt.show()

# Calculate accuracy, recall, and F1 score
accuracy = accuracy_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds, average='macro')  # Use 'micro' or 'weighted' if needed
f1 = f1_score(all_labels, all_preds, average='macro')  # Use 'micro' or 'weighted' if needed
precision = precision_score(all_labels, all_preds, average='macro')  # Use 'micro' or 'weighted' if needed

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

In [None]:

# Loss function and optimizer handed to train_model.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=classifier_model.parameters(), lr=1e-4)


def _plot_training_history(train_losses, val_losses, val_accuracies):
    """Draw training loss, validation loss and validation accuracy side by side."""
    # Use the number of epochs actually completed so the x axis matches the
    # recorded metrics even when training stopped early.
    epochs_run = range(1, len(train_losses) + 1)
    panels = (
        (train_losses, 'Training Loss', 'Loss', {}),
        (val_losses, 'Validation Loss', 'Loss', {'color': 'orange'}),
        (val_accuracies, 'Validation Accuracy', 'Accuracy', {'color': 'green'}),
    )

    plt.figure(figsize=(12, 4))
    for position, (values, title, ylabel, style) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.plot(epochs_run, values, label=title, **style)
        plt.xlabel('Epochs')
        plt.ylabel(ylabel)
        plt.title(title)
        plt.grid(True)

    plt.tight_layout()
    plt.show()


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=5,
                save_path="/content/drive/MyDrive/Pretrained", target_accuracy=0.795):
    """Train ``model``, validate after every epoch, checkpoint, and plot the metrics.

    After each epoch the weights of ``model`` are written to
    ``<save_path>/Wav2vec-final.pth`` (overwriting the previous file). Training
    stops early once validation accuracy exceeds ``target_accuracy``.

    Args:
        model: Module to train; moved to CUDA when available, otherwise CPU.
        train_loader: Loader yielding dicts with ``'audioinfo'`` and ``'label'``.
        val_loader: Loader with the same batch layout, used for validation.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        save_path: Directory for the checkpoint; created if missing.
        target_accuracy: Early-stop threshold on validation accuracy; ``None``
            disables early stopping.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Per-epoch metrics, one entry per completed epoch.
    train_losses = []
    val_losses = []
    val_accuracies = []

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0

        for data in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
            input_values = data['audioinfo'].float().to(device)
            labels = data['label'].long().to(device)

            optimizer.zero_grad()
            outputs = model(input_values)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by batch size so the epoch value is a per-sample mean.
            train_loss += loss.item() * input_values.size(0)

        train_loss /= len(train_loader.dataset)
        train_losses.append(train_loss)

        model.eval()
        val_loss = 0.0
        correct = 0

        with torch.no_grad():
            for data in tqdm(val_loader, desc=f"Testing Epoch {epoch+1}"):
                input_values = data['audioinfo'].float().to(device)
                labels = data['label'].long().to(device)

                outputs = model(input_values)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * input_values.size(0)

                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()

        val_loss /= len(val_loader.dataset)
        val_accuracy = correct / len(val_loader.dataset)

        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1}/{num_epochs}:")
        print(f"Training Loss: {train_loss:.4f}")
        print(f"Validation Loss: {val_loss:.4f}")
        print(f"Validation Accuracy: {val_accuracy:.4f}")

        # Checkpoint the model that was passed in (not a module-level global).
        os.makedirs(save_path, exist_ok=True)
        torch.save(model.state_dict(), os.path.join(save_path, 'Wav2vec-final.pth'))

        if target_accuracy is not None and val_accuracy > target_accuracy:
            print("Finish training")
            break

    _plot_training_history(train_losses, val_losses, val_accuracies)
save_path = "/content/drive/MyDrive/Pretrained"
model_weights_path = os.path.join(save_path, 'Wav2vec-final.pth')

# Resume from the saved checkpoint when one exists (the model architecture must
# already be defined). map_location="cpu" lets a checkpoint written on a GPU
# runtime load on any runtime; load_state_dict copies the values into the
# model's existing parameters.
if os.path.exists(model_weights_path):
    classifier_model.load_state_dict(torch.load(model_weights_path, map_location="cpu"))
else:
    # First run: train_model is what creates this file, so start from the current weights.
    print(f"No checkpoint found at {model_weights_path}; training from the current weights.")

train_model(classifier_model, train_loader, val_loader, criterion, optimizer, num_epochs=50)