In [1]:
# Import Plant

from model.plant import BasePlant
from model.plant_rearrangement import RearrangmentPlantStorage, RearrangmentPlantLimitedStorage
from model.tools import SystemSpecification
from model import StationNameType
from model import Vector

import copy


In [2]:
# Data generation

# Path to the YAML system specification, relative to the notebook location.
model_file_path = "../model.yaml"

# Open the specification inside a context manager so the file handle is
# released as soon as the specification has been built.
# NOTE(review): assumes SystemSpecification consumes the stream eagerly in its
# constructor — confirm it does not keep reading from `model_stream` later.
with open(model_file_path, "r") as model_file:
    spec = SystemSpecification(model_stream=model_file)

# Starting layout: every entry pairs a grid coordinate with the name of the
# station placed on it.
original_plant = BasePlant(spec)
original_plant.import_config(
    [
        (Vector(2, 0), "InOut"),
        (Vector(2, 1), "Robot1"),
        (Vector(2, 2), "Press"),
        (Vector(1, 1), "PartsStorage"),
        (Vector(1, 2), "Robot2"),
    ]
)

print("Original plant")
print(original_plant.render())

# Target layout: same stations, with "Press" and "PartsStorage" swapped.
objective_plant = BasePlant(spec)
objective_plant.import_config(
    [
        (Vector(2, 0), "InOut"),
        (Vector(2, 1), "Robot1"),
        (Vector(1, 1), "Press"),
        (Vector(2, 2), "PartsStorage"),
        (Vector(1, 2), "Robot2"),
    ]
)

print("Objective plant")
print(objective_plant.render())

# Placeholder plant; each algorithm version below rebinds this name.
manipulated_plant = BasePlant(spec)

Original plant
+-----------------+-----------------+-----------------+-----------------+-----------------+
|        4        |        3        |        2        |        1        |                 |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|                 |                 |      InOut      |                 |     Conveyor    |
|                 |   PartsStorage  |      Robot1     |                 |        A        |
|                 |      Robot2     |      Press      |                 |        B        |
|                 |                 |                 |                 |        C        |
|                 |                 |                 |                 |        D        |
+-----------------+-----------------+-----------------+-----------------+-----------------+
Objective plant
+-----------------+-----------------+-----------------+-----------------+-----------------+
|        4        |        3        |        2   

In [3]:
def map_location(value: Vector[int] | int) -> str:
    """Return the printable label of a grid coordinate or storage slot.

    Args:
        value: A ``Vector`` grid coordinate, or an integer storage index.

    Returns:
        For a ``Vector``: a letter derived from ``y`` (0 -> "A") followed by
        ``x + 1``. For anything else: ``"SP"`` followed by ``value + 1``.
    """
    if not isinstance(value, Vector):
        return f"SP{value + 1}"

    # ord("@") + 1 == ord("A"), so row 0 is labelled "A".
    row_letter = chr(ord("A") + value.y)
    column_number = value.x + 1
    return f"{row_letter}{column_number}"

# One movement step is (station name, origin, destination); origin and
# destination are either a grid Vector or an integer storage index.
SequenceListType = list[tuple[str, Vector[int] | int, Vector[int] | int]]

def render_sequence(sequence_list: SequenceListType):
    """Render a movement sequence as text, one numbered step per line.

    Args:
        sequence_list: Steps to render, in execution order.

    Returns:
        A string with one ``"<index>: <step>"`` line per step, each terminated
        by a newline (empty string for an empty sequence).
    """
    return "".join(
        f"{position}: {render_step(movement)}\n"
        for position, movement in enumerate(sequence_list)
    )

def render_step(step: tuple[str, Vector[int] | int, Vector[int] | int]):
    """Render a single movement step as ``"<station> from <origin> to <destination>"``.

    Args:
        step: ``(station name, origin, destination)``; both locations are
            formatted with ``map_location``.
    """
    station_name = step[0]
    origin_label = map_location(step[1])
    destination_label = map_location(step[2])
    return f"{station_name} from {origin_label} to {destination_label}"

In [4]:
# Version 1

# Compare the two plants, create a 2D array of the differences

import itertools


manipulated_plant = RearrangmentPlantStorage(spec)
manipulated_plant.import_config(original_plant.export_config())

