### The following section includes sourced code to generate a graph coloring problem.

In [1]:
################################################################
##
## GRAPH COLORING PROBLEM GENERATOR
##
## Generates a planar graph.
##
################################################################

import random

class Point:
    ID_COUNT = 0
    def __init__(self, x, y):
        self.id = Point.ID_COUNT
        Point.ID_COUNT += 1
        self.x = x
        self.y = y

    def transform(self, xt, yt):
        return (xt(self.x), yt(self.y))

    def dist(self, them):
        return abs(self.x - them.x) + abs(self.y - them.y)

    def __repr__(self):
        return "p({:.4f}, {:.4f})".format(self.x, self.y)

## From http://bryceboe.com/2006/10/23/line-segment-intersection-algorithm/
def _ccw(A,B,C):
    return (C.y-A.y)*(B.x-A.x) > (B.y-A.y)*(C.x-A.x)

class Line:
    def __init__(self, p1, p2):
        self.p1 = p1
        self.p2 = p2

    @property
    def endpoints(self):
        return [self.p1, self.p2]

    def transform(self, xt = lambda x: x, yt = lambda y: y):
        start = self.p1.transform(xt, yt)
        end = self.p2.transform(xt, yt)
        return (start, end)

    def intersects(self, them):
        A = self.p1
        B = self.p2
        C = them.p1
        D = them.p2
        return _ccw(A,C,D) != _ccw(B,C,D) and _ccw(A,B,C) != _ccw(A,B,D)

    def __repr__(self):
        return "[{} -> {}]".format(self.p1, self.p2)

def _random_point():
    return Point(random.uniform(-10, 10), random.uniform(-10, 10))

def _find_line(x, lines, pairs):
    for i, x1 in enumerate(x[:-1]):
        shortest_items = sorted([(x2, x1.dist(x2)) for x2 in x[i+1:]], key=lambda item: item[1])
        for x2, _ in shortest_items:
            l1 = Line(x1, x2)
            if not (x1, x2) in pairs and not _line_intersects(l1, lines):
                return l1
    return None

def _line_intersects(l1, lines):
    for l2 in lines:
        if l1.p1 in l2.endpoints or l1.p2 in l2.endpoints:
            continue
        if l1.intersects(l2): return True
    return False

def gen(num_points=100):
    x = [_random_point() for _ in range(num_points)]
    lines = set([])
    pairs = set([])
    while True:
        random.shuffle(x)
        line = _find_line(x, lines, pairs)
        if line:
            pairs.add((line.p1, line.p2))
            lines.add(line)
        else:
            break
    return (x, lines)

In [2]:
import json

num_points = 100
output_file = "gcp.json"

print("Generating a planar graph with {} points...".format(num_points))
(x, lines) = gen(num_points=num_points)

print("Writing to '{}'...".format(output_file))
with open(output_file, 'w') as f:
    f.write(json.dumps({
        'num_points': len(x),
        'points': { p.id: (p.x, p.y) for p in x },
        'edges': [ (line.p1.id, line.p2.id) for line in lines ]
    }, indent=2))
print("""Done! You can now import the results into a Python script with the code:

import json

with open('{}', \'r\') as f:
    data = json.load(f)
""".format(output_file))

Generating a planar graph with 100 points...
Writing to 'gcp.json'...
Done! You can now import the results into a Python script with the code:

import json

with open('gcp.json', 'r') as f:
    data = json.load(f)



In [3]:
import json

with open('gcp.json', 'r') as f:
    data = json.load(f)
    print(data)

