In [1]:
"""Importing libraries"""
import numpy as np
import math
import random
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import scipy
import scipy.io as sio
from scipy import signal
from scipy.linalg import norm
from scipy.spatial.distance import cdist
import skimage
from skimage import io, measure
import torchvision
from torchvision import transforms
from collections import  Counter

In [2]:
"""Checking GPU avilability"""
import torch 
torch.cuda.is_available()

True

In [3]:
def del2(im):
    """del2 (discrete laplacian operator)"""
    [ylen, xlen] = im.shape
    im_new = np.zeros([ylen, xlen], dtype=np.float32)
    for j in range(1, ylen-1):
        for i in range(1, xlen-1):
            im_new[j,i] = (im[j-1,i]+im[j+1,i]+im[j,i-1]+im[j,i+1])/4-im[j,i]
    return im_new
def srad(im, delta):
    """srad (speckle reducing anisotropic diffusion)"""
    q0 = 1
    for n in range(1, 6):
        [ylen, xlen] = im.shape
        X = np.zeros([ylen+2, xlen+2], dtype=np.float32)
        X[1:ylen+1, 1:xlen+1] = im
        #padding
        X[0, 1:xlen+1] = im[0, :]
        X[ylen+1, 1:xlen+1] = im[ylen-1, :]
        X[:, 0] = X[:, 1]
        X[:, xlen+1] = X[:, xlen]
        q0 = q0*np.exp(-delta)
        gRx = signal.convolve2d(X, [[0,0,0],[0,1,-1],[0,0,0]], mode='same', boundary='symm')
        gRy = signal.convolve2d(X, [[0,-1,0],[0,1,0],[0,0,0]], mode='same', boundary='symm')
        gLx = signal.convolve2d(X, [[0,0,0],[1,-1,0],[0,0,0]], mode='same', boundary='symm')
        gLy = signal.convolve2d(X, [[0,0,0],[0,-1,0],[0,1,0]], mode='same', boundary='symm')
        q1 = np.sqrt(gRx*gRx+gRy*gRy+gLx*gLx+gLy*gLy)/(X+0.0001)
        q2 = 4*del2(X)/(X+0.0001)
        q = np.sqrt(np.maximum(0, (1/2*(q1*q1)-1/16*(q2*q2))/((1+1/4*q2)*(1+1/4*q2)+0.01)))
        c = 1/(1+((q*q-q0*q0)/(q0*q0*(1+q0*q0))))
        d = signal.convolve2d(c, [[0,0,0],[0,0,-1],[0,0,0]],  mode='same', boundary='symm')* \
            signal.convolve2d(X, [[0,0,0],[0,1,-1],[0,0,0]], mode='same', boundary='symm')+ \
            signal.convolve2d(c, [[0,0,0],[0,-1,0],[0,0,0]],  mode='same', boundary='symm')* \
            signal.convolve2d(X, [[0,0,0],[-1,1,0],[0,0,0]], mode='same', boundary='symm')+ \
            signal.convolve2d(c, [[0,-1,0],[0,0,0],[0,0,0]],  mode='same', boundary='symm')* \
            signal.convolve2d(X, [[0,-1,0],[0,1,0],[0,0,0]], mode='same', boundary='symm')+ \
            signal.convolve2d(c, [[0,0,0],[0,-1,0],[0,0,0]],  mode='same', boundary='symm')* \
            signal.convolve2d(X, [[0,0,0],[0,1,0],[0,-1,0]], mode='same', boundary='symm')
        X = X+delta/4*d
        im = X[1:ylen+1, 1:xlen+1]
    return im
def dicomp(im1, im2):
    """dicomp (dictionary learning and sparse coding)"""
    im1 = srad(im1, 0.15)
    im2 = srad(im2, 0.15)
    im_di = abs(np.log((im1+1)/(im2+1)))
    im_di = srad(im_di, 0.15)
    return im_di
