In [1]:
import random
import itertools
import numpy as np
from operator import eq, neg
from sortedcontainers import SortedSet
import time

In [2]:
class NaryCSP:
    """
    A nary-CSP consists of:
    domains     : a dictionary that maps each variable to its domain
    constraints : a list of constraints
    variables   : a set of variables
    var_to_const: a variable to set of constraints dictionary
    """

    def __init__(self, domains, constraints):
        """Domains is a variable:domain dictionary
        constraints is a list of constraints
        """
        self.variables = set(domains)
        self.domains = domains
        self.constraints = constraints
        self.curr_domains = None
        self.nassigns = 0
        self.ntries = 0
        self.nprunings = 0
        self.var_to_const = {var: set() for var in self.variables}
        for con in constraints:
            for var in con.scope:
                self.var_to_const[var].add(con)

    def __str__(self):
        """String representation of CSP"""
        return str(self.domains)

    def display(self, assignment=None):
        """More detailed string representation of CSP"""
        if assignment is None:
            assignment = {}
        print(assignment)
        
    def choices(self, var):
        """Return all values for var that aren't currently ruled out."""
        return (self.curr_domains or self.domains)[var]
    
    def nconflicts(self, var, val, assignment):
        """Return the number of conflicts var=val has with other variables."""
        count = 0 
        assignment[var] = val
        for con in self.var_to_const[var]:
            if (set(con.scope)).issubset(set(assignment)) and not con.holds(assignment):
                count = count + 1
        del assignment[var]
        return count
    
    def find_neighbors(self, var):
        """Return the list of neighbors of a given variable."""
        neighbors = []
        for con in self.var_to_const[var]:
            neighbors.extend(list(con.scope))
        neighbbors = list(set(neighbors))
        neighbbors.remove(var)
    
        return neighbbors

    
    def assign(self, var, val, assignment):
        """Add {var: val} to assignment; Discard the old value if any."""
        assignment[var] = val
        self.nassigns += 1
        
    def suppose(self, var, value):
        """Start accumulating inferences from assuming var=value."""
        self.support_pruning()
        removals = [(var, a) for a in self.curr_domains[var] if a != value]
        self.curr_domains[var] = [value]
        return removals
    
    def restore(self, removals):
        """Undo a supposition and all inferences from it."""
        for B, b in removals:
            self.curr_domains[B].append(b)

    def consistent(self, assignment):
        """assignment is a variable:value dictionary
        returns True if all of the constraints that can be evaluated
                        evaluate to True given assignment.
        """
        return all(con.holds(assignment)
                   for con in self.constraints
                   if all(v in assignment for v in con.scope))
    
    def prune(self, var, value, removals):
        """Rule out var=value."""
        self.curr_domains[var].remove(value)
        if removals is not None:
            removals.append((var, value))
    
    def support_pruning(self):
        """Make sure we can prune values from domains. (We want to pay
        for this only if we use it.)"""
        if self.curr_domains is None:
            self.curr_domains = {v: list(self.domains[v]) for v in self.variables}
            
    def unassign(self, var, assignment):
        """Remove {var: val} from assignment.
        DO NOT call this if you are changing a variable to a new value;
        just call assign for that."""
        if var in assignment:
            del assignment[var]
            
    def goal_test(self, state):
        """The goal is to assign all variables, with all constraints satisfied."""
        assignment = dict(state)
        return (len(assignment) == len(self.variables)
                and all(self.nconflicts(variables, assignment[variables], assignment) == 0
                        for variables in self.variables))
    
    # This is for min_conflicts search
    def conflicted_vars(self, current):
        """Return a list of variables in current assignment that are in conflict"""
        return [var for var in self.variables
                if self.nconflicts(var, current[var], current) > 0]

In [3]:
class Constraint:
    """
    A Constraint consists of:
    scope    : a tuple of variables
    condition: a function that can applied to a tuple of values
    for the variables.
    """

    def __init__(self, scope, condition):
        self.scope = scope
        self.condition = condition

    def __repr__(self):
        return self.condition.__name__ + str(self.scope)
    
    def name(self):
        return self.condition.__name__

    def holds(self, assignment):
        """Returns the value of Constraint con evaluated in assignment.

        precondition: all variables are assigned in assignment
        """
        #if not (set(self.scope)).issubset(set(assignment.keys())):
        #    return False
        #else:
        return self.condition(*tuple(assignment[v] for v in self.scope))

In [4]:
def find_all_diff_constraints(var_list):
    cons = []
    for index, var1 in enumerate(var_list):
        for var2 in var_list[index+1:]:
            cons.append(Constraint((var1, var2), lambda a, b: a != b))
    return cons

def first_unassigned_variable(assignment, csp):
    """The default variable order."""
    return first([var for var in csp.variables if var not in assignment])

