In [3]:
import os
import numpy as np
import torch
import torchaudio
import librosa


# 数据集路径
dataset_root = "D:\\paper-dataset\\mute&cut"

# 4 个类别
categories = ["Tug", "Cargo", "Passengership", "Tanker"]

# 递归加载数据
def load_dataset(split):
    dataset_path = os.path.join(dataset_root, split)
    file_paths, labels = [], []
    
    if not os.path.exists(dataset_path):
        raise ValueError(f"错误: 数据集路径 {dataset_path} 不存在!")

    for label, category in enumerate(categories):
        category_path = os.path.join(dataset_path, category)

        if not os.path.exists(category_path):
            print(f"⚠️ 警告: 类别目录 {category_path} 不存在，跳过!")
            continue

        for root, _, files in os.walk(category_path):
            for file in files:
                if file.endswith(".wav"):
                    file_paths.append(os.path.join(root, file))
                    labels.append(label)

    print(f"📊 {split} 数据集加载完成: {len(file_paths)} 个文件，类别: {set(labels)}")
    return file_paths, labels

# 读取 train, test, validation 数据集
train_files, train_labels = load_dataset("train")
test_files, test_labels = load_dataset("test")
val_files, val_labels = load_dataset("validation")

# 打印数据集大小
print(f"Train 数据集: {len(train_files)} 文件")
print(f"Test 数据集: {len(test_files)} 文件")
print(f"Validation 数据集: {len(val_files)} 文件")


📊 train 数据集加载完成: 9831 个文件，类别: {0, 1, 2, 3}
📊 test 数据集加载完成: 2808 个文件，类别: {0, 1, 2, 3}
📊 validation 数据集加载完成: 1408 个文件，类别: {0, 1, 2, 3}
Train 数据集: 9831 文件
Test 数据集: 2808 文件
Validation 数据集: 1408 文件


In [6]:
# 创建 Preprocessor 实例
class Preprocessor:
    def __init__(self):
        self.n_mfcc = 60  # MFCC 维度
        self.n_mels = 60  # Mel 频谱图维度
        self.sr = 22050   # 采样率

    def extract_mfcc(self, waveform):
        mfcc_feature = librosa.feature.mfcc(y=waveform, sr=self.sr, n_mfcc=self.n_mfcc, hop_length=512)
        return torch.tensor(mfcc_feature)   

    def extract_log_mel(self, waveform):
        mel_spectrogram = librosa.feature.melspectrogram(y=waveform, sr=self.sr, n_mels=self.n_mels, hop_length=512)
        log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)
        return torch.tensor(log_mel_spectrogram)

    def extract_cctz(self, waveform):
        chroma = librosa.feature.chroma_stft(y=waveform, sr=self.sr, hop_length=512)
        contrast = librosa.feature.spectral_contrast(y=waveform, sr=self.sr, hop_length=512)
        tonnetz = librosa.feature.tonnetz(y=waveform, sr=self.sr, hop_length=512)
        zero_cross_rate = librosa.feature.zero_crossing_rate(waveform, hop_length=512)

        cctz_features = torch.cat([
            torch.tensor(chroma), 
            torch.tensor(contrast), 
            torch.tensor(tonnetz), 
            torch.tensor(zero_cross_rate)
        ], dim=0)
        return cctz_features

    def stack_features(self, waveform):
        mfcc_feature = self.extract_mfcc(waveform=waveform)
        log_mel_feature = self.extract_log_mel(waveform=waveform)
        cctz_feature = self.extract_cctz(waveform=waveform)
        
        max_feature_dim = max(mfcc_feature.size(0), log_mel_feature.size(0), cctz_feature.size(0))

        mfcc_tensor = torch.nn.functional.pad(mfcc_feature, (0, 0, 0, max_feature_dim - mfcc_feature.size(0)))
        log_mel_tensor = torch.nn.functional.pad(log_mel_feature, (0, 0, 0, max_feature_dim - log_mel_feature.size(0)))
        cctz_tensor = torch.nn.functional.pad(cctz_feature, (0, 0, 0, max_feature_dim - cctz_feature.size(0)))
        
        stacked_features = torch.stack([mfcc_tensor, log_mel_tensor, cctz_tensor], dim=0)
        return stacked_features

feature_extractor = Preprocessor()

def extract_features(file_list):
    features = []
    for file_path in file_list:
        waveform, sr = librosa.load(file_path, sr=feature_extractor.sr)
        feature = feature_extractor.stack_features(waveform)
        features.append(feature)
    return features

