Import `pyswip` and consult the Prolog background knowledge base.

In [15]:
from pyswip import Prolog
# Create the pyswip Prolog interface object used for every query in this notebook.
prolog = Prolog()
# Load the background knowledge base; abduce/2, queried below, is defined in this file.
prolog.consult('mnist_sum.pl')

Test if the `CLP(FD)`-based abduction in the background knowledge base works correctly.

In [16]:
# Enumerate every digit pair (X, Y) that the knowledge base abduces for this sum.
target = 9
query = "abduce([X,Y], {})".format(target)
for soln in prolog.query(query):
    print(soln["X"], "adds", soln["Y"], "equals {}.".format(target))

0 adds 9 equals 9.
1 adds 8 equals 9.
2 adds 7 equals 9.
3 adds 6 equals 9.
4 adds 5 equals 9.
5 adds 4 equals 9.
6 adds 3 equals 9.
7 adds 2 equals 9.
8 adds 1 equals 9.
9 adds 0 equals 9.


# Abductive Learning

Now let's try to implement the MNIST sum learning algorithm using the Abductive Learning framework.

### Dataset Generation

The code below is copied directly from the `data_generator.ipynb` notebook file.

In [17]:
import numpy as np
import torch
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
import matplotlib.pyplot as plt

# Convert images to tensors, then normalise them with the mean/std constants below.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])
dataset1 = datasets.MNIST('data', train=True, download=True, transform=transform)
dataset2 = datasets.MNIST('data', train=False, transform=transform)

device = torch.device("cpu")

# For each digit 0-9, collect the dataset indices whose label is that digit.
digit_groups_train = {digit: [] for digit in range(10)}
digit_groups_test = {digit: [] for digit in range(10)}

for groups, mnist in ((digit_groups_train, dataset1), (digit_groups_test, dataset2)):
    for idx in range(len(mnist)):
        groups[int(mnist.targets[idx])].append(idx)

In [18]:
class MNIST_Sum:
    """Randomly generated "sum of two digits" examples built on top of MNIST.

    Each example consists of two digits sampled uniformly from 0-9, one image
    index per digit drawn from ``digit_groups``, and the sum of the two digits.

    Args:
        num: number of examples to generate.
        digit_groups: mapping from a digit (0-9) to the list of image indices
            showing that digit.
        rng: optional random source exposing ``choice`` (for example
            ``np.random.RandomState(seed)`` or ``np.random.default_rng(seed)``).
            Passing a seeded source makes the generated dataset reproducible.
            When omitted, the global ``np.random`` state is used, as before.

    Attributes:
        targets: the sum of the two sampled digits, one entry per example.
        img_indices: for each example, the two sampled image indices.
        ground_truth: for each example, the two sampled digits.
        length: the number of examples (``num``).
    """

    def __init__(self, num, digit_groups, rng=None):
        sampler = np.random if rng is None else rng

        self.targets = []
        self.img_indices = []
        self.ground_truth = []
        self.length = num
        for _ in range(num):
            # sample two digits from 0 to 9 (with replacement)
            sampled_digits = sampler.choice(10, 2)
            self.ground_truth.append(list(sampled_digits))

            # the target is the sum of the two sampled digits
            self.targets.append(sum(sampled_digits))

            # pick one random image index for each sampled digit
            self.img_indices.append(
                [sampler.choice(digit_groups[digit]) for digit in sampled_digits])

# Build the training and test sets for the MNIST Sum task; both have the same size.
NUM_SUM_EXAMPLES = 3000
mnist_sum_data_train = MNIST_Sum(NUM_SUM_EXAMPLES, digit_groups_train)
mnist_sum_data_test = MNIST_Sum(NUM_SUM_EXAMPLES, digit_groups_test)

### The Machine Learning Part

Neural networks for image classification.

In [19]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

# Device the model and batches are moved to; this notebook runs on the CPU.
device = torch.device("cpu")

