In [1]:
from typing import *

# CPS

In [2]:

TVar = TypeVar('TVar')
TVal = TypeVar('TVal')

class CpsConstraint(Generic[TVal]):
    _predicates : List[Callable[[TVal, TVal], bool]]
    
    def __init__(self, predicate: Callable[[TVal, TVal], bool] = None):
        self._predicates = []
        
        if predicate is not None:
            self._predicates.append(predicate)
    
    def append(self, predicate: Callable[[TVal, TVal], bool]) -> None:
        self._predicates.append(predicate)
        
    def is_conflicting(self, a: TVal, b: TVal) -> bool:
        """
        Check if the two values a and b would conflict with any constraint
        """
        
        for p in self._predicates:
            if not p(a, b):
                return True
        return False
    
        

class CpsConfiguration(Generic[TVar, TVal]):
    _variables : List[TVar]
    _values : List[TVal]
    _constraints: Dict[TVar, Dict[TVar, CpsConstraint[TVal]]]
    
    
    def __init__(self, variables : List[str], values : List[str]):
        self._variables = variables
        self._values = values
        self._constraints = {}

    def variables(self) -> List[TVar]:
        return self._variables.copy()
    
    def values(self) -> List[TVal]:
        return self._values.copy()
    
    def _ensure_exists(self, source : TVar, target : TVar) -> None:
        if source not in self._constraints:
            self._constraints[source] = {}
        
        s = self._constraints[source]
        
        if target not in s:
            s[target] = CpsConstraint()
            
    
    def addConstraint(self, source : TVar, target : TVar, predicate : Callable[[TVal, TVal], bool]) -> None:
        """
        Add a new, one directional constraint
        """
        
        self._ensure_exists(source, target)
        
        self._constraints[source][target].append(predicate)
    
        
    def addConstraintRev(self, source : TVar, target : TVar, predicate : Callable[[TVal, TVal], bool]) -> None:
        """
        Add a new, two directional constraint.
        For the target -> source direction the parameters for the predicate are swapped, so the predicate is still called with (source, target)
        """
        
        self._ensure_exists(source, target)
        self._constraints[source][target].append(predicate)
        
        self._ensure_exists(target, source)
        self._constraints[target][source].append(lambda a, b: predicate(b, a))
    
    
    def addUnaryConstraint(self, source : TVar, predicate : Callable[[TVal], bool]) -> None:
        """
        Add a new unary constraint. 
        This is a constraint that does not have a target (eg. var must be val)
        """
        self._ensure_exists(source, None)
        
        self._constraints[source][None].append(lambda a, b: predicate(a))    
        
        
    def allNotEqual(self, variables : List[TVar]) -> None:
        """
        Constraint all variables against each other. (ie. all must have different values)
        """        
        for source in variables:
            for target in variables:
                if source == target:
                    continue
                
                self.addConstraint(source, target, lambda a, b: a != b)

                
    def notEqual(self, source : TVar, target : TVar) -> None:
        self.addConstraintRev(source, target, lambda a, b: a != b)

        
    def equal(self, source : TVar, target : TVar) -> None:
        self.addConstraintRev(source, target, lambda a, b: a == b)


    def mustBe(self, source : TVar, value : TVal) -> None:
        self.addUnaryConstraint(source, lambda a: a == value)
        

    def get_constraints(self, variable : TVar) -> Dict[TVar, CpsConstraint[TVal] ]:
        return self._constraints[variable]




In [3]:

class CpsState(Generic[TVar, TVal]):
    
    _parent = None
    _variable : TVar = None
    _value : TVal = None
    _config : CpsConfiguration[TVar, TVal]    
    _assigned_cache = None
    _unassigned_cache = None
    
    
    def __init__(self, config : CpsConfiguration[TVar, TVal], parent = None, variable : TVar = None, value : TVal = None):
        self._config = config
        self._parent = parent
        self._variable = variable
        self._value = value
    
    
    def assign(self, variable, value) -> 'CpsState[TVar, TVal]':
        """
        Returns a new state with the given assignment. (A state is immutable)
        """
        if variable not in self._config.variables():
            raise Exception("Invalid variable assigned: " + str(variable), ", allowed: ", self._config.variables())
        
        if value not in self._config.values():
            raise Exception("Invalid value assigned: " + str(value), ", allowed: ", self._config.values())
        
        return CpsState(self._config, self, variable, value)
    
    
    def get_assignments(self) -> Dict[TVar, TVal]:
        """
        Get all assignments
        """
        
        if self._assigned_cache is not None:
            return self._assigned_cache.copy() # return a copy so children can add to it
        
        if self._parent is None:
            return {}
        
        if self._variable is None:
            return self._parent.get_assignments()
        
        t = self._variable, self._value
        
        t2 = self._parent.get_assignments()
        t2[self._variable] = self._value
        self._assigned_cache = t2
        return t2
    
    
    def get_assignment(self, variable: TVar):
        
        assignments = self.get_assignments()
        for var in assignments:
            val = assignments[var]
            
            if var == variable:
                return val
        return None
    
    
    def get_unassigned(self) -> List[TVar]:
        """
        Get all variables that currently have no assignment
        """
        if self._unassigned_cache is not None:
            return self._unassigned_cache.copy()
        
        vars = self._config.variables()
        for a in self.get_assignments():
            if a in vars:
                vars.remove(a)
            
        self._unassigned_cache = vars
        return vars
    
    
    def get_variables(self) -> List[TVar]:
        return self._config.variables().copy()
    
    def get_values(self) -> List[TVal]:
        return self._config.values().copy()
    
    def get_constraints_for(self, variable: TVar) -> Dict[TVar, CpsConstraint[TVal]]:
        return self._config.get_constraints(variable)
    
    def is_complete(self) -> bool:
        """
        check if the CPS has assigned a value to all variables
        """
        return len(self.get_unassigned()) == 0
    
    
    def get_variables_with(self, value : TVal) -> List[TVar]:
        """
        Get all variable the value was assigned to
        """
        return [name for name, val in self.get_assignments().items() if val == value]
    
    
    def is_consistent(self) -> bool:
        
        for var in self.get_variables():
            
            # Assumes an inconsistent value was never assigned
            if self.get_assignment(var) is not None:
                continue
            
            any_consistent = False
            for val in self.get_values():
                if self.will_be_consistent(var, val):
                    any_consistent = True
            
            if not any_consistent:
                return False
        
        return True
                
    
    def will_be_consistent(self, variable: TVar, value: TVal):
        """
        Check if a variable assignment would be consistent
        """
        
        constraints = self._config.get_constraints(variable)
        
        for var in constraints:
            constraint = constraints[var] 
            
            if var is None:
                if constraint.is_conflicting(value, None):
                    return False
            else:
                otherValue = self.get_assignment(var)
                
                if otherValue is not None and constraint.is_conflicting(value, otherValue):
                    return False
                
        return True
    
    
    def get_available_values(self, variable : TVar) -> List[TVal]:
        """
        Get all values that can be assigned to variable
        """
            
        constraints = self._config.get_constraints(variable)
                   
        values = []
        
        for value in self._config.values():
        
            conflict = False
                
            for var in constraints:
                constraint = constraints[var]
                
                # if c is None it is an unary constraint
                if var is None:
                    if constraint.is_conflicting(value, None):
                        conflict = True    
                    continue
                
                otherValue = self.get_assignment(var)
                if otherValue is None: # other is not yet assigned. No check necessary
                    continue
                
                if constraint.is_conflicting(value, otherValue):
                    conflict = True
                    break
                
            if not conflict:
                values.append(value)
                
        return values
            
            
            
        
        
        
    


In [4]:
config = CpsConfiguration(["a", "b", "c"], [1, 2])
base_state = CpsState(config)

final = base_state.assign("a", 1).assign("b", 2).assign("c", 2)

display(final.is_complete())
display(final.get_variables_with(2))

True

['b', 'c']

# CSP Parser

* Dataset: https://huggingface.co/datasets/allenai/ZebraLogicBench

### Task:
* Read ZebraLogicBench, extract puzzles into CSP variables, domains, and constraint lists<br>
* Example: houses, attributes, clues like “Alice lives in the red house”<br>

In [5]:
import pandas as pd
import re

"""
Read dataset and parse it to a dataframe
"""
zebraLogicBench = pd.read_parquet("hf://datasets/allenai/ZebraLogicBench/grid_mode/test-00000-of-00001.parquet")

"""
The column "size" tells us the number of houses and categories
(e.g. 5*6 means that we have 5 houses and 6 categories -> 30 variables)
"""

#TODO Handling categories with same values

def extract_variables(row: pd.Series) -> list[str]:
    return [v.lower() for v in re.findall(r'`([^`]+)`', str(row["puzzle"]))]

def extract_size(row: pd.Series) -> list[int]:
    size_str = row['size'].split('*')
    size_int = [int(x) for x in size_str]
    return size_int

def extract_domains(size: list[int]) -> list[int]:
    number_of_houses = size[0]
    return list(range(1, number_of_houses + 1))

