In [None]:
import gdown
url = "https://drive.google.com/uc?id=1kR4yNafYdhuGSo4kPI1HwApmluY3L1Oq"

output_zip_path = "/content/KPCAMNet_dataset.zip"

# Download the file using gdown
gdown.download(url, output_zip_path, quiet=False)

Downloading...
From (original): https://drive.google.com/uc?id=1kR4yNafYdhuGSo4kPI1HwApmluY3L1Oq
From (redirected): https://drive.google.com/uc?id=1kR4yNafYdhuGSo4kPI1HwApmluY3L1Oq&confirm=t&uuid=d7c22aa4-f8c4-45e1-84da-cbe8a356ab30
To: /content/KPCAMNet_dataset.zip
100%|██████████| 28.0M/28.0M [00:00<00:00, 72.6MB/s]


'/content/KPCAMNet_dataset.zip'

In [None]:
!unzip /content/KPCAMNet_dataset.zip

Archive:  /content/KPCAMNet_dataset.zip
   creating: KPCAMNet_dataset/
   creating: KPCAMNet_dataset/HY/
   creating: KPCAMNet_dataset/HY/labels/
  inflating: KPCAMNet_dataset/HY/labels/change.bmp  
  inflating: KPCAMNet_dataset/HY/labels/GT.bmp  
  inflating: KPCAMNet_dataset/HY/labels/multi-change.bmp  
  inflating: KPCAMNet_dataset/HY/labels/unchanged.bmp  
   creating: KPCAMNet_dataset/HY/raw data/
  inflating: KPCAMNet_dataset/HY/raw data/T1  
  inflating: KPCAMNet_dataset/HY/raw data/T1.hdr  
  inflating: KPCAMNet_dataset/HY/raw data/T2  
  inflating: KPCAMNet_dataset/HY/raw data/T2.hdr  
   creating: KPCAMNet_dataset/HY/visualized img/
 extracting: KPCAMNet_dataset/HY/visualized img/T1.png  
 extracting: KPCAMNet_dataset/HY/visualized img/T2.png  
   creating: KPCAMNet_dataset/QU/
   creating: KPCAMNet_dataset/QU/labels/
  inflating: KPCAMNet_dataset/QU/labels/change.bmp  
  inflating: KPCAMNet_dataset/QU/labels/GT.png  
  inflating: KPCAMNet_dataset/QU/labels/GT_multi.bmp  
  i

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from scipy.linalg import eig
from scipy.stats import chi2
from sklearn.cluster import KMeans


def get_binary_change_map(data, method='k_means'):
    """
    get binary change map
    :param data:
    :param method: cluster method
    :return: binary change map
    """
    if method == 'k_means':
        cluster_center = KMeans(n_clusters=2, max_iter=1500).fit(data.T).cluster_centers_.T  # shape: (1, 2)
        # cluster_center = k_means_cluster(weight, cluster_num=2)
        print('k-means cluster is done, the cluster center is ', cluster_center)
        dis_1 = np.linalg.norm(data - cluster_center[0, 0], axis=0, keepdims=True)
        dis_2 = np.linalg.norm(data - cluster_center[0, 1], axis=0, keepdims=True)

        bcm = np.copy(data)  # binary change map
        if cluster_center[0, 0] > cluster_center[0, 1]:
            bcm[dis_1 > dis_2] = 0
            bcm[dis_1 <= dis_2] = 255
        else:
            bcm[dis_1 > dis_2] = 255
            bcm[dis_1 <= dis_2] = 0
    elif method == 'otsu':
        bcm, threshold = otsu(data, num=200)
        print('otsu is done, the threshold is ', threshold)

    return bcm