class Flatten(nn.Module):
    """Collapse every dimension after the first (batch) dimension into one."""

    def forward(self, input):
        batch_size = input.size(0)
        flat = input.view(batch_size, -1)
        return flat


def conv_net(outdim, *args, **kwargs):
    """Build the convolutional classifier used for the digit images.

    The network is two 3x3 convolutions with ReLU, a 2x2 max-pool, dropout,
    a 128-unit fully connected layer and a final ``outdim``-way linear layer
    followed by log-softmax. The 9216 input features of the first linear
    layer equal 64 channels x 12 x 12, which is what a 1x28x28 input yields.

    Extra positional and keyword arguments are accepted but ignored.
    """
    layers = [
        nn.Conv2d(1, 32, 3, 1),
        nn.ReLU(),
        nn.Conv2d(32, 64, 3, 1),
        nn.ReLU(),
        nn.MaxPool2d(2),
        nn.Dropout(0.25),
        Flatten(),
        nn.Linear(9216, 128),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(128, outdim),
        nn.LogSoftmax(dim=1),
    ]
    return nn.Sequential(*layers)


def auto_enc(outdim, *args, **kwargs):
    """Build a two-layer MLP mapping an ``outdim``-sized vector to 784 values.

    Extra positional and keyword arguments are accepted but ignored.
    """
    hidden = nn.Linear(outdim, 128)
    activation = nn.ReLU()
    output = nn.Linear(128, 784)
    return nn.Sequential(hidden, activation, output)


def mlp(indim, outdim, *args, **kwargs):
    """Build a one-hidden-layer (32 units) classifier that outputs log-probabilities.

    Extra positional and keyword arguments are accepted but ignored.
    """
    layers = [
        nn.Linear(indim, 32),
        nn.ReLU(),
        nn.Linear(32, outdim),
        nn.LogSoftmax(dim=1),
    ]
    return nn.Sequential(*layers)

class LSTM(nn.Module):
    """A (Bi)LSTM followed by a linear layer and a sigmoid.

    Attributes:
        num_layers: number of stacked LSTM layers.
        in_dim: size of each input step.
        hidden_dim: size of the LSTM hidden state.
        out_dim: size of the output vector.
        bidirectional: whether the LSTM runs in both directions.
        dropout: dropout rate passed to ``nn.LSTM``.
        lstm: the underlying ``nn.LSTM`` (created with ``batch_first=True``).
        fc: linear layer applied to the LSTM output of the last time step.
    """

    def __init__(self, num_layers, in_dim, hidden_dim, out_dim,
                 bidirectional=False, dropout=0):
        super().__init__()
        self.num_layers = num_layers
        self.in_dim = in_dim
        self.hidden_dim = hidden_dim
        self.out_dim = out_dim
        self.bidirectional = bidirectional
        self.dropout = dropout

        self.lstm = nn.LSTM(
            self.in_dim,
            self.hidden_dim,
            num_layers=self.num_layers,
            bidirectional=self.bidirectional,
            dropout=self.dropout,
            batch_first=True,
        )
        # A bidirectional LSTM concatenates both directions, doubling the feature size.
        directions = 2 if self.bidirectional else 1
        self.fc = nn.Linear(self.hidden_dim * directions, self.out_dim)

    def forward(self, inputs):
        """Return the sigmoid of the linear projection of the last time step."""
        sequence_out, _ = self.lstm(inputs)
        last_step = sequence_out[:, -1, :]
        return torch.sigmoid(self.fc(last_step))

    def loss_function(self, pred, y):
        """Binary cross-entropy between ``pred`` and ``y`` flattened per sample."""
        flat_target = y.view(y.shape[0], -1)
        return F.binary_cross_entropy(pred, flat_target)


