In [1]:
from sklearn.model_selection import train_test_split
from datasetLoader import load_dataset
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from inverse import fit_linear
from tools import model_tester
%matplotlib qt

In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [3]:
pore_widths = np.load("data/initial kernels/Size_Kernel_Silica_Adsorption.npy")
pressures = np.load("data/initial kernels/Pressure_Silica.npy")
with open("data/initial kernels/Kernel_Silica_Adsorption.npy", 'rb') as f:
    data_sorb = np.load(f)

x, y = load_dataset('data/datasets/silica_random.npz')
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.15, random_state=1)

x_exp, y_exp = load_dataset('data/datasets/SMP_CUT_NOT_ZERO.npz')

x_train_exp, x_test_exp, y_train_exp, y_test_exp = train_test_split(x_exp, y_exp, test_size=0.15, random_state=1)

In [4]:
figure, axis = plt.subplots(3, 4)
for i in range(3):
    for j in range(4):
        k = np.random.randint(0, len(x_train))
        axis[i, j].plot(pore_widths, y_train[k], marker=".")
        axis[i, j].grid()
plt.show()

In [189]:
plt.plot(pore_widths, sum(y_train), marker=".")

[<matplotlib.lines.Line2D at 0x1e7b3b47a00>]

In [272]:
i = np.random.randint(0, len(x_train))
plt.plot(pressures[:-10], x_train[i], marker=".")
plt.grid()
plt.show()

In [61]:
class IsothermDataset(Dataset):
    def __init__(self, isotherms, transform=None):
        self.data = torch.tensor(isotherms, dtype=torch.float32).to(device)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = self.data[idx]
        if self.transform:
            x = self.transform(x)
        return x, x

x_mixed_train = np.concatenate((x_train_exp, x_train))
x_mixed_test = np.concatenate((x_test_exp, x_test))

# dataset = IsothermDataset(np.concatenate((x_train_exp, x_train_exp)))
# dataset_test = IsothermDataset(np.concatenate((x_test_exp, x_test_exp)))
dataset = IsothermDataset(np.concatenate((x_mixed_train, x_mixed_train)))
dataset_test = IsothermDataset(np.concatenate((x_mixed_test, x_mixed_test)))


batch_size = 512
loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
loader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

In [62]:
class Autoencoder(nn.Module):
    def __init__(self, input_dim, latent_dim):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )

    def forward(self, x):
        z = self.encoder(x)
        x_recon = self.decoder(z)
        return x_recon, z

input_dim = 448
latent_dim = 16
epochs = 200
learning_rate = 1e-3

model = Autoencoder(input_dim=input_dim, latent_dim=latent_dim)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

def train_autoencoder(model, loader, loader_test):
    model.train()
    total_loss = 0
    total_vloss = 0
    for x, _ in loader:
        optimizer.zero_grad()
        x_recon, _ = model(x)
        loss = criterion(x_recon, x)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    model.eval()
    with torch.no_grad():
        for x, _ in loader_test:
            x_recon, _  = model(x)
            vloss = criterion(x_recon, x)
            total_vloss += vloss.item()

    return total_loss / len(loader.dataset), total_vloss / len(loader_test.dataset)


# sample_z = model.encoder(torch.tensor(isotherms_np[0], dtype=torch.float32))

In [63]:
for epoch in range(1, epochs+1):
    loss, vloss = train_autoencoder(model, loader,loader_test)
    if epoch % 1 == 0:
        print(f"Epoch {epoch}/{epochs}, Loss: {loss*100:.8f} Test loss: {vloss*100:.8f}")

