<a href="https://colab.research.google.com/github/ilangofman/SATNet-Logic-Puzzles/blob/main/KenKen_via_SATNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# KenKen Solving With SATNet

Download and install the source code of the original SATNet model, introduced in:

Po-Wei Wang, Priya L. Donti, Bryan Wilder, and J. Zico Kolter. SATNet: Bridging deep learning and logical reasoning
using a differentiable satisfiability solver. In Proceedings of ICML’19, pages 6545–6554, 2019. URL: [https://github.com/locuslab/SATNet.git](https://github.com/locuslab/SATNet.git)

In [None]:
# Clone the SATNet repository and switch into the checkout.
!git clone https://github.com/locuslab/SATNet
%cd SATNet
# Install the package in development mode; stdout and stderr are both
# redirected to install.log.
!python setup.py develop > install.log 2>&1

Cloning into 'SATNet'...
remote: Enumerating objects: 101, done.[K
remote: Counting objects: 100% (5/5), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 101 (delta 0), reused 2 (delta 0), pack-reused 96[K
Receiving objects: 100% (101/101), 497.29 KiB | 4.48 MiB/s, done.
Resolving deltas: 100% (37/37), done.
/content/SATNet


In [None]:
import os
import shutil
import argparse
from collections import namedtuple

import numpy as np
import numpy.random as npr

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

import matplotlib.pyplot as plt
from IPython.display import display, Markdown, Latex, clear_output
import tqdm

import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from tqdm.auto import tqdm

import random

# Seed Python's built-in RNG so that steps relying on `random` are repeatable.
random.seed(6)


# Report which device PyTorch can see; training on CPU works but is slow.
if torch.cuda.is_available():
    print('Using', torch.cuda.get_device_name(0))
else:
    print('[WARNING] Not using GPU.')
    print('Please select "Runtime -> Change runtime type" and switch to GPU for better performance')

Using Tesla T4


# Introduction to SATNet
SATNet is a differentiable (smoothed) maximum satisfiability (MAXSAT) solver that can be integrated into the loop of larger deep learning systems.  Our (approximate) solver is based upon a fast coordinate descent approach to solving the semidefinite program (SDP) associated with the MAXSAT problem.



In [None]:
import satnet
# Print the docstring attached to satnet.SATNet as a quick reference for its
# constructor arguments and inputs.
print('SATNet document\n', satnet.SATNet.__doc__)

SATNet document
 Apply a SATNet layer to complete the input probabilities.

    Args:
        n: Number of input variables.
        m: Rank of the clause matrix.
        aux: Number of auxiliary variables.

        max_iter: Maximum number of iterations for solving
            the inner optimization problem.
            Default: 40
        eps: The stopping threshold for the inner optimizaiton problem.
            The inner Mixing method will stop when the function decrease
            is less then eps times the initial function decrease.
            Default: 1e-4
        prox_lam: The diagonal increment in the backward linear system
            to make the backward pass more stable.
            Default: 1e-2
        weight_normalize: Set true to perform normlization for init weights.
            Default: True

    Inputs: (z, is_input)
        **z** of shape `(batch, n)`: 
            Float tensor containing the probabilities (must be in [0,1]).
        **is_input** of shape `(batch, 

# Building SATNet-based Models


In [None]:
class KenKenSolver(nn.Module):
    """Thin ``nn.Module`` wrapper around a single ``satnet.SATNet`` layer.

    Args:
        input_variables: Number of input variables; passed to SATNet as ``n``.
        aux: Number of auxiliary variables; passed to SATNet as ``aux``.
        m: Passed to SATNet as ``m`` (rank of the clause matrix, per the
            SATNet docstring).
    """

    def __init__(self, input_variables, aux, m):
        super().__init__()
        # NOTE: argument order differs on purpose -- this class takes
        # (input_variables, aux, m) while SATNet takes (n, m, aux).
        self.sat = satnet.SATNet(input_variables, m, aux)

    def forward(self, y_in, mask):
        """Apply the SATNet layer and return its output unchanged.

        Args:
            y_in: First SATNet input (``z`` in the SATNet docstring).
            mask: Second SATNet input (``is_input`` in the SATNet docstring).
                Presumably non-zero entries mark variables that are given
                rather than predicted -- TODO confirm against SATNet docs.

        Returns:
            Whatever ``self.sat(y_in, mask)`` returns.
        """
        # The former `del y_in, mask` only unbound local names right before
        # returning and freed nothing, so it has been dropped.
        return self.sat(y_in, mask)

The experimental parameters we use in the paper are below.

# KenKen Dataset

In [None]:
import json

# Load the pre-generated KenKen games from disk. The context manager closes
# the file handle as soon as parsing finishes; the previous bare open() left
# it open for the rest of the session.
with open('/content/games_json-3x3_cage2.json') as f:
    # Parsed JSON content; later cells shuffle it and iterate over it game by game.
    data = json.load(f)

In [None]:
# Shuffle the loaded games in place using Python's global `random` generator.
random.shuffle(data)

## Load Dataset and create encoding 

For different configurations of the game, some constants will need to be changed, such as:

- *GAME_SIZE*: the value of n in an *n x n* board.
- *max_cage_total*: the maximum total of a cage.
- *MAX_CELL_SIZE*: the maximum number of cells in a cage.
- *MASK_OUT_SINGLE*: boolean flag to mask out single cells from learning (their solution values are provided as inputs).

In [None]:
# --- Puzzle configuration -------------------------------------------------
# Side length n of the n x n board.
GAME_SIZE = 3
# Maximum number of cells in a cage.
MAX_CELL_SIZE = 2
# When True, cells whose cage has no arithmetic operator are handed to the
# model as inputs instead of being predicted.
MASK_OUT_SINGLE = True
# Length of the one-hot vector used to encode a cage total.
max_cage_total = 5  # TODO make this not hard coded

# Upper bound on the number of cages: one cage per cell.
num_possible_cages = GAME_SIZE * GAME_SIZE

# --- Accumulators, one entry per game --------------------------------------
X_dataset, solutions, kenken_masks = [], [], []

def _one_hot(length, hot_index):
    """Return a list of ``length`` zeros with a single 1 at ``hot_index``."""
    encoded = [0] * length
    encoded[hot_index] = 1
    return encoded


# Encode every game as one flat 0/1 vector laid out as
#   [cell value one-hots | cage membership one-hots | cage total one-hots]
# together with a mask of the same length (1 = given as input, 0 = to predict).
for game in data:
    solution_value_variables = []
    empty_value_variables = []
    input_mask = []

    # Cell values: GAME_SIZE one-hot entries per cell, visited row by row.
    for row_idx, solution_row in enumerate(game['solution']):
        for col_idx in range(len(game['solution'][0])):
            soln_one_hot = _one_hot(GAME_SIZE, solution_row[col_idx] - 1)
            solution_value_variables += soln_one_hot

            # Cage ids are 1-based indices into game['groupValues'].
            cage_id = game['cellGroups'][row_idx][col_idx]
            cage_label = game['groupValues'][cage_id - 1][1]
            # TODO make sure for multiplication it is * and not "x"

            # A cell is hidden (left for the model to fill in) when masking of
            # single cells is off, or when its cage label starts with an
            # arithmetic operator.
            if not MASK_OUT_SINGLE or cage_label[0] in ("+", "-", "*", "/"):
                empty_value_variables += [0] * GAME_SIZE
                input_mask += [0] * GAME_SIZE
            else:
                # Otherwise the solved value is revealed and marked as input.
                empty_value_variables += soln_one_hot
                input_mask += [1] * GAME_SIZE

    # Cage membership: one one-hot of length num_possible_cages per cell.
    CAGE_VARIABLES = []
    for group_row in game['cellGroups']:
        for col_idx in range(len(game['cellGroups'][0])):
            CAGE_VARIABLES += _one_hot(num_possible_cages, group_row[col_idx] - 1)

    # Cage totals: one one-hot of length max_cage_total per cage in the game.
    # NOTE(review): int() of a '-'-prefixed value is negative and would index
    # the one-hot from the end -- confirm the dataset never contains one.
    CAGE_TOTALS = []
    for group in game['groupValues']:
        CAGE_TOTALS += _one_hot(max_cage_total, int(group[1]) - 1)

    # A game usually has fewer cages than num_possible_cages; pad the unused
    # cage-total slots with zeros so every game has the same vector length.
    unused_cages = num_possible_cages - len(game['groupValues'])
    CAGE_TOTALS += [0] * (max_cage_total * unused_cages)

    # The cage description is always visible to the model.
    input_mask += [1] * (len(CAGE_VARIABLES) + len(CAGE_TOTALS))

    print("Number of empty value variables = ", len(empty_value_variables))
    print("Number of solution value variables = ", len(solution_value_variables))
    print("Number of cage membership variables = ", len(CAGE_VARIABLES))
    print("Number of cage total= ", len(CAGE_TOTALS))
    print("Input mask len", len(input_mask))
    print(input_mask)

    X_dataset.append(empty_value_variables + CAGE_VARIABLES + CAGE_TOTALS)
    solutions.append(solution_value_variables + CAGE_VARIABLES + CAGE_TOTALS)
    kenken_masks.append(input_mask)



In [None]:
# Convert the accumulated Python lists into tensors: float for the model
# inputs and targets, int32 for the input mask.
X_dataset = torch.tensor(X_dataset, dtype=torch.float)
kenken_masks = torch.tensor(kenken_masks, dtype=torch.int32)
solutions = torch.tensor(solutions, dtype=torch.float)

## Model Hyperparameters

In [None]:
from exps.sudoku import FigLogger, find_unperm
# Experiment settings, collected in one place.
args_dict = dict(
    lr=2e-4 * 5,  # TODO CHANGE THE 4 to a 3
    cuda=torch.cuda.is_available(),
    batchSz=40,
    mnistBatchSz=50,
    boardSz=4,  # TODO CHANGE
    m=600,      # forwarded to KenKenSolver / SATNet as `m`
    aux=600,    # forwarded to KenKenSolver / SATNet as `aux`
    nEpoch=50,
)
# Expose the settings as an immutable record with attribute access (args.lr, ...).
args = namedtuple('Args', tuple(args_dict))(**args_dict)

In [None]:
# Move all three dataset tensors to the GPU when CUDA is enabled in args.
if args.cuda:
    X_dataset, kenken_masks, solutions = (
        tensor.cuda() for tensor in (X_dataset, kenken_masks, solutions)
    )


Split data into train and test 

In [None]:
# First 90% of the games (rounded down) are used for training, the rest for testing.
N = X_dataset.shape[0]
nTrain = int(0.9 * N)
# Each dataset item is an (input, mask, solution) triple.
kenken_train = TensorDataset(
    *(tensor[:nTrain] for tensor in (X_dataset, kenken_masks, solutions))
)
kenken_test = TensorDataset(
    *(tensor[nTrain:] for tensor in (X_dataset, kenken_masks, solutions))
)

## KenKen Training

In [None]:
def run(boardSz, epoch, model, optimizer, logger, dataset, batchSz, to_train=False, unperm=None):
    """Run one pass over ``dataset`` and log the mean loss and error rate.

    Args:
        boardSz: Forwarded unchanged to ``computeErr``.
        epoch: Epoch number shown in the progress text and stored in the log.
        model: Callable invoked as ``model(data, is_input)``.
        optimizer: Optimizer; only touched when ``to_train`` is true.
        logger: Object whose ``log`` method receives ``(epoch, loss, err)``.
        dataset: Dataset yielding ``(data, is_input, label)`` triples.
        batchSz: Batch size handed to the ``DataLoader``.
        to_train: When true, back-propagate and step the optimizer per batch.
        unperm: Forwarded unchanged to ``computeErr``.

    Note:
        The logged loss and error are unweighted means of the per-batch
        values, so a smaller final batch counts as much as a full one.
    """
    phase = 'Train' if to_train else 'Test '
    loader = DataLoader(dataset, batch_size=batchSz)
    n_batches = len(loader)
    progress = tqdm(enumerate(loader), total=n_batches)

    print("loader len: ", n_batches)

    loss_sum, err_sum = 0, 0
    for _, (data, is_input, label) in progress:
        if to_train:
            optimizer.zero_grad()

        preds = model(data.contiguous(), is_input.contiguous())
        loss = nn.functional.binary_cross_entropy(preds, label)

        if to_train:
            loss.backward()
            optimizer.step()

        err = computeErr(preds.data, label, boardSz, unperm)
        progress.set_description(
            f'Epoch {epoch} {phase} Loss {loss.item():.4f} Err: {err:.4f}'
        )
        loss_sum += loss.item()
        err_sum += err

    mean_loss = loss_sum / n_batches
    mean_err = err_sum / n_batches
    logger.log((epoch, mean_loss, mean_err))

    if not to_train:
        print('TESTING SET RESULTS: Average loss: {:.4f} Err: {:.4f}'.format(mean_loss, mean_err))

    # Hand cached GPU memory back to the allocator between passes.
    torch.cuda.empty_cache()


In [None]:
def train(args, epoch, model, optimizer, logger, dataset, batchSz, unperm=None):
    """Run one training pass by delegating to ``run`` with ``to_train=True``.

    ``args`` is handed to ``run`` as its first positional argument
    (``boardSz``); every other argument is forwarded unchanged.
    """
    run(args, epoch, model, optimizer, logger, dataset, batchSz,
        to_train=True, unperm=unperm)

@torch.no_grad()
def test(args, epoch, model, optimizer, logger, dataset, batchSz, unperm=None):
    """Run one evaluation pass by delegating to ``run`` with ``to_train=False``.

    Gradient tracking is disabled for the whole call. ``args`` is handed to
    ``run`` as its first positional argument (``boardSz``); every other
    argument is forwarded unchanged.
    """
    run(args, epoch, model, optimizer, logger, dataset, batchSz,
        to_train=False, unperm=unperm)

@torch.no_grad()
def computeErr(pred_flat, label,  n, unperm):
    """Return the fraction of boards in the batch that are not fully correct.

    Only the leading ``GAME_SIZE ** 3`` columns (the per-cell value one-hots)
    of ``pred_flat`` and ``label`` are compared. A board counts as correct
    only when the arg-max value of every cell matches the label.

    Args:
        pred_flat: 2-D tensor of model outputs, one row per board.
        label: 2-D tensor of targets with the same layout as ``pred_flat``.
        n: Unused; the board size is read from the global ``GAME_SIZE``.
        unperm: Unused.

    Returns:
        The error rate for the batch as a Python float.
    """
    cells_per_board = GAME_SIZE * GAME_SIZE
    value_columns = GAME_SIZE * cells_per_board

    def decode(flat):
        # Group the value columns into GAME_SIZE-wide one-hots, one per cell,
        # and pick the most likely value index of each cell.
        grouped = flat[:, :value_columns].view(-1, cells_per_board, GAME_SIZE)
        return grouped.argmax(dim=2)

    # True for each board whose cells all agree with the solution.
    board_is_correct = (decode(pred_flat) == decode(label)).all(dim=1)
    num_correct = board_is_correct.sum()

    total = len(label)
    # Tensor division is kept so the value goes through the same dtype
    # promotion before being converted to a Python float.
    error_rate = float((total - num_correct) / total)

    print("ERROR RATE", error_rate)

    return error_rate

### Train the model 

In [None]:
# Build the solver with one SATNet input variable per column of the encoded
# dataset, then move it to the GPU when CUDA is enabled.
kenken_model = KenKenSolver(len(X_dataset[0]), args.aux, args.m)
if args.cuda: kenken_model = kenken_model.cuda()

# Turn off interactive plotting; the figure is shown explicitly via display().
plt.ioff()
# Created after the optional .cuda() call so it receives the model's final parameters.
optimizer = optim.Adam(kenken_model.parameters(), lr=args.lr)

# One figure with two side-by-side axes: training curve left, testing curve right.
fig, axes = plt.subplots(1,2, figsize=(10,4))
plt.subplots_adjust(wspace=0.4)
# NOTE(review): 'Traininig' looks like a typo for 'Training'; left untouched
# because it is a runtime string passed to FigLogger.
train_logger = FigLogger(fig, axes[0], 'Traininig')
test_logger = FigLogger(fig, axes[1], 'Testing')

# Baseline evaluation of the untrained model, logged as epoch 0.
test(args.boardSz, 0, kenken_model, optimizer, test_logger, kenken_test, args.batchSz)
plt.pause(0.01)
# Alternate one training pass and one evaluation pass per epoch, showing the
# figure after each epoch.
for epoch in range(1, args.nEpoch+1):
    train(args.boardSz, epoch, kenken_model, optimizer, train_logger, kenken_train, args.batchSz)
    test(args.boardSz, epoch, kenken_model, optimizer, test_logger, kenken_test, args.batchSz)
    display(fig)


Save the model 

In [None]:
# Persist the trained weights; the file name records the SATNet `m` and `aux` settings.
# NOTE(review): the "3x3_cell_size_3" part of the name is hard-coded and is not
# derived from GAME_SIZE / MAX_CELL_SIZE -- confirm it matches the run.
torch.save(
    kenken_model.state_dict(),
    f'/content/drive/MyDrive/2547/model_3x3_cell_size_3_m_{args.m}_aux_{args.aux}.pt',
)
