In [None]:
import tifffile
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
import torch.optim as optim
from sklearn.preprocessing import StandardScaler

In [None]:
# import NTL imagery and building volume data to construct the comprehensive dataset
image_paths = [
    r'NTL.tif'
]
label_paths = [
    r'building_volume.tif'
]

LM_list = []
label_list = []

for image_path, label_path in zip(image_paths, label_paths):
    image_A = tifffile.imread(image_path)
    
    if image_A.shape[0] % 2 != 0:
        image_A = np.pad(image_A, ((0, 1), (0, 0)), mode='constant')
    if image_A.shape[1] % 2 != 0:
        image_A = np.pad(image_A, ((0, 0), (0, 1)), mode='constant')
    BM = torch.from_numpy(image_A).unsqueeze(0).unsqueeze(0).float()
    padding = 8
    padded_image_A = np.pad(image_A, ((padding, padding), (padding, padding)), mode='constant')

    crop_size = 18
    stride = 1
    num_crops = ((padded_image_A.shape[0] - crop_size) // stride + 1) * ((padded_image_A.shape[1] - crop_size) // stride + 1)
    LM = torch.zeros(num_crops, 1, crop_size, crop_size)

    # slide cropping
    crop_idx = 0
    for i in range(0, padded_image_A.shape[0] - crop_size + 1, stride):
        for j in range(0, padded_image_A.shape[1] - crop_size + 1, stride):
            crop = padded_image_A[i:i+crop_size, j:j+crop_size]
            LM[crop_idx, 0, :, :] = torch.from_numpy(crop)
            crop_idx += 1

    image_B = tifffile.imread(label_path)

    if image_B.shape[0] % 2 != 0:
        image_B = np.pad(image_B, ((0, 1), (0, 0)), mode='constant')
    if image_B.shape[1] % 2 != 0:
        image_B = np.pad(image_B, ((0, 0), (0, 1)), mode='constant')

    BL = torch.from_numpy(image_B).unsqueeze(0).unsqueeze(0).float()

    crop_size_B = 2
    stride_B = 1
    num_crops_B = ((image_B.shape[0] - crop_size_B) // stride_B + 1) * ((image_B.shape[1] - crop_size_B) // stride_B + 1)
    LM_B = torch.zeros(num_crops_B, 1, crop_size_B, crop_size_B)

    # slide cropping
    crop_idx_B = 0
    for i in range(0, image_B.shape[0] - crop_size_B + 1, stride_B):
        for j in range(0, image_B.shape[1] - crop_size_B + 1, stride_B):
            crop_B = image_B[i:i+crop_size_B, j:j+crop_size_B]
            LM_B[crop_idx_B, 0, :, :] = torch.from_numpy(crop_B)
            crop_idx_B += 1

    label_tensor = LM_B.view(num_crops_B, -1).sum(dim=1).unsqueeze(1).unsqueeze(2).unsqueeze(3)

    LM_list.append(LM)
    label_list.append(label_tensor)

combined_LM = torch.cat(LM_list, dim=0)
combined_label = torch.cat(label_list, dim=0)

LM = combined_LM
combined_label = torch.squeeze(combined_label, dim=( 0, 2, 3)) 
label_tensor = combined_label

keep_indices = []

for i in range(len(LM)):
    if torch.any(LM[i] != 0) and torch.any(label_tensor[i] != 0):
        keep_indices.append(i)
        
new_LM = LM[keep_indices]
new_label_tensor = label_tensor[keep_indices]

In [None]:
# division of different sample clusters
sum_values = new_LM.view(-1, 18*18).sum(dim=1)

NPC_A = []
NPC_B = []
NPC_C = []

for i, value in enumerate(sum_values):
    if value > 9300:
        NPC_A.append(i)
    elif 3600 <= value <= 9300:
        NPC_B.append(i)
    else:
        NPC_C.append(i)

new_LMA = new_LM[NPC_A]
new_LMB = new_LM[NPC_B]
new_LMC = new_LM[NPC_C]

new_label_tensorA = new_label_tensor[NPC_A]
new_label_tensorB = new_label_tensor[NPC_B]
new_label_tensorC = new_label_tensor[NPC_C]

In [None]:
# splitting training and test dataset
step_size = 6

dataset1_size = new_LMA.size(0)
dataset2_size = new_LMB.size(0)
dataset3_size = new_LMC.size(0)

Test_Ax = new_LMA[::step_size]
Test_Ay = new_label_tensorA[::step_size]
Test_Bx = new_LMB[::step_size]
Test_By = new_label_tensorB[::step_size]
Test_Cx = new_LMC[::step_size]
Test_Cy = new_label_tensorC[::step_size]

Train_Ax = torch.cat([new_LMA[i].unsqueeze(0) for i in range(dataset1_size) if i % step_size != 0])
Train_Ay = torch.cat([new_label_tensorA[i].unsqueeze(0) for i in range(dataset1_size) if i % step_size != 0])
Train_Bx = torch.cat([new_LMB[i].unsqueeze(0) for i in range(dataset2_size) if i % step_size != 0])
Train_By = torch.cat([new_label_tensorB[i].unsqueeze(0) for i in range(dataset2_size) if i % step_size != 0])
Train_Cx = torch.cat([new_LMC[i].unsqueeze(0) for i in range(dataset3_size) if i % step_size != 0])
Train_Cy = torch.cat([new_label_tensorC[i].unsqueeze(0) for i in range(dataset3_size) if i % step_size != 0])

In [None]:
# data enhancement
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset, DataLoader

flip_transform = transforms.RandomHorizontalFlip()
vertical_flip_transform = transforms.RandomVerticalFlip()

augmented_dataA = []
augmented_dataB = []
augmented_dataC = []

for i in range(len(Train_Ax)):
    flipped_sampleA = flip_transform(Train_Ax[i])
    labelA = Train_Ay[i]
    augmented_dataA.append((flipped_sampleA, labelA))
for i in range(len(Train_Bx)):
    flipped_sampleB = flip_transform(Train_Bx[i])
    labelB = Train_By[i]
    augmented_dataB.append((flipped_sampleB, labelB))    
for i in range(len(Train_Ax)):
    flipped_sampleC = flip_transform(Train_Cx[i])
    labelC = Train_Cy[i]
    augmented_dataC.append((flipped_sampleC, labelC))

for i in range(len(Train_Ax)):
    flipped_sample = vertical_flip_transform(Train_Ax[i])
    labelA = Train_Ay[i]
    augmented_dataA.append((flipped_sample, labelA))
for i in range(len(Train_Bx)):
    flipped_sampleB = vertical_flip_transform(Train_Bx[i])
    labelB = Train_By[i]
    augmented_dataB.append((flipped_sampleB, labelB))    
for i in range(len(Train_Ax)):
    flipped_sampleC = vertical_flip_transform(Train_Cx[i])
    labelC = Train_Cy[i]
    augmented_dataC.append((flipped_sampleC, labelC))

combined_dataA = augmented_dataA + [(Train_Ax[i], Train_Ay[i]) for i in range(len(Train_Ax))]
combined_dataB = augmented_dataB + [(Train_Bx[i], Train_By[i]) for i in range(len(Train_Bx))]
combined_dataC = augmented_dataC + [(Train_Cx[i], Train_Cy[i]) for i in range(len(Train_Cx))]

combined_LMA = torch.stack([sample for sample, _ in combined_dataA])
combined_label_tensorA = torch.stack([label for _, label in combined_dataA])
combined_LMB = torch.stack([sample for sample, _ in combined_dataB])
combined_label_tensorB = torch.stack([label for _, label in combined_dataB])
combined_LMC = torch.stack([sample for sample, _ in combined_dataC])
combined_label_tensorC = torch.stack([label for _, label in combined_dataC])

combined_datasetA = TensorDataset(combined_LMA, combined_label_tensorA)
combined_datasetB = TensorDataset(combined_LMB, combined_label_tensorB)
combined_datasetC = TensorDataset(combined_LMC, combined_label_tensorC)

In [None]:
# constructing neural network models
BATCH_SIZE = 128
EPOCHS = 300
LR = 0.01
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# data loading
data_loaderA = DataLoader(combined_datasetA, batch_size=BATCH_SIZE, shuffle=True) 

train_sizeA = int(0.8 * len(combined_datasetA))
test_sizeA = len(combined_datasetA) - train_sizeA
train_datasetA, test_datasetA = torch.utils.data.random_split(combined_datasetA, [train_sizeA, test_sizeA])

train_loaderA = DataLoader(train_datasetA, batch_size=BATCH_SIZE, shuffle=True)
test_loaderA = DataLoader(test_datasetA, batch_size=BATCH_SIZE, shuffle=False)

data_loaderB = DataLoader(combined_datasetB, batch_size=BATCH_SIZE, shuffle=True) 

train_sizeB = int(0.8 * len(combined_datasetB))
test_sizeB = len(combined_datasetB) - train_sizeB
train_datasetB, test_datasetB = torch.utils.data.random_split(combined_datasetB, [train_sizeB, test_sizeB])

train_loaderB = DataLoader(train_datasetB, batch_size=BATCH_SIZE, shuffle=True)
test_loaderB = DataLoader(test_datasetB, batch_size=BATCH_SIZE, shuffle=False)

data_loaderC = DataLoader(combined_datasetC, batch_size=BATCH_SIZE, shuffle=True)

train_sizeC = int(0.8 * len(combined_datasetC))
test_sizeC = len(combined_datasetC) - train_sizeC
train_datasetC, test_datasetC = torch.utils.data.random_split(combined_datasetC, [train_sizeC, test_sizeC])

train_loaderC = DataLoader(train_datasetC, batch_size=BATCH_SIZE, shuffle=True)
test_loaderC = DataLoader(test_datasetC, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class RegressionNet(nn.Module):
    def __init__(self):
        super(RegressionNet, self).__init__()        
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1)
        self.relu1 = nn.ReLU()        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)               
        self.conv3 = nn.Conv2d(64, 32, kernel_size=3, stride=1)
        self.relu3 = nn.ReLU()        
        self.fc1 = nn.Linear(32 * 5 * 5, 128)
        self.relu4 = nn.ReLU()        
        self.fc2 = nn.Linear(128, 64) 
        self.relu5 = nn.ReLU()        
        self.fc3 = nn.Linear(64, 1)        

    def forward(self, x):
        out = self.conv1(x)
        out = self.relu1(out)        
        out = self.conv2(out)
        out = self.relu2(out)
        out = self.pool2(out)                      
        out = self.conv3(out)
        out = self.relu3(out)      
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.relu4(out)
        out = self.fc2(out)
        out = self.relu5(out)
        out = self.fc3(out)
        return out

# calculating MAPE
def calculate_mape(output, target):
    absolute_error = torch.abs(output - target)
    percentage_error = (absolute_error / target) * 100
    mape = torch.mean(percentage_error)
    return mape

In [None]:
# training Model A
import matplotlib.pyplot as plt
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

modelA = RegressionNet().to(DEVICE)

optimizer = optim.Adam(modelA.parameters(), lr=LR, weight_decay=0.005)
criterion = nn.L1Loss()

train_losses = []
test_losses = []

for epoch in range(1, EPOCHS + 1):
    modelA.train()
    train_loss = 0.0
    train_mape = 0.0
    train_mean_difference = 0.0
    for batch_idx, (data, target) in enumerate(train_loaderA):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = modelA(data)
        loss = criterion(output, target)
        mape = calculate_mape(output, target)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * data.size(0)
        train_mape += mape * data.size(0)
    train_loss /= len(train_datasetA)
    train_mape /= len(train_datasetA)
    train_mean_difference /= len(train_datasetA)
    
    train_losses.append(train_loss)

    modelA.eval()
    test_loss = 0.0
    test_mape = 0.0
    test_mean_difference = 0.0
    with torch.no_grad():
        for data, target in test_loaderA:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = modelA(data)
            loss = criterion(output, target)
            mape = calculate_mape(output, target)
            test_loss += loss.item() * data.size(0)
            test_mape += mape * data.size(0)
    test_loss /= len(test_datasetA)
    test_mape /= len(test_datasetA)
    test_mean_difference /= len(test_datasetA)
    
    test_losses.append(test_loss)

    print('Epoch: {}, Train MAPE: {:.2f}%,Test MAPE: {:.2f}%'.format(epoch, train_mape, test_mape))

plt.plot(range(1, EPOCHS + 1), train_losses, label='Training Loss')
plt.plot(range(1, EPOCHS + 1), test_losses, label='Testing Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Testing Loss')
plt.legend()
plt.show()

In [None]:
# training Model B
import matplotlib.pyplot as plt
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

modelB = RegressionNet().to(DEVICE)

optimizer = optim.Adam(modelB.parameters(), lr=LR, weight_decay=0.005)
criterion = nn.L1Loss()

train_losses = []
test_losses = []

for epoch in range(1, EPOCHS + 1):
    modelB.train()
    train_loss = 0.0
    train_mape = 0.0
    train_mean_difference = 0.0
    for batch_idx, (data, target) in enumerate(train_loaderB):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = modelB(data)
        loss = criterion(output, target)
        mape = calculate_mape(output, target)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * data.size(0)
        train_mape += mape * data.size(0)
    train_loss /= len(train_datasetB)
    train_mape /= len(train_datasetB)
    train_mean_difference /= len(train_datasetB)
    
    train_losses.append(train_loss)

    modelB.eval()
    test_loss = 0.0
    test_mape = 0.0
    test_mean_difference = 0.0
    with torch.no_grad():
        for data, target in test_loaderB:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = modelB(data)
            loss = criterion(output, target)
            mape = calculate_mape(output, target)
            test_loss += loss.item() * data.size(0)
            test_mape += mape * data.size(0)
    test_loss /= len(test_datasetB)
    test_mape /= len(test_datasetB)
    test_mean_difference /= len(test_datasetB)
    
    test_losses.append(test_loss)

    print('Epoch: {}, Train MAPE: {:.2f}%,Test MAPE: {:.2f}%'.format(epoch, train_mape, test_mape))
    
plt.plot(range(1, EPOCHS + 1), train_losses, label='Training Loss')
plt.plot(range(1, EPOCHS + 1), test_losses, label='Testing Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Testing Loss')
plt.legend()
plt.show()

In [None]:
# training Model C
import matplotlib.pyplot as plt
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

modelC = RegressionNet().to(DEVICE)

optimizer = optim.Adam(modelC.parameters(), lr=LR, weight_decay=0.005)
criterion = nn.L1Loss()

train_losses = []
test_losses = []

for epoch in range(1, EPOCHS + 1):
    modelC.train()
    train_loss = 0.0
    train_mape = 0.0
    train_mean_difference = 0.0
    for batch_idx, (data, target) in enumerate(train_loaderC):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = modelC(data)
        loss = criterion(output, target)
        mape = calculate_mape(output, target)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * data.size(0)
        train_mape += mape * data.size(0)
    train_loss /= len(train_datasetC)
    train_mape /= len(train_datasetC)
    train_mean_difference /= len(train_datasetC)
    
    train_losses.append(train_loss)

    modelC.eval()
    test_loss = 0.0
    test_mape = 0.0
    test_mean_difference = 0.0
    with torch.no_grad():
        for data, target in test_loaderC:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = modelC(data)
            loss = criterion(output, target)
            mape = calculate_mape(output, target)
            test_loss += loss.item() * data.size(0)
            test_mape += mape * data.size(0)
    test_loss /= len(test_datasetC)
    test_mape /= len(test_datasetC)
    test_mean_difference /= len(test_datasetC)
    
    test_losses.append(test_loss)

    print('Epoch: {}, Train MAPE: {:.2f}%,Test MAPE: {:.2f}%'.format(epoch, train_mape, test_mape))

plt.plot(range(1, EPOCHS + 1), train_losses, label='Training Loss')
plt.plot(range(1, EPOCHS + 1), test_losses, label='Testing Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Testing Loss')
plt.legend()
plt.show()

In [None]:
# testing Model A
modelA.eval()

with torch.no_grad():
    predictionsA = modelA(new_LMA.to(DEVICE))
    predictionsTA = modelA(Test_Ax.to(DEVICE))

print('overall performance on training set')
print(int((torch.sum(predictionsA) - torch.sum(new_label_tensorA))/torch.sum(new_label_tensorA)*100),'%')
print('overall performance on test set')
print(int((torch.sum(predictionsTA) - torch.sum(Test_Ay))/torch.sum(Test_Ay)*100),'%')

In [None]:
# testing Model B
modelB.eval()

with torch.no_grad():
    predictionsB = modelB(new_LMB.to(DEVICE))
    predictionsTB = modelB(Test_Bx.to(DEVICE))

print('overall performance on training set')
print(int((torch.sum(predictionsB) - torch.sum(new_label_tensorB))/torch.sum(new_label_tensorB)*100),'%')
print('overall performance on test set')
print(int((torch.sum(predictionsTB) - torch.sum(Test_By))/torch.sum(Test_By)*100),'%')

In [None]:
# testing Model C
modelC.eval()

with torch.no_grad():
    predictionsC = modelC(new_LMC.to(DEVICE))
    predictionsTC = modelC(Test_Cx.to(DEVICE))

print('overall performance on training set')
print(int((torch.sum(predictionsC) - torch.sum(new_label_tensorC))/torch.sum(new_label_tensorC)*100),'%')
print('overall performance on test set')
print(int((torch.sum(predictionsTC) - torch.sum(Test_Cy))/torch.sum(Test_Cy)*100),'%')

In [None]:
# utilizing the BVEC model for prediction tasks

# import nighttime lights data of target areas
image_path_CSJ = r'E:\NTL_CSJ.tif'
image_CSJ = tifffile.imread(image_path_CSJ)

if image_CSJ.shape[0] % 2 != 0:
    image_CSJ = np.pad(image_CSJ, ((0, 1), (0, 0)), mode='constant')
if image_CSJ.shape[1] % 2 != 0:
    image_CSJ = np.pad(image_CSJ, ((0, 0), (0, 1)), mode='constant')

CSJntl = torch.from_numpy(image_CSJ).unsqueeze(0).unsqueeze(0).float()

padding = 8
padded_image_CSJ = np.pad(image_CSJ, ((padding, padding), (padding, padding)), mode='constant')

crop_size = 18
stride = 2
num_crops = ((padded_image_CSJ.shape[0] - crop_size) // stride + 1) * ((padded_image_CSJ.shape[1] - crop_size) // stride + 1)
CSJntl = torch.zeros(num_crops, 1, crop_size, crop_size)

crop_idx = 0
for i in range(0, padded_image_CSJ.shape[0] - crop_size + 1, stride):
    for j in range(0, padded_image_CSJ.shape[1] - crop_size + 1, stride):
        crop = padded_image_CSJ[i:i+crop_size, j:j+crop_size]
        CSJntl[crop_idx, 0, :, :] = torch.from_numpy(crop)
        crop_idx += 1



In [None]:
sum_valu = CSJntl.view(-1, 18*18).sum(dim=1)
ntl_A = []
ntl_B = []
ntl_C = []

for i, value in enumerate(sum_valu):
    if value > 9300:
        ntl_A.append(i)
    elif 3600 < value <= 9300:
        ntl_B.append(i)
    else :
        ntl_C.append(i)

CSJntlA = CSJntl[ntl_A]
CSJntlB = CSJntl[ntl_B]
CSJntlC = CSJntl[ntl_C]

rod = ntl_A + ntl_B + ntl_C

print(CSJntlA.size())
print(CSJntlB.size())
print(CSJntlC.size())

In [None]:
modelA.eval()

with torch.no_grad():
    CSJtfaA = modelA(CSJntlA.to(DEVICE))

modelB.eval()

with torch.no_grad():
    CSJtfaB = modelB(CSJntlB.to(DEVICE))

modelC.eval()

with torch.no_grad():
    CSJtfaC = modelC(CSJntlC.to(DEVICE))

In [None]:
CSJtfaO = torch.zeros(CSJtfaA.size(0), 1)
device = CSJtfaO.device
CSJtfaA = CSJtfaA.to(device)
CSJtfaB = CSJtfaB.to(device)
CSJtfaC = CSJtfaC.to(device)

CSJtfa = torch.cat([CSJtfaA, CSJtfaB, CSJtfaC], dim=0)

rod_tensor = torch.tensor(rod, dtype=torch.long)

rod_tensor = rod_tensor.unsqueeze(1)

CSJtfa_with_rod = torch.cat([CSJtfa, rod_tensor], dim=1)

sorted_CSJtfa = CSJtfa_with_rod[CSJtfa_with_rod[:, 1].argsort()]

CSJtfa = sorted_CSJtfa[:, 0]
CSJtfa = CSJtfa.view(725, 697)

In [None]:
# save the prediction results as a single-band TIFF image
tifffile.imwrite('output.tif', CSJtfa.numpy().astype(np.float32))