# A* algorithm (set-covering problem)

Copyright **`(c)`** 2023 Thomas Baracco `<s308722@studenti.polito.it>`  
[`https://github.com/baraccothomas/computational-intelligence`](https://github.com/baraccothomas/computational-intelligence)  
Free for personal or classroom use.

## Imports and constants

In [198]:
import numpy as np

from random import random
from collections import namedtuple
from functools import reduce
from queue import PriorityQueue, SimpleQueue, LifoQueue
from math import ceil


In [199]:
PROBLEM_SIZE = 30
NUM_SETS = 30
SETS = tuple(
    np.array([random() < 0.3 for _ in range(PROBLEM_SIZE)])
    for _ in range(NUM_SETS)
)

def covered(state):
    return reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )

State = namedtuple('State', ['taken', 'not_taken'])

## Node class

In [200]:
class Node:
    def __init__(self, state, g, h):
        self.state = state
        self.g = g
        self.h = h
    
    def f(self) -> np.int64:
        return self.g + self.h

    def __str__(self):
        return f"State: {self.state} | g = {self.g} | h = {self.h} | f = {self.f()}"
    
    def __eq__(self, other):
        return (self.state == other.state) and (self.g == other.g)

    def __ne__(self, other):
        return not (self == other)

    def __lt__(self, other):
        return (self.state < other.state) and (self.g < other.g)

    def __gt__(self, other):
        return (self.state > other.state) and (self.g > other.g)

    def __le__(self, other):
        return (self < other) or (self == other)

    def __ge__(self, other):
        return (self > other) or (self == other)

## Goal check

In [201]:
def goal_check(state):
    return np.all(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for i in range(PROBLEM_SIZE)]),
        )
    )

## h function

The ```h``` function basically calculates the distance from the optimal state. Since the optimal state is to have all the sets covered, the distance represents the remaining set elements that need to be covered.

In [202]:
def h(state):
    return PROBLEM_SIZE - sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        )
    )
    

## Main code

### Goal check assert

In [203]:
assert goal_check(State(set(range(NUM_SETS)), set())), "Problem not solvable"

### Depth First

In [204]:
frontier = SimpleQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put(state)

counter = 0
current_state = frontier.get()

while not goal_check(current_state):
    counter += 1
    for action in current_state[1]:
        new_state = State(
            current_state.taken ^ {action},
            current_state.not_taken ^ {action},
        )
        frontier.put(new_state)
    current_state = frontier.get()

print(f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)")

Solved in 32,681 steps (4 tiles)


### A star

In this algorithm we use a ```PriorityQueue``` to sort the open list of nodes by ```f```, in order to make the ```get()``` method return the node (state) with the lowest ```f```. Since this is a queue, the node (state) will be always removed from the open list. It's like it's been moved to a "closed" list without the need to instantiate it.

Every time a new node is discovered, it's added to the queue, with ```g = current_node.g + 1```, which indicates the number of sets taken in a given state, and ```h = current_node.h + h(new_state)```. The ```f``` is calculated automatically and can be called with the method ```.f()```

In [205]:
frontier = PriorityQueue()
initialState = State(set(), set(range(NUM_SETS)))
initialNode = Node(initialState, 0, h(initialState))
frontier.put((initialNode.f(), initialNode))

counter = 0
_, current_node = frontier.get()

while not goal_check(current_node.state):
    counter += 1
    for action in current_node.state[1]:
        new_state = State(
            current_node.state.taken ^ {action},
            current_node.state.not_taken ^ {action},
        )
        new_node = Node(new_state, current_node.g + 1, current_node.h + h(new_state))
        frontier.put((new_node.f(), new_node))
        print(new_node)
    
    
    
    _, current_node = frontier.get()

print(current_node.state)

print(
    f"Solved in {counter:,} steps ({len(current_node.state.taken)} tiles)"
)

State: State(taken={0}, not_taken={1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29}) | g = 1 | h = 49 | f = 50
State: State(taken={1}, not_taken={0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29}) | g = 1 | h = 51 | f = 52
State: State(taken={2}, not_taken={0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29}) | g = 1 | h = 51 | f = 52
State: State(taken={3}, not_taken={0, 1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29}) | g = 1 | h = 49 | f = 50
State: State(taken={4}, not_taken={0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29}) | g = 1 | h = 49 | f = 50
State: State(taken={5}, not_taken={0, 1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29}) | g = 1 | h 