In [3]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from typing import Type
from torch import nn
from torch.optim import optimizer
import rasterio
import zipfile
from matplotlib import pyplot as plt
import datetime
from torchvision import transforms as transforms
import shutil
import torchmetrics
import os
#import pytorch_lightning as pl

# --- GPU selection --- #
# Slot number of the GPU to expose to this process (e.g., 3); to run without a GPU, write just ' '.
gpus = 1
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": str(gpus),
})
#os.environ["TORCH_CUDA_ARCH_LIST"]="3.5"

# Use the first visible CUDA device when there is one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [4]:
def minmax(array : np.ndarray, dim = 0):
    """Rescale ``array`` linearly to the [0, 1] range along ``dim``.

    Args:
        array: Numeric array (or array-like) to normalise.
        dim: Axis or tuple of axes over which the minimum and maximum are
            taken. Every slice along the remaining axes is scaled
            independently.

    Returns:
        A floating-point array with the same shape as ``array``. Slices whose
        minimum equals their maximum are mapped to 0 instead of NaN.
    """
    array = np.asarray(array)

    # keepdims=True keeps the reduced axes as size-1 axes, so the statistics
    # broadcast back against `array` whichever axis was reduced.
    lowest = np.min(array, axis=dim, keepdims=True)
    highest = np.max(array, axis=dim, keepdims=True)

    # A constant slice has zero span; divide by 1 there so 0/0 never happens.
    span = highest - lowest
    safe_span = np.where(span == 0, 1, span)

    return (array - lowest) / safe_span

In [5]:
def _read_raster(path):
    """Read all bands of the raster at ``path`` and close the file afterwards."""
    # The context manager releases the file handle as soon as the pixels are in memory.
    with rasterio.open(path) as dataset:
        return dataset.read()


# --- LiDAR-derived layers, each min-max normalised over its first two axes --- #
lidar_image = _read_raster('../Data/N12/N12_lidar.tif')
lidar_array = np.array(lidar_image)
lidar_array = minmax(lidar_array, dim=(0,1))

lidar_1n_image = _read_raster('../Data/N12/N12_lidar_1n.tif')
lidar_1n_array = np.array(lidar_1n_image)
lidar_1n_array = minmax(lidar_1n_array, dim=(0,1))

lidar_nt_image = _read_raster('../Data/N12/N12_lidar_nt.tif')
lidar_nt_array = np.array(lidar_nt_image)
lidar_nt_array = minmax(lidar_nt_array, dim=(0,1))

# --- RGB imagery, appended after the three LiDAR layers --- #
RGB2020_image = _read_raster('../Data/N12/N12_RGB2020.tif')
RGB2020_array = np.array(RGB2020_image)

# assumes every LiDAR raster is single-band, so squeeze() leaves (3, H, W)
# before the RGB bands are concatenated along axis 0 — TODO confirm
train_array = np.stack([lidar_array, lidar_1n_array, lidar_nt_array]).squeeze()
train_array = np.concatenate((train_array,RGB2020_array))

# --- Land-cover target map --- #
target_image = _read_raster('../Data/N12/N12_newlc.tif')
target_array = np.array(target_image, dtype=int).squeeze()

# Compact the land-cover codes to consecutive ids. The pairs are applied in
# this exact order: each new id has already been vacated by an earlier step
# (1 -> 0 frees 1 before 2 -> 1, and so on), so no two classes are merged.
for old_code, new_code in ((1, 0), (2, 1), (7, 2), (8, 3), (9, 4), (10, 5), (11, 6)):
    target_array = np.where(target_array == old_code, new_code, target_array)

