In [None]:
import os


# Walk the two-level dataset tree <path>/<label folder>/<file> and collect,
# in parallel, the label-folder name and the path of every file found.
path = 'D:/vibration-3classes'
labels_list = os.listdir(path)
print(labels_list)

# labels[k] is the name of the folder that contains all_files[k].
labels = []
all_files = []
for class_name in labels_list:
    class_dir = os.path.join(path, class_name)
    file_names = os.listdir(class_dir)
    labels.extend([class_name] * len(file_names))
    all_files.extend(class_dir + '/' + file_name for file_name in file_names)

print(labels)
print(len(all_files))

In [None]:
from vmdpy import VMD
import pywt
import numpy as np
import pandas as pd
import os

def GetAllFiles(path):
    """Return the path of every entry found inside each sub-directory of `path`.

    The tree is read as ``path/<sub-directory>/<entry>``. Results follow the
    order produced by ``os.listdir`` (outer directory first, then the content
    of each sub-directory) and are built as ``<sub-directory path>/<entry>``.
    """
    collected = []
    for class_name in os.listdir(path):
        class_dir = os.path.join(path, class_name)
        collected.extend(class_dir + '/' + file_name for file_name in os.listdir(class_dir))
    return collected


def GetVMD(signal, K=3):
    """Decompose `signal` with Variational Mode Decomposition (``vmdpy.VMD``).

    Args:
        signal: Sequence of samples passed unchanged to ``vmdpy.VMD``.
        K: Number of modes (IMFs) to extract. The argument used to be silently
            overwritten with 3 inside the function; it is now honoured, and 3
            is kept as the default so existing calls behave the same.

    Returns:
        ``(u, omega_last)``: ``u`` is the first value returned by ``VMD`` (the
        decomposed modes) and ``omega_last`` is the last row of the third
        returned value (the centre frequencies after the final iteration).
    """
    alpha = 1500  # bandwidth constraint passed to VMD
    tau = 0       # noise tolerance: allows the reconstruction to differ from the input
    # NOTE(review): the original comment said DC=0 makes the first IMF the DC/trend
    # component; vmdpy's documentation appears to describe the opposite (a true
    # DC pins the first mode at 0 Hz). Confirm against the vmdpy docs.
    DC = 0
    init = 1      # 1 = initialise the centre frequencies uniformly
    tol = 1e-7    # convergence tolerance; trades precision against iteration count

    u, _u_hat, omega = VMD(signal, alpha, tau, K, DC, init, tol)

    return u, omega[-1]

def GetCWTMembers(signal, freq):
    """Return the Morlet CWT magnitude of `signal` at the scale closest to `freq`.

    A grid of 1024 scales is built so that ``fc / scale`` runs from 0.5 down
    to 1/2048, where ``fc`` is the wavelet's central frequency. The scale whose
    ``fc / scale`` value is nearest to `freq` is selected.

    Only that single scale is transformed. ``pywt.cwt`` processes each scale
    independently, so this yields the same row the full 1024-scale transform
    would contain at a fraction of the cost (this function is called once per
    mode for every sample). The previous ``sampling_period`` argument only
    affected the unused ``frequencies`` output and has been dropped.

    Args:
        signal: 1-D sequence of samples.
        freq: Target frequency on the same axis as ``fc / scale``
            (presumably the normalised centre frequency coming from VMD;
            confirm against the caller).

    Returns:
        1-D array with the absolute CWT coefficients over time at the
        selected scale.
    """
    # Number of candidate scales.
    totalscal = 1024
    # Wavelet basis function.
    wavename = 'morl'
    # Central frequency of the wavelet.
    fc = pywt.central_frequency(wavename)
    # Constant c used to build the scale sequence.
    cparam = 2 * fc * totalscal
    # Scale sequence.
    scales = cparam / np.arange(totalscal, 0, -1)
    wavelet_center_frequencies = fc / scales
    # Index of the scale whose centre frequency is closest to the target.
    index = np.argmin(np.abs(wavelet_center_frequencies - freq))
    # Transform just the selected scale; the result has shape (1, len(signal)).
    coefficients, _ = pywt.cwt(signal, scales[index:index + 1], wavename)
    # Amplitude over time at the target frequency.
    amp_at_target_frequency = abs(coefficients[0])

    return amp_at_target_frequency