# 提取特征
train_features = extract_features(train_files)
test_features = extract_features(test_files)
val_features = extract_features(val_files)

  return pitch_tuning(
  return pitch_tuning(


In [7]:
import torch
import torch.nn as nn
import torchaudio.transforms as AT

class SpecTransform(nn.Module):
    def __init__(self):
        super(SpecTransform, self).__init__()
        sr = 22050  # 샘플링 주파수 설정
        
        # 시간 마스킹과 주파수 마스킹 변환 정의
        self.time = AT.TimeMasking(time_mask_param=3)  # 시간 마스크 크기 3
        self.freq = AT.FrequencyMasking(freq_mask_param=5)  # 주파수 마스크 크기 5

    def forward(self, spec):
        # 스펙트로그램에 시간 마스크와 주파수 마스크를 적용
        spec = self.time(spec)
        spec = self.freq(spec)
        return spec

# 변환 객체 생성
spec = SpecTransform()


In [8]:
# 오디오 데이터셋 클래스 정의
from torch.utils.data import Dataset

class AudioDataset(Dataset):
    def __init__(self, file_paths, label, n_mfcc=60):
        self.file_paths = file_paths  # 오디오 파일 경로
        self.label = label  # 레이블
        self.n_mfcc = n_mfcc  # MFCC 차원 설정

    def __len__(self):
        return len(self.file_paths)  # 데이터셋 크기 반환

    def __getitem__(self, idx):
        # 주어진 인덱스의 오디오 파일을 가져와서 랜덤 프레임을 추출
        audio = self.file_paths[idx]
        audio = np.array(audio)
        extracted_frame = extract_random_frames(audio)  # 랜덤 프레임 추출
        
        # 특징 추출
        result = feature_extract.stack_features(waveform=extracted_frame)
        
        # 스펙트로그램에 변환 적용
        result[1] = spec(result[1])
        
        # 레이블 반환
        label = int(self.label[idx])
        return result.float(), label


In [9]:
from torch.utils.data import DataLoader

# 训练数据集
train_dataset = AudioDataset(train_files, train_labels)
train_dataloader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=4)

# 验证数据集
val_dataset = AudioDataset(val_files, val_labels)
val_dataloader = DataLoader(val_dataset, batch_size=128, shuffle=False, num_workers=4)

# 测试数据集
test_dataset = AudioDataset(test_files, test_labels)
test_dataloader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=4)


In [None]:
# 数据集大小检查
train_dataset[0][0].shape  # 检查第一个样品的大小

NameError: name 'extract_random_frames' is not defined

In [11]:
# 培训和验证数据集尺寸检查
len(train_dataset), len(val_dataset)  # 培训和验证数据集的大小

(9831, 1408)

In [12]:
import torch.nn.functional as F

class ChannelAttention(nn.Module):
    def __init__(self, in_channels, reduction_ratio=4):
        super(ChannelAttention, self).__init__()
        
        # 평균 풀링과 최대 풀링을 사용하여 채널 중요도 계산
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        
        # 채널 수를 줄였다가 다시 확장하는 두 개의 1x1 컨볼루션을 사용
        self.fc = nn.Sequential(
            nn.Conv2d(in_channels, in_channels // reduction_ratio, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(in_channels // reduction_ratio, in_channels, 1, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        # 평균 풀링과 최대 풀링을 통해 각 채널에 대한 중요도를 계산
        avg_out = self.fc(self.avg_pool(x))
        max_out = self.fc(self.max_pool(x))
        
        # 각 채널의 중요도를 합산하고 입력에 가중치를 적용
        out = avg_out + max_out
        out = out * x
        return F.relu(out)  # ReLU 활성화 함수 적용

In [13]:
class Stage1(nn.Module):
    def __init__(self):
        super(Stage1, self).__init__()

        # 첫 번째 컨볼루션 + 배치 정규화 + GELU 활성화 함수
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.GELU()
        )
        
        # 채널 어텐션
        self.cam1 = ChannelAttention(64)
        # 맥스풀링
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    def forward(self, x):
        # 첫 번째 컨볼루션 및 채널 어텐션 적용
        out = self.conv1(x)
        out = self.cam1(out)
        out = self.maxpool(out)
        return out


In [14]:
class AConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(AConvBlock, self).__init__()

        # 메인 브랜치: 여러 개의 7x7 컨볼루션과 배치 정규화
        self.main_branch = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=7, stride=stride, padding=3),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            
            nn.Conv2d(out_channels, out_channels, kernel_size=7, stride=1, padding=3),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            
            nn.Conv2d(out_channels, out_channels * 8, kernel_size=7, stride=1, padding=3),
            nn.BatchNorm2d(out_channels * 8)
        )
        
        # Shortcut: 입력과 출력의 차원이 다르면 1x1 컨볼루션을 사용해 맞춰줌
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels * 8:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * 8, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * 8)
            )
        
        # 채널 어텐션
        self.cam = ChannelAttention(out_channels * 8)

    def forward(self, x):
        # 메인 브랜치를 통과한 결과
        out = self.main_branch(x)
        # shortcut 연결 추가
        out += self.cam(self.shortcut(x))
        return F.relu(out)  # ReLU 활성화 함수 적용


