# CI - lab1 

In [None]:
import random
import logging
from random import seed, choice
from typing import Callable
import numpy as np
from gx_utils import *

# Root logger: INFO and above, printing only the bare message text.
logging.basicConfig(level=logging.INFO, format="%(message)s")

In [None]:
def problem(N, seed=None):
    """Build a reproducible random collection of integer lists.

    The global ``random`` generator is re-seeded with ``seed`` and then used
    to draw, in this order: how many lists to create (between ``N`` and
    ``5 * N``), and for each list how many values to draw (between
    ``N // 5`` and ``N // 2``) followed by the values themselves (each in
    ``0 .. N - 1``). Duplicates inside a list are dropped.

    Args:
        N: Size of the universe of numbers.
        seed: Seed handed to ``random.seed``; ``None`` uses system entropy.

    Returns:
        A list of lists of distinct integers in ``range(N)``.
    """
    random.seed(seed)
    how_many = random.randint(N, N * 5)
    instance = []
    for _ in range(how_many):
        # The number of draws is chosen before the values themselves.
        draws = random.randint(N // 5, N // 2)
        distinct = set(random.randint(0, N - 1) for _ in range(draws))
        instance.append(list(distinct))
    return instance

In [None]:
class State:
    """Read-only, hashable wrapper around a NumPy array.

    The array passed in is copied and the copy is marked non-writeable, so a
    State can safely be used as a dictionary key or set member.

    Hashing, equality and ordering are all based on the raw bytes of the
    stored array. Shape and dtype are not taken into account, and for arrays
    of dtype ``object`` the raw bytes are object references rather than
    values.
    """

    def __init__(self, data: np.ndarray):
        # Keep a private, frozen copy so later changes to the caller's array
        # cannot invalidate the hash.
        self._data = data.copy()
        self._data.flags.writeable = False

    def __hash__(self):
        return hash(bytes(self._data))

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing ``_data`` attribute.
        if not isinstance(other, State):
            return NotImplemented
        return bytes(self._data) == bytes(other._data)

    def __lt__(self, other):
        # Byte-wise ordering: arbitrary but deterministic.
        if not isinstance(other, State):
            return NotImplemented
        return bytes(self._data) < bytes(other._data)

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    @property
    def data(self):
        """The stored array itself (non-writeable)."""
        return self._data

    def copy_data(self):
        """Return a fresh, writeable copy of the stored array."""
        return self._data.copy()

In [None]:
def search(
    initial_state: State,
    goal_test: Callable,
    parent_state: dict,
    state_cost: dict,
    priority_function: Callable,
    unit_cost: Callable,
    all_list: list
):
    """Best-first graph search driven by a caller-supplied priority.

    Starting from ``initial_state``, every element of ``all_list`` that is
    not yet part of the current state is appended to it (via ``result``) to
    produce successor states. Successors wait in a priority queue and the
    next state to expand is whatever the queue pops.

    Args:
        initial_state: State the search starts from.
        goal_test: Called with a state; a truthy return stops the search.
        parent_state: Cleared, then filled with ``state -> predecessor``.
        state_cost: Cleared, then filled with ``state -> cost from start``.
        priority_function: Called with a new state to get its queue
            priority. It is called after ``state_cost`` has been updated for
            that state, so it may read the cost from there.
        unit_cost: Called with an element of ``all_list``; returns the cost
            of appending that element.
        all_list: Candidate elements that can be appended to a state.

    Returns:
        A list with one array per state on the path found, ordered from the
        initial state to the goal state. The list is empty when the queue
        runs dry before any state passes ``goal_test``.
    """
    # PriorityQueue is not defined in this file; presumably it comes from
    # the gx_utils star import -- confirm.
    frontier = PriorityQueue()
    parent_state.clear()
    state_cost.clear()

    state = initial_state
    parent_state[state] = None
    state_cost[state] = 0
    nodes = 1

    while state is not None and not goal_test(state):
        # Elements already in this state, as plain Python objects. Computed
        # once per expansion. A plain list membership test is used because
        # NumPy's ``in`` compares element-wise against the whole array,
        # which does not answer "was this list already chosen?".
        taken = state.copy_data().tolist()
        for a in all_list:
            if a in taken:
                continue
            new_state = result(state, a)
            new_cost = state_cost[state] + unit_cost(a)
            if new_state not in state_cost and new_state not in frontier:
                parent_state[new_state] = state
                state_cost[new_state] = new_cost
                frontier.push(new_state, p=priority_function(new_state))
                logging.debug("Added new node to frontier (cost=%s)", state_cost[new_state])
            elif new_state in frontier and state_cost[new_state] > new_cost:
                old_cost = state_cost[new_state]
                parent_state[new_state] = state
                state_cost[new_state] = new_cost
                # NOTE(review): only the bookkeeping is updated here; the
                # priority already stored in the queue is left untouched.
                # Confirm whether PriorityQueue offers a way to re-prioritise.
                logging.debug("Updated node cost in frontier: %s -> %s", old_cost, state_cost[new_state])
        if frontier:
            state = frontier.pop()
            nodes += 1
        else:
            state = None

    # Walk the predecessor links back from the final state (if any).
    path = list()
    s = state
    while s is not None:
        path.append(s.copy_data())
        s = parent_state[s]

    if not path:
        # Queue exhausted without reaching a goal: nothing to summarise.
        logging.info(f"No solution found; visited {len(state_cost):,} states; nodes processed = {nodes}")
        return path

    logging.info(f"Found a solution in {len(path):,} steps; visited {len(state_cost):,} states; w = {sum(len(_) for _ in path[0])}; nodes processed = {nodes}")
    return list(reversed(path))

In [None]:
def goal_test(state):
    """Tell whether ``state`` contains exactly the numbers stored in GOAL."""
    covered = number_set(state)
    target = set(GOAL.copy_data().tolist())
    # The goal is reached when both sets hold the same numbers.
    return covered == target

def result(state, tuple):
    """Return a new State made of ``state``'s elements plus ``tuple``.

    Args:
        state: State to extend; it is not modified.
        tuple: Element appended at the end. The name shadows the builtin
            but is kept so existing keyword callers keep working.

    Returns:
        A new State wrapping the extended sequence.
    """
    items = state.copy_data().tolist()
    items.append(tuple)
    try:
        data = np.asarray(items)
    except ValueError:
        # Recent NumPy refuses to build an array from sequences of different
        # lengths unless dtype=object is requested. Build the 1-D object
        # array by hand, one slot per element, which is what older NumPy
        # produced implicitly.
        data = np.empty(len(items), dtype=object)
        for i, item in enumerate(items):
            data[i] = item
    return State(data)

def number_set(x):
    """Collect the distinct numbers stored in a state.

    Args:
        x: Object whose ``_data`` attribute is a NumPy array. After
            ``tolist()`` each element is either a list of numbers or a
            single number.

    Returns:
        A set with every number found, whether it appeared on its own or
        inside one of the lists.
    """
    found = set()
    for item in x._data.tolist():
        if isinstance(item, list):
            found.update(item)
        else:
            # set.add works in place and returns None, so its result must
            # not be assigned back to the accumulator.
            found.add(item)
    return found

In [None]:
# Size of the universe: the numbers 0 .. N-1.
N = 1000
# Goal: a state holding every number of the universe.
GOAL = State(np.array(range(N)))
# Candidate lists for a fixed seed, shortest first (the sort is stable, so
# lists of equal length keep their generation order).
all_list = problem(N, seed=42)
all_list.sort(key=len)
# Start from the shortest candidate alone, wrapped in an outer list.
INITIAL_STATE = State(np.asarray([all_list[0]]))

## A* + cost = len(a)

In [None]:
# Bookkeeping dictionaries; search() clears them and fills them in place.
parent_state = {}
state_cost = {}


def h(state):
    """Heuristic: how many of the N numbers the state does not contain yet."""
    universe_size = len(range(N))
    return universe_size - len(number_set(state))


final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    # Priority = cost paid so far + estimated cost still to pay.
    priority_function=lambda s: state_cost[s] + h(s),
    # Appending a list costs as many units as the list has elements.
    unit_cost=len,
    all_list=all_list,
)

# Show the last state on the returned path.
print(final[-1])

## A* + cost = 1

In [None]:
# Bookkeeping dictionaries; search() clears them and fills them in place.
parent_state = {}
state_cost = {}


def h(state):
    """Heuristic: how many of the N numbers the state does not contain yet.

    NOTE(review): with a cost of 1 per appended list this count can exceed
    the number of lists still needed (one list may supply several missing
    numbers), so the first solution found is not guaranteed to use the
    fewest lists.
    """
    universe_size = len(range(N))
    return universe_size - len(number_set(state))


final = search(
    INITIAL_STATE,
    goal_test=goal_test,
    parent_state=parent_state,
    state_cost=state_cost,
    # Priority = cost paid so far + estimated cost still to pay.
    priority_function=lambda s: state_cost[s] + h(s),
    # Every appended list costs one unit, whatever its length.
    unit_cost=lambda _: 1,
    all_list=all_list,
)

# Show the last state on the returned path.
print(final[-1])