Epoch 1/200, Loss: 0.00356772 Test loss: 0.00030211
Epoch 2/200, Loss: 0.00014139 Test loss: 0.00011367
Epoch 3/200, Loss: 0.00009425 Test loss: 0.00008442
Epoch 4/200, Loss: 0.00006589 Test loss: 0.00005766
Epoch 5/200, Loss: 0.00005241 Test loss: 0.00004579
Epoch 6/200, Loss: 0.00004097 Test loss: 0.00003817
Epoch 7/200, Loss: 0.00003918 Test loss: 0.00003647
Epoch 8/200, Loss: 0.00003675 Test loss: 0.00003650
Epoch 9/200, Loss: 0.00003175 Test loss: 0.00002873
Epoch 10/200, Loss: 0.00002786 Test loss: 0.00002396
Epoch 11/200, Loss: 0.00002318 Test loss: 0.00002057
Epoch 12/200, Loss: 0.00002173 Test loss: 0.00001898
Epoch 13/200, Loss: 0.00002060 Test loss: 0.00001772
Epoch 14/200, Loss: 0.00001858 Test loss: 0.00001779
Epoch 15/200, Loss: 0.00001778 Test loss: 0.00001547
Epoch 16/200, Loss: 0.00001733 Test loss: 0.00001747
Epoch 17/200, Loss: 0.00001665 Test loss: 0.00001501
Epoch 18/200, Loss: 0.00001750 Test loss: 0.00001852
Epoch 19/200, Loss: 0.00001365 Test loss: 0.00001399
Ep

KeyboardInterrupt: 

In [10]:
torch.save(model, "data/models/torch/autoencoder_exp.pt")

In [6]:
model = torch.load("data/models/torch/autoencoder_exp.pt", weights_only=False)
model.eval()

Autoencoder(
  (encoder): Sequential(
    (0): Linear(in_features=448, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=64, bias=True)
    (3): ReLU()
    (4): Linear(in_features=64, out_features=16, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=16, out_features=64, bias=True)
    (1): ReLU()
    (2): Linear(in_features=64, out_features=128, bias=True)
    (3): ReLU()
    (4): Linear(in_features=128, out_features=448, bias=True)
  )
)

In [64]:
model.eval()
latent_vectors_train = model.encoder(torch.tensor(x_train, dtype=torch.float32).to(device)).detach().cpu().numpy()
latent_vectors_test = model.encoder(torch.tensor(x_test, dtype=torch.float32).to(device)).detach().cpu().numpy()
latent_vectors_test_exp = model.encoder(torch.tensor(x_test_exp, dtype=torch.float32).to(device)).detach().cpu().numpy()

In [65]:
decoded = model.decoder(model.encoder(torch.tensor(x_test_exp, dtype=torch.float32).to(device))).detach().cpu().numpy()

In [67]:
figure, axis = plt.subplots(3, 3)
for i in range(3):
    for j in range(3):
        k=np.random.randint(0, len(decoded))
        axis[i, j].plot(pressures[:-10], x_test_exp[k], marker=".", label = "origin")
        axis[i, j].plot(pressures[:-10], decoded[k], marker=".", label = "decoded")
        axis[i, j].grid(True)
axis[i, j].legend()
plt.show()


# k=np.random.randint(0, len(decoded))
# plt.plot(pressures[:-10], x_test_exp[k], marker=".", label = "origin")
# plt.plot(pressures[:-10], decoded[k], marker=".", label = "decoded")
# plt.legend()
# plt.grid(True)
# plt.show()

In [68]:
np.random.seed(0)
labels = None 

pca = PCA(n_components=2)
latent_pca = pca.fit_transform(latent_vectors_train[:100])
latent_pca_exp = pca.fit_transform(latent_vectors_test[:100])

tsne = TSNE(n_components=2, init='pca', random_state=0)
latent_tsne = tsne.fit_transform(latent_vectors_train[:100])
latent_tsne_exp = tsne.fit_transform(latent_vectors_test[:100])

plt.figure()
plt.scatter(latent_pca[:, 0], latent_pca[:, 1], label="train")
plt.scatter(latent_pca_exp[:, 0], latent_pca_exp[:, 1], label="exp")
plt.title("PCA of Latent Space")
plt.xlabel("PC1")
plt.ylabel("PC2")
plt.legend()
plt.show()

plt.figure()
plt.scatter(latent_tsne[:, 0], latent_tsne[:, 1], label="train")
plt.scatter(latent_tsne_exp[:, 0], latent_tsne_exp[:, 1], label="exp")
for i in range(latent_tsne_exp.shape[0]):
        plt.text(latent_tsne_exp[i, 0], latent_tsne_exp[i, 1], str(i), fontsize=8, ha='center', va='center')
plt.title("t-SNE of Latent Space")
plt.xlabel("Dim 1")
plt.ylabel("Dim 2")
plt.legend()
plt.show()



