Copyright **(c)** 2023 Abdelouahab Moubane <abdelmub@gmail.com>
https://github.com/AbdelouahabMoubane

In [81]:
from random import random, seed
from math import ceil
from functools import reduce
from collections import namedtuple, deque
from queue import PriorityQueue
from tqdm.auto import tqdm
import numpy as np

# Problem parameters

In [82]:
seed(8)
PROBLEM_SIZE = 100
NUM_SETS = 50
SETS = tuple(np.array([random() < .3 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))

SETS = set(tuple(tuple(set) for set in SETS))
SETS = list(np.array(set) for set in SETS)

SETS.sort(key=lambda x: -sum(x))
SETS = tuple(SETS)

NUM_SETS = len(SETS)

State = namedtuple('State', ['taken', 'not_taken'])

len(SETS)

50

In [83]:
def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )

# Breadth Search

In [84]:
def goal_check(state):
    return np.all(reduce(np.logical_or, [SETS[i] for i in state.taken], np.array([False for _ in range(PROBLEM_SIZE)])))


def distance(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    return Num_EmptySpaces

def distance_Convex(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    return 0.1*Num_EmptySpaces+0.9*Num_SetsTaken

def distance_stochastic(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    return (Num_SetsTaken/(PROBLEM_SIZE-Num_EmptySpaces+1))+100*(1-(0.5)**(Num_EmptySpaces))

def distance_stochastic_1(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    p = (2**(PROBLEM_SIZE-Num_EmptySpaces))/(2**PROBLEM_SIZE-Num_SetsTaken)
    return (Num_SetsTaken/(PROBLEM_SIZE-Num_EmptySpaces+1))+(1-p)


def distance_stochastic_2(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    p = (2**(PROBLEM_SIZE-Num_EmptySpaces))/(2**PROBLEM_SIZE-Num_SetsTaken)
    return (1-p)



# A* search

## Deterministic heuristic

In [85]:
def h(state):
    already_covered = covered(state)
    index = list()
    if np.all(already_covered):
        return 0
    for i in range(PROBLEM_SIZE):
        if already_covered[i] == False:
            index.append(i)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    candidates = sorted((sum([SETS[h][k] for k in index]) for h in state.not_taken),reverse=True)
    taken = 1
    while sum(candidates[:taken]) < missing_size:
        taken += 1
    return taken

def f1(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    return PROBLEM_SIZE*(Num_SetsTaken/(PROBLEM_SIZE-Num_EmptySpaces+1)) + h(state)


def f2(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    return (Num_SetsTaken/(PROBLEM_SIZE-Num_EmptySpaces+1))+h(state)/(PROBLEM_SIZE-Num_SetsTaken+1)




## Stochastic heuristic

In [86]:

def f3(state):
    Num_EmptySpaces = PROBLEM_SIZE - np.sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))
    Num_SetsTaken = len(state.taken)
    return (Num_SetsTaken/(PROBLEM_SIZE-Num_EmptySpaces+1))+100*(1-(0.95)**(Num_EmptySpaces))


## Tests

In [87]:
assert goal_check(
    State(set(range(NUM_SETS)), set())
), "Probelm not solvable"

In [88]:
frontier = PriorityQueue()
#frontier = SimpleQueue()
#frontier = LifoQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f1(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f1(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)
    pbar.close
print(
    f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)"
)

7it [00:00, 15.10it/s]

3313it [02:34, 21.42it/s]

Solved in 3,313 steps (6 tiles)





In [89]:
current_state

State(taken={1, 38, 6, 13, 15, 19}, not_taken={0, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 14, 16, 17, 18, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49})

In [90]:
sum(sum(SETS[i] for i in current_state.taken))

200

In [91]:
frontier = PriorityQueue()
#frontier = SimpleQueue()
#frontier = LifoQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((f3(state), state))

counter = 0
_, current_state = frontier.get()
with tqdm(total=None) as pbar:
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((f3(new_state), new_state))
        _, current_state = frontier.get()
        pbar.update(1)
    pbar.close
print(
    f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)"
)

6it [00:00, 272.48it/s]

Solved in 6 steps (6 tiles)





In [92]:
current_state

State(taken={0, 4, 7, 11, 15, 22}, not_taken={1, 2, 3, 5, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 19, 20, 21, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49})

In [93]:
sum(sum(SETS[i] for i in current_state.taken))

211