In [6]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

In [7]:
import sys

sys.path.append('/kaggle/input/haarpsi')

In [8]:
import pickle
from haarPsi import *
from PIL import Image
import cv2
import pywt
from tqdm import tqdm
from numba import njit
from sklearn.metrics import *
from sklearn.cluster import spectral_clustering
from scipy import signal
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from sklearn import metrics
from sklearn import metrics
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.cluster import KMeans, kmeans_plusplus
from sklearn.metrics.pairwise import euclidean_distances

## Уменьшение размерности

In [9]:
def read_mnist(path, amount):
    
    def combine_columns(row):
    
        return row.tolist()
    
    data = pd.read_csv(path)
    
    result = data.groupby('label', group_keys=False).apply(lambda x: x.head(amount))
    
    result['image'] = result.iloc[:, 1:].apply(combine_columns, axis=1)
    
    result = result[['label', 'image']].reset_index(drop=True)
    
    return result

In [10]:
train_mnist = read_mnist('/kaggle/input/digit-recognizer/train.csv', 200)

#### HaarPsi

In [11]:
def resize_for_haar_psi(data):
    
    return [np.resize(img, (28, 28)) for img in data]

In [12]:
def sim_matrix_haar_psi(data, size):
    
    sim_matrix = np.zeros([size, size])
    
    for i in tqdm(range(size)):
        
        for j in range(i, size):

            res = haar_psi(data[i], data[j])[0]
            
            sim_matrix[i][j], sim_matrix[j][i] = res, res
            
    return sim_matrix

In [30]:
train_mnist_resize = resize_for_haar_psi(train_mnist.image)

In [31]:
mnist_haar_psi = sim_matrix_haar_psi(train_mnist_resize, len(train_mnist_resize))

100%|██████████| 2000/2000 [54:23<00:00,  1.63s/it] 


#### CW-SSIM Ricker

In [13]:
def get_signal(img1, img2):
    
    widths = np.arange(15, 18)

    cwtmatr1 = signal.cwt(img1, signal.ricker, widths)
    cwtmatr2 = signal.cwt(img2, signal.ricker, widths)
    
    return cwtmatr1, cwtmatr2



@njit(parallel=True)
def cw_ssim(cwtmatr1, cwtmatr2, k=0.01):
    
    c1c2_conj = np.multiply(cwtmatr1, np.conjugate(cwtmatr2))
    
    c1c2_sum = np.sum(c1c2_conj, axis=0)
    
    num_ssim = 2 * np.abs(c1c2_sum) + k
    
    c1_2 = np.square(np.abs(cwtmatr1))
    c2_2 = np.square(np.abs(cwtmatr2))
    
    den_ssim = np.sum(c1_2, axis=0) + np.sum(c2_2, axis=0) + k

    ssim_map = num_ssim / den_ssim

    # Average the per pixel results
    index = np.average(ssim_map)
    
    return index

def sim_matrix_cw(data, size):
    
    sim_m = np.zeros([size, size])
    
    for i in tqdm(range(size+1)):
        
        for j in range(i, size):
            
            cwtmatr1, cwtmatr2 = get_signal(data[i], data[j])
            
            res = cw_ssim(cwtmatr1, cwtmatr2)
            
            sim_m[i][j], sim_m[j][i] = res, res
            
            
    return sim_m

In [34]:
mnist_cw_ricker = sim_matrix_cw(train_mnist['image'], 2000)

100%|██████████| 2001/2001 [55:50<00:00,  1.67s/it] 


#### CW-SSIM Haar

In [14]:
def haar_signal(img1, img2):
    
    coef1, freq1 = pywt.dwt2(np.resize(img1, (28, 28)), 'haar')
    coef2, freq2 = pywt.dwt2(np.resize(img2, (28, 28)), 'haar')
    
    return coef1, coef2

def matrix_cw_haar(data, size):
    sim_m = np.zeros([size, size])
    
    for i in tqdm(range(size+1)):
        
        for j in range(i, size):
            
            cwtmatr1, cwtmatr2 = haar_signal(data[i], data[j])
            
            res = cw_ssim(cwtmatr1, cwtmatr2)
            
            sim_m[i][j], sim_m[j][i] = res, res
            
            
    return sim_m