def hcluster(pix_vec, im_di):
    """hcluster (hierarchical clustering)"""
    fcm = FCM(n_clusters=2)
    fcm.fit(pix_vec)
    fcm_lab = fcm.u.argmax(axis=1)
    if sum(fcm_lab==0)<sum(fcm_lab==1):
        ttr = round(sum(fcm_lab==0)*1.25)
        ttl = round(sum(fcm_lab==0)/1.10)
    else:
        ttr = round(sum(fcm_lab==1)*1.25)
        ttl = round(sum(fcm_lab==1)/1.10)
    fcm = FCM(n_clusters=5)
    fcm.fit(pix_vec)
    fcm_lab  = fcm.u.argmax(axis=1)
    ylen, xlen = im_di.shape
    idx = []
    idx_tmp = []
    idxmean = []
    res_lab = np.zeros(ylen*xlen, dtype=np.float32)
    for i in range(0, 5):
        idx_tmp.append(np.argwhere(fcm_lab==i))
        idxmean.append(im_di.reshape(ylen*xlen, 1)[idx_tmp[i]].mean())
    idx_sort = np.argsort(idxmean)
    for i in range(0, 5):
        idx.append(idx_tmp[idx_sort[i]])
    c = len(idx[4])
    res_lab[idx[4]] = 2
    flag_mid = 0
    for i in range(1, 5):
        c = c+len(idx[4-i])
        if c < ttl:
            res_lab[idx[4-i]] = 2
        elif c >= ttl and c < ttr:
            res_lab[idx[4-i]] = 1.5
            flag_mid = 1
        elif flag_mid == 0:
            res_lab[idx[4-i]] = 1.5
            flag_mid = 1
        else:
            res_lab[idx[4-i]] = 1
    res_lab = res_lab.reshape(ylen, xlen)
    return res_lab
class FCM:
    """Fuzzy C means"""
    def __init__(self, n_clusters=10, max_iter=150, m=2, error=1e-5, random_state=42):
        self.u, self.centers = None, None
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.m = m
        self.error = error
        self.random_state = random_state
    def fit(self, X):
        N = X.shape[0]
        C = self.n_clusters
        centers = []
        r = np.random.RandomState(self.random_state)
        u = r.rand(N,C)
        u = u / np.tile(u.sum(axis=1)[np.newaxis].T,C)
        iteration = 0
        while iteration < self.max_iter:
            u2 = u.copy()
            centers = self.next_centers(X, u)
            u = self.next_u(X, centers)
            iteration += 1
            #Stopping rule
            if norm(u - u2) < self.error:
                break
        self.u = u
        self.centers = centers
        return self
    def next_centers(self, X, u):
        um = u ** self.m
        return (X.T @ um / np.sum(um, axis=0)).T
    def next_u(self, X, centers):
        return self._predict(X, centers)
    def _predict(self, X, centers):
        power = float(2 / (self.m - 1))
        temp = cdist(X, centers) ** power
        denominator_ = temp.reshape((X.shape[0], 1, -1)).repeat(temp.shape[-1], axis=1)
        denominator_ = temp[:, :, np.newaxis] / denominator_
        return 1 / denominator_.sum(2)
    def predict(self, X):
        if len(X.shape) == 1:
            X = np.expand_dims(X, axis=0)
        u = self._predict(X, self.centers)
        return np.argmax(u, axis=-1)

In [4]:
"""Reading input images"""
image1 = cv2.imread('Ottawa_1.bmp')
image2 = cv2.imread('Ottawa_2.bmp')
"""Resizing image2 as like image1 (can be skipped if image size's are same)"""
image2 = cv2.resize(image2, (image1.shape[1], image1.shape[0]))
print(image1.shape)
print(image2.shape)

(350, 290, 3)
(350, 290, 3)


In [5]:
patch_size = 7

In [6]:
def image_normalize(data):
    """Normalization"""
    _mean = np.mean(data)
    _std = np.std(data)
    npixel = np.size(data) * 1.0
    min_stddev = 1.0 / math.sqrt(npixel)
    return (data - _mean) / max(_std, min_stddev)
def image_padding(data,r):
    """Image padding"""
    if len(data.shape)==3:
        data_new=np.lib.pad(data,((r,r),(r,r),(0,0)),'constant',constant_values=0)
        return data_new
    if len(data.shape)==2:
        data_new=np.lib.pad(data,r,'constant',constant_values=0)
        return data_new
def arr(length):
  """Array"""
  arr=np.arange(length-1)
  random.shuffle(arr)
  print(arr)
  return arr