class Net(nn.Module):
    """Image classifier wrapping ``conv_net`` with a negative log-likelihood loss.

    Attributes:
        outdim: number of output classes (class-level default 10, overwritten
            by the value passed to the constructor).
        enc: the convolutional network returned by ``conv_net(outdim)``.
    """
    outdim = 10

    def __init__(self, outdim):
        super().__init__()
        self.outdim = outdim
        self.enc = conv_net(outdim)

    def forward(self, x):
        """Return the log-probabilities produced by the encoder for ``x``."""
        return self.enc(x)

    def loss_function(self, pred, y):
        """Negative log-likelihood of the labels ``y`` under log-probabilities ``pred``."""
        return F.nll_loss(pred, y)

def train(model, device, train_loader, optimizer, epoch,
          log_interval=1000, dry_run=False):
    """Run one training pass of ``model`` over ``train_loader``.

    Args:
        model: module exposing ``loss_function(output, target)``.
        device: device the batches are moved to.
        train_loader: iterable of ``(data, target)`` batches.
        optimizer: optimizer that updates the parameters of ``model``.
        epoch: epoch number, used only in the progress message.
        log_interval: a progress line is printed every ``log_interval`` batches.
        dry_run: when true, stop after the first logged batch.
    """
    model.train()
    num_batches = len(train_loader)
    for step, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        batch_loss = model.loss_function(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

        if step % log_interval != 0:
            continue
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, step * len(inputs), len(train_loader.dataset),
            100. * step / num_batches, batch_loss.item()))
        if dry_run:
            break

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # sum up batch loss
            test_loss += model.loss_function(output, target).item()
            # get the index of the max log-probability
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('-- Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


### The Logic Abduction Part

It involves the following steps:
1. Given the target (the label, i.e., the sum of the digits in the two images), use `pyswip` to abduce the possible pseudo-labels for the two images.
2. Calculate the probability of each pair of pseudo-labels.
3. Return the most probable pseudo-labels to retrain the neural network.

_Remark_: For more complicated problems, a better way of searching for the best pseudo-labels is required.

##### Abducing possible pseudo-labels given the sum

- `pl` is the Prolog instance that consulted `mnist_sum.pl`;
- `target` is the sum of the digits shown in the two images.

In [20]:
def abduce(pl, target):
    """Return every digit pair ``[X, Y]`` that Prolog abduces for ``target``.

    The query relies on the abduce/2 predicate defined in "mnist_sum.pl".

    Args:
        pl: Prolog instance exposing ``query``.
        target: the sum to explain.

    Returns:
        A list of ``[X, Y]`` pairs, or ``None`` when the query has no solution.
    """
    query = "abduce([X,Y], {})".format(target)
    pairs = [[soln["X"], soln["Y"]] for soln in pl.query(query)]
    return pairs if pairs else None

Test the `abduce` function.

In [21]:
# Sanity check: list every digit pair that abduce returns for the sum 15.
print(abduce(prolog, 15))

[[6, 9], [7, 8], [8, 7], [9, 6]]


##### The Abductive Learning Procedure

Import the required libraries and set the default neural-network training parameters used within the Abductive Learning process.

In [22]:
from tqdm import tqdm

# Keyword arguments forwarded to the DataLoader constructors that unpack them.
nn_train_kwargs = {'batch_size': 64, 'shuffle': True}
# Number of training passes over the abduced data inside each ABL_main call.
nn_epoch = 2

# NOTE(review): the test loader reuses the training kwargs, so it shuffles too;
# evaluation does not need shuffling — consider dedicated test kwargs.
nn_test_loader = torch.utils.data.DataLoader(dataset2, **nn_train_kwargs)

Useful functions for abduction:
1. `get_mnist_imgs`: given `indices`, fetch the corresponding images (and their labels) from `dataset` (such as the `MNIST` dataset).
2. `best_pseudo_label`: given a set of abduced candidate pseudo-label combinations and the predicted pseudo-label distribution, return the most probable combination for the images of one example.

In [23]:
def get_mnist_imgs(dataset, indices, use_cuda=False):
    """
    Fetch the images and labels stored at ``indices`` in ``dataset``.

    Args:
        dataset: indexable collection whose items are ``(image, label)`` pairs;
            every image must be reshapeable to ``(1, 1, 28, 28)``.
        indices: the positions to fetch, in order.
        use_cuda: when true, move the resulting image tensor to the CUDA device.

    Returns:
        A pair ``(img_tensor, targets)``: ``img_tensor`` has shape
        ``(len(indices), 1, 28, 28)`` and ``targets`` lists the matching labels.
        For empty ``indices`` an empty ``(0, 1, 28, 28)`` tensor and an empty
        list are returned.
    """
    images = []
    targets = []
    for idx in indices:
        img, tgt = dataset[idx]
        images.append(torch.reshape(img, (1, 1, 28, 28)))
        targets.append(tgt)

    # Concatenate once at the end instead of re-copying a growing tensor for
    # every image; torch.cat rejects an empty list, hence the explicit case.
    if images:
        img_tensor = torch.cat(images, 0)
    else:
        img_tensor = torch.empty((0, 1, 28, 28))

    if use_cuda:
        img_tensor = img_tensor.to(torch.device("cuda"))
    return img_tensor, targets

def best_pseudo_label(pseudo_label_lists, pseudo_label_dist):
    """Pick the candidate label combination with the highest joint probability.

    Args:
        pseudo_label_lists: iterable of candidate combinations; each candidate
            holds one label per row of ``pseudo_label_dist``.
        pseudo_label_dist: array of log-probabilities indexed as
            ``[position, label]``.

    Returns:
        A pair ``(best_combi, best_score)`` where ``best_score`` is the product
        of the probabilities of the chosen labels. On ties the later candidate
        wins. When there are no candidates, an all-zero combination and the
        sentinel score ``-100000.0`` are returned.
    """
    best_score = -100000.0
    best_combi = np.zeros(pseudo_label_dist.shape[0])
    # the model outputs log-probabilities; convert them back to probabilities
    probabilities = np.exp(pseudo_label_dist)
    for label_combi in pseudo_label_lists:
        # joint probability of the combination = product of per-position probabilities
        score = 1.0
        for position, label in enumerate(label_combi):
            score = score * probabilities[position, label]
        if score >= best_score:
            best_score = score
            best_combi = label_combi
    # return the score of the winning combination, not of the last one examined
    return best_combi, best_score


Main procedure for Abductive Learning. Given a machine learning `model`, a Prolog instance `pl` and a `dataset`, it performs the following steps:
1. Use `model` to predict the pseudo-label probability distribution for the images in `dataset`;
2. Find the best pseudo-label combination, taking into account both the abduction result from `pl` and the predicted pseudo-label distribution;
3. Retrain the neural network with the abduced pseudo-labels.

In [24]:
def ABL_main(model, pl, dataset, optimizer=None, scheduler=None):
    """Run one round of Abductive Learning: abduce pseudo-labels, then retrain.

    For every example in ``dataset`` the candidate label pairs consistent with
    its target are abduced through ``pl``; the pair that is most probable under
    the current ``model`` is written into the labels of the global ``dataset1``.
    The model is then trained for ``nn_epoch`` epochs on the relabelled images
    and evaluated on ``nn_test_loader``.

    Note: this function overwrites entries of ``dataset1.targets`` in place.

    Args:
        model: the classifier; must expose ``loss_function``.
        pl: Prolog instance passed to ``abduce``.
        dataset: object with ``length``, ``targets``, ``img_indices`` and
            ``ground_truth`` (see ``MNIST_Sum``).
        optimizer: optimizer for ``model``. Required despite the ``None`` default.
        scheduler: optional learning-rate scheduler, stepped once per epoch.

    Raises:
        ValueError: if ``optimizer`` is ``None``.
    """
    # Fail before the (slow) abduction loop rather than inside train().
    if optimizer is None:
        raise ValueError("ABL_main requires an optimizer to retrain the model.")

    # number of examples
    num_examples = dataset.length
    abduced_data_ids = []
    abduced_labels = []
    ground_truth_labels = []

    # Predict in eval mode (dropout disabled) and without autograd bookkeeping,
    # so that the pseudo-label distribution is deterministic for a given model.
    model.eval()
    with torch.no_grad():
        # start abduction
        for i in tqdm(range(num_examples), desc="Abducing..."):
            target = int(dataset.targets[i])
            possible_pseudo_labels = abduce(pl, target)
            if possible_pseudo_labels is None:
                continue

            # fetch the two MNIST images in the layout expected by the model
            img_indices = dataset.img_indices[i]
            imgs, _ = get_mnist_imgs(dataset1, img_indices, use_cuda=False)
            imgs = imgs.to(device)

            pseudo_label_distribution = model(imgs).detach().cpu().numpy()

            # find the pseudo-labels with the maximum likelihood
            abduced_pseudo_labels, _ = best_pseudo_label(
                possible_pseudo_labels, pseudo_label_distribution)

            # accumulate the abduced dataset (extend avoids re-copying the lists)
            abduced_data_ids.extend(img_indices)
            abduced_labels.extend(abduced_pseudo_labels)
            ground_truth_labels.extend(dataset.ground_truth[i])

    # changing the training data labels to the abduced labels
    for img, label in zip(abduced_data_ids, abduced_labels):
        dataset1.targets[img] = label

    if abduced_labels:
        abduction_accuracy = np.sum(
            np.array(ground_truth_labels) == np.array(abduced_labels)) / len(abduced_labels)
    else:
        # nothing could be abduced; avoid dividing by zero
        abduction_accuracy = float("nan")

    # making new dataset with abduced labels
    abduced_data = torch.utils.data.Subset(dataset1, abduced_data_ids)

    # training the neural network model
    abduced_train_loader = torch.utils.data.DataLoader(abduced_data, batch_size=64)

    for epoch in range(1, nn_epoch + 1):
        train(model, device, abduced_train_loader, optimizer, epoch)
        print("Abduction accuracy: ", abduction_accuracy)
        if scheduler is not None:
            scheduler.step()
    test(model, device, nn_test_loader)

### Running Experiment

Initialise model and optimizer.

In [25]:
# Fresh 10-class digit classifier, placed on the configured device.
model = Net(outdim=10).to(device)

# Adadelta optimizer; StepLR multiplies the learning rate by 0.7 after every scheduler step.
optimizer = optim.Adadelta(model.parameters(), lr=1.0)
scheduler = StepLR(optimizer, step_size=1, gamma=0.7)

##### Run Abductive Learning without any pre-train.

In [26]:
# Number of abduce-then-retrain rounds.
ABL_epochs = 10
for epoch in range(ABL_epochs):
    # Each round re-abduces pseudo-labels with the current model and retrains on them.
    ABL_main(model, prolog, mnist_sum_data_train, optimizer=optimizer, scheduler=scheduler)

Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1012.03it/s]


