<a href="https://colab.research.google.com/github/broshanfekr/Active_Semi-Supervised_Learning_Using_Sampling_Theory_for_Graph_signals/blob/main/Active_Semi_Supervised_Learning_Using_Sampling_Theory_for_Graph_signals.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!wget -O USPSdata.zip https://www.dropbox.com/s/f0ktdoudhipohdf/USPSdata.zip?dl=0
!unzip USPSdata.zip

In [None]:
from scipy import spatial
from scipy import special
import numpy as np
import scipy.io as sio
from numpy.random import permutation
from scipy.sparse.linalg import svds, eigs
from scipy import sparse

مقداردهی اولیه به برخی از پارامترهای مورد نیاز برای آموزش مدل

In [None]:
class ArgValus:
    def __init__(self):
        self.dataset_name = "USPS"

        # k in knn algorithm
        self.k = 10
        self.sample_per_class = 100
        self.train_percent = 0.01
        self.num_iter = 100

لود کردن مجموعه داده مورد نظر و ساخت گراف با استفاده از روش نزدیکترین همسایه

In [None]:
def load_dataset(myargs):
    img, label = load_USPS_data_instace("USPSdata/usps_all.mat", myargs)
    img = np.reshape(img, [img.shape[0], -1])

    # building the adjacency matrix
    print("building adjacency matrix")
    adj_mat, sigma = build_knn_adj_mat(img, k=myargs.k)

    return img, label, adj_mat, sigma


def load_USPS_data_instace(path, myargs):
    data = sio.loadmat(path)
    img = data['data']
    img_data = []
    label = []
    for i in range(img.shape[-1]):
        temp_sample_list = []
        for j in range(img.shape[1]):
            temp = np.reshape(img[:, j, i], [16, 16])
            temp_sample_list.append(temp)

        idx = permutation(len(temp_sample_list))
        idx = idx[:myargs.sample_per_class]
        selected_samples = [temp_sample_list[x] for x in idx]
        selected_labels = [i for _ in idx]
        img_data.extend(selected_samples)
        label.extend(selected_labels)

    img_data = np.array(img_data)
    img_data = img_data.astype('float')
    label = np.array(label[:])
    label = to_onehot(label)
    return img_data, label


def build_knn_adj_mat(data_nodes, k):
    shape = data_nodes.shape
    # compute pairwise distances
    distance_matrix = np.zeros([shape[0], shape[0]])
    # Assign lower triangular part only in loop, saves time
    for i in range(shape[0]):
        for j in range(i):
            distance_matrix[i, j] = spatial.distance.euclidean(data_nodes[i, :],
                                                               data_nodes[j, :])
    # Complete upper triangular part
    distance_matrix = np.add(distance_matrix, distance_matrix.T)

    # Calculating distances of k-nearest neighbors
    # sort all possible neighbors according to distance
    knn_distance = np.sort(distance_matrix, axis=1)

    # sparsification matrix
    nodes_to_retain = np.ones([shape[0], shape[0]])

    for i in range(shape[0]):
        nodes_to_retain[i, distance_matrix[i, :] > knn_distance[i, k]] = 0
        nodes_to_retain[i, i] = 0  # diagonal should be zero

    nodes_to_retain[nodes_to_retain != nodes_to_retain.T] = 1

    # keep distances in the range of knn distance
    distance_matrix = np.multiply(distance_matrix, nodes_to_retain)

    # computing sigma
    sigma = 1/3 * np.mean(knn_distance[:, k])

    # build knn adjacency matrix
    adjacency_mat = np.zeros([shape[0], shape[0]])
    for i in range(shape[0]):
        for j in range(i):
            if distance_matrix[i, j] == 0:
                continue
            else:
                adjacency_mat[i, j] = np.exp((-1.0 * distance_matrix[i, j] ** 2) / (2 * sigma ** 2))
    adjacency_mat = adjacency_mat + adjacency_mat.T

    return adjacency_mat, sigma


def to_onehot(label):
    m = len(set(label))
    n = len(label)
    onehot_matrix = np.zeros([n, m])
    for i in range(n):
        onehot_matrix[i, label[i]] = 1
    return onehot_matrix

ساخت ماتریس لاپلاسین نرمال شده با استفاده از ماتریس مجاورت گراف 
برای ساخت ماتریس لاپلاسین نرمال شده از رابطه زیر استفاده می شود.

L = I - D^(-0.5) * A * D^(-0.5)

