Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


In [1]:
# Lab 1: Set Covering

First lab + peer review. List this activity in your final report, it will be part of your exam.

## Task

Given a number $N$ and some lists of integers $P = (L_0, L_1, L_2, ..., L_n)$, 
determine, if possible, $S = (L_{s_0}, L_{s_1}, L_{s_2}, ..., L_{s_n})$
such that each number between $0$ and $N-1$ appears in at least one list

$$\forall n \in [0, N-1] \ \exists i : n \in L_{s_i}$$

and that the total numbers of elements in all $L_{s_i}$ is minimum. 

## Instructions

* Create the directory `lab1` inside the course repo (the one you registered with Andrea)
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)
* Use `problem` to generate the problems with different $N$
* In the `README.md`, report the the total numbers of elements in $L_{s_i}$ for problem with $N \in [5, 10, 20, 100, 500, 1000]$ and the total number on $nodes$ visited during the search. Use `seed=42`.
* Use `GitHub Issues` to peer review others' lab

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.

**Deadline**

* Sunday, October 16th 23:59:59 for the working solution
* Sunday, October 23rd 23:59:59 for the peer reviews

SyntaxError: invalid syntax (1857879623.py, line 3)

In [5]:
import random
import numpy as np
from typing import Callable
import logging

In [6]:
import heapq
from collections import Counter


class PriorityQueue:
    """A basic Priority Queue with simple performance optimizations"""

    def __init__(self):
        self._data_heap = list()
        self._data_set = set()

    def __bool__(self):
        return bool(self._data_set)

    def __contains__(self, item):
        return item in self._data_set

    def push(self, item, p=None):
        assert item not in self, f"Duplicated element"
        if p is None:
            p = len(self._data_set)
        self._data_set.add(item)
        heapq.heappush(self._data_heap, (p, item))

    def pop(self):
        p, item = heapq.heappop(self._data_heap)
        self._data_set.remove(item)
        return item


class Multiset:
    """Multiset"""

    def __init__(self, init=None):
        self._data = Counter()
        if init:
            for item in init:
                self.add(item)

    def __contains__(self, item):
        return item in self._data and self._data[item] > 0

    def __getitem__(self, item):
        return self.count(item)

    def __iter__(self):
        return (i for i in sorted(self._data.keys()) for _ in range(self._data[i]))

    def __len__(self):
        return sum(self._data.values())

    def __copy__(self):
        t = Multiset()
        t._data = self._data.copy()
        return t

    def __str__(self):
        return f"M{{{', '.join(repr(i) for i in self)}}}"

    def __repr__(self):
        return str(self)

    def __or__(self, other: "Multiset"):
        tmp = Multiset()
        for i in set(self._data.keys()) | set(other._data.keys()):
            tmp.add(i, cnt=max(self[i], other[i]))
        return tmp

    def __and__(self, other: "Multiset"):
        return self.intersection(other)

    def __add__(self, other: "Multiset"):
        return self.union(other)

    def __sub__(self, other: "Multiset"):
        tmp = Multiset(self)
        for i, n in other._data.items():
            tmp.remove(i, cnt=n)
        return tmp

    def __eq__(self, other: "Multiset"):
        return list(self) == list(other)

    def __le__(self, other: "Multiset"):
        for i, n in self._data.items():
            if other.count(i) < n:
                return False
        return True

    def __lt__(self, other: "Multiset"):
        return self <= other and not self == other

    def __ge__(self, other: "Multiset"):
        return other <= self

    def __gt__(self, other: "Multiset"):
        return other < self

    def add(self, item, *, cnt=1):
        assert cnt >= 0, "Can't add a negative number of elements"
        if cnt > 0:
            self._data[item] += cnt

    def remove(self, item, *, cnt=1):
        assert item in self, f"Item not in collection"
        self._data[item] -= cnt
        if self._data[item] <= 0:
            del self._data[item]

    def count(self, item):
        return self._data[item] if item in self._data else 0

    def union(self, other: "Multiset"):
        t = Multiset(self)
        for i in other._data.keys():
            t.add(i, cnt=other[i])
        return t

    def intersection(self, other: "Multiset"):
        t = Multiset()
        for i in self._data.keys():
            t.add(i, cnt=min(self[i], other[i]))
        return t