In [None]:
import torch
from torch.utils.data import Dataset

from vmdpy import VMD
import numpy as np


import pandas as pd
from sklearn.preprocessing import OneHotEncoder

import os

# NOTE(review): this module-level value is not read by MyDataset, which uses
# its own `root` argument; it is kept only so the notebook's globals are unchanged.
path = 'D:/vibration'


class MyDataset(Dataset):
    """Dataset of CSV recordings stored as ``<root>/<label folder>/<file>``.

    File paths and labels are collected together in a single sorted walk at
    construction time, so index ``k`` always refers to the same file and its
    own label. Previously the labels came from one ``os.listdir`` walk and the
    files from another walk repeated on every ``__getitem__`` call.

    Attributes set by ``__init__``:
        files: list of file paths, one per sample.
        label: NumPy array with the label-folder name of each sample.
        encoder: the fitted ``OneHotEncoder``.
        encoded_labels_tensor: float32 tensor of one-hot labels, one row per sample.
    """

    def __init__(self, root='D:/vibration-3classes'):
        """Scan `root` and one-hot encode the folder names as labels.

        Args:
            root: Directory containing one sub-directory per label. Defaults
                to the location that used to be hard-coded.
        """
        files, labels = self._scan(root)
        self.files = files
        self.label = np.array(labels)
        # Create the OneHotEncoder.
        self.encoder = OneHotEncoder(sparse_output=False)
        # One-hot encode the labels.
        encoded_labels = self.encoder.fit_transform([[label] for label in self.label])
        # Keep the one-hot labels as a tensor.
        self.encoded_labels_tensor = torch.tensor(encoded_labels, dtype=torch.float32)

    @staticmethod
    def _scan(root):
        """Return ``(files, labels)`` for every file below `root`, in sorted order."""
        files = []
        labels = []
        for class_name in sorted(os.listdir(root)):
            class_dir = os.path.join(root, class_name)
            for file_name in sorted(os.listdir(class_dir)):
                files.append(class_dir + '/' + file_name)
                labels.append(class_name)
        return files, labels

    def __len__(self):
        """Number of samples (rows of the one-hot label tensor)."""
        return len(self.encoded_labels_tensor)

    def __getitem__(self, index):
        """Load sample `index` and return ``(imfs, cwt, label)`` as float32 tensors.

        The signal is read from column ``'0'`` of the CSV file, decomposed with
        ``GetVMD`` into three modes, and ``GetCWTMembers`` is evaluated once per
        returned centre frequency.
        """
        label = self.encoded_labels_tensor[index]  # already one-hot encoded
        data = pd.read_csv(self.files[index])
        signal = np.array(data['0'])
        # imfs: the decomposed modes; central_freq: one centre frequency per mode.
        imfs, central_freq = GetVMD(signal=signal, K=3)
        cwt = np.array([GetCWTMembers(signal=signal, freq=freq) for freq in central_freq])
        # Return the loaded and pre-processed data.
        return (
            torch.as_tensor(imfs, dtype=torch.float32),
            torch.as_tensor(cwt, dtype=torch.float),
            torch.as_tensor(label, dtype=torch.float),
        )



In [None]:
def set_seed_everywhere(seed, cuda):
    """Seed the NumPy and PyTorch random number generators with `seed`.

    When `cuda` is truthy the generators of all CUDA devices are seeded too.
    """
    np.random.seed(seed)
    torch.manual_seed(seed)
    if not cuda:
        return
    torch.cuda.manual_seed_all(seed)