def extract_clues(puzzle_text: str) -> str:
    if "## Clues:" not in puzzle_text:
        raise ValueError("No clues section found!")
    return puzzle_text.split("## Clues:", 1)[1].strip()

def extract_clue_sentences(clues : str) -> list[list[str]]:
    prepared_clues = []
    clue_sentences = re.findall(r"\d+\.\s*(.+)", clues)
    for sentence in clue_sentences:
        s = sentence.lower()
        s = re.sub(r"[^\w\s-]", "", s)
        words_of_s = s.split()
        prepared_clues.append(words_of_s)

    # TODO Change the elements of the lists for simple parsing afterwards


    return prepared_clues

def next_door(a, b):
    return a == b - 1 or a == b + 1

def right_next_to(a, b):
    return (b + 1) == a

# TODO Parse the clue sentences
# TODO Define and add the constraints per instance
def define_constraints(csp : CpsConfiguration, clues : List[List[str]], csp_variables:list[str]) -> None:
    for sentence in clues:
        entities = []
        for word in sentence:
            if word in csp_variables:
                entities.append(word)

def all_not_equal(csp: CpsConfiguration,size: list[int], csp_variables: list[str]):
    expected_number_of_variables = size[0] * size[1]
    if expected_number_of_variables != len(csp_variables):
        raise ValueError("Number of variables does not match!")
    for j in range(size[0], size[0] * size[1] + 1, size[0]):
        csp.allNotEqual(csp_variables[j - size[0]:j])

for i, row in zebraLogicBench.iterrows():
    csp_variables = extract_variables(row)
    size = extract_size(row)
    domains_int = extract_domains(size)
    domains_str = [str(x) for x in domains_int]
    clues_text = extract_clues(row['puzzle'])
    prepared_clues = extract_clue_sentences(clues_text)

    csp = CpsConfiguration(csp_variables, domains_str)
    all_not_equal(csp, size, csp_variables)
    print(csp_variables)
    # define_constraints(csp, clues, csp_variables)


# Parameter are two lists
# Constraint definition per methods on the instance
# csp = CpsConstraint = Dict[TVal, Callable[[TVal, TVal], bool]]