sequence_list: SequenceListType = []

# In this first version stations can only be added to or removed from a row
# through its right-hand side. Hence, when a coordinate differs from the
# objective, every station from the right edge down to that coordinate has to
# be taken out first.
#
# Phase 1: for each differing coordinate, move the stations of that row (right
#          to left, down to the coordinate) into the storage buffer.
# Phase 2: place the stored stations on their objective coordinates.

grid_size = spec.model.stations.grid.size

# --- Phase 1: clear every row from the right down to its differences ---------
equal = original_plant.grid_compare(objective_plant)

for x, y in itertools.product(range(grid_size.x), range(grid_size.y)):
    if equal[y][x]:
        continue

    for i in range(grid_size.x - 1, x - 1, -1):
        if manipulated_plant.is_empty_by_coord(i, y):
            continue

        station = manipulated_plant.get_station_by_coord(i, y)
        origin, destiny = manipulated_plant.move_station(station.name, "store")

        assert origin is not None, "Result should not be None"

        sequence_list.append((station.name, origin, destiny))

# --- Phase 2: install the stored stations, columns left to right -------------
# Re-compare against the objective to find the coordinates still to be filled.
equal = manipulated_plant.grid_compare(objective_plant)

for x, y in itertools.product(range(grid_size.x), range(grid_size.y)):
    if equal[y][x]:
        continue

    # Check emptiness *before* dereferencing the station: the original code
    # read `.name` first, so its `is None` test could never protect the access.
    if objective_plant.is_empty_by_coord(x, y):
        continue

    objective_station_name = objective_plant.get_station_by_coord(x, y).name
    if objective_station_name is None:
        # Kept for compatibility with plants that report unnamed stations.
        continue

    origin, destiny = manipulated_plant.move_station(
        objective_station_name, Vector(x, y)
    )
    sequence_list.append((objective_station_name, origin, destiny))

print("Final plant")
print(manipulated_plant.render())

print(render_sequence(sequence_list))

Final plant
+-----------------+-----------------+-----------------+-----------------+-----------------+
|        4        |        3        |        2        |        1        |                 |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|                 |                 |      InOut      |                 |     Conveyor    |
|                 |      Press      |      Robot1     |                 |        A        |
|                 |      Robot2     |   PartsStorage  |                 |        B        |
|                 |                 |                 |                 |        C        |
|                 |                 |                 |                 |        D        |
+-----------------+-----------------+-----------------+-----------------+-----------------+
0: Robot1 from B3 to SP1
1: PartsStorage from B2 to SP2
2: Press from C3 to SP3
3: Press from SP3 to B2
4: Robot1 from SP1 to B3
5: PartsStorage from SP2 to C3


In [5]:
# Version 2

# The next version should come with limited storage capacity

# Compare the two plants, create a 2D array of the differences

StorageBufferNameType = str

# Same procedure as Version 1, but the storage buffer is limited to 4 slots.
manipulated_plant = RearrangmentPlantLimitedStorage(spec, 4)
manipulated_plant.import_config(original_plant.export_config())

sequence_list: list[tuple[StationNameType, Vector[int] | int, Vector[int] | int]] = []

# As in Version 1, stations can only be added to or removed from a row through
# its right-hand side. Hence, when a coordinate differs from the objective,
# every station from the right edge down to that coordinate has to be taken
# out first.
#
# Phase 1: for each differing coordinate, move the stations of that row (right
#          to left, down to the coordinate) into the storage buffer.
# Phase 2: place the stored stations on their objective coordinates.

grid_size = spec.model.stations.grid.size

# --- Phase 1: clear every row from the right down to its differences ---------
equal = original_plant.grid_compare(objective_plant)

for x, y in itertools.product(range(grid_size.x), range(grid_size.y)):
    if equal[y][x]:
        continue

    for i in range(grid_size.x - 1, x - 1, -1):
        if manipulated_plant.is_empty_by_coord(i, y):
            continue

        station = manipulated_plant.get_station_by_coord(i, y)
        origin, destiny = manipulated_plant.move_station(station.name, "store")

        assert origin is not None, "Result should not be None"
        sequence_list.append((station.name, origin, destiny))

