# Imports

In [None]:
import os
import numpy as np 
import tqdm # version 4.40.0

import torch
from torchvision import datasets, transforms

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, Subset
import torch.nn.functional as F

import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid

from cpmpy import * 

# Make/load the sudoku boards

In [None]:
# ----- Make fixed sudoku boards for training and testing -----
# train sudoku boards consist of train MNIST images, test sudoku boards of test MNIST images

# Sudoku boards
# from https://github.com/locuslab/SATNet/blob/master/exps/sudoku.py
datadir = "sudoku"
with open(os.path.join(datadir,'features.pt'), 'rb') as f:
    X_in = torch.load(f)
with open(os.path.join(datadir,'labels.pt'), 'rb') as f:
    Y_in = torch.load(f)

# Divide the boards (2000 test, 8000 train)
train_ids = [i for i in range(0, 7000)]
validation_ids = [i for i in range(7000, 8000)]
test_ids = [i for i in range(8000, 10000)]

# Normalize MNIST data
transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize((0.5,), (0.5,))])

# Download and load the MNIST data
testset = datasets.MNIST('.', download=True, train=False, transform=transform)
digit_indices_test = {k:torch.LongTensor(*np.where(testset.targets == k)) for k in range(0,10)}
trainset = datasets.MNIST('.', download=True, train=True, transform=transform)

# Make validation set
# Initialize counters for each digit class in the validation set
count_per_class = [0] * 10
# Initialize arrays for the validation set
validation_set = []
train_set = []
# Iterate through the training set and move samples to the validation set
for img, label in trainset:
    if count_per_class[label] < 1000:
        validation_set.append([img, label])
        count_per_class[label] += 1
    else:
        train_set.append([img, label])
validation_labels = np.array([x[1] for x in validation_set])
digit_indices_validation = {k:torch.LongTensor(*np.where(validation_labels == k)) for k in range(0,10)}
train_labels = np.array([x[1] for x in train_set])
digit_indices_train = {k:torch.LongTensor(*np.where(train_labels == k)) for k in range(0,10)}


# Define constants
N_CHANNEL = 1 # MNIST images are in grey scale 
IMAGE_WIDTH = 28 # MNIST image width
IMAGE_HEIGHT = 28 # MNIST image height


# fix the seed for board generation
torch.manual_seed(9168)
np.random.seed(9168)
rng = np.random.RandomState(243)


def sample_visual_sudoku_test(sudoku_puzzle):
    """
        Turn the given `sudoku_puzzle` into a visual puzzle by replace numeric values
        by images from MNIST. 
    """
    # Visual Sudoku Tensor of shape puzzle_width x puzzle_height x n_channel x image_width x image_height
    sudoku_torch_dimension = sudoku_puzzle.shape + (N_CHANNEL, IMAGE_WIDTH, IMAGE_HEIGHT,)
    vizsudoku = torch.zeros(sudoku_torch_dimension, dtype=torch.float32)

    # sample dataset indices for each non-zero digit
    #for val in np.unique(sudoku_puzzle[sudoku_puzzle > 0]):
    for val in np.unique(sudoku_puzzle):
        val_idx = np.where(sudoku_puzzle == val)
        # randomly sample different MNIST images for a given digit all at once
        idx = torch.LongTensor(rng.choice(digit_indices_test[val], len(sudoku_puzzle[val_idx])))
        vizsudoku[val_idx] = torch.stack([testset[i][0] for i in idx])
        #print(val, idx)
    return vizsudoku


def sample_visual_sudoku_test_correlated(sudoku_puzzle):
    """
        Turn the given `sudoku_puzzle` into a visual puzzle by replace numeric values
        by images from MNIST. 
    """
    # Visual Sudoku Tensor of shape puzzle_width x puzzle_height x n_channel x image_width x image_height
    sudoku_torch_dimension = sudoku_puzzle.shape + (N_CHANNEL, IMAGE_WIDTH, IMAGE_HEIGHT,)
    vizsudoku = torch.zeros(sudoku_torch_dimension, dtype=torch.float32)

    # sample dataset indices for each non-zero digit
    #for val in np.unique(sudoku_puzzle[sudoku_puzzle > 0]):
    for val in np.unique(sudoku_puzzle):
        val_idx = np.where(sudoku_puzzle == val)
        # randomly sample different MNIST images for a given digit all at once
        idx = torch.LongTensor(rng.choice(digit_indices_test[val], len(sudoku_puzzle[val_idx])))
        # Add noise
        images_with_noise = [add_noise(testset[idx[0]][0], noise_level=0.75) for i in idx]
        vizsudoku[val_idx] = torch.stack([torch.tensor(img) for img in images_with_noise])
    return vizsudoku


def sample_visual_sudoku_train(sudoku_puzzle):
    """
        Turn the given `sudoku_puzzle` into a visual puzzle by replace numeric values
        by images from MNIST. 
    """
    # Visual Sudoku Tensor of shape puzzle_width x puzzle_height x n_channel x image_width x image_height
    sudoku_torch_dimension = sudoku_puzzle.shape + (N_CHANNEL, IMAGE_WIDTH, IMAGE_HEIGHT,)
    vizsudoku = torch.zeros(sudoku_torch_dimension, dtype=torch.float32)

    # sample dataset indices for each non-zero digit
    #for val in np.unique(sudoku_puzzle[sudoku_puzzle > 0]):
    for val in np.unique(sudoku_puzzle):
        val_idx = np.where(sudoku_puzzle == val)
        # randomly sample different MNIST images for a given digit all at once
        idx = torch.LongTensor(rng.choice(digit_indices_train[val], len(sudoku_puzzle[val_idx])))
        vizsudoku[val_idx] = torch.stack([train_set[i][0] for i in idx])
        #print(val, idx)
    return vizsudoku