In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResNet(nn.Module):
    def __init__(self, num_classes=4):
        super(ResNet, self).__init__()

        # 네트워크의 각 단계 정의
        self.conv1_x = Stage1()
        self.conv2_x = AConvBlock(64, 32)  # 입력 채널 64, 출력 채널 32
        self.conv3_x = AConvBlock(32*8, 32)
        self.conv4_x = AConvBlock(32*8, 32)
        self.conv5_x = AConvBlock(32*8, 64)  # 입력 채널 32*8, 출력 채널 64
        self.conv6_x = AConvBlock(64*8, 64)

        # Fully connected layers
        self.fc1 = nn.Linear(64*8, 24)
        self.fc2 = nn.Linear(24, num_classes)

        # Average pooling 및 PReLU
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.prelu = nn.PReLU()

    def forward(self, x):
        out = self.conv1_x(x)
        out = self.conv2_x(out)
        out = self.conv3_x(out)
        out = self.conv4_x(out)
        out = self.conv5_x(out)
        out = self.conv6_x(out)
        
        # Average pooling 후 flatten
        out = self.avgpool(out)
        out = out.view(out.size(0), -1)
        
        # Fully connected layers
        out = self.fc1(out)
        x = self.prelu(out)
        y = self.fc2(x)
        
        return x, y  # x는 중간 출력, y는 최종 출력


In [17]:
device = "cuda" if torch.cuda.is_available() else "cpu"

model = ResNet().to(device)  # 모델을 지정된 장치로 이동

In [18]:
class CenterLoss(nn.Module):
    """
    Center loss.
    
    Reference:
    Wen et al. A Discriminative Feature Learning Approach for Deep Face Recognition. ECCV 2016.
    
    Args:
        num_classes (int): number of classes.
        feat_dim (int): feature dimension.
    """
    def __init__(self, num_classes=10, feat_dim=256, use_gpu=True):
        super(CenterLoss, self).__init__()
        self.num_classes = num_classes
        self.feat_dim = feat_dim
        self.use_gpu = use_gpu

        # 클래스 중심 초기화
        if self.use_gpu:
            self.centers = nn.Parameter(torch.randn(self.num_classes, self.feat_dim).cuda())
        else:
            self.centers = nn.Parameter(torch.randn(self.num_classes, self.feat_dim))

    def forward(self, x, labels):
        """
        Args:
            x: feature matrix with shape (batch_size, feat_dim).
            labels: ground truth labels with shape (batch_size).
        """
        batch_size = x.size(0)

        # 각 클래스 중심과의 거리 계산
        distmat = torch.pow(x, 2).sum(dim=1, keepdim=True).expand(batch_size, self.num_classes) + \
                  torch.pow(self.centers, 2).sum(dim=1, keepdim=True).expand(self.num_classes, batch_size).t()
        distmat.addmm_(1, -2, x, self.centers.t())

        # 레이블과 일치하는 위치 마스크 생성
        classes = torch.arange(self.num_classes).long()
        if self.use_gpu: 
            classes = classes.cuda()
        labels = labels.unsqueeze(1).expand(batch_size, self.num_classes)
        mask = labels.eq(classes.expand(batch_size, self.num_classes))

        # 거리 계산 후 손실 값 구하기
        dist = distmat * mask.float()
        loss = dist.clamp(min=1e-12, max=1e+12).sum() / batch_size

        return loss

In [19]:
from torch.optim import Adam, SGD
import torch.nn as nn

# 손실 함수와 CenterLoss 초기화
criterion = nn.CrossEntropyLoss().to(device)  # CrossEntropyLoss는 분류 문제에서 사용
center = CenterLoss(4, 24).to(device)  # 4개의 클래스, feature dimension 24

# 옵티마이저 설정
opti1 = Adam(model.parameters(), lr=1e-4, weight_decay=5e-4)  # 모델 파라미터에 대해 Adam 옵티마이저 사용
opti2 = SGD(center.parameters(), lr=0.5)  # CenterLoss 파라미터에 대해 SGD 사용

# 학습률 스케줄러 설정
scheduler1 = torch.optim.lr_scheduler.StepLR(opti1, step_size=10, gamma=0.5)  # 10번째 epoch마다 lr을 0.5배로 감소
scheduler2 = torch.optim.lr_scheduler.StepLR(opti2, step_size=10, gamma=0.5)


In [20]:
from tqdm import tqdm

