In [1]:
from random import random
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue

import numpy as np
from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
PROBLEM_SIZE = 20
NUM_SETS = 40
SETS = tuple(
    np.array([random() < 0.2 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS)
)
State = namedtuple("State", ["taken", "not_taken"])

In [3]:
def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )


def goal_check(state):
    return np.all(covered(state))

In [4]:
assert goal_check(State(set(range(NUM_SETS)), set())), "Problem not solvable"

## Greedy Best First

In [5]:
def f(state):
    missing_size = PROBLEM_SIZE - sum(covered(state))
    return missing_size

In [6]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

4it [00:00, 1160.89it/s]

Solved in 4 steps (4 tiles)





In [7]:
current_state

State(taken={9, 18, 3, 39}, not_taken={0, 1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38})

In [8]:
goal_check(current_state)

True

## A*

## Algorithm

In [9]:
def f(state, cost, heuristic):
    return cost(state) + heuristic(state)


def a_star(cost, heuristic):
    frontier = PriorityQueue()

    state = State(set(), set(range(NUM_SETS)))
    frontier.put((f(state, cost, heuristic), state))

    counter = 0
    _, current_state = frontier.get()
    with tqdm(total=None) as pbar:
        while not goal_check(current_state):
            counter += 1
            for action in current_state[1]:
                new_state = State(
                    current_state.taken ^ {action},
                    current_state.not_taken ^ {action},
                )
                frontier.put((f(new_state, cost, heuristic), new_state))
            _, current_state = frontier.get()
            pbar.update(1)

    print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")
    return current_state

## Heuristics defined in the lecture

- `h1(state)`: calculates the number of additional sets required to cover the remaining elements, assuming the size of the largest set;
- `h2(state)`: similar to `h1(state)`, but does not consider sets that had already been taken when finding the largest set;
- `h3(state)`: similar to `h2(state)` and `h(3)`, but it estimates the number of additional needed sets by iteratively selecting sets in descending order of size until the uncovered elements are sufficiently covered; 

In [10]:
def cost(state):
    return len(state.taken)


def h1(state):
    largest_set_size = max(sum(s) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(covered(state))
    optimistic_estimate = ceil(missing_size / largest_set_size)
    return optimistic_estimate


def h2(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    largest_set_size = max(
        sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS
    )
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size / largest_set_size)
    return optimistic_estimate


def h3(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted(
        (sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS),
        reverse=True,
    )
    taken = 1
    while sum(candidates[:taken]) < missing_size:
        taken += 1
    return taken

### Results

In [11]:
print("Heuristic 1")
print(a_star(cost, h1))
print()

print("Heuristic 2")
print(a_star(cost, h2))
print()

print("Heuristic 3")
print(a_star(cost, h3))

Heuristic 1


61it [00:00, 222.56it/s]


Solved in 61 steps (4 tiles)
State(taken={3, 4, 29, 39}, not_taken={0, 1, 2, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 30, 31, 32, 33, 34, 35, 36, 37, 38})

Heuristic 2


99it [00:00, 189.61it/s]


Solved in 99 steps (4 tiles)
State(taken={18, 3, 29, 39}, not_taken={0, 1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 30, 31, 32, 33, 34, 35, 36, 37, 38})

Heuristic 3


52it [00:00, 179.06it/s]

Solved in 52 steps (4 tiles)
State(taken={11, 18, 3, 39}, not_taken={0, 1, 2, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14, 15, 16, 17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38})





## Additional heuristics

- At first, I attempted to use the greedy algorithm's distance function as the heuristic, but that did not work since it is a pessimistic function, and therefore would not lead to the optimal solution.

In [12]:
def h4(state):
    return PROBLEM_SIZE - np.count_nonzero(
        reduce(
            np.logical_or,
            [SETS[i] for i in new_state.taken],
            np.array([0] * PROBLEM_SIZE),
        )
    )

print("Heuristic 4")
print(a_star(cost, h4))

Heuristic 4


65214it [00:26, 2458.06it/s]

Solved in 65,214 steps (4 tiles)
State(taken={9, 18, 3, 39}, not_taken={0, 1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38})



