In [1]:
# Adapted from https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
# for kaggle satellite image classification dataset https://www.kaggle.com/mahmoudreda55/satellite-image-classification
# and then basic active learning was applied.

from __future__ import print_function
from __future__ import division
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
from pytorch_finetuning_utils import train_model, train_model_given_numpy_arrays, initialize_model
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)
import sys
sys.path.append("..")
# sys.path.insert(0, '..')
from test_framework.model_interface import ModelInterface
from test_framework.tester import Tester
from utils.data_utils import get_kaggle_satellite_image_classification_dataset_as_numpy_arrays

%matplotlib inline

PyTorch Version:  1.10.0
Torchvision Version:  0.11.1


# parameters

In [2]:
# Top level data directory. Here we assume the format of the directory conforms
#   to the ImageFolder structure
data_dir = "../data/kaggle_satellite_image_classification"

# Models to choose from [resnet, alexnet, vgg, squeezenet, densenet]
model_name = "squeezenet"

# Number of classes in the dataset
num_classes = 4

# Batch size for training (change depending on how much memory you have)
batch_size = 8

# Number of epochs to train for
num_epochs = 1#15

# Flag for feature extracting. When False, we finetune the whole model,
#   when True we only update the reshaped layer params
feature_extract = True


# Load data

In [3]:
x_data,y_data = get_kaggle_satellite_image_classification_dataset_as_numpy_arrays()

# Initialize model

In [4]:
# Initialize the model for this run
model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)

# Print the model we just instantiated
print(model_ft.classifier)

Sequential(
  (0): Dropout(p=0.5, inplace=False)
  (1): Conv2d(512, 4, kernel_size=(1, 1), stride=(1, 1))
  (2): ReLU(inplace=True)
  (3): AdaptiveAvgPool2d(output_size=(1, 1))
)


In [5]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [6]:
class ActiveLearningModel(ModelInterface):
    def __init__(self,model,name="no name provided",details="no details provided"):
        model.to(device)
        self.model = model
        self._name = name
        self._details = details
        # Gather the parameters to be optimized/updated in this run. If we are
        #  finetuning we will be updating all parameters. However, if we are
        #  doing feature extract method, we will only update the parameters
        #  that we have just initialized, i.e. the parameters with requires_grad
        #  is True.
        params_to_update = model.parameters()
        verbose = False
        if verbose:
            print("Params to learn:")
        if feature_extract:
            params_to_update = []
            for name,param in model.named_parameters():
                if param.requires_grad == True:
                    params_to_update.append(param)
                    if verbose:
                        print("\t",name)
        elif verbose:
            for name,param in model.named_parameters():
                if param.requires_grad == True:
                    print("\t",name)

        # Observe that all parameters are being optimized
        optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)
        self._optimizer = optimizer_ft
        
        # store criterion
        self._criterion = nn.CrossEntropyLoss()

    def name(self) -> str:
        return self._name
    def details(self) -> str:
        return self._details
    def train(self, train_x:np.ndarray, train_y:np.ndarray) -> None:
        self.model = train_model_given_numpy_arrays(self.model, train_x, train_y, self._criterion, self._optimizer, num_epochs, batch_size, verbose=False)
    def predict(self, test_x, output_tensors=False):
        if output_tensors:
            return self.model(torch.tensor(test_x).to(device))
        else:
            return self.model(torch.tensor(test_x).to(device)).cpu().detach().numpy()
    def query(self, unlabeled_data:np.ndarray, labeling_batch_size:int) -> np.ndarray:
        self.model.eval()
        softmax = nn.Softmax(dim=1)
        softmax_outputs = softmax(self.predict(unlabeled_data,output_tensors=True))
        max_softmax_vals, _ = torch.max(softmax_outputs, 1)
        max_softmax_vals_array = max_softmax_vals.cpu().detach().numpy()
        least_confident_indices = np.argpartition(max_softmax_vals_array,labeling_batch_size)[:labeling_batch_size]
        return least_confident_indices

In [7]:
y_data_onehot = np.zeros((y_data.size, y_data.max()+1))
y_data_onehot[np.arange(y_data.size),y_data] = 1

In [8]:
tester = Tester(x_data[:600],y_data_onehot[:600])
tester.TRAINING_EPOCHS = 2
tester.ACTIVE_LEARNING_BATCH_SIZE = 8

In [9]:
active_learning_model = ActiveLearningModel(model_ft,model_name,str(model_ft.classifier))

In [10]:
tester.test_model(active_learning_model)

Test 0, Training Epoch 1: 100%|██████████| 10/10 [00:15<00:00,  1.55s/it]


In [None]:
tester.plot_results()