# --- Phase 2: install the stored stations, columns left to right -------------
# Re-compare against the objective to find the coordinates still to be filled.
equal = manipulated_plant.grid_compare(objective_plant)

for x, y in itertools.product(range(grid_size.x), range(grid_size.y)):
    if equal[y][x]:
        continue

    # Check emptiness *before* dereferencing the station: the original code
    # read `.name` first, so its `is None` test could never protect the access.
    if objective_plant.is_empty_by_coord(x, y):
        continue

    objective_station_name = objective_plant.get_station_by_coord(x, y).name
    if objective_station_name is None:
        # Kept for compatibility with plants that report unnamed stations.
        continue

    origin, destiny = manipulated_plant.move_station(
        objective_station_name, Vector(x, y)
    )
    sequence_list.append((objective_station_name, origin, destiny))

print("Final plant")
print(manipulated_plant.render())

print(render_sequence(sequence_list))

Final plant
+-----------------+-----------------+-----------------+-----------------+-----------------+
|        4        |        3        |        2        |        1        |                 |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|                 |                 |      InOut      |                 |     Conveyor    |
|                 |      Press      |      Robot1     |                 |        A        |
|                 |      Robot2     |   PartsStorage  |                 |        B        |
|                 |                 |                 |                 |        C        |
|                 |                 |                 |                 |        D        |
+-----------------+-----------------+-----------------+-----------------+-----------------+
0: Robot1 from B3 to SP1
1: PartsStorage from B2 to SP2
2: Press from C3 to SP3
3: Press from SP3 to B2
4: Robot1 from SP1 to B3
5: PartsStorage from SP2 to C3


The next version should be able to simplify the sequence list, by merging consecutive moves of the same station

This can be achieved in three ways:

 - By post-processing the output of the V2 version, merging consecutive moves of the same station. This is the simplest way, but it is not the most efficient one.
 - Whenever a station is about to be moved to the storage buffer, check by brute force whether it can instead be placed directly in another row. This is also simple, but it will slow down the algorithm.
 - Another option: when the last station of a row is moved to the storage buffer and the remaining stations of that row are already in the right place, save the name and position of the station that belongs there in a list named ready_to_install. Later, whenever a station is about to be moved to the storage buffer, check whether it is in the ready_to_install list and, if so, move it straight to its final place instead.

In [6]:
# Version 3

# Compare the two plants, create a 2D array of the differences


StorageBufferNameType = str

manipulated_plant = RearrangmentPlantLimitedStorage(spec, 3)
manipulated_plant.import_config(original_plant.export_config())

sequence_list: list[tuple[StationNameType, Vector[int] | int, Vector[int] | int]] = []

equal = original_plant.grid_compare(objective_plant)

# Objective coordinates that are currently free to be filled, keyed by the
# name of the station that belongs there.
ready_positions: dict[StationNameType, Vector[int]] = {}

# As in the previous versions, stations can only be added to or removed from a
# row through its right-hand side. The difference is that a station being
# taken out is moved directly to its objective coordinate when that coordinate
# is already known to be ready, instead of going through the storage buffer.

print("Manipulated plant")
print(manipulated_plant.render())

grid_size = spec.model.stations.grid.size

# --- Phase 1: per row, clear from the right down to the first difference -----
for y in range(grid_size.y):
    for x in range(grid_size.x):
        if equal[y][x]:
            continue

        for i in range(grid_size.x - 1, x - 1, -1):
            if manipulated_plant.is_empty_by_coord(i, y):
                continue

            focus_station_name = manipulated_plant.get_station_by_coord(i, y).name

            if focus_station_name in ready_positions:
                # The objective coordinate of this station is free: move it there.
                ready_info = ready_positions.pop(focus_station_name)
                manipulated_plant.move_station(focus_station_name, ready_info)
                print(f"Moving {focus_station_name} to {ready_info}")
                print(manipulated_plant.render())

                sequence_list.append(
                    (focus_station_name, Vector(i, y), copy.copy(ready_info))
                )

                # The coordinate to the right of the one just filled becomes
                # ready, provided the objective has a station there. The
                # emptiness test must look at that same coordinate (the
                # original tested (i, y), the cell the station came from).
                next_x = ready_info.x + 1
                if next_x < grid_size.x and not objective_plant.is_empty_by_coord(
                    next_x, ready_info.y
                ):
                    objective_station = objective_plant.get_station_by_coord(
                        next_x, ready_info.y
                    )
                    ready_positions[objective_station.name] = Vector(
                        next_x, ready_info.y
                    )
            else:
                origin, destiny = manipulated_plant.move_station(
                    focus_station_name, "store"
                )
                print(f"Moving {focus_station_name} to storage")
                print(manipulated_plant.render())
                sequence_list.append((focus_station_name, origin, destiny))

        # The row is now clear from (x, y) rightwards, so (x, y) is ready for
        # its objective station, if the objective has one there.
        if not objective_plant.is_empty_by_coord(x, y):
            ready_positions[objective_plant.get_station_by_coord(x, y).name] = Vector(
                x, y
            )
        # Only the first difference of each row needs handling.
        break