def unordered_domain_values(var, assignment, csp):
    """The default value order."""
    return csp.choices(var)

def no_inference(csp, var, value, assignment, removals):
    return True

def first(iterable, default=None):
    """Return the first element of an iterable; or default."""
    return next(iter(iterable), default)

def count(seq):
    """Count the number of items in sequence that are interpreted as true."""
    return sum(map(bool, seq))

identity = lambda x: x

def shuffled(iterable):
    """Randomly shuffle a copy of iterable."""
    items = list(iterable)
    random.shuffle(items)
    return items

def argmin_random_tie(seq, key=identity):
    """Return a minimum element of seq; break ties at random."""
    return min(shuffled(seq), key=key)

def mrv(assignment, csp):
    """Minimum-remaining-values heuristic."""
    return argmin_random_tie([v for v in csp.variables if v not in assignment],
                             key=lambda var: num_legal_values(csp, var, assignment))


def num_legal_values(csp, var, assignment):
    if csp.curr_domains:
        return len(csp.curr_domains[var])
    else:
        return count(csp.nconflicts(var, val, assignment) == 0 for val in csp.domains[var])

    
def lcv(var, assignment, csp):
    """Least-constraining-values heuristic."""
    return sorted(csp.choices(var), key=lambda val: csp.nconflicts(var, val, assignment))


def forward_checking(csp, var, value, assignment, removals):
    """Prune neighbor values inconsistent with var=value."""
    
    csp.support_pruning()
    for con in csp.var_to_const[var]:
        neighbors = list(con.scope)
        neighbors.remove(var)
        unassigned_neighbors = []
        for neighbor in neighbors:
            if neighbor not in assignment.keys():
                unassigned_neighbors.append(neighbor)
        
        if len(unassigned_neighbors) == 1:
            csp.ntries = csp.ntries + 1
            unassigned_neighbor = unassigned_neighbors[0]
            for domain_val in csp.curr_domains[unassigned_neighbor]:
                assignment[unassigned_neighbor] = domain_val
                if not con.holds(assignment):
                    csp.prune(unassigned_neighbor, domain_val, removals)
                del assignment[unassigned_neighbor]
            
            if len(csp.curr_domains[unassigned_neighbor]) == 0:
                csp.nprunnings = csp.nprunnings + 1
                return False
        
        return True


def dom_j_up(csp, queue):
    return SortedSet(queue, key=lambda t: neg(len(csp.curr_domains[t[1]])))


def AC3(csp, assignment, queue=None, removals=None, arc_heuristic=dom_j_up):
    """[Figure 6.3]"""
    if queue is None:
        queue = {(Xi, Xk) for Xi in csp.variables for Xk in csp.find_neighbors(Xi)}
    csp.support_pruning()
    queue = arc_heuristic(csp, queue)
    checks = 0
    while queue:
        (Xi, Xj) = queue.pop()
        revised, checks = revise(csp, Xi, Xj, assignment, removals, checks)
        if revised:
            if not csp.curr_domains[Xi]:
                return False, checks  # CSP is inconsistent
            for Xk in csp.find_neighbors(Xi):
                if Xk != Xj:
                    queue.add((Xk, Xi))
    return True, checks  # CSP is satisfiable


def revise(csp, Xi, Xj, assignment, removals, checks=0):
    """Return true if we remove a value."""
    revised = False
    for con in csp.var_to_const[Xj]:
        neighbors = list(con.scope)
        if Xi in neighbors:
            if Xj in neighbors:
                neighbors.remove(Xj)
                neighbors.remove(Xi)
                unassigned_neighbors = []
                for neighbor in neighbors:
                    if neighbor not in assignment.keys():
                        unassigned_neighbors.append(neighbor)
        
                if len(unassigned_neighbors) == 0:
                    csp.ntries = csp.ntries + 1
                    for x in csp.curr_domains[Xi]:
                        conflict = True
                        assignment[Xi] = x
                        for y in csp.curr_domains[Xj]:
                            assignment[Xj] = y
                            if con.holds(assignment):
                                conflict = False
                            checks = checks + 1
                            if not conflict:
                                break
                        if conflict:
                            csp.prune(Xi, x, removals)
                            csp.nprunings = csp.nprunings + 1
                            revised = True
        return revised, checks


def mac(csp, var, value, assignment, removals, constraint_propagation=AC3):
    """Maintain arc consistency."""
    return constraint_propagation(csp, assignment, {(X, var) for X in csp.find_neighbors(var)}, removals)