In [7]:
def createTrainingCubes(X, y, patch_size):
    """Creating training cubes"""
    margin = int((patch_size - 1) / 2)
    zeroPaddedX = image_padding(X, margin)
    ele_num1 = np.sum(y==1)
    ele_num2 = np.sum(y==2)
    patchesData_1 = np.zeros( (ele_num1, patch_size, patch_size, X.shape[2]) )
    patchesLabels_1 = np.zeros(ele_num1)
    patchesData_2 = np.zeros((ele_num2, patch_size, patch_size, X.shape[2]))
    patchesLabels_2 = np.zeros(ele_num2)
    patchIndex_1 = 0
    patchIndex_2 = 0
    for r in range(margin, zeroPaddedX.shape[0] - margin):
        for c in range(margin, zeroPaddedX.shape[1] - margin):
            if y[r-margin, c-margin] == 1 :
                patch_1 = zeroPaddedX[r - margin:r + margin + 1, c - margin:c + margin + 1]
                patchesData_1[patchIndex_1, :, :, :] = patch_1
                patchesLabels_1[patchIndex_1] = y[r-margin, c-margin]
                patchIndex_1 = patchIndex_1 + 1
            elif y[r-margin, c-margin] == 2 :
                patch_2 = zeroPaddedX[r - margin:r + margin + 1, c - margin:c + margin + 1]
                patchesData_2[patchIndex_2, :, :, :] = patch_2
                patchesLabels_2[patchIndex_2] = y[r-margin, c-margin]
                patchIndex_2 = patchIndex_2 + 1
    patchesLabels_1 = patchesLabels_1-1
    patchesLabels_2 = patchesLabels_2-1
    arr_1=arr(len(patchesData_1))
    arr_2=arr(len(patchesData_2))
    train_len=min(10000,len(patchesData_1) + len(patchesData_2))
    pdata=np.zeros((train_len, patch_size, patch_size, X.shape[2]))
    plabels = np.zeros(train_len)
    i = 0
    j = 0
    for k in range(train_len):
        if k < len(patchesData_1):
            pdata[k,:,:,:]=patchesData_1[arr_1[i],:,:,:]
            plabels[k]=patchesLabels_1[arr_1[i]]
            i += 1
        else:
            pdata[k,:,:,:]=patchesData_2[arr_2[j],:,:,:]
            plabels[k]=patchesLabels_2[arr_2[j]]
            j += 1
    return pdata, plabels

In [8]:
def createTestingCubes(X, patch_size):
    """Creating testing cubes"""
    margin = int((patch_size - 1) / 2)
    zeroPaddedX = image_padding(X, margin)
    patchesData = np.zeros( (X.shape[0]*X.shape[1], patch_size, patch_size, X.shape[2]) )
    patchIndex = 0
    for r in range(margin, zeroPaddedX.shape[0] - margin):
        for c in range(margin, zeroPaddedX.shape[1] - margin):
            patch = zeroPaddedX[r - margin:r + margin + 1, c - margin:c + margin + 1]
            patchesData[patchIndex, :, :, :] = patch
            patchIndex = patchIndex + 1
    return patchesData

In [9]:
def postprocess(res):
    """Post_processing"""
    res_new = res
    res = measure.label(res, connectivity=2)
    num = res.max()
    for i in range(1, num+1):
        idy, idx = np.where(res==i)
        if len(idy) <= 20:
            res_new[idy, idx] = 0
    return res_new

In [10]:
"""Transforming images to float32"""
im1 = image1[:, :, 0].astype(np.float32)
im2 = image2[:, :, 0].astype(np.float32)
im_di = dicomp(im1, im2)
ylen, xlen = im_di.shape
pix_vec = im_di.reshape([ylen*xlen, 1])
preclassify_lab = hcluster(pix_vec, im_di)
print('hiearchical clustering finished')
mdata = np.zeros([im1.shape[0], im1.shape[1], 3], dtype=np.float32)
mdata[:,:,0] = im1
mdata[:,:,1] = im2
mdata[:,:,2] = im_di
mlabel = preclassify_lab
x_train, y_train = createTrainingCubes(mdata, mlabel, patch_size)
x_train = x_train.transpose(0, 3, 1, 2)
print('x train shape: ', x_train.shape)
print('y train shape: ', y_train.shape)
x_test = createTestingCubes(mdata, patch_size)
x_test = x_test.transpose(0, 3, 1, 2)
print('x test shape: ', x_test.shape)

hiearchical clustering finished
[44301 82674 22312 ... 73680 58428  6601]
[1755 8598 4345 ...  895 2444 8663]
x train shape:  (10000, 3, 7, 7)
y train shape:  (10000,)
x test shape:  (101500, 3, 7, 7)


In [17]:
class TrainDS(torch.utils.data.Dataset):
    """Training dataset"""
    def __init__(self):
        self.len = x_train.shape[0]
        self.x_data = torch.FloatTensor(x_train)
        self.y_data = torch.LongTensor(y_train)
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]
    def __len__(self):
        return self.len
