In [55]:
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue
import numpy as np

In [56]:
PROBLEM_SIZE = 10
NUM_SETS = 40
SETS = tuple(
    [
        np.array([random() < 0.3 for _ in range(PROBLEM_SIZE)])
        for _ in range(NUM_SETS)
    ]
)
# prob 30% to be true, 70% to be false. This is the problem space.
State = namedtuple("State", ["taken", "not_taken"])

In [57]:
# Returns the state covered by the taken sets
def overall_state_covered(taken) -> list[bool]:
    return reduce(
        np.logical_or,
        [SETS[i] for i in taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )


# Returns True if all state are covered, False otherwise
def goal_check(state: State("taken", "not_taken")) -> bool:
    return np.all(overall_state_covered(state.taken))

In [58]:
# Is the problem solvable?
assert goal_check((State(set(range(NUM_SETS)), set()))), "Problem not solvable"

In [59]:
# Returns the number of sets taken, which is the number of steps done 'till now.
def g(state: State("taken", "not_taken")) -> int:
    return len(state.taken)


# Returns 0 if we are covering every state, we don't need any more step.
# Returns 1 otherwise.
# This function is optimistic because if we are not covering some state,
# it assumes that we will just need 1 more step to cover them.
def h(state: State("taken", "not_taken")) -> int:
    return 0 if goal_check(state) else 1


# Returns the distance to the solution. Is optimistic and represents
# in how many steps we can reach goal, given the not_taken state.
def h3(state: State("taken", "not_taken")) -> int:
    already_covered = overall_state_covered(state.taken)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    # state not taken ordered by num of sets covered.
    candidates = sorted(
        (sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS),
        reverse=True,
    )
    taken = 1
    while sum(candidates[:taken]) < missing_size:
        taken += 1
    return taken


# Returns the number of overlap in the state, normalized.
# num of overlap is min_max_normalized because helps us to take a better solution between the best one,
# so it doen't have to influence the other steps.
def num_of_overlap(
    state: State("taken", "not_taken"), normalize: bool = True
) -> int:
    # num of overlap is defined as the number of True that are stacked.
    n_overlap = np.sum(np.sum([SETS[i] for i in state.taken], axis=0) > 1)
    if n_overlap > 1 and normalize:
        # the maximum number of overlap is when all the sets overlaps.
        min_max_normalizer = g(state) * PROBLEM_SIZE
        n_overlap = n_overlap / min_max_normalizer
    return n_overlap


# Returns the sum between g, h and the min_max_normalized number of overlap.
# def a_star_func(state: State("taken", "not_taken")) -> int:
#     return g(state) + h(state) + num_of_overlap(state)


# Returns the sum between g, h and the min_max_normalized number of overlap
def a_star_func(state: State("taken", "not_taken")) -> int:
    return g(state) + h3(state) + num_of_overlap(state)

In [60]:
# Returns True if the action doesn't add more covered set.
# Returns False if the action cover set that are currently not covered.
def is_overlapping(taken: set[int], action: int) -> bool:
    new_taken = taken | {action}
    state_covered_by_new_taken = overall_state_covered(new_taken)

    state_covered_by_taken = overall_state_covered(taken)

    if all(state_covered_by_new_taken == state_covered_by_taken):
        return True
    else:
        return False


# Search function finds the path to the solution.
def search(queue: object):
    frontier = queue
    state = State(set(), set(range(NUM_SETS)))

    frontier.put((a_star_func(state), state))

    counter = 0
    _, current_state = frontier.get()  # take the state

    # remove sets containing all false.
    for set_index in current_state.not_taken:
        if not any(SETS[set_index]):
            current_state.not_taken - {set_index}

    # With this we don't insert already queued states.
    # Key will be a tuple representing the state.taken.
    # Value will not be usefull in this case.
    queued_taken_state = {}

    # check if state is the solution
    while not goal_check(current_state):
        counter += 1
        # for all actions that we can take now
        for action in current_state.not_taken:
            # if the action let us cover more sets
            if not is_overlapping(current_state.taken, action):
                new_state = State(
                    current_state.taken | {action},
                    current_state.not_taken - {action},
                )
                if not tuple(new_state.taken) in queued_taken_state.keys():
                    frontier.put((a_star_func(new_state), new_state))
                    queued_taken_state[tuple(new_state.taken)] = 1

        _, current_state = frontier.get()

    print(
        f"Solved in {counter:,} steps ({len(current_state.taken)} tiles), with state: {current_state.taken} and overlap: {num_of_overlap(current_state, normalize=False)}"
    )

In [61]:
from time import time

start = time()
print("A* solution: ")
search(PriorityQueue())
end = time()
print(f"Execution time: {end-start:.2f} seconds.")

print()

start = time()
print("Breadth-first solution :")
search(SimpleQueue())
end = time()
print(f"Execution time: {end-start:.2f} seconds.")

A* solution: 
Solved in 12 steps (3 tiles), with state: {0, 18, 28} and overlap: 0
opt_counter: 26
Execution time: 0.05 seconds.

Breadth-first solution :
Solved in 1,005 steps (3 tiles), with state: {0, 9, 11} and overlap: 2
opt_counter: 14449
Execution time: 1.54 seconds.
