In [1]:
from random import random
from functools import reduce
from collections import namedtuple
from math import ceil
from queue import PriorityQueue, SimpleQueue, LifoQueue

import numpy as np
from tqdm.auto import tqdm

In [2]:
State = namedtuple('State', ['taken', 'not_taken'])

def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )

def goal_check(state):
    return np.all(covered(state))

In [3]:
PROBLEM_SIZE = 20
NUM_SETS = 100
SETS = tuple(
    np.array([random() < 0.2 for _ in range(PROBLEM_SIZE)])
    for _ in range(NUM_SETS)
)
assert goal_check(
    State(set(range(NUM_SETS)), set())
), "Probelm not solvable"

In [4]:
def h(state):
    largest_set_size = max(sum(s) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(covered(state))
    optimistic_estimate = ceil(missing_size/largest_set_size)
    return optimistic_estimate

def h2(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size/largest_set_size)
    return optimistic_estimate

def h3(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    taken = 1
    while sum(candidates[:taken]) < missing_size:
        taken += 1
    return taken

#that the function that I made myself
# I have  just though about the H3 of the teacher for the taken var
# the heuristic just give less evaluation in situation where the difference in missing size is not increasing
# so the algo will focus a bit more first on the set that appear the best first 
# should be admissible because for all n€N, h4(n) <= h3(n)
def h4(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    heuri = 1
    taken = 1
    best_candidate_sum = sum(candidates[:taken])
    while sum(candidates[:taken]) < missing_size:
        if sum(candidates[:taken]) > best_candidate_sum:
            best_candidate_sum = sum(candidates[:taken])
            heuri += 1
        if sum(candidates[:taken]) == best_candidate_sum:
            heuri +=0.9
        taken += 1
    return heuri

# here I give weigth to candiate < best candidate but I give sligter less weigth 
# seem to be more effective than h3 and still admisible 
def h5(state):
    already_covered = covered(state)
    if np.all(already_covered):
        return 0
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS), reverse=True)
    heuri = 1
    taken = 1
    best_candidate_sum = sum(candidates[:taken])
    while sum(candidates[:taken]) < missing_size:
        if sum(candidates[:taken]) > best_candidate_sum:
            best_candidate_sum = sum(candidates[:taken])
            heuri += 1
        if sum(candidates[:taken]) == best_candidate_sum:
            heuri +=1
        if sum(candidates[:taken]) < best_candidate_sum:
            heuri +=0.8
        taken += 1
    return heuri
  
  
def f(state):
    return len(state.taken) + h5(state)
def fref(state):
    return len(state.taken) + h3(state)

In [5]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((fref(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((fref(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"h3 Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

0it [00:00, ?it/s]

h3 Solved in 595 steps (4 tiles)


In [6]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)

print(f"h5 Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

0it [00:00, ?it/s]

h5 Solved in 263 steps (4 tiles)
