In [1]:
import numpy as np
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue

from tqdm import tqdm

import heapq


# Problem Definition:

In [2]:
# constants
PROBLEM_SIZE = 20  # dimension of the finite set U to cover
NUMBER_SET = 40  # number of randomized subsets to chose from
PROBABILITY_TRUE = 0.3  # probability of a subset to contain an element
SETS = tuple(
    np.array([random() < PROBABILITY_TRUE for i in range(PROBLEM_SIZE)]) for j in range(NUMBER_SET)
)  # generate radnomized sets

# Define State as a named tuple
State = namedtuple("State", ["taken", "cost", "heuristic"])


In [3]:
def goal_check(state, sets):
    """
    check if the logical OR all the elements yeald a line of all true ie the
    condition for a state to be covering the whole set U
    """
    return np.all(
        reduce(np.logical_or, [sets[i] for i in state.taken], np.zeros(PROBLEM_SIZE))
    )


# assert generated problem is solvable, ie the goal check of a stete with all
# sets taken is true
assert goal_check(State(range(NUMBER_SET), 0, 0), SETS)


# Cost and Heuristic Functions:

In [4]:
def cost(state):
    """the cost is the number of sets taken by the solution"""
    return len(state.taken)


def cost_number_of_spots(state):
    """The cost is the sum of the number of spots taken by each set in the solution."""
    selected_sets = [SETS[i] for i in state.taken]
    return np.sum([np.sum(s) for s in selected_sets])


In [5]:
def TRIVIAL_heuristic(state, sets):
    return 0


def MRSC_heuristic(state, sets):
    """
    Minimum Remaining Set Coverage

    This heuristic estimates the cost based on how many elements in "U" are still
    uncovered and divides it by the number of subsets not taken. This heuristic
    assumes that the subsets have an equal chance of covering remaining uncovered
    elements.

    h(state) = (number of uncovered elements in U) / (number of subsets not taken)
    """

    uncovered = reduce(
        np.logical_or, [sets[i] for i in state.taken], np.zeros(len(sets[0]))
    )

    not_taken_subsets = NUMBER_SET - len(state.taken)

    return -np.sum(uncovered) / not_taken_subsets


def MSC_heuristic(state, sets):
    """
    Maximum Subset Coverage

    This heuristic estimates the cost by assuming that each additional subset chosen
    will cover as many uncovered elements as possible. It divides the number of
    uncovered elements in "U" by the number of subsets already taken.

    h(state) = (number of uncovered elements in U) / (number of subsets already taken)
    """

    uncovered = reduce(
        np.logical_or, [sets[i] for i in state.taken], np.zeros(len(sets[0]))
    )

    return (-np.sum(uncovered) / len(state.taken)) if len(state.taken) > 0 else 0


def MRSC_MSC_heuristic(state, sets):
    return (MRSC_heuristic(state, sets) + MSC_heuristic(state, sets)) / 2


def ASC_heuristic(state, sets):
    """
    Average Subset Coverage

    This heuristic estimates the cost based on the average size of the remaining
    subsets and assumes that each chosen subset will, on average, cover this many
    elements.

    h(state) = (number of uncovered elements in U) / (average size of remaining subsets)
    """

    uncovered = reduce(
        np.logical_or, [sets[i] for i in state.taken], np.zeros(len(sets[0]))
    )

    remaining_sets = [sets[i] for i in range(NUMBER_SET) if i not in state.taken]

    average_size = np.sum([np.sum(s) for s in remaining_sets]) / len(remaining_sets)

    return -np.sum(uncovered) / average_size


def RANDOM_heuristic(state, sets):
    """
    !! not admissible but funny !!
    """
    return random()


def DENSITY_heuristic(state, sets):
    """
    Density Heuristic

    This heuristic estimates the cost based on the density of uncovered elements in
    U. It assumes that the subsets have an equal chance of covering remaining
    uncovered elements.

    h(state) = (density of uncovered elements in U) * (number of subsets)
    """

    uncovered = reduce(
        np.logical_or, [sets[i] for i in state.taken], np.zeros(len(sets[0]))
    )

    # Calculate the density of uncovered elements in U
    uncovered_density = np.sum(uncovered) / len(uncovered)

    # Estimate the remaining cost based on the uncovered density
    return -uncovered_density * NUMBER_SET