In [None]:
# Build the dataset and split it 70 % / 15 % / 15 % into train / validation / test.
dataset1 = MyDataset()
total_size = len(dataset1)
train_size = int(0.7 * total_size)
val_size = int(0.15 * total_size)
test_size = total_size - train_size - val_size  # remainder, so the three sizes always add up
print(total_size,train_size,val_size,test_size)

# A dedicated, seeded generator makes the split identical on every
# "Restart Kernel -> Run All"; without it the membership of each subset
# changed from run to run.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset1, [train_size, val_size, test_size], generator=split_generator
)


In [None]:
# Sanity check: load one sample and show both feature tensors (with their
# shapes) followed by its one-hot label.
imfs, cwts, label = dataset1[3714]
for features in (imfs, cwts):
    print(features)
    print(features.shape)
print(label)

In [None]:
# Only the training loader is shuffled. Evaluation metrics do not depend on
# sample order, so the validation and test loaders iterate in a fixed order,
# which keeps evaluation runs reproducible.
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=32, shuffle=False)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class VCTmodel(nn.Module):
    """Two-stream classifier: VMD modes go through an encoder, CWT features through a decoder.

    Both inputs are projected by the same linear layer and receive the same
    positional encoding. The encoder output serves as memory for the decoder,
    whose output is fed to the classification head.
    """

    def __init__(self,input_dim,hidden_dim,num_classes,num_layers1,num_layers2,linear_dim, num_heads1, num_heads2, dropout1, dropout2):
        """Build the sub-modules.

        Args:
            input_dim: Size of the last dimension of both inputs.
            hidden_dim: Model width used by the encoder and decoder.
            num_classes: Number of output classes.
            num_layers1 / num_heads1 / dropout1: Encoder depth, heads and dropout.
            num_layers2 / num_heads2 / dropout2: Decoder depth, heads and dropout.
            linear_dim: Hidden width handed to the classification head.
        """
        super(VCTmodel, self).__init__()
        # NOTE: one projection is shared by the VMD and the CWT stream.
        self.linear = nn.Linear(input_dim,hidden_dim)
        self.hidden_dim = hidden_dim
        self.position_encoding = PositionalEncoding(hidden_dim)
        self.transformer_encoder = TransformerEncoder(hidden_dim, num_layers1, num_heads1, dropout1)
        self.transformer_decoder = TransformerDecoder(hidden_dim, num_layers2, num_heads2, dropout2)
        self.ClassificationHead = ClassificationHead(hidden_dim,linear_dim,num_classes)

    def forward(self, vmd, cwt):
        """Classify one batch.

        Args:
            vmd: VMD modes; assumed (batch, modes, input_dim) as produced by the
                DataLoader (TODO confirm).
            cwt: CWT features with the same layout as `vmd`.

        Returns:
            The output of the classification head.
        """
        linear_vmd = self.linear(vmd)
        pos_vmd = self.position_encoding(linear_vmd)
        enc_vmd = self.transformer_encoder(pos_vmd)

        linear_cwt = self.linear(cwt)
        pos_cwt = self.position_encoding(linear_cwt)
        # TransformerDecoder.forward takes (cwt, transformed): the CWT stream is
        # the sequence being decoded and the encoder output is the memory it
        # attends to. The two arguments used to be passed in the opposite order.
        dec_cwt = self.transformer_decoder(pos_cwt, enc_vmd)
        output = self.ClassificationHead(dec_cwt)

        return output