# --- Phase 2: install the stored stations, columns left to right -------------
# Re-compare against the objective to find the coordinates still to be filled.
equal = manipulated_plant.grid_compare(objective_plant)

for x, y in itertools.product(range(grid_size.x), range(grid_size.y)):
    if equal[y][x]:
        continue
    if objective_plant.is_empty_by_coord(x, y):
        continue

    objective_station_name = objective_plant.get_station_by_coord(x, y).name
    origin, destiny = manipulated_plant.move_station(
        objective_station_name, Vector(x, y)
    )
    print(f"Moving {objective_station_name} to {x}, {y}")
    print(manipulated_plant.render())
    sequence_list.append((objective_station_name, origin, destiny))


print("Final plant")
# Print explicitly: a bare expression is only displayed when it is the last
# one in the cell, and then as a raw repr with escaped newlines.
print(manipulated_plant.render())

print(render_sequence(sequence_list))

Manipulated plant
+-----------------+-----------------+-----------------+-----------------+-----------------+
|        4        |        3        |        2        |        1        |                 |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|                 |                 |      InOut      |                 |     Conveyor    |
|                 |   PartsStorage  |      Robot1     |                 |        A        |
|                 |      Robot2     |      Press      |                 |        B        |
|                 |                 |                 |                 |        C        |
|                 |                 |                 |                 |        D        |
+-----------------+-----------------+-----------------+-----------------+-----------------+
Moving Robot1 to storage
+-----------------+-----------------+-----------------+-----------------+-----------------+
|        4        |        3        |

'0: Robot1 from B3 to SP1\n1: PartsStorage from B2 to SP2\n2: Press from C3 to B2\n3: Robot1 from SP1 to B3\n4: PartsStorage from SP2 to C3\n'

In [7]:
# Version 4
"""
If the grid stations can be moved from both sides of the plant, a brute-force, graph-based approach can be used to find the optimal solution.

The graph would be a directed graph, where each node would represent a plant status and each edge would represent a station movement.

First, the graph starts with the initial plant status as the root node, and the objective plant status is the target node. We then use a BFS algorithm to find the shortest path between the two nodes: with BFS, the first time the objective plant status is reached, the path leading to it is a shortest path.

The stations can only be moved to a storage place or to their correct final position.

"""

from model.plant_rearrangement import RearrangmentPlantLimitedStorage


StorageBufferNameType = str

# Working plant for the graph search: limited storage with 3 slots.
manipulated_plant = RearrangmentPlantLimitedStorage(spec, 3)

# Start from the original layout.
manipulated_plant.import_config(original_plant.export_config())

# Storage positions are integer slot indices (see map_location), matching the
# annotation used by the previous versions.
sequence_list: list[tuple[StationNameType, Vector[int] | int, Vector[int] | int]] = []

# We have to compare the manipulated plant with the objective plant, to know the locations that require changes. Also, if a station not in the right position is surrounded by stations at both sides, those stations would be allowed to change their location.

In [8]:
from __future__ import annotations
# Directed graph class

from typing import Literal, TypeVar
from graph import DirectedGraphEdge, DirectedGraphNode