def otsu(data, num=400):
    """
    generate binary change map based on otsu
    :param data: cluster data
    :param num: intensity number
    :return:
        binary change map
        selected threshold
    """
    max_value = np.max(data)
    min_value = np.min(data)

    total_num = data.shape[1]
    step_value = (max_value - min_value) / num
    value = min_value
    best_threshold = min_value
    best_inter_class_var = 0
    while value <= max_value:
        data_1 = data[data <= value]
        data_2 = data[data > value]
        w1 = data_1.shape[0] / total_num
        w2 = data_2.shape[0] / total_num

        mean_1 = data_1.mean()
        mean_2 = data_2.mean()

        inter_class_var = w1 * w2 * np.power((mean_1 - mean_2), 2)
        if best_inter_class_var < inter_class_var:
            best_inter_class_var = inter_class_var
            best_threshold = value
        value += step_value

    bwp = np.zeros(data.shape)
    bwp[data <= best_threshold] = 0
    bwp[data > best_threshold] = 255

    return bwp, best_threshold

In [None]:
import imageio

import numpy as np
from sklearn.metrics import roc_curve
from sklearn.metrics import confusion_matrix
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import accuracy_score


def assess_accuracy(gt_changed, gt_unchanged, changed_map, multi_class=False):
    """
    assess accuracy of changed map based on ground truth
    :param gt_changed: changed ground truth
    :param gt_unchanged: unchanged ground truth
    :param changed_map: changed map
    :return: confusion matrix and overall accuracy
    """
    cm = []
    gt = []

    if multi_class:
        height, width, channel = gt_changed.shape
        for i in range(0, height):
            for j in range(0, width):
                if (changed_map[i, j] == np.array([255, 255, 0])).all():
                    cm.append('soil')
                # elif (changed_map[i, j] == np.array([0, 0, 255])).all():
                #     cm.append('water')
                elif (changed_map[i, j] == np.array([255, 0, 0])).all():
                    cm.append('city')
                else:
                    cm.append('unchanged')
                if (gt_changed[i, j] == np.array([255, 255, 0])).all():
                    gt.append('soil')
                elif (gt_changed[i, j] == np.array([255, 0, 0])).all():
                    gt.append('city')
                # elif (gt_changed[i, j] == np.array([0, 0, 255])).all():
                #     gt.append('water')
                elif (gt_unchanged[i, j] == np.array([255, 255, 255])).all():
                    gt.append('unchanged')
                else:
                    gt.append('undefined')
        conf_mat = confusion_matrix(y_true=gt, y_pred=cm,
                                    labels=['soil', 'city', 'unchanged'])
        kappa_co = cohen_kappa_score(y1=gt, y2=cm,
                                     labels=['soil', 'city', 'unchanged'])
        aa = conf_mat.diagonal() / np.sum(conf_mat, axis=1)
        oa = np.sum(conf_mat.diagonal()) / np.sum(conf_mat)

        return conf_mat, oa, aa, kappa_co

    else:
        height, width = changed_map.shape
        changed_map = np.reshape(changed_map, (-1,))
        gt_changed = np.reshape(gt_changed, (-1,))
        gt_unchanged = np.reshape(gt_unchanged, (-1,))

        cm = np.ones((height * width,))
        cm[changed_map == 255] = 2

        gt = np.zeros((height * width,))
        gt[gt_changed == 255] = 2
        gt[gt_unchanged == 255] = 1


        conf_mat = confusion_matrix(y_true=gt, y_pred=cm,
                                    labels=[1, 2])  # ['soil', 'water', 'city', 'unchanged'])
        kappa_co = cohen_kappa_score(y1=gt, y2=cm,
                                     labels=[1, 2])  # ['soil', 'water', 'city', 'unchanged'])

        oa = np.sum(conf_mat.diagonal()) / np.sum(conf_mat)

        return conf_mat, oa, kappa_co



# if __name__ == '__main__':
#     # val_func()
#     ground_truth_changed = imageio.imread('./Adata/GF_2_2/change.bmp')[:, :, 0]
#     ground_truth_unchanged = imageio.imread('./Adata/GF_2_2/unchanged.bmp')  # [:, :, 1]

#     cm_path = 'PCANet/compare/SAE_binary.bmp'
#     changed_map = imageio.imread(cm_path)


#     conf_mat, oa, kappa_co = assess_accuracy(ground_truth_changed, ground_truth_unchanged, changed_map,
#                                                  multi_class=False)
#     conf_mat_2 = conf_mat.copy()
#     conf_mat_2[1, 1] = conf_mat[0, 0]
#     conf_mat_2[0, 0] = conf_mat[1, 1]
#     conf_mat_2[1, 0] = conf_mat[0, 1]
#     conf_mat_2[0, 1] = conf_mat[1, 0]