class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch-first sequence.

    The table is stored as a buffer of shape ``(1, max_length, hidden_dim)``,
    so it broadcasts over the batch dimension of ``(batch, seq_len, hidden_dim)``
    inputs.
    """

    def __init__(self, hidden_dim, max_length=3):
        """Pre-compute the encoding table.

        Args:
            hidden_dim: Feature size of the inputs (odd values are supported).
            max_length: Longest sequence length that can be encoded.
        """
        super(PositionalEncoding, self).__init__()

        position_encoding = torch.zeros(max_length, hidden_dim)
        position = torch.arange(0, max_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-math.log(10000.0) / hidden_dim))
        angles = position * div_term

        position_encoding[:, 0::2] = torch.sin(angles)
        # For an odd hidden_dim there is one cosine column fewer than sine
        # columns; trimming avoids the shape mismatch the assignment used to hit.
        position_encoding[:, 1::2] = torch.cos(angles)[:, :hidden_dim // 2]

        self.register_buffer('position_encoding', position_encoding.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``seq_len`` positions.

        Args:
            x: Tensor of shape ``[batch_size, seq_len, d_model]``.

        Raises:
            ValueError: If ``seq_len`` exceeds the ``max_length`` of the table.
        """
        seq_len = x.size(1)
        if seq_len > self.position_encoding.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_length {self.position_encoding.size(1)}"
            )
        # Slice the sequence axis (dim 1). The old slice on dim 0 was a no-op on
        # the (1, max_length, hidden_dim) buffer and failed for shorter sequences.
        x = x + self.position_encoding[:, :seq_len, :]
        return x


class TransformerEncoder(nn.Module):
    """Stack of ``num_layers1`` TransformerEncoderLayer modules applied in sequence."""

    def __init__(self, hidden_dim, num_layers1, num_heads1, dropout1):
        super().__init__()

        stack = []
        for _ in range(num_layers1):
            stack.append(TransformerEncoderLayer(hidden_dim, num_heads1, dropout1))
        self.layers = nn.ModuleList(stack)

    def forward(self, vmd):
        """Feed `vmd` through every layer in order and return the final output."""
        hidden = vmd
        for block in self.layers:
            hidden = block(hidden)
        return hidden
    
class TransformerDecoder(nn.Module):
    """Stack of ``num_layers2`` TransformerDecoderLayer modules applied in sequence."""

    def __init__(self, hidden_dim, num_layers2, num_heads2, dropout2):
        super().__init__()

        stack = []
        for _ in range(num_layers2):
            stack.append(TransformerDecoderLayer(hidden_dim, num_heads2, dropout2))
        self.layers = nn.ModuleList(stack)

    def forward(self, cwt, transformed):
        """Run `cwt` through every layer, handing `transformed` to each one as second input."""
        hidden = cwt
        for block in self.layers:
            hidden = block(hidden, transformed)
        return hidden

