In [None]:
from pathlib import Path

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import transforms

from torchgeo.models import resnet18
from torchgeo.datasets import EuroSAT


In [None]:
# Per-channel normalization statistics: one mean and one std for each of the 13 channels.
# NOTE(review): 0.5 / 0.5 look like placeholder values rather than statistics measured
# on the data — confirm they suit the value range of the images EuroSAT returns.
channel_means = [0.5 for _ in range(13)]
channel_stds = [0.5 for _ in range(13)]

# Preprocessing: channel-wise normalization only.
preprocess_transform = transforms.Compose(
    [transforms.Normalize(mean=channel_means, std=channel_stds)]
)

# Data augmentation: random horizontal flip followed by a random rotation
# of up to 10 degrees in either direction.
data_augmentation = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=10),
    ]
)


In [None]:
# Adapter that avoids "TypeError: img should be PIL Image. Got <class 'dict'>":
# the image transforms are applied to the sample's image, not to the whole dict.
class TransformSample:
    """Turn a dict sample into an ``(image, label)`` tuple with a transformed image."""

    def __init__(self, image_transforms):
        """Store the callable that will be applied to each sample's image."""
        self.image_transforms = image_transforms

    def __call__(self, sample):
        """Transform ``sample["image"]`` and pair it with ``sample["label"]``.

        Any other keys in ``sample`` are dropped; only the two values are returned.
        """
        return self.image_transforms(sample["image"]), sample["label"]

# EuroSAT is wrapped with TransformSample so that every sample becomes an
# (image, label) tuple; this fixes "TypeError: img should be PIL Image. Got <class 'dict'>".
# Training view of the data: random augmentation followed by normalization.
# NOTE(review): depending on the torchgeo version, EuroSAT may load a predefined
# subset by default (via a `split` argument) — confirm `full_dataset` covers the
# data you intend to use.
full_dataset = EuroSAT(
    root='torchgeo_data',
    transforms=TransformSample(transforms.Compose([data_augmentation, preprocess_transform])),
    download=True,
    checksum=True,
)

# Evaluation view of the same files: normalization only. Validation and test
# metrics must not be computed on randomly flipped / rotated images.
eval_dataset = EuroSAT(
    root='torchgeo_data',
    transforms=TransformSample(preprocess_transform),
    download=True,
    checksum=True,
)
assert len(eval_dataset) == len(full_dataset), "training and evaluation views must index the same samples"

# Perform a 70-15-15 train-val-test split on the original dataset length
dataset_size = len(full_dataset)
train_size = int(0.7 * dataset_size)
val_size = int(0.15 * dataset_size)
test_size = dataset_size - train_size - val_size  # remainder, so the three sizes always sum to dataset_size

# A seeded generator keeps the split identical across kernel restarts, so the
# test set never overlaps with samples a previously saved model was trained on.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, val_split, test_split = torch.utils.data.random_split(
    full_dataset, [train_size, val_size, test_size], generator=split_generator
)

# Re-use the sampled indices, but read validation/test samples through the
# augmentation-free view of the dataset.
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)

# Data loaders: only the training data is shuffled.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

# Print the sizes of the datasets
print(f"Total number of samples in the full dataset: {len(full_dataset)}")
print(f"Number of samples in the training set: {len(train_dataset)}")
print(f"Number of samples in the validation set: {len(val_dataset)}")
print(f"Number of samples in the testing set: {len(test_dataset)}")


In [None]:
# Training, Validation, and Testing
from tqdm import tqdm #for time counting

def train(train_loader, val_loader, model, criterion, optimizer, num_epochs, device):
    """Train ``model`` and restore the weights of its best validation epoch.

    Args:
        train_loader: Iterable of ``(images, labels)`` batches used for optimization.
        val_loader: Batches handed to ``validate`` after every epoch.
        model: Network to train; it is moved to ``device`` and updated in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model's parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device on which the model and every batch are placed.

    Returns:
        The same ``model`` object, loaded with the weights from the epoch that
        reached the highest validation accuracy (unchanged if no epoch ran).
    """
    model.to(device)
    # Start below any reachable accuracy so the first epoch always yields a snapshot.
    best_val_accuracy = float("-inf")
    best_state = None

    for epoch in range(num_epochs):
        model.train()  # training mode (validate() switches the model to eval mode)
        running_loss = 0.0  # sum of per-sample losses seen so far in this epoch
        correct = 0  # correctly classified training samples in this epoch
        total = 0  # training samples seen in this epoch

        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
        for images, labels in progress_bar:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_size = labels.size(0)
            # Assumes criterion averages over the batch, so scale back to a per-sample sum.
            running_loss += loss.item() * batch_size

            _, predicted = outputs.max(1)
            total += batch_size
            correct += predicted.eq(labels).sum().item()
            # Divide by samples (not batches): running_loss is a per-sample sum.
            progress_bar.set_postfix(loss=running_loss / total, accuracy=100. * correct / total)

        epoch_accuracy = 100. * correct / total
        epoch_loss = running_loss / total
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.2f}, Accuracy: {epoch_accuracy:.2f}%')

        # Validate the model and remember the best weights seen so far.
        val_accuracy = validate(val_loader, model, device)
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            # state_dict() hands out references to the live tensors, which later
            # optimizer steps overwrite; clone them to keep a real snapshot.
            best_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

    if best_state is not None:  # None only when no epoch ran (num_epochs == 0)
        model.load_state_dict(best_state)
    print("Training complete")
    return model