Abduction accuracy:  0.21533333333333332
Abduction accuracy:  0.21533333333333332
-- Test set: Average loss: 0.0308, Accuracy: 3038/10000 (30%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1075.86it/s]


Abduction accuracy:  0.47733333333333333
Abduction accuracy:  0.47733333333333333
-- Test set: Average loss: 0.0187, Accuracy: 5476/10000 (55%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1026.53it/s]


Abduction accuracy:  0.7113333333333334
Abduction accuracy:  0.7113333333333334
-- Test set: Average loss: 0.0099, Accuracy: 8140/10000 (81%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1039.02it/s]


Abduction accuracy:  0.913
Abduction accuracy:  0.913
-- Test set: Average loss: 0.0041, Accuracy: 9380/10000 (94%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1072.33it/s]


Abduction accuracy:  0.986
Abduction accuracy:  0.986
-- Test set: Average loss: 0.0027, Accuracy: 9556/10000 (96%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1068.84it/s]


Abduction accuracy:  0.993
Abduction accuracy:  0.993
-- Test set: Average loss: 0.0025, Accuracy: 9581/10000 (96%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1023.87it/s]


Abduction accuracy:  0.9946666666666667
Abduction accuracy:  0.9946666666666667
-- Test set: Average loss: 0.0024, Accuracy: 9593/10000 (96%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1062.37it/s]


