Copyright **`(c)`** 2023 Florentin-Cristian Udrea (udrea.florentin00@gmail.com)  

In [1]:
from random import random
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
from tqdm.auto import tqdm
from collections import defaultdict


In [2]:
State = namedtuple('State', ['taken', 'not_taken'])


def covered(problem, state):
    SETS = problem["SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]
    
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )


def goal_check(problem, state):
    return np.all(covered(problem, state))

In [3]:
# PROBLEM_SIZE = 50
# NUM_SETS = 200
# PROBABILITY = 0.2

def generate_problem(p_size, n_sets, prob):
    max_tries = 1000
    count = 0

    SETS = tuple(np.array([random() < prob for _ in range(p_size)]) for _ in range(n_sets))
    problem = {"PROBLEM_SIZE": p_size,
               "NUM_SETS": n_sets,
               "PROBABILITY": prob,
               "SETS": SETS}
    
    while goal_check(problem, State(set(range(n_sets)), set())) != True:
        SETS = tuple(np.array([random() < prob for _ in range(p_size)]) for _ in range(n_sets))
        count += 1
        if count > max_tries:
            raise Exception("Solvable problem is too dificult to create!")
    

    
    return problem

    

## Depth First

In [4]:
def dfs(problem, verbose=False):
    NUM_SETS = problem["NUM_SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    frontier = deque()
    state = State(set(), set(range(NUM_SETS)))
    frontier.append(state)

    counter = 0
    current_state = frontier.pop()
    while not goal_check(problem, current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.pop()
    if verbose:
        print(f"DFS: Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
        
    return counter, len(current_state.taken)


## Breadth First

In [5]:
def bfs(problem, verbose=False):
    NUM_SETS = problem["NUM_SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    frontier = deque()
    state = State(set(), set(range(NUM_SETS)))
    frontier.append(state)

    counter = 0
    current_state = frontier.popleft()
    while not goal_check(problem, current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.append(new_state)
        current_state = frontier.popleft()
    if verbose:
        print(f"BFS: Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
        
    return counter, len(current_state.taken)



## Greedy Best First

In [6]:
def greedy(problem, verbose=False):
    NUM_SETS = problem["NUM_SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    def f(state):
        missing_size = PROBLEM_SIZE - sum(covered(problem, state))
        return missing_size


    frontier = PriorityQueue()
    state = State(set(), set(range(NUM_SETS)))
    frontier.put((f(state), state))

    counter = 0
    _, current_state = frontier.get()
    while not goal_check(problem, current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
    if verbose:
        print(f"Greedy: Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

        
    return counter, len(current_state.taken)


## A*

In [7]:
def h(problem, state):
    SETS = problem["SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    largest_set_size = max(sum(s) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(covered(problem, state))
    optimistic_estimate = ceil(missing_size / largest_set_size)
    return optimistic_estimate


def h2(problem, state):
    SETS = problem["SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    already_covered = covered(problem, state)
    if np.all(already_covered):
        return 0
    largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size / largest_set_size)
    return optimistic_estimate


def h3(problem, state):
    SETS = problem["SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    already_covered = covered(problem, state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    taken = 1
    while sum(candidates[:taken]) < missing_size:
        taken += 1
    return taken

def my_h(problem, state):
    SETS = problem["SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    already_covered = covered(problem, state)
    if np.all(already_covered):
        return 0
    candidates = sorted([tuple([sum(np.logical_and(s, np.logical_not(already_covered))), s]) for s in SETS], key=lambda a:a[0], reverse=True)
    
    h = 0
    while not np.all(already_covered):
        already_covered = np.logical_or(candidates[h][1], already_covered)
        candidates = sorted([tuple([sum(np.logical_and(s, np.logical_not(already_covered))), s]) for s in SETS], key=lambda a:a[0], reverse=True)
        h += 1

    return h

def h_wrong(problem, state):
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]
    missing_size = PROBLEM_SIZE - sum(covered(problem, state))
    return missing_size

In [8]:
def astar(problem, heuristic, verbose=False):
    NUM_SETS = problem["NUM_SETS"]
    PROBLEM_SIZE = problem["PROBLEM_SIZE"]

    def f(state):
        return len(state.taken) + heuristic(problem, state)
        
    frontier = PriorityQueue()
    state = State(set(), set(range(NUM_SETS)))
    frontier.put((f(state), state))

    counter = 0
    _, current_state = frontier.get()
    while not goal_check(problem, current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
            
    if verbose:
        print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
    
    return counter, len(current_state.taken)


In [16]:
NUM_PROBLEMS = 10

counters = defaultdict(lambda: [])

for i in tqdm(range(NUM_PROBLEMS)):
    problem = generate_problem(p_size=30, n_sets=50, prob=0.4)
    counters["dfs"].append(   dfs(problem)   )
    counters["bfs"].append(   bfs(problem)   )
    counters["greedy"].append(   greedy(problem)   )
    counters["astar_h"].append(   astar(problem, heuristic=h)   )
    counters["astar_h2"].append(   astar(problem, heuristic=h2)   )
    counters["astar_h3"].append(   astar(problem, heuristic=h3)   )
    counters["astar_my_heuristic"].append(   astar(problem, heuristic=my_h)   )
    counters["astar_wrong_heuristic"].append(   astar(problem, heuristic=h_wrong)   )



  0%|          | 0/10 [00:00<?, ?it/s]

In [17]:
from prettytable import PrettyTable

x = PrettyTable()

x.field_names = ["method", "steps", "tiles"]


for k,v in counters.items():
    x.add_row([k, sum([c[0] for c in v])/len(v), sum([c[1] for c in v])/len(v)])

print(x)

+-----------------------+--------+-------+
|         method        | steps  | tiles |
+-----------------------+--------+-------+
|          dfs          |  8.9   |  8.9  |
|          bfs          | 8961.4 |  3.0  |
|         greedy        |  3.4   |  3.4  |
|        astar_h        | 297.4  |  3.0  |
|        astar_h2       |  21.3  |  3.0  |
|        astar_h3       |  21.3  |  3.0  |
|   astar_my_heuristic  |  8.9   |  3.0  |
| astar_wrong_heuristic |  4.2   |  3.4  |
+-----------------------+--------+-------+


# Conclusions

A* algorithm usually outperforms the other algorithms while also getting the (presumably) optimal solutions

Of course it heavily depends on the heuristic implemented:
- the professors heuristic does heavily improve over the brute-force BFS or DFS, going down in two orders of magnitude
- my heuristic, instead, outperforms the professor's, going down one more order of magnitude, while keeping its optimality