Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [1]:
from random import random
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
import copy
from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
State = namedtuple('State', ['taken', 'not_taken'])


def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )


def goal_check(state):
    return np.all(covered(state))

In [3]:
PROBLEM_SIZE = 20
NUM_SETS = 40
SETS = tuple(np.array([random() < 0.2 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))
assert goal_check(State(set(range(NUM_SETS)), set())), "Probelm not solvable"

## Depth First

In [9]:
frontier = deque()
state = State(set(), set(range(NUM_SETS)))
frontier.append(state)

counter = 0
current_state = frontier.pop()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.pop()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

19it [00:00, 7271.15it/s]

Solved in 19 steps (19 tiles)





## Breadth First

In [10]:
frontier = deque()
state = State(set(), set(range(NUM_SETS)))
frontier.append(state)

counter = 0
current_state = frontier.popleft()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.popleft()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

139939it [01:01, 2284.63it/s]


KeyboardInterrupt: 

## Greedy Best First

In [5]:
def f(state):
    missing_size = PROBLEM_SIZE - sum(covered(state))
    return missing_size

In [6]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

5it [00:00, 499.17it/s]

Solved in 5 steps (5 tiles)





## A*

In [7]:
def h(state):
    largest_set_size = max(sum(s) for s in SETS) # I take the set with the biggest number of true
    missing_size = PROBLEM_SIZE - sum(covered(state)) # number of missing tiles
    optimistic_estimate = ceil(missing_size / largest_set_size) # minimum number of missing tiles
    return optimistic_estimate


def h2(state):
    already_covered = covered(state) #already covered elements
    if np.all(already_covered):
        return 0
    largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS) # I take the tile with the max number of missing elements
    missing_size = PROBLEM_SIZE - sum(already_covered) # elements i still have to cover
    optimistic_estimate = ceil(missing_size / largest_set_size) # minimum number of missing tiles
    return optimistic_estimate


def h3(state):
    already_covered = covered(state) #already covered elements
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered) # number of elements not covered
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True) # i sort the remaining tiles from the biggest remaining n of elements occupied to the last
    taken = 1
    while sum(candidates[:taken]) < missing_size: # scorro il vettore dei tile per stimare al meglio quante ne mancano
        taken += 1
    return taken

def h4(state):

    already_covered = covered(state) #already covered elements
    if np.all(already_covered):
        return 0
    taken = 1
    missing_size = PROBLEM_SIZE - sum(already_covered) # number of elements not covered
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True) # i sort the remaining tiles from the biggest remaining n of elements occupied to the last


    #start of while
    while sum(candidates[:taken]) < missing_size: # scorro il vettore dei tile per stimare al meglio quante ne mancano

        updated_state = State(new_state.taken ^ {candidates[0]}, new_state.not_taken ^ {candidates[0]},)
        candidates.pop(0)
        already_covered = covered(updated_state) #already covered elements

        missing_size = PROBLEM_SIZE - sum(already_covered) # number of elements not covered
        candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True) # i sort the remaining tiles from the biggest remaining n of elements occupied to the last

        taken += 1
    return taken

def h5(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    heuri = 1
    taken = 1
    a = sum(candidates[:taken])
    while sum(candidates[:taken]) < missing_size:
        if sum(candidates[:taken]) > a:
            a = sum(candidates[:taken])
            heuri += 1
        if sum(candidates[:taken]) == a:
            heuri +=0.9
        taken += 1
    return heuri


def f(state):
    return len(state.taken) + h4(state)

In [8]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))
print(f(state), state)

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
            #print(f(new_state), new_state)

        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

NameError: name 'new_state' is not defined