Abduction accuracy:  0.9953333333333333
Abduction accuracy:  0.9953333333333333
-- Test set: Average loss: 0.0024, Accuracy: 9596/10000 (96%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1051.11it/s]


Abduction accuracy:  0.9956666666666667
Abduction accuracy:  0.9956666666666667
-- Test set: Average loss: 0.0023, Accuracy: 9598/10000 (96%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1061.71it/s]


Abduction accuracy:  0.9956666666666667
Abduction accuracy:  0.9956666666666667
-- Test set: Average loss: 0.0023, Accuracy: 9594/10000 (96%)


##### Run Abductive Learning with one-shot pre-train

Sample a one-shot training dataset (one labelled image per digit).

In [27]:
import random

# Start again from an untrained network.
model = Net(outdim=10).to(device)

# Reload dataset1: the previous abductive learning run overwrote its
# ground-truth labels with abduced pseudo-labels, and the one-shot
# pre-training below needs the original labels.
dataset1 = datasets.MNIST('data', train=True, download=True, transform=transform)

# Draw n_samples labelled image indices for each of the ten digits.
n_samples = 1
few_shot_indices = [
    idx
    for digit in range(10)
    for idx in random.sample(digit_groups_train[digit], n_samples)
]

sup_imgs_train = torch.utils.data.Subset(dataset1, few_shot_indices)
sup_train_loader = torch.utils.data.DataLoader(sup_imgs_train, **nn_train_kwargs)