{'num_points': 100, 'points': {'42': [0.4439153266580913, -4.909228964218022], '43': [-1.0821217382353936, 6.228414118353456], '65': [-8.011786781415926, -2.5289444604522675], '67': [-3.1289108735715843, -8.793639652801682], '40': [5.608461197710708, -8.827946362531005], '46': [2.516669968340029, -6.6993429263084785], '91': [-9.085158876344753, -6.140115810632625], '29': [5.246985433262603, 5.9309197780742124], '39': [-7.755529831757064, -4.894619129576705], '41': [5.54068801609629, -0.7501424905406466], '81': [-4.280043868163432, -5.577701547129175], '59': [3.3656270077699517, -3.087724240339842], '66': [-2.0873563453636095, 3.47732653854748], '99': [4.916383976133957, 5.590426380707594], '33': [9.984730144598764, -9.690977122285416], '70': [-4.509234126773725, -6.044344468934629], '10': [-7.9686216270270265, 3.373308465680962], '89': [0.07506223702342396, -0.14545237949844392], '57': [-6.356060611857752, -3.901525600731281], '23': [2.7744004098394655, 9.027742197922453], '34': [-8.58

### The following section includes sourced code to generate a Sudoku problem.

In [4]:
################################################################
## CLI Wrapper for a Sudoku generator
##  by Chad Crawford, using code from Gareth Rees
################################################################

import random
from functools import *

## From Gareth Rees at https://codereview.stackexchange.com/a/88866
def make_board(m: int=3) -> list[list[int]]:
    """Return a random filled m**2 x m**2 Sudoku board."""
    n = m**2
    board = [[None for _ in range(n)] for _ in range(n)]

    def search(c: int=0) -> list[list[int]]:
        "Recursively search for a solution starting at position c."
        i, j = divmod(c, n)
        i0, j0 = i - i % m, j - j % m # Origin of mxm block
        numbers = list(range(1, n + 1))
        random.shuffle(numbers)
        for x in numbers:
            if (x not in board[i]                     # row
                and all(row[j] != x for row in board) # column
                and all(x not in row[j0:j0+m]         # block
                        for row in board[i0:i])):
                board[i][j] = x
                if c + 1 >= n**2 or search(c + 1):
                    return board
        else:
            # No number is valid in this cell: backtrack and try again.
            board[i][j] = None
            return None

    return search()

In [5]:
import json

def write_board(cell_length: int, num_missing: int, output_file: str):
    board = make_board(cell_length)

    ms = int(pow(cell_length, 2))
    indices = random.sample(list(range(ms * ms)), num_missing)
    for index in indices:
        r = int(index / ms)
        c = int(index % ms)
        board[r][c] = 0

    ## Write to file
    with open(output_file, 'w') as f:
        f.write(json.dumps(board))

### The following section includes my own implementation of solving a constraint satisfaction problem.

In [25]:
# Before we get into solving a constraint satisfaction problem, we need to define one.
from enum import Enum
from queue import LifoQueue, PriorityQueue, Queue
from typing import Self
import copy

class Consistency(Enum):
    BACKTRACK = 1
    FORWARD_CHECK = 2
    AC3 = 3

class CSPHeuristic(Enum):
    RANDOM = 1 # Random variable is picked next
    MIN_REMAINING = 2 # Variable with the fewest remaining values to be assigned to it is picked next
    MIN_REMAINING_AND_DEGREE = 3 # Variable with the most other variables connected to it is picked next, with tie-breakers going to the fewest remaining available variables to assign

class CSPGraph:
    """
    The graph class will be responsible for remembering connections between nodes and maintaining the variables available for each node.
    In addition, when a node is assigned a value, this class is responsible for maintaining the variable options remaining for each node.
    This class is also going to be responsible for maintaining the specified k-consistency (backtrack for k=0, forward_check for k=1, ac3 for k=2) as each node is assigned a value
    """
    # The following function is mainly book-keeping for its format - the functions used to validate two sets of options between two adjacent nodes have this format
    @staticmethod
    def __consistency_checker_format(first_options: set[int], second_options: set[int]) -> set[int]:
        """
        Return all variables removed from the second set of options to preserve consistency
        """
        pass

    class __Variable:
        """
        A class representation for a variable, which will include an id, a degree, and the number of variables connected
        """
        def __init__(self, id: int, degree: int, num_var_options: int, heuristic: CSPHeuristic):
            self.__id = id
            self.__degree = degree
            self.__num_var_options = num_var_options
            self.__heuristic = heuristic

        def get_id(self) -> int:
            return self.__id
        
        def __copy__(self) -> Self:
            new_var = type(self)(id=self.__id, degree=self.__degree, num_var_options=self.__num_var_options, heuristic=self.__heuristic)
            return new_var

        def get_copy(self) -> Self:
            return Self(id=self.__id, degree=self.__degree, num_var_options=self.__num_var_options, heuristic=self.__heuristic)
        
        def __gt__(self, other: Self) -> bool:
            if self.__heuristic == CSPHeuristic.RANDOM:
                return True
            elif self.__heuristic == CSPHeuristic.MIN_REMAINING:
                return self.__num_var_options > other.__num_var_options
            else:
                if self.__degree == other.__degree:
                    return self.__num_var_options > other.__num_var_options
                else:
                    return self.__degree < other.__degree

        def __eq__(self, other: Self) -> bool:
            if self.__heuristic == CSPHeuristic.RANDOM:
                return True
            elif self.__heuristic == CSPHeuristic.MIN_REMAINING:
                return self.__num_var_options == other.__num_var_options
            else:
                return self.__degree == other.__degree and self.__num_var_options == other.__num_var_options
        
        def __lt__(self, other: Self) -> bool:
            if self.__heuristic == CSPHeuristic.RANDOM:
                return True
            elif self.__heuristic == CSPHeuristic.MIN_REMAINING:
                return self.__num_var_options < other.__num_var_options
            else:
                if self.__degree == other.__degree:
                    return self.__num_var_options < other.__num_var_options
                else:
                    return self.__degree > other.__degree
                
        def __hash__(self):
            """
            Hash Code
            """
            return hash(self.__id)

    def __init__(self, cell_size: int, consistency: Consistency, consistency_checker: type[__consistency_checker_format], heuristic: CSPHeuristic):
        """
        When initializing a Graph object, all we need to know is the number of possible values that can be assigned to each of this graph's nodes.
        """
        self.__default_options = range(1, cell_size+1)
        self.__consistency = consistency
        self.__heuristic = heuristic
        self.__connections = {}
        self.__available_values = {}
        self.__assigned_values = {}
        self.__consistency_checker = consistency_checker
    
    def add_node(self, id: int):
        """
        Helper method to add a node to our underlying graph if it does not already exist
        """
        if id in self.__connections.keys():
            return
        self.__connections[id] = set()
        self.__available_values[id] = set()
        for value in self.__default_options:
            self.__available_values[id].add(value)
        self.__assigned_values[id] = None

    def connect(self, first: int, second: int):
        """
        Connect those two nodes to each other in the underlying graph
        """
        self.__connections[first].add(second)
        self.__connections[second].add(first)

    def remove_option(self, id: int, option: int):
        """
        Method to remove an option from the input node's set of options if it is not already removed
        """
        if option in self.__available_values[id]:
            self.__available_values[id].remove(option)

    def solve_graph(self):
        """
        Start assigning variable different values until the CSP is solved
        """
        # Set up some initial variables
        variable_queue = PriorityQueue()
        for id in self.__connections.keys():
            variable_queue.put(CSPGraph.__Variable(id=id, degree=len(self.__connections[id]), num_var_options=len(self.__available_values[id]), heuristic=self.__heuristic))
        assigned = set()
        assigned_stack = LifoQueue(maxsize=len(self.__connections))

        # For each state of the stack, remember the available value assignments for all variables at that time
        available_options_states = {}
        # Also remember what the queue of variables looked like at that time
        variable_queue_states = {}
        # Star when the stack is empty
        stack_size = 0
        available_options_states[stack_size] = self.__copy_all_available()
        variable_queue_states[stack_size] = self.__copy_variable_queue(variable_queue)

        while not variable_queue.empty():
            next = variable_queue.get()
            next_id = next.get_id()
            if next_id in assigned:
                # Repeats in the queue are okay
                continue
            assignment_result = self.__assign(id=next_id)
            successful, assignment, changed_vars = assignment_result[0], assignment_result[1], assignment_result[2]
            if successful:
                assigned.add(next_id)
                self.__assigned_values[next_id] = assignment
                stack_size += 1
                assigned_stack.put(next)
                available_options_states[stack_size] = self.__copy_all_available()
                # All the variables whose options were updated need to be reinserted into the queue - according to our heuristics their priority could ONLY have increased.
                for changed_var in changed_vars:
                    variable_queue.put(CSPGraph.__Variable(id=changed_var, degree=len(self.__connections[changed_var]), num_var_options=len(self.__available_values[changed_var]), heuristic=self.__heuristic))
                variable_queue_states[stack_size] = self.__copy_variable_queue(variable_queue)
            else:
                prev_assigned = assigned_stack.get()
                prev_assigned_id = prev_assigned.get_id()
                stack_size -= 1
                prev_assignment = self.__assigned_values[prev_assigned_id]
                self.__assigned_values[prev_assigned_id] = None
                assigned.remove(prev_assigned_id)
                # Restore the queue and variable assignments to what they were before - restoring the queue is necessary because many variables affected by this most recent assignment - which failed - could have had their options limited and priorities increased - we need to undo that
                self.__available_values = available_options_states[stack_size]
                variable_queue = variable_queue_states[stack_size]
                # This previous assignment did not work for that previously assigned variable
                self.__available_values[prev_assigned_id].remove(prev_assignment)
                # Enqueue a new variable corresponding to the previously assigned id - the priority of this new variable could only increase so it's okay that we have a duplicate in the queue for the previously assigned id
                variable_queue.put(CSPGraph.__Variable(id=prev_assigned_id, degree=len(self.__connections[prev_assigned_id]), num_var_options=len(self.__available_values[prev_assigned_id]), heuristic=self.__heuristic))

    def __assign(self, id: int) -> tuple[bool, int, set[int]]:
        """
        Assign any value that works to a given node, and update all other nodes' available values according to the arc consistency of this graph.
        Return if the assignment was successful, what the assigned value was, and the set of other variables whose available options were updated.
        """
        initial_options = self.__copy_options(id=id)
        for option in initial_options:
            # Try assigning this value to the variable
            self.__assigned_values[id] = option
            # Clear out this node's options - its only possibility now is what we are assigning it
            self.__available_values[id] = set()
            self.__available_values[id].add(option)

            # For each id, remember the variables removed from it
            removed = {}
            if self.__update_available(id=id, removed=removed):
                # As far as we know according to our consistency, we haven't backed ourselves into an unsolvable assignment set yet...
                return (True, option, removed.keys())
            else:
                for id, removed_options in removed.items():
                    # Restore the removed options to each variable
                    for option in removed_options:
                        self.__available_values[id].add(option)

        # No assignments worked
        return (False, None, None)
    
    def __copy_options(self, id: int) -> set[int]:
        """
        Helper method to copy the options associated with a particular variable
        """
        copy = set()
        for v in self.__available_values[id]:
            copy.add(v)
        return copy
    
    def __copy_all_available(self) -> dict[int, set[int]]:
        """
        Helper method to return a copy of all variables' value options
        """
        copy = {}
        for id in self.__available_values.keys():
            copy[id] = self.__copy_options(id)
        return copy
    
    def __copy_variable_queue(self, queue: PriorityQueue[__Variable]) -> PriorityQueue[__Variable]:
        """
        Helper method to return a copy of a queue of variables at a given time
        """
        copy_queue = PriorityQueue()
        holder = PriorityQueue()
        # Copy all of the variables in the new queue
        while not queue.empty():
            next = queue.get()
            copy_queue.put(copy.copy(next))
            holder.put(next)
        # Restore the old queue
        while not holder.empty():
            queue.put(holder.get())
        return copy_queue

    def __update_available(self, id: int, removed: dict[int, set[int]]) -> bool:
        """
        Given a node that has just been assigned a value, update all neighbors' available values according to our arc consistency, and return False if any problems occur.
        """
        # In all cases, we remove options from our immediate neighbors, but some consistencies check farther...
        if self.__consistency == Consistency.BACKTRACK:
            # We're not doing any kind of forward checking and we only run into a problem once we hit a node with no available values to assign to it
            for neighbor in self.__connections[id]:
                to_remove = self.__consistency_checker(self.__available_values[id], self.__available_values[neighbor])
                for var in to_remove:
                    self.__available_values[neighbor].remove(var)
                    if neighbor not in removed.keys():
                        removed[neighbor] = set()
                    removed[neighbor].add(var)
            return True
        elif self.__consistency == Consistency.FORWARD_CHECK:
            # Forward checking
            for neighbor in self.__connections[id]:
                to_remove = self.__consistency_checker(self.__available_values[id], self.__available_values[neighbor])
                for var in to_remove:
                    self.__available_values[neighbor].remove(var)
                    if neighbor not in removed.keys():
                        removed[neighbor] = set()
                    removed[neighbor].add(var)
                if len(self.__available_values[neighbor]) == 0:
                    # The neighbor RAN OUT of options to be assigned to it - the most previous variable assignment for the node of the input id did not work
                    return False
            return True
        else:
            # AC3 consistency
            edge_queue = Queue()
            for neighbor in self.__connections[id]:
                edge_queue.put((id, neighbor))
            while not edge_queue.empty():
                next_edge = edge_queue.get()
                current, neighbor = next_edge[0], next_edge[1]
                to_remove = self.__consistency_checker(self.__available_values[current], self.__available_values[neighbor])
                for var in to_remove:
                    self.__available_values[neighbor].remove(var)
                    if neighbor not in removed.keys():
                        removed[neighbor] = set()
                    removed[neighbor].add(var)
                if len(self.__available_values[neighbor]) == 0:
                    # The neighbor RAN OUT of options to be assigned to it - the most previous variable assignment for the node of the input id did not work
                    return False
                elif len(to_remove) > 0:
                    # Continue the algorithm
                    for other_neighbor in self.__connections[neighbor]:
                        if other_neighbor != current:
                            edge_queue.put((neighbor, other_neighbor))
            # The queue of edges emptied with no problems
            return True

    def get_assignment(self, id: int) -> int:
        """
        Retrieve the assignment for the variable with the given id
        """
        return int(self.__assigned_values[id]) if self.__assigned_values[id] != None else self.__assigned_values[id]

In [26]:
import json
from math import sqrt
import numpy as np

class SudokuSolver:
    """
    Solver for Sudoku Problem
    """
    
    def __init__(self, puzzle_file: str, consistency: Consistency, heuristic: CSPHeuristic):

        with open(puzzle_file, 'r') as f:
            rows = json.load(f)
            array = np.array(rows, dtype=int)
            # Turn the 2D sudoku board into a 1D array
            self.__underlying_values = array.flatten()
            # Store a value that equals the following three equivalent things: number of rows, number of columns, and number of cells
            self.__length = len(rows)
            # Store the indices that belong in each row
            self.__rows_by_indices = [[self.__length*i + j for j in range(self.__length)] for i in range(self.__length)]
            # Store the indices that belong in each column
            self.__cols_by_indices = [[self.__length*i + j for i in range(self.__length)] for j in range(self.__length)]
            # Store the indices that belong in each cell
            self.__cells_by_indices = []
            cell_length = int(sqrt(self.__length))
            for j in range(self.__length):
                current_row_indices = range(j//cell_length*cell_length, j//cell_length*cell_length+cell_length)
                current_col_indices = range((j % cell_length)*cell_length,(j % cell_length)*cell_length+cell_length)
                current_cell_indices = []
                for r_idx in current_row_indices:
                    for c_idx in current_col_indices:
                        current_cell_indices.append(self.__length*r_idx + c_idx)
                self.__cells_by_indices.append(current_cell_indices)
            # Initialize the graph
            self.__graph = CSPGraph(cell_size=self.__length, consistency=consistency, consistency_checker=self.__check_available, heuristic=heuristic)

    def solve(self):
        """
        Create an underlying CSPGraph to solve the problem.
        """
        # Display all the initial values
        for row in self.__rows_by_indices:
            print([int(self.__underlying_values[i]) for i in row])
        print('\n')

        for i in range(len(self.__underlying_values)):
            # If the value at this position is zero, then as far as we know all values are an option.
            if self.__underlying_values[i] == 0:
                self.__graph.add_node(id=i)
        
        # Connect all rows together
        for row in self.__rows_by_indices:
            self.__connect_all(row)

        # Connect all columns together
        for col in self.__cols_by_indices:
            self.__connect_all(col)
        
        # Connect all cells together
        for cell in self.__cells_by_indices:
            self.__connect_all(cell)

        # Let the graph do the solving
        self.__graph.solve_graph()

        # Now display all the assigned values
        for row in self.__rows_by_indices:
            print([self.__graph.get_assignment(id=i) if self.__underlying_values[i] == 0 else int(self.__underlying_values[i]) for i in row])

    def __connect_all(self, group: list[int]):
        """
        For a given row, column, or cell, ensure that the respective portion in the underlying graph is fully connected.
        """
        taken_values = set()
        free_spots = []
        for posn in group:
            if self.__underlying_values[posn] == 0:
                self.__graph.add_node(id=posn)
                free_spots.append(posn)
            else:
                taken_values.add(self.__underlying_values[posn])
        # Connect all free spots in the underlying graph
        for i in range(len(free_spots)-1):
            for j in range(i+1, len(free_spots)):
                self.__graph.connect(free_spots[i],free_spots[j])
        # Remove all taken_values from options of free_spots
        for spot in free_spots:
            for v in taken_values:
                self.__graph.remove_option(id=spot, option=v)

    def __check_available(self, first_values: set[int], second_values: set[int]) -> set[int]:
        """
        This is how a Sudoku Solver determines if any variable options from the second set need to be pruned.
        """
        remove = set() # If the first variable has only ONE option available for it, then the second variable CANNOT have that option in its set of options.
        if len(first_values) == 1:
            first_value = list(first_values)[0]
            if first_value in second_values:
                remove.add(first_value)
        return remove

#### We can now observe the performance of the above algorithm when trying different heuristics for ordering variables, as well as different consistencies being enforced as we solve.

In [8]:
write_board(cell_length=4, num_missing=50, output_file="sudoku.json")

In [27]:
import time

for consistency in Consistency:
    for heuristic in CSPHeuristic:
        print(f"Consistency: {consistency.name}; Heuristic: {heuristic.name}:")
        start_time = time.time()
        sudoku_solver = SudokuSolver(puzzle_file="sudoku.json",consistency=consistency, heuristic=heuristic)
        sudoku_solver.solve()
        end_time = time.time()
        print(f"Time: {end_time - start_time} seconds")
        print("\n=================================================================\n")

Consistency: BACKTRACK; Heuristic: RANDOM:
[4, 10, 13, 8, 0, 5, 2, 15, 0, 7, 0, 1, 9, 14, 3, 12]
[11, 6, 5, 0, 4, 9, 8, 0, 14, 0, 12, 10, 0, 1, 13, 0]
[0, 7, 15, 0, 11, 14, 1, 16, 13, 3, 0, 4, 10, 5, 8, 0]
[16, 1, 14, 9, 3, 13, 0, 0, 8, 5, 2, 6, 4, 0, 15, 11]
[3, 9, 10, 13, 7, 4, 15, 14, 0, 8, 5, 12, 6, 16, 11, 1]
[6, 0, 7, 2, 8, 0, 16, 3, 4, 0, 1, 11, 0, 9, 5, 15]
[15, 11, 1, 4, 9, 10, 13, 5, 7, 0, 3, 16, 0, 12, 2, 0]
[0, 0, 16, 0, 0, 0, 6, 1, 9, 0, 15, 13, 3, 10, 0, 7]
[8, 15, 11, 10, 1, 0, 4, 13, 5, 16, 6, 0, 12, 3, 0, 14]
[1, 4, 12, 6, 14, 8, 11, 2, 15, 0, 10, 3, 7, 13, 0, 5]
[13, 0, 9, 16, 15, 3, 12, 10, 1, 0, 14, 7, 2, 0, 6, 0]
[14, 2, 3, 7, 16, 6, 5, 9, 12, 4, 13, 8, 11, 15, 1, 10]
[0, 13, 4, 15, 12, 0, 0, 8, 6, 1, 11, 0, 5, 2, 7, 9]
[7, 3, 0, 14, 0, 15, 9, 11, 16, 2, 8, 5, 1, 4, 12, 13]
[5, 16, 8, 1, 13, 2, 7, 6, 0, 12, 4, 9, 15, 11, 14, 0]
[0, 12, 2, 0, 5, 1, 14, 4, 3, 0, 7, 15, 8, 6, 10, 16]


[4, 10, 13, 8, 6, 5, 2, 15, 11, 7, 16, 1, 9, 14, 3, 12]
[11, 6, 5, 3, 4, 9, 8, 7, 1

KeyboardInterrupt: 