<a href="https://colab.research.google.com/github/heghiw/lakovaci-linka/blob/main/experiments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# Remove the existing directory
#!rm -rf lakovaci-linka

# Clone the repository again
!git clone https://github.com/heghiw/lakovaci-linka.git

# Navigate to the directory
%cd lakovaci-linka


fatal: destination path 'lakovaci-linka' already exists and is not an empty directory.
/content/lakovaci-linka


data prep

In [2]:
import pandas as pd

# Define the path to the Excel file
file_path = '/content/lakovaci-linka/data1.xlsx'

# Read the sheet 'linka' into a DataFrame
linka_df = pd.read_excel(file_path, sheet_name='linka')

# Read the sheet 'recept' into a DataFrame
recept_df = pd.read_excel(file_path, sheet_name='recept')

print("Linka:")
print(linka_df)
print("\nRecepty:")
print(recept_df)



Linka:
             typ vany  id_vany   pozice_x_rel  pozice_x_cum
0               vstup         0           0.0           0.0
1        Teplý oplach         1        2300.0        2300.0
2             Postřik         2        1900.0        4200.0
3    Ponor odm železo         3        1900.0        6100.0
4    Ponor odm pozink         4        1800.0        7900.0
5            oplach 1         5        1800.0        9700.0
6            oplach 2         6        1800.0       11500.0
7       Moření železo         7        1800.0       13300.0
8       Moření pozink         8        1800.0       15100.0
9      Oplach moř žel         9        1800.0       16900.0
10  Oplach moř pozink        10        1800.0       18700.0
11    oplach společný        11        1800.0       20500.0
12           aktivace        12        1800.0       22300.0
13             fosfát        13        1800.0       24100.0
14             oplach        14        1800.0       25900.0
15            Oplach         21  

manipulator and vozik characteristics

In [5]:
linka_df.columns = linka_df.columns.str.strip()
recept_df.columns = recept_df.columns.str.strip()
print(linka_df.columns)
print(recept_df.columns)

Index(['typ vany', 'id_vany', 'pozice_x_rel', 'pozice_x_cum'], dtype='object')
Index(['tech', 'id_vany', 'poradi_operace', 'cas_min', 'cas_max', 'cas_opt',
       'okap', 'okap_cas'],
      dtype='object')


simulace


vizualizace


In [2]:
!pip install pulp



ver5


In [3]:
import pandas as pd
import numpy as np
import math
import networkx as nx
from pulp import LpProblem, LpVariable, lpSum, LpMinimize, LpBinary, LpContinuous, value, LpStatus

# recept_df: recipe operations with columns:
#    "poradi_operace" : operation order (integer)
#    "id_vany": bath id (integer or string)
#    "cas_opt": optimal dwell time in seconds
#    "okap": 0/1 (whether okap wait is required)
#    "okap_cas": okap wait time in seconds
#
# linka_df: bath configuration with columns:
#    "id_vany": bath id (matches recept_df)
#    "pozice_x_cum": horizontal position in mm

# -------------------------------
# Manipulator Characteristics
# -------------------------------
manipulator_characteristics = {
    "okap_point": 2000,         # mm: height for okap during upward movement
    "highest_point": 2750,       # mm: maximum vertical height
    "deceleration_point": 500,   # mm: point where downward speed is reduced near the bath surface
    "speed_after_ponoreni": 8,   # m/min: slower speed used near the bath surface
    "speed_going_up": 15,        # m/min: upward speed
    "speed_going_down": 12,      # m/min: downward speed before deceleration
    "speed_left_right": 35       # m/min: horizontal speed
}
num_manipulators = 6
takt_limit = 300  # seconds

# For convenience, compute speeds in mm/s.
speed_going_up = manipulator_characteristics["speed_going_up"] * 1000 / 60
speed_going_down = manipulator_characteristics["speed_going_down"] * 1000 / 60
speed_left_right = manipulator_characteristics["speed_left_right"] * 1000 / 60
speed_after_ponoreni = manipulator_characteristics["speed_after_ponoreni"] * 1000 / 60

