Set-Covering with A* algorithm

In [None]:
import numpy as np
from math import ceil
from collections import namedtuple, deque
from functools import *
from queue import PriorityQueue, LifoQueue, SimpleQueue
from random import random
from tqdm import tqdm

In [None]:
# Number of items in the universe to cover, and number of candidate sets the
# searches below are allowed to choose from.
PROBLEM_SIZE = 5
NUM_SETS = 6

# A search state: indices of the sets taken so far / not taken yet.
State = namedtuple('State', ['taken', 'not_taken'])

# Random instance (each item is put in a set when random() < 0.2).
# It is overwritten right away by the fixed instance below.
sets = tuple(
    np.array([random() < .2 for _ in range(PROBLEM_SIZE)])
    for _ in range(NUM_SETS)
)

# Fixed instance: one boolean membership mask per set.
# NOTE(review): seven masks are listed here while NUM_SETS is 6 -- confirm
# whether the last mask is meant to be selectable.
sets = tuple(
    np.array(mask)
    for mask in (
        [True, False, False, True, False],
        [True, False, False, True, False],
        [False, True, False, True, False],
        [True, False, False, True, True],
        [False, False, True, False, False],
        [True, False, False, True, False],
        [True, False, False, True, False],
    )
)


In [None]:
def covered(state):
    """Return a boolean mask of the items covered by the sets in ``state.taken``.

    The mask has ``PROBLEM_SIZE`` entries; it is all ``False`` when no set
    has been taken.
    """
    mask = np.array([False] * PROBLEM_SIZE)
    for index in state.taken:
        # OR-in the membership mask of each taken set.
        mask = np.logical_or(mask, sets[index])
    return mask

In [None]:
def goal_check(state):
    """Return whether the sets taken in ``state`` cover every item."""
    coverage = covered(state)
    return np.all(coverage)

In [None]:
# Sanity check: taking all NUM_SETS selectable sets at once must cover every
# item, otherwise none of the searches below can reach a goal state.
assert goal_check(
    State(taken=set(range(NUM_SETS)), not_taken=set())
), "Probelm not solvable"

## Depth First

In [None]:
# Depth-first search: the deque is used as a stack (append and pop on the
# same end), so the most recently generated state is expanded first.
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier = deque([state])

counter = 0
current_state = frontier.pop()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand: one successor per set that has not been taken yet.
        for action in current_state.not_taken:
            # action is in not_taken and absent from taken, so the symmetric
            # difference moves it from one side to the other.
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.pop()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

4it [00:00, ?it/s]

Solved in 4 steps (4 tiles)





## Breadth First

In [None]:
# Breadth-first search: the deque is used as a FIFO queue (append on the
# right, pop from the left), so states are expanded in generation order.
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier = deque([state])

counter = 0
current_state = frontier.popleft()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand: one successor per set that has not been taken yet.
        for action in current_state.not_taken:
            # action is in not_taken and absent from taken, so the symmetric
            # difference moves it from one side to the other.
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.popleft()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

87it [00:00, 43301.82it/s]

Solved in 87 steps (3 tiles)





## Greedy Best First

In [None]:
def f(state):
    """Greedy priority: how many items ``state`` still leaves uncovered."""
    covered_count = sum(covered(state))
    return PROBLEM_SIZE - covered_count

In [None]:
# Greedy best-first search: always expand the frontier state with the lowest
# f(state), i.e. the fewest items still uncovered.
# NOTE(review): entries are (priority, State) tuples, so equal priorities are
# ordered by comparing the State tuples themselves -- confirm this
# tie-breaking is acceptable.
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier = PriorityQueue()
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand: one successor per set that has not been taken yet.
        for action in current_state.not_taken:
            # action is in not_taken and absent from taken, so the symmetric
            # difference moves it from one side to the other.
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

3it [00:00, ?it/s]

Solved in 3 steps (3 tiles)





## A*

 I propose a heuristic (h2) that takes into account the sets covering items that are both uncovered in the current state and not covered by any other set. In other words, the candidate sets are ordered by how many still-uncovered items each one covers that no other remaining set covers.
 Ties are broken by the number of uncovered elements the set contains. The heuristic is admissible because, if an item is covered by only one set, that set will certainly have to be taken at some point. The function is optimistic because, when computing the summation over the candidates (ordered as explained above), we count the total number of uncovered items each set covers (for example, a set that covers 3 items, only 1 of which is covered by no other set, counts as 3 in the summation and not as 1; the number of uniquely covered items is used only for the ordering). The heuristic is also consistent (or monotonic) because, as more sets are added to the state, the number of uncovered items decreases, so fewer sets (or, in the worst case, exactly the same number) are needed to cover them all.

