# State discovering algorithm

## Bucket state search problem (breadthwise and deep)


In [1]:
import numpy as np
import queue
import heapq
import logging

from collections import defaultdict
from dataclasses import dataclass

l = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


In [2]:
# Pour the bucket to 0, raise error if action is forbidden
def rule_empty(state: np.array, index):
    tmp = state.copy()
    if tmp[index] == 0:
        raise RuntimeError('is empty')
    tmp[index] = 0

    l.debug(f'empty: {state}[{index}] -> {tmp}[{index}]')

    return 10, tmp

# Fill the bucket to max, raise error if action is forbidden


def rule_full(state: np.array, index, vls):
    tmp = state.copy()
    if tmp[index] == vls[index]:
        raise RuntimeError('is full')
    tmp[index] = vls[index]

    l.debug(f'full: {state}[{index}] -> {tmp}[{index}]')

    return 5, tmp

# Pour from i_from bucket to i_to bucket, raise error if action is forbidden


def rule_transpose(state: np.array, i_from, i_to, vls):
    tmp = state.copy()

    if tmp[i_from] == 0:
        raise RuntimeError('from is empty')

    if tmp[i_to] == vls[i_to]:
        raise RuntimeError('to is full')

    avail_vl = vls[i_to] - tmp[i_to]
    if avail_vl <= tmp[i_from]:
        tmp[i_to] = vls[i_to]
        tmp[i_from] = tmp[i_from] - avail_vl
    else:
        tmp[i_to] += tmp[i_from]
        tmp[i_from] = 0

    l.debug(
        f'transpose: {state} f[{i_from}] t[{i_to}] -> {tmp} f[{i_from}] t[{i_to}]')

    return 0, tmp

# Generate all possible combinations using rules


def expand(state, vls) -> list[tuple[float, np.array]]:
    l.info(f'expanding {state}')

    inc = []
    try:
        inc.append(rule_empty(state, 0))
        l.info('-> empty 0')
    except RuntimeError:
        pass

    try:
        inc.append(rule_empty(state, 1))
        l.info('-> empty 1')
    except RuntimeError:
        pass

    try:
        inc.append(rule_empty(state, 0))
        inc.append(rule_empty(state, 1))
        l.info('-> empty 0, 1')
    except RuntimeError:
        pass

    try:
        inc.append(rule_full(state, 0, vls))
        l.info('-> full 0')
    except RuntimeError:
        pass

    try:
        inc.append(rule_full(state, 1, vls))
        l.info('-> full 1')
    except RuntimeError:
        pass

    try:
        inc.append(rule_full(state, 0, vls))
        inc.append(rule_full(state, 1, vls))
        l.info('-> full 0, 1')
    except RuntimeError:
        pass

    try:
        inc.append(rule_empty(state, 0))
        inc.append(rule_full(state, 1, vls))
        l.info('-> empty 0, full 1')
    except RuntimeError:
        pass

    try:
        inc.append(rule_empty(state, 1))
        inc.append(rule_full(state, 0, vls))
        l.info('-> empty 1, full 0')
    except RuntimeError:
        pass

    try:
        inc.append(rule_transpose(state, 0, 1, vls))
        l.info('-> transpose 0 to 1')
    except RuntimeError:
        pass

    try:
        inc.append(rule_transpose(state, 1, 0, vls))
        l.info('-> transpose 1 to 0')
    except RuntimeError:
        pass

    l.info(f'done {state}: {len(inc)}: {inc}')

    return inc


# Test states for equality
def test_equal(a: np.array, b: np.array):
    return a[0] == b[0] and a[1] == b[1]

# Calculate heuristics for A* algorithm (from current state to target state)


def heuristic(state: np.array, target: np.array) -> float:
    return np.sqrt(np.power(target - state, 2).sum())


class N:  # Node to trace states (! states are traced fro target to start so no need to hold next node, only parent)
    parent = None
    payload = None
    cost = 0  # Cumulative cost

    def __init__(self, payload, parent=None, cost=0):
        self.payload = payload
        self.parent = parent
        self.cost = cost

    def backtrace(self, vls):
        l.info(f'^ {self.payload} ({self.cost}) | max {vls}')
        if self.parent is not None:
            self.parent.backtrace(vls)

    def assert_not_ancestor(self, n):
        if self.parent is not None:
            assert not test_equal(self.parent.payload, n.payload)
            self.parent.assert_not_ancestor(n)

    def __eq__(self, other):
        return self.cost == other.cost

    def __lt_(self, other):
        return self.cost < other.cost

    def __gt__(self, other):
        return self.cost > other.cost


In [3]:
max_volumes = np.array([4.0, 3.0])
start_volumes = np.array([0.0, 0.0])
end_volumes = np.array([4.0, 2.0])

l.info(f'max: {max_volumes}, start: {start_volumes} -> end: {end_volumes}')


INFO:__main__:max: [4. 3.], start: [0. 0.] -> end: [4. 2.]


### Breadthwise


In [10]:
root = N(start_volumes, None)
to_open = [root]
closed = []