# -------------------------------
# Helper: Estimate processing time for an operation
# -------------------------------
def compute_processing_time(row, current_x):
    """
    Approximate processing time for an operation.
    Compute horizontal move time from current_x to target_x (from linka_df),
    plus vertical down/up times (using constant speeds), plus dwell time.

    For simplicity, assume:
      - Horizontal time = |target_x - current_x| / speed_left_right.
      - Vertical down: from highest_point to 0:
          Time_down = ( (highest_point - deceleration_point) / speed_going_down +
                        (deceleration_point - 0) / speed_after_ponoreni )
      - Vertical up: similar, but if okap is required, add okap_cas.
    """
    # Get target_x from linka_df:
    bath_id = row["id_vany"]
    target_row = linka_df[linka_df["id_vany"] == bath_id]
    if target_row.empty:
        raise ValueError(f"Bath id {bath_id} not found in linka_df")
    target_x = float(target_row.iloc[0]["pozice_x_cum"])
    horizontal_time = abs(target_x - current_x) / speed_left_right

    # Vertical down time:
    vertical_down_time = (manipulator_characteristics["highest_point"] -
                          manipulator_characteristics["deceleration_point"]) / speed_going_down
    vertical_down_time += (manipulator_characteristics["deceleration_point"] - 0) / speed_after_ponoreni

    # Vertical up time:
    vertical_up_time = (manipulator_characteristics["okap_point"] - 0) / speed_going_up
    # If okap is required, add wait time:
    okap_time = row["okap_cas"] if row["okap"] == 1 else 0
    vertical_up_time += okap_time
    vertical_up_time += (manipulator_characteristics["highest_point"] - manipulator_characteristics["okap_point"]) / speed_going_up

    dwell_time = row["cas_opt"]

    return horizontal_time + vertical_down_time + dwell_time + vertical_up_time, target_x

# -------------------------------
# Stage 1: MILP Scheduling (using PuLP)
# -------------------------------
def schedule_operations_MILP(recept_df, num_manipulators, takt_limit=300):
    """
    Use MILP to schedule operations.
    We assume round-robin assignment of operations to manipulators.
    For each operation j (sorted by poradi_operace) we define a start time s_j.
    We enforce that for operations at the same bath, their intervals do not overlap.
    For simplicity, here we assume processing time p_j is computed by compute_processing_time.

    Returns: a DataFrame with columns:
      "poradi_operace", "id_vany", "manipulator", "start_time", "finish_time", "processing_time"
    """
    # Sort operations by order.
    ops = recept_df[recept_df["tech"]=="tech1"].sort_values("poradi_operace").reset_index(drop=True)
    n_ops = len(ops)
    # Assign each operation to a manipulator round-robin.
    ops["manipulator"] = [i % num_manipulators for i in range(n_ops)]

    # Create MILP problem.
    prob = LpProblem("ManipulatorScheduling", LpMinimize)

    # Decision variables: s_j: start time for each operation j.
    s = {j: LpVariable(f"s_{j}", lowBound=0, cat=LpContinuous) for j in range(n_ops)}

    # Parameters: processing time p_j and target_x.
    p = {}
    target_x_vals = {}
    current_x = {}  # assume each manipulator starts at position 0.
    for m in range(num_manipulators):
        current_x[m] = 0

    # For simplicity, we compute processing time in sequence for each manipulator.
    # (In a full MILP you would have to decide order per manipulator.)
    p_list = [0]*n_ops
    target_list = [0]*n_ops
    for j in range(n_ops):
        m = ops.loc[j, "manipulator"]
        # Use current_x for manipulator m.
        p_j, target_x = compute_processing_time(ops.loc[j], current_x[m])
        p_list[j] = p_j
        target_list[j] = target_x
        # Update current_x for that manipulator.
        current_x[m] = target_x

    ops["processing_time"] = p_list

    # Objective: minimize overall finish time (which must be <= takt_limit).
    # Let finish time for op j be s_j + p_j.
    T = LpVariable("T", lowBound=0, upBound=takt_limit, cat=LpContinuous)
    prob += T  # minimize T
    for j in range(n_ops):
        prob += s[j] + p_list[j] <= T

    # Non-overlap constraints for operations on the same bath.
    # For each pair j, k with same bath, enforce: either op_j finishes before op_k starts or vice versa.
    # We use big-M formulation.
    M = 10000
    y = {}
    for j in range(n_ops):
        for k in range(j+1, n_ops):
            if ops.loc[j, "id_vany"] == ops.loc[k, "id_vany"]:
                # Define binary variable y_jk.
                y[(j,k)] = LpVariable(f"y_{j}_{k}", cat=LpBinary)
                # Constraint: s_j + p_j <= s_k + M*(1 - y_jk)
                prob += s[j] + p_list[j] <= s[k] + M*(1 - y[(j,k)])
                # Constraint: s_k + p_k <= s[j] + M*y_jk
                prob += s[k] + p_list[k] <= s[j] + M*y[(j,k)]

    # Also, each operation must finish within takt_limit.
    for j in range(n_ops):
        prob += s[j] + p_list[j] <= takt_limit

    # Solve the MILP.
    prob.solve()

    print("MILP Status:", LpStatus[prob.status])
    schedule = []
    for j in range(n_ops):
        schedule.append({
            "poradi_operace": ops.loc[j, "poradi_operace"],
            "id_vany": ops.loc[j, "id_vany"],
            "manipulator": ops.loc[j, "manipulator"],
            "start_time": value(s[j]),
            "finish_time": value(s[j]) + p_list[j],
            "processing_time": p_list[j]
        })
    schedule_df = pd.DataFrame(schedule)
    return schedule_df