#     print(conf_mat)
#     print(conf_mat_2[1, 0] + conf_mat_2[0, 1])
#     print(oa)
#     print(kappa_co)

In [None]:
import numpy as np


def norm_img(img, channel_first=True):
    if channel_first:
        channel, img_height, img_width = img.shape
        img = np.reshape(img, (channel, img_height * img_width))  # (channel, height * width)
        max_value = np.max(img, axis=1, keepdims=True)  # (channel, 1)
        min_value = np.min(img, axis=1, keepdims=True)  # (channel, 1)
        diff_value = max_value - min_value
        nm_img = (img - min_value) / diff_value
        nm_img = np.reshape(nm_img, (channel, img_height, img_width))
    else:
        img_height, img_width, channel = img.shape
        img = np.reshape(img, (img_height * img_width, channel))  # (channel, height * width)
        max_value = np.max(img, axis=0, keepdims=True)  # (channel, 1)
        min_value = np.min(img, axis=0, keepdims=True)  # (channel, 1)
        diff_value = max_value - min_value
        nm_img = (img - min_value) / diff_value
        nm_img = np.reshape(nm_img, (img_height, img_width, channel))
    return nm_img


def norm_img_2(img, channel_first=True):
    if channel_first:
        channel, img_height, img_width = img.shape
        img = np.reshape(img, (channel, img_height * img_width))  # (channel, height * width)
        max_value = np.max(img, axis=1, keepdims=True)  # (channel, 1)
        min_value = np.min(img, axis=1, keepdims=True)  # (channel, 1)
        diff_value = max_value - min_value
        nm_img = 2 * ((img - min_value) / diff_value - 0.5)
        nm_img = np.reshape(nm_img, (channel, img_height, img_width))
    else:
        img_height, img_width, channel = img.shape
        img = np.reshape(img, (img_height * img_width, channel))  # (channel, height * width)
        max_value = np.max(img, axis=0, keepdims=True)  # (channel, 1)
        min_value = np.min(img, axis=0, keepdims=True)  # (channel, 1)
        diff_value = max_value - min_value
        nm_img = 2 * ((img - min_value) / diff_value - 0.5)
        nm_img = np.reshape(nm_img, (img_height, img_width, channel))
    return nm_img


def stad_img(img, channel_first=True):
    """
    normalization image
    :param channel_first:
    :param img: (C, H, W)
    :return:
        norm_img: (C, H, W)
    """
    if channel_first:
        channel, img_height, img_width = img.shape
        img = np.reshape(img, (channel, img_height * img_width))  # (channel, height * width)
        mean = np.mean(img, axis=1, keepdims=True)  # (channel, 1)
        center = img - mean  # (channel, height * width)
        var = np.sum(np.power(center, 2), axis=1, keepdims=True) / (img_height * img_width)  # (channel, 1)
        std = np.sqrt(var)  # (channel, 1)
        nm_img = center / std  # (channel, height * width)
        nm_img = np.reshape(nm_img, (channel, img_height, img_width))
    else:
        img_height, img_width, channel = img.shape
        img = np.reshape(img, (img_height * img_width, channel))  # (height * width, channel)
        mean = np.mean(img, axis=0, keepdims=True)  # (1, channel)
        center = img - mean  # (height * width, channel)
        var = np.sum(np.power(center, 2), axis=0, keepdims=True) / (img_height * img_width)  # (1, channel)
        std = np.sqrt(var)  # (channel, 1)
        nm_img = center / std  # (channel, height * width)
        nm_img = np.reshape(nm_img, (img_height, img_width, channel))
    return nm_img


