In [1]:
"""
h-divergence.ipynb

Created on Mar 30 2023

@author: Lukas

This notebook is meant as an introduction to discriminative active learning (DAL),
and contains implementations for computing the H-Divergence between datasets.
"""

'\ndal.ipynb\n\nCreated on Mar 30 2023\n\n@author: Lukas\n\nThis notebook is meant as an introduction to discriminative active learning (DAL),\nand contains implementations for computing the H-Divergence between datasets and \nfor running DAL.\n'

First, we recall the definition of the H-Divergence. See here (https://melissadell.atlassian.net/wiki/spaces/TCC/pages/2584412161) for more background information and links to domain adaptation.

*Definition (H-Divergence):* Let $X$ be a domain (dataset), and let $D_S$ and $D_T$ be two distributions over $X$ (source and target). Let $H$ be a hypothesis class over $X$ (set of possible classifiers). Then we define the $H$-Divergence between $D_S$ and $D_T$ as

$d_H(D_S, D_T) = \sup_{h \in H} \left| \mathbb{P}_{x \sim D_S} \left[ h(x) = 1 \right] - \mathbb{P}_{x \sim D_T} \left[ h(x) = 1 \right]\right|$

We are interested in the distributions of the labeled and unlabed datasets, denoted by $L$ and $U$, respectively, so the $H$-Divergence becomes

$d_H(D_S, D_T) = \sup_{h \in H} \left| \frac{1}{|L|} \sum_{x \in L} h(x) - \frac{1}{|U|} \sum_{x \in U }h(x) \right|$

where $h(x)$ denotes the probability which the model $h$ assigns to the event $x \in L$. In order to (approximately) attain the supremum, we train a binary MLP classifier with the above expression as the loss function, i.e. we want it to output very high $h(x)$ for $x \in L$ and very low $h(x)$ for $x \in U$. We follow the original DAL paper and apply the classifier $h$ to the embeddings $\phi(x)$, and not to the original data $x$ itself. Here, $\phi$ is the model we would ultimately like to train using active learning (e.g. BERT).

Note that, by definition, $d_H(D_S, D_T) \in [0, 1]$, where we want $d_H(D_S, D_T) \approx 0$, which would indicate that the model (on average) cannot distinguish between $L$ and $U$.

In [2]:
# import packages

import numpy as np
import torch, torchvision

**Basic functionality for computing the H-Divergence between two datasets**

In [3]:
# compute the H-divergence between the labeled data and the unlabeled data

def compute_H_divergence(labeled, unlabeled, model):
    """
    A function that computes the H-divergence between the labeled and unlabeled data.

    Parameters
    ----------
    labeled : numpy array
        The labeled data.

    unlabeled : numpy array
        The unlabeled data.

    model : tensorflow model
        The discriminative model.

    Returns
    -------
    H_divergence : float
        The H-divergence between the labeled and unlabeled data.
        Must be between 0 and 1.
    """
    # for each element in the labeled data, compute the probabilities of the classes
    p_L = model.predict(labeled)

    # for each element in the unlabeled data, compute the probabilities of the classes
    p_U = model.predict(unlabeled)

    # sum the probabilities of class 0 for each element in the labeled data and divide by the number of elements
    p_L_0 = np.sum(p_L[:, 0])
    p_L_0 /= labeled.shape[0]

    # sum the probabilities of class 0 for each element in the unlabeled data and divide by the number of elements
    p_U_0 = np.sum(p_U[:, 0])
    p_U_0 /= unlabeled.shape[0]

    # compute the H-divergence as the absolute difference between p_U_0 and p_L_0
    H_divergence = np.abs(p_U_0 - p_L_0)

    return H_divergence


# train a discriminative model on the labeled data and unlabeled data

def train_discriminative_model(labeled, unlabeled, input_shape):
    """
    A function that trains and returns a discriminative model on the labeled and unlabeled data.

    Parameters
    ----------
    labeled : numpy.ndarray
        The labeled data.

    unlabeled : numpy.ndarray
        The unlabeled data.

    input_shape : int
        The number of features in the dataset.

    Returns
    -------
    model : tf.keras.Sequential
        The trained discriminative model.
    """

    # create the binary dataset:
    y_L = np.zeros((labeled.shape[0], 1), dtype='int')
    y_U = np.ones((unlabeled.shape[0], 1), dtype='int')
    X_train = np.vstack((labeled, unlabeled))
    Y_train = np.vstack((y_L, y_U))
    X_train = torch.from_numpy(X_train).float()

    # build the model:
    model = get_discriminative_model(input_shape)

    # train the model using torch:
    batch_size = 100
    epochs = 10
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.CrossEntropyLoss()
    for epoch in range(epochs):
        for i in range(0, X_train.shape[0], batch_size):
            x = X_train[i:i + batch_size]
            y = Y_train[i:i + batch_size]
            optimizer.zero_grad()
            y_pred = model(x)
            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()

    return model


# we use a 3-layer MLP as the discriminative model

def get_discriminative_model(input_shape):
    """
    The MLP model for discriminative active learning, without any regularization techniques.

    Parameters
    ----------
    input_shape : int
        The number of features in the dataset.

    Returns
    -------
    model : tf.keras.Sequential
        The MLP model.
    """
    width = input_shape
    model = torch.nn.Sequential(
        torch.nn.Linear(width, 100),
        torch.nn.ReLU(),
        torch.nn.Linear(100, 100),
        torch.nn.ReLU(),
        torch.nn.Linear(100, 2),
        torch.nn.Softmax(dim=1)
    )

    return model

**Example Implementation:** compute the H-Divergence between two randomly chosen subsets of MNIST (should be close to zero).

In [4]:
# set up functions for MNIST example

# load the MNIST dataset

def load_mnist():
    """
    A function that loads the MNIST dataset.

    Returns
    -------
    X_train : torch.Tensor
        The training data.

    y_train : torch.Tensor
        The training labels.

    X_test : torch.Tensor
        The test data.

    y_test : torch.Tensor
        The test labels.
    """
    # load the MNIST dataset from torchvision
    train = torchvision.datasets.MNIST('./files/', train=True, download=True)
    test = torchvision.datasets.MNIST('./files/', train=False, download=True)

    # convert the data to numpy arrays
    X_train = train.data.numpy()
    y_train = train.targets.numpy()
    X_test = test.data.numpy()
    y_test = test.targets.numpy()

    # reshape the data
    X_train = X_train.reshape(X_train.shape[0], -1)
    X_test = X_test.reshape(X_test.shape[0], -1)

    # normalize the data
    X_train = X_train / 255
    X_test = X_test / 255

    # convert the data to torch tensors
    X_train = torch.from_numpy(X_train).float()
    y_train = torch.from_numpy(y_train).long()
    X_test = torch.from_numpy(X_test).float()
    y_test = torch.from_numpy(y_test).long()

    return X_train, y_train, X_test, y_test


# randomly choose 500 samples from the training data

def choose_initial_samples(X_train, y_train):
    """
    A function that randomly chooses 500 samples from the training data.

    Parameters
    ----------
    X_train : torch.Tensor
        The training data.

    y_train : torch.Tensor
        The training labels.

    Returns
    -------
    X_labeled : torch.Tensor
        The labeled data.

    y_labeled : torch.Tensor
        The labeled labels.
    """
    # randomly choose 500 samples from the training data
    idx = np.random.choice(X_train.shape[0], 500, replace=False)
    X_labeled = X_train[idx]
    y_labeled = y_train[idx]

    return X_labeled, y_labeled


# define a 5-layer CNN model in torch to train on MNIST

def get_model():
    """
    The CNN model for semi-supervised active learning, without any regularization techniques.

    Returns
    -------
    model : torch.nn.Sequential
        The CNN model.
    """
    model = torch.nn.Sequential(
        torch.nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
        torch.nn.ReLU(),
        torch.nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
        torch.nn.ReLU(),
        torch.nn.MaxPool2d(kernel_size=2, stride=2),
        torch.nn.Dropout(0.25),
        torch.nn.Flatten(),
        torch.nn.Linear(7 * 7 * 64, 128),
        torch.nn.ReLU(),
        torch.nn.Dropout(0.5),
        torch.nn.Linear(128, 10),
        torch.nn.Softmax(dim=1)
    )

    return model





# train the model on the labeled data (note that we don't really care about the test accuracy here)

def train_model(X_labeled, y_labeled):
    """
    A function that trains and returns a CNN model on the labeled data.

    Parameters
    ----------
    X_labeled : torch.Tensor
        The labeled data.

    y_labeled : torch.Tensor
        The labeled labels.

    Returns
    -------
    model : tf.keras.Sequential
        The trained CNN model.
    """
    # build the model
    model = get_model()

    # train the model
    batch_size = 100
    epochs = 10
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = torch.nn.CrossEntropyLoss()
    for epoch in range(epochs):
        for i in range(0, X_labeled.shape[0], batch_size):
            x = X_labeled[i:i + batch_size]
            y = y_labeled[i:i + batch_size]
            optimizer.zero_grad()
            y_pred = model(x)
            loss = criterion(y_pred, y)
            loss.backward()
            optimizer.step()

    return model


# get the latent representation of the data using the trained model

def get_latent_representation(model, X):
    """
    A function that computes the latent representation of the data using the trained model.

    Parameters
    ----------
    model : torch.nn.Sequential
        The trained CNN model.

    X : torch.Tensor
        The data.

    Returns
    -------
    latent_representation : numpy.ndarray
        The latent representation of the data.
    """
    # get the latent representation of the data using the trained model
    intermediate_layer_model = torch.nn.Sequential(*list(model.children())[:-1])
    latent_representation = intermediate_layer_model.predict(X)

    return latent_representation

In [5]:
# get x_labeled and y_labeled from the previous step and train the model

X_train, y_train, X_test, y_test = load_mnist()
X_labeled, y_labeled = choose_initial_samples(X_train, y_train)
model = train_model(X_labeled, y_labeled)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz




Epoch 1/10
5/5 - 11s - loss: 2.1448 - accuracy: 0.2740 - 11s/epoch - 2s/step
Epoch 2/10
5/5 - 0s - loss: 1.4613 - accuracy: 0.5720 - 44ms/epoch - 9ms/step
Epoch 3/10
5/5 - 0s - loss: 0.9827 - accuracy: 0.6980 - 43ms/epoch - 9ms/step
Epoch 4/10
5/5 - 0s - loss: 0.7355 - accuracy: 0.7580 - 43ms/epoch - 9ms/step
Epoch 5/10
5/5 - 0s - loss: 0.5339 - accuracy: 0.8500 - 46ms/epoch - 9ms/step
Epoch 6/10
5/5 - 0s - loss: 0.4962 - accuracy: 0.8440 - 42ms/epoch - 8ms/step
Epoch 7/10
5/5 - 0s - loss: 0.3872 - accuracy: 0.8760 - 38ms/epoch - 8ms/step
Epoch 8/10
5/5 - 0s - loss: 0.3678 - accuracy: 0.8760 - 37ms/epoch - 7ms/step
Epoch 9/10
5/5 - 0s - loss: 0.2959 - accuracy: 0.9200 - 37ms/epoch - 7ms/step
Epoch 10/10
5/5 - 0s - loss: 0.2117 - accuracy: 0.9380 - 35ms/epoch - 7ms/step


In [6]:
# compute H-Divergence between X_labeled and X_train

# get the latent representation of the labeled data and the training data

latent_representation_labeled = get_latent_representation(model, X_labeled)
latent_representation_train = get_latent_representation(model, X_train)


# define a discriminator model with input shape the size of the latent representation
# and train it on the latent representation of the labeled data and the training data

discriminator = train_discriminative_model(latent_representation_labeled, 
                                           latent_representation_train, latent_representation_labeled.shape[1])


# compute the H-Divergence between the latent representation of the labeled data and the training data

H_divergence = compute_H_divergence(latent_representation_labeled, latent_representation_train, discriminator)

print(H_divergence)





Epoch 1/10
605/605 - 3s - loss: 0.0846 - accuracy: 0.9917 - 3s/epoch - 5ms/step
Epoch 2/10
605/605 - 1s - loss: 0.0483 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 3/10
605/605 - 2s - loss: 0.0481 - accuracy: 0.9917 - 2s/epoch - 3ms/step
Epoch 4/10
605/605 - 1s - loss: 0.0481 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 5/10
605/605 - 1s - loss: 0.0481 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 6/10
605/605 - 1s - loss: 0.0479 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 7/10
605/605 - 1s - loss: 0.0480 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 8/10
605/605 - 1s - loss: 0.0481 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 9/10
605/605 - 1s - loss: 0.0480 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 10/10
605/605 - 2s - loss: 0.0480 - accuracy: 0.9917 - 2s/epoch - 3ms/step
0.0005562470753987629


**Interpretation of Results:** The H-Divergence in this case is essentially zero, which is what we expected as the samples were chosen randomly.