def sample_visual_sudoku_validation(sudoku_puzzle):
    """
        Turn the given `sudoku_puzzle` into a visual puzzle by replace numeric values
        by images from MNIST. 
    """
    # Visual Sudoku Tensor of shape puzzle_width x puzzle_height x n_channel x image_width x image_height
    sudoku_torch_dimension = sudoku_puzzle.shape + (N_CHANNEL, IMAGE_WIDTH, IMAGE_HEIGHT,)
    vizsudoku = torch.zeros(sudoku_torch_dimension, dtype=torch.float32)

    # sample dataset indices for each non-zero digit
    #for val in np.unique(sudoku_puzzle[sudoku_puzzle > 0]):
    for val in np.unique(sudoku_puzzle):
        val_idx = np.where(sudoku_puzzle == val)
        # randomly sample different MNIST images for a given digit all at once
        idx = torch.LongTensor(rng.choice(digit_indices_validation[val], len(sudoku_puzzle[val_idx])))
        vizsudoku[val_idx] = torch.stack([validation_set[i][0] for i in idx])
        #print(val, idx)
    return vizsudoku


def sample_visual_sudoku_validation_correlated(sudoku_puzzle):
    """
        Turn the given `sudoku_puzzle` into a visual puzzle by replace numeric values
        by images from MNIST. 
    """
    # Visual Sudoku Tensor of shape puzzle_width x puzzle_height x n_channel x image_width x image_height
    sudoku_torch_dimension = sudoku_puzzle.shape + (N_CHANNEL, IMAGE_WIDTH, IMAGE_HEIGHT,)
    vizsudoku = torch.zeros(sudoku_torch_dimension, dtype=torch.float32)

    # sample dataset indices for each non-zero digit
    #for val in np.unique(sudoku_puzzle[sudoku_puzzle > 0]):
    for val in np.unique(sudoku_puzzle):
        val_idx = np.where(sudoku_puzzle == val)
        # randomly sample different MNIST images for a given digit all at once
        idx = torch.LongTensor(rng.choice(digit_indices_validation[val], len(sudoku_puzzle[val_idx])))
        # Add noise
        images_with_noise = [add_noise(validation_set[idx[0]][0], noise_level=0.75) for i in idx]
        vizsudoku[val_idx] = torch.stack([torch.tensor(img) for img in images_with_noise])
    return vizsudoku


# Define a function to add random noise to an image
def add_noise(image, noise_level=0.1):
    noise = np.random.normal(scale=noise_level, size=image.shape)
    noisy_image = image + noise
    return torch.tensor(noisy_image.clip(-1, 1), dtype=torch.float32).clone().detach()  # Ensure values are within [-1, 1] range


def transform_labels(sudoku):
    new_labels = torch.zeros(9, 9)
    tensor_digits = torch.arange(1, 10)
    for i in range(0, 9):
        for j in range(0, 9):
            digit_vector = sudoku[i, j]
            new_labels[i, j] = torch.sum(tensor_digits * digit_vector)
    return new_labels.numpy().astype(int)


# ----- Do the test sudokus -----
visual_test_sudokus = []
numerical_test_sudokus = []
completed_numerical_test_sudokus = []

for i in test_ids:
    completed_numerical_sudoku = torch.from_numpy(transform_labels(Y_in[i]))
    numerical_sudoku = torch.from_numpy(transform_labels(X_in[i]))
    visual_sudoku = sample_visual_sudoku_test(transform_labels(X_in[i]))
    visual_test_sudokus.append(visual_sudoku)
    numerical_test_sudokus.append(numerical_sudoku)
    completed_numerical_test_sudokus.append(completed_numerical_sudoku)
 
# torch.save(visual_test_sudokus, 'visual_test_sudokus.pth')
# torch.save(numerical_test_sudokus, 'numerical_test_sudokus.pth')
# torch.save(completed_numerical_test_sudokus, 'completed_numerical_test_sudokus.pth')


# ----- Do the correlated test sudokus -----
visual_correlated_test_sudokus = []

for i in test_ids:
    completed_numerical_sudoku = torch.from_numpy(transform_labels(Y_in[i]))
    numerical_sudoku = torch.from_numpy(transform_labels(X_in[i]))
    visual_sudoku = sample_visual_sudoku_test_correlated(transform_labels(X_in[i]))
    visual_correlated_test_sudokus.append(visual_sudoku)
 
 
# ----- Do the train sudokus -----
visual_train_sudokus = []
numerical_train_sudokus = []
completed_numerical_train_sudokus = []

for i in train_ids:
    completed_numerical_sudoku = torch.from_numpy(transform_labels(Y_in[i]))
    numerical_sudoku = torch.from_numpy(transform_labels(X_in[i]))
    visual_sudoku = sample_visual_sudoku_train(transform_labels(X_in[i]))
    visual_train_sudokus.append(visual_sudoku)
    numerical_train_sudokus.append(numerical_sudoku)
    completed_numerical_train_sudokus.append(completed_numerical_sudoku)

# torch.save(visual_train_sudokus, 'visual_train_sudokus.pth')
# torch.save(numerical_train_sudokus, 'numerical_train_sudokus.pth')
# torch.save(completed_numerical_train_sudokus, 'completed_numerical_train_sudokus.pth')


# ----- Do the validation sudokus -----
visual_validation_sudokus = []
numerical_validation_sudokus = []
completed_numerical_validation_sudokus = []

for i in validation_ids:
    completed_numerical_sudoku = torch.from_numpy(transform_labels(Y_in[i]))
    numerical_sudoku = torch.from_numpy(transform_labels(X_in[i]))
    visual_sudoku = sample_visual_sudoku_validation(transform_labels(X_in[i]))
    visual_validation_sudokus.append(visual_sudoku)
    numerical_validation_sudokus.append(numerical_sudoku)
    completed_numerical_validation_sudokus.append(completed_numerical_sudoku)