# Type variables that let the node and edge classes refer to each other
# before both are defined (the bounds are forward references).
RearrangementNodeInterface = TypeVar("RearrangementNodeInterface", bound="RearrangementNode")
RearrangementEdgeInterface = TypeVar("RearrangementEdgeInterface", bound="RearrangementEdge")

# Side through which a station leaves or enters a row, or the storage buffer.
MovementType = Literal["left", "right", "store"]

class RearrangementNode(DirectedGraphNode[RearrangementEdgeInterface]): # type: ignore
    """Graph node holding one plant state reached during the search."""

    def __init__(self, plant: RearrangmentPlantLimitedStorage):
        """Create a node wrapping ``plant``.

        Args:
            plant: Plant state represented by this node; stored as ``self.plant``.
        """
        super().__init__()
        # Plant configuration (grid and storage buffer) at this search state.
        self.plant = plant


class RearrangementEdge(DirectedGraphEdge[RearrangementNodeInterface]):
    """Graph edge describing the movement of one station between two plant states."""

    def __init__(
        self,
        station_name: str,
        from_pos: Vector[int] | int,
        to_pos: Vector[int] | int,
        origin_type: MovementType,
        destiny_type: MovementType,
    ):
        """Record a station movement from ``from_pos`` to ``to_pos``.

        A position is either a vector (a coordinate in the 2D plant grid) or an
        integer (a storage buffer position).

        Args:
            station_name: Name of the moved station; stored as ``self.station``.
            from_pos: Position of the station before the movement.
            to_pos: Position of the station after the movement.
            origin_type: How the station leaves its origin
                (``"left"``, ``"right"`` or ``"store"``).
            destiny_type: How the station reaches its destination
                (``"left"``, ``"right"`` or ``"store"``).
        """
        super().__init__()
        self.station = station_name
        self.from_pos, self.to_pos = from_pos, to_pos
        self.origin_type, self.destiny_type = origin_type, destiny_type


def path_from_root_to_node(node: RearrangementNode) -> list[RearrangementNode]:
    """Return the chain of nodes leading from the root to ``node``.

    The chain is built by walking backwards through ``incoming_edges`` until a
    node without incoming edges (the root) is reached. When a node has several
    incoming edges, the first one is followed, so the result is always a single
    valid path. The walk is iterative, so long paths cannot hit the recursion
    limit.

    Args:
        node: Node whose ancestry is requested.

    Returns:
        The nodes of the path, root first and ``node`` last.

    Raises:
        ValueError: If following the incoming edges revisits a node (cycle).
    """
    path: list[RearrangementNode] = [node]
    visited = {id(node)}

    current = node
    while current.incoming_edges:
        current = current.incoming_edges[0].origin
        if id(current) in visited:
            raise ValueError("Cycle detected while walking back to the root node")
        visited.add(id(current))
        path.append(current)

    # Collected from `node` back to the root; callers expect root first.
    path.reverse()
    return path

def sequence_from_nodes_list(nodes: list[RearrangementNode]) -> SequenceListType:
    """Turn a list of nodes into the movement steps that lead into them.

    Args:
        nodes: Nodes in path order.

    Returns:
        One ``(station, from_pos, to_pos)`` tuple per incoming edge of every
        node, in node order and then edge order.
    """
    return [
        (movement.station, movement.from_pos, movement.to_pos)
        for visited in nodes
        for movement in visited.incoming_edges
    ]


In [9]:
import itertools
from typing import Any


def check_isolated_falsy_on_grid(grid: list[list[Any]]) -> None:
    """Set to ``False``, in place, every cell enclosed between two ``False`` cells.

    A cell ``grid[i][j]`` is overwritten with ``False`` when both
    ``grid[i - 1][j]`` and ``grid[i + 1][j]`` are exactly ``False`` (identity
    test, so other falsy values such as ``0`` do not count). Only interior
    values of the first index are examined. An empty grid is left untouched.

    NOTE(review): the neighbours compared differ in the *first* index. Callers
    in this notebook index comparison grids as ``equal[y][x]``, so this looks
    at the rows above and below rather than at the left/right neighbours of a
    cell — confirm this is the intended axis.

    Args:
        grid: Rectangular grid to update; the width is taken from ``grid[0]``.
    """
    if not grid:
        # Nothing to inspect; also avoids indexing grid[0] on an empty grid.
        return

    width = len(grid[0])
    for outer in range(1, len(grid) - 1):
        for inner in range(width):
            if grid[outer - 1][inner] is False and grid[outer + 1][inner] is False:
                grid[outer][inner] = False


