In [1]:
import random
import logging
import sys
import numpy as np
from typing import Callable
import heapq
from collections import Counter
logging.basicConfig(format="%(message)s", level=logging.INFO)

In [2]:
N=5
MIN_NUMBER = sys.float_info.min

In [3]:
# Copyright © 2022 Giovanni Squillero <squillero@polito.it>
# https://github.com/squillero/computational-intelligence
# Free for personal or classroom use; see 'LICENSE.md' for details.


class PriorityQueue:
    """A basic Priority Queue with simple performance optimizations"""

    def __init__(self):
        self._data_heap = list()
        self._data_set = set()

    def __bool__(self):
        return bool(self._data_set)

    def __contains__(self, item):
        return item in self._data_set

    def push(self, item, p=None):
        assert item not in self, f"Duplicated element"
        if p is None:
            p = len(self._data_set)
        self._data_set.add(item)
        heapq.heappush(self._data_heap, (p, item))

    def pop(self):
        p, item = heapq.heappop(self._data_heap)
        self._data_set.remove(item)
        return item

In [4]:
def problem(N, seed=42):
    random.seed(seed)
    return [
        list(set(random.randint(0, N - 1) for n in range(random.randint(N // 5, N // 2))))
        for n in range(random.randint(N, N * 5))
    ]

In [5]:
GOAL={i for i in range(N)}
print(GOAL)
list_of_lists=problem(N)
print(list_of_lists)
N_of_lists=len(list_of_lists)
print(len(list_of_lists))

{0, 1, 2, 3, 4}
[[0], [1], [0], [4], [0], [1], [4], [4], [4], [1, 3], [0, 1], [2], [1], [0], [0, 2], [2, 4], [3], [3], [4], [2, 4], [0], [1], [0, 1], [3], [2, 3]]
25


In [6]:
def concat_list_of_lists(state):
    c_list = []
    for n in state:
        c_list += list_of_lists[n]
    return c_list

def print_lists(state):
    for n in state:
        print("List number: ",n, " with elements ", list_of_lists[n])

In [7]:
def goal_test(state):
    if set(concat_list_of_lists(state)) == GOAL:
        return True
    return False

In [8]:
def result(state):
    new_states=[]
    for n in range(state[len(state)-1]+1, len(list_of_lists)):
        N=(n,)
        new_state=state+N
        new_states.append(new_state)
    return new_states

In [9]:
def calculate_cost(state):
    c_list=concat_list_of_lists(state)
    repetitions=len(c_list)-len(set(c_list))
    if repetitions==0:
        repetitions=MIN_NUMBER
    normalized_cost=repetitions/len(c_list)
    return normalized_cost

In [10]:
def search(
    goal_test: Callable,
    state_cost: dict,
    priority_function: Callable,
    calculate_cost: Callable,
):  
    #initialize collections
    frontier = PriorityQueue()
    state_cost.clear()
    #keeping track of the iterations/nodes
    n=1
    N_nodes=len(list_of_lists)
    #INITIAL_STATE
    for s in range(0,len(list_of_lists)):
        S=(s,)
        state_cost[hash(S)] = calculate_cost(S)
        #print(S)
        #print(state_cost[hash(S)])
        frontier.push(S,p=priority_function(S))
    state=frontier.pop()
    #print(state)
    #print(state_cost[hash(state)])
        
    #BEGIN OF THE LOOP
    while state is not None and not goal_test(state):
        n+=1
        new_states = result(state)
        for new_state in new_states:
            if hash(new_state) not in state_cost and new_state not in frontier:
                state_cost[hash(new_state)] = calculate_cost(new_state) 
                frontier.push(new_state, p=priority_function(new_state))
                N_nodes+=1
                #logging.debug(f"Added new node to frontier (cost={state_cost[hash(new_state)]})")
        if frontier:
            state = frontier.pop()
        else:
            state = None
    
    #OUTPUT       
    if state is not None:
        c_list=concat_list_of_lists(state)
        repetitions=len(c_list)-len(set(c_list))
        logging.info(f"Found a solution for N={N:,} with {n:,} processed states, with {N_nodes:,} nodes added in the frontier and with {repetitions:,} repetitions.")
        print("The lists are:")
        print_lists(state)
    else:
        logging.info(f"No solution found")

In [11]:
state_cost = dict()

search(
    goal_test=goal_test,
    state_cost=state_cost,
    priority_function=lambda s: state_cost[hash(s)],
    calculate_cost=calculate_cost,
)

Found a solution for N=5 with 3 processed states, with 50 nodes added in the frontier and with 0 repetitions.


The lists are:
List number:  9  with elements  [1, 3]
List number:  14  with elements  [0, 2]
List number:  18  with elements  [4]
