In [964]:
from random import random
from functools import reduce
from collections import namedtuple
from queue import PriorityQueue, SimpleQueue, LifoQueue
from math import ceil

import numpy as np

In [965]:
PROBLEM_SIZE = 20
NUM_SETS = 30
SETS = tuple(
    np.array([random() < 0.3 for _ in range(PROBLEM_SIZE)])
    for _ in range(NUM_SETS)
)
State = namedtuple('State', ['taken', 'not_taken'])

In [966]:
def goal_check(state):
    return np.all(reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    ))

def cost(state):
    return len(state.taken)

def distance(state, action):

    taken_sets=list(state.taken)
    print("taken sets", taken_sets)
    #print("action",action)
    currently_covered=reduce(
        np.logical_or,
        [SETS[i] for i in taken_sets],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )
    #print("currently_covered",currently_covered)
 
    if action:
        newly_covered=SETS[action]
        #print("newly_covered", newly_covered)
        difference=0
        for i in range(len(newly_covered)):
            if (newly_covered[i] == currently_covered[i] ):
                difference+=1
        #print("difference",difference)
        overlap=difference
        taken_sets.append(action)
        

    else:
        overlap=0
        
    distance=PROBLEM_SIZE - sum(
            reduce(
                np.logical_or,
                [SETS[i] for i in taken_sets],
                np.array([False for _ in range(PROBLEM_SIZE)]),
            ))
    #print("overlap",overlap)
    print("distance",distance)
    return overlap*distance

def h(state):
    
    already_covered = reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )
   
    if np.all(already_covered):
        return 0
    largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size / largest_set_size)

    new_metric=max(sum(np.logical_not(np.logical_xor(SETS[s], already_covered))) for s in state.not_taken)
    
    return optimistic_estimate*0.9+0.1*new_metric
    
def old_h(state):
    
    already_covered = reduce(
        np.logical_or,
        [SETS[i] for i in state.taken],
        np.array([False for _ in range(PROBLEM_SIZE)]),
    )
   
    if np.all(already_covered):
        return 0
    largest_set_size = max(sum(np.logical_and(s, np.logical_not(already_covered))) for s in SETS)
    missing_size = PROBLEM_SIZE - sum(already_covered)
    optimistic_estimate = ceil(missing_size / largest_set_size)

    new_metric=max(sum(np.logical_not(np.logical_xor(SETS[s], already_covered))) for s in state.not_taken)
    

    return optimistic_estimate
def old_f(state):
    return old_h(state)+cost(state)
def f(state):
    return h(state)+cost(state)


In [967]:
assert goal_check(
    State(set(range(NUM_SETS)), set())
), "Problem not solvable"

In [968]:
def old_astar():
    frontier = PriorityQueue()
    state = State(set(), set(range(NUM_SETS)))
    frontier.put((f(state), state))

    counter = 0
    _, current_state = frontier.get()
    #print("initial current state", current_state)
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            #print("new state", new_state)
            frontier.put((old_f(new_state), new_state))
    
        _, current_state = frontier.get()
        #print("current state",current_state)
    print("Old solution", current_state.taken)
    print(
        f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)"
    )
def new_astar():
    frontier = PriorityQueue()
    state = State(set(), set(range(NUM_SETS)))
    frontier.put((f(state), state))

    counter = 0
    _, current_state = frontier.get()
    #print("initial current state", current_state)
    while not goal_check(current_state):
        counter += 1
        for action in current_state[1]:
            new_state = State(
                current_state.taken ^ {action},
                current_state.not_taken ^ {action},
            )
            #print("new state", new_state)
            frontier.put((f(new_state), new_state))
    
        _, current_state = frontier.get()
        #print("current state",current_state)
    print("New solution", current_state.taken)
    print(
        f"Solved in {counter:,} steps ({len(current_state.taken)} tiles)"
    )


In [969]:
old_astar()
new_astar()

Old solution {19, 29, 15}
Solved in 28 steps (3 tiles)
New solution {19, 29, 15}
Solved in 5 steps (3 tiles)


In [970]:
for i in range(len(SETS)):
    print(i)
    print(SETS[i])

0
[False  True False False False False False False  True False False  True
 False False  True  True False False False False]
1
[False False  True False  True False  True False False  True False False
  True False  True False False False False False]
2
[False False False False  True  True  True  True False False  True  True
  True False False False False False False False]
3
[False False False False  True False False  True False False False False
 False False  True False False False False False]
4
[False False  True False False False False  True False False  True False
  True  True False False  True False False False]
5
[False False  True  True False False  True False  True False  True  True
 False False False False  True False False False]
6
[ True False False False False  True False False  True False False False
 False False  True  True False False False False]
7
[False  True False False  True False False False False False False  True
 False  True False False False False False False]