# -------------------------------
# Stage 2: Graph-Based Local Path Planning
# -------------------------------
def plan_local_paths(schedule_df, linka_df):
    """
    Build a graph from bath positions.
    Nodes: bath ids from linka_df.
    Edge weight: Euclidean distance between baths.
    For each manipulator, for each consecutive pair of operations, compute the shortest path.

    Returns a dict keyed by manipulator containing list of transitions.
    Each transition is a dict: { "from": bath_id_from, "to": bath_id_to, "path": [list of bath_ids], "distance": total_distance }
    """
    # Build graph: nodes are bath ids with positions; edges between every pair.
    G = nx.Graph()
    for idx, row in linka_df.iterrows():
        bath = row["id_vany"]
        x = float(row["pozice_x_cum"])
        # Use x coordinate; you could add more dimensions.
        G.add_node(bath, pos=(x, 0))

    baths = list(linka_df["id_vany"])
    # Fully connect the baths
    for i in range(len(baths)):
        for j in range(i+1, len(baths)):
            bath_i = baths[i]
            bath_j = baths[j]
            pos_i = G.nodes[bath_i]['pos']
            pos_j = G.nodes[bath_j]['pos']
            dist = math.dist(pos_i, pos_j)
            G.add_edge(bath_i, bath_j, weight=dist)

    # For each manipulator, get its schedule (ordered by start_time).
    paths = {}
    for m in schedule_df["manipulator"].unique():
        ops = schedule_df[schedule_df["manipulator"] == m].sort_values("start_time")
        transitions = []
        # Assume the starting position for each manipulator is "Vstup" (which we may set to a default bath or leave as None)
        from_bath = "Vstup"
        for idx, row in ops.iterrows():
            to_bath = row["id_vany"]
            if from_bath == "Vstup":
                # For simplicity, assume the first bath is reached directly.
                path = [to_bath]
                dist = 0
            else:
                # Use A* search to plan path from from_bath to to_bath.
                path = nx.astar_path(G, from_bath, to_bath, weight='weight')
                # Compute total distance.
                dist = sum(G[u][v]['weight'] for u, v in zip(path[:-1], path[1:]))
            transitions.append({
                "from": from_bath,
                "to": to_bath,
                "path": path,
                "distance": dist
            })
            from_bath = to_bath
        paths[m] = transitions
    return paths

# -------------------------------
# Main Execution: Build Schedule and Paths
# -------------------------------
if __name__ == '__main__':
    # First, schedule operations using MILP.
    schedule_df = schedule_operations_MILP(recept_df, num_manipulators, takt_limit=takt_limit)
    print("MILP Schedule:")
    print(schedule_df)

    # Then, for each manipulator, plan local paths between consecutive baths.
    path_plan = plan_local_paths(schedule_df, linka_df)
    print("\nLocal Path Planning per Manipulator:")
    for m, transitions in path_plan.items():
        print(f"Manipulator {m}:")
        for t in transitions:
            print(t)


KeyError: 'id_vany'