In [5]:
def backtracking_search(csp, select_unassigned_variable=first_unassigned_variable,
                        order_domain_values=unordered_domain_values, inference=no_inference):

    def backtrack(assignment):
        if len(assignment) == len(csp.variables):
            return assignment
        
        var = select_unassigned_variable(assignment, csp)
        for value in order_domain_values(var, assignment, csp):
            if 0 == csp.nconflicts(var, value, assignment):
                csp.assign(var, value, assignment)
                removals = csp.suppose(var, value)
                if inference(csp, var, value, assignment, removals):
                    result = backtrack(assignment)
                    if result is not None:
                        return result
                csp.restore(removals)
        csp.unassign(var, assignment)
        return None

    result = backtrack({})
    assert result is None or csp.goal_test(result)
    return result

In [6]:
# C R O S S + R O A D S = D A N G E R
# 9 6 2 3 3 + 6 2 5 1 3 = 1 5 8 7 4 6

domain_3 = {'C': set(range(1, 10)), 'R': set(range(1, 10)), 'D': set(range(1, 10)),
            'O': set(range(0, 10)), 'S': set(range(0, 10)), 'A': set(range(0, 10)),
            'N': set(range(0, 10)), 'G': set(range(0, 10)), 'E': set(range(0, 10)),
            'C1': set(range(0, 2)), 'C2': set(range(0, 2)), 'C3': set(range(0, 2)),
            'C4': set(range(0, 2)), 'C5': set(range(0, 2))}

constraints_3 = [Constraint(('S', 'R', 'C1'), lambda s, r, c1: s + s == r + 10 * c1),
                 Constraint(('S', 'D', 'E', 'C1', 'C2'), lambda s, d, e, c1, c2: c1 + s + d == e + 10 * c2),
                 Constraint(('O', 'A', 'G', 'C2', 'C3'), lambda o, a, g, c2, c3: c2 + o + a == g + 10 * c3),
                 Constraint(('R', 'O', 'N', 'C3', 'C4'), lambda r, o, n, c3, c4: c3 + r + o == n + 10 * c4),
                 Constraint(('C', 'R', 'A', 'C4', 'C5'), lambda c, r, a, c4, c5: c4 + c + r == a + 10 * c5),
                 Constraint(('D', 'C5'), lambda d, c5: d == c5)]

constraints_3.extend(find_all_diff_constraints(['C', 'R', 'O', 'S', 'A', 'D', 'N', 'G', 'E']))

cross_roads_danger = NaryCSP(domain_3, constraints_3)

### Basic Backtracking

In [8]:
cross_roads_danger = NaryCSP(domain_3, constraints_3)
start = time.time()
for i in range(0, 10):
    result = backtracking_search(cross_roads_danger)
end = time.time()
print("Time taken for 10 runs: ", end - start)
print("Average time taken: ", (end - start)/10)

Time taken for 10 runs:  105.68341898918152
Average time taken:  10.568341898918153


### Backtracking with MRV heuristic

In [9]:
cross_roads_danger = NaryCSP(domain_3, constraints_3)
start = time.time()
for i in range(0, 10):
    result = backtracking_search(cross_roads_danger, select_unassigned_variable = mrv)
end = time.time()
print("Time taken for 10 runs: ", end - start)
print("Average time taken: ", (end - start)/10)

Time taken for 10 runs:  2.780909776687622
Average time taken:  0.27809097766876223


### Backtracking with MRV and LCV heuristics

In [10]:
cross_roads_danger = NaryCSP(domain_3, constraints_3)
start = time.time()
for i in range(0, 10):
    result = backtracking_search(cross_roads_danger, select_unassigned_variable = mrv, order_domain_values = lcv)
end = time.time()
print("Time taken for 10 runs: ", end - start)
print("Average time taken: ", (end - start)/10)

Time taken for 10 runs:  10.427436113357544
Average time taken:  1.0427436113357544


### Backtracking with MRV, LCV heuristics and FC, MAC filtering

In [11]:
cross_roads_danger = NaryCSP(domain_3, constraints_3)
start = time.time()
for i in range(0, 10):
    result = backtracking_search(cross_roads_danger, 
                                 select_unassigned_variable = mrv, 
                                 order_domain_values = lcv, 
                                 inference = forward_checking)
end = time.time()
print("Time taken for 10 runs: ", end - start)
print("Average time taken: ", (end - start)/10)


start = time.time()
for i in range(0, 10):
    result = backtracking_search(cross_roads_danger, 
                                 select_unassigned_variable = mrv, 
                                 order_domain_values = lcv, 
                                 inference = mac)
end = time.time()
print("\n\nTime taken for 10 runs: ", end - start)
print("Average time taken: ", (end - start)/10)

Time taken for 10 runs:  2.414524793624878
Average time taken:  0.2414524793624878


Time taken for 10 runs:  0.011522054672241211
Average time taken:  0.001152205467224121