optimizer = optim.Adadelta(model.parameters(), lr=1.0)
scheduler = StepLR(optimizer, step_size=1, gamma=0.7)

# Supervised pre-training on the few labelled images, then a single evaluation.
for epoch in range(1, 5):
    train(model, device, sup_train_loader, optimizer, epoch)
    scheduler.step()
test(model, device, nn_test_loader)

-- Test set: Average loss: 0.0320, Accuracy: 3611/10000 (36%)


In [28]:
# Number of abduce-then-retrain rounds, starting from the pre-trained model.
ABL_epochs = 5
for epoch in range(ABL_epochs):
    # Reuses the optimizer and scheduler created in the pre-training cell.
    ABL_main(model, prolog, mnist_sum_data_train, optimizer=optimizer, scheduler=scheduler)

Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1068.00it/s]


Abduction accuracy:  0.623
Abduction accuracy:  0.623
-- Test set: Average loss: 0.0141, Accuracy: 7271/10000 (73%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1070.81it/s]


Abduction accuracy:  0.8946666666666667
Abduction accuracy:  0.8946666666666667
-- Test set: Average loss: 0.0053, Accuracy: 9084/10000 (91%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1049.79it/s]


Abduction accuracy:  0.9836666666666667
Abduction accuracy:  0.9836666666666667
-- Test set: Average loss: 0.0027, Accuracy: 9499/10000 (95%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1028.30it/s]


Abduction accuracy:  0.995
Abduction accuracy:  0.995
-- Test set: Average loss: 0.0025, Accuracy: 9545/10000 (95%)


Abducing...: 100%|██████████| 3000/3000 [00:02<00:00, 1002.92it/s]


Abduction accuracy:  0.9966666666666667
Abduction accuracy:  0.9966666666666667
-- Test set: Average loss: 0.0024, Accuracy: 9576/10000 (96%)