def train(model, dataloader, criterion, data_len, opti1, opti2):
    correct = 0
    losses = 0

    model.train()  # 모델을 학습 모드로 설정
    for data, target in tqdm(dataloader):  # 배치 단위로 데이터를 순차적으로 가져옴
        data = data.to(device)  # 데이터를 device(CPU/GPU)로 이동
        target = target.to(device)  # 타겟 레이블을 device로 이동
        
        cen, output = model(data)  # 모델의 출력값과 특징 벡터 계산
        loss1 = criterion(output, target)  # CrossEntropyLoss 계산
        loss2 = center(cen, target)  # CenterLoss 계산
        loss = loss1 + loss2  # 두 손실을 합산

        opti1.zero_grad()  # 옵티마이저1의 기울기를 0으로 초기화
        opti2.zero_grad()  # 옵티마이저2의 기울기를 0으로 초기화
        loss.backward()  # 역전파 계산
        opti1.step()  # 옵티마이저1 파라미터 업데이트
        opti2.step()  # 옵티마이저2 파라미터 업데이트

        # 예측값을 구하고 정확도 계산
        pred = output.max(1, keepdim=True)[1]  # 예측값의 인덱스 추출
        correct += pred.eq(target.view_as(pred)).sum().item()  # 정확도 계산
        losses += loss.item()  # 손실 값 합산

    # 학습률 스케줄러를 업데이트
    scheduler1.step()
    scheduler2.step()

    # 정확도와 평균 손실 반환
    return 100 * correct / data_len, losses / data_len

In [21]:
def evaluate(model, dataloader, criterion, data_len):
    correct = 0

    model.eval()  # 모델을 평가 모드로 설정
    with torch.no_grad():  # 평가 시 기울기 계산을 하지 않음
        for data, target in dataloader:  # 데이터 로더에서 배치 단위로 데이터를 가져옴
            data = data.to(device)  # 데이터를 device로 이동
            target = target.to(device)  # 타겟 레이블을 device로 이동

            _, output = model(data)  # 모델 출력 계산
            loss = criterion(output, target)  # 손실 계산

            # 예측값을 구하고 정확도 계산
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    # 정확도 계산
    acc = 100. * correct / data_len
    return acc

In [None]:
epoch = 300

train_accuracies = []
val_accuracies = []

for i in range(epoch):
    # Training the model
    train_acc, train_loss = train(model, train_dataloader, criterion, len(train_dataloader.dataset), opti1, opti2)
    
    # Evaluating the model on validation data
    val_acc = evaluate(model, val_dataloader, criterion, len(val_dataloader.dataset))
    
    # Uncomment the line below if you want to evaluate on test data
    # test_acc = evaluate(model, test_dataloader, criterion, len(test_dataloader.dataset))

    # Storing the accuracies
    train_accuracies.append(train_acc)
    val_accuracies.append(val_acc)

    # Printing the results for the current epoch
    print(f"[Epoch: {i+1}], [Validation Acc: {val_acc:.4f}]")
    print(f"train_acc: {train_acc}, train_loss: {train_loss}")


  0%|          | 0/77 [00:00<?, ?it/s]

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import precision_score, recall_score, f1_score
import torch
import numpy as np

# Plotting training and validation accuracies
plt.figure(figsize=(10, 5))
plt.plot(train_accuracies, label='Training Accuracy')
plt.plot(val_accuracies, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()
plt.title('Training and Validation Accuracies')
plt.show()

# Setting up the device and loading the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNet().to(device)
model.load_state_dict(torch.load('./deepship7.pt'))

# Function to evaluate precision, recall, and F1-score
def evaluate_metrics(model, dataloader):
    model.eval()  # Set the model to evaluation mode
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for data, target in dataloader:
            data = data.to(device)
            target = target.to(device)
            
            _, output = model(data)
            pred = output.argmax(dim=1)  # Get the class with the highest score
            
            all_preds.extend(pred.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    # Calculate precision, recall, and F1-score for each class
    precision = precision_score(all_targets, all_preds, average=None)
    recall = recall_score(all_targets, all_preds, average=None)
    f1 = f1_score(all_targets, all_preds, average=None)

    # Calculate average precision, recall, and F1-score
    avg_precision = precision.mean()
    avg_recall = recall.mean()
    avg_f1 = f1.mean()

    return precision, recall, f1, avg_precision, avg_recall, avg_f1

# Evaluate metrics on the test dataset
class_precision, class_recall, class_f1, avg_precision, avg_recall, avg_f1 = evaluate_metrics(model, test_dataloader)

# Print performance for each class
for i in range(len(class_precision)):
    print(f"Class {i} - Precision: {class_precision[i]:.4f}, Recall: {class_recall[i]:.4f}, F1-score: {class_f1[i]:.4f}")

# Print average performance across all classes
print(f"Avg Precision: {avg_precision:.4f}, Avg Recall: {avg_recall:.4f}, Avg F1-score: {avg_f1:.4f}")