# torch.save(visual_validation_sudokus, 'visual_validation_sudokus.pth')
# torch.save(numerical_validation_sudokus, 'numerical_validation_sudokus.pth')
# torch.save(completed_numerical_validation_sudokus, 'completed_numerical_validation_sudokus.pth')


# ----- Do the correlated validation sudokus -----
visual_correlated_validation_sudokus = []

for i in validation_ids:
    completed_numerical_sudoku = torch.from_numpy(transform_labels(Y_in[i]))
    numerical_sudoku = torch.from_numpy(transform_labels(X_in[i]))
    visual_sudoku = sample_visual_sudoku_validation_correlated(transform_labels(X_in[i]))
    visual_correlated_validation_sudokus.append(visual_sudoku)
    

In [None]:
# Let's also define a helper function to visualize the visual puzzle

N_CHANNEL = 1 # MNIST images are in grey scale 
IMAGE_WIDTH = 28 # MNIST image width
IMAGE_HEIGHT = 28 # MNIST image height

def show_grid_img(visual_sudoku, in_green=None, in_red=None, title=None):
    images =visual_sudoku.reshape(-1, IMAGE_HEIGHT, IMAGE_WIDTH)
    images = (255 * images).int()
    dim = visual_sudoku.shape[0]
    fig = plt.figure(figsize=(8,8))
    grid = ImageGrid(fig, 111, nrows_ncols=(dim,dim), axes_pad=0.03)
    
    if title:
        fig.suptitle(title, fontsize=16)

    # cells to plot in green | red
    N = len(images)
    if in_green is None:
        in_green = np.zeros(N, dtype=bool)
    if in_red is None:
        in_red = np.zeros(N, dtype=bool)
    in_green = in_green.flatten()
    in_red = in_red.flatten()
    
    for ax, index in zip(grid, range(len(images))):
        # dont display ticks
        for axis in ax.axis.values():
            axis.toggle(ticks=False, ticklabels=False)
        # color appropriately
        color = 'gray_r'
        if in_red[index]:
            color = 'autumn'
        if in_green[index]:
            color = 'summer'
        # and show
        ax.imshow(images[index].numpy().squeeze(), cmap=color)

In [None]:
# Show some puzzles
show_grid_img(visual_validation_sudokus[0])

# Train CNN & SLN

In [None]:
## Convolutional Neural Network for digit classification

from torch import nn 
import torch.nn.functional as F
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5, 1, padding=2)
        self.conv2 = nn.Conv2d(6, 16, 5, 1)
        self.fc1 = nn.Linear(5*5*16, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84,10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 5*5*16) 
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = self.fc3(x)
        # return F.log_softmax(x, dim=1)
        #return F.softmax(x, dim=1)
        return x

    def forward2(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 5*5*16) 
        x = F.relu(self.fc1(x))
        #x = self.fc2(x)
        #x = self.fc3(x)
        # return F.log_softmax(x, dim=1)
        #return F.softmax(x, dim=1)
        return x


class TinyLeNet(nn.Module):
    def __init__(self):
        super(TinyLeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 1, 5, 1, padding=2)
        self.conv2 = nn.Conv2d(1, 2, 5, 1)
        self.fc1 = nn.Linear(5*5*2, 20)
        self.fc2 = nn.Linear(20, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 5*5*2) 
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

class SmallLeNet(nn.Module):
    def __init__(self):
        super(SmallLeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 2, 5, 1, padding=2)
        self.conv2 = nn.Conv2d(2, 3, 5, 1)
        self.fc1 = nn.Linear(5*5*3, 30)
        self.fc2 = nn.Linear(30, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 5*5*3) 
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

def load_classifier(clf_classname, path):
    """
        Initialize a new CNN classifier by
        loading pre-trained weights stored in `path` file
    """
    net = clf_classname()
    state_dict = torch.load(path, map_location=lambda storage, loc: storage)
    net.load_state_dict(state_dict)
    return net

def load_classifier2(clf_classname, path):
    """
        Initialize a new CNN classifier by
        loading pre-trained weights stored in `path` file
    """
    net = clf_classname()
    net_state_dict = net.state_dict()
    state_dict = torch.load(path, map_location=lambda storage, loc: storage)
    state_dict = {k: v for k, v in state_dict.items() if k in net_state_dict}
    net_state_dict.update(state_dict)
    net.load_state_dict(net_state_dict)

    # Freeze loaded weights
    for name, param in net.named_parameters():
        if name in state_dict:
            param.requires_grad = False
        else:
            param.requires_grad = True
    return net

@torch.no_grad()
def predict_proba_sudoku_2(neuralnet, vizsudoku):
    """
        Assign a probabilistic vector to each image of the visual puzzle
    """
    grid_shape = vizsudoku.shape[:2]
    # reshape from 9x9x1x28x28 to 81x1x28x28 
    pred = neuralnet(vizsudoku.flatten(0,1))
    partial_evidence_vectors = torch.ones((81, 10))
    complete_inputs = torch.cat((pred, partial_evidence_vectors), dim=1)
    outputs = refinement_net(complete_inputs)
    outputs = F.softmax(outputs, dim=1)
    outputs = outputs.cpu()  # Move the tensor to the CPU
    # our NN return 81 probabilistic vector: an 81x10 matrix
    return outputs.reshape(*grid_shape,10).detach().numpy() # reshape as 9x9x10 tensor for easier visualisation

@torch.no_grad()
def predict_proba_sudoku(neuralnet, vizsudoku):
    """
        Assign a probabilistic vector to each image of the visual puzzle
    """
    grid_shape = vizsudoku.shape[:2]
    # reshape from 9x9x1x28x28 to 81x1x28x28 
    pred = neuralnet(vizsudoku.flatten(0,1))
    pred = F.softmax(pred, dim=1)
    pred = pred.cpu()  # Move the tensor to the CPU
    # our NN return 81 probabilistic vector: an 81x10 matrix
    return pred.reshape(*grid_shape,10).detach().numpy() # reshape as 9x9x10 tensor for easier visualisation