In [7]:
def problem(N, seed=None):
    random.seed(seed)
    return [
        list(set(random.randint(0, N - 1) for n in range(random.randint(N // 5, N // 2))))
        for n in range(random.randint(N, N * 5))
    ]

In [8]:

def greedy(N):
    goal = set(range(N))
    covered = set()
    solution = list()
    all_lists = sorted(problem(N, seed=42), key=lambda l: len(l))
    while goal != covered:
        x = all_lists.pop(0)
        if not set(x) < covered:
            solution.append(x)
            covered |= set(x)

    logging.info(
        f"Greedy solution for N={N}: w={sum(len(_) for _ in solution)} (bloat={(sum(len(_) for _ in solution)-N)/N*100:.0f}%)"
    )
    logging.debug(f"{solution}")

In [9]:
logging.getLogger().setLevel(logging.INFO)
for N in [5, 10, 20, 100, 500, 1000]:
    greedy(N)

INFO:root:Greedy solution for N=5: w=5 (bloat=0%)
INFO:root:Greedy solution for N=10: w=13 (bloat=30%)
INFO:root:Greedy solution for N=20: w=46 (bloat=130%)
INFO:root:Greedy solution for N=100: w=332 (bloat=232%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=1000: w=4652 (bloat=365%)


In [10]:
%timeit greedy(5_00)

INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)
INFO:root:Greedy solution for N=500: w=2162 (bloat=332%)


411 ms ± 49.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
def flatten(data_list: list):
    flat_list = [item for sublist in data_list for item in sublist]
    return flat_list

In [12]:
class State:
    def __init__(self, data: list):
        self._data = data.copy()
        self._set_data = set(flatten(data))

    def __hash__(self):
        data = flatten(self._data)
        data = [element % 256 for element in data]
        return hash(bytes(data))

    def __eq__(self, other):
        data = flatten(self._data)
        data = [element % 256 for element in data]
        other_data = flatten(other._data)
        other_data = [element % 256 for element in other_data]
        return bytes(data) == bytes(other_data)

    def __lt__(self, other):
        data = flatten(self._data)
        data = [element % 256 for element in data]
        other_data = flatten(other._data)
        other_data = [element % 256 for element in other_data]
        return bytes(data) < bytes(other_data)

    def __str__(self):
        return str(self._data)

    def __repr__(self):
        return repr(self._data)

    @property
    def data(self):
        return self._data

    def cost(self):
        return sum([len(_) for _ in self._data])

    def print_lists (self):
        for x in self._data:
            print(x)


    def copy_data(self):
        return self._data.copy()

In [13]:
def result(state, action):
    prev_state_list = state._data.copy()
    new_list = prev_state_list
    new_list.append(action)
    return State(new_list)

In [14]:
def goal_test(state, N):
    return len(state._set_data) == N

In [15]:
def possible_actions(state, space):
    possible_steps = []
    for step in space:
        if step not in state.data:
            possible_steps.append(step)
    return possible_steps

In [23]:
def search(
    goal_test: Callable,
    parent_state: dict,
    state_cost: dict,
    priority_function: Callable,
    cost_function: Callable,
    N: int,
    space,
    name: str
):
    #space = sorted(problem(N, seed=42), key=lambda l: -len(l))
    frontier = PriorityQueue()
    parent_state.clear()
    state_cost.clear()

    initial_state = State([space[0]])
    state = initial_state
    parent_state[state] = None
    state_cost[state] = state.cost()


    while state is not None and not goal_test(state, N):
        for step in possible_actions(state, space):
            new_state = result(state, step)
            cost = cost_function(step)
            if new_state not in state_cost and new_state not in frontier:
                state_cost[new_state] = state_cost[state] + cost
                frontier.push(new_state, p=priority_function(new_state))
                logging.debug(f"Added new node to frontier (cost={state_cost[new_state]})")
            elif new_state in frontier and state_cost[new_state] > state_cost[state] + cost:
                old_cost = state_cost[new_state]
                state_cost[new_state] = state_cost[state] + cost
                logging.debug(f"Updated node cost in frontier: {old_cost} -> {state_cost[new_state]}")
        if frontier:
            state = frontier.pop()
        else:
            state = None
    if state is None:
        print("Not able to find the solution")
        return

    print (f"- N = {N}")
    print(f"    weight = {state.cost()}, bloat = {(state.cost() - N)/N * 100:.1f}")
    print(f"    total visited nodes =  {len(state_cost)}")
    print()

## Breadth-First Search

In [22]:
parent_state = dict()
state_cost = dict()
for N in [5, 10, 20]:
    space = problem(N, seed=42)

    search(
        goal_test=goal_test,
        parent_state=parent_state,
        state_cost=state_cost,
        priority_function=lambda s: len(state_cost),
        cost_function=lambda a: len(a),
        N = N,
        space = space,
        name="Breadth-First Search"
    )

- N = 5
    weight = 5, bloat = 0.0
    total nodes =  213

- N = 10
    weight = 15, bloat = 50.0
    total nodes =  71935



KeyboardInterrupt: 

## Depth-First Search

In [27]:
parent_state = dict()
state_cost = dict()
for N in [5, 10, 20, 100, 1000]:
    space = problem(N, seed=42)

    search(
        goal_test=goal_test,
        parent_state=parent_state,
        state_cost=state_cost,
        priority_function=lambda s: -len(state_cost),
        cost_function=lambda a: len(a),
        N = N,
        space = space,
        name="Depth-First Search"
    )

- N = 5
    weight = 11, bloat = 120.0
    total visited nodes =  40

- N = 10
    weight = 24, bloat = 140.0
    total visited nodes =  267

- N = 20
    weight = 66, bloat = 230.0
    total visited nodes =  331

- N = 100
    weight = 351, bloat = 251.0
    total visited nodes =  4632



KeyboardInterrupt: 

## Gready Best-First Search

In [28]:
parent_state = dict()
state_cost = dict()

def h(state):
    return (N - len(state._set_data))

for N in [5, 10, 20, 100, 1000]:
    space = sorted(problem(N, seed=42), key=lambda l: -len(l))

    final = search(
        goal_test=goal_test,
        parent_state=parent_state,
        state_cost=state_cost,
        priority_function=lambda s: h(s),
        cost_function=lambda a: len(a),
        N = N,
        space = space,
        name="Gready Best-First Search"
    )

- N = 5
    weight = 6, bloat = 20.0
    total visited nodes =  18

- N = 10
    weight = 11, bloat = 10.0
    total visited nodes =  82

- N = 20
    weight = 29, bloat = 45.0
    total visited nodes =  97

- N = 100
    weight = 192, bloat = 92.0
    total visited nodes =  1699



KeyboardInterrupt: 

## A* Search


In [30]:
parent_state = dict()
state_cost = dict()

def h(state):
    return (N - len(state._set_data))

for N in [5, 10, 20]:
    space = problem(N, seed=42)



    search(
        goal_test=goal_test,
        parent_state=parent_state,
        state_cost=state_cost,
        priority_function=lambda s: state_cost[s] + h(s),
        cost_function=lambda a: len(a),
        N = N,
        space = space,
        name="A* Search"
    )

- N = 5
    weight = 5, bloat = 0.0
    total visited nodes =  29

- N = 10
    weight = 10, bloat = 0.0
    total visited nodes =  623

- N = 20
    weight = 23, bloat = 15.0
    total visited nodes =  37941

