In [1]:
import numpy as np
import pandas as pd
import tifffile as tiff
import matplotlib.pyplot as plt
import os
from PIL import Image

import random
from IPython.display import display, clear_output
from tqdm.notebook import tqdm
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import os
import sqlite3

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error, accuracy_score



In [2]:
class CroppedDataset(Dataset): # assume that all shapes are the same
    def __init__(self, folder_path, metadata_path, patch_size=(32, 32)):
        super().__init__()
        self.folder_path = folder_path
        self.image_pathes = os.listdir(folder_path)
        self.patch_size = patch_size
        
        conn = sqlite3.connect(metadata_path)
        metadata = pd.read_sql_query("SELECT * FROM Micrograph", conn)
        label_encoder = LabelEncoder()
        self.label_encoder = label_encoder
        self.labels = label_encoder.fit_transform(metadata['primary_microconstituent'])
        
        img = plt.imread(os.path.join(self.folder_path, self.image_pathes[0]))
        img = img[:-38] ## cut the bottom line
        self.patches_i = int(img.shape[0] / self.patch_size[0])
        self.patches_j = int(img.shape[1] / self.patch_size[1])
        self.n_patches = self.patches_i * self.patches_j
        
        self.data = []
        self.means = []
        self.stds = []
        self.pathes = []
        for path in metadata.path:
            img = plt.imread(os.path.join(self.folder_path, path))
            self.pathes.append(path)
            if len(img.shape) > 2:
                img = img[:, :, 0]
            img = img[:-38] ## cut the bottom line
            img = img / 255
            #img = img.round() ## binarization
            mean, std = img.mean(), img.std()
            if std == 0:
                std = 1
            
            #std = np.sqrt((img ** 2).mean()) ## 2 moment == 1 (not std)
            
            self.means.append(mean)
            self.stds.append(std)
            #img = (img - mean) / std
            #img = img / std
            self.data.append(img)

    def __len__(self):
        return len(self.image_pathes) * self.n_patches

    def __getitem__(self, index):
        img_index = index // self.n_patches
        #img = plt.imread(os.path.join(self.folder_path, self.image_pathes[img_index]))
        img = self.data[img_index]
        patch_index = index % self.n_patches ## select patch
        patch_i = patch_index // self.patches_j
        patch_j = patch_index % self.patches_j
        img = img[patch_i * self.patch_size[0]:(patch_i + 1) * self.patch_size[0],
                 patch_j * self.patch_size[1]:(patch_j + 1) * self.patch_size[1]]
        return img, self.means[img_index], self.stds[img_index], self.labels[img_index]

In [3]:
folder_path = '/kaggle/input/vkr-data/data2/micrographs/'
metadata_path = '/kaggle/input/vkr-data/data2/microstructures.sqlite'

dataset = CroppedDataset(folder_path, metadata_path, patch_size=(32, 32))

In [4]:
train_size = int(len(dataset) * 0.8)
val_size = len(dataset) - train_size

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size],
                                                           generator=torch.Generator().manual_seed(7))

In [5]:


train_dataloader = DataLoader(train_dataset, batch_size = 64, shuffle = True)
val_dataloader = DataLoader(val_dataset, batch_size = 512, shuffle = False)

In [6]:
class VariationalAutoencoder(nn.Module):
    def __init__(self, input_channels, latent_dim, output_channels, loss_cfs, num_classes):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = Encoder(input_channels, latent_dim)
        self.decoder = Decoder(latent_dim, output_channels)
        self.classifier = Classifier(latent_dim, num_classes)
        self.loss_cfs = loss_cfs
        
    def forward(self, x):
        mu, log_var = self.encoder(x)
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        z = mu + eps * std
        reconstructed = self.decoder(mu)
        logits = self.classifier(mu)
        return reconstructed, mu, log_var, logits
    
    def loss(self, x, reconstructed, mu, log_var, logits, labels):
        sigma_x = 1.0e-1
        MSE = (torch.sum(((reconstructed - x) / sigma_x).pow(8.0))).pow(1.0/4.0) * (sigma_x ** 2)
        #MSE = nn.MSELoss()(reconstructed, x)
        KLD = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
        CE = nn.CrossEntropyLoss()(logits, labels)
        return MSE * self.loss_cfs[0] + KLD * self.loss_cfs[1] + CE * self.loss_cfs[2] 
    
    def encode(self, x):
        mu, log_var = self.encoder(x)
        return mu
        
class Encoder(nn.Module):
    def __init__(self, input_channels, latent_dim):
        super(Encoder, self).__init__()
        self.latent_dim = latent_dim
        self.conv = nn.Sequential(
            nn.Conv2d(input_channels, 8, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=1),
        )
        self.fc = nn.Sequential(
            nn.Linear(128 * 2 * 2, 512),
            nn.ReLU(),
            nn.Linear(512, 2 * latent_dim)
        )
        
    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x[:, :self.latent_dim], x[:, self.latent_dim:]
    