In [None]:
# Define a custom dataset class


# Set the seed for NumPy's random number generator
np.random.seed(1232)


class CustomCNNDataset(Dataset):
    def __init__(self, dataset):
        self.data = dataset

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        label = self.data[idx][1]
        img = self.data[idx][0]
        return img, label

In [None]:
# Train


# Seed CNN
torch.manual_seed(422)  # 4287, 9623, 5896, 42, 596, 6343439
np.random.seed(422)


# Make dataloaders
cnn_trainset = CustomCNNDataset(train_set)
cnn_testset = CustomCNNDataset(validation_set)
cnn_train_loader = DataLoader(cnn_trainset, batch_size=64, shuffle=True)
cnn_test_loader = DataLoader(cnn_testset, batch_size=64, shuffle=False)


# Initialize the neural network and define the loss function and optimizer
cnn = SmallLeNet()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(cnn.parameters(), lr=0.001)
# Training loop
num_epochs = 3


for epoch in range(num_epochs):
    running_loss = 0.0
    cnn.train()
    for i, data in enumerate(cnn_train_loader, 0):
        inputs, labels = data
        optimizer.zero_grad()
        outputs = cnn(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if i % 10 == 9:
            print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{i+1}/{len(cnn_train_loader)}], Loss: {running_loss/100:.4f}')
            running_loss = 0.0

            # Print test loss
            cnn.eval()
            correct = 0
            total = 0
            running_test_loss = 0.0
            batches = 0
            with torch.no_grad():
                for data in cnn_test_loader:
                    inputs, labels = data
                    outputs = cnn(inputs)
                    loss = criterion(outputs, labels)
                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()
                    running_test_loss += loss.item()
                    batches += 1
        
            accuracy = 100 * correct / total
            print(f'Epoch [{epoch+1}/{num_epochs}] - Test Accuracy: {accuracy:.2f}% - Test Loss: {running_test_loss/batches:.4f}')

    
print('Training finished.')


In [None]:
# Print test accuracy

# Manual loading
#cnn = SmallLeNet()
#cnn.load_state_dict(torch.load('SmallLeNet_0.960.pth'))
# cnn = trained_sln

torch.manual_seed(9168)
np.random.seed(9168)

cnn_testset = CustomCNNDataset(testset)
cnn_test_loader = DataLoader(cnn_testset, batch_size=64, shuffle=False)

cnn.eval()
correct = 0
total = 0
running_test_loss = 0.0
batches = 0
with torch.no_grad():
    for data in cnn_test_loader:
        inputs, labels = data
        outputs = cnn(inputs)
        loss = criterion(outputs, labels)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        running_test_loss += loss.item()
        batches += 1

accuracy = 100 * correct / total
print(f'Test Accuracy: {accuracy:.2f}% - Test Loss: {running_test_loss/batches:.4f}')

In [None]:
# Manual saving

torch.save(cnn.state_dict(), 'SmallLeNet_0.9.pth')

In [None]:
# Manual loading

cnn = LeNet()
cnn.load_state_dict(torch.load('LeNet_0.944.pth'))
cnn.eval()

# Count the number of weights
num_weights = sum(p.numel() for p in cnn.parameters() if p.requires_grad)
print("Number of weights in the model:", num_weights)

# Sudoku solving

## The model

In [None]:
# Own model

import cpmpy as cp
from cpmpy.solvers import CPM_ortools


def get_sudoku_model(n=9):
    b = np.sqrt(n).astype(int)
    cells = cp.IntVar(1, n, shape=(n,n))

    # plain sudoku model
    m = cp.Model(
        [cp.alldifferent(row) for row in cells],
        [cp.alldifferent(col) for col in cells.T],
        [cp.alldifferent(cells[i:i + b, j:j + b])
            for i in range(0, n, b) for j in range(0, n, b)],
    )
    return {
        'model':m,
        'variables':cells
    }


def solve_sudoku(model, dvars, instance):
    # use another object for solving
    newmodel = cp.Model(model.constraints) 
    # set given clues
    newmodel += cp.all(instance[instance>0] == dvars[instance>0])
    if newmodel.solve():
        results = {
            'runtime':np.asarray(newmodel.cpm_status.runtime),
            'solution':dvars.value(),
        }
    else:
        results = {
            'solution':np.full_like(dvars.value(), np.nan)
        }
    results['status'] = np.asarray(newmodel.cpm_status.exitstatus.value)
    return results


def get_visual_sudoku_full_image_model(ml_predictions, precision=1e-5):
    # base model and decision variables 
    visual_sudoku_problem = get_sudoku_model()
    model = visual_sudoku_problem['model']
    decision_variables = visual_sudoku_problem['variables']
    # introduce a layer of 'perception' variables, as an interface
    # between the solver and the ml network
    # their domain is [0, ..., 9], with 0 acting as the 'empty' symbol
    perception_variables = cp.intvar(0, 9, shape=decision_variables.shape, name='perception')

    # convert predictions to logspace
    logprobs = np.log(np.maximum( ml_predictions, precision ))
    # cp solver requires integer values
    logprobs = np.array(logprobs / precision).astype(int)
    # switch to cpm_array for more features
    logprobs = cp.cpm_array(logprobs)
    # build the objective function over perception variables 
    objective_function = sum(logprobs[idx][v] for idx,v in np.ndenumerate(perception_variables))
    model.maximize(objective_function)
    
    # channeling constraints to link decision variables to perception variables
    # perception variable is either 'empty' or matches grid symbol
    model+= [(perception_variables != 0).implies(decision_variables == perception_variables)]
    # keep track of perception variables as well 
    visual_sudoku_problem['perception'] = perception_variables
    return visual_sudoku_problem