# Loop over all to be opened states
while len(to_open):
    n = to_open.pop(0)  # Take first inserted (FIFO)
    closed.append(n)

    l.info(f'opening: {n.payload}')

    # Find all incident states
    inc = expand(n.payload, max_volumes)

    # Exclude expanded or to be expanded states
    for i_cst, i_s in inc:
        # Test state is target state
        if test_equal(i_s, end_volumes):
            l.info('--- target state reached ---')
            N(i_s, n).backtrace(max_volumes)
            break

        # Not in to open list
        for o_n in to_open:
            if test_equal(i_s, o_n.payload):
                break
        else:
            # Not in closed list
            for c_n in closed:
                if test_equal(i_s, c_n.payload):
                    break
            else:
                l.info(f'+ discovered: {i_s}')
                # Create incident node from incident state to backtrack route
                to_open.append(N(i_s, n))
    else:  # Stop at closest solution
        continue

    break
else:
    l.info('--- target state unreachable ---')


INFO:__main__:opening: [0. 0.]
INFO:__main__:expanding [0. 0.]
INFO:__main__:-> full 0
INFO:__main__:-> full 1
INFO:__main__:-> full 0, 1
INFO:__main__:done [0. 0.]: 4: [(5, array([4., 0.])), (5, array([0., 3.])), (5, array([4., 0.])), (5, array([0., 3.]))]
INFO:__main__:+ discovered: [4. 0.]
INFO:__main__:+ discovered: [0. 3.]
INFO:__main__:opening: [4. 0.]
INFO:__main__:expanding [4. 0.]
INFO:__main__:-> empty 0
INFO:__main__:-> full 1
INFO:__main__:-> empty 0, full 1
INFO:__main__:-> transpose 0 to 1
INFO:__main__:done [4. 0.]: 6: [(10, array([0., 0.])), (10, array([0., 0.])), (5, array([4., 3.])), (10, array([0., 0.])), (5, array([4., 3.])), (0, array([1., 3.]))]
INFO:__main__:+ discovered: [4. 3.]
INFO:__main__:+ discovered: [1. 3.]
INFO:__main__:opening: [0. 3.]
INFO:__main__:expanding [0. 3.]
INFO:__main__:-> empty 1
INFO:__main__:-> full 0
INFO:__main__:-> empty 1, full 0
INFO:__main__:-> transpose 1 to 0
INFO:__main__:done [0. 3.]: 6: [(10, array([0., 0.])), (5, array([4., 3.]

### A\*


In [11]:
root = N(start_volumes, None, 0)

to_open = []
closed = []

# Push first state to expand
heapq.heappush(to_open, (heuristic(root.payload, end_volumes), root))

# Loop over all to be opened states
while len(to_open):
    hrst, n = heapq.heappop(to_open)
    closed.append(n)

    l.info(f'opening [{hrst:.2f}]: {n.payload}')

    # Find all incident states
    inc = expand(n.payload, max_volumes)

    # Exclude expanded or to be expanded states
    for i_cst, i_s in inc:
        # Test state is target state
        if test_equal(i_s, end_volumes):
            l.info('--- target state reached ---')
            N(i_s, n, n.cost  + i_cst).backtrace(max_volumes)
            break

        # Not in to open list
        for _, o_n in to_open:
            if test_equal(i_s, o_n.payload):
                break
        # Not in closed list
        else:
            for c_n in closed:
                if test_equal(i_s, c_n.payload):
                    break
            else:
                l.info(f'+ discovered: {i_s} (cst: {i_cst})')
                # Create incident node from incident state to backtrack route
                heapq.heappush(to_open, (heuristic(i_s, end_volumes) + i_cst, N(i_s, n, n.cost + i_cst)))                
    else:  # Stop at closest solution
        continue

    break
else:
    l.info('--- target state unreachable ---')


INFO:__main__:opening [4.47]: [0. 0.]
INFO:__main__:expanding [0. 0.]
INFO:__main__:-> full 0
INFO:__main__:-> full 1
INFO:__main__:-> full 0, 1
INFO:__main__:done [0. 0.]: 4: [(5, array([4., 0.])), (5, array([0., 3.])), (5, array([4., 0.])), (5, array([0., 3.]))]
INFO:__main__:+ discovered: [4. 0.] (cst: 5)
INFO:__main__:+ discovered: [0. 3.] (cst: 5)
INFO:__main__:opening [7.00]: [4. 0.]
INFO:__main__:expanding [4. 0.]
INFO:__main__:-> empty 0
INFO:__main__:-> full 1
INFO:__main__:-> empty 0, full 1
INFO:__main__:-> transpose 0 to 1
INFO:__main__:done [4. 0.]: 6: [(10, array([0., 0.])), (10, array([0., 0.])), (5, array([4., 3.])), (10, array([0., 0.])), (5, array([4., 3.])), (0, array([1., 3.]))]
INFO:__main__:+ discovered: [4. 3.] (cst: 5)
INFO:__main__:+ discovered: [1. 3.] (cst: 0)
INFO:__main__:opening [3.16]: [1. 3.]
INFO:__main__:expanding [1. 3.]
INFO:__main__:-> empty 0
INFO:__main__:-> empty 1
INFO:__main__:-> empty 0, 1
INFO:__main__:-> full 0
INFO:__main__:-> empty 1, full