در این رابطه 
A
نشان دهنده ماتریس مجاورت متناظر با گراف است.
D
نیز نشان دهنده ماتریس درجه مربوط به رئوس گراف است. این ماتریس یک ماتریس قطری است که درایه های قرار گرفته روی قطر اصلی آن درجه رئوس متناظر را نشان می دهند.

In [None]:
def build_normalized_laplacian_matrix(adjacency_matrix):
    # compute the symmetric normalized Laplacian matrix
    d = np.sum(adjacency_matrix, axis=1, dtype=float)
    for i in range(len(d)):
        if d[i] != 0:
            d[i] = d[i] ** (-1.0/2)
    Dinv = np.diag(d)
    Ln = np.eye(len(d)) - np.matmul(np.matmul(Dinv, adjacency_matrix), Dinv)
    # make sure the Laplacian is symmetric
    Ln = 0.5 * (Ln + Ln.T)
    return Ln

انتخاب نمونه به منظور انتساب برچسب به آن

In [None]:
def greedy_selection_of_samples(gl, num_of_samples):
    #  gl is the graph Laplacian matrix
    sample_set = []
    omega_list = []
    kth_power = 8

    Sc = set([i for i in range(gl.shape[0])])

    S = np.zeros([gl.shape[0], num_of_samples])

    # kth power of Laplacian
    print("computing proxies")
    L_K = np.linalg.matrix_power(gl, kth_power) 
    L_K = 0.5 * (L_K + L_K.T)

    for i in range(num_of_samples):
        print("number of selected samples: ", len(sample_set))
        rc = sorted(list(Sc))
        reduced_L = L_K[rc, :]
        reduced_L = reduced_L[:, rc]

        eval, evec = eigs(reduced_L, k=1, sigma=10**(-20))

        omega = abs(eval) ** (1.0/kth_power)
        omega_list.append(omega)

        phi = np.absolute(evec)
        max_val = max(phi)
        max_index = np.argmax(phi)
        selected = rc[max_index]
        Sc = Sc - {selected}
        sample_set.append(selected)

    rc = sorted(list(Sc))
    reduced_L = L_K[rc, :]
    reduced_L = reduced_L[:, rc]
    eval, evec = eigs(reduced_L, k=1, sigma=10**(-20))
    omega = abs(eval) ** (1.0/kth_power)
    omega_list.append(omega)

    sample_set = sorted(sample_set)

    for i, sidx in enumerate(sample_set):
        S[sidx, i] = 1

    return S, sample_set, omega

بازسازی گراف-سیگنالها با استفاده از گراف-سیگنالهایی که مقادیر آنها در نمونه های انتخاب شده، مشخص گردیده است.

In [None]:
def reconstruct_signal(Ln, observed_labels, sample_set, omega, myargs):
    filterlen = 10
    alpha = 8
    freq_range = [0, 2]

    chebyshev_coef = compute_cheby_coeff(lambda x: special.expit(alpha*(omega - x)), 
                                         filterlen, filterlen+1, freq_range)
    
    f_hat = cheby_op(Ln, chebyshev_coef, observed_labels, freq_range)

    to_estimate_idx = sorted(list(set(range(Ln.shape[0])) - set(sample_set)))

    prev_diffrence = 0
    for i in range(myargs.num_iter):
        # projection on C1
        err_s = observed_labels - f_hat
        err_s[to_estimate_idx, :] = 0 # error on the known set

        # projection on C2
        f_hat_temp = cheby_op(Ln, chebyshev_coef, f_hat + err_s, freq_range) # err on S approx LP

        diffrence = np.linalg.norm(f_hat_temp - f_hat) # to check convergence
        if i > 1 and diffrence > prev_diffrence:
            break
        else:
            prev_diffrence = diffrence
            f_hat = f_hat_temp

    return f_hat


def compute_cheby_coeff(f, m=30, N=None, a_arange=None):
    """
    Compute Chebyshev coefficients for a Filterbank.

    Parameters
    ----------
    f : Filter
        Filterbank with at least 1 filter
    m : int
        Maximum order of Chebyshev coeff to compute
        (default = 30)
    N : int
        Grid order used to compute quadrature
        (default = m + 1)
    i : int
        Index of the Filterbank element to compute
        (default = 0)

    Returns
    -------
    c : ndarray
        Matrix of Chebyshev coefficients

    """
    # G = f.G
    # i = kwargs.pop('i', 0)

    if not N:
        N = m + 1
    if not a_arange:
        a_arange = [0, 2]

    # a_arange = [0, G.lmax]

    a1 = (a_arange[1] - a_arange[0]) / 2.0
    a2 = (a_arange[1] + a_arange[0]) / 2.0
    c = np.zeros(m + 1)

    tmpN = np.arange(N)
    num = np.cos(np.pi * (tmpN + 0.5) / N)
    for o in range(m + 1):
        c[o] = 2. / N * np.dot(f(a1 * num + a2),
                               np.cos(np.pi * o * (tmpN + 0.5) / N))

    return c


