In [1]:
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue

import numpy as np

In [14]:
PROBLEM_SIZE = 5
NUM_SETS = 10
SETS = tuple(
    np.array([random() < 0.3 for _ in range(PROBLEM_SIZE)])
    for _ in range(NUM_SETS)
)
State = namedtuple('State', ['taken', 'not_taken'])

In [15]:
SETS

(array([ True, False,  True,  True, False]),
 array([False, False, False, False, False]),
 array([False, False, False,  True,  True]),
 array([False,  True,  True, False,  True]),
 array([ True, False, False,  True, False]),
 array([False, False,  True, False, False]),
 array([False, False,  True, False,  True]),
 array([False, False, False,  True, False]),
 array([False, False, False,  True, False]),
 array([False,  True, False, False, False]))

In [11]:
def goal_check(state):
    return np.all(reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    ))


def distance(state):
    return PROBLEM_SIZE - sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))

# h_function thought as the the distance from the starting point, evaluated on how many sets have been taken
def h_function(state):
    return len(state.taken)

def f_function(state):
    return distance(state) + h_function(state)

In [5]:
assert goal_check(
    State(set(range(NUM_SETS)), set())
), "Probelm not solvable"

In [16]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((distance(state), state))

counter = 0
_, current_state = frontier.get()
#print(f"current_state before while: {current_state}")
while not goal_check(current_state):
    counter += 1
    for action in current_state[1]:
        new_state = State(
            current_state.taken ^ {action},
            current_state.not_taken ^ {action},
        )
        # print(f"new state for action {action}: {new_state}")
        # while we have a priority queue, the first element of the frontier is the best covering the set 
        # frontier.put((distance(new_state), new_state)) -> need to add the heuristic function to estimate the cost 
        frontier.put((f_function(new_state), new_state))

    _, current_state = frontier.get()

print(
    f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)"
)

Solved in 2 steps (2 tiles)


In [17]:
current_state

State(taken={0, 3}, not_taken={1, 2, 4, 5, 6, 7, 8, 9})