def random_select_samples(img_X, img_Y, n_train, patch_sz):
    '''
        randomly selecting patches as training sample for KPCA convolution training
    '''
    height, width, channel = img_X.shape
    edge = patch_sz // 2

    img_X = np.pad(img_X, ((edge, edge), (edge, edge), (0, 0)), 'constant')
    img_Y = np.pad(img_Y, ((edge, edge), (edge, edge), (0, 0)), 'constant')

    patch_X = []
    patch_Y = []

    for i in range(0, height):
        for j in range(0, width):
            patch_X.append(img_X[i:i + patch_sz, j:j + patch_sz])
            patch_Y.append(img_Y[i:i + patch_sz, j:j + patch_sz])
    patch_X = np.array(patch_X, np.float32)
    patch_Y = np.array(patch_Y, np.float32)

    train_idx = np.arange(0, height * width)
    np.random.seed()
    np.random.shuffle(train_idx)
    train_idx = train_idx[0:n_train]

    train_X = patch_X[train_idx]
    train_Y = patch_Y[train_idx]
    return train_X, train_Y

In [None]:
import numpy as np
from sklearn.decomposition import KernelPCA


class KernelPCANet(object):
    def __init__(self, num_stages, patch_size, num_filters, gamma):
        self.num_stages = num_stages
        self.patch_size = patch_size
        self.num_filters = num_filters

        self.filters = []
        self.gamma = gamma

    def train_net(self, input_data, stage, is_mean_removal, kernel='rbf'):
        return self.train_filters(input_data, stage, is_mean_removal, kernel)

    def train_filters(self, input_data, stage, is_mean_removal, kernel):
        input_shape = input_data.shape  # (B, m, n, c)
        # generate overlap patch
        print('-------patch generation-------')
        overlap_patch = self._generate_over_patch(input_data)

        overlap_patch = np.reshape(overlap_patch,  # (m * n, c * k1 * k2)
                                   (-1, self.patch_size[0] * self.patch_size[1] * input_shape[-1]))

        # overlap_patch = input_data
        # mean removal
        #  print('-------mean removal-------')
        if is_mean_removal:
            patch_mean = np.mean(overlap_patch, axis=1, keepdims=True)  # (m * n * c, 1)
            mean_overlap_patch = overlap_patch - patch_mean  # (m * n * c, k1 * k2)
        else:
            mean_overlap_patch = overlap_patch

        print('-------solve KPCA problem-------')
        KPCA_trans = KernelPCA(n_components=self.num_filters[stage], kernel=kernel, degree=3, gamma=self.gamma[stage])
        output = KPCA_trans.fit_transform(mean_overlap_patch)

        self.filters.append(KPCA_trans)
        return output

    def infer_data(self, input_data, stage, is_mean_removal):
        output_data = self.predict(input_data, stage, is_mean_removal)
        return output_data

    def predict(self, data, stage, is_mean_removal):
        input_shape = data.shape
        mar_ver = self.patch_size[0] // 2
        mar_hor = self.patch_size[1] // 2
        padding_img = np.zeros(
            (input_shape[0],
             input_shape[1] + 2 * mar_ver,
             input_shape[2] + 2 * mar_hor,
             input_shape[3]))
        for i in range(input_shape[0]):  # (B, m, n, c) --> (B, m+filter_h, n+filter_w, c)
            padding_img[i] = np.pad(data[i], ((mar_ver, mar_hor), (mar_ver, mar_hor), (0, 0)), 'constant')

        # print('-------generate overlap patch-------')
        overlap_patch = self._generate_over_patch(padding_img)

        overlap_patch = np.reshape(overlap_patch,  # (m * n, c * k1 * k2)
                                   (-1, self.patch_size[0] * self.patch_size[1] * input_shape[-1]))

        # print('-------mean removal-------')
        if is_mean_removal:
            patch_mean = np.mean(overlap_patch, axis=1, keepdims=True)  # (m * n * c, 1)
            mean_overlap_patch = overlap_patch - patch_mean  # (m * n * c, k1 * k2)
        else:
            mean_overlap_patch = overlap_patch

        KPCA_trains = self.filters[stage]
        trans_output = KPCA_trains.transform(mean_overlap_patch)

        trans_output = np.reshape(trans_output,
                                  (input_shape[0], input_shape[1], input_shape[2], self.num_filters[stage]))
        return trans_output

    def _generate_over_patch(self, data):
        input_shape = data.shape
        mar_ver = self.patch_size[0] // 2
        mar_hor = self.patch_size[1] // 2

        overlap_patch = []
        for batch_id in range(input_shape[0]):
            for i in range(mar_ver, input_shape[1] - mar_ver):
                for j in range(mar_hor, input_shape[2] - mar_hor):
                    # (B, k1, k2, c)
                    patch = data[batch_id, (i - mar_ver):(i + mar_ver + 1), (j - mar_hor):(j + mar_hor + 1)]
                    overlap_patch.append(patch)
        overlap_patch = np.reshape(overlap_patch, (-1, self.patch_size[0], self.patch_size[1], input_shape[-1]))
        return overlap_patch

