# Tic Tac Toe Verifier

We create an ML model that verifies TicTacToe Games

Make sure to use a GPU otherwise you'll take a super long time to train

In [1]:
!pip install onnx


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


# Generate legal game states and illegal game states

1. Recursively lookup all the possible tictactoe game states and create a legal dataset
2. Use the legal dataset and permute illegal game states

In [1]:
import json

def check_winner(board):
    winning_combinations = [
        [0, 1, 2], [3, 4, 5], [6, 7, 8],  # rows
        [0, 3, 6], [1, 4, 7], [2, 5, 8],  # columns
        [0, 4, 8], [2, 4, 6]              # diagonals
    ]
    for combo in winning_combinations:
        if board[combo[0]] == board[combo[1]] == board[combo[2]] and board[combo[0]] is not None:
            return board[combo[0]]
    return None

def generate_games(board, player):
    winner = check_winner(board)
    if winner or None not in board:
        # Game is over, save the outcome and board state
        return [{
            "history": [list(board)],
            "outcome": winner if winner else "Draw"
        }]

    games = []
    for i in range(9):
        if board[i] is None:
            new_board = board.copy()
            new_board[i] = player
            next_player = 'O' if player == 'X' else 'X'
            next_games = generate_games(new_board, next_player)
            for game in next_games:
                game["history"].insert(0, list(board))
            games.extend(next_games)

    return games

def generate_illegal_games(games):
    illegal_games = []
    for game in games:
        history = game['history']
        illegal_history = []
        for round in history:
            round_list = []
            for item in round:
                # cycle the permutations
                if item is None:
                    round_list.append("X")
                if item == "X":
                    round_list.append("O")
                if item == "O":
                    round_list.append(None)
            illegal_history.append(round_list)
        illegal_games.append({
            "history": illegal_history,
            "outcome": game["outcome"]
        })
    return illegal_games



initial_board = [None for _ in range(9)]
games = generate_games(initial_board, 'X')
illegal_games = generate_illegal_games(games)

with open("tic_tac_toe_games_good.json", "w") as file:
    file.write("[\n")  # Start of the list
    for i, game in enumerate(games):
        json.dump(game, file, separators=(',', ': '))
        if i != len(games) - 1:  # If it's not the last game, add a comma
            file.write(",\n")
        else:
            file.write("\n")
    file.write("]\n")

with open("tic_tac_toe_games_bad.json", "w") as file:
    file.write("[\n")  # Start of the list
    for i, game in enumerate(illegal_games):
        json.dump(game, file, separators=(',', ': '))
        if i != len(illegal_games) - 1:  # If it's not the last game, add a comma
            file.write(",\n")
        else:
            file.write("\n")
    file.write("]\n")


# Generate the Anomaly Detection Model
1. Create a variational autoencoder, as the state space is small enough put the game history into an input matrix

2. Fully connect the final outputs and use two outputs to represent true or false

In [2]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, IterableDataset, random_split
import torch.optim as optim
import json


class TicTacToeNet(nn.Module):
    def __init__(self):
        super(TicTacToeNet, self).__init__()

        # Fully connected layers
        # 9 possible inputs x 10 rounds + 1 final output value
        self.fc0 = nn.Linear(91, 16)
        self.fc1 = nn.Linear(16, 3)
        self.fc2 = nn.Linear(3, 16)
        self.fc3 = nn.Linear(16, 91)
        self.fc4 = nn.Linear(91, 2)


    def forward(self, x):
        # Pass through fully connected layers with ReLU activation
        x = torch.relu(self.fc0(x))
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.relu(self.fc4(x))

        return x

## Load all possible tictactoe games

