In [1]:
import random

In [2]:
def problem(N, seed=None):
    random.seed(seed)
    return [
        list(set(random.randint(0, N - 1) for n in range(random.randint(N // 5, N // 2))))
        for n in range(random.randint(N, N * 5))
    ]

In [3]:
print(problem(5,42))

[[0], [1], [0], [4], [0], [1], [4], [4], [4], [1, 3], [0, 1], [2], [1], [0], [0, 2], [2, 4], [3], [3], [4], [2, 4], [0], [1], [0, 1], [3], [2, 3]]


In [4]:
import numpy as np

In [5]:
from gx_utils import *

In [6]:
import logging


def greedy(N):
    goal = set(range(N))
    covered = set()
    solution = list()
    all_lists = sorted(problem(N, seed=42), key=lambda l: len(l))
    while goal != covered:
        x = all_lists.pop(0)
        if not set(x) < covered:
            solution.append(x)
            covered |= set(x)

    logging.info(
        f"Greedy solution for N={N}: w={sum(len(_) for _ in solution)} (bloat={(sum(len(_) for _ in solution)-N)/N*100:.0f}%)"
    )
    logging.debug(f"{solution}")

In [7]:
logging.getLogger().setLevel(logging.INFO)
for N in [5, 10, 20, 100, 500, 1000]:
    greedy(N)

INFO:root:Greedy solution for N=5: w=5 (bloat=0%)
INFO:root:Greedy solution for N=10: w=13 (bloat=30%)
INFO:root:Greedy solution for N=20: w=46 (bloat=130%)
INFO:root:Greedy solution for N=100: w=332 (bloat=232%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=1000: w=4652 (bloat=365%)


In [8]:
import logging
from random import seed, choice
from typing import Callable

logging.basicConfig(format="%(message)s", level=logging.INFO)

In [9]:
class State:
    def __init__(self, data: np.ndarray):
        self._data = data.copy()
        self._data.flags.writeable = False

    def __hash__(self):
        return hash(bytes(self._data))

    def __eq__(self, other):
        return bytes(self._data) == bytes(other._data)

    def __lt__(self, other):
        return bytes(self._data) < bytes(other._data)

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    @property
    def data(self):
        return self._data

    def copy_data(self):
        return self._data.copy()

In [20]:
#return a new_state if elem contains element different from state
def result(state, elem):
    elem_set = set(elem)
    state_set = set()
    for e in state.copy_data():
            [state_set.add(i) for i in e]
    intersection = list(elem_set - state_set)
    if intersection == []: return False

    new_state = []
    [new_state.append(e) for e in state.copy_data().tolist()]
    new_state.append(elem)
    #print(f'append element {elem}')
    new_state = sorted(new_state, key=lambda l: len(l))
    
    return State(np.array(new_state))

def goal_test(state):
    state_set = set()
    test = state.copy_data().tolist()
    for e in test:
        [state_set.add(i) for i in e]
    return list(state_set) == GOAL


In [38]:
def search(
    initial_state: State,
    goal_test: Callable,
    parent_state: dict,
    state_cost: dict,
    priority_function: Callable,
    unit_cost: Callable,
    N: int

):  
    frontier = PriorityQueue()
    parent_state.clear()
    state_cost.clear()

    state = initial_state
    parent_state[state] = None
    state_cost[state] = 0
    state_repeat = 0
    
    GOAL = list(set(range(N)))
    #logging.info(f"Goal:\n{GOAL}")
    visited_nodes = 0
    for elem in input_state:
        while state is not None and not goal_test(state):
            #print(f'n.state {state_repeat}, state: {state}, elem: {elem}')
            #index = input_state.index(elem)
            new_state = result(state, elem)
            visited_nodes += 1
            # check if new node contains element different from ones in the state
            if not new_state or elem in state.copy_data().tolist():
                break
            else:
                cost = unit_cost(elem)
                if new_state not in state_cost and new_state not in frontier:
                    parent_state[new_state] = state
                    state_cost[new_state] = state_cost[state] + cost
                    frontier.push(new_state, p=priority_function(new_state))
                    logging.debug(f"Added new node to frontier (cost={state_cost[new_state]})")
                elif new_state in frontier and state_cost[new_state] > state_cost[state] + cost:
                    old_cost = state_cost[new_state]
                    parent_state[new_state] = state
                    state_cost[new_state] = state_cost[state] + cost
                    logging.debug(f"Updated node cost in frontier: {old_cost} -> {state_cost[new_state]}")
        
            if frontier:
                state = frontier.pop()
            else:
                state = None
        
    path = list()
    s = state
    while s:
        path.append(s.copy_data())
        s = parent_state[s]

    #logging.info(f"N = {N} | Found a solution in {len(path):,} steps; visited {len(state_cost):,} states, w = {sum([len(i) for i in state.copy_data().tolist()])}")
    logging.info(f"N = {N} | Found a solution in {visited_nodes} steps; visited {len(state_cost):,} states, w = {sum([len(i) for i in state.copy_data().tolist()])}")
    #print(len(input_state))
    #logging.info(f'statee: {state}')
    return list(reversed(path))

# Breadth-first

In [40]:
parent_state = dict()
state_cost = dict()

logging.getLogger().setLevel(logging.INFO)

for n in [5,10,20,50,100,500,1000]:
    input_state = sorted(problem(n, seed=42), key=lambda l: len(l))
    initial_state = State(np.array([input_state[0]]))
    search(
        initial_state=initial_state,
        goal_test=goal_test,
        parent_state=parent_state,
        state_cost=state_cost,
        priority_function=lambda s: len(state_cost),
        unit_cost=lambda a: len(a),
        N=n
    )

INFO:root:N = 5 | Found a solution in 29 steps; visited 5 states, w = 5
  return State(np.array(new_state))
INFO:root:N = 10 | Found a solution in 56 steps; visited 7 states, w = 13
INFO:root:N = 20 | Found a solution in 45 steps; visited 12 states, w = 46
INFO:root:N = 50 | Found a solution in 227 steps; visited 15 states, w = 137
INFO:root:N = 100 | Found a solution in 40 steps; visited 19 states, w = 332
INFO:root:N = 500 | Found a solution in 1832 steps; visited 24 states, w = 2162
INFO:root:N = 1000 | Found a solution in 3644 steps; visited 26 states, w = 4652


# A*

In [42]:
parent_state = dict()
state_cost = dict()


def h(state):
    return np.sum((state._data != GOAL._data) & (state._data > 0))

for n in [5,10,20,50,100,500,1000]:
    input_state = sorted(problem(n, seed=42), key=lambda l: len(l))
    initial_state = State(np.array([input_state[0]]))
    search(
        initial_state=initial_state,
        goal_test=goal_test,
        parent_state=parent_state,
        state_cost=state_cost,
        priority_function=lambda s: state_cost[s] + h(s),
        unit_cost=lambda a: len(a),
        N=n
    )

AttributeError: 'list' object has no attribute '_data'