In [1]:
import os
import random
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision.utils as utils
import torch.nn.functional as F

from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision.utils import save_image

from torchvision.transforms import Compose, CenterCrop, Normalize, ToTensor
from pytorch_wavelets import DWT1DForward, DWT1DInverse  # or simply DWT1D, IDWT1D

from glob import glob

import pywt
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

In [2]:
batch_size = 64
test_size = 64
Noise = "4_db"

def Normalization(data):
    
    data_mean = data.mean()
    data_std = data.std()
    
    data = data - data_mean
    data = data / data_std
    
    return data

In [3]:
device = torch.device("cuda:0" if (torch.cuda.is_available()) else "cpu")

print(torch.cuda.is_available())

#print(train_loader.batch_size)

True


In [4]:
class Dataset(torch.utils.data.Dataset):
    def __init__(self, root):
        self.root = root  
        if not os.path.exists(self.root):
            raise Exception("[!] {} not exists.".format(root))
        
        #sort file names
        self.input_paths = sorted(glob(os.path.join(self.root, '{}/*_train.npy'.format("GB_data/Real/noise_data/"+Noise+"/train_data"))))
        self.label_paths = sorted(glob(os.path.join(self.root, '{}/*_lab.npy'.format("GB_data/Real/noise_data/"+Noise+"/train_lab"))))
        self.name = os.path.basename(root)
        
        #print(self.input_paths)
        #print(self.label_paths)
        
        if len(self.input_paths) == 0 or len(self.label_paths) == 0:
            raise Exception("No signal/labels are found in {}".format(self.root))

    def __getitem__(self, index):
        
        Signal = np.load(self.input_paths[index])
        Signal = Normalization(Signal)
        
        Label = np.load(self.label_paths[index])
            
        return Signal, Label

    def __len__(self):
        return len(self.input_paths)
    
class Dataset_test(torch.utils.data.Dataset):
    def __init__(self, root):
        self.root = root
        
        if not os.path.exists(self.root):
            raise Exception("[!] {} not exists.".format(root))
        
        #sort file names
        self.input_paths = sorted(glob(os.path.join(self.root, '{}/*_test.npy'.format("GB_data/Real/noise_data/"+Noise+"/test_data"))))
        self.label_paths = sorted(glob(os.path.join(self.root, '{}/*_lab.npy'.format("GB_data/Real/noise_data/"+Noise+"/test_lab"))))
        self.name = os.path.basename(root)
        
        if len(self.input_paths) == 0 or len(self.label_paths) == 0:
            raise Exception("No sinagl/labels are found in {}".format(self.root))

    def __getitem__(self, index):
        
        Signal = np.load(self.input_paths[index])
        Signal = Normalization(Signal)
        
        Label = np.load(self.label_paths[index])
        
        return Signal, Label

    def __len__(self):
        return len(self.input_paths)

In [None]:
def loader(dataset, batch_size, num_workers=0, shuffle = False, drop_last=False):

    input_images = dataset
    input_loader = torch.utils.data.DataLoader(dataset=input_images, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, drop_last=drop_last)

    return input_loader

train_loader = loader(Dataset('../../'), batch_size= batch_size, shuffle = True, drop_last=True)
test_loader = loader(Dataset_test('../../'), batch_size= test_size, shuffle = True, drop_last=True)

In [None]:
class A_cSE(nn.Module):
    
    def __init__(self, in_ch):
        super(A_cSE, self).__init__()
        
        self.conv0 = nn.Sequential(
            nn.Conv1d(in_ch, in_ch, kernel_size=3, padding=1),
            nn.BatchNorm1d(in_ch),
            nn.ReLU(inplace=True),
        )
        self.conv1 = nn.Sequential(
            nn.Conv1d(in_ch, int(in_ch/2), kernel_size=1, padding=0),
            nn.BatchNorm1d(int(in_ch/2)),
            nn.ReLU(inplace=True),
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(int(in_ch/2), in_ch, kernel_size=1, padding=0),
            nn.BatchNorm1d(in_ch)
        )
        
    def forward(self, in_x):
        
        x = self.conv0(in_x)
        x = nn.AvgPool1d(x.size()[2:])(x)
        #print('channel',x.size())
        x = self.conv1(x)
        x = self.conv2(x)
        x = torch.sigmoid(x)
        
        return in_x * x + in_x