In [10]:
# Start the graph with the first node

import copy


# The search starts from a deep copy of the working plant, so expanding the
# graph never modifies `manipulated_plant` itself.
root_node = RearrangementNode(copy.deepcopy(manipulated_plant))
# Frontier of nodes still to be expanded; create_node() appends to it.
next_nodes = [root_node]

# From root node we are going to add all the possible movements as edges


def create_node(
    node: RearrangementNode,
    station_name: StationNameType,
    objective_position: Vector[int] | Literal["store"],
    movement_type_origin: MovementType,
    movement_type_destiny: MovementType,
):
    """Expand ``node`` with one station movement and queue the resulting state.

    The plant of ``node`` is deep-copied, the station is moved on the copy, and
    a child node plus the connecting edge are linked into the graph. The child
    is appended to the module-level ``next_nodes`` frontier.

    Args:
        node: Node being expanded; its plant is left untouched.
        station_name: Station to move.
        objective_position: Grid coordinate to move to, or ``"store"``.
        movement_type_origin: How the station leaves its current place.
        movement_type_destiny: How the station reaches its new place.
    """
    child_plant = copy.deepcopy(node.plant)
    moved_from, moved_to = child_plant.move_station(station_name, objective_position)

    child = RearrangementNode(child_plant)
    link = RearrangementEdge(
        station_name, moved_from, moved_to, movement_type_origin, movement_type_destiny
    )

    # Wire parent -> edge -> child in both directions.
    node.outgoing_edges.append(link)
    link.origin = node
    link.destiny = child
    child.incoming_edges.append(link)

    next_nodes.append(child)


def _row_side_locks(plant, position: Vector[int]) -> tuple[bool, bool]:
    """Tell whether the row of ``position`` is occupied to its left / right.

    A side is "locked" when at least one cell of the same row, strictly on that
    side of ``position``, is not empty in ``plant``.

    Returns:
        ``(left_locked, right_locked)``.
    """
    row = position.y
    row_width = spec.model.stations.grid.size.x
    left_locked = any(
        not plant.is_empty_by_coord(x, row) for x in range(position.x)
    )
    right_locked = any(
        not plant.is_empty_by_coord(x, row)
        for x in range(position.x + 1, row_width)
    )
    return left_locked, right_locked


