In [None]:
pip install spectral

Collecting spectral
[?25l  Downloading https://files.pythonhosted.org/packages/4f/9b/db2e160f891441b2f576e31caaa93c283ce5f82d625173830abd7ab3cbc3/spectral-0.22.2.tar.gz (184kB)
[K     |█▉                              | 10kB 14.1MB/s eta 0:00:01[K     |███▋                            | 20kB 11.2MB/s eta 0:00:01[K     |█████▍                          | 30kB 8.2MB/s eta 0:00:01[K     |███████▏                        | 40kB 6.9MB/s eta 0:00:01[K     |█████████                       | 51kB 4.1MB/s eta 0:00:01[K     |██████████▊                     | 61kB 4.6MB/s eta 0:00:01[K     |████████████▌                   | 71kB 4.8MB/s eta 0:00:01[K     |██████████████▎                 | 81kB 5.0MB/s eta 0:00:01[K     |████████████████                | 92kB 5.1MB/s eta 0:00:01[K     |█████████████████▉              | 102kB 5.5MB/s eta 0:00:01[K     |███████████████████▋            | 112kB 5.5MB/s eta 0:00:01[K     |█████████████████████▍          | 122kB 5.5MB/s eta 0:00:01

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
import spectral
import torch, math
from torchvision import datasets, transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from scipy.io import loadmat, savemat
import random
from time import time
import h5py

Setting the seed of GPU

In [None]:
def seed_torch(seed = 612):
	random.seed(seed)
	os.environ['PYTHONHASHSEED'] = str(seed) # 为了禁止hash随机化，使得实验可复现
	np.random.seed(seed)
	torch.manual_seed(seed)
	torch.cuda.manual_seed(seed)
	torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
	torch.backends.cudnn.benchmark = False
	torch.backends.cudnn.deterministic = True
seed_torch() 

# Setting parameters of model 

# the number of bands
channel_hsi = 63
channel_msi = 2

# parameters of loss finctions
alpha = 0.01
beta = 0.01

windowSize = 11
class_num = 20
batch_size = 64

# parameters of optimizer
lr = 0.001  # learning rate 
momentum = 0.9 # momentum of SGD
betas = (0.9, 0.999) # betas of Adam
num_epochs = 100

# 1. Feature extraction network

In [None]:
class HSINet(nn.Module):
  def __init__(self, channel_hsi):
    super(HSINet, self).__init__()

    self.conv1 = nn.Conv2d(channel_hsi, 256, 3, padding=1)
    self.bn1 = nn.BatchNorm2d(256)

    self.conv2 = nn.Conv2d(256, 128, 3)
    self.bn2 = nn.BatchNorm2d(128)
    self.conv3 = nn.Conv2d(128, 128, 3)
    self.bn3 = nn.BatchNorm2d(128)

  def forward(self, x):

    x = F.relu(self.bn1(self.conv1(x)))
    x = F.relu(self.bn2(self.conv2(x)))
    x = F.relu(self.bn3(self.conv3(x)))
    return x

class MSINet(nn.Module):
  def __init__(self, channel_msi):
    super(MSINet, self).__init__()

    self.conv1 = nn.Conv2d(channel_msi, 128, 3, padding =1)
    self.bn1 = nn.BatchNorm2d(128)

    self.conv2 = nn.Conv2d(128, 128, 3)
    self.bn2 = nn.BatchNorm2d(128)

    self.conv3 = nn.Conv2d(128, 128, 3)
    self.bn3 = nn.BatchNorm2d(128)

  def forward(self, x):

    x = F.relu(self.bn1(self.conv1(x)))
    x = F.relu(self.bn2(self.conv2(x)))
    x = F.relu(self.bn3(self.conv3(x)))
    return x

Define normalization and dropout layer

In [None]:
class LayerNorm(nn.Module):
    def __init__(self, size, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.eps = eps
        self.a_2 = nn.Parameter(torch.ones(size))
        self.b_2 = nn.Parameter(torch.zeros(size))

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.a_2 * (x - mean) / (std + self.eps) + self.b_2

class Dropout(nn.Module):
    def __init__(self):
        super(Dropout, self).__init__()

    def forward(self, x):
        out = F.dropout(x, p = 0.2, training=self.training)
        return out

Define cross attention layer

In [None]:
class CAM(nn.Module):
    def __init__(self):
        super(CAM, self).__init__()      
        k_size = 3 
        self.conv = nn.Conv1d(1, 1, kernel_size=k_size, padding=(k_size - 1) // 2, bias=False)
        # self.conv1 = nn.Conv2d(9, 7, 1) # 81 为特征图像空间特征大小
        # self.conv2 = nn.Conv2d(7, 49, 1, stride=1, padding=0)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))

    def get_attention(self, a):

        input_a = a
        a = a.mean(3)
        a = a.transpose(1, 3)
        # a= F.relu(self.conv1(a))
        # a= self.conv2(a)
        a = self.conv(a.squeeze(-1).transpose(-1, -2)).transpose(-1, -2).unsqueeze(-1)
        a = a.transpose(1, 3)

        a = a.unsqueeze(3)
        a = torch.mean(input_a * a, -1)
        a = F.softmax(a / 0.025, dim=-1) +1
        return a 

    def forward(self, f1, f2):

        b, n1, c, h, w = f1.size()
        n2 = f2.size(1)

        f1 = f1.view(b, n1, c, -1) 
        f2 = f2.view(b, n2, c, -1)

        f1_norm = F.normalize(f1, p=2, dim=2, eps=1e-12)
        f2_norm = F.normalize(f2, p=2, dim=2, eps=1e-12)
        
        f1_norm = f1_norm.transpose(2, 3).unsqueeze(2)
        f2_norm = f2_norm.unsqueeze(1)

        a1 = torch.matmul(f1_norm, f2_norm) 
        a2 = a1.transpose(3, 4) 

        a1 = self.get_attention(a1)
        a2 = self.get_attention(a2)
        f1 = f1 * a1
        f1 = f1.view(b, c, h, w)
        f2 = f2 * a2
        f2 = f2.view(b, c, h, w)
        return f1, f2

# 2. The proposed deep feature interaction network

In [None]:
class Net(nn.Module):
    def __init__(self, channel_hsi, channel_msi, class_num):
        super(Net, self).__init__()

        self.featnet1 = HSINet(channel_hsi)
        self.featnet2 = MSINet(channel_msi)
        self.cam = CAM()
        self.proj_norm = LayerNorm(64)
        self.fc1 = nn.Linear(1 * 1 * 128, 64)
        self.fc2 = nn.Linear(64, class_num)
        self.dropout = nn.Dropout()

    def forward(self, x, y):

        # Pre-process Image Feature
        feature_1 = self.featnet1(x)
        feature_2 = self.featnet2(y)

        hsi_feat = feature_1.unsqueeze(1)
        lidar_feat = feature_2.unsqueeze(1)
        hsi, lidar = self.cam(hsi_feat, lidar_feat)
        x = self.xcorr_depthwise(hsi, lidar)
        y = self.xcorr_depthwise(lidar, hsi)
        x1 = x.contiguous().view(x.size(0), -1)
        y1 = y.contiguous().view(y.size(0), -1)
        x = x1 + y1
        x = F.relu(self.proj_norm(self.fc1(x)))
        
        x = self.dropout(x)
        x = self.fc2(x)
        # hsi = hsi.contiguous().view(x.size(0), -1)
        # lidar = lidar.contiguous().view(x.size(0), -1)
        return feature_1, feature_2, x1, y1, x

    def xcorr_depthwise11(self, x, kernel):
        batch = kernel.size(0)
        channel = kernel.size(1)
        x = x.view(1, batch * channel, x.size(2), x.size(3))
        kernel = kernel.view(batch, channel, kernel.size(2), kernel.size(3))
        out = F.conv2d(x, kernel, groups= 1)
        # out = F.relu(out)
        out = out.view(batch, 1, out.size(2), out.size(3))
        return out

    def xcorr_depthwise(self, x, kernel):
        batch = kernel.size(0)
        channel = kernel.size(1)
        x = x.view(1, batch * channel, x.size(2), x.size(3))
        kernel = kernel.view(batch * channel, 1, kernel.size(2), kernel.size(3))
        out = F.conv2d(x, kernel, groups=batch*channel)
        # out = F.relu(out)
        out = out.view(batch, channel, out.size(2), out.size(3))
        return out

# 3. Data processing function

In [None]:
def max_min_mean(img):
  """
    calculate the max value ,min value ,mean value from the image.
  """
  print('max: ',np.max(img),'min: ',np.min(img),'mean: ',np.mean(img))

def c(img):
  """
    map the image to [0,255]
  """
  return ( img - np.min(img) ) / ( np.max(img)-np.min(img) ) * 255


def applyPCA(X, numComponents):
  """
    apply PCA to the image to reduce dimensionality 
  """
  newX = np.reshape(X, (-1, X.shape[2]))
  pca = PCA(n_components=numComponents, whiten=True)
  newX = pca.fit_transform(newX)
  newX = np.reshape(newX, (X.shape[0], X.shape[1], numComponents))
  return newX

def addZeroPadding(X, margin=2):
  """
    add zero padding to the image
  """
  newX = np.zeros((
      X.shape[0] + 2 * margin,
      X.shape[1] + 2 * margin,
      X.shape[2]
            ))
  newX[margin:X.shape[0]+margin, margin:X.shape[1]+margin, :] = X
  return newX


def createImgCube(X ,gt ,pos:list ,windowSize=25):
  """
    create Cube from pos list
    return imagecube gt nextPos
  """
  margin = (windowSize-1)//2
  zeroPaddingX = addZeroPadding(X, margin=margin)
  dataPatches = np.zeros((pos.__len__(), windowSize, windowSize, X.shape[2]))
  if( pos[-1][1]+1 != X.shape[1] ):
    nextPos = (pos[-1][0] ,pos[-1][1]+1)
  elif( pos[-1][0]+1 != X.shape[0] ):
    nextPos = (pos[-1][0]+1 ,0)
  else:
    nextPos = (0,0)
  return np.array([zeroPaddingX[i:i+windowSize, j:j+windowSize, :] for i,j in pos ]),\
  np.array([gt[i,j] for i,j in pos]) ,\
  nextPos


def createPos(shape:tuple, pos:tuple, num:int):
  """
    creatre pos list after the given pos
  """
  if (pos[0]+1)*(pos[1]+1)+num >shape[0]*shape[1]:
    num = shape[0]*shape[1]-( (pos[0])*shape[1] + pos[1] )
  return [(pos[0]+(pos[1]+i)//shape[1] , (pos[1]+i)%shape[1] ) for i in range(num) ]

def createPosWithoutZero(hsi, gt):
  """
    creatre pos list without zero labels
  """
  mask = gt > 0
  return [(i,j) for i , row  in enumerate(mask) for j , row_element in enumerate(row) if row_element]

def createImgPatch(lidar, pos:list, windowSize=25):
  """
    return lidar Img patches
  """
  margin = (windowSize-1)//2
  zeroPaddingLidar = np.zeros((
      lidar.shape[0] + 2 * margin,
      lidar.shape[1] + 2 * margin
            ))
  zeroPaddingLidar[margin:lidar.shape[0]+margin, margin:lidar.shape[1]+margin] = lidar
  return np.array([zeroPaddingLidar[i:i+windowSize, j:j+windowSize] for i,j in pos ])

def minmax_normalize(array):    
    amin = np.min(array)
    amax = np.max(array)
    return (array - amin) / (amax - amin)

# 4. Create dataloader

In [None]:
data_path = '/content/drive/MyDrive/data/'

data_traingt = sio.loadmat(os.path.join(data_path, 'mask_train.mat'))['mask_train']
data_testgt = sio.loadmat(os.path.join(data_path, 'mask_test.mat'))['mask_test']
data_hsi = sio.loadmat(os.path.join(data_path, 'YC_hsi.mat'))['trento_hsi']
data_msi = sio.loadmat(os.path.join(data_path, 'YC_msi.mat'))['trento_lidar']
# data_msi = h5py.File(os.path.join(data_path, 'HHK_msi.mat'))
# data_msi = data_msi['HHK_msi'][:]
# data_msi= np.transpose(data_msi,(2,1,0))

data_hsi = minmax_normalize(data_hsi)
data_msi = minmax_normalize(data_msi)
height , width, c = data_msi.shape

# training / testing set for 2D-CNN

train_hsiCube, train_labels ,_ = createImgCube(data_hsi, data_traingt, createPosWithoutZero(data_hsi, data_traingt), windowSize=windowSize)
train_hsiCube = torch.from_numpy(train_hsiCube.transpose(0,3,1,2)).float()
train_patches, _ ,_ = createImgCube(data_msi, data_traingt, createPosWithoutZero(data_hsi, data_traingt), windowSize=windowSize)
train_patches = torch.from_numpy(train_patches.transpose(0,3,1,2)).float()

test_hsiCube, test_labels , _ = createImgCube(data_hsi, data_testgt, createPosWithoutZero(data_hsi, data_testgt), windowSize=windowSize)
test_hsiCube = torch.from_numpy(test_hsiCube.transpose(0,3,1,2)).float()
test_patches, _, _ = createImgCube(data_msi, data_testgt, createPosWithoutZero(data_msi, data_testgt), windowSize=windowSize)
test_patches = torch.from_numpy(test_patches.transpose(0,3,1,2)).float()

print (train_hsiCube.shape)
print (test_hsiCube.shape)

print("Creating dataloader")

class TrainDS(torch.utils.data.Dataset):
    def __init__(self):
        self.len = train_labels.shape[0]
        self.hsi = torch.FloatTensor(train_hsiCube)
        self.lidar = torch.FloatTensor(train_patches)
        self.labels = torch.LongTensor(train_labels - 1)
    def __getitem__(self, index):
        return self.hsi[index], self.lidar[index], self.labels[index]
    def __len__(self):
        return self.len

""" Testing dataset"""
class TestDS(torch.utils.data.Dataset):
    def __init__(self):
        self.len = test_labels.shape[0]
        self.hsi = torch.FloatTensor(test_hsiCube)
        self.lidar = torch.FloatTensor(test_patches)
        self.labels = torch.LongTensor(test_labels - 1)
    def __getitem__(self, index):
        return self.hsi[index], self.lidar[index], self.labels[index]
    def __len__(self):
        return self.len

# creat trainloader and testloader
trainset = TrainDS()
testset  = TestDS()
 
# def _init_fn(worker_id):
#     np.random.seed(int(seed)+worker_id)

train_loader = torch.utils.data.DataLoader(dataset = trainset, batch_size = batch_size, shuffle = True, num_workers = 0)
test_loader = torch.utils.data.DataLoader(dataset = testset, batch_size = batch_size, shuffle = False, num_workers = 0)

print("Finish!")

torch.Size([819, 63, 11, 11])
torch.Size([30214, 63, 11, 11])
Creating dataloader
Finish!


# 5. The loss function

1.   loss1: The consistency loss
2.   loss2: The distinctive loss
3.   loss3: The classification loss





In [None]:
def calc_label_sim(label_1,label_2):

    batch_size = label_1.shape[0]
    label = torch.zeros(batch_size, class_num).scatter_(1, label_1.unsqueeze(1).cpu(), 1)
    sim = label.float().mm(label.float().t()).cuda()
    return sim
def calc_loss(feature_1, feature_2, hsi_1, lidar_1, outputs, labels, alpha, beta):

    cos = lambda x, y: x.mm(y.t()) / ((x ** 2).sum(1, keepdim=True).sqrt().mm((y ** 2).sum(1, keepdim=True).sqrt().t())).clamp(min=1e-6) / 2.
    theta = cos(hsi_1, lidar_1)
    sim = calc_label_sim(labels, labels)
    theta1 = cos(hsi_1, hsi_1)
    theta2 = cos(lidar_1, lidar_1)

    term1= ((1+torch.exp(theta)).log() + sim * theta).mean()
    term2 = ((1 + torch.exp(theta1)).log() + sim * theta1).mean()
    term3 = ((1 + torch.exp(theta2)).log() + sim * theta2).mean()
    loss2 = term1 + term2 + term3

    criterion = nn.CrossEntropyLoss()
    loss3 = criterion(outputs, labels)
    loss1 = torch.mean(torch.pow(feature_1 - feature_2, 2))

    loss_sum = loss3 + alpha * loss2 + beta * loss3
    return loss_sum.mean()

Define training and testing layer

In [None]:
def train(model, device, train_loader, optimizer, epoch):
    model.train()
    total_loss = 0
    for i, (inputs_1, inputs_2, labels) in enumerate(train_loader):
        inputs_1, inputs_2 = inputs_1.to(device), inputs_2.to(device)

        labels = labels.to(device)

        optimizer.zero_grad()
        feature_1, feature_2, hsi_1, lidar_1, outputs = model(inputs_1, inputs_2)
        loss = calc_loss(feature_1, feature_2, hsi_1, lidar_1, outputs, labels, alpha =0.01, beta = 0.01)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print('[Epoch: %d] [loss avg: %.4f] [current loss: %.4f]' %(epoch + 1, total_loss/(epoch+1), loss.item()))

def test(model, device, test_loader):
    model.eval()
    count = 0
    feature =[]
    flabel = []
    for inputs_1, inputs_2, labels in test_loader:
  
        inputs_1, inputs_2 = inputs_1.to(device), inputs_2.to(device)
        _, _, _, _, outputs = model(inputs_1, inputs_2)
        feature.append(outputs.detach().cpu().numpy())
        outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1)
        if count == 0:
            y_pred_test =  outputs
            test_labels = labels
            count = 1
        else:
            y_pred_test = np.concatenate((y_pred_test, outputs))
            test_labels = np.concatenate((test_labels, labels))
    classification = classification_report(test_labels, y_pred_test, digits=4)
      
    sio.savemat('feature.mat', {'feature': feature})
    a = 0
    for c in range(len(y_pred_test)):
      if test_labels[c]==y_pred_test[c]:
        a = a+1
    sio.savemat('test_labels.mat', {'test_labels': test_labels})
    print(classification)
    print('%.2f' %(a/len(y_pred_test)*100))

# 6. Running

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = Net(channel_hsi,channel_msi,class_num).to(device)

params_to_update = list(model.parameters())

# optimizer = torch.optim.Adam(params_to_update, lr=lr, betas=betas, eps=1e-8, weight_decay=0.0005)
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=0.0005)

# if num_epochs % 50 == 0:
#     for p in optimizer.param_groups:p['lr'] *= 0.9
#     lr_list.append(optimizer.state_dict()['param_groups'][0]['lr'])
#     print (lr_list)

for epoch in range(num_epochs):
  train(model, device, train_loader, optimizer, epoch) 
test(model, device, test_loader)

# 7. Record the test results

In [None]:
from operator import truediv

def AA_andEachClassAccuracy(confusion_matrix):
    counter = confusion_matrix.shape[0]
    list_diag = np.diag(confusion_matrix)
    list_raw_sum = np.sum(confusion_matrix, axis=1)
    each_acc = np.nan_to_num(truediv(list_diag, list_raw_sum))
    average_acc = np.mean(each_acc)
    return each_acc, average_acc

def reports(test_loader, name):
    count = 0
    for inputs, inputs_1, labels in test_loader:
        inputs, inputs_1 = inputs.to(device), inputs_1.to(device)
        _, _, _, _, outputs = model(inputs, inputs_1)

        outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1)
        if count == 0:
            y_pred = outputs
            test_labels = labels
            count = 1
        else:
            y_pred = np.concatenate((y_pred, outputs))
            test_labels = np.concatenate((test_labels, labels))
    if name == 'Houston':
        target_names = ['Healthy grass', 'Stressed grass', 'Synthetic grass', 'Tree'
            , 'Soil', 'Water', 'Residential', 'Commercial',
                        'Road', 'Highway', 'Railway',
                        'Parking lot 1', 'Parking lot 2', 'Tennis court', 'Running track']
    elif name == 'Muufl':
        target_names = ['Trees', 'Mostly grass', 'Mixed ground surface', 'Dirt and sand',
                        'Road',
                        'Water', 'Building shadow', 'Building', 'Sidewalk', 'Yellow curb',
                        'Cloth panels']
    elif name == 'HHK':
        target_names = ['1', '2', '3', '4', '5', '6', '7', '8', '9n', '10',
                        '11', '12', '13', '14', '15','16', '17', '18', '19', '20']

    classification = classification_report(test_labels, y_pred, target_names=target_names)
    oa = accuracy_score(test_labels, y_pred)
    confusion = confusion_matrix(test_labels, y_pred)
    each_acc, aa = AA_andEachClassAccuracy(confusion)
    kappa = cohen_kappa_score(test_labels, y_pred)

    return classification, confusion, oa * 100, each_acc * 100, aa * 100, kappa * 100

"""record the final result"""

classification, confusion, oa, each_acc, aa, kappa = reports(test_loader, 'HHK')

# print('The running time:', time() - start)
sio.savemat('confusion.mat',{'confusion':confusion})

print('Save confusion Finish！')

classification = str(classification)
confusion = str(confusion)

file_name = "classification_report.txt"

with open(file_name, 'w') as x_file:
    x_file.write('\n')
    x_file.write('{} Kappa accuracy (%)'.format(kappa))
    x_file.write('\n')
    x_file.write('{} Overall accuracy (%)'.format(oa))
    x_file.write('\n')
    x_file.write('{} Average accuracy (%)'.format(aa))
    x_file.write('\n')
    x_file.write('\n')
    x_file.write('{}'.format(classification))
    x_file.write('\n')
    x_file.write('{}'.format(confusion))