This solution is based on the material provided by Giovanni Squillero <giovanni.squillero@polito.it>
https://github.com/squillero/computational-intelligence

@Author: Natalia Lebedeva <s309608@studenti.polito.it>

<h>A*</h>

In [34]:
from random import random, randint, seed
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
from tqdm.auto import tqdm

**The following code snippets were added by the reviewers.**

In [39]:
seed_value = 42
seed(seed_value)

In [40]:
PROBLEM_SIZE = 20
NUM_SETS = 40
SETS = tuple(np.array([random() < 0.2 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))

State = namedtuple('State', ['taken', 'not_taken'])

def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]), #initial state when nothing is taken yet
    )


def goal_check(state):
    return np.all(covered(state))

assert goal_check(State(set(range(NUM_SETS)), set())), "Problem not solvable"

In [51]:
def h1(state):
    '''
    Optimistically the number of sets needed to find the solution
    is the number of still uncovered elements divided by the size of
    the largest untaken set, rounded up to the closest integer
    '''
    largest_set_size = max(sum(s) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(covered(state))
    optimistic_estimate = ceil(missing_size)
    return optimistic_estimate


def h2(state):
    '''
    The size of an untaken set is defined only by the number of
    elements inside that set which are not covered yet
    '''
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size )
    return optimistic_estimate


def h3(state):
    '''
    The sizes of untaken elements are defined discarding the covered elements.
    The untaken sets are sorted by size, and the optimistic estimate is
    the minimal number of first N untaken sets, the sum of which sizes is
    greater or equal to the number of uncovered elements
    '''
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    optimistic_estimate = 1
    while sum(candidates[:optimistic_estimate]) < missing_size:
        optimistic_estimate += 1
    return optimistic_estimate


In [52]:
def f(state):
    return len(state.taken) + h1(state)

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 7 steps (5 tiles)
State(taken={0, 8, 11, 15, 31}, not_taken={1, 2, 3, 4, 5, 6, 7, 9, 10, 12, 13, 14, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 37, 38, 39})


In [53]:
def f(state):
    return len(state.taken) + h2(state)

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 7 steps (5 tiles)
State(taken={0, 8, 11, 15, 31}, not_taken={1, 2, 3, 4, 5, 6, 7, 9, 10, 12, 13, 14, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 37, 38, 39})


In [54]:
def f(state):
    return len(state.taken) + h3(state)

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 842 steps (5 tiles)
State(taken={38, 6, 8, 14, 31}, not_taken={0, 1, 2, 3, 4, 5, 7, 9, 10, 11, 12, 13, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 37, 39})


All variations of the algorithm find the optimal solution (5 tiles). <br>
Exploring other possible options for the estimated cost h...

In [56]:
def h4(state):
    '''
    The number of sets needed to find the solution
    is the number of still uncovered elements divided by
    the average size (rounded up to the closest integer)
    of all untaken sets, rounded up to the closest integer
    '''

    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    avg_set_size = ceil(sum(sum(s) for s in SETS)/len(SETS))
    missing_size = PROBLEM_SIZE - sum(covered(state))
    optimistic_estimate = ceil(missing_size / avg_set_size)
    return optimistic_estimate

def f(state):
    return len(state.taken) + h4(state)

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 7 steps (5 tiles)
State(taken={0, 8, 11, 15, 31}, not_taken={1, 2, 3, 4, 5, 6, 7, 9, 10, 12, 13, 14, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 37, 38, 39})


In [57]:
def h5(state):
    '''
    The size of an untaken set is defined only by the number of
    elements inside that set which are not covered yet. Then the
    number of sets needed equals to the number of still
    uncovered elements divided by the average size of all untaken
    sets, rounded up to the closest integer
    '''
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    largest_set_size = ceil(sum(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)/len(SETS))
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size / largest_set_size)
    return optimistic_estimate

def f(state):
    return len(state.taken) + h5(state)

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 41 steps (5 tiles)
State(taken={0, 2, 8, 11, 31}, not_taken={1, 3, 4, 5, 6, 7, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 37, 38, 39})


The number of steps required for the same given problem: <br>

| Heuristics | Number of steps|
|------------|----------------|
| h1         | 29,243         |
| h2         | 1,763          |
| h3         | 901            |
| h4         | 1,163          |
| h5         |  22            |

<br>
Heuristics h4 and h5 are modifications of h1 and h2.

While the solution for the given problem was obtained in less steps, h4 and h5 in fact are not optimistic and are prone to overestimation, e.g. when the largest untaken set is able to cover the remaining elements, but there are many small untaken sets such that the average size of untaken sets is less than the size of the largest unteaken one.

