Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [2]:
from random import random
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
from tqdm.auto import tqdm

In [21]:
# A search state: the indices of the sets already selected and of those still available.
State = namedtuple("State", "taken not_taken")

PROBLEM_SIZE = 2  # number of positions each set may or may not cover
NUM_SETS = 5  # number of randomly generated candidate sets

# Each set is a boolean mask of length PROBLEM_SIZE; every position is
# included independently with probability 0.2.
_random_sets = []
for _set_index in range(NUM_SETS):
    _membership = [random() < 0.2 for _ in range(PROBLEM_SIZE)]
    _random_sets.append(np.array(_membership))
SETS = tuple(_random_sets)
print(SETS)
def covered(state):
    """Return a boolean array of length PROBLEM_SIZE for the given state.

    Entry ``k`` is True when at least one of the sets listed in
    ``state.taken`` covers position ``k``, and False otherwise.
    """
    mask = np.zeros(PROBLEM_SIZE, dtype=bool)
    for index in state.taken:
        mask = np.logical_or(mask, SETS[index])
    return mask


def goal_check(state):
    """Return True when the sets taken by ``state`` cover every position."""
    return covered(state).all()

(array([False, False]), array([False, False]), array([ True, False]), array([False, False]), array([False,  True]))


In [22]:

# Sanity check: if taking every available set does not cover all positions,
# no subset can, and none of the searches below could ever reach a goal state.
assert goal_check(State(set(range(NUM_SETS)), set())), "Problem not solvable"

## Depth First

 - Depth-first search (DFS) is an algorithm for traversing or searching tree or graph data structures. The algorithm starts at the root node and explores as far as possible along each branch before backtracking.
 - The most recently generated node is the first one to be expanded (the frontier is a LIFO stack)
 - It is an uninformed strategy

In [4]:
# Depth-first search: the frontier is used as a LIFO stack (append / pop on the right).
state = State(set(), set(range(NUM_SETS)))
frontier = deque([state])

counter = 0
current_state = frontier.pop()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        # One successor per set that is still available: move that set's index
        # from `not_taken` to `taken`.
        frontier.extend(
            State(current_state.taken | {action}, current_state.not_taken - {action})
            for action in current_state.not_taken
        )
        current_state = frontier.pop()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

0it [00:00, ?it/s]

Solved in 16 steps (16 tiles)


## Breadth First

- Breadth-first search (BFS) is an algorithm for searching a tree data structure for a node that satisfies a given property. It starts at the tree root and explores all nodes at the present depth prior to moving on to the nodes at the next depth level.
- Nodes are expanded in the order in which they were generated: every node at one level of the tree is expanded before any node at the next level (the frontier is a FIFO queue)
- Uninformed strategy

In [5]:
# Breadth-first search: the frontier is used as a FIFO queue (append right / pop left).
state = State(set(), set(range(NUM_SETS)))
frontier = deque([state])

counter = 0
current_state = frontier.popleft()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        # One successor per set that is still available: move that set's index
        # from `not_taken` to `taken`.
        frontier.extend(
            State(current_state.taken | {action}, current_state.not_taken - {action})
            for action in current_state.not_taken
        )
        current_state = frontier.popleft()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

0it [00:00, ?it/s]

KeyboardInterrupt: 

## Greedy Best First

- Best-first search is a class of search algorithms, which explores a graph by expanding the most promising node chosen according to a specified rule.
- It consists in always expanding the state that appears to be closest to the final goal
- Not optimal: sometimes it is better to first move in an apparently wrong direction and then in the right one (because the resulting solution is smaller)

In [None]:
def f(state):
    """Greedy priority: the number of positions not covered yet by ``state``.

    The fewer positions are still uncovered, the smaller the value and the
    sooner the state is extracted from the priority queue.
    """
    return PROBLEM_SIZE - sum(covered(state))

In [None]:
# Greedy best-first search: always expand the frontier state with the lowest f().
from itertools import count

frontier = PriorityQueue()
# Entries are (priority, insertion order, state). Without the middle element,
# two entries with the same priority would be ordered by comparing the State
# tuples, i.e. by set (subset) comparison, which is only a partial order and
# leaves the extraction order of ties unspecified. The counter makes ties FIFO.
tie_breaker = count()
state = State(set(), set(range(NUM_SETS)))
# The priority is the "distance" from the goal:
# the state covers more positions --> f(state) is small --> extracted earlier
# the state covers fewer positions --> f(state) is big --> extracted later
frontier.put((f(state), next(tie_breaker), state))

