This solution is based on the material provided by Giovanni Squillero <giovanni.squillero@polito.it>
https://github.com/squillero/computational-intelligence

@Author: Natalia Lebedeva <s309608@studenti.polito.it>

<h1>A*</h1>

In [5]:
from random import random
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
from tqdm.auto import tqdm

In [6]:
# Size of the universe: elements 0 .. PROBLEM_SIZE - 1 must all be covered
PROBLEM_SIZE = 20
# Number of randomly generated candidate sets
NUM_SETS = 40
# Every set is a boolean membership mask over the universe; each element is
# included independently with probability 0.2
SETS = tuple(
    np.array([random() < 0.2 for _element in range(PROBLEM_SIZE)])
    for _set_index in range(NUM_SETS)
)

# A search state: indices of the sets taken so far and of those still available
State = namedtuple('State', 'taken not_taken')

def covered(state):
    '''
    Return a boolean mask of the elements covered by the taken sets.

    Entry i of the result is True when at least one set whose index is in
    `state.taken` contains element i. With nothing taken the mask is
    all False.
    '''
    mask = np.zeros(PROBLEM_SIZE, dtype=bool)  # nothing is covered before any set is taken
    for index in state.taken:
        mask = np.logical_or(mask, SETS[index])
    return mask


def goal_check(state):
    '''Tell whether the sets taken in `state` cover every element.'''
    return covered(state).all()

# Sanity check: if taking every set still leaves an element uncovered,
# this random instance has no solution at all.
assert goal_check(State(taken=set(range(NUM_SETS)), not_taken=set())), "Problem not solvable"

In [7]:
def h1(state):
    '''
    Optimistic estimate of the number of sets still needed.

    The number of still uncovered elements is divided by the size of the
    largest set in SETS (taken or not, and regardless of which of its
    elements are already covered), rounded up to the closest integer.
    No single set can cover more elements than that, so the estimate
    never exceeds the true number of sets required.

    Returns 0 when every element is already covered.
    '''
    # count_nonzero counts the True entries without a Python-level loop
    missing_size = PROBLEM_SIZE - np.count_nonzero(covered(state))
    if missing_size == 0:
        # Nothing left to cover: no further set is needed
        return 0
    largest_set_size = max(np.count_nonzero(s) for s in SETS)
    return ceil(missing_size / largest_set_size)


def h2(state):
    '''
    Optimistic estimate based on the best remaining gain.

    Each set in SETS is measured only by the elements it contains that
    are not covered yet. The number of uncovered elements is divided by
    the largest such measure and rounded up to the closest integer.

    Returns 0 when every element is already covered.
    '''
    already_covered = covered(state)
    if already_covered.all():
        return 0
    still_missing = np.logical_not(already_covered)
    best_gain = max(sum(np.logical_and(s, still_missing)) for s in SETS)
    remaining = PROBLEM_SIZE - sum(already_covered)
    return ceil(remaining / best_gain)


def h3(state):
    '''
    Optimistic estimate obtained by greedily stacking the best gains.

    Each set in SETS is measured by the number of its elements that are
    not covered yet. The measures are sorted in decreasing order and the
    estimate is the smallest N such that the N largest measures add up to
    at least the number of uncovered elements.

    Returns 0 when every element is already covered, and float("inf")
    when even all the measures together fall short of the uncovered
    elements (no solution is reachable from `state`).
    '''
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    uncovered = np.logical_not(already_covered)
    missing_size = PROBLEM_SIZE - np.count_nonzero(already_covered)
    candidates = sorted(
        (np.count_nonzero(np.logical_and(s, uncovered)) for s in SETS),
        reverse=True,
    )
    # Keep a running total instead of re-summing a growing slice each time
    accumulated = 0
    for optimistic_estimate, gain in enumerate(candidates, start=1):
        accumulated += gain
        if accumulated >= missing_size:
            return optimistic_estimate
    # Every set has been counted and some elements still cannot be covered
    return float("inf")


In [8]:
def f(state):
    '''Priority of `state`: sets taken so far plus the h1 estimate of those still needed.'''
    sets_taken = len(state.taken)
    return sets_taken + h1(state)

# Best-first search ordered by f(): start with no set taken
frontier = PriorityQueue()
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0  # number of states expanded before a goal state is popped
with tqdm(total=None) as pbar:
    while True:
        # Pop the state with the lowest priority currently in the frontier
        _, current_state = frontier.get()
        if goal_check(current_state):
            break
        counter += 1
        # Expand: move each still available set from not_taken to taken
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken.symmetric_difference({action}),
                not_taken=current_state.not_taken.symmetric_difference({action}),
            )
            frontier.put((f(new_state), new_state))
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 29,243 steps (5 tiles)
State(taken={1, 37, 5, 24, 25}, not_taken={0, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 38, 39})


In [9]:
def f(state):
    '''Priority of `state`: sets taken so far plus the h2 estimate of those still needed.'''
    sets_taken = len(state.taken)
    return sets_taken + h2(state)

# Best-first search ordered by f(): start with no set taken
frontier = PriorityQueue()
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0  # number of states expanded before a goal state is popped
with tqdm(total=None) as pbar:
    while True:
        # Pop the state with the lowest priority currently in the frontier
        _, current_state = frontier.get()
        if goal_check(current_state):
            break
        counter += 1
        # Expand: move each still available set from not_taken to taken
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken.symmetric_difference({action}),
                not_taken=current_state.not_taken.symmetric_difference({action}),
            )
            frontier.put((f(new_state), new_state))
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 1,763 steps (5 tiles)
State(taken={32, 1, 34, 24, 27}, not_taken={0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 25, 26, 28, 29, 30, 31, 33, 35, 36, 37, 38, 39})