In [38]:
mnist_cw_haar = matrix_cw_haar(train_mnist['image'], 2000)

100%|██████████| 2001/2001 [17:11<00:00,  1.94it/s]


## Semi-Supervised Spectral Clustering with regular K-means

In [15]:
import scipy as sp
from sklearn.cluster import KMeans
import itertools

def sm_laplacian(W):
    D = np.diag(np.sum(W, axis=1))
    I = np.identity(W.shape[0])
    D_inv_sqrt = np.linalg.inv(np.sqrt(D))
    L = I - np.dot(D_inv_sqrt, W).dot(D_inv_sqrt)
    
    return L

def compute_eigenvectors(L, k):
    eigenvalues, eigenvectors = np.linalg.eig(L)
    sorted_indices = np.argsort(eigenvalues)  # Сортируем индексы собственных значений
    k_eigenvalues = eigenvalues[sorted_indices[:k+1]]  # Выбираем k + 1 наименьших собственных значений
    k_eigenvectors = eigenvectors[:, sorted_indices[1:k+1]]  # Соответствующие им собственные векторы
    
    return k_eigenvectors

def semi_spectral(W, k, j_l, j_u):
    '''
    W - adjacency matrix
    k - number of clusters
    j_l - matrix, in which rows are class indicator vectors for labelled data
    j_u - matrix, in which rows are class indicator vectors for unlabelled data
    '''
    
    L = sm_laplacian(W)
    
    k_eigenvectors = compute_eigenvectors(L, k)
    
    m = np.zeros((k, k))
    
    for a in range(k):
        for b in range(k):
            
            m[a][b] = np.dot(k_eigenvectors[:, a].T, j_l[b]) / len(j_l[b])
            
    v_hat = np.zeros((len(W), k))
    v_tilde = []
    
    for a in range(k):
        
        v_hat[:, a] = np.sum([m[a, b] * np.array(j_l[b]) for b in range(k)], axis=0) + np.dot(np.diag(j_u), k_eigenvectors[:, a])
        v_tilde.append(np.dot(W, v_hat[:, a]) / np.linalg.norm(np.dot(W, v_hat[:, a])))
        
    v_tilde = np.array(v_tilde).T
        
    kmeans = KMeans(n_clusters=k).fit(v_tilde)
    cluster_labels = kmeans.labels_
    
    return cluster_labels

In [16]:
def generate_number_combinations(num_digits):
    digits = list(range(10))
    combinations = list(itertools.combinations(digits, num_digits))
    return combinations

def randomized_testing(data, W, k, percent_step, iterations, var_labels):
    
    ari_total = {}

    
    for labels in var_labels:

        data_local = data[data['label'].isin(list(labels))]
        labels_true = data_local[data_local['label'].isin(list(labels))]['label'].tolist()
        idx = data_local['index'].tolist()
        pd_W = pd.DataFrame(W).iloc[idx, idx]
        W_matrix = pd_W.values

        ari_local = {}

        for perc in range(0, 100 + percent_step, percent_step):

            ari = 0

            for itr in range(iterations):

                j_l = []
                j_u = []

                for label in labels:
                    random_indexes = data_local[data_local['label'] == label]['index'].to_numpy()
                    np.random.shuffle(random_indexes)
                    selected_indexes = random_indexes[:int((perc / 100) * len(random_indexes))]

                    res = [1 if i in selected_indexes else 0 for i in range(len(data_local))]
                    j_l.append(res)

                j_l_total = [0] * len(j_l[0])

                for sublist in j_l:
                    j_l_total = [sum(x) for x in zip(j_l_total, sublist)]

                j_u = [1 if j_l_total[i] == 0 else 0 for i in range(len(data_local))]

                labels_pred = semi_spectral(W_matrix, k, j_l, j_u)

                ari += metrics.adjusted_rand_score(labels_true, labels_pred)

            ari_local[perc] = ari / iterations
            
        ari_total[labels] = ari_local
        
    return ari_total