counter = 0
_, _, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state.not_taken:
            # Successor: move set `action` from `not_taken` to `taken`.
            new_state = State(
                current_state.taken | {action},
                current_state.not_taken - {action},
            )
            frontier.put((f(new_state), next(tie_breaker), new_state))
        _, _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

0it [00:00, ?it/s]

Solved in 4 steps (4 tiles)


## A*

- A* is an informed search algorithm, or a best-first search, meaning that it is formulated in terms of weighted graphs: starting from a specific starting node of a graph, it aims to find a path to the given goal node having the smallest cost (least distance travelled, shortest time, etc.)
- At each iteration of its main loop, A* needs to determine which of its paths to extend. It does so based on the cost of the path and an estimate of the cost required to extend the path all the way to the goal. Specifically, A* selects the path that minimizes:
f(n) = g(n) + h(n)
where g(n) is the cost of the path from the starting node to n and h(n) is a heuristic function that estimates the cost of the cheapest path from n (the current state) to the goal.

In [None]:
def h(state):
    """Optimistic estimate of how many more sets are needed to reach the goal.

    Assumes that every additional set could cover as many new positions as
    the largest set in SETS has in total. For example, if the largest set
    has 5 positions and 10 are still missing, at least 2 more sets are needed.
    """
    biggest = max(sum(candidate) for candidate in SETS)  # size of the largest set
    uncovered_count = PROBLEM_SIZE - sum(covered(state))  # positions still to cover
    return ceil(uncovered_count / biggest)


def h2(state):
    """Optimistic estimate of how many more sets are needed to reach the goal.

    Like ``h``, but the size of each set is measured only on the positions
    that ``state`` has not covered yet. Returns 0 when everything is covered.
    """
    mask = covered(state)
    if not np.all(mask):
        still_open = np.logical_not(mask)
        # Largest number of still-uncovered positions that a single set can add.
        best_gain = max(sum(np.logical_and(candidate, still_open)) for candidate in SETS)
        return ceil((PROBLEM_SIZE - sum(mask)) / best_gain)
    return 0


def h3(state):
    """Optimistic estimate of how many more sets are needed to reach the goal.

    Every set in SETS is scored by the number of still-uncovered positions it
    would add; the scores are then summed from the largest down until they
    reach the number of missing positions. The number of scores used is the
    estimate.

    Returns 0 when ``state`` already covers everything, and ``float("inf")``
    when even all the scores together cannot reach the number of missing
    positions (the goal is unreachable from ``state``).
    """
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)  # positions still to cover
    uncovered = np.logical_not(already_covered)
    candidates = sorted((sum(np.logical_and(s, uncovered)) for s in SETS), reverse=True)
    # Running total instead of re-summing the prefix on every iteration; the
    # loop is bounded by the number of candidates, so it always terminates.
    gained = 0
    for taken, gain in enumerate(candidates, start=1):
        gained += gain
        if gained >= missing_size:
            return taken
    return float("inf")

def f(state):
    """A* priority f(n) = g(n) + h(n) for the given state."""
    path_cost = len(state.taken)  # g(n): number of sets taken so far
    estimate_to_goal = h3(state)  # h(n): optimistic number of sets still needed
    return path_cost + estimate_to_goal

In [None]:
# A* search: always expand the frontier state with the lowest f().
from itertools import count

frontier = PriorityQueue()
# Entries are (priority, insertion order, state). Without the middle element,
# two entries with the same priority would be ordered by comparing the State
# tuples, i.e. by set (subset) comparison, which is only a partial order and
# leaves the extraction order of ties unspecified. The counter makes ties FIFO.
tie_breaker = count()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), next(tie_breaker), state))

counter = 0
_, _, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state.not_taken:
            # Successor: move set `action` from `not_taken` to `taken`.
            new_state = State(
                current_state.taken | {action},
                current_state.not_taken - {action},
            )
            frontier.put((f(new_state), next(tie_breaker), new_state))
        _, _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

0it [00:00, ?it/s]

Solved in 215 steps (4 tiles)