class Decoder(nn.Module):
    def __init__(self, latent_dim, output_channels):
        super(Decoder, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(latent_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 128 * 2 * 2),
        )
        self.conv = nn.Sequential(
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.ConvTranspose2d(8, output_channels, kernel_size=3, stride=1)
        )
    def forward(self, x):
        x = self.fc(x)
        x = x.view(x.size(0), 128, 2, 2)
        x = self.conv(x)
        return x
    
    
class Classifier(nn.Module):
    def __init__(self, latent_dim, num_classes):
        super().__init__()
        self.classifier = nn.Sequential(
            nn.Linear(latent_dim, latent_dim // 2),
            nn.BatchNorm1d(latent_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(latent_dim // 2, latent_dim // 4),
            nn.ReLU(),
            nn.Linear(latent_dim // 4, num_classes)
        )
    def forward(self, x):
        return self.classifier(x)


In [7]:
from IPython.display import display, clear_output
from ipywidgets import Output
from tqdm.auto import trange
from typing import Type, Union
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, accuracy_score

def predict(model, loader):
    model.eval()
    with torch.no_grad():
        original = []
        reconstructed = []
        true_labels = []
        predicted_labels = []
        for i, (x, mean, std, labels) in tqdm(enumerate(loader), leave=False, total=len(loader)):
            std = std[:, None, None, None]
            mean = mean[:, None, None, None]
            x = x.unsqueeze(1).float()
            x = x.to(device)
            reconstructed_x, mu, log, logits = model(x)
            reconstructed_x = reconstructed_x.cpu()
            original.append(x.cpu())
            reconstructed.append(reconstructed_x)
            
            probas = nn.Softmax(1)(logits)
            preds = probas.argmax(dim=-1)
            true_labels.append(labels)
            predicted_labels.append(preds)
    return original, reconstructed, true_labels, predicted_labels

In [8]:
model = VariationalAutoencoder(1, 128, 1, loss_cfs=(1, 1 / 1e8 * 0.1, 1 / 20), num_classes=7)
model.load_state_dict(torch.load('/kaggle/input/vkr-data/state_dict_full_model', map_location=torch.device('cpu')))
#optimizer = torch.optim.Adam(model.parameters(), lr=2e-4)
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device);
#criterion = nn.MSELoss()
#train_model(model, train_dataloader, val_dataloader, optimizer, device=device, num_epochs=100, verbose_num_iters=64)

In [9]:
original, reconstructed, true_labels, predicted_labels = predict(model, val_dataloader)

  0%|          | 0/113 [00:00<?, ?it/s]

In [10]:
print('RMSE:', mean_squared_error(torch.cat(original).cpu().detach().numpy().flatten(),
               torch.cat(reconstructed).cpu().detach().numpy().flatten(), squared=False))
print('Accuracy:', accuracy_score(torch.cat(true_labels).cpu().detach().numpy(),
               torch.cat(predicted_labels).cpu().detach().numpy()))

RMSE: 0.1478292
Accuracy: 0.7401144640998959


In [11]:
loader = val_dataloader
model.eval()
with torch.no_grad():
    original = []
    reconstructed = []
    true_labels = []
    predicted_labels = []
    encoded = []
    indexes = []
    for i, (x, mean, std, labels) in tqdm(enumerate(loader), leave=False, total=len(loader)):
        std = std[:, None, None, None]
        mean = mean[:, None, None, None]
        x = x.unsqueeze(1).float()
        x = x.to(device)
        encoded.append(model.encode(x).cpu())
        original.append(x.cpu())
        true_labels.append(labels)

  0%|          | 0/113 [00:00<?, ?it/s]

In [12]:
encoded = torch.cat(encoded)
true_labels = torch.cat(true_labels)
original = torch.cat(original)
original = original.flatten(start_dim=1)

In [13]:
from sklearn.cluster import KMeans
from sklearn.metrics import rand_score

kmeans = KMeans(7)

In [14]:
%%time
kmeans.fit(original.numpy())

rand_score(true_labels, kmeans.labels_)



CPU times: user 3min 15s, sys: 33.7 s, total: 3min 48s
Wall time: 1min 31s


0.6776103758857778

In [15]:
from sklearn.metrics import confusion_matrix
from itertools import permutations
from sklearn.metrics import confusion_matrix, accuracy_score

best_p = []
best_score = 0
for p in tqdm(permutations([1, 2, 3, 4, 5, 6, 7]), total=5040):
    labels = list(map(lambda x : p[x], kmeans.labels_))
    matrix = confusion_matrix(true_labels, labels)
    if matrix.diagonal().sum() > best_score:
        best_score = matrix.diagonal().sum()
        best_p = p
        
print(accuracy_score(true_labels, kmeans.labels_))
labels = list(map(lambda x : best_p[x], kmeans.labels_))
print(accuracy_score(true_labels, labels))


  0%|          | 0/5040 [00:00<?, ?it/s]

0.15220256677072494
0.23416579951439473


In [16]:
from sklearn.cluster import KMeans
from sklearn.metrics import rand_score

kmeans = KMeans(7)

In [17]:
%%time

kmeans.fit(encoded.numpy())

rand_score(true_labels, kmeans.labels_)



CPU times: user 28.7 s, sys: 6.68 s, total: 35.4 s
Wall time: 13.5 s


0.7119138050491299

In [18]:
from sklearn.metrics import confusion_matrix
from itertools import permutations
from sklearn.metrics import confusion_matrix, accuracy_score

best_p = []
best_score = 0
for p in tqdm(permutations([1, 2, 3, 4, 5, 6, 7]), total=5040):
    labels = list(map(lambda x : p[x], kmeans.labels_))
    matrix = confusion_matrix(true_labels, labels)
    if matrix.diagonal().sum() > best_score:
        best_score = matrix.diagonal().sum()
        best_p = p
        
print(accuracy_score(true_labels, kmeans.labels_))
labels = list(map(lambda x : best_p[x], kmeans.labels_))
print(accuracy_score(true_labels, labels))


  0%|          | 0/5040 [00:00<?, ?it/s]

0.20202913631633715
0.3231876517516476


In [19]:
model = VariationalAutoencoder(1, 128, 1, loss_cfs=(1, 1 / 1e8 * 0.1, 1 / 20), num_classes=7)
model.load_state_dict(torch.load('/kaggle/input/vkr-data/state_dict_VAE', map_location=torch.device('cpu')))
#optimizer = torch.optim.Adam(model.parameters(), lr=2e-4)
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device);
#criterion = nn.MSELoss()
#train_model(model, train_dataloader, val_dataloader, optimizer, device=device, num_epochs=100, verbose_num_iters=64)

In [20]:
original, reconstructed, true_labels, predicted_labels = predict(model, val_dataloader)

  0%|          | 0/113 [00:00<?, ?it/s]

In [21]:
print('RMSE:', mean_squared_error(torch.cat(original).cpu().detach().numpy().flatten(),
               torch.cat(reconstructed).cpu().detach().numpy().flatten(), squared=False))
print('Accuracy:', accuracy_score(torch.cat(true_labels).cpu().detach().numpy(),
               torch.cat(predicted_labels).cpu().detach().numpy()))

RMSE: 0.14835764
Accuracy: 0.1079431148109608


In [22]:
loader = val_dataloader
model.eval()
with torch.no_grad():
    original = []
    reconstructed = []
    true_labels = []
    predicted_labels = []
    encoded = []
    indexes = []
    for i, (x, mean, std, labels) in tqdm(enumerate(loader), leave=False, total=len(loader)):
        std = std[:, None, None, None]
        mean = mean[:, None, None, None]
        x = x.unsqueeze(1).float()
        x = x.to(device)
        encoded.append(model.encode(x).cpu())
        original.append(x.cpu())
        true_labels.append(labels)

  0%|          | 0/113 [00:00<?, ?it/s]

In [23]:
encoded = torch.cat(encoded)
true_labels = torch.cat(true_labels)
original = torch.cat(original)
original = original.flatten(start_dim=1)

In [24]:
from sklearn.cluster import KMeans
from sklearn.metrics import rand_score

kmeans = KMeans(7)

In [25]:
%%time
kmeans.fit(original.numpy())

rand_score(true_labels, kmeans.labels_)



CPU times: user 3min 57s, sys: 29.9 s, total: 4min 27s
Wall time: 1min 54s


0.6770091537194797

In [26]:
from sklearn.metrics import confusion_matrix
from itertools import permutations
from sklearn.metrics import confusion_matrix, accuracy_score

best_p = []
best_score = 0
for p in tqdm(permutations([1, 2, 3, 4, 5, 6, 7]), total=5040):
    labels = list(map(lambda x : p[x], kmeans.labels_))
    acc = accuracy_score(true_labels[:28830], labels[:28830])
    if acc > best_score:
        best_score = acc
        best_p = p
        
print(accuracy_score(true_labels, kmeans.labels_))
labels = list(map(lambda x : best_p[x], kmeans.labels_))
print(accuracy_score(true_labels, labels))

  0%|          | 0/5040 [00:00<?, ?it/s]

0.10900104058272633
0.23263961151578216


In [27]:
from sklearn.cluster import KMeans
from sklearn.metrics import rand_score

kmeans = KMeans(7)

In [28]:
%%time

kmeans.fit(encoded.numpy())

rand_score(true_labels, kmeans.labels_)



CPU times: user 27.4 s, sys: 6.89 s, total: 34.3 s
Wall time: 12.8 s


0.6855806655485953

In [29]:
from sklearn.metrics import confusion_matrix
from itertools import permutations
from sklearn.metrics import confusion_matrix, accuracy_score

best_p = []
best_score = 0
for p in tqdm(permutations([1, 2, 3, 4, 5, 6, 7]), total=5040):
    labels = list(map(lambda x : p[x], kmeans.labels_))
    acc = accuracy_score(true_labels[:28830], labels[:28830])
    if acc > best_score:
        best_score = acc
        best_p = p
        
print(accuracy_score(true_labels, kmeans.labels_))
labels = list(map(lambda x : best_p[x], kmeans.labels_))
print(accuracy_score(true_labels, labels))

  0%|          | 0/5040 [00:00<?, ?it/s]

0.11456815816857441
0.26125563648976763