In [10]:
def f(state):
    '''Priority of `state`: sets taken so far plus the h3 estimate of those still needed.'''
    sets_taken = len(state.taken)
    return sets_taken + h3(state)

# Best-first search ordered by f(): start with no set taken
frontier = PriorityQueue()
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0  # number of states expanded before a goal state is popped
with tqdm(total=None) as pbar:
    while True:
        # Pop the state with the lowest priority currently in the frontier
        _, current_state = frontier.get()
        if goal_check(current_state):
            break
        counter += 1
        # Expand: move each still available set from not_taken to taken
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken.symmetric_difference({action}),
                not_taken=current_state.not_taken.symmetric_difference({action}),
            )
            frontier.put((f(new_state), new_state))
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 901 steps (5 tiles)
State(taken={1, 2, 5, 37, 22}, not_taken={0, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 38, 39})


All variations of the algorithm find the optimal solution (5 tiles). <br>
Exploring other possible options for the estimated cost h...

In [13]:
def h4(state):
    '''
    Estimate of the number of sets still needed, based on the average set size.

    The number of still uncovered elements is divided by the average size
    of all the sets in SETS (rounded up to the closest integer), and the
    quotient is rounded up to the closest integer.

    NOTE: a single set can be larger than the average, so this estimate
    may exceed the true number of sets needed (it is not optimistic).

    Returns 0 when every element is already covered.
    '''
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    avg_set_size = ceil(sum(np.count_nonzero(s) for s in SETS) / len(SETS))
    # Reuse the mask computed above instead of calling covered() a second time
    missing_size = PROBLEM_SIZE - np.count_nonzero(already_covered)
    return ceil(missing_size / avg_set_size)

def f(state):
    '''Priority of `state`: sets taken so far plus the h4 estimate of those still needed.'''
    sets_taken = len(state.taken)
    return sets_taken + h4(state)

# Best-first search ordered by f(): start with no set taken
frontier = PriorityQueue()
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0  # number of states expanded before a goal state is popped
with tqdm(total=None) as pbar:
    while True:
        # Pop the state with the lowest priority currently in the frontier
        _, current_state = frontier.get()
        if goal_check(current_state):
            break
        counter += 1
        # Expand: move each still available set from not_taken to taken
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken.symmetric_difference({action}),
                not_taken=current_state.not_taken.symmetric_difference({action}),
            )
            frontier.put((f(new_state), new_state))
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 1,163 steps (5 tiles)
State(taken={32, 1, 34, 3, 21}, not_taken={0, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 33, 35, 36, 37, 38, 39})


In [12]:
def h5(state):
    '''
    Estimate of the number of sets still needed, based on the average gain.

    Each set in SETS is measured only by the elements it contains that
    are not covered yet. The number of still uncovered elements is divided
    by the average of those measures over all the sets (rounded up to the
    closest integer), and the quotient is rounded up to the closest integer.

    NOTE: a single set can contribute more than the average, so this
    estimate may exceed the true number of sets needed (it is not
    optimistic).

    Returns 0 when every element is already covered.
    '''
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    # Computed once: the mask is the same for every set
    uncovered = np.logical_not(already_covered)
    total_gain = sum(np.count_nonzero(np.logical_and(s, uncovered)) for s in SETS)
    avg_gain = ceil(total_gain / len(SETS))
    missing_size = PROBLEM_SIZE - np.count_nonzero(already_covered)
    return ceil(missing_size / avg_gain)

def f(state):
    '''Priority of `state`: sets taken so far plus the h5 estimate of those still needed.'''
    sets_taken = len(state.taken)
    return sets_taken + h5(state)

# Best-first search ordered by f(): start with no set taken
frontier = PriorityQueue()
state = State(taken=set(), not_taken=set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0  # number of states expanded before a goal state is popped
with tqdm(total=None) as pbar:
    while True:
        # Pop the state with the lowest priority currently in the frontier
        _, current_state = frontier.get()
        if goal_check(current_state):
            break
        counter += 1
        # Expand: move each still available set from not_taken to taken
        for action in current_state.not_taken:
            new_state = State(
                taken=current_state.taken.symmetric_difference({action}),
                not_taken=current_state.not_taken.symmetric_difference({action}),
            )
            frontier.put((f(new_state), new_state))
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 22 steps (5 tiles)
State(taken={1, 34, 3, 14, 18}, not_taken={0, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 15, 16, 17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 35, 36, 37, 38, 39})


The number of steps required for the same given problem: <br>

| Heuristics | Number of steps|
|------------|----------------|
| h1         | 29,243         |
| h2         | 1,763          |
| h3         | 901            |
| h4         | 1,163          |
| h5         |  22            |

<br>
Heuristics h4 and h5 are modifications of h1 and h2.

While the solution for the given problem was obtained in fewer steps, h4 and h5 are in fact not optimistic (not admissible) and are prone to overestimation, e.g. when the largest untaken set is able to cover all the remaining elements, but there are many small untaken sets, so that the average size of the untaken sets is less than the size of the largest untaken one. Consequently, A* with h4 or h5 is not guaranteed to return an optimal solution, even though it did for this instance.