In [None]:
import os
import pickle
import argparse

from osgeo import gdal
import numpy as np
from sklearn.cluster import KMeans
import sys
# import KernelPCANet
# import get_binary_change_map
# import assess_accuracy
# import stad_img, norm_img, random_select_samples
import imageio

DEFAULT_ARGS = {
    'net_depth': 3,
    'patch_size': 5,
    'kernel_func': 'rbf',
    'gamma_list': [5e-4, 5e-4, 5e-4],
    'save_path': 'result',
    'data_path': '/content/KPCAMNet_dataset/HY/raw data',
    'epoch': 1,
    'filter_num': [8, 8, 8],
    'sample_num': 100,
}

# if len(FILTER_NUM) != NET_DEPTH:
#     print('filter number doesn\'t match network depth! Please check it')
#     sys.exit(1)

# if len(GAMMA) != len(FILTER_NUM):
#     print('gamma number doesn\' match filter number! Please check it')
#     sys.exit(1)


def load_data(data_path):
    '''
        load dataset, you should modify this function according to your own dataset
    '''
    data_set_X = gdal.Open(os.path.join(data_path, 'T1'))  # data set X
    data_set_Y = gdal.Open(os.path.join(data_path, 'T2'))  # data set Y

    img_width = data_set_X.RasterXSize  # image width
    img_height = data_set_X.RasterYSize  # image height

    img_X = data_set_X.ReadAsArray(0, 0, img_width, img_height)
    img_Y = data_set_Y.ReadAsArray(0, 0, img_width, img_height)

    img_X = stad_img(img_X, channel_first=True)  # (C, H, W)
    img_Y = stad_img(img_Y, channel_first=True)
    img_X = np.transpose(img_X, [1, 2, 0])  # (H, W, C)
    img_Y = np.transpose(img_Y, [1, 2, 0])  # (H, W, C)
    return img_X, img_Y


