In [None]:
# Python Code for running the Autoencoder
# Load in Dependencies

import os
#os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

from osgeo import gdal
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import pandas as pd

from torchinfo import summary
import gc
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader

from tqdm.auto import tqdm # progress bar
from timeit import default_timer as timer
def print_train_time(start:float,
                    end:float,
                    device: torch.device= None):
    total_time=end-start
    print(f"Train time on {device} : {total_time:.3f} seconds")
    return total_time


# Load in  Filepaths

In [None]:
# Grab the paths to all of the images in the 25m buffer folder
# Images were created via the R code; raw data from https://digimap.edina.ac.uk/aerial

inputPath1="../data/processed/cam-2016-25m" # 16011 files
inputPath2="../data/processed/cam-2020-25m" # 16011 files
inputPath3="../data/processed/glo-2018-25m" # 16011 files
inputPath4="../data/processed/glo-2021-25m" # 16011 files
inputPath5="../data/processed/oxf-2016-25m" # 16011 files
inputPath6="../data/processed/oxf-2019-25m" # 16011 files
folderpaths = [inputPath1,inputPath2,inputPath3,inputPath4,inputPath5,inputPath6]
filelist = []

# Load the images, and append them to a list.
for folder in folderpaths: 
    for filepath in os.listdir(folder):
        if filepath.endswith((".tif")):
            #print(filepath)
            tempfile=folder+'/{0}'.format(filepath)
            filelist.append(tempfile)

            len(filelist) # 107305 files

In [None]:
# Grab the paths to all of the images in the 100m buffer folder
# Images were created via the R code; raw data from https://digimap.edina.ac.uk/aerial

inputPath1="../data/processed/cam-2016-100m" 
inputPath2="../data/processed/cam-2020-100m" 
inputPath3="../data/processed/glo-2018-100m" 
inputPath4="../data/processed/glo-2021-100m" 
inputPath5="../data/processed/oxf-2016-100m" 
inputPath6="../data/processed/oxf-2019-100m" 
folderpaths = [inputPath1,inputPath2,inputPath3,inputPath4,inputPath5,inputPath6]
filelist_100m = []

# Load the images, and append them to a list.
for folder in folderpaths: 
    for filepath in os.listdir(folder):
        if filepath.endswith((".tif")):
            #print(filepath)
            tempfile=folder+'/{0}'.format(filepath)
            filelist_100m.append(tempfile)
            
len(filelist_100m) # 105441 files

# Run a CAE - 100m example