<h2>Special sets case</h2>
In this part, I'm trying to explore the possibilities of using A* algorithm with weighted sets (all weights are non-negative). The problem tranfsorms into multioptimization, when we try to minimize the number of taken sets while also minimizing the sum of the taken weights.

In [None]:
PROBLEM_SIZE = 8
NUM_SETS = 30

SetWeighted = namedtuple('SetWeighted', ['elements', 'weight'])
#weight is assigned as a random non-negative integer value, not greater than the number of sets
SETS = [SetWeighted(np.array([random() < 0.2 for _ in range(PROBLEM_SIZE)]), randint(0, NUM_SETS)) for _ in range(NUM_SETS)]

State = namedtuple('State', ['taken', 'not_taken'])

def covered(state):
    elements_covered = np.zeros(PROBLEM_SIZE, dtype=bool)
    for i in state.taken:
        elements_covered = np.logical_or(elements_covered, SETS[i].elements)
    return elements_covered


def goal_check(state):
    return np.all(covered(state))

assert goal_check(State(set(range(NUM_SETS)), set())), "Problem not solvable"

In [None]:
SETS

[SetWeighted(elements=array([False, False, False,  True, False, False,  True, False]), weight=1),
 SetWeighted(elements=array([False, False, False, False, False,  True, False, False]), weight=30),
 SetWeighted(elements=array([False, False, False, False, False, False,  True, False]), weight=19),
 SetWeighted(elements=array([False, False, False, False, False, False, False, False]), weight=3),
 SetWeighted(elements=array([False, False,  True, False, False, False,  True,  True]), weight=17),
 SetWeighted(elements=array([False, False, False, False,  True, False, False,  True]), weight=5),
 SetWeighted(elements=array([False,  True, False, False, False,  True,  True, False]), weight=3),
 SetWeighted(elements=array([False,  True, False,  True,  True, False, False, False]), weight=14),
 SetWeighted(elements=array([False, False, False,  True, False,  True,  True,  True]), weight=10),
 SetWeighted(elements=array([ True,  True, False, False, False, False,  True,  True]), weight=30),
 SetWeighted(e

In [None]:
def h(state):
    '''
    The estimated cost is the minimal sum of weights of {missing_sets} sets,
    where {missing_sets} is the result of division of the number of still
    uncovered elements by the largest size of untaken set
    while size is define as the number of uncovered elements contained in that set.
    '''
    already_covered = covered(state)
    if np.all(already_covered):
        return 0

    largest_set_size = max(sum(np.logical_and(s.elements, np.logical_not(already_covered))) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    missing_sets = ceil(missing_size / largest_set_size)

    untaken_weights = [SETS[i].weight for i in state.not_taken if not np.all(already_covered)]
    untaken_weights.sort() #ascending order to get the smaller weights first

    optimistic_estimate = np.sum(untaken_weights[:missing_sets])


    return optimistic_estimate


def f(state):
    # the actual cost is both the number of taken sets and the sum of their weights
    return len(state.taken) + sum(SETS[i].weight for i in state.taken) + h(state)

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles, weight: {sum(SETS[i].weight for i in current_state.taken)})")
print(current_state)
print([SETS[i] for i in current_state.taken])

0it [00:00, ?it/s]

Solved in 1,505 steps (4 tiles, weight: 13)
State(taken={25, 20, 5, 6}, not_taken={0, 1, 2, 3, 4, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 21, 22, 23, 24, 26, 27, 28, 29})
[SetWeighted(elements=array([ True, False, False, False, False, False, False, False]), weight=1), SetWeighted(elements=array([False, False,  True,  True, False, False, False,  True]), weight=4), SetWeighted(elements=array([False, False, False, False,  True, False, False,  True]), weight=5), SetWeighted(elements=array([False,  True, False, False, False,  True,  True, False]), weight=3)]


P.S. Thanks for the deadline given in AOE

**The following code snippets were added by the reviewers.**

In [48]:
def f(state):
    return len(state.taken) + max(h1(state),h2(state), h3(state), h4(state), h5(state))

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 41 steps (5 tiles)
State(taken={0, 2, 8, 11, 31}, not_taken={1, 3, 4, 5, 6, 7, 9, 10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 37, 38, 39})


In [50]:
def f(state):
    return len(state.taken) + max(h1(state),h2(state), h3(state))

frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
print(current_state)

0it [00:00, ?it/s]

Solved in 842 steps (5 tiles)
State(taken={38, 6, 8, 14, 31}, not_taken={0, 1, 2, 3, 4, 5, 7, 9, 10, 11, 12, 13, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 32, 33, 34, 35, 36, 37, 39})