In [31]:
class JsonDataset(IterableDataset):
    def __init__(self, file_good, file_bad):
        self.file_good = file_good
        self.file_bad = file_bad
        self.data = self.load_data()



    def parse_json_object(self, line):
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            return None

    def encode_board(self, board):
        encoding = []
        for cell in board:
            if cell == 'X':
                encoding.extend([0])
            elif cell == 'O':
                encoding.extend([1])
            else:
                encoding.extend([2])

        return encoding

    def flatten_list(self, matrix):
        flat_list = []
        for row in matrix:
            flat_list.extend(row)
        return flat_list



    def encode_outcome(self, outcome):
        if outcome == 'X':
            return 0
        elif outcome == 'O':
            return 1
        else:
            return 2

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        padded_history, sample_outcome = self.data[idx]
        return torch.tensor(padded_history, dtype=torch.float), torch.tensor(sample_outcome, dtype=torch.float)

    def load_data_type(self, file, valid):
        data = []
        with open(file, 'r') as f:
            # Skip the first line (which is "[")
            next(f)
            for line in f:
                # Remove the trailing comma for all lines except the last one (which is "]")
                if line.endswith(",\n"):
                    line = line[:-2]
                sample = self.parse_json_object(line)
                if sample is not None:
                    max_length = 10  # Maximum length of a Tic Tac Toe game
                    history = sample['history']

                    if len(history) == max_length:
                        padded_history = history
                    else:
                        padded_history = history + [[None] * 9 for _ in range(max_length - len(history))]

                    padded_history = [self.encode_board(x) for x in padded_history]
                    padded_history = self.flatten_list(padded_history)
                    sample_outcome = self.encode_outcome(sample['outcome'])

                    padded_history.extend([sample_outcome])

                    data.append((padded_history, valid))
        return data


    def load_data(self):
        data = []
        data.extend(self.load_data_type(self.file_good, [1, 0]))
        data.extend(self.load_data_type(self.file_bad, [0, 1]))
        print("good", data[0])
        print("bad", data[400000])
        return data

dataset = JsonDataset('tic_tac_toe_games_good.json', 'tic_tac_toe_games_bad.json')  # Add other files as necessary

total_size = len(dataset)
print(total_size)
train_size = int(0.8 * total_size)
test_size = total_size - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])


train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

good ([2, 2, 2, 2, 2, 2, 2, 2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 0, 1, 2, 2, 2, 2, 2, 2, 2, 0, 1, 0, 2, 2, 2, 2, 2, 2, 0, 1, 0, 1, 2, 2, 2, 2, 2, 0, 1, 0, 1, 0, 2, 2, 2, 2, 0, 1, 0, 1, 0, 1, 2, 2, 2, 0, 1, 0, 1, 0, 1, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0], [1, 0])
bad ([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 2, 1, 0, 0, 1, 0, 0, 0, 0, 2, 1, 0, 0, 1, 0, 2, 0, 0, 2, 1, 1, 0, 1, 0, 2, 0, 2, 2, 1, 1, 0, 1, 0, 2, 0, 2, 2, 1, 1, 0, 1, 0, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0], [0, 1])
510336


## Train

In [32]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model = TicTacToeNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.005)

MAX_EPOCH = 5

for epoch in range(MAX_EPOCH):
    model.train()
    for data, valid in train_loader:
        data = data.to(device)  # Move history to CUDA
        valid = valid.to(device)  # Move outcome to CUDA
        optimizer.zero_grad()
        outputs = model(data)

        loss = criterion(outputs, valid)
        loss.backward()
        optimizer.step()

    # Validation phase
    model.eval()
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for data, valid in test_loader:

            data = data.to(device)  # Move history to CUDA
            valid = valid.to(device)  # Move outcome to CUDA
            outputs = model(data)

            # Get the predicted class (the index of the maximum value in the output tensor)
            _, predicted = torch.max(outputs.data, 1)
            _, valid = torch.max(valid, 1)


            total_predictions += valid.size(0)
            correct_predictions += (predicted == valid).sum().item()

    accuracy = 100 * correct_predictions / total_predictions
    print(f"Epoch {epoch + 1}/{MAX_EPOCH} - Accuracy: {accuracy:.2f}%")

cpu


RuntimeError: mat1 and mat2 must have the same dtype

## Export Onnx and Data out for the Hub

In [19]:
# Move the model to the CPU
model = model.cpu()

# Set the model to evaluation mode
model.eval()

# Obtain a sample datapoint from the DataLoader
sample_data_iter = iter(train_loader)
sample_history, _ = next(sample_data_iter)

# No need to move the sample to the device since everything is on the CPU now
x = sample_history
print(x)

# Export the model using ONNX
torch.onnx.export(
    model,                        # model being run
    x,                            # model input (or a tuple for multiple inputs)
    "tictactoe_network.onnx",     # where to save the model (can be a file or file-like object)
    export_params=True,           # store the trained parameter weights inside the model file
    opset_version=10,             # the ONNX version to export the model to
    do_constant_folding=True,     # whether to execute constant folding for optimization
    input_names=['input'],        # the model's input names
    output_names=['output'],      # the model's output names
    dynamic_axes={
        'input': {0: 'batch_size'},     # variable length axes
        'output': {0: 'batch_size'}
    }
)