In [17]:
def plot_multiple_graphs(ari_haarpsi, ari_ricker, ari_haar, k):
    figure = plt.figure(figsize=(10, 6))
    axes = figure.subplots()
    
    labels = None

    for label in ari_haarpsi.keys():
        labels = label
        x = list(ari_haarpsi[label].keys())
        y = list(ari_haarpsi[label].values())
        axes.plot(x, y, label='haarPsi', marker='o')
        
    for label in ari_ricker.keys():
        x = list(ari_ricker[label].keys())
        y = list(ari_ricker[label].values())
        axes.plot(x, y, label='CW-SSIM Ricker', marker='o')
        
    for label in ari_haar.keys():
        x = list(ari_haar[label].keys())
        y = list(ari_haar[label].values())
        axes.plot(x, y, label='CW-SSIM Haar', marker='o')

    axes.set_xlabel('Процент размеченных изображений', fontsize=12)
    axes.set_ylabel('ARI', fontsize=12)
    axes.set_title(f'Число классов: {k}, набор чисел: {labels}', fontsize=14)
    axes.legend(title='Меры близости', title_fontsize='12')
    plt.grid(True)
    plt.tight_layout()
    
    return figure

In [33]:
train_1000 = train_1000.reset_index()

In [18]:
figures = []

for k in range(2, 11):
    percent_step = 5
    iterations = 500
    
    var_labels = [generate_number_combinations(k)[0]]
    
    for labels in var_labels:
    
        ari_haarpsi = randomized_testing(train_mnist, mnist_haar_psi, k, percent_step, iterations, [labels])
        ari_ricker = randomized_testing(train_mnist, mnist_cw_ricker, k, percent_step, iterations, [labels])
        ari_haar = randomized_testing(train_mnist, mnist_cw_haar, k, percent_step, iterations, [labels])
        figure = plot_multiple_graphs(ari_haarpsi, ari_ricker, ari_haar, k)
        figures.append(figure)
    
pdf_filename = "semi_spectral-mnist-all_metrics-10_classes_iter500-2000_samples.pdf"
pdf = PdfPages(pdf_filename)

for fig in figures:
    pdf.savefig(fig)

pdf.close()

## Semi-Supervised Spectral Clustering with semi-supervised K-means

In [19]:
class SupervisedKMeans(ClassifierMixin, KMeans):
    def fit(self, X, y):
        self.classes = np.unique(y)
        self.centers_ = np.array([np.mean(X[y==c], axis=0) for c in self.classes])
        self.cluster_centers_ = self.centers_
        return self

    def predict(self, X):
        ed = euclidean_distances(X, self.cluster_centers_)
        return [self.classes[k] for k in np.argmin(ed, axis=1)]

    def score(self, X, y):
        y_ = self.predict(X)
        return np.mean(y == y_)


class SemiKMeans(SupervisedKMeans):
    def fit(self, X0, y0, X1):
        """To fit the semisupervised model
        
        Args:
            X0 (array): input variables with labels
            y0 (array): labels
            X1 (array): input variables without labels
        
        Returns:
            the model
        """
        classes0 = np.unique(y0)
        classes1 = np.setdiff1d(np.arange(self.n_clusters), classes0)
        self.classes = np.concatenate((classes0, classes1))

        X = np.row_stack((X0, X1))
        n1 = len(classes1)
        mu0 = SupervisedKMeans().fit(X0, y0).centers_
        if n1:
            centers, indices = kmeans_plusplus(X1, n_clusters=n1)
            self.cluster_centers_ = np.row_stack((centers, mu0))
        else:
            self.cluster_centers_ = mu0
        for _ in range(30):
            ED = euclidean_distances(X1, self.cluster_centers_)
            y1 = [self.classes[k] for k in np.argmin(ED, axis=1)]
            y = np.concatenate((y0, y1))
            self.cluster_centers_ = np.array([np.mean(X[y==c], axis=0) for c in self.classes])
        return self