In [8]:
def plot_preds(x, y, preds): 
    NX, NY = 3, 4
    figure, axis = plt.subplots(NX, NY)
    for i in range(NX):
        for j in range(NY):
            k = np.random.randint(0, len(preds)) 
            iso_axis = axis[i, j].twiny()
            iso_axis.set_xlabel("P/P$^0$",fontsize=8)
            iso_axis.plot(pressures[:-10], x[k], label="Isotherm", color = 'green')
            kernel = (data_sorb.T[:-10])
            iso_axis.plot(pressures[:-10], np.dot(kernel, preds[k][:128]), label="Isotherm by model", color="red")
            axis[i, j].set_title(f"№ {k}")
            axis[i, j].title.set_size(10)
            axis[i, j].grid()
            axis[i, j].set_xlabel("nm",fontsize=8)
            axis[i, j].plot(pore_widths, (preds[k]), marker=".", label=f"Model PSD")
            axis[i, j].plot(pore_widths, y[k], marker=".", label="PSD")
    plt.subplots_adjust(hspace=0.6, right=0.95, left=0.05, bottom=0.05, top=0.9)
    plt.legend()
    axis[0, 0].legend()
    plt.show()

In [56]:
from tools import model_tester
from inverse import fit_linear

error_lst, roughness_lst = model_tester.test_model_predictions(preds, x_test_exp, kernel=data_sorb[:, :-10])
kde_x, kde_error, kde_fun = model_tester.calculate_kde_data(error_lst, stop=150)
print("average error:", np.mean(error_lst))
plt.plot(kde_x, kde_error, label=model_name)
plt.grid(True)
plt.legend()
plt.plot()

average error: 19.576227837865584


[]

In [45]:
class DynamicWeightAveraging:
    def __init__(self, num_tasks, T=2.0):
        self.num_tasks = num_tasks
        self.T = T
        self.loss_history = []  # список списков: [ [L1_1, L1_2, ...], [L2_1, L2_2, ...], ... ]

    def update_weights(self):
        # Требуются как минимум 2 эпохи
        if len(self.loss_history[0]) < 2:
            return np.ones(self.num_tasks) / self.num_tasks  # равномерные веса

        r = []
        for i in range(self.num_tasks):
            li = self.loss_history[i]
            r_i = li[-1] / (li[-2] + 1e-8)
            r.append(r_i)

        r = np.array(r)
        weights = self.T * np.exp(r / self.T)
        weights /= weights.sum()
        return weights

    def append_losses(self, losses):  # losses — список текущих значений потерь [L1, L2, ...]
        if not self.loss_history:
            self.loss_history = [[] for _ in range(len(losses))]
        for i, l in enumerate(losses):
            self.loss_history[i].append(l)
dwa = DynamicWeightAveraging(num_tasks=2)

In [69]:
class PSD_model(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(PSD_model, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 32),
            nn.ReLU(),
            nn.Dropout(0.15),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Dropout(0.15),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Dropout(0.15),
            nn.Linear(128, output_dim),
            nn.ReLU()
        )

    def forward(self, x):
        psd = self.model(x)
        return psd

class Isotherm_PSD_Dataset(Dataset):
    def __init__(self, x, y, original_x, transform=None):
        self.x = torch.tensor(x, dtype=torch.float32).to(device)
        self.y = torch.tensor(y, dtype=torch.float32).to(device)
        self.original_x = torch.tensor(original_x, dtype=torch.float32).to(device)
        self.transform = transform

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        x = self.x[idx]
        y = self.y[idx]
        original_x = self.original_x[idx]
        if self.transform:
            x = self.transform(x)
        return x, y, original_x

train_PSD = Isotherm_PSD_Dataset(latent_vectors_train, (y_train), x_train)
test_PSD = Isotherm_PSD_Dataset(latent_vectors_test, (y_test), x_test)

batch_size = 512
PSD_loader = DataLoader(train_PSD, batch_size=batch_size, shuffle=True)
PSD_loader_test = DataLoader(test_PSD, batch_size=batch_size, shuffle=False)


model_PSD = PSD_model(input_dim=latent_dim, output_dim=128)
model_PSD.to(device)

optimizer = optim.Adam(model_PSD.parameters(), lr=learning_rate)
criterion = nn.MSELoss()


with open("data/initial kernels/Kernel_Silica_Adsorption.npy", 'rb') as f:
    data_sorb_torch = torch.tensor(np.load(f)[:, :-10])
    data_sorb_torch = data_sorb_torch.to(torch.float32)