data_array = ((x[-1]).detach().numpy()).reshape([-1]).tolist()

data = dict(input_data = [data_array])

print(data)

    # Serialize data into file:
json.dump(data, open("data.json", 'w'))


# use the test set to calibrate the circuit
cal_data = dict(input_data = x.flatten().tolist())

# Serialize calibration data into file:
json.dump(data, open("cal_data.json", 'w'))

tensor([[2., 2., 2.,  ..., 0., 1., 0.],
        [2., 2., 2.,  ..., 2., 2., 1.],
        [2., 2., 2.,  ..., 1., 1., 2.],
        ...,
        [2., 2., 2.,  ..., 2., 2., 0.],
        [0., 0., 0.,  ..., 2., 2., 0.],
        [0., 0., 0.,  ..., 2., 2., 1.]])
verbose: False, log level: Level.ERROR

{'input_data': [[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 2.0, 1.0, 1.0, 0.0, 1.0, 0.0, 2.0, 0.0, 2.0, 2.0, 1.0, 1.0, 0.0, 1.0, 0.0, 2.0, 0.0, 2.0, 2.0, 1.0, 1.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0]]}


In [20]:
!pip install ezkl==2.5.0


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [21]:
import os
import ezkl

data_path = os.path.join("data.json")
calibration_path = os.path.join("cal_data.json")
model_path = os.path.join('tictactoe_network.onnx')
compiled_model_path = os.path.join('network.compiled')
pk_path = os.path.join('test.pk')
vk_path = os.path.join('test.vk')
settings_path = os.path.join('settings.json')
srs_path = os.path.join('kzg.srs')
witness_path = os.path.join('witness.json')

In [22]:
res = ezkl.gen_settings(model_path, settings_path)


In [23]:
res = await ezkl.calibrate_settings(data_path, model_path, settings_path, "resources")


In [24]:
res = ezkl.compile_circuit(model_path, compiled_model_path, settings_path)
assert res == True

In [25]:
res = ezkl.get_srs(srs_path, settings_path)


In [26]:
res = ezkl.gen_witness(data_path, compiled_model_path, witness_path)
assert os.path.isfile(witness_path)

In [27]:
res = ezkl.mock(witness_path, compiled_model_path)
assert res == True

In [28]:
res = ezkl.setup(
        compiled_model_path,
        vk_path,
        pk_path,
        srs_path,
    )

assert res == True
assert os.path.isfile(vk_path)
assert os.path.isfile(pk_path)
assert os.path.isfile(settings_path)

In [29]:
proof_path = os.path.join('test.pf')

res = ezkl.prove(
        witness_path,
        compiled_model_path,
        pk_path,
        proof_path,
        srs_path,
        "single",
    )

print(res)
assert os.path.isfile(proof_path)

{'instances': [[[5432626032756654017, 1961834588764195904, 11835134460333605139, 1680038075927467451], [17322468249157237758, 16138382668123003341, 5640928463521831588, 1008128559805110475]]], 'proof': '28f998d05eb5075afba83b5aaeb6ba142929a8ce2b962a7b94b083f4b58d403e0ff66648ed7ef9313bd588190acb8e49b163643385e6d30e5dd4dceb6f989e632185e1a26678288cf282885c6d810819b1003b2aa8788fac9496688d04da33541bbb6bd013e108829d67cfe895f71e93a1c4005fdb71c4bce5da78046f28267c1e976291d7d9f4dc52eb6b18f1ed6fe8a39606971e91b4368d5a5966f13ce5940d9608c7b45b15ab9284d3d84827df57c074a66b811feab8fa8241fae6b6c47c00a2bc6dd433c4df1f543e14d5193c5b00e2e9d780b3c1672e5208028b7079ef0891c5a4cbbec3f0f956dece62879362288619d06b0f672cf3df5eaa31b6c73625b65ef716e33e651fb1c5b065bf1d78910f5ab0dbff327218391f4869a5914d1c2cc2cc099f91edb4f04907802e4bff55e81f218339406b17d40968f33b2ed611294038f4ba448e1932c2b1fec2737e30d3cdf06b46a3db36943b1703aee70b0fa35ce4f1864405f5dbbd70cc9b29b6a7984cecfc73b326810286f71b758b202b4df89bb69dc2d88d6254d6ec2fe

In [30]:
# VERIFY IT
res = ezkl.verify(
        proof_path,
        settings_path,
        vk_path,
        srs_path,
    )

assert res == True
print("verified")

verified
