In [2]:
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue
from math import ceil

import numpy as np

In [3]:
PROBLEM_SIZE = 5
NUM_SETS = 10
SETS = tuple(np.array([random() < .4 for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))
State = namedtuple('State', ['taken', 'not_taken'])

In [4]:
def goal_check(state):
    return np.all(reduce(np.logical_or, [SETS[i] for i in state.taken], np.array([False for _ in range(PROBLEM_SIZE)])))

In [5]:
assert goal_check(State(set(range(NUM_SETS)), set())), "Probelm not solvable" # sometimes an error may occur

In [6]:
def search(frontier):
    current_state = frontier.get()

    # to count the number of sets put in the frontier
    counter = 0
    while not goal_check(current_state):
        counter += 1
        for action in current_state.not_taken:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put(new_state)
        current_state = frontier.get()

    print(f"Solved in {counter} steps")
    return current_state

Breadth First Search and Depth First Search

In [7]:
fifo = SimpleQueue()
lifo = LifoQueue()

fifo.put(State(set(), set(range(NUM_SETS))))
lifo.put(State(set(), set(range(NUM_SETS))))

breadth_solution_state = search(fifo)
depth_solution_state = search(lifo)

Solved in 34 steps
Solved in 3 steps


In [8]:
print(
    f"Solution with a Breadth Search:\n{breadth_solution_state}\nSolution with a Depth Search:\n{depth_solution_state}"
)

Solution with a Breadth Search:
State(taken={2, 6}, not_taken={0, 1, 3, 4, 5, 7, 8, 9})
Solution with a Depth Search:
State(taken={8, 9, 7}, not_taken={0, 1, 2, 3, 4, 5, 6})


Dijkstra Search

In [9]:
def cost_function(taken):
    return sum([np.sum(SETS[i]) for i in taken])

In [10]:
def search_dijkstra(frontier):
    current_state = frontier.get()

    counter = 0
    while not goal_check(current_state):
        counter += 1
        for action in current_state.not_taken:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((cost_function(new_state.taken), new_state))
        current_state = frontier.get()[1]

    print(f"Solved in {counter} steps")
    return current_state

In [11]:
frontier = PriorityQueue()
frontier.put(State(set(), set(range(NUM_SETS))))
dijkstra_solution_state = search_dijkstra(frontier)

Solved in 169 steps


In [12]:
dijkstra_solution_state

State(taken={8, 9, 1}, not_taken={0, 2, 3, 4, 5, 6, 7})

Gready Search

In [13]:
def distance(state):
    uncovered = PROBLEM_SIZE - sum(
        reduce(
            np.logical_or, 
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        )
    )
    return uncovered

def greedy_best_searh(frontier):
    current_state = frontier.get()

    steps = 0
    while not goal_check(current_state):
        steps += 1
        for action in current_state.not_taken:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            frontier.put((distance(new_state), new_state))
        _, current_state = frontier.get()
    print(f"Solved in {steps} steps and with {len(current_state.taken)} tiles")
    return current_state

In [14]:
frontier = PriorityQueue()
frontier.put(State(set(), set(range(NUM_SETS))))

greedy_solution_search = greedy_best_searh(frontier)
print([SETS[i] for i in greedy_solution_search.taken])

Solved in 2 steps and with 2 tiles
[array([ True, False,  True,  True,  True]), array([ True,  True, False,  True, False])]


A star

In [15]:
def is_special(s):
    return sum(s) / PROBLEM_SIZE >= 0.6

def g_cost(state):
    return len(state.taken)

# questa funzione non è optimistic ==> non è ammissibile ==> non è garantita l'ottimalità di A*

# def distance(state):
#     uncovered = PROBLEM_SIZE - sum(
#         reduce(
#             np.logical_or, 
#             [SETS[i] for i in state.taken],
#             np.array([False for _ in range(PROBLEM_SIZE)]),
#         )
#     )
#     return uncovered

def h(state):
    uncovered = PROBLEM_SIZE - sum(
        reduce(
            np.logical_or, 
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        )
    )
    largest_set_size = max(sum(s) for s in SETS)
    return ceil(uncovered/largest_set_size)

# A* approach
def astar_search():
    frontier = PriorityQueue()
    start_state = State(set(), set(range(NUM_SETS)))
    frontier.put((0 + distance(start_state), start_state))
    
    counter = 0
    _, current_state = frontier.get()
    while not goal_check(current_state):
        sorted_actions = sorted(list(current_state.not_taken), key=lambda x: -sum(SETS[x]))
        for action in sorted_actions:
            new_taken = set(current_state.taken ^ {action})
            new_not_taken = set(current_state.not_taken ^ {action})
            new_state = State(new_taken, new_not_taken)
            frontier.put((g_cost(new_state) + distance(new_state), new_state))
        counter += 1
        _, current_state = frontier.get()
    return counter, current_state
counter, current_state = astar_search()
print(f"Solved using A* in {counter:,} steps with {len(current_state.taken)} sets")

Solved using A* in 2 steps with 2 sets