def isoterm_loss(predicted_y, x):
    restored_isotherm = torch.matmul(predicted_y, data_sorb_torch)
    loss = torch.mean((x - restored_isotherm) ** 2)
    return loss

def train_PSD_model(model, loader, loader_test):
    model.train()
    total_loss = 0
    total_vloss = 0
    for x, y, original_x in loader:
        optimizer.zero_grad()
        y_recon = model(x)
        loss = criterion(y_recon, y)
        iso_loss = isoterm_loss(y_recon, original_x)
        dwa.append_losses([loss.item(), iso_loss.item()])
        weights = dwa.update_weights()
        loss = weights[0] * loss + weights[1] * iso_loss
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    model.eval()
    with torch.no_grad():
        for x, y, original_x in loader_test:
            y_recon  = model(x)
            loss = criterion(y_recon, y)
            iso_loss = isoterm_loss(y_recon, original_x)
            vloss = weights[0] * loss + weights[1] * iso_loss
            total_vloss += vloss.item()

    return total_loss / len(loader.dataset), total_vloss / len(loader_test.dataset)


In [70]:
dwa.update_weights()

array([0.50297639, 0.49702361])

In [71]:
epochs = 50
loss_lst = []
vloss_lst = []
for epoch in range(1, epochs+1):
    loss, vloss = train_PSD_model(model_PSD, PSD_loader, PSD_loader_test)
    loss_lst.append(loss)
    vloss_lst.append(vloss)
    if epoch % 1 == 0:
        print(f"Epoch {epoch}/{epochs}, Loss: {loss*100:.8f} Test loss: {vloss*100:.8f}")

Epoch 1/50, Loss: 0.00402995 Test loss: 0.00154724
Epoch 2/50, Loss: 0.00160973 Test loss: 0.00080544
Epoch 3/50, Loss: 0.00098062 Test loss: 0.00068448
Epoch 4/50, Loss: 0.00080046 Test loss: 0.00051201
Epoch 5/50, Loss: 0.00071016 Test loss: 0.00050408
Epoch 6/50, Loss: 0.00061367 Test loss: 0.00040244
Epoch 7/50, Loss: 0.00056074 Test loss: 0.00042816
Epoch 8/50, Loss: 0.00052721 Test loss: 0.00049521
Epoch 9/50, Loss: 0.00052020 Test loss: 0.00042756
Epoch 10/50, Loss: 0.00054076 Test loss: 0.00039911
Epoch 11/50, Loss: 0.00049221 Test loss: 0.00049548
Epoch 12/50, Loss: 0.00049799 Test loss: 0.00035964
Epoch 13/50, Loss: 0.00042775 Test loss: 0.00047554
Epoch 14/50, Loss: 0.00046024 Test loss: 0.00036754
Epoch 15/50, Loss: 0.00042541 Test loss: 0.00041672
Epoch 16/50, Loss: 0.00044517 Test loss: 0.00038077
Epoch 17/50, Loss: 0.00044720 Test loss: 0.00043657
Epoch 18/50, Loss: 0.00049191 Test loss: 0.00061936
Epoch 19/50, Loss: 0.00073271 Test loss: 0.00049764
Epoch 20/50, Loss: 0.

In [72]:
plt.plot(loss_lst)
plt.plot(vloss_lst)
plt.show()

In [28]:
model_name = "autoencoder_regressor_pinn"
torch.save(model_PSD, f"data/models/torch/{model_name}") 

In [29]:
model_name = "autoencoder_regressor_pinn"
model_PSD = torch.load(f"data/models/torch/{model_name}", weights_only=False)

In [73]:
model_PSD.eval()
y_train_PSD = model_PSD.model(torch.tensor(latent_vectors_train, dtype=torch.float32).to(device)).detach().cpu().numpy()
y_test_PSD = model_PSD.model(torch.tensor(latent_vectors_test, dtype=torch.float32).to(device)).detach().cpu().numpy()
y_test_exp_PSD = model_PSD.model(torch.tensor(latent_vectors_test_exp, dtype=torch.float32).to(device)).detach().cpu().numpy()

In [76]:
plot_preds(x_test_exp, y_test_exp, y_test_exp_PSD)

In [51]:
np.savez(f"data/models/metrics/{model_name}", x=x_test_exp, y=y_test_exp_PSD)
model_name


'autoencoder_regressor_pinn'