In [None]:
def get_rem_l(list, x):
    """Return a copy of ``list`` with the first entry equal to ``x`` removed.

    Args:
        list: Array-like of candidates; for a 2-D input each row is one
            candidate. (The name shadows the builtin; it is kept so that
            existing callers are unaffected.)
        x: The candidate to drop (a row for 2-D input, a scalar for 1-D).

    Returns:
        A new array without the first entry that equals ``x``. If nothing
        matches, an unchanged copy of the input is returned.
    """
    rows = np.asarray(list)
    hits = np.asarray(rows == x)
    if hits.ndim > 1:
        # A row matches only when *all* of its entries equal x. Using the raw
        # element-wise comparison would select any row that merely shares a
        # single value with x, and the wrong row would be removed.
        hits = hits.all(axis=tuple(range(1, hits.ndim)))
    matches = np.flatnonzero(hits)
    if matches.size == 0:
        # Nothing to remove: hand back a copy instead of failing.
        return rows.copy()
    return np.delete(rows, matches[0], axis=0)

In [None]:
def sort(s, all_remaining_sets, already_covered):
    """Build the ordering key of a candidate set.

    Args:
        s: Boolean membership mask of the candidate set.
        all_remaining_sets: Membership masks of the other sets.
        already_covered: Boolean mask of the items that are already covered.

    Returns:
        A tuple ``(unique_count, num_uncovered)``: how many items of ``s``
        appear in none of ``all_remaining_sets``, and how many items of ``s``
        are not yet covered.
    """
    exclusive = 0
    for position, present in enumerate(s):
        if not present:
            continue
        # The item is exclusive to s when no other set contains it as well.
        if not any(rival[position] for rival in all_remaining_sets):
            exclusive += 1

    still_missing = np.logical_not(already_covered)
    gain = sum(np.logical_and(s, still_missing))
    return (exclusive, gain)

In [None]:
def h1(state):
    """Estimate how many more sets are needed to cover everything.

    Every set is scored by the number of still-uncovered items it contains;
    the scores are added up from largest to smallest until they reach the
    number of missing items. Overlaps between sets are ignored, so the
    estimate never exceeds the true number of sets required.

    Args:
        state: The search state to evaluate.

    Returns:
        ``0`` when ``state`` already covers every item, otherwise the number
        of largest-gain sets needed to reach the missing count. Returns
        ``float("inf")`` when even all sets together cannot reach it.
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    still_missing = np.logical_not(already_covered)
    gains = sorted(
        (sum(np.logical_and(s, still_missing)) for s in sets),
        reverse=True,
    )
    # Single pass with a running total: avoids re-summing the prefix on each
    # step and, unlike an open-ended loop, stops when the sets run out.
    gathered = 0
    for taken, gain in enumerate(gains, start=1):
        gathered += gain
        if gathered >= missing_size:
            return taken
    return float("inf")


def h2(state):
    """Estimate how many more sets are needed, preferring "unique" sets.

    Each set is first restricted to the items that are still uncovered. The
    restricted sets are ordered by the key returned by ``sort`` (items that
    no other set offers first, then number of uncovered items), and their
    sizes are added up in that order until the number of missing items is
    reached.

    Args:
        state: The search state to evaluate.

    Returns:
        ``0`` when ``state`` already covers every item, otherwise the number
        of sets taken from the ordered list to reach the missing count.
        Returns ``float("inf")`` when even all sets together cannot reach it.
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)

    # Restrict every set to the items that are still missing.
    still_missing = np.logical_not(already_covered)
    masked = np.array([np.logical_and(s, still_missing) for s in sets])

    # Order by uniqueness first, number of uncovered items second.
    candidates = sorted(
        masked,
        key=lambda row: sort(row, get_rem_l(masked, row), already_covered),
        reverse=True,
    )

    # Accumulate the gain of each candidate *set* in sorted order. Adding the
    # rows together first would give per-item counts, and slicing that result
    # would walk over items instead of sets, discarding the ordering.
    gathered = 0
    for taken, row in enumerate(candidates, start=1):
        gathered += int(row.sum())
        if gathered >= missing_size:
            return taken
    return float("inf")
    
    
def f(state):
    """A* priority: sets taken so far plus the ``h2`` estimate of sets still needed."""
    cost_so_far = len(state.taken)
    estimate_to_goal = h2(state)
    return cost_so_far + estimate_to_goal

For the specific problem I proposed, h2 finds a solution with 3 tiles (optimal) in 5 steps, while h1 finds a solution (3 tiles) in 10 steps. In general, there is no guarantee that h2 will find the optimal solution in fewer steps than h1 for every type of problem. In specific scenarios, though, it can be very helpful (especially, of course, when there are many "unique sets").

In [None]:
# A* search: always expand the frontier state with the lowest f(state).
# NOTE(review): entries are (priority, State) tuples, so equal priorities are
# ordered by comparing the State tuples themselves -- confirm this
# tie-breaking is acceptable.
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier = PriorityQueue()
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while True:
        if goal_check(current_state):
            break
        counter += 1
        # Expand: one successor per set that has not been taken yet.
        for action in current_state.not_taken:
            # action is in not_taken and absent from taken, so the symmetric
            # difference moves it from one side to the other.
            new_state = State(
                taken=current_state.taken ^ {action},
                not_taken=current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

5it [00:00, 831.54it/s]

Solved in 5 steps (3 tiles)