def solve_visual_sudoku_full_image(visual_sudoku_problem, solver_params=dict()):
    model = visual_sudoku_problem['model']
    dvars, pvars = visual_sudoku_problem['variables'], visual_sudoku_problem['perception']
    s = CPM_ortools(model)
    if s.solve(**solver_params):
        results = {
            'solution':dvars.value(),
            'perception':pvars.value()
        }
    else:
        # in case of infeasibility, nan
        results = {
            'solution':np.full_like(dvars.value(), np.nan),
            'perception':np.full_like(dvars.value(), np.nan)
        }
    results['status'] = np.asarray(s.cpm_status.exitstatus.value)
    results['runtime'] = np.asarray(s.cpm_status.runtime)
    return results

## Maximum Likelihood

In [None]:
def perception_leads_to_unique_solution(perception, solution):
    sudoku_mod = get_sudoku_model(n=9)
    vars = sudoku_mod["variables"]
    model = sudoku_mod["model"]
    model += ~ cp.all((vars == solution).flatten())
    is_unique = not solve_sudoku_again(model, vars, perception)
    return is_unique


def solve_sudoku_again(model, dvars, instance):
    # use another object for solving
    newmodel = cp.Model(model.constraints) 
    # set given clues
    newmodel += cp.all(instance[instance>0] == dvars[instance>0])
    return newmodel.solve()


def solve_visual_sudoku_higher_order(visual_sudoku_problem, solver_params=dict(), max_iter=10):
    #Write a loop repeating following steps:
    # while results['solution'] is not unique or iteration < max_iter:
    #   add nogood to the vizsudoku model
    #   solve again
    #   iteration += 1
    
    model = visual_sudoku_problem['model']
    dvars, pvars = visual_sudoku_problem['variables'], visual_sudoku_problem['perception']
    results = solve_visual_sudoku_full_image(visual_sudoku_problem, solver_params)
    #print("first ", results)
    iteration = 0
    
    while (not perception_leads_to_unique_solution(pvars.value(), dvars.value())) and (iteration < max_iter):
      # add nogood to the vizsudoku model
      #model += [~cp.all((perception_variables ==  perception_variables.value())) ]
      model += ~ cp.all((pvars == results['perception']).flatten())
      results = solve_visual_sudoku_full_image(visual_sudoku_problem, solver_params)
      iteration += 1
    if iteration >= max_iter:
        print("max iter exceeded at solving for unique solution")

    return results

In [None]:
def inference_unique_solution(visual_sudoku, cnn):
    # convolutional neural network predictions 
    ml_predictions = predict_proba_sudoku(cnn, visual_sudoku)
    
    # visual sudoku cp model
    visual_sudoku_problem = get_visual_sudoku_full_image_model(ml_predictions)
    
    # solve 
    #results = solve_visual_sudoku_full_image(visual_sudoku_problem)
    results = solve_visual_sudoku_higher_order(visual_sudoku_problem)
    return results


def top_k_inference_unique_solution(visual_sudoku, cnn, k=1):
    return_list = []
    # convolutional neural network predictions 
    ml_predictions = predict_proba_sudoku(cnn, visual_sudoku)
    # visual sudoku cp model
    visual_sudoku_problem = get_visual_sudoku_full_image_model(ml_predictions)
    model = visual_sudoku_problem['model']
    dvars, pvars = visual_sudoku_problem['variables'], visual_sudoku_problem['perception']
    for i in range(0, k):
        results = solve_visual_sudoku_higher_order(visual_sudoku_problem)
        return_list.append(results)
        model += ~ cp.all((pvars == results['perception']).flatten())
        #model += ~ cp.all((dvars == results['solution']).flatten()) # new addition
    return return_list

## Symbolic feedback

In [None]:
import copy


def find_index_of_element_in_arrays(arrays, t):
    """
    Find the index of an element with value t across multiple NumPy arrays.

    Parameters:
        arrays (list of numpy.ndarray): List of NumPy arrays.
        t (int or float): Value to find across the arrays.

    Returns:
        index (tuple or None): Index of the element in the arrays, or None if not found.
    """
    # Check if all arrays have the same shape
    shapes = [arr.shape for arr in arrays]
    if len(set(shapes)) != 1:
        raise ValueError("All arrays must have the same shape")

    # Stack arrays along a new axis
    stacked_array = np.stack(arrays)

    # Check if t exists in all arrays
    presence = np.all(stacked_array == t, axis=0)

    # If t is present in all arrays, find its index
    if np.any(presence):
        index = np.where(presence)
        return (index[0][0], index[1][0])  # Return only the index of the first occurrence
    else:
        return None
        