def train_net(args):

    FLAGS = DEFAULT_ARGS.copy()
    FLAGS.update(args)

    PATCH_SZ = FLAGS['patch_size']
    NET_DEPTH = FLAGS['net_depth']
    SAVE_PATH = FLAGS['save_path']
    DATA_PATH = FLAGS['data_path']
    KERNEL_FUNC = FLAGS['kernel_func']
    GAMMA = FLAGS['gamma_list']
    EPOCH = FLAGS['epoch']
    FILTER_NUM = FLAGS['filter_num']
    SAMPLE_NUM = FLAGS['sample_num']
    img_X, img_Y = load_data(DATA_PATH)
    height, width, channel = img_X.shape

    print(f'sample number is {2 * SAMPLE_NUM}')

    train_X, train_Y = random_select_samples(img_X, img_Y, n_train=SAMPLE_NUM, patch_sz=PATCH_SZ)
    train_data = np.concatenate([train_X, train_Y], axis=0)

    # limited by the memory size, we have to slide the dataset into 500*500 patch, you can define this according
    # to your own dataset
    step_1 = 500
    step_2 = 500
    before_img = np.reshape(img_X, (1, img_X.shape[0], img_X.shape[1], img_X.shape[2]))
    after_img = np.reshape(img_Y, (1, img_Y.shape[0], img_Y.shape[1], img_Y.shape[2]))
    pred_img = np.concatenate([before_img, after_img], axis=0)  # (2, H, W, C)
    PCANet_model = KernelPCANet(num_stages=NET_DEPTH, patch_size=[PATCH_SZ, PATCH_SZ],
                                num_filters=FILTER_NUM,
                                gamma=GAMMA)
    for s in range(NET_DEPTH):
        trans_img = np.ones((2, height, width, FILTER_NUM[s]))
        PCANet_model.train_net(train_data, stage=s, is_mean_removal=False, kernel=KERNEL_FUNC)

        for i in range(0, height, step_1):
            for j in range(0, width, step_2):
                pred_data = pred_img[:, i:(i + step_1), j:(j + step_2), :]
                net_output = PCANet_model.infer_data(pred_data, stage=s, is_mean_removal=False)
                proj_before_img = net_output[0]
                proj_after_img = net_output[1]
                trans_img[0, i:(i + step_1), j:(j + step_2)] = proj_before_img
                trans_img[1, i:(i + step_1), j:(j + step_2)] = proj_after_img

        pred_img = np.copy(trans_img)  # feature images will be treated as input in the next stage

        # select new training samples for the next KPCA convolutional layer
        change_sample_X, change_sample_Y = random_select_samples(trans_img[0], trans_img[1],
                                                                 n_train=SAMPLE_NUM,
                                                                 patch_sz=PATCH_SZ)

        train_data = np.concatenate([change_sample_X, change_sample_Y], axis=0)

    #############################
    # mapping data into a 2-D polar domain
    #############################
    diff_img = (trans_img[0] - trans_img[1])
    rou = np.sqrt(np.sum(np.square(diff_img), axis=-1))
    eig_value = PCANet_model.filters[-1].eigenvalues_
    eig_value = np.reshape(eig_value, (1, 1, -1))
    theta = np.arccos(1 / np.sqrt(FILTER_NUM[-1]) * (np.sum(eig_value * diff_img, -1) / np.sqrt(
        np.sum(np.square(eig_value)) * np.sum(np.square(diff_img), axis=-1))))

    ###############################################
    # binary CD
    ###############################################
    print('-------Perform Binary Change Detection-------')
    rou = np.reshape(rou, (-1, 1))
    binary_change_map = get_binary_change_map(rou, method='otsu')
    binary_change_map = np.reshape(binary_change_map, (height, width))
    binary_change_map_rgb = np.repeat(binary_change_map[:, :, np.newaxis], 3, axis=2)
    imageio.imwrite(os.path.join(SAVE_PATH, 'KPCAMNet_BCM.png'), binary_change_map_rgb.astype(np.uint8))

    ###############################################
    # multi-class CD
    ###############################################
    print('-------Perform Multi-class Change Detection-------')
    changed_idx = (binary_change_map == 255)
    changed_theta = theta[changed_idx]
    changed_theta = np.reshape(changed_theta, (-1, 1))
    KMeans_cls = KMeans(n_clusters=3, max_iter=1500)
    print('-------Clustering algorithm is running')
    KMeans_cls.fit(changed_theta)
    label_pred = KMeans_cls.labels_  # get cluster label

    multi_change_map = np.zeros((height, width, 3))
    k = 0
    for h in range(height):
        for w in range(width):
            if changed_idx[h, w]:
                if label_pred[k] == 0:
                    multi_change_map[h, w] = np.array([255, 255, 0])
                elif label_pred[k] == 1:
                    multi_change_map[h, w] = np.array([255, 0, 0])
                elif label_pred[k] == 2:
                    multi_change_map[h, w] = np.array([0, 0, 255])
                k += 1
    multi_change_map_uint8 = (multi_change_map * 255).astype(np.uint8)
    imageio.imwrite(os.path.join(SAVE_PATH, 'KPCAMNet_MCM.png'), multi_change_map_uint8)


if __name__ == '__main__':
    args = {
          'net_depth': 3,
          'patch_size': 5,
          # Add other arguments as needed
      }
    train_net(args)

sample number is 200
-------patch generation-------
-------solve KPCA problem-------
-------patch generation-------
-------solve KPCA problem-------
-------patch generation-------
-------solve KPCA problem-------
-------Perform Binary Change Detection-------
otsu is done, the threshold is  0.008253679594746105
-------Perform Multi-class Change Detection-------
-------Clustering algorithm is running