def process_node(node: RearrangementNode) -> RearrangementNode | None:
    """Expand one node of the BFS, going one level deeper in the search.

    If the plant of ``node`` already matches ``objective_plant`` the node is
    returned. Otherwise a child node is created (through ``create_node``) for
    every movement allowed from this state, and ``None`` is returned.

    A station that is not on its objective coordinate may be moved:
      * to its objective coordinate, when that coordinate is empty and
        reachable from at least one side of its row;
      * to the storage buffer, when the station is on the grid.
    A station on the grid can only leave through a side of its row that has no
    other station in the way.

    Args:
        node: Node to expand.

    Returns:
        ``node`` when it holds the objective layout, otherwise ``None``.

    Raises:
        ValueError: If the objective plant keeps a station in the storage buffer.
    """
    # Compare the node plant with the objective plant; stop when they match.
    equal = node.plant.grid_compare(objective_plant)
    if all(all(row) for row in equal):
        return node

    # NOTE(review): the adjusted grid is not read again below, so this call
    # currently has no effect on which movements are generated.
    check_isolated_falsy_on_grid(equal)

    # Stations that are stored or misplaced. A list (instead of a set) keeps
    # the expansion order independent of string hashing, so runs are
    # reproducible; names are dict keys and therefore already unique.
    objective_stations = objective_plant.stations()
    station_available_to_move: list[StationNameType] = []

    for station_name, station_position in node.plant.stations().items():
        objective_position = objective_stations[station_name]
        if isinstance(objective_position, int):
            raise ValueError("Station in objective plant cannot be in a storage buffer")

        if not isinstance(station_position, Vector) or not station_position.equal(
            objective_position
        ):
            station_available_to_move.append(station_name)

    # Create a movement edge for every allowed movement of every such station.
    for station_name in station_available_to_move:
        objective_position = objective_plant.get_station_location_by_name(station_name)
        actual_position = node.plant.get_station_location_by_name(station_name)

        if not isinstance(objective_position, Vector):
            raise ValueError("Station in objective plant cannot be in a storage buffer")

        on_grid = isinstance(actual_position, Vector)

        # A stored station is never locked; a station on the grid is locked on
        # each side of its row where another station stands in the way.
        actual_left_locked = False
        actual_right_locked = False
        if on_grid:
            actual_left_locked, actual_right_locked = _row_side_locks(
                node.plant, actual_position
            )
            if actual_left_locked and actual_right_locked:
                # Enclosed on both sides: the station cannot be moved at all.
                continue

        # Movements to the objective coordinate, only when it is empty.
        if node.plant.is_empty_by_coord(objective_position.x, objective_position.y):
            objective_left_locked, objective_right_locked = _row_side_locks(
                node.plant, objective_position
            )

            # One edge per combination of unlocked exit side and unlocked
            # entry side (none is created when the objective is locked on
            # both sides).
            if on_grid:
                if not objective_right_locked and not actual_right_locked:
                    create_node(node, station_name, objective_position, "right", "right")
                if not objective_left_locked and not actual_left_locked:
                    create_node(node, station_name, objective_position, "left", "left")
                if not objective_left_locked and not actual_right_locked:
                    create_node(node, station_name, objective_position, "right", "left")
                if not objective_right_locked and not actual_left_locked:
                    create_node(node, station_name, objective_position, "left", "right")
            elif isinstance(actual_position, int):
                if not objective_right_locked:
                    create_node(node, station_name, objective_position, "store", "right")
                if not objective_left_locked:
                    create_node(node, station_name, objective_position, "store", "left")

        # Movements to the storage buffer: the station leaves through every
        # side that is NOT locked. (The original tested the locked sides, which
        # mislabelled the edges and made a station free on both sides
        # impossible to store.)
        if on_grid and actual_position != objective_position:
            if not actual_right_locked:
                create_node(node, station_name, "store", "right", "store")
            if not actual_left_locked:
                create_node(node, station_name, "store", "left", "store")

    return None


from collections import deque

# Breadth-first search over plant states. The frontier is turned into a deque
# so that taking the oldest node is O(1) (list.pop(0) shifts every remaining
# element); create_node() keeps appending to the same global name.
next_nodes = deque(next_nodes)

node = None
while next_nodes:
    node = process_node(next_nodes.popleft())
    if node is not None:
        print("Algorithm finished")
        break


print("Result obtained: ")

if node is not None:
    print(node.plant.render())
    nodes = path_from_root_to_node(node)
    sequence = sequence_from_nodes_list(nodes)
    print(render_sequence(sequence))

    print("----------")
    print("Plant sequences")
    print("Origin")
    print(original_plant.render())

    # Pair every step with the plant state it produces: the first node of the
    # path is the root (no incoming step), so it is skipped.
    for index, (step_node, step) in enumerate(zip(nodes[1:], sequence)):
        print(f"Step {index}: {render_step(step)}")
        print(step_node.plant.render())
        if step_node.plant.is_storage_buffer_not_empty():
            print("Storage buffer: ", step_node.plant.render_storage_buffer())
            print("---------------------")

else:
    print("Algorithm failed to find a result")

Algorithm finished
Result obtained: 
+-----------------+-----------------+-----------------+-----------------+-----------------+
|        4        |        3        |        2        |        1        |                 |
+-----------------+-----------------+-----------------+-----------------+-----------------+
|                 |                 |      InOut      |                 |     Conveyor    |
|                 |      Press      |      Robot1     |                 |        A        |
|                 |      Robot2     |   PartsStorage  |                 |        B        |
|                 |                 |                 |                 |        C        |
|                 |                 |                 |                 |        D        |
+-----------------+-----------------+-----------------+-----------------+-----------------+
0: Press from C3 to SP1
1: PartsStorage from B2 to C3
2: Press from SP1 to B2

----------
Plant sequences
Origin
+-----------------+---