Copyright **`(c)`** 2023 Giovanni Squillero `<giovanni.squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  

In [1]:
from random import random
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue, SimpleQueue, LifoQueue
import time
%pip install tqdm
from tqdm import tqdm

import numpy as np

Note: you may need to restart the kernel to use updated packages.


In [2]:
PROBLEM_SIZE = 10 # 400
NUM_SETS = 20 # 150
SETS = tuple(np.array([random() < .3 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))
State = namedtuple('State', ['taken', 'not_taken'])

In [3]:
def get_n_overlaps(state):
    return np.sum(np.sum([SETS[i] for i in state.taken], axis=0) > 1)

In [4]:
# cheks how many True value are covered in a state
def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )

def goal_check(state):
    return np.all(covered(state))

# returns how many False need to be covered yet
def distance(state):
    return PROBLEM_SIZE - sum(covered(state))


# --------------------- A* search ---------------------

# actual cost
def g(state):
    return len(state.taken)

# estimated cost
def h(state):
    already_covered = covered(state)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    estim_max = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
    return missing_size - estim_max

def h3(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    # list of sets best fit the uncovered element in current state, reversed so that it gives priority to best fitting ones
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    taken = 1
    while sum(candidates[:taken]) < missing_size:
        taken += 1
    return taken

def distance_A_star(state):

    w1 = 1 # g
    w2 = 1 # h
    w3 = 1 # overlap
    w_tot = w1 + w2 + w3

    # optimization: try also to minimize overlap. 
    # It is normalized by the dimenion of the problem PROBLEM_SIZE
    n_overlap = get_n_overlaps(state)/PROBLEM_SIZE
    
    return (w1/w_tot)*g(state) + (w2/w_tot)*h(state) + (w3/w_tot)*n_overlap 

In [5]:
assert goal_check(State(set(range(NUM_SETS)), set())), "Probelm not solvable"

In [6]:
def search():

    frontier = deque()
    state = State(set(), set(range(NUM_SETS)))
    frontier.append(state)

    counter = 0
    current_state = frontier.popleft()
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.popleft()

    return counter, current_state

def search_A_star():

    frontier = PriorityQueue()  
    state = State(set(), set(range(NUM_SETS)))
    frontier.put((distance_A_star(state), state))

    counter = 0
    _, current_state = frontier.get()
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            if not np.array_equal(SETS[action], [False for _ in range(PROBLEM_SIZE)]): # optimization
                new_state = State(
                    current_state.taken ^ {action},
                    current_state.not_taken ^ {action},
                )
                frontier.put((distance_A_star(new_state), new_state))
        _, current_state = frontier.get()
    
    return counter, current_state

## Search

In [9]:
print(f'PROBLEM_SIZE = {PROBLEM_SIZE}, NUM_SETS = {NUM_SETS}\n')
# set distance function:    distance   distance_A_star
start_t1 = time.time()
print('Search with Breadth first ...')
counter1, current_state1 = search()
end_t1 = time.time()
print(f"Solved in {counter1:,} steps ({len(current_state1.taken)} tiles)")
print(f'# overlaps: {get_n_overlaps(current_state1)}')
print(f'(time: {end_t1-start_t1:.2f}s)')

print()

start_t2 = time.time()
print('Search with A* ...')
counter2, current_state2 = search_A_star()
end_t2 = time.time()
print(f"Solved in {counter2:,} steps ({len(current_state2.taken)} tiles)")
print(f'# overlaps: {get_n_overlaps(current_state2)}')
print(f'(time: {end_t2-start_t2:.2f}s)')

PROBLEM_SIZE = 10, NUM_SETS = 20

Search with Breadth first ...
Solved in 467 steps (3 tiles)
# overlaps: 2
(time: 0.04s)

Search with A* ...
Solved in 139 steps (3 tiles)
# overlaps: 1
(time: 0.44s)


In [10]:
is_print_solutions = True # False

if is_print_solutions:
    print(f'SOLUTIONS:\n')
    print('Original search:')
    for s in current_state1.taken:
        print(f'{SETS[s]}\t<- {s}')
    print()
    print('A* search:')
    for s in current_state2.taken:
        print(f'{SETS[s]}\t<- {s}')

SOLUTIONS:

Original search:
[ True  True False False False False  True  True False  True]	<- 0
[False False  True False False  True False False  True False]	<- 4
[False False False  True  True False False False  True  True]	<- 14

A* search:
[ True  True False False False False  True  True False  True]	<- 0
[False False  True False False  True False False False False]	<- 11
[False False False  True  True False False False  True  True]	<- 14


# Compare 2 algorithms

In [7]:
import warnings
warnings.filterwarnings("ignore")

def score(n_steps, n_tiles, n_overlap, t):
    if n_overlap==0:
        n_overlap = 1e-8 #approximation
    if t==0:
        t = 1e-8 #approximation 

    p = n_steps*n_tiles*n_overlap*t

    return 1/p

In [8]:
SIM_ITER = 50

a1 = 'Breadth first'
a2 = 'A*'

scores_a1 = []
scores_a2 = []

pbar = tqdm(total=SIM_ITER, desc=f"Simulation progress: ", leave=False)

for _ in range(SIM_ITER):

    SETS = tuple(np.array([random() < .3 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))
    
    start_t1 = time.time()
    counter1, current_state1 = search()
    end_t1 = time.time()
    score1 = score(counter1, len(current_state1.taken), get_n_overlaps(current_state1), end_t1-start_t1)
    scores_a1.append(score1)

    start_t2 = time.time()
    counter2, current_state2 = search()
    end_t2 = time.time()
    score2 = score(counter2, len(current_state2.taken), get_n_overlaps(current_state2), end_t2-start_t2)
    scores_a2.append(score2)

    pbar.update(1)

print(f'SCORES:\n')
print(f'Score algorithm 1 ({a1}): {sum(scores_a1)/SIM_ITER:.4f}')
print(f'Score algorithm 2 ({a2}): {sum(scores_a2)/SIM_ITER:.4f}')

pbar.close()

                                                                    

SCORES:

Score algorithm 1 (Breadth first): 3234310.2730
Score algorithm 2 (A*): 3888684.4249