In [None]:
input = 1
numf =12

class SConv_1D(nn.Module):
    '''(conv => BN => ReLU) * 2'''
    def __init__(self, in_ch, out_ch, kernel, pad):
        super(SConv_1D, self).__init__()
        
        self.conv = nn.Sequential(
            nn.Conv1d(in_ch, out_ch, kernel, padding=pad),
            nn.GroupNorm(6, out_ch),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        
        x = self.conv(x)
        return x


class CNN_1D(nn.Module):
    def __init__(self):
        super(CNN_1D, self).__init__()
        
        self.DWT0= DWT1DForward(J=1, wave='db16').cuda()
        
        self.SConv1 = SConv_1D(input*2, numf, 3, 0)
        self.DWT1= DWT1DForward(J=1, wave='db16').cuda()
        self.dropout1 = nn.Dropout(p=0.1)
        self.cSE1 = A_cSE(numf*2)
        
        self.SConv2 = SConv_1D(numf*2, numf*2, 3, 0)
        self.DWT2= DWT1DForward(J=1, wave='db16').cuda() 
        self.dropout2 = nn.Dropout(p=0.1)
        self.cSE2 = A_cSE(numf*4)
        
        self.SConv3 = SConv_1D(numf*4, numf*4, 3, 0)
        self.DWT3= DWT1DForward(J=1, wave='db16').cuda()       
        self.dropout3 = nn.Dropout(p=0.1)
        self.cSE3 = A_cSE(numf*8)
        
        self.SConv6 = SConv_1D(numf*8, numf*8, 3, 0)              
        
        self.avg_pool = nn.AdaptiveAvgPool1d((1))
        self.fc = nn.Linear(numf*8, 6)

        
    def forward(self, input):
        
        DMT_yl,DMT_yh = self.DWT0(input)
        output = torch.cat([DMT_yl,DMT_yh[0]], dim=1)
        
        output = self.SConv1(output)
        DMT_yl,DMT_yh = self.DWT1(output)
        output = torch.cat([DMT_yl,DMT_yh[0]], dim=1)
        output = self.dropout1(output)
        output = self.cSE1(output)
        
        output = self.SConv2(output)
        DMT_yl,DMT_yh = self.DWT2(output)
        output = torch.cat([DMT_yl,DMT_yh[0]], dim=1) 
        output = self.dropout2(output)
        output = self.cSE2(output)
        
        output = self.SConv3(output)
        DMT_yl,DMT_yh = self.DWT3(output)
        output = torch.cat([DMT_yl,DMT_yh[0]], dim=1) 
        output = self.dropout3(output)
        output = self.cSE3(output)
        
        output = self.SConv6(output)             
            
        output = self.avg_pool(output)
        output = output.view(output.size(0), -1)
        output = self.fc(output)
        
        return output

In [None]:
Model = CNN_1D().to(device)

print(Model)

print('# Model parameters:', sum(param.numel() for param in Model.parameters()))

In [None]:
Criterion = nn.CrossEntropyLoss()

Optimizer = torch.optim.Adam(Model.parameters(), lr=0.0001)

#scheduler = optim.lr_scheduler.StepLR(Optimizer, step_size=10, gamma=0.9)

In [None]:
train_loss = []
train_acc = []
test_loss = []
test_acc  = []

for epoch in range(150):
    
    train_loss_sum, test_loss_sum, train_num, test_num, train_i, test_i = 0.0, 0.0, 0, 0, 0, 0
    train_acc_sum,test_acc_sum = 0, 0
    TEST_acc_sum = 0
    
    for i_1, train_data in enumerate(train_loader):
        
        inputs, labels = train_data
        inputs = inputs.unsqueeze(1)
        inputs = inputs.type(torch.FloatTensor)
        inputs = inputs.to(device)
        labels = labels.long()
        labels = labels.squeeze(1)
        labels = labels.to(device)
        
        #print(inputs.shape)
        #print(labels.shape)
        
        Model.train()
        pre_labs = Model(inputs)

        Loss = Criterion(pre_labs, labels)

        Optimizer.zero_grad()
        Loss.backward()
        Optimizer.step()
        
        train_loss_sum += Loss.item()
        train_acc_sum += (pre_labs.argmax(dim=1) == labels).sum().item()
        train_num += labels.shape[0]
        train_i += 1

    for i_2, data in enumerate(test_loader):
        with torch.no_grad():

            test_data, test_lab = data  
            test_data = test_data.unsqueeze(1)
            test_data = test_data.type(torch.FloatTensor)
            test_data = test_data.to(device)
            test_lab = test_lab.long()
            test_lab = test_lab.squeeze(1)
            test_lab = test_lab.to(device)

            Model.eval()
            pre_test = Model(test_data)
            t_loss = Criterion(pre_test, test_lab)

            test_loss_sum += t_loss    
            test_acc_sum += (pre_test.argmax(dim=1) == test_lab).sum().item()
            test_num += test_lab.shape[0]                    
            test_i += 1
            
            #_, Pred = torch.max(pre_test.data, 1)
            #TEST_acc_sum += torch.sum(Pred == test_lab)
           
    if epoch >= 120:
        #torch.save(Model, "model/DWA_0/CNN_%d.pkl"% epoch)
        print("-------保存第%d epoch的模型---------"% epoch)
       
    Train_Loss = train_loss_sum/train_i
    Test_Loss = test_loss_sum/test_i
    Train_ACC = train_acc_sum/train_num
    Test_ACC = test_acc_sum/test_num

    print('Epoch:%d, train_loss:%.5f, train_acc:%.5f, test_loss:%.5f, test_acc:%.5f' % 
          (epoch, Train_Loss, Train_ACC, Test_Loss, Test_ACC))
    print('-----------------------------------------------')

    train_loss.append(Train_Loss)            
    train_acc.append(Train_ACC) 
    test_loss.append(Test_Loss) 
    test_acc.append(Test_ACC)       

print("Finished Training")

In [None]:
print('train_acc:')
print(train_acc)
print('----------------------------------------')

print('test_acc:')
print(test_acc)
print('----------------------------------------')

print('The max test_acc:')
print(max(test_acc[100:]))


X_epoch = np.zeros((150))
for id in range(150):
    X_epoch[id] = id

fig = plt.figure(figsize=[8,5])
sub = fig.add_subplot(111)

sub.plot(X_epoch, train_acc, c='orange', label='Train Accuracy', linewidth=2)
sub.plot(X_epoch, test_acc, c='lime', label='Test Accuracy', linewidth=2)

ax=plt.gca();#获得坐标轴的句柄
ax.spines['bottom'].set_linewidth(1.5);###设置底部坐标轴的粗细
ax.spines['left'].set_linewidth(1.5);####设置左边坐标轴的粗细
ax.spines['right'].set_linewidth(1.5);###设置右边坐标轴的粗细
ax.spines['top'].set_linewidth(1.5);####设置上部坐标轴的粗细

plt.tick_params(labelsize=14)

plt.legend(loc=4, edgecolor='w')
plt.ylabel('Accuracy', fontsize=14)
plt.xlabel('Iteration', fontsize=14)
#plt.title('curve')

plt.show()
#fig.savefig('Image/WaveCNet-db16.png', dpi=400)