Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [1]:
from random import random
import math
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue

import numpy as np

In [2]:
# Number of elements in the universe that has to be covered.
PROBLEM_SIZE = 5
# Number of randomly generated candidate subsets.
NUM_SETS = 10

# Each subset is a boolean mask of length PROBLEM_SIZE; every element is
# included independently when a uniform draw falls below 0.3.
SETS = tuple(
    np.array([random() < 0.3 for _element in range(PROBLEM_SIZE)])
    for _subset in range(NUM_SETS)
)

# A search state: indices of the subsets already taken and of those still
# available.
State = namedtuple("State", "taken not_taken")

In [8]:
def goal_check(state):
    """Return whether the subsets in ``state.taken`` cover every element.

    The masks of the taken subsets are OR-ed together, starting from an
    all-False mask of length PROBLEM_SIZE; the state is a goal when every
    entry of the resulting mask is True.
    """
    nothing_covered = np.array([False] * PROBLEM_SIZE)
    chosen_masks = (SETS[index] for index in state.taken)
    union = reduce(np.logical_or, chosen_masks, nothing_covered)
    return np.all(union)


def covered(state):
    """Return the boolean mask of elements covered by ``state.taken``.

    Entry ``k`` of the result is True when at least one taken subset
    contains element ``k``. With no subset taken the mask is all False.
    """
    mask = np.array([False] * PROBLEM_SIZE)
    for index in state.taken:
        mask = np.logical_or(mask, SETS[index])
    return mask




def hdist(state):
    """Return how many elements are still not covered by ``state.taken``."""
    reached = np.array([False] * PROBLEM_SIZE)
    for index in state.taken:
        reached = np.logical_or(reached, SETS[index])
    # Built-in sum over the boolean mask counts the True entries.
    return PROBLEM_SIZE - sum(reached)

def h1(state):
    """Very rough optimistic estimate of the number of sets still needed.

    1. Find the size of the largest set in SETS.
    2. Count the elements that are still missing from the cover.
    3. ceil(missing / largest) is a lower bound on the number of further
       sets required to reach the goal.
    """
    set_sizes = [sum(mask) for mask in SETS]
    biggest = max(set_sizes)
    still_missing = PROBLEM_SIZE - sum(covered(state))
    return math.ceil(still_missing / biggest)

# Improving the heuristic:
# - Have a tighter (less optimistic) heuristic
# - Solve the problem in fewer steps (expanding fewer nodes)
# The heuristic must be easy to compute


def h2(state):
    """Optimistic estimate of the sets still needed, tighter than ``h1``.

    The size of each set is measured counting only the elements that are not
    covered yet. For example, a set with 5 elements of which 2 are already
    covered counts as size 3. The estimate is
    ceil(missing / largest_useful_size).

    Returns:
        0 when everything is already covered; ``math.inf`` when no set
        contributes any still-missing element (the state is a dead end, and
        dividing by a zero size would be meaningless); otherwise the integer
        estimate.
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    # Computed once instead of once per set.
    uncovered = np.logical_not(already_covered)
    missing_size = int(np.count_nonzero(uncovered))
    largest_set_size = max(
        (int(np.count_nonzero(np.logical_and(s, uncovered))) for s in SETS),
        default=0,
    )
    if largest_set_size == 0:
        # Nothing can cover the remaining elements from this state.
        return math.inf
    return math.ceil(missing_size / largest_set_size)

def h3(state):
    """Optimistic estimate of the sets still needed, tighter than ``h2``.

    Every set is scored by how many still-uncovered elements it contains
    (already covered elements are ignored). The scores are sorted from the
    largest to the smallest and accumulated one after the other until the
    running total reaches the number of missing elements; the number of
    scores consumed is the estimate.

    Returns:
        0 when everything is already covered; ``math.inf`` when even all the
        sets together cannot supply the missing elements (dead end);
        otherwise the number of candidate sets consumed (at least 1).
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    # Computed once instead of once per set.
    uncovered = np.logical_not(already_covered)
    missing_size = int(np.count_nonzero(uncovered))
    candidates = sorted(
        (int(np.count_nonzero(np.logical_and(s, uncovered))) for s in SETS),
        reverse=True,
    )
    gained = 0
    # Single pass with a running total: it stops as soon as the target is
    # reached and always terminates, even when the target is unreachable.
    for taken, gain in enumerate(candidates, start=1):
        gained += gain
        if gained >= missing_size:
            return taken
    return math.inf



def f(state):
    """Return the search priority of ``state``.

    It is the heuristic estimate given by ``h3`` plus the number of sets
    already taken.
    """
    estimated_remaining = h3(state)
    cost_so_far = len(state.taken)
    return estimated_remaining + cost_so_far




In [4]:
# Sanity check: if taking every available set does not cover all the
# elements, then no selection of sets can.
assert goal_check(
    State(set(range(NUM_SETS)), set())
), "Problem not solvable"

In [9]:
# Best-first search: states are expanded in increasing order of f(state).
# NOTE: entries with the same priority are ordered by comparing the State
# tuples themselves, i.e. by set inclusion on their fields.
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
while not goal_check(current_state):
    counter += 1
    # Generate one successor for every set that has not been taken yet.
    for action in current_state.not_taken:
        new_state = State(
            current_state.taken | {action},
            current_state.not_taken - {action},
        )
        frontier.put((f(new_state), new_state))
    if frontier.empty():
        # PriorityQueue.get() would block forever on an empty queue.
        raise RuntimeError("Frontier exhausted: no solution found")
    _, current_state = frontier.get()

print(
    f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)"
)

Solved in 4 steps (2 tiles)


In [10]:
# Display the state the search loop stopped on.
current_state

State(taken={9, 7}, not_taken={0, 1, 2, 3, 4, 5, 6, 8})

In [7]:
# Display the boolean mask of the first generated subset.
SETS[0]

array([False,  True,  True, False, False])