# A*

In [6]:
def astar(sets, heuristic,cost_f):
    # Initialize the priority queue with the initial state
    initial_state = State(
        taken=[],
        cost=0,
        heuristic=0,
    )

    memoization = {}

    open_set = []
    heapq.heappush(
        open_set, (initial_state.cost + initial_state.heuristic, initial_state)
    )

    # Initialize the closed set as an empty set
    closed_set = set()

    progress_bar = tqdm(total=None)

    while not len(open_set) == 0:
        # Get the state with the lowest f score from the priority queue
        _, current_state = heapq.heappop(open_set)

        progress_bar.update(1)

        # If the current state is a goal state, return the solution
        if goal_check(current_state, sets):
            progress_bar.close()
            return current_state

        # Add the current state to the closed set
        closed_set.add(tuple(current_state.taken))

        # Generate successor states by adding one more subset
        for subset in range(NUMBER_SET):
            if subset not in current_state.taken:
                # Create a new state by adding the subset
                new_taken = current_state.taken + [subset]

                # Check if the heuristic value is already in the memoization dictionary
                if tuple(new_taken) in memoization:
                    new_heuristic = memoization[tuple(new_taken)]
                else:
                    # Calculate the heuristic value
                    new_heuristic = heuristic(State(new_taken, 0, 0), sets)
                    # Store it in the memoization dictionary
                    memoization[tuple(new_taken)] = new_heuristic

                new_cost = cost_f(State(new_taken, 0, 0))
                new_state = State(new_taken, new_cost, new_heuristic)

                # If the state is not in the closed set, add it to the open set
                if tuple(new_taken) not in closed_set:
                    heapq.heappush(
                        open_set, (new_state.cost + new_state.heuristic, new_state)
                    )

    # If the open set is empty and no solution is found, return None
    progress_bar.close()
    return None


# Results:

In [7]:
heuristics = [
    TRIVIAL_heuristic,
    MRSC_heuristic,
    MSC_heuristic,
    MRSC_MSC_heuristic,
    ASC_heuristic,
    DENSITY_heuristic,
    RANDOM_heuristic,
]

print("Problem size:", PROBLEM_SIZE, "Number of sets:", NUMBER_SET, "\n")

for h in heuristics:
    print(h.__name__)
    solution = astar(SETS, h, cost)
    # solution = astar(SETS,h,cost_number_of_spots) # slow af
    print(" Solution:", solution.taken)
    print(" Solution cost:", solution.cost)
    print(" Solution check:", goal_check(solution, SETS))
    print()


Problem size: 20 Number of sets: 40 

TRIVIAL_heuristic


476it [00:00, 2138.69it/s]

3613it [00:01, 2099.28it/s]


 Solution: [1, 14, 37]
 Solution cost: 3
 Solution check: True

MRSC_heuristic


1602it [00:01, 1069.61it/s]


 Solution: [1, 14, 37]
 Solution cost: 3
 Solution check: True

MSC_heuristic


328it [00:00, 1024.68it/s]


 Solution: [1, 14, 37]
 Solution cost: 3
 Solution check: True

MRSC_MSC_heuristic


902it [00:01, 797.65it/s]


 Solution: [1, 14, 37]
 Solution cost: 3
 Solution check: True

ASC_heuristic


18it [00:00, 117.11it/s]


 Solution: [14, 1, 37]
 Solution cost: 3
 Solution check: True

DENSITY_heuristic


5it [00:00, 706.61it/s]


 Solution: [12, 14, 1, 0]
 Solution cost: 4
 Solution check: True

RANDOM_heuristic


5169it [00:02, 2530.52it/s]

 Solution: [14, 33, 37]
 Solution cost: 3
 Solution check: True