In [None]:
# Create Dataloaders
device = "cuda" if torch.cuda.is_available() else "cpu"
# Create a DataLOader for my data
class rgb25mdataset_train(Dataset):
    """Face Landmarks dataset."""

    def __init__(self,filelist_100m, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.transform = transform
        self.filelist = filelist_100m           

    def __len__(self):
        return len(self.filelist)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()    
        # Generate data
        dataset = gdal.Open(self.filelist[idx])
        image = dataset.ReadAsArray()  # Returned image is a NumPy array with shape (16, 60, 60) for example.
        image = image/255

        return image
    
class rgb25mdataset_test(Dataset):
    """Face Landmarks dataset."""

    def __init__(self,filelist_100m, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.transform = transform
        self.filelist = filelist_100m           

    def __len__(self):
        return len(self.filelist)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()    
        # Generate data
        dataset = gdal.Open(self.filelist[idx])
        image = dataset.ReadAsArray()  # Returned image is a NumPy array with shape (16, 60, 60) for example.
        image = image/255

        return image
    
my_dataset_train = rgb25mdataset_train(filelist_100m=filelist_100m)
my_dataset_test = rgb25mdataset_test(filelist_100m=filelist_100m)

my_dataloader_train = DataLoader(my_dataset_train, batch_size=20,shuffle=True, num_workers=0)
my_dataloader_test = DataLoader(my_dataset_test, batch_size=20,shuffle=True, num_workers=0)

In [None]:
# Create the model
class Autoencoder_100m(nn.Module):
    def __init__(self):
        super(Autoencoder_100m,self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(32),
            nn.Conv2d(32, 64, kernel_size=3,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.MaxPool2d(2, stride=2),  # b, 16, 5, 5
            
            nn.Conv2d(64,64,kernel_size=3,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.Conv2d(64,128,kernel_size=3,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(128),
            nn.MaxPool2d(2, stride=2),  # b, 16, 5, 5
            
            nn.Conv2d(128,64,kernel_size=3,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.Conv2d(64,64,kernel_size=3,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.MaxPool2d(2, stride=2),
            
            nn.Conv2d(64,32,kernel_size=3,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(32),
            nn.MaxPool2d(2, stride=2),
#             nn.Conv2d(32,32,kernel_size=3,padding=1),
#             nn.LeakyReLU(True),
            )
        self.decoder = nn.Sequential(
            nn.Upsample(scale_factor=2, mode='nearest'),# align_corners=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(32,64,kernel_size=3,stride=1,padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.Upsample(scale_factor=2, mode='nearest'), #align_corners=True),

            #nn.ReflectionPad2d(1),
            nn.Conv2d(64,64,kernel_size=3,stride=1,padding=0),#,output_padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.ReflectionPad2d(1),
            nn.Conv2d(64,128,kernel_size=3,stride=1,padding=0),#,output_padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(128),
            nn.Upsample(scale_factor=2, mode='nearest'),# align_corners=True),
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(128,64,kernel_size=3,stride=1,padding=0),#,output_padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.ReflectionPad2d(1),
            nn.Conv2d(64,64,kernel_size=3,stride=1,padding=0),#,output_padding=1),
            nn.GELU(),
            nn.InstanceNorm2d(64),
            nn.Upsample(scale_factor=2, mode='nearest'),# align_corners=True),
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(64,32,kernel_size=3,stride=1,padding=0),#,output_padding=1),
            nn.GELU(),
            nn.ReflectionPad2d(1),
            nn.Conv2d(32,3,kernel_size=3,stride=1,padding=0),#,output_padding=1),
            nn.Sigmoid())
    def forward(self,x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x    
torch.manual_seed(42)
cae_100m = Autoencoder_100m().to(device)
cae_100m, summary(cae_100m,input_size= (1,3,200,200))

In [None]:
# Function to save best model
class SaveBestModel:
    """
    Class to save the best model while training. If the current epoch's 
    validation loss is less than the previous least less, then save the
    model state.
    """
    def __init__(
        self, best_valid_loss=float('inf')
    ):
        self.best_valid_loss = best_valid_loss
        
    def __call__(
        self, current_valid_loss, 
        epoch, model
    ):
        if current_valid_loss < self.best_valid_loss:
            self.best_valid_loss = current_valid_loss
            print(f"\nBest validation loss: {self.best_valid_loss}")
            print(f"\nSaving best model for epoch: {epoch+1}\n")
            torch.save(model.state_dict(), '../data/models/all_100m_best_model.pt')

save_best_model = SaveBestModel()

In [None]:
# Can run a Train/Test Loop now
# Loss Function
loss_fn = nn.MSELoss() # sigmoid activation function built in

# Opitimiser
optimizer = optim.Adam(params=cae_100m.parameters(),
                     lr=.001,
                      weight_decay=1e-5)

# calculate mae (how clsoe are pixels to correct?)
def mae_fn(y_true,y_pred):
    mae = torch.abs((y_true-y_pred)).sum()
    return mae

# Build the training Loop (and a testing loop)
torch.manual_seed(42)
epochs = 50

# Instatiate datasets/loaders
my_dataset_train = rgb25mdataset_train(filelist_100m=filelist_100m)
my_dataset_test = rgb25mdataset_test(filelist_100m=filelist_100m)

my_dataloader_train = DataLoader(my_dataset_train, batch_size=20,shuffle=True, num_workers=0)
my_dataloader_test = DataLoader(my_dataset_test, batch_size=20,shuffle=False, num_workers=0)

batch_size= 20

train_time_start_on_cpu = timer()
# Training Loop
for epoch in range(epochs):
    print(f"Epoch {epoch + 1} out of {epochs}")
    train_loss = 0
    
    # Loop through training batch data
    for i_batch, sample_batched in enumerate(my_dataloader_train):
        X = sample_batched.to(device)
        X = Variable(X.float().cuda())
        cae_100m.train()
        
        # Forward Pass
        y_pred = cae_1(X)
        
        # Calc loss (per batch)
        loss = loss_fn(y_pred, X)  # loss tested against itself
        train_loss += loss # accumulate train loss
        
        # Optimizer zero grad
        optimizer.zero_grad()
        
        # perform backpropagation on the loss
        loss.backward()
        
        # performm gradient descent
        optimizer.step()
        
        # track progress
        if i_batch % 200 == 0:
            print(f"Batch {i_batch+1} out of {len(my_dataloader_train)} completed.")
            
    # Divide total train loss by length of dataloader
    train_loss /= (len(my_dataset_train)/batch_size)
        
    ### Testing
    test_loss, test_mae = 0,0
    
    cae_100m.eval()
    with torch.inference_mode():
         for i_batch, sample_batched in enumerate(my_dataloader_test):
            X = sample_batched.to(device)
            X = Variable(X.float().cuda())
            
            # Forward pass
            test_pred = cae_100m(X)
            
            # loss accumulate
            test_loss += loss_fn(test_pred,X)
            
            #Accuracy accumulate
            test_mae += mae_fn(y_true=X, y_pred=test_pred)
         
         # get loss per batch
         test_loss /= (len(my_dataset_test)/batch_size)
         
         # get mae per batch
         test_mae /= (len(my_dataset_test)/batch_size)
        
    print(f"Train Loss: {train_loss:.4f} | Test Loss: {test_loss:.4f} | Test Acc: {test_mae:.2f}")
    save_best_model(test_loss, epoch, cae_100m)


train_time_end_on_cpu = timer()    
total_train_time_on_cpu= print_train_time(start=train_time_start_on_cpu,
                                          end=train_time_end_on_cpu,
                                          device=str(next(cae_1.parameters()).device))

# Extract Image Features

In [None]:

cae_100m.state_dict()

cae_100m.load_state_dict(torch.load("../data/models/all_100m_best_model.pt"))


my_dataset_train = rgb25mdataset_train(filelist_train=filelist_100m)
my_dataset_test = rgb25mdataset_test(filelist_test=filelist_100m)
my_dataloader_train = DataLoader(my_dataset_train, batch_size=25,shuffle=True, num_workers=0)
my_dataloader_test = DataLoader(my_dataset_test, batch_size=25,shuffle=False, num_workers=0)

In [None]:
%%time
# using just the encoding layers, get the image features... for now just of the test set..
prediction_list=[]
cae_1.eval()
with torch.inference_mode():
    for i_batch, sample_batched in enumerate(my_dataloader_test):
        X = sample_batched.to(device)
        X = Variable(X.float().cuda())
        test_pred = cae_1.encoder(X)
        prediction_list.append(test_pred.cpu())

In [None]:
# stack the items in the list and then flatten to get one row of features per image
arr = np.vstack(prediction_list)
arr = arr.reshape(-1,4608)

# convert to df
df = pd.DataFrame(arr)
df["file_path"] = filelist_100m

# save a df of the image features
df.to_csv("../data/processed/image-features/cae-100m-features.csv")