In [1]:
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate
import torch
import torchvision
from sklearn import model_selection
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.utils import save_image

In [2]:
class autoencoder(nn.Module):
    """Fully-connected autoencoder over 1000-wide input vectors.

    The encoder narrows 1000 -> 128 -> 64 -> 12 with a ReLU after every
    linear layer (including the last one). The decoder widens
    12 -> 64 -> 128 -> 1000 with a ReLU between layers and a purely linear
    final layer, so the reconstruction is unbounded.
    """

    def __init__(self):
        super().__init__()
        widths = (1000, 128, 64, 12)

        # Encoder: Linear + in-place ReLU for every consecutive width pair.
        # (A further 12 -> 3 bottleneck layer was left commented out here.)
        encoder_layers = []
        for n_in, n_out in zip(widths, widths[1:]):
            encoder_layers.append(nn.Linear(n_in, n_out))
            encoder_layers.append(nn.ReLU(inplace=True))
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: the mirror image, without an activation after the last
        # layer (a Tanh output was left commented out here).
        mirrored = widths[::-1]
        decoder_layers = []
        for position, (n_in, n_out) in enumerate(zip(mirrored, mirrored[1:])):
            if position > 0:
                decoder_layers.append(nn.ReLU(inplace=True))
            decoder_layers.append(nn.Linear(n_in, n_out))
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` to the 12-unit code and decode it back to width 1000."""
        code = self.encoder(x)
        return self.decoder(code)
    
def apply_interpolation(x, y, target_size):
    '''
    Resizes a chromatogram to a given size/resolution.

    A linear interpolant of ``y`` over ``x`` is evaluated on
    ``target_size`` evenly spaced points spanning ``[min(x), max(x)]``.

    Parameters
    ----------
    x : array-like
        Sample positions. Lists and tuples are accepted as well as arrays.
    y : array-like
        Signal values at ``x``; must have the same length as ``x``.
    target_size : int
        Number of points in the resampled signal.

    Returns
    -------
    xnew, ynew : numpy.ndarray
        The new sample positions and the interpolated signal values.
    '''
    # Coerce up front so that plain Python sequences work too; the original
    # called ``x.min()`` directly and therefore required an ndarray.
    x = np.asarray(x)
    y = np.asarray(y)
    f = scipy.interpolate.interp1d(x, y)
    # linspace returns the two end points exactly, so every query point stays
    # inside the interpolation range and interp1d never raises a bounds error.
    xnew = np.linspace(x.min(), x.max(), target_size)
    ynew = f(xnew)
    return xnew, ynew


class conv_autoencoder(nn.Module):
    """1-D convolutional autoencoder for single-channel signals.

    Encoder: six ``Conv1d`` layers (kernel 9, stride 2, no padding) with
    channels 1 -> 256 -> 128 -> 128 -> 64 -> 64 -> 32 and an in-place ReLU
    between consecutive convolutions (none after the last one).

    Decoder: six ``ConvTranspose1d`` layers (kernel 9, stride 2) with the
    mirrored channels 32 -> 64 -> 64 -> 128 -> 128 -> 256 -> 1, each followed
    by an in-place ReLU. Because the final layer is also followed by a ReLU,
    the reconstruction is never negative.

    Input and output are shaped ``(batch, 1, length)``.
    """

    def __init__(self):
        super().__init__()
        kernel, step = 9, 2
        enc_channels = (1, 256, 128, 128, 64, 64, 32)
        dec_channels = enc_channels[::-1]

        # Encoder: a ReLU sits *between* convolutions, so it is emitted
        # before every convolution except the first.
        enc_layers = []
        for position, (c_in, c_out) in enumerate(zip(enc_channels, enc_channels[1:])):
            if position > 0:
                enc_layers.append(nn.ReLU(inplace=True))
            enc_layers.append(nn.Conv1d(c_in, c_out, kernel, stride=step))
        self.encoder = nn.Sequential(*enc_layers)

        # Decoder: every transposed convolution, the last one included, is
        # followed by a ReLU.
        dec_layers = []
        for c_in, c_out in zip(dec_channels, dec_channels[1:]):
            dec_layers.append(nn.ConvTranspose1d(c_in, c_out, kernel, stride=step))
            dec_layers.append(nn.ReLU(inplace=True))
        self.decoder = nn.Sequential(*dec_layers)

    def forward(self, x):
        """Run ``x`` through the encoder and then the decoder."""
        latent = self.encoder(x)
        return self.decoder(latent)

In [3]:
# Load the two training arrays and convert each one straight to a float32
# tensor. In the training loop `noise_data` is the model input and `raw_data`
# is the target passed to the loss.
raw_data = torch.from_numpy(
    np.load('../../data/simu_chromatography/train_set/raw_data.npy')
).to(torch.float32)
noise_data = torch.from_numpy(
    np.load('../../data/simu_chromatography/train_set/noise_data.npy')
).to(torch.float32)
# white_noise = np.load('../../data/simu_chromatography/white_noise.npy')
# drift_noise = np.load('../../data/simu_chromatography/drift_noise.npy')


In [4]:
# --- Reproducibility ---------------------------------------------------------
# Seed the two random number generators this notebook draws from: `random`
# (mini-batch sampling in the training loop) and torch (weight initialisation
# of the model created below). Without this every run gives different results.
seed = 0
random.seed(seed)
torch.manual_seed(seed)

# --- Hyper-parameters --------------------------------------------------------
# Number of optimisation steps. The training loop draws ONE random mini-batch
# per iteration, so this is a step count rather than full passes over the data.
num_epochs = 10000
batch_size = 32
learning_rate = 1e-3
# Number of leading points of every trace that is fed to the network.
# 8185 survives the round trip through six stride-2 / kernel-9 convolutions
# (8185 -> 4089 -> 2041 -> 1017 -> 505 -> 249 -> 121) and the six matching
# transposed convolutions exactly, so output and target lengths agree.
point_num = 8185

# --- Model, loss and optimiser -----------------------------------------------
model = conv_autoencoder()
criterion = nn.MSELoss()
# weight_decay is Adam's L2 penalty coefficient.
optimizer = torch.optim.Adam(
    model.parameters(), lr=learning_rate, weight_decay=0.08)

In [None]:
# Denoising training loop: the network receives noisy traces and is trained
# with `criterion` to reproduce the matching clean traces.
log_every = 100  # record and print the loss every `log_every` steps

sample_code = np.arange(noise_data.shape[0])
# One row per logged step: column 0 = step index, column 1 = loss value.
# Sized from num_epochs (ceil division) instead of a fixed 300 rows, which
# would overflow with an IndexError as soon as num_epochs exceeds 30000.
loss_list = np.zeros(((num_epochs + log_every - 1) // log_every, 2))

for epoch in range(num_epochs):
    # random.choices samples WITH replacement, so a trace can occur more than
    # once in the same mini-batch.
    samples = random.choices(sample_code, k=batch_size)
    noisy_batch = noise_data[samples, :point_num].view(-1, 1, point_num)
    clean_batch = raw_data[samples, :point_num].view(-1, 1, point_num)

    # ===================forward=====================
    output = model(noisy_batch)
    loss = criterion(output, clean_batch)

    # ===================backward====================
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # ===================log========================
    if epoch % log_every == 0:
        # .item() extracts a plain Python float; the legacy `.data` attribute
        # hands back a tensor instead.
        loss_value = loss.item()
        row = epoch // log_every
        loss_list[row, 0] = epoch
        loss_list[row, 1] = loss_value
        print('epoch [{}/{}], loss:{:.4f}'
              .format(epoch + 1, num_epochs, loss_value))


epoch [1/10000], loss:7.3185
epoch [101/10000], loss:4.4986
epoch [201/10000], loss:5.5873


In [None]:
# Denoise the first 20 test traces.
# The input length now follows `point_num`, the length the model is trained on
# and the length the plotting cell slices with. The previous hard-coded 987
# produced a 953-point output that lined up with neither.
# NOTE(review): `X_test` (and `y_test`, used for plotting) are not defined in
# any visible cell -- confirm where the test set is loaded before running.
with torch.no_grad():  # inference only, no autograd graph needed
    y_test_pred = model(X_test[:20, :point_num].view(-1, 1, point_num))

In [None]:
# Shape check on the first prediction. A fixed index replaces the loop
# variable `i`, which is only defined by the plotting cell further down and
# therefore raised a NameError on a fresh kernel.
# y_test_pred is (batch, 1, length), so this slice acts on the channel axis.
y_test_pred[0, :point_num].detach().numpy().shape

In [None]:
# Overlay clean target, network output and noisy input for the first 10 test
# traces, one figure per trace.
for i in range(10):
    fig, ax = plt.subplots(dpi=400)
    ax.plot(y_test[i, :point_num].numpy(), linewidth=0.5, label='raw_data')
    # y_test_pred is (batch, 1, length): select channel 0 explicitly so that
    # `:point_num` really slices the sample axis. The previous
    # `[i, :point_num]` sliced the size-1 channel axis and needed `.T`.
    ax.plot(y_test_pred[i, 0, :point_num].detach().numpy(), label='denoise_data')
    ax.plot(X_test[i, :point_num].numpy(), linewidth=0.5, label='noise_data')
    ax.set_title('test sample {}'.format(i))
    ax.set_xlabel('point index')
    ax.legend()
    plt.show()
    plt.close(fig)  # release the 400-dpi figure before drawing the next one


In [None]:
# Persist only the learned parameters (the state_dict), not the module object.
checkpoint_path = './sim_autoencoder.pth'
torch.save(model.state_dict(), checkpoint_path)