In [None]:
!pip install scikit-activeml
!pip install skorch

# Research

In [None]:
import matplotlib as mlp
import matplotlib.pyplot as plt
import numpy as np
import torch
import warnings

from copy import deepcopy
from skactiveml.classifier import SklearnClassifier
from skactiveml.pool import UncertaintySampling, QueryByCommittee, RandomSampling
from skactiveml.utils import call_func
from sklearn.datasets import load_digits
from sklearn.ensemble import VotingClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from skorch import NeuralNetClassifier
from torch import nn

# Render every figure on a white canvas.
mlp.rcParams.update({"figure.facecolor": "white"})

# Label value used for samples whose class has not been queried yet.
MISSING_LABEL = -1

# Seeds of the repeated runs (0 through 24); shorten the range to reduce
# computation time.
random_states = list(range(25))
FONTSIZE = 20

# Batch sizes to compare and the number of labels acquired per run.
batch_sizes = [1, 2, 4, 5, 10, 20]
total_samples = 100

# Number of query rounds each batch size needs to spend the label budget.
cycles_per_batch_size = [total_samples // size for size in batch_sizes]


# Define base module.
class ClassifierModule(nn.Module):
    """Small CNN that maps single-channel 8x8 inputs to class probabilities.

    The network is one 3x3 convolution (32 filters, no padding) followed by
    ReLU and 2x2 max pooling, a linear layer and a softmax.

    Parameters
    ----------
    n_classes : int, optional
        Number of output units. When omitted, ``len(classes)`` is used, where
        ``classes`` is the notebook-level variable; it must exist at the time
        the module is instantiated.
    """

    def __init__(self, n_classes=None):
        super().__init__()
        if n_classes is None:
            # Backward-compatible fallback for callers that construct the
            # module without arguments.
            n_classes = len(classes)
        self.conv_layer = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # For 8x8 inputs the conv yields 6x6 maps and the pooling 3x3 maps,
        # i.e. 32 * 3 * 3 = 288 flattened features.
        self.dense_layer = nn.Linear(288, n_classes)
        self.output = nn.Softmax(dim=-1)

    def forward(self, X):
        """Return class probabilities of shape ``(X.size(0), n_classes)``."""
        X = self.conv_layer(X)
        # Flatten all feature maps of a sample into one vector.
        X = X.reshape(X.size(0), -1)
        X = self.dense_layer(X)
        X = self.output(X)
        return X

# Active learning experiment on the digits data set.
#
# For every seed in `random_states`, every batch size in `batch_sizes` and
# every query strategy, labels are acquired in rounds of `batch_size` samples.
# The ensemble's test accuracy is recorded before the first round and after
# each round. The outcome is stored as
# `results[seed_index][batch_size_index][strategy_name]`, a 1-D array with
# `cycles_per_batch_size[batch_size_index] + 1` accuracies.
results = []

# The warnings filter is global and the preprocessed data do not depend on the
# seed, so both are prepared once instead of once per seed.
warnings.filterwarnings("ignore")

# Load digit data set.
X, y_true = load_digits(return_X_y=True)

# Standardize data.
# NOTE(review): the scaler is fitted on all samples, i.e. before the
# train-test split, so test-set statistics enter the preprocessing. Kept as is
# to leave the reported numbers unchanged.
X = StandardScaler().fit_transform(X)

# Reshape samples to n_samples x n_channels x width x height to fit skorch
# requirements.
X = X.reshape((len(X), 1, 8, 8))

# Set data types according to skorch requirements.
X, y_true = X.astype(np.float32), y_true.astype(np.int64)

# Identify list of possible classes.
classes = np.unique(y_true)

for state in random_states:
  print(f'Random state: {state}')
  torch.manual_seed(state)
  torch.cuda.manual_seed(state)

  # Make a 66-34 train-test split.
  X_train, X_test, y_train, y_test = train_test_split(
      X, y_true, train_size=0.66, random_state=state
  )

  # Create list of three base CNNs that differ only in their learning rate.
  learning_rates = [1.e-3, 1.e-2, 1.e-1]
  estimators = []
  for i, learning_rate in enumerate(learning_rates):
    net = NeuralNetClassifier(
        ClassifierModule,
        max_epochs=100,
        lr=learning_rate,
        verbose=0,
        train_split=False,
    )
    net.initialize()
    estimators.append((
        f'clf {i}',
        SklearnClassifier(
            estimator=net, missing_label=MISSING_LABEL,
            random_state=i, classes=classes
        ),
    ))

  # Create voting ensemble out of given ensemble list.
  ensemble_init = SklearnClassifier(
      estimator=VotingClassifier(estimators=estimators, voting='soft'),
      missing_label=MISSING_LABEL, random_state=state, classes=classes
  )

  # Query strategies to compare, keyed by the name used in logs and plots.
  # NOTE(review): the same strategy objects are reused for every batch size;
  # if they keep internal random state between calls, runs for later batch
  # sizes are not independent of earlier ones -- confirm.
  qs_dict = {
      'random sampling': RandomSampling(random_state=state, missing_label=MISSING_LABEL),
      'uncertainty sampling (least confident)': UncertaintySampling(method='least_confident', random_state=state, missing_label=MISSING_LABEL),
      'uncertainty sampling (entropy)': UncertaintySampling(method='entropy', random_state=state, missing_label=MISSING_LABEL),
      'uncertainty sampling (margin)': UncertaintySampling(method='margin_sampling', random_state=state, missing_label=MISSING_LABEL),
      'query-by-committee': QueryByCommittee(random_state=state, missing_label=MISSING_LABEL),
  }

  batch_acc_dictionaries = []
  for batch_size, n_cycles in zip(batch_sizes, cycles_per_batch_size):
    # One accuracy slot per cycle plus one for the model trained on the
    # labels acquired in the last cycle.
    acc_dict = {key: np.zeros(n_cycles + 1) for key in qs_dict}

    print(f'Batch size used: {batch_size}')
    # Perform active learning with each query strategy.
    for qs_name, qs in qs_dict.items():

      print(f'Execute active learning using {qs_name}')

      # Copy initial ensemble model so every strategy starts from the same
      # untrained ensemble.
      ensemble = deepcopy(ensemble_init)

      # Create array of missing labels as initial labels.
      y = np.full_like(y_train, fill_value=MISSING_LABEL, dtype=np.int64)

      # Execute active learning cycle.
      for c in range(n_cycles):
        # Fit and evaluate ensemble on the labels acquired so far.
        acc = ensemble.fit(X_train, y).score(X_test, y_test)
        acc_dict[qs_name][c] = acc

        # Select and update training data. Both `clf` and `ensemble` are
        # supplied because the strategies differ in which argument they
        # take; presumably `call_func` forwards only the accepted ones.
        query_idx = call_func(
            qs.query, X=X_train, y=y, clf=ensemble, fit_clf=False, ensemble=ensemble,
            fit_ensemble=False, batch_size=batch_size
        )
        # Reveal the true labels of the queried samples.
        y[query_idx] = y_train[query_idx]

      # Fit and evaluate ensemble on the final label set.
      ensemble.fit(X_train, y)
      acc_dict[qs_name][n_cycles] = ensemble.score(X_test, y_test)

    batch_acc_dictionaries.append(acc_dict)

  results.append(batch_acc_dictionaries)


#print(results)
#print(len(results))


In [None]:
# Font size for the figures of this cell (overrides the value set earlier).
FONTSIZE = 30

# Average over the runs that are actually stored in `results`, so the curves
# stay correct if the experiment was interrupted or `random_states` was edited
# after the experiment cell ran.
n_runs = len(results)
if n_runs == 0:
  raise ValueError('No results to average: run the experiment cell first.')

qs_names = list(qs_dict)

# One (n_strategies x n_cycles+1) accumulator per batch size.
averaged_batch_accuracies = [
  np.zeros((len(qs_names), n_cycles + 1)) for n_cycles in cycles_per_batch_size
]

# sum results of seeds; curves are looked up by strategy name so the row
# order always matches `qs_names`.
for state_results in results:
  for i, batch_acc_dictionary in enumerate(state_results):
    for j, qs_name in enumerate(qs_names):
      averaged_batch_accuracies[i][j] += batch_acc_dictionary[qs_name]

# average accuracies over seeds
averaged_batch_accuracies = [
  accuracy_sum / n_runs for accuracy_sum in averaged_batch_accuracies
]

# One figure per batch size: averaged test accuracy against number of labels.
for batch_size, batch_accuracies in zip(batch_sizes, averaged_batch_accuracies):
  n_labeled = np.arange(0, total_samples+1, batch_size, dtype=int)
  fig, ax = plt.subplots(figsize=(16, 9))
  for index, qs_name in enumerate(qs_names):
    # The legend reports the mean of the curve as AULC.
    ax.plot(n_labeled, batch_accuracies[index], label=f'{qs_name}: AULC={round(batch_accuracies[index].mean(), 2)}')
  ax.tick_params(axis='both', labelsize=FONTSIZE)
  ax.set_title(f'Batch size: {batch_size}', fontsize=FONTSIZE)
  ax.set_xlabel('Number of samples', fontsize=FONTSIZE)
  ax.set_ylabel('Test accuracy', fontsize=FONTSIZE)
  ax.legend(loc='lower right', fontsize='xx-large')
  plt.show()