def inference_with_feedback(visual_sudoku, cnn, sln, top_k_boards=5):
    # convolutional neural network predictions 
    original_ml_predictions = predict_proba_sudoku(cnn, visual_sudoku)
    ml_predictions = copy.deepcopy(original_ml_predictions)
    
    # call the solver
    visual_sudoku_problem = get_visual_sudoku_full_image_model(ml_predictions)
    model = visual_sudoku_problem['model']
    dvars, pvars = visual_sudoku_problem['variables'], visual_sudoku_problem['perception']
    board_list = []
    complete_board_list = []
    for i in range(0, top_k_boards):
        results = solve_visual_sudoku_higher_order(visual_sudoku_problem)
        board_list.append(results)
        complete_board_list.append(results)
        model += ~ cp.all((pvars == results['perception']).flatten())
        #model += ~ cp.all((dvars == results['solution']).flatten()) # new addition
    top_boards = [board["perception"] for board in board_list]

    # refine probabilities
    for i in range(0, 9):
        for j in range(0, 9):
            initial_probabilities = ml_predictions[i, j]
            partial_evidence = [board[i, j] for board in top_boards]
            if (len(set(partial_evidence)) > 1) and (len(set(partial_evidence)) < 10):
                # Construct complete input for the sln
                complete_input = torch.zeros(1, 11, 1, 28, 28)
                complete_input[0, 0] = visual_sudoku[i, j]
                for k in range(0, 10):
                    index = find_index_of_element_in_arrays(top_boards, k)
                    if index is not None:
                        complete_input[0, k+1] = visual_sudoku[index[0], index[1]]
                with torch.no_grad():
                    refined_probability = sln(complete_input)[0]
                    refined_probability = torch.nn.functional.softmax(refined_probability, dim=0).numpy()
                ml_predictions[i, j] = refined_probability

    # Max top board
    choice = 0
    top_score = 0
    new_scores = []
    old_scores = []
    for i, board in enumerate(top_boards):
        score = 1
        old_score = 1
        for row in range(0,9):
            for col in range(0,9):
                score *= ml_predictions[row, col, board[row, col]]
                old_score *= original_ml_predictions[row, col, board[row, col]]
        new_scores.append(score)
        old_scores.append(old_score)
        if score > top_score:
            top_score = score
            choice = i 
    
    return complete_board_list[choice]


def inference_with_feedback_double_cnn(visual_sudoku, cnn, sln, top_k_boards=5):
    # convolutional neural network predictions 
    original_ml_predictions = predict_proba_sudoku(cnn, visual_sudoku)
    ml_predictions = copy.deepcopy(original_ml_predictions)
    
    # call the solver
    visual_sudoku_problem = get_visual_sudoku_full_image_model(ml_predictions)
    model = visual_sudoku_problem['model']
    dvars, pvars = visual_sudoku_problem['variables'], visual_sudoku_problem['perception']
    board_list = []
    complete_board_list = []
    for i in range(0, top_k_boards):
        results = solve_visual_sudoku_higher_order(visual_sudoku_problem)
        board_list.append(results)
        complete_board_list.append(results)
        model += ~ cp.all((pvars == results['perception']).flatten())
        #model += ~ cp.all((dvars == results['solution']).flatten()) # new addition
    top_boards = [board["perception"] for board in board_list]

    # refine probabilities
    for i in range(0, 9):
        for j in range(0, 9):
            initial_probabilities = ml_predictions[i, j]
            partial_evidence = [board[i, j] for board in top_boards]
            if (len(set(partial_evidence)) > 1) and (len(set(partial_evidence)) < 10):
                with torch.no_grad():
                    refined_probability = sln(visual_sudoku[i, j])[0]
                    refined_probability = torch.nn.functional.softmax(refined_probability, dim=0).numpy()
                ml_predictions[i, j] = refined_probability

    # Max top board
    choice = 0
    top_score = 0
    new_scores = []
    old_scores = []
    for i, board in enumerate(top_boards):
        score = 1
        old_score = 1
        for row in range(0,9):
            for col in range(0,9):
                score *= ml_predictions[row, col, board[row, col]]
                old_score *= original_ml_predictions[row, col, board[row, col]]
        new_scores.append(score)
        old_scores.append(old_score)
        if score > top_score:
            top_score = score
            choice = i 
    
    return complete_board_list[choice]

# Finetune SLN

## For k=2

In [None]:
# Make train list

trained_cnn = SmallLeNet2()
trained_cnn.load_state_dict(torch.load('SmallLeNet2_0.957_digit_acc_0.972.pth'))
trained_cnn.eval()

train_list = []