trainset = TrainDS()
train_loader = torch.utils.data.DataLoader(dataset=trainset, batch_size=128, shuffle=True, num_workers=0)
class MRC(nn.Module):
    """Multi-region convolution"""
    def __init__(self, inchannel):
        super(MRC, self).__init__()
        self.conv1 = nn.Conv2d(inchannel, 15, kernel_size=1, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(15)
        self.conv2_1 = nn.Conv2d(5, 5, kernel_size=3, stride=1, padding=1, bias=True)
        self.bn2_1 = nn.BatchNorm2d(5)
        self.conv2_2 = nn.Conv2d(5, 5, kernel_size=3, stride=1, padding=1, bias=True)
        self.bn2_2 = nn.BatchNorm2d(5)
        self.conv2_3 = nn.Conv2d(5, 5, kernel_size=3, stride=1, padding=1, bias=True)
        self.bn2_3 = nn.BatchNorm2d(5)
    def forward(self, x):
       ori_out = F.relu(self.bn1(self.conv1(x)))
       shape=(x.shape[0], 5, 7, 7)
       all_zero3_3=torch.zeros(size=shape).cuda()
       all_zero1_3=torch.zeros(size=(x.shape[0], 5, 3, 7)).cuda()
       all_zero3_1=torch.zeros(size=(x.shape[0], 5, 7, 3)).cuda()
       all_zero3_3[:,:,:,:]=ori_out[:,0:5,:,:]
       all_zero1_3[:,:,:,:]=ori_out[:,5:10,2:5,:]
       all_zero3_1[:,:,:,:]=ori_out[:,10:15,:,2:5]
       square=F.relu(self.bn2_1(self.conv2_1(all_zero3_3)))
       horizontal=F.relu(self.bn2_2(self.conv2_2(all_zero1_3)))
       vertical=F.relu(self.bn2_3(self.conv2_3(all_zero3_1)))
       horizontal_final=torch.zeros(size=(x.shape[0], 5, 7, 7)).cuda()
       vertical_final=torch.zeros(size=(x.shape[0], 5, 7, 7)).cuda()
       horizontal_final[:,:,2:5,:]=horizontal[:,:,:,:]
       vertical_final[:,:,:,2:5]=vertical[:,:,:,:]
       glo = square + horizontal_final + vertical_final
       return glo

In [18]:
def DCT(x):
  """Discrete cosine transform"""
  out = F.interpolate(x, size=(8, 8), mode='bilinear', align_corners=True)
  dct_values_1 = [cv2.dct(np.float32(out[i, 0, :, :].detach().cpu().numpy())) for i in range(x.shape[0])]
  dct_values_2 = [cv2.dct(np.float32(out[i, 1, :, :].detach().cpu().numpy())) for i in range(x.shape[0])]
  dct_values_3 = [cv2.dct(np.float32(out[i, 2, :, :].detach().cpu().numpy())) for i in range(x.shape[0])]
  dct_array_1 = np.array(dct_values_1)
  dct_array_2 = np.array(dct_values_2)
  dct_array_3 = np.array(dct_values_3)
  dct_out_1 = torch.Tensor(dct_array_1)
  dct_out_2 = torch.Tensor(dct_array_2)
  dct_out_3 = torch.Tensor(dct_array_3)
  dct_out = torch.zeros(size=(x.shape[0], 3, 8, 8))
  dct_out[:, 0, :, :] = dct_out_1
  dct_out[:, 1, :, :] = dct_out_2
  dct_out[:, 2, :, :] = dct_out_3
  dct_out = dct_out.cuda()
  out = dct_out.view(x.shape[0], 3, 64)
  out = F.glu(out, dim=-1)
  dct_out = out.view(x.shape[0], 1, 96)
  return dct_out

In [19]:
class SpectroChangeNet(nn.Module):
    """SpectroChangeNet"""
    def __init__(self):
        super(SpectroChangeNet, self).__init__()
        self.mrc1=MRC(3)
        self.mrc2=MRC(5)
        self.mrc3=MRC(5)
        self.mrc4=MRC(5)
        self.linear1=nn.Linear(341, 10)
        self.linear2=nn.Linear(10, 2)
    def forward(self, x):
        m_1=self.mrc1(x)
        m_2=self.mrc2(m_1)
        m_3=self.mrc3(m_2)
        m_4=self.mrc4(m_3)
        glo=m_4.view(x.shape[0], 1, 245)
        dct_out=DCT(x)
        out=torch.cat((glo,dct_out),2)
        out = out.view(out.size(0), -1)
        out_1 = self.linear1(out)
        out = self.linear2(out_1)
        return out

In [20]:
class Loss(torch.nn.Module):
    """Hybrid loss (focal_loss and mean absolute error)"""
    def __init__(self, alpha=0.1, beta=0.9, gamma=2.0, classes=2):
        super(Loss, self).__init__()
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma 
        self.classes = classes
    def focal_loss(self, pred, labels):
        pred = F.softmax(pred, dim=1)  
        label_one_hot = torch.nn.functional.one_hot(labels, self.classes).float().to(self.device)
        pt = torch.sum(pred * label_one_hot, dim=1)
        focal_loss = -((1 - pt) ** self.gamma) * torch.log(pt + 1e-7)
        return focal_loss.mean()
    def forward(self, pred, labels):
        fl = self.focal_loss(pred, labels)
        pred = F.softmax(pred, dim=1)
        label_one_hot = torch.nn.functional.one_hot(labels, self.classes).float().to(self.device)
        mae = 2 - 2 * (torch.sum(pred * label_one_hot, dim=1))
        loss = self.alpha * fl + self.beta * mae.mean()
        return loss

In [21]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
"""Model training"""
istrain = True  
net = SpectroChangeNet().to(device)
criterion = Loss(alpha=0.1, beta=0.9, gamma=2.0, classes=2).to(device)
optimizer = optim.Adam(net.parameters(), lr=0.001)
net.train()
total_loss = 0
for epoch in range(10):
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print('[Epoch: %d]  [loss avg: %.4f]  [current loss: %.4f]' % (epoch + 1, total_loss / (epoch + 1), loss.item()))
print('Finished Training')

[Epoch: 1]  [loss avg: 19.7618]  [current loss: 0.0000]
[Epoch: 2]  [loss avg: 9.8869]  [current loss: 0.0000]
[Epoch: 3]  [loss avg: 6.5917]  [current loss: 0.0000]
[Epoch: 4]  [loss avg: 4.9440]  [current loss: 0.0000]
[Epoch: 5]  [loss avg: 3.9554]  [current loss: 0.0000]
[Epoch: 6]  [loss avg: 3.2963]  [current loss: 0.0000]
[Epoch: 7]  [loss avg: 2.8255]  [current loss: 0.0000]
[Epoch: 8]  [loss avg: 2.4724]  [current loss: 0.0000]
[Epoch: 9]  [loss avg: 2.1977]  [current loss: 0.0000]
[Epoch: 10]  [loss avg: 1.9780]  [current loss: 0.0000]
Finished Training


In [22]:
istrain=False 
"""Model testing"""
net.eval()
outputs = np.zeros((ylen, xlen))
glo_fin=torch.Tensor([]).cuda()
dct_fin=torch.Tensor([]).cuda()
for i in range(ylen):
    for j in range(xlen):
        if preclassify_lab[i, j] != 1.5 :
            outputs[i, j] = preclassify_lab[i, j]
        else:
            img_patch = x_test[i*xlen+j, :, :, :]
            img_patch = img_patch.reshape(1, img_patch.shape[0], img_patch.shape[1], img_patch.shape[2])
            img_patch = torch.FloatTensor(img_patch).to(device)
            prediction = net(img_patch)
            prediction = np.argmax(prediction.detach().cpu().numpy(), axis=1)
            outputs[i, j] = prediction+1
    if (i+1) % 50 == 0:
        print('row', i+1, 'handling')
outputs = outputs-1
res = outputs*255
res = postprocess(res)
cv2.imwrite('Output_image.png',res)
Output_image=cv2.imread('Output_image.png')

  outputs[i, j] = prediction+1


row 50 handling
row 100 handling
row 150 handling
row 200 handling
row 250 handling
row 300 handling
row 350 handling


In [23]:
"""Image overlaying"""
_, binary_mask = cv2.threshold(Output_image, 200, 255, cv2.THRESH_BINARY)
green_mask = np.zeros_like(image1)
indices = np.where(binary_mask == 255)
green_mask[indices[0], indices[1], :] = [0, 255, 0]
overlay = cv2.add(image1, green_mask)
cv2.imwrite('Overlay_image.png', overlay)

True