Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [38]:
import heapq

class PriorityQueue:
    """A basic Priority Queue with simple performance optimizations"""

    def __init__(self):
        self._data_heap = list()
        self._data_set = set()

    def __bool__(self):  # True if not empty
        return bool(self._data_set)

    def __contains__(self, item): # True if item is present
        return item in self._data_set

    def push(self, item, p=None): # no duplicated element, items with same priority works as a fifo
        assert item not in self, f"Duplicated element"
        if p is None:
            p = len(self._data_set)
        self._data_set.add(item)
        heapq.heappush(self._data_heap, (p, item))

    def pop(self): # retrieve the first element in priority order (smaller p first), same priority -> fifo
        p, item = heapq.heappop(self._data_heap)
        self._data_set.remove(item)
        return item

In [39]:
from random import random
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
from tqdm.auto import tqdm

In [40]:
PROBLEM_SIZE = 10 # num of elem per set
NUM_SETS = 20
SETS = tuple(np.array([random() < 0.2 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))
#print(SETS)

In [41]:
State = namedtuple('State', ['taken', 'not_taken'])


def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]), #Starting from all False, make or of each set that is taken
    )


def goal_check(state):
    return np.all(covered(state)) # check if covered(state) is all True, meaning that the taken sets cover all elements (True) at least once

In [42]:
assert goal_check(State(set(range(NUM_SETS)), set())), "Probelm not solvable" # if using all sets there is no covering all

## A*

In [43]:
def h1(state):
    largest_set_size = max(sum(s) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(covered(state))
    optimistic_estimate = ceil(missing_size / largest_set_size)
    return optimistic_estimate


def h2(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size / largest_set_size)
    return optimistic_estimate


def h3(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    taken = 1
    while sum(candidates[:taken]) < missing_size: taken += 1
    return taken


def f(state):
    return len(state.taken) + h3(state)

base version

In [44]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
ss = []
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            fn = f(new_state)
            frontier.put((fn, new_state))
            ss.append((fn, new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

16it [00:00, 641.70it/s]

Solved in 16 steps (3 tiles)





my trial

In [47]:
def red_cc(cc): return reduce(np.logical_or, cc, np.array([False for _ in range(PROBLEM_SIZE)]))


def get_candidates(v, n_to_take, already_covered):
    lenv = len(v)
    ii = np.array(range(n_to_take))
    candidates = []
    end = False
    cc = [v[iii] for iii in ii]
    ccc = red_cc(cc)
    ccc = np.logical_and(np.logical_not(already_covered), ccc)
    candidates.append(sum(ccc))
    while not end:
        ii[-1] += 1
        for i in range(1, n_to_take):
            if ii[-i] == lenv + 1 - i:
                ii[-i -1] += 1
                for k in range(-i, 0):
                    ii[k] = ii[k - 1] + 1
        if ii[0] == lenv - (n_to_take - 1): end = True
        else:
            cc = [v[iii] for iii in ii]
            ccc = red_cc(cc)
            candidates.append(sum(ccc))

    return max(candidates)

def f_mine(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return len(state.taken)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    taken = 1
    #while get_candidates(SETS, taken, already_covered) < missing_size: taken += 1
    while get_candidates([SETS[s] for s in list(state.not_taken)], taken, already_covered) < missing_size: taken += 1
    return len(state.taken) + taken*2

In [48]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
ss = []
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            fn = f_mine(new_state)
            frontier.put((fn, new_state))
            ss.append((fn, new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

10it [00:00, 196.53it/s]

Solved in 10 steps (3 tiles)