['peter', 'alice', 'bob', 'eric', 'arnold', 'norwegian', 'german', 'dane', 'brit', 'swede', 'fantasy', 'biography', 'romance', 'mystery', 'science fiction', 'stir fry', 'grilled cheese', 'pizza', 'spaghetti', 'stew', 'red', 'green', 'blue', 'yellow', 'white', 'bird', 'dog', 'cat', 'horse', 'fish']
['alice', 'eric', 'arnold', 'peter', 'artist', 'engineer', 'teacher', 'doctor', 'fantasy', 'science fiction', 'mystery', 'romance', 'google pixel 6', 'iphone 13', 'oneplus 9', 'samsung galaxy s21']
['bob', 'alice', 'peter', 'eric', 'arnold', 'carol', 'fred', 'timothy', 'samantha', 'alice', 'meredith', 'bella', 'pop', 'hip hop', 'classical', 'jazz', 'rock', 'country', 'average', 'very tall', 'tall', 'super tall', 'very short', 'short']
['bob', 'arnold', 'carol', 'alice', 'peter', 'eric', 'sarah', 'janelle', 'aniya', 'kailyn', 'holly', 'penny', 'fred', 'samantha', 'bella', 'meredith', 'alice', 'timothy', 'city', 'mountain', 'camping', 'beach', 'cruise', 'cultural', 'romance', 'mystery', 'histor

# Test

Testing CPS components on a puzzle

In [6]:
colors = [
    "gelb",
    "grün",
    "weiß",
    "rot",
    "blau"
]

nationality = [
    "Norweger",
    "Ukrainer",
    "Engländer",
    "Spanier",
    "Japaner"
]

pet = [
    "Zebra",
    "Hund",
    "Schnecke",
    "Pferd",
    "Fuchs"
]

beverage = [
    "Kaffee",
    "O-Saft",
    "Milch",
    "Tee",
    "Wasser"
]

zigarette = [
    "Kools",
    "Chesterfield",
    "Old Gold",
    "Lucky Strike",
    "Parliaments"
]

values = [1, 2, 3, 4, 5]
variables = []
variables += colors
variables += nationality
variables += pet
variables += beverage
variables += zigarette


config = CpsConfiguration[str, int](variables, values)

config.allNotEqual(colors)
config.allNotEqual(nationality)
config.allNotEqual(pet)
config.allNotEqual(beverage)
config.allNotEqual(zigarette)


def next_door(a, b):
    return a == b - 1 or a == b + 1

def right_next_to(a, b):
    return (b + 1) == a 

config.equal("Engländer", "rot")
config.equal("Spanier", "Hund")
config.equal("Kaffee", "grün")
config.equal("Ukrainer", "Tee")
config.addConstraintRev("grün", "weiß", right_next_to) # a should be to the right of b (ie. one more)
config.equal("Old Gold", "Schnecke")
config.equal("Kools", "gelb")
config.mustBe("Milch", 3)
config.mustBe("Norweger", 1)
config.addConstraintRev("Chesterfield", "Fuchs", next_door)
config.addConstraintRev("Kools", "Pferd", next_door)
config.equal("Lucky Strike", "O-Saft")
config.equal("Japaner", "Parliaments")
config.addConstraintRev("Norweger", "blau", next_door)




In [7]:
baseState = CpsState(config)

test_state = baseState.assign("Norweger", 1).assign("Ukrainer", 2)
 
display(test_state.will_be_consistent("Spanier", 1))
display(test_state.will_be_consistent("Spanier", 2))
display(test_state.will_be_consistent("Spanier", 3))

False

False

True

In [8]:
baseState = CpsState[str, int](config)

for val in variables[3:8]:
    display(val, baseState.get_available_values(val))

'rot'

[1, 2, 3, 4, 5]

'blau'

[1, 2, 3, 4, 5]

'Norweger'

[1]

'Ukrainer'

[1, 2, 3, 4, 5]

'Engländer'

[1, 2, 3, 4, 5]

# BT-Search

In [9]:


from abc import abstractmethod


class BtSearchTools(Generic[TVar, TVal]):
    
    @abstractmethod
    def get_next_variable(self, state : CpsState[TVar, TVal]):
        """
        Get the next variable that should be assigned
        """
        raise NotImplementedError()
    
    @abstractmethod
    def get_values(self, state : CpsState[TVar, TVal], variable : TVar) -> List[TVal]:
        """
        """
        raise NotImplementedError()
    
    @abstractmethod
    def inference(self, state : CpsState[TVar, TVal], variable : TVar, value : TVal) -> bool:
        """
        """
        raise NotImplementedError()


In [10]:
class BtSearch(Generic[TVar, TVal]):
    _tool : BtSearchTools
    
    count : int
    result : CpsState[TVar, TVal] | None
    
    def __init__(self, tool: BtSearchTools):
        self._tool = tool
        self.count = 0
        
    
    @staticmethod
    def search(tool: BtSearchTools, initialState : CpsState[TVar, TVal]) -> 'BtSearch[TVar, TVal]':
        instance = BtSearch(tool)
        instance.result = instance._search(initialState)
        return instance
        
    def _search(self, state : CpsState[TVar, TVal]) -> CpsState[TVar, TVal] | None:
        
        if state.is_complete():
            return state
        
        if not state.is_consistent():
            return None
        
        self.count = self.count + 1
        
        variable = self._tool.get_next_variable(state)
        if variable is None:
            return None
        
        for value in self._tool.get_values(state, variable):
            
            if state.will_be_consistent(variable, value):
                new_state = state.assign(variable, value)
                
                if self._tool.inference(new_state, variable, value):
                    
                    result = self._search(new_state)
                    
                    if result is not None:
                        return result
        
        return None
    

def get_bt_result(bt_search : BtSearch[TVar, TVal]) -> Dict[TVal, List[TVar]]:
    
    if bt_search.result is None:
        print("No result found")
        return {}
    
    print("Step count: ", bt_search.count)
    
    d = {}
    
    assignments = bt_search.result.get_assignments()
    
    for val in bt_search.result.get_values():
        
        d[val] = bt_search.result.get_variables_with(val)
        # l = []
        
        # for var in assignments:
        #     if assignments[var] == val:
        #         l.append(var)
                
        # d[val] = l
        
    return d
    

## Simple BT-Search

This is a simple brute force search method. No fancy optimizations yet

In [11]:

class SimpleBtSearch(Generic[TVar, TVal], BtSearchTools[TVar, TVal]):
    
    def __init__(self):
        pass
    
    
    def get_next_variable(self, state : CpsState[TVar, TVal]):
        """
        Get the next variable that should be assigned
        """
        
        unassigned = state.get_unassigned()
        if len(unassigned) == 0:
            return None
        return unassigned[0]
        
    
    def get_values(self, state : CpsState[TVar, TVal], variable : TVar) -> List[TVal]:
        """
        """
        return state.get_values()
        
    
    def inference(self, state : CpsState[TVar, TVal], variable : TVar, value : TVal) -> bool:
        return True


In [12]:

result = BtSearch.search(SimpleBtSearch(), baseState)
get_bt_result(result)

Step count:  451


{1: ['gelb', 'Norweger', 'Fuchs', 'Wasser', 'Kools'],
 2: ['blau', 'Ukrainer', 'Pferd', 'Tee', 'Chesterfield'],
 3: ['rot', 'Engländer', 'Schnecke', 'Milch', 'Old Gold'],
 4: ['weiß', 'Spanier', 'Hund', 'O-Saft', 'Lucky Strike'],
 5: ['grün', 'Japaner', 'Zebra', 'Kaffee', 'Parliaments']}

**Expected**

```
House: 1
  gelb
  Norweger
  Fuchs
  Wasser
  Kools
House: 5
  grün
  Japaner
  Zebra
  Kaffee
  Parliaments
-- Steps: 2277
```

# BT-Search

BT-Search with MRV and "Gradheuristik"

 * MRV (Minimum remaining values): choose the variable with the least number of free values next
 * Gradheuristk: if MRV returns multipel variables, choose the one with more constraints

In [16]:
class MrvBtSearch(Generic[TVar, TVal], BtSearchTools[TVar, TVal]):
    
    def __init__(self):
        pass
    
    
    def get_next_variable(self, state : CpsState[TVar, TVal]):
        """
        Get the next variable that should be assigned
        """
        
        # MRV
        open_values = 10000
        variables = []
        
        for var in state.get_unassigned():
            consistent_values = 0
            
            for val in state.get_values():
                if state.will_be_consistent(var, val):
                    consistent_values = consistent_values + 1
            
            if consistent_values == 0:
                raise Exception("No consistent value found")
            
            if consistent_values > 0 and consistent_values < open_values:
                variables = [var]
                open_values = consistent_values
            
            elif consistent_values == open_values:
                variables.append(var)
                
        if len(variables) == 0:
            # something is wrong
            raise Exception("MRV selected no value")
            # return None
        
        if len(variables) == 1:
            return variables[0]
        
        
        # Gradheuristik
        variable = None
        variable_constraints = -1
        
        
        for var in variables:
            constraints = state.get_constraints_for(var)
            
            # count the number of constraints that point to unassigned variables
            c = 0
            for to in constraints:
                if to is not None and state.get_assignment(to) is None:
                    c = c + 1
                    
            if c > variable_constraints:
                variable = var
                variable_constraints = c
        
        if variable is None:
            raise Exception("Gradheuristik selected no value")
        
        return variable
        
        
    
    def get_values(self, state : CpsState[TVar, TVal], variable : TVar) -> List[TVal]:
        """
        """
        return state.get_values()
        
    
    def inference(self, state : CpsState[TVar, TVal], variable : TVar, value : TVal) -> bool:
        return True
    

In [14]:
result = BtSearch.search(MrvBtSearch(), baseState)
get_bt_result(result)

Step count:  52


{1: ['Norweger', 'gelb', 'Kools', 'Wasser', 'Fuchs'],
 2: ['blau', 'Pferd', 'Ukrainer', 'Tee', 'Chesterfield'],
 3: ['Milch', 'rot', 'Engländer', 'Old Gold', 'Schnecke'],
 4: ['weiß', 'Spanier', 'Hund', 'O-Saft', 'Lucky Strike'],
 5: ['grün', 'Kaffee', 'Japaner', 'Parliaments', 'Zebra']}

In [17]:
names = [
    "Peter",
    "Eric",
    "Arnold"
]

colors = [
    "red",
    "white",
    "yellow"
]

child = [
    "Fred",
    "Meridith",
    "Bella"
]

variables = []
variables += names
variables += colors
variables += child

values = [1, 2, 3]

config = CpsConfiguration[str, int](variables, values)

config.allNotEqual(names)
config.allNotEqual(colors)
config.allNotEqual(child)



config.equal("Arnold", "red")
config.addConstraintRev("Fred", "Eric", lambda a, b: a < b)
config.addUnaryConstraint("red", lambda a: a == 2)
config.addUnaryConstraint("Bella", lambda a: a == 1)
config.equal("white", "Meredith")


baseState = CpsState[str, int](config)
result = BtSearch.search(MrvBtSearch(), baseState)
get_bt_result(result)

Step count:  9


{1: ['Bella', 'Peter', 'white'],
 2: ['red', 'Arnold', 'Fred'],
 3: ['Eric', 'Meridith', 'yellow']}