def cheby_op(L, c, signal, a_arange):
    """
    Chebyshev polynomial of graph Laplacian applied to vector.

    Parameters
    ----------
    G : Graph
    L : Laplacian Matrix of the graph
    c : ndarray or list of ndarrays
        Chebyshev coefficients for a Filter or a Filterbank
    signal : ndarray
        Signal to filter

    Returns
    -------
    r : ndarray
        Result of the filtering

    """
    # Handle if we do not have a list of filters but only a simple filter in cheby_coeff.
    if not isinstance(c, np.ndarray):
        c = np.array(c)

    c = np.atleast_2d(c)
    Nscales, M = c.shape
    N = signal.shape[0]

    if M < 2:
        raise TypeError("The coefficients have an invalid shape")

    # thanks to that, we can also have 1d signal.
    try:
        Nv = np.shape(signal)[1]
        r = np.zeros((N * Nscales, Nv))
    except IndexError:
        r = np.zeros((N * Nscales))

    a1 = float(a_arange[1] - a_arange[0]) / 2.
    a2 = float(a_arange[1] + a_arange[0]) / 2.

    twf_old = signal
    twf_cur = (np.dot(L, signal) - a2 * signal) / a1

    tmpN = np.arange(N, dtype=int)
    for i in range(Nscales):
        r[tmpN + N*i] = 0.5 * c[i, 0] * twf_old + c[i, 1] * twf_cur

    factor = 2/a1 * (L - a2 * sparse.eye(N))
    for k in range(2, M):
        twf_new = factor.dot(twf_cur) - twf_old
        for i in range(Nscales):
            r[tmpN + N*i] += c[i, k] * twf_new

        twf_old = twf_cur
        twf_cur = twf_new

    return r


محاسبه دقت دسته بندی

In [None]:
def acc_score(pred_label, true_label, sample_set):
    Ind = np.argsort(-np.abs(pred_label), axis=1)
    pred_label = Ind[:, 0]

    Ind = np.argsort(-np.abs(true_label), axis=1)
    true_label = Ind[:, 0]

    all_samples = set(range(true_label.shape[0]))
    test_idx = list(all_samples - set(sample_set))

    test_acc = pred_label[test_idx] == true_label[test_idx]
    test_acc = sum(test_acc)/(len(true_label[test_idx]) * 1.0)

    train_acc = pred_label[sample_set] == true_label[sample_set]
    train_acc = sum(train_acc)/ (len(true_label[sample_set]) * 1.0)

    return train_acc, test_acc

Active Semi-Supervised Learning Using Sampling Theory for Graph signals

In [None]:
myargs = ArgValus()

img, label, adj_mat_p, sigma = load_dataset(myargs)

# compute the symmetric normalized Laplacian matrix
Ln = build_normalized_laplacian_matrix(adj_mat_p)

# Choosing optimal sampling sets of different sizes
print("sample selection from Graph...")
S, sample_set, omega = greedy_selection_of_samples(Ln, num_of_samples=round(myargs.train_percent*Ln.shape[0]))

# signal reconstruction
observed_labels = np.zeros(label.shape)
for sample in sample_set:
    observed_labels[sample, :] = label[sample, :]
f_hat = reconstruct_signal(Ln, observed_labels, sample_set, omega, myargs)

train_acc, test_acc = acc_score(f_hat, label, sample_set)
print("train_acc is: {}    |   test_acc is: {}".format(train_acc, test_acc))

building adjacency matrix
sample selection from Graph...
computing proxies
number of selected samples:  0
number of selected samples:  1
number of selected samples:  2
number of selected samples:  3
number of selected samples:  4
number of selected samples:  5
number of selected samples:  6
number of selected samples:  7
number of selected samples:  8
number of selected samples:  9
train_acc is: 1.0    |   test_acc is: 0.5858585858585859