In [20]:
def semi_spectral_v2(W, k, j_l, j_u, indexes_l, indexes_u, labels_l):
    '''
    W - adjacency matrix
    k - number of clusters
    j_l - matrix, in which rows are class indicator vectors for labelled data
    j_u - matrix, in which rows are class indicator vectors for unlabelled data
    '''
    
    L = sm_laplacian(W)
    
    k_eigenvectors = compute_eigenvectors(L, k)
    
    m = np.zeros((k, k))
    
    for a in range(k):
        for b in range(k):
            
            m[a][b] = np.dot(k_eigenvectors[:, a].T, j_l[b]) / len(j_l[b])
            
    v_hat = np.zeros((len(W), k))
    v_tilde = []
    
    for a in range(k):
        
        v_hat[:, a] = np.sum([m[a, b] * np.array(j_l[b]) for b in range(k)], axis=0) + np.dot(np.diag(j_u), k_eigenvectors[:, a])
        v_tilde.append(np.dot(W, v_hat[:, a]) / np.linalg.norm(np.dot(W, v_hat[:, a])))
        
    v_tilde = np.array(v_tilde).T
    
    X_train0 = np.array(v_tilde[indexes_l, :].tolist())
    X_train1 = np.array(v_tilde[indexes_u, :].tolist())

    kmeans = SemiKMeans(n_clusters=k).fit(X_train0, np.array(labels_l), X_train1)
    cluster_labels = kmeans.predict(v_tilde)
    
    return cluster_labels

In [22]:
def randomized_testing_v2(data, W, k, percent_step, iterations, var_labels):
    
    ari_total = {}
    
    for labels in var_labels:

        data_local = data[data['label'].isin(list(labels))]
        labels_true = data_local[data_local['label'].isin(list(labels))]['label'].tolist()    
        idx = data_local['index'].tolist()
        pd_W = pd.DataFrame(W).iloc[idx, idx]
        W_matrix = pd_W.values

        ari_local = {}

        for perc in range(percent_step, 100, percent_step):

            ari = 0

            for itr in range(iterations):

                j_l = []
                j_u = []
                indexes_l = []

                for label in labels:
                    random_indexes = data_local[data_local['label'] == label]['index'].to_numpy()
                    np.random.shuffle(random_indexes)
                    selected_indexes = random_indexes[:int((perc / 100) * len(random_indexes))]

                    res = [1 if i in selected_indexes else 0 for i in range(len(data_local))]
                    indexes_l.append(selected_indexes)
                    j_l.append(res)

                j_l_total = [0] * len(j_l[0])
                indexes_l = sorted(np.array(indexes_l).flatten())
                labels_l = []
                
                for idx in indexes_l:
                    labels_l.append(labels_true[idx])

                labels_l = np.array(labels_l)
                
                for sublist in j_l:
                    j_l_total = [sum(x) for x in zip(j_l_total, sublist)]

                j_u = [1 if j_l_total[i] == 0 else 0 for i in range(len(data_local))]
                indexes_u = []
                for idx, value in enumerate(j_u):
                    if value == 1:
                        indexes_u.append(idx)

                labels_pred = semi_spectral_v2(W_matrix, k, j_l, j_u, indexes_l, indexes_u, labels_l)

                ari += metrics.adjusted_rand_score(labels_true, labels_pred)

            ari_local[perc] = ari / iterations
            
        ari_total[labels] = ari_local
        
    return ari_total

In [23]:
figures = []

for k in range(2, 11):
    percent_step = 5
    iterations = 500
    
    var_labels = generate_number_combinations(k)
    
    for labels in var_labels:

        ari_haarpsi = randomized_testing(train_mnist, mnist_haarpsi, k, percent_step, iterations, [labels])
        ari_ricker = randomized_testing(train_mnist, mnist_cw_ricker, k, percent_step, iterations, [labels])
        ari_haar = randomized_testing(train_mnist, mnist_cw_haar, k, percent_step, iterations, [labels])
        figure = plot_multiple_graphs(ari_haarpsi, ari_ricker, ari_haar, k)
        figures.append(figure)
    
pdf_filename = "semi_kmeans-mnist-all_metrics-10_classes-iter500.pdf"
pdf = PdfPages(pdf_filename)

for fig in figures:
    pdf.savefig(fig)

pdf.close()