for index in range(0, 7000):
    numerical_sudoku = numerical_train_sudokus[index].numpy()
    visual_sudoku = visual_train_sudokus[index]
    
    top_boards = top_k_inference_unique_solution(visual_sudoku, trained_cnn, k=2)
    diff = top_boards[0]["perception"]-top_boards[1]["perception"]
    true_indices = np.where(diff!=0)

    first_board = [top_boards[0]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    second_board = [top_boards[1]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    correct_board = [numerical_sudoku[true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]

    for k in range(0, len(first_board)):
        element = [first_board[k], second_board[k], correct_board[k], index, true_indices[0][k], true_indices[1][k]]
        train_list.append(element)

    if index % 20 == 19:
        print(f'Board {index}')

In [None]:
# Make validation list

trained_cnn = SmallLeNet2()
trained_cnn.load_state_dict(torch.load('SmallLeNet2_0.957_digit_acc_0.972.pth'))
trained_cnn.eval()

validation_list = []

for index in range(0, 1000):
    numerical_sudoku = numerical_validation_sudokus[index].numpy()
    visual_sudoku = visual_validation_sudokus[index]
    
    top_boards = top_k_inference_unique_solution(visual_sudoku, trained_cnn, k=2)
    diff = top_boards[0]["perception"]-top_boards[1]["perception"]
    true_indices = np.where(diff!=0)

    first_board = [top_boards[0]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    second_board = [top_boards[1]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    correct_board = [numerical_sudoku[true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]

    board_elements = []
    for k in range(0, len(first_board)):
        element = [first_board[k], second_board[k], correct_board[k], index, true_indices[0][k], true_indices[1][k]]
        board_elements.append(element)
    validation_list.append(board_elements)

    if index % 20 == 19:
        print(f'Board {index}')

In [None]:
# Function to calculate board accuracy when using the SLN and symbolic feedback on validation boards,
# validation list has been computed earlier with the CNN

def print_acc_on_validation_boards(sln, validation_list):
    correct = 0
    total_digits = 0
    loss = 0
    criterion = nn.CrossEntropyLoss()
    
    for boards in validation_list:
        correct_prob = 1.0
        prob1 = 1.0
        prob2 = 1.0
        for k in range(0, len(boards)):
            element = [boards[k][0], boards[k][1], boards[k][2], boards[k][3], boards[k][4], boards[k][5]]
            total_digits += 1
            
            index = element[3]
            visual_sudoku = visual_validation_sudokus[index]
            
            output = sln(visual_sudoku[element[4], element[5]])[0]
            pred = torch.nn.functional.softmax(output, dim=0).detach().numpy()
            loss += criterion(output.unsqueeze(0), torch.tensor([element[2]]).long())
    
            correct_prob *= pred[element[2]]
            prob1 *= pred[element[0]]
            prob2 *= pred[element[1]]
            
        if np.max([prob1, prob2]) == correct_prob:
            correct += 1

    print("##### Results validation #####")
    print("Accuracy: ", correct/len(validation_list))
    print("Loss: ", loss.item()/len(validation_list))

In [None]:
# Finetune SLN using the train list

trained_sln = SmallLeNet()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(trained_sln.parameters(), lr=0.00001)
running_loss = 0

for i, element in enumerate(train_list, 0):
    index = element[3]
    numerical_sudoku = numerical_train_sudokus[index].numpy()
    visual_sudoku = visual_train_sudokus[index]
    
    # Train
    optimizer.zero_grad()
    output = trained_sln(visual_sudoku[element[4], element[5]])[0]
    label = torch.Tensor([element[2]]).long()
    loss = criterion(output.unsqueeze(0), label)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()

    if i % 100 == 99:
        print(f'Board {i}, Train Loss: {running_loss/100:.4f}')
        running_loss = 0.0
        print_acc_on_validation_boards(trained_sln, validation_list)

In [None]:
# Validation

# Manual loading
sln = SmallLeNet2()
sln.load_state_dict(torch.load('SmallLeNet2_0.957_digit_acc_0.972.pth'))
print_acc_on_validation_boards(sln, validation_list)

In [None]:
# Manual saving

torch.save(trained_sln.state_dict(), 'SmallLeNet_SLN.pth')

## For k=3

In [None]:
# Make train list

trained_cnn = LeNet()
trained_cnn.load_state_dict(torch.load('LeNet_0.987.pth'))
trained_cnn.eval()

train_list = []

for index in range(0, 7000):
    numerical_sudoku = numerical_train_sudokus[index].numpy()
    visual_sudoku = visual_train_sudokus[index]
    
    top_boards = top_k_inference_unique_solution(visual_sudoku, trained_cnn, k=3)
    diff = (top_boards[0]["perception"]-top_boards[1]["perception"]) + (top_boards[0]["perception"]-top_boards[2]["perception"]) + (top_boards[1]["perception"]-top_boards[2]["perception"])
    true_indices = np.where(diff!=0)

    first_board = [top_boards[0]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    second_board = [top_boards[1]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    third_board = [top_boards[2]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    correct_board = [numerical_sudoku[true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]

    for k in range(0, len(first_board)):
        element = [first_board[k], second_board[k], third_board[k], correct_board[k], index, true_indices[0][k], true_indices[1][k]]
        train_list.append(element)

    if index % 20 == 19:
        print(f'Board {index}')

In [None]:
# Make validation list

trained_cnn = LeNet()
trained_cnn.load_state_dict(torch.load('LeNet_0.987.pth'))
trained_cnn.eval()

validation_list = []

for index in range(0, 1000):
    numerical_sudoku = numerical_validation_sudokus[index].numpy()
    visual_sudoku = visual_validation_sudokus[index]
    
    top_boards = top_k_inference_unique_solution(visual_sudoku, trained_cnn, k=3)
    diff = (top_boards[0]["perception"]-top_boards[1]["perception"]) + 10*(top_boards[0]["perception"]-top_boards[2]["perception"]) + 100*(top_boards[1]["perception"]-top_boards[2]["perception"])
    true_indices = np.where(diff!=0)

    first_board = [top_boards[0]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    second_board = [top_boards[1]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    third_board = [top_boards[2]["perception"][true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]
    correct_board = [numerical_sudoku[true_indices[0][i], true_indices[1][i]] for i in range(0, len(true_indices[0]))]

    board_elements = []
    for k in range(0, len(first_board)):
        element = [first_board[k], second_board[k], third_board[k], correct_board[k], index, true_indices[0][k], true_indices[1][k]]
        board_elements.append(element)

    # Only add to validation list if correct board is among the top-3
    for board in top_boards:
        if np.array_equal(numerical_sudoku, board["perception"]):
            validation_list.append(board_elements)
            break

    if index % 20 == 19:
        print(f'Board {index}')

In [None]:
# Function to calculate board accuracy when using the SLN and symbolic feedback on validation boards,
# validation list has been computed earlier with the CNN

def print_acc_on_validation_boards(sln, validation_list):
    correct = 0
    total_digits = 0
    loss = 0
    criterion = nn.CrossEntropyLoss()
    
    for boards in validation_list:
        correct_prob = 1.0
        prob1 = 1.0
        prob2 = 1.0
        prob3 = 1.0
        for k in range(0, len(boards)):
            element = [boards[k][0], boards[k][1], boards[k][2], boards[k][3], boards[k][4], boards[k][5], boards[k][6]]
            total_digits += 1
            
            index = element[4]
            visual_sudoku = visual_validation_sudokus[index]
            
            output = sln(visual_sudoku[element[5], element[6]])[0]
            pred = torch.nn.functional.softmax(output, dim=0).detach().numpy()
            loss += criterion(output.unsqueeze(0), torch.tensor([element[3]]).long())
    
            correct_prob *= pred[element[3]]
            prob1 *= pred[element[0]]
            prob2 *= pred[element[1]]
            prob3 *= pred[element[2]]
            
        if np.max([prob1, prob2, prob3]) == correct_prob:
            correct += 1

    print("##### Results validation #####")
    print("Accuracy: ", correct/1000)
    print("Loss: ", loss.item()/1000)

In [None]:
# Finetune SLN using the train list

trained_sln = LeNet()
trained_sln.load_state_dict(torch.load('LeNet_0.987.pth'))

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(trained_sln.parameters(), lr=0.00001)
running_loss = 0

for i, element in enumerate(train_list, 0):
    index = element[4]
    numerical_sudoku = numerical_train_sudokus[index].numpy()
    visual_sudoku = visual_train_sudokus[index]
    
    # Train
    optimizer.zero_grad()
    output = trained_sln(visual_sudoku[element[5], element[6]])[0]
    label = torch.Tensor([element[3]]).long()
    loss = criterion(output.unsqueeze(0), label)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()

    if i % 100 == 99:
        print("------------------------------")
        print(f'Board {i}, Train Loss: {running_loss/100:.4f}')
        running_loss = 0.0
        print_acc_on_validation_boards(trained_sln, validation_list)

In [None]:
# Validation

# Manual loading
sln = LeNet()
sln.load_state_dict(torch.load('LeNet_SLN_0.988_finetuned_for_0.987_k=3_digit_acc_0.992.pth'))
print_acc_on_validation_boards(sln, validation_list)

In [None]:
# Manual saving

torch.save(trained_sln.state_dict(), 'LeNet_SLN_digit_acc_0.9.pth')

# Testing

In [None]:
# Validation

# Manual loading
trained_cnn = LeNet()
trained_cnn.load_state_dict(torch.load('LeNet_0.987.pth'))
trained_cnn.eval()
trained_sln = SmallLeNet()
trained_sln.load_state_dict(torch.load('SmallLeNet_0.960.pth'))
trained_sln.eval()

total = 0
normal_correct = 0
feedback_correct = 0
four_boards_correct = 0
normal_correct_feedback_not = 0
feedback_correct_normal_not = 0

for i in range(0, 20):
    numerical_sudoku = numerical_validation_sudokus[i]
    visual_sudoku = visual_validation_sudokus[i]

    normal_result = inference_unique_solution(visual_sudoku, trained_cnn)["perception"]
    feedback_result = inference_with_feedback_double_cnn(visual_sudoku, trained_cnn, trained_sln, 3)["perception"]
    four_boards = top_k_inference_unique_solution(visual_sudoku, trained_cnn, k=3)

    normal_correct_bool = False
    feedback_correct_bool = False
    
    if np.array_equal(numerical_sudoku, normal_result):
        normal_correct += 1
        normal_correct_bool = True
    if np.array_equal(numerical_sudoku, feedback_result):
        feedback_correct += 1
        feedback_correct_bool = True
    for board in four_boards:
        if np.array_equal(numerical_sudoku, board["perception"]):
            four_boards_correct += 1
            break

    total += 1
    if normal_correct_bool and not feedback_correct_bool:
        normal_correct_feedback_not += 1
        print("score: ", normal_correct_feedback_not, feedback_correct_normal_not)
    elif feedback_correct_bool and not normal_correct_bool:
        feedback_correct_normal_not += 1
        print("score: ", normal_correct_feedback_not, feedback_correct_normal_not)
    print(total, normal_correct/total, feedback_correct/total, four_boards_correct/total)

    

In [None]:
# Testing

# Manual loading
trained_cnn = LeNet()
trained_cnn.load_state_dict(torch.load('LeNet_0.987.pth'))
trained_cnn.eval()
trained_sln = SmallLeNet()
trained_sln.load_state_dict(torch.load('SmallLeNet_0.960.pth'))
trained_sln.eval()

total = 0
normal_correct = 0
feedback_correct = 0
four_boards_correct = 0
normal_correct_feedback_not = 0
feedback_correct_normal_not = 0

for i in range(1000, 2000):
    numerical_sudoku = numerical_test_sudokus[i]
    visual_sudoku = visual_test_sudokus[i]

    normal_result = inference_unique_solution(visual_sudoku, trained_cnn)["perception"]
    feedback_result = inference_with_feedback_double_cnn(visual_sudoku, trained_cnn, trained_sln, 3)["perception"]
    four_boards = top_k_inference_unique_solution(visual_sudoku, trained_cnn, k=3)

    normal_correct_bool = False
    feedback_correct_bool = False
    
    if np.array_equal(numerical_sudoku, normal_result):
        normal_correct += 1
        normal_correct_bool = True
    if np.array_equal(numerical_sudoku, feedback_result):
        feedback_correct += 1
        feedback_correct_bool = True
    for board in four_boards:
        if np.array_equal(numerical_sudoku, board["perception"]):
            four_boards_correct += 1
            break

    total += 1
    if normal_correct_bool and not feedback_correct_bool:
        normal_correct_feedback_not += 1
        print("score: ", normal_correct_feedback_not, feedback_correct_normal_not)
    elif feedback_correct_bool and not normal_correct_bool:
        feedback_correct_normal_not += 1
        print("score: ", normal_correct_feedback_not, feedback_correct_normal_not)
    print(total, normal_correct/total, feedback_correct/total, four_boards_correct/total)

    

In [None]:
# Test accuracy of CNN on digits of the visual sudoku boards

# Manual loading
#cnn = SmallLeNet()
#cnn.load_state_dict(torch.load('SmallLeNet_0.964.pth'))

total = 0
cnn_correct = 0
for i in range(0, 2000):
    numerical_sudoku = numerical_test_sudokus[i]
    visual_sudoku = visual_test_sudokus[i]

    ml_predictions = predict_proba_sudoku(cnn, visual_sudoku)

    _, cnn_prediction = torch.max(torch.tensor(ml_predictions).data, -1)
    cnn_prediction = cnn_prediction.numpy()
    non_zero = (numerical_sudoku != 0).sum().item()
    cnn_correct += ((cnn_prediction == numerical_sudoku.numpy())).sum().item()
    
    total += 81
    print(i, cnn_correct/total)