def test(test_loader, model, device):
    model.to(device)
    model.eval() #set evaluation mode
    correct = 0 #correct predictions
    total = 0 #total samples
    
    with torch.no_grad():  #iterating over the test dataset without tracking gradients
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)  #move data to the appropriate device
            
            outputs = model(images)  #forward pass

            _, predicted = torch.max(outputs, 1)  #get the index of the max log-probability (prediction)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
    accuracy = correct / total
    print(f'Test Accuracy %: {accuracy * 100:.2f}%')
    return accuracy


def validate(val_loader, model, device):
    """Return the accuracy of ``model`` over the batches of ``val_loader``.

    Puts the model on ``device`` in evaluation mode, predicts the class with the
    highest output for every sample and compares it with the label.

    Returns:
        Accuracy as a fraction in ``[0, 1]``; the percentage is printed as well.
    """
    model.to(device)
    model.eval()
    hits = 0
    seen = 0

    with torch.no_grad():  # inference only, no gradient tracking
        for images, labels in val_loader:
            images = images.to(device)
            labels = labels.to(device)

            predicted = torch.max(model(images), 1)[1]
            hits += predicted.eq(labels).sum().item()
            seen += labels.size(0)

    accuracy = hits / seen
    print(f'Validation Accuracy %: {accuracy * 100:.2f}%')
    return accuracy

# Pick the compute device: the GPU when CUDA is usable, otherwise the CPU.
# The fallback avoids "RuntimeError: Found no NVIDIA driver on your system."
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Load the pre-trained ResNet18 model
model = resnet18(pretrained=True)

# Freeze the pre-trained layers first. This must happen before any layer is
# replaced: the replacement layers below start from random weights, and freezing
# them would leave the network with an untrainable random input layer.
for param in model.parameters():
    param.requires_grad = False

# Redefine the first layer so it accepts 13 input channels. A new nn.Conv2d has
# no pre-trained weights and its parameters require gradients by default.
model.conv1 = nn.Conv2d(13, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

# Fine-tuning head: replace the last layer with a new linear layer.
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)  # 10 EuroSAT classes

# Make the trainable state of both new layers explicit.
for new_layer in (model.conv1, model.fc):
    for param in new_layer.parameters():
        param.requires_grad = True

criterion = nn.CrossEntropyLoss()
# No optimizer is created here: the fine-tuning cell builds one over all
# parameters. An optimizer for a head-only stage would have to cover every
# parameter that still requires gradients (model.conv1 and model.fc).

In [None]:
num_epochs = 10

# Full fine-tuning: every layer of the network takes part in gradient updates.
for param in model.parameters():
    param.requires_grad = True

# Adam over all parameters of the model.
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

# Train with per-epoch validation; train() hands back the model it was given.
fine_tuned_model = train(
    train_loader=train_loader,
    val_loader=val_loader,
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
    device=device,
)

# Accuracy of the fine-tuned model on the test loader.
fine_tuned_accuracy = test(test_loader=test_loader, model=fine_tuned_model, device=device)

In [None]:
# Save the fine-tuned model's weights (state_dict) to disk.
fine_tuned_path = Path('torchgeo_data/models/torchgeo_resnet.pth')
weights_path = Path('torchgeo_data/models/torchgeo_resnet_weights.pth')

# torch.save does not create missing parent directories, so create the target
# folder first; otherwise the save fails on a fresh checkout.
fine_tuned_path.parent.mkdir(parents=True, exist_ok=True)
weights_path.parent.mkdir(parents=True, exist_ok=True)

torch.save(fine_tuned_model.state_dict(), fine_tuned_path)

# train() returns the very model object it was given, so `model` and
# `fine_tuned_model` are the same network and this file holds the same
# state_dict; it is still written so both file names remain available.
torch.save(model.state_dict(), weights_path)