In [6]:
class TrainDataset2(Dataset):
    """Patch dataset pairing image tiles with per-class pixel fractions.

    The input raster is cut into non-overlapping ``patch_size`` x ``patch_size``
    tiles in row-major order. Rows and columns that do not fill a whole tile
    are dropped. Each tile's label is the fraction of its pixels that belong
    to each class of ``target_array``.

    Args:
        data_array: Array indexed as (channel, row, col).
        target_array: 2-D array of non-negative integer class ids, indexed as
            (row, col) on the same grid as ``data_array``.
        patch_size: Side length of the square tiles, in pixels.
        is_evaluating: If True the dataset exposes every tile, in raster
            order, and no rotation augmentation is applied.
        is_validating: If True (and not evaluating) the dataset exposes the
            validation subset; otherwise it exposes the training subset.
        rotate: If True (and not evaluating) the tiles rotated by 90, 180 and
            270 degrees are appended to the originals.
        train_ratio: Fraction of the tiles assigned to the training subset.
        num_classes: Length of each label vector. Defaults to
            ``data_array.shape[0] + 1``, the width this class has always used
            (7 for a 6-channel input).

    Raises:
        ValueError: If a tile contains a class id >= ``num_classes``.
    """

    def __init__(self, data_array : np.ndarray, target_array : np.ndarray, patch_size : int, is_evaluating : bool = False, is_validating : bool = False, rotate : bool = False, train_ratio : float = 0.8, num_classes : int = None):
        self.is_validating = is_validating
        self.is_evaluating = is_evaluating
        # Fixed seed: separately constructed instances (training / validating)
        # draw the same permutation and therefore complementary subsets.
        seed = 386579

        n_channels = data_array.shape[0]
        n_rows = data_array.shape[1] // patch_size
        n_cols = data_array.shape[2] // patch_size
        if num_classes is None:
            num_classes = n_channels + 1

        self.data = np.zeros((n_rows * n_cols, n_channels, patch_size, patch_size))
        self.label = np.zeros((n_rows * n_cols, num_classes))

        for i in range(n_rows):
            row_span = slice(i * patch_size, (i + 1) * patch_size)
            for j in range(n_cols):
                col_span = slice(j * patch_size, (j + 1) * patch_size)

                # Row-major flat index: the stride between tile rows is the
                # number of tile *columns*. Using the number of rows here
                # makes tiles overwrite each other on non-square rasters.
                flat_index = i * n_cols + j

                self.data[flat_index] = data_array[:, row_span, col_span]

                counts = np.bincount(target_array[row_span, col_span].reshape(-1), minlength=num_classes)
                if counts.shape[0] != num_classes:
                    raise ValueError(
                        f"target_array contains class id {counts.shape[0] - 1}, "
                        f"which does not fit in num_classes={num_classes}"
                    )
                self.label[flat_index] = counts / (patch_size * patch_size)

        if not is_evaluating and rotate:
            # Pass 1 appends the 90-degree copies. Pass 2 rotates the already
            # doubled set by 180 degrees, which adds the 180- and 270-degree
            # copies. Class fractions do not change under rotation, so the
            # labels are simply repeated.
            for i in range(2):
                rotated_data = np.rot90(self.data, k=i+1, axes=(-2, -1))
                self.data = np.concatenate((self.data, rotated_data), axis=0)
                self.label = np.concatenate((self.label, self.label), axis=0)

        # NOTE(review): the split is drawn after augmentation, so rotated
        # copies of one tile can end up in both the training and the
        # validation subset.
        train_size = int(self.data.shape[0]*train_ratio)
        index_array = np.random.RandomState(seed=seed).permutation(self.data.shape[0])
        self.train_index = index_array[:train_size]
        self.valid_index = index_array[train_size:]

        self.data = torch.as_tensor(self.data).float()
        self.label = torch.as_tensor(self.label).float()

        # Channels 3..5 are divided by 255.
        # presumably these are the 8-bit RGB bands — verify against the caller
        self.data[:, 3:6] /= 255

    def __len__(self):
        """Return the number of tiles exposed in the current mode."""
        if self.is_evaluating:
            return self.data.shape[0]
        if self.is_validating:
            return self.valid_index.shape[0]
        return self.train_index.shape[0]

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` float tensors for position ``idx``."""
        if not self.is_evaluating:
            # Map the position inside the subset to a row of the full tile table.
            subset = self.valid_index if self.is_validating else self.train_index
            idx = subset[idx]
        return self.data[idx], self.label[idx]


In [9]:
batch_size = 60

# Training and validation views differ only in `is_validating`; both use the
# rotation-augmented tiles.
N12_Dataset_training, N12_Dataset_validating = (
    TrainDataset2(
        data_array=train_array,
        target_array=target_array,
        patch_size=40,
        is_evaluating=False,
        is_validating=validating,
        rotate=True,
    )
    for validating in (False, True)
)

# The prediction view exposes every tile without augmentation.
N12_Dataset_prediction = TrainDataset2(
    data_array=train_array,
    target_array=target_array,
    patch_size=40,
    is_evaluating=True,
)

# One loader per dataset; no shuffling is requested, so batches follow the
# order in which each dataset indexes its tiles.
N12_Dataloader_training, N12_Dataloader_validating, N12_Dataloader_prediction = (
    DataLoader(dataset, batch_size=batch_size)
    for dataset in (N12_Dataset_training, N12_Dataset_validating, N12_Dataset_prediction)
)

N12_Dataloaders = dict(
    zip(
        ('Training', 'Validating', 'Prediction'),
        (N12_Dataloader_training, N12_Dataloader_validating, N12_Dataloader_prediction),
    )
)


In [11]:
# NOTE(review): `pl` is expected to be `pytorch_lightning`. Its import is
# commented out in the first cell, so this cell raises
# "NameError: name 'pl' is not defined" until that import is restored.
class UrbanGreenNetwork3(pl.LightningModule):
    """Lightning module predicting seven class fractions from a 6-channel patch.

    Three convolutional blocks feed two linear layers, and a softmax makes the
    seven outputs sum to 1. The first linear layer expects 256 features, which
    is what a 40 x 40 input yields: 16 channels x 4 x 4 after the
    convolution / pooling stack.
    """

    def __init__(self):
        super(UrbanGreenNetwork3, self).__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=3),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3),
            nn.ReLU(inplace=True),
            #nn.Dropout2d(p = 0.2),
            nn.MaxPool2d(kernel_size=2)
        )

        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3),
            nn.Conv2d(in_channels=64, out_channels=32, kernel_size=5),
            nn.ReLU(inplace=True),
            #nn.Dropout2d(p=0.2),
            nn.MaxPool2d(kernel_size=2)
        )

        self.conv_block_3 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=16, kernel_size=3),
            #nn.Dropout2d(p=0.2),
            nn.ReLU(inplace=True)
        )

        self.fc_block = nn.Sequential(
            nn.Linear(in_features=256, out_features=64),
            nn.Linear(in_features=64, out_features=7)
        )

    def forward(self,x):
        """Return softmax-normalised class fractions, one row per input patch."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        # Flatten everything except the batch axis for the linear layers.
        x = x.view(x.shape[0], -1)
        x = self.fc_block(x)
        return torch.softmax(x, dim=-1)

    def training_step(self, batch, batch_idx):
        """Compute the mean-squared error between predicted and target fractions."""
        sample, label = batch
        output = self(sample)
        loss = torch.nn.functional.mse_loss(output, label)
        return {'loss':loss}

    def configure_optimizers(self):
        """Return SGD with learning rate 0.01 and momentum 0.9 over all parameters."""
        optimizer = torch.optim.SGD(self.parameters(), lr=0.01, momentum=0.9)
        return optimizer

    def train_dataloader(self):
        """Build the training DataLoader from the cached ``.npy`` arrays."""
        train_array = np.load('../Data/N12/np/train_array.npy')
        # NOTE(review): the file name suggests a one-hot encoded target, while
        # TrainDataset2 indexes target_array as a 2-D map and bincounts it —
        # confirm the saved array is an integer class map.
        target_array = np.load('../Data/N12/np/target_array_OHE.npy')
        Dataset_training = TrainDataset2(data_array=train_array, target_array=target_array, patch_size=40, is_evaluating=False, is_validating=False, rotate=False)
        # The original passed the undefined name `batch` here; the notebook-level
        # `batch_size` is what DataLoader's second argument expects.
        return DataLoader(Dataset_training, batch_size=batch_size)


NameError: name 'pl' is not defined

In [None]:
class UrbanGreenNetwork2(nn.Module):
    """CNN mapping a 6-channel patch to seven softmax-normalised outputs.

    Three convolutional blocks, each with 2-D dropout (p=0.2), feed two
    linear layers. The first linear layer takes 256 features, e.g. the
    16 x 4 x 4 map produced from a 40 x 40 input.
    """

    def __init__(self):
        super().__init__()

        # 6 -> 16 -> 32 channels, then 2x2 max-pooling.
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(6, 16, kernel_size=3),
            nn.Conv2d(16, 32, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.2),
            nn.MaxPool2d(kernel_size=2),
        )

        # 32 -> 64 -> 32 channels, then 2x2 max-pooling.
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3),
            nn.Conv2d(64, 32, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.2),
            nn.MaxPool2d(kernel_size=2),
        )

        # 32 -> 16 channels, no pooling.
        self.conv_block_3 = nn.Sequential(
            nn.Conv2d(32, 16, kernel_size=3),
            nn.Dropout2d(p=0.2),
            nn.ReLU(inplace=True),
        )

        # 256 flattened features -> 64 -> 7 outputs.
        self.fc_block = nn.Sequential(
            nn.Linear(256, 64),
            nn.Linear(64, 7),
        )

    def forward(self, x):
        """Return a ``(batch, 7)`` tensor whose rows sum to 1."""
        features = x
        for block in (self.conv_block_1, self.conv_block_2, self.conv_block_3):
            features = block(features)

        # Keep the batch axis, flatten the rest.
        flat = features.view(features.size(0), -1)
        logits = self.fc_block(flat)
        return logits.softmax(dim=-1)

In [None]:
class UrbanGreenNetwork_FC_Reinforced(nn.Module):
    """CNN with two convolutional blocks and a four-layer linear head.

    Maps a 6-channel patch to seven softmax-normalised outputs. The first
    linear layer takes 512 features, e.g. the 32 x 4 x 4 map produced from a
    40 x 40 input.
    """

    def __init__(self):
        super().__init__()

        # 6 -> 32 -> 64 channels, then 2x2 max-pooling.
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(6, 32, kernel_size=3),
            nn.Conv2d(32, 64, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.2),
            nn.MaxPool2d(kernel_size=2),
        )

        # 64 -> 64 -> 32 channels, then 3x3 max-pooling.
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3),
            nn.Conv2d(64, 32, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.2),
            nn.MaxPool2d(kernel_size=3),
        )

        # 512 flattened features -> 256 -> 256 -> 64 -> 7 outputs.
        self.fc_block = nn.Sequential(
            nn.Linear(512, 256),
            nn.Linear(256, 256),
            nn.Linear(256, 64),
            nn.Linear(64, 7),
        )

    def forward(self, x):
        """Return a ``(batch, 7)`` tensor whose rows sum to 1."""
        features = self.conv_block_2(self.conv_block_1(x))

        # Keep the batch axis, flatten the rest.
        flat = features.view(features.size(0), -1)
        logits = self.fc_block(flat)
        return logits.softmax(dim=-1)