Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

# Set Covering

##  A* search

In [5]:
from random import random
from functools import reduce
from math import ceil
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue

import numpy as np
from tqdm.auto import tqdm

In [6]:
# Problem instance: a universe of PROBLEM_SIZE elements and NUM_SETS randomly
# generated candidate subsets.
PROBLEM_SIZE = 20
NUM_SETS = 40

# Each subset is a boolean membership mask of length PROBLEM_SIZE; an element
# belongs to the subset whenever its uniform draw falls below 0.2.
SETS = tuple(
    np.fromiter(
        (random() < 0.2 for _ in range(PROBLEM_SIZE)),
        dtype=bool,
        count=PROBLEM_SIZE,
    )
    for _ in range(NUM_SETS)
)

# Search state: indices (into SETS) of the subsets already selected, and of
# the subsets still available for selection.
State = namedtuple("State", "taken not_taken")


In [7]:
def covered(state):
    """Return a boolean mask of the elements covered by the taken subsets.

    The result has length PROBLEM_SIZE; entry ``k`` is True when at least one
    subset whose index is in ``state.taken`` contains element ``k``.
    """
    # Start from "nothing covered" and OR in every selected subset.
    mask = np.array([False for _ in range(PROBLEM_SIZE)])
    for index in state.taken:
        mask = np.logical_or(mask, SETS[index])
    return mask

def goal_check(state):
    """Return whether the subsets taken in ``state`` cover every element."""
    coverage = covered(state)
    return coverage.all()


#def h(state): #heuristic
#    already_covered = covered(state)
#    if np.all(already_covered):
#        return 0
#    #largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
#    largest_set_size = max(sum(s) for s in SETS)
#    missing_size = PROBLEM_SIZE - sum(already_covered)
#    optimistic_estimate = ceil(missing_size / largest_set_size)
#    return optimistic_estimate

def h3(state):
    """Optimistically estimate how many more subsets are needed for a full cover.

    For every subset in SETS, count how many still-uncovered elements it would
    add, sort those counts in descending order, and return the smallest number
    of leading counts whose sum reaches the number of missing elements. Overlaps
    between subsets are ignored, so the estimate never exceeds the true number
    of subsets still required.

    Returns:
        0 when ``state`` already covers everything; otherwise a positive int.
        ``float("inf")`` is returned when even all subsets together cannot
        supply enough uncovered elements (the goal is unreachable), instead of
        looping forever.
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0

    # The uncovered mask is identical for every candidate subset: compute once.
    uncovered = np.logical_not(already_covered)
    missing_size = PROBLEM_SIZE - int(np.count_nonzero(already_covered))
    candidates = sorted(
        (int(np.count_nonzero(np.logical_and(s, uncovered))) for s in SETS),
        reverse=True,
    )

    # Keep a running total instead of re-summing a growing prefix each step.
    gained = 0
    for taken, gain in enumerate(candidates, start=1):
        gained += gain
        if gained >= missing_size:
            return taken

    # Not enough uncovered elements available in all of SETS.
    return float("inf")

def g(state):
    """Return the cost paid so far: the number of subsets already taken."""
    chosen = state.taken
    return len(chosen)

def f_prof(state):
    """Return the A* priority of ``state``: subsets taken so far plus ``h3``."""
    cost_so_far = len(state.taken)
    estimate_to_goal = h3(state)
    return cost_so_far + estimate_to_goal

def h(state):
    """Coarse optimistic estimate of the number of subsets still needed.

    Divides the number of missing elements by the size of the largest subset
    in SETS and rounds up. Restored from the commented-out definition in this
    file, because ``f`` calls ``h`` and would otherwise raise ``NameError``.

    Assumes at least one subset in SETS is non-empty.
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    largest_set_size = max(int(np.count_nonzero(s)) for s in SETS)
    missing_size = PROBLEM_SIZE - int(np.count_nonzero(already_covered))
    return ceil(missing_size / largest_set_size)


def f(state):
    """Return the A* priority of ``state``: subsets taken so far plus ``h``."""
    return len(state.taken) + h(state)

In [8]:
# Sanity check: if taking every subset does not cover all elements, no
# selection of subsets can, and the search would never reach a goal state.
assert goal_check(
    State(set(range(NUM_SETS)), set())
), "Problem not solvable"

In [9]:
# A* search over subset selections. The frontier is ordered by f_prof
# (subsets taken so far + optimistic estimate of subsets still needed).
state = State(set(), set(range(NUM_SETS)))
frontier = PriorityQueue()
frontier.put((f_prof(state), state))

counter = 0  # number of states expanded before the goal is popped
with tqdm(total=None) as pbar:
    while True:
        _, current_state = frontier.get()
        if goal_check(current_state):
            break
        counter += 1
        # Expand: move each still-available subset into the taken group.
        # `action` is always in not_taken and never in taken, so union and
        # difference give the same result as toggling membership.
        for action in current_state.not_taken:
            new_state = State(
                current_state.taken | {action},
                current_state.not_taken - {action},
            )
            frontier.put((f_prof(new_state), new_state))
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

57it [00:01, 40.74it/s]

Solved in 57 steps (4 tiles)



