Made by 
    Luca Sturaro (Polito s320062)
In collaboration with 
    Gabriele Tomatis (Polito, s313848)

In [15]:
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue

import numpy as np

In [16]:
PROBLEM_SIZE = 1000
NUM_SETS = 1000
PROBABILITY = 0.3
SETS = tuple(np.array([random() < PROBABILITY for _ in range(PROBLEM_SIZE)]) for _ in range(NUM_SETS))
State = namedtuple('State', ['taken', 'not_taken'])

In [17]:
if PROBLEM_SIZE <= 100: # if the problem size is more than 100, visualization gets messy
    print("Sets:")
    for num, element in enumerate(SETS):
        print(f'{num}:\t', end='')
        for value in element:
            if value:
                print('1 ', end='')
            else:
                print('0 ', end='')
        print()

max_solution = [0] * PROBLEM_SIZE
for i in range(PROBLEM_SIZE):
    for element in SETS:
        if element[i]:
            max_solution[i] += element[i]

if 0 in max_solution:
    print('Problem not solvable')
    raise Exception('Problem is not solvable')
else:
    print('Problem solvable')
# print(max_solution)

Problem solvable


In [18]:
def goal_check(state):
    return np.all(reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    ))


# no longer used
def distance(state):
    return PROBLEM_SIZE - sum(
        reduce(
            np.logical_or,
            [SETS[i] for i in state.taken],
            np.array([False for _ in range(PROBLEM_SIZE)]),
        ))

def weight(state):
    # g is the cost of getting to the current node
    g = len(state.taken)
    # h is the heuristic cost of getting to the goal from the current node
    progress = [False] * PROBLEM_SIZE
    for set in state.taken:
        list_set = SETS[set].tolist()
        for i in range(PROBLEM_SIZE):
            progress[i] = progress[i] or list_set[i]
    # h = PROBLEM_SIZE - sum(progress)                      # basic pessimistic heuristic function (works best)
    # h = h_optimistic_1(PROBLEM_SIZE - sum(progress))      # first optimistic heuristic function
    h = h_optimistic_2(PROBLEM_SIZE - sum(progress))        # second optimistic heuristic function
    return g + h


# very computationally heavy, not sure why
def h_optimistic_1(n_missing_elements):
    optimistic_cost_reduction = 0
    # cost is calculated as the sum of the number of sets that can be taken to complete the problem, weighted by the likelyhood that taking that mnany would be enough
    # the probability is not the actual mathematical probability because I don't know hot to implement that function in a generalized manner (mathematically speaking)
    for i in range(n_missing_elements):
        optimistic_cost_reduction += i * (PROBABILITY ** (n_missing_elements - i))

    # failsafe, if the cost comes up too high, we cap it at the max (else A* breaks down)
    if optimistic_cost_reduction >= n_missing_elements:
        optimistic_cost_reduction = n_missing_elements
    return optimistic_cost_reduction


def h_optimistic_2(n_missing_elements):
    # cost is calculated as the maximum cost (the number of sets) minus the sum of the number of sets we could save (meaning that each set covers more than one point) 
    # weighted by the chanche that we could actually save that many
    # the probability is not the actual mathematical probability because I don't know hot to implement that function in a generalized manner (mathematically speaking)
    optimistic_cost_reduction = n_missing_elements
    for i in range(n_missing_elements):
        optimistic_cost_reduction -= i * (PROBABILITY ** i)
    
    return optimistic_cost_reduction

In [19]:
frontier = PriorityQueue()
state = State(set(), set(range(NUM_SETS)))
frontier.put((0, state))

counter = 0
_, current_state = frontier.get()
while not goal_check(current_state):
    counter += 1
    for action in current_state[1]:
        new_state = State(
            current_state.taken ^ {action},
            current_state.not_taken ^ {action},
        )
        frontier.put((weight(new_state), new_state))
    _, current_state = frontier.get()

print(f'Solved in {counter:,} steps ({len(current_state.taken)} tiles)')
print(f'Solution:     Taken: {current_state.taken}')

# observation: 
# with a 30% chance to have each position as true in each element, most set coverings tested here with a fixed number of sets of 1000
# are gonna be completed with 7 to 11 tiles, as that corresponds to a chanche to have each element covered of 91.7% to 98%
# 
# the takeaway is that the probability increase given by the number of tiles taken is far higher than the probability decrease given by the size of each tile

Solved in 10 steps (10 tiles)
Solution:     Taken: {1, 452, 393, 442, 338, 276, 26, 443, 28, 959}


In [20]:
if PROBLEM_SIZE <= 100: # if the problem size is more than 100, visualization gets messy
    print('The following tyles were taken: ')
    for tile in current_state.taken:
        print(f'{tile}:\t', end='')
        for value in SETS[tile]:
            if value:
                print('1 ', end='')
            else:
                print('0 ', end='')
        print()

In [21]:
goal_check(current_state)

True