class TransformerEncoderLayer(nn.Module):
    """Post-norm encoder layer: self-attention and a feed-forward block, each with a residual."""

    def __init__(self, hidden_dim, num_heads, dropout, batch_first=True):
        """Build the layer.

        Args:
            hidden_dim: Feature size of the inputs and outputs.
            num_heads: Number of attention heads.
            dropout: Dropout probability used inside attention and on both residual branches.
            batch_first: Layout of the inputs. The default ``True`` means
                ``(batch, seq, hidden_dim)``, which is what a DataLoader batch
                looks like. ``nn.MultiheadAttention`` defaults to seq-first,
                which made the samples of a batch attend to each other.
        """
        super(TransformerEncoderLayer, self).__init__()

        self.multihead_attention = nn.MultiheadAttention(
            hidden_dim, num_heads, dropout=dropout, batch_first=batch_first
        )
        self.feed_forward = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, hidden_dim)
        )
        self.layer_norm1 = nn.LayerNorm(hidden_dim)
        self.layer_norm2 = nn.LayerNorm(hidden_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Apply self-attention then the feed-forward block; the output has the shape of `x`."""
        # MultiheadAttention returns (output, attention weights); only the output is used.
        attended = self.multihead_attention(x, x, x)[0]
        x = x + self.dropout(attended)
        x = self.layer_norm1(x)

        fed_forward = self.feed_forward(x)
        x = x + self.dropout(fed_forward)
        x = self.layer_norm2(x)

        return x
    
    
class TransformerDecoderLayer(nn.Module):
    """Post-norm decoder layer: self-attention, cross-attention to the encoder output, feed-forward.

    Fixes over the previous version, which could not be instantiated or run:
    ``super()`` referenced ``TransformerEncoderLayer`` (TypeError), the
    cross-attention result was unpacked with ``cwt1,=`` although two values are
    returned (ValueError), and ``self.layernorm2`` did not exist (AttributeError).
    """

    def __init__(self, hidden_dim, num_heads, dropout, batch_first=True):
        """Build the layer.

        Args:
            hidden_dim: Feature size of the inputs and outputs.
            num_heads: Number of attention heads.
            dropout: Dropout probability used inside attention and on the residual branches.
            batch_first: Layout of the inputs. The default ``True`` means
                ``(batch, seq, hidden_dim)``; ``nn.MultiheadAttention`` would
                otherwise treat the batch axis as the sequence.
        """
        super(TransformerDecoderLayer, self).__init__()

        # NOTE(review): as in the original design, the same attention module
        # (same weights) serves both the self- and the cross-attention step.
        # A conventional decoder uses two separate modules; confirm the intent.
        self.multihead_attention = nn.MultiheadAttention(
            hidden_dim, num_heads, dropout=dropout, batch_first=batch_first
        )
        self.feed_forward = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim * 2),
            nn.ReLU(),
            nn.Linear(hidden_dim * 2, hidden_dim)
        )
        self.layer_norm1 = nn.LayerNorm(hidden_dim)
        self.layer_norm2 = nn.LayerNorm(hidden_dim)
        self.layer_norm3 = nn.LayerNorm(hidden_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, cwt, enc_vmd):
        """Decode `cwt` while attending to `enc_vmd`.

        Args:
            cwt: Sequence being decoded (query stream).
            enc_vmd: Encoder output used as keys and values in cross-attention.

        Returns:
            Tensor with the same shape as `cwt`.
        """
        # Self-attention over the decoded stream.
        cwt1 = self.multihead_attention(cwt, cwt, cwt)[0]
        cwt = cwt + self.dropout(cwt1)
        cwt = self.layer_norm1(cwt)

        # Cross-attention: queries from cwt, keys/values from the encoder output.
        cwt1 = self.multihead_attention(cwt, enc_vmd, enc_vmd)[0]
        cwt = cwt + self.dropout(cwt1)
        cwt = self.layer_norm2(cwt)

        fed_forward = self.feed_forward(cwt)
        cwt = cwt + self.dropout(fed_forward)
        cwt = self.layer_norm3(cwt)

        return cwt

class ClassificationHead(nn.Module):
    """MLP that turns the decoder output into class logits.

    NOTE(review): the original definition stopped at ``self.linear1 =`` (a
    syntax error). This completion is one reasonable design: mean-pool over the
    sequence axis, then one or two ReLU hidden layers, then a linear output
    layer. Adjust it if a different head was intended.
    """

    def __init__(self, hidden_dim, linear_dim1, linear_dim2=None, num_classes=None):
        """Build the head.

        Accepts the declared four-argument form
        ``(hidden_dim, linear_dim1, linear_dim2, num_classes)`` and also the
        three-argument form ``(hidden_dim, linear_dim, num_classes)`` used by
        ``VCTmodel``; in the latter case a single hidden layer is created.

        Raises:
            TypeError: If the number of classes is not given in either form.
        """
        super(ClassificationHead, self).__init__()
        if num_classes is None:
            if linear_dim2 is None:
                raise TypeError("ClassificationHead requires num_classes")
            # Three-argument call: the third positional value is num_classes.
            linear_dim2, num_classes = None, linear_dim2

        self.linear1 = nn.Linear(hidden_dim, linear_dim1)
        if linear_dim2 is None:
            self.linear2 = None
            self.output = nn.Linear(linear_dim1, num_classes)
        else:
            self.linear2 = nn.Linear(linear_dim1, linear_dim2)
            self.output = nn.Linear(linear_dim2, num_classes)

    def forward(self, x):
        """Return raw logits of shape ``(batch, num_classes)``.

        Args:
            x: ``(batch, seq, hidden_dim)`` (mean-pooled over ``seq``) or an
                already pooled ``(batch, hidden_dim)`` tensor.
        """
        if x.dim() == 3:
            x = x.mean(dim=1)
        x = torch.relu(self.linear1(x))
        if self.linear2 is not None:
            x = torch.relu(self.linear2(x))
        # No softmax here: nn.CrossEntropyLoss expects unnormalised logits.
        return self.output(x)




In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# Loss function. The dataset yields one-hot float labels, which
# nn.CrossEntropyLoss accepts as class-probability targets.
criterion = nn.CrossEntropyLoss()

# Create the model instance. The notebook used to instantiate
# `TransformerClassifier`, a name that is defined nowhere in this notebook
# (NameError on a fresh kernel) and whose keyword arguments do not match
# VCTmodel. The old single values for layers / heads / dropout are applied to
# both the encoder and the decoder.
model = VCTmodel(input_dim=1024,
                 hidden_dim=1024,
                 num_classes=3,
                 num_layers1=1,
                 num_layers2=1,
                 linear_dim=512,
                 num_heads1=4,
                 num_heads2=4,
                 dropout1=0.1,
                 dropout2=0.1
                 ).cuda()
for param in model.parameters():
    print(type(param), param.size())

# Total number of parameter elements.
num_params = sum(param.nelement() for param in model.parameters())
print(num_params)

In [None]:
# Print the model's module hierarchy.
print(model)

In [None]:
# Report whether CUDA is usable and the CUDA version string exposed by torch.
print(torch.cuda.is_available())
print(torch.version.cuda)

# Move the model to the GPU.
model = model.to('cuda')

# List every named parameter together with the device it lives on.
for name, param in model.named_parameters():
    print(f"Parameter: {name}, Device: {param.device}")


In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.00005)

num_epochs = 2

# Per-epoch history, consumed by the plotting and saving cells.
train_losses = []
train_accs = []
val_losses = []
val_accs = []

for epoch in range(num_epochs):
    # ---- training ----
    model.train()

    train_loss = 0.0
    train_correct = 0
    train_total = 0

    for inputs1, inputs2, labels in tqdm(train_dataloader):
        inputs1, inputs2, labels = inputs1.cuda(), inputs2.cuda(), labels.cuda()
        optimizer.zero_grad()
        outputs = model(inputs1, inputs2)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

        # Labels are one-hot, so a prediction is correct exactly when its index
        # equals the label's argmax. This replaces the per-sample loop that
        # rebuilt a one-hot vector with a hard-coded size of 3.
        predicted = outputs.argmax(dim=1)
        train_total += labels.size(0)
        train_correct += (predicted == labels.argmax(dim=1)).sum().item()

    train_loss /= len(train_dataloader)
    train_acc = train_correct / train_total

    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # ---- validation ----
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        # The dataset yields (imfs, cwt, label) and the model takes two inputs.
        # The old loop unpacked two values and called model(inputs), which
        # raised on the first validation batch.
        for inputs1, inputs2, labels in val_dataloader:
            inputs1, inputs2, labels = inputs1.cuda(), inputs2.cuda(), labels.cuda()

            outputs = model(inputs1, inputs2)

            loss = criterion(outputs, labels)

            val_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels.argmax(dim=1)).sum().item()

    val_loss /= len(val_dataloader)
    val_acc = correct / total

    val_losses.append(val_loss)
    val_accs.append(val_acc)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

In [None]:
import matplotlib.pyplot as plt

# One figure per metric, each showing the training and the validation curve
# over the epoch number (starting at 1).
epochs = range(1, num_epochs+1)
curves = (
    ('Loss', train_losses, 'Train Loss', val_losses, 'Val Loss'),
    ('Accuracy', train_accs, 'Train Accuracy', val_accs, 'Val Accuracy'),
)
for y_label, train_series, train_legend, val_series, val_legend in curves:
    plt.figure()
    plt.plot(epochs, train_series, label=train_legend)
    plt.plot(epochs, val_series, label=val_legend)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.legend()
    plt.show()

In [None]:
# Persist the per-epoch training loss and accuracy lists with torch.save.
# NOTE(review): the targets are absolute, machine-specific paths, and only the
# training history is written here (val_losses / val_accs are not saved).
torch.save(train_losses,'C:/Users/dell/Desktop/工作文件/论文图/4imfs_trainlosses_1.pt')
torch.save(train_accs,'C:/Users/dell/Desktop/工作文件/论文图/4imfs_trainaccs_1.pt')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Helper that draws a confusion matrix.
def plot_confusion_matrix(cm, labels_name, title="Confusion Matrix",  is_norm=True,  colorbar=True, cmap=plt.cm.Blues):
    """Draw `cm` as an annotated heat map and show it.

    Args:
        cm: 2-D NumPy array; rows are true labels, columns predicted labels.
        labels_name: Tick labels, used for both axes.
        title: Figure title.
        is_norm: If true, every row is divided by its sum and rounded to two
            decimals. Rows that sum to zero (a class with no true samples) are
            shown as zeros instead of producing NaN and a divide warning.
        colorbar: Whether to add a colour bar.
        cmap: Matplotlib colour map.
    """
    if is_norm:
        row_sums = cm.sum(axis=1)[:, np.newaxis]
        normalised = np.divide(
            cm.astype('float'),
            row_sums,
            out=np.zeros(cm.shape, dtype=float),
            where=row_sums != 0,
        )
        cm = np.around(normalised, 2)  # row-wise normalisation, two decimals

    plt.imshow(cm, interpolation='nearest', cmap=cmap)  # render the matrix as an image
    n_rows, n_cols = cm.shape
    for col in range(n_cols):
        for row in range(n_rows):
            # x is the column (predicted), y the row (true); all values are drawn in black.
            plt.annotate(cm[row, col], xy=(col, row), horizontalalignment='center', verticalalignment='center')
    if colorbar:
        plt.colorbar()  # add the colour bar

    num_local = np.array(range(len(labels_name)))
    plt.xticks(num_local, labels_name)  # class names on the x axis
    plt.yticks(num_local, labels_name)  # class names on the y axis
    plt.title(title)  # figure title
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    plt.show()  # if the figure is ever saved, call plt.savefig() before plt.show()
    plt.close()


In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
model.eval()
test_loss = 0.0
correct = 0
total = 0

# Per-batch class indices, concatenated once after the loop. This replaces
# growing two tensors with torch.cat for every single sample.
pred_batches = []
label_batches = []

with torch.no_grad():
    # The dataset yields (imfs, cwt, label) and the model takes two inputs. The
    # old loop unpacked two values and called model(inputs), which raised on
    # the first batch.
    for inputs1, inputs2, labels in tqdm(test_dataloader):
        inputs1, inputs2, labels = inputs1.cuda(), inputs2.cuda(), labels.cuda()
        outputs = model(inputs1, inputs2)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        predicted = outputs.argmax(dim=1)
        real_labels = torch.argmax(labels, dim=1)  # one-hot -> class index
        total += labels.size(0)
        correct += (predicted == real_labels).sum().item()

        pred_batches.append(predicted)
        label_batches.append(real_labels)

# Class-index tensors consumed by the confusion-matrix cell.
all_preds = torch.cat(pred_batches)
all_labels = torch.cat(label_batches)

# Report loss and accuracy on the test set.
print(f"Test Loss: {test_loss/len(test_dataloader)}, Test Accuracy: {(correct/total)*100}%")
print(correct)
print(total)



In [None]:
# Move the collected targets and predictions to the CPU, build the confusion
# matrix and draw it twice: raw counts first, then row-normalised.
all_labels = all_labels.cpu()
all_preds = all_preds.cpu()

conf_matrix = confusion_matrix(all_labels, all_preds)

for normalise in (False, True):
    plot_confusion_matrix(conf_matrix, ['Cage', 'Inner race', 'Outer race'], title='cm', is_norm=normalise)