# Tip Inventory Consolidation

In [1]:
import numpy as np
import random
import time

## Workcell Setup

In [2]:
# === Configuration ===
script_mode = "execution"  # "simulation" or "execution"
liquid_handler_choice = "starlet"  # star | ot2 | evo100 | etc.

In [3]:
# For development: auto-reload modules
%load_ext autoreload
%autoreload 2

import logging
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.visualizer.visualizer import Visualizer

# === Liquid handler configuration ===
liquid_handler_config = {
    "star": {
        "deck": ("pylabrobot.resources.hamilton", "STARDeck"),
        "execution": ("pylabrobot.liquid_handling.backends", "STARBackend"),
        "simulation": ("pylabrobot.liquid_handling.backends", "LiquidHandlerChatterboxBackend"),
    },
    "starlet": {
        "deck": ("pylabrobot.resources.hamilton", "STARLetDeck"),
        "execution": ("pylabrobot.liquid_handling.backends", "STARBackend"),
        "simulation": ("pylabrobot.liquid_handling.backends", "LiquidHandlerChatterboxBackend"),
    },
    "vantage": {
        "deck": {
            "module": "pylabrobot.resources.hamilton",
            "class": "VantageDeck",
            "args": { "size": 1.3 }
        },
        "execution": {
            "module": "pylabrobot.liquid_handling.backends",
            "class": "VantageBackend",
        },
        "simulation": {
            "module": "pylabrobot.liquid_handling.backends",
            "class": "LiquidHandlerChatterboxBackend",
        }
    },
    "ot2": {
        "deck": ("pylabrobot.resources.opentrons", "OTDeck"),
        "execution": {
            "module": "pylabrobot.liquid_handling.backends",
            "class": "OpentronsBackend"
        },
        "simulation": {
            "module": "pylabrobot.liquid_handling.backends",
            "class": "LiquidHandlerChatterboxBackend",
            "args": { "num_channels": 1 }
        }
    },
    "evo100": {
        "deck": ("pylabrobot.resources.tecan", "EVO100Deck"),
        "execution": ("pylabrobot.liquid_handling.backends", "EVOBackend"),
        "simulation": ("pylabrobot.liquid_handling.backends", "LiquidHandlerChatterboxBackend"),
    },
    "evo150": {
        "deck": ("pylabrobot.resources.tecan", "EVO150Deck"),
        "execution": ("pylabrobot.liquid_handling.backends", "EVOBackend"),
        "simulation": ("pylabrobot.liquid_handling.backends", "LiquidHandlerChatterboxBackend"),
    },
    "evo200": {
        "deck": ("pylabrobot.resources.tecan", "EVO200Deck"),
        "execution": ("pylabrobot.liquid_handling.backends", "EVOBackend"),
        "simulation": ("pylabrobot.liquid_handling.backends", "LiquidHandlerChatterboxBackend"),
    },
}

# === Liquid handler selection ===
# Define these before running
# liquid_handler_choice = "vantage"
# script_mode = "simulation"  # or "execution"

lh_entry = liquid_handler_config.get(liquid_handler_choice)
if lh_entry is None:
    raise ValueError(f"Unknown liquid handler: {liquid_handler_choice}")

# === Deck loading ===
deck_entry = lh_entry["deck"]

if isinstance(deck_entry, tuple):
    deck_module, deck_class = deck_entry
    deck_args = {}
elif isinstance(deck_entry, dict):
    deck_module = deck_entry["module"]
    deck_class = deck_entry["class"]
    deck_args = deck_entry.get("args", {})
else:
    raise ValueError(f"Invalid deck entry format: {deck_entry}")

exec(f"from {deck_module} import {deck_class} as Deck")
deck = Deck(**deck_args)

# === Backend loading ===
backend_entry = lh_entry.get(script_mode)
if backend_entry is None:
    raise ValueError(f"No backend configured for mode '{script_mode}' in '{liquid_handler_choice}'")

if isinstance(backend_entry, tuple):
    backend_module, backend_class = backend_entry
    backend_args = {}
elif isinstance(backend_entry, dict):
    backend_module = backend_entry["module"]
    backend_class = backend_entry["class"]
    backend_args = backend_entry.get("args", {})
else:
    raise ValueError(f"Invalid backend entry format: {backend_entry}")

exec(f"from {backend_module} import {backend_class} as Backend")
backend = Backend(**backend_args)

# === Create LiquidHandler ===
lh = LiquidHandler(backend=backend, deck=deck)

# === Logging setup ===
logger = logging.getLogger("pylabrobot")
logger.setLevel(logging.DEBUG)


In [4]:
lh = LiquidHandler(backend=backend, deck=deck)

await lh.setup()
vis = Visualizer(resource=lh)
await vis.setup()

from pylabrobot.resources import set_tip_tracking, set_volume_tracking
set_tip_tracking(True), set_volume_tracking(False);

if script_mode == "execution":
    await lh.backend.disable_cover_control()
    await lh.backend.move_all_channels_in_z_safety()
    lh.backend.allow_firmware_planning = True # very powerful
    lh.backend.read_timeout = 240 # give your commands more time

Websocket server started at http://127.0.0.1:2121
File server started at http://127.0.0.1:1337 . Open this URL in your browser.


### Function Defintions

In [5]:
from typing import List, Any, Generator

def divide_list_into_chunks(
        list_l: List[Any],
        chunk_size: int
    ) -> Generator[List[Any], None, None]:
    """
    Divides a list into smaller chunks of a specified size.

    Parameters:
    - list_l (List[Any]): The list to be divided into chunks.
    - chunk_size (int): The size of each chunk.

    Returns:
    - Generator[List[Any], None, None]: A generator that yields chunks of the list.
    """
    for i in range(0, len(list_l), chunk_size):
        yield list_l[i:i + chunk_size]

## Deck Setup

In [6]:
if any([liquid_handler_choice == "star", liquid_handler_choice == "starlet"]):

    from pylabrobot.resources import (
        TIP_CAR_480_A00, HTF, STF, TIP_50ul
    )
    
    tip_carrier = TIP_CAR_480_A00(name="tip carrier")
    
    tip_carrier[2] = tip_rack_1000ul_3 = HTF(name="tip_rack_1000ul_3", with_tips=False)
    tip_carrier[1] = tip_rack_1000ul_2 = HTF(name="tip_rack_1000ul_2")
    tip_carrier[0] = tip_rack_1000ul_1 = HTF(name="tip_rack_1000ul_1")
    
    lh.deck.assign_child_resource(tip_carrier, rails=15)
    
    tip_carrier_2 = TIP_CAR_480_A00(name="tip carrier 2")
    
    tip_carrier_2[2] = tip_rack_50ul_3 = TIP_50ul(name="tip_rack_50ul_3", with_tips=False)
    tip_carrier_2[1] = tip_rack_50ul_2 = TIP_50ul(name="tip_rack_50ul_2")
    tip_carrier_2[0] = tip_rack_50ul_1 = TIP_50ul(name="tip_rack_50ul_1")
    
    lh.deck.assign_child_resource(tip_carrier_2, rails=22)

    # Move 50ul tips in random source location to empty tip_rack, filled "down_left"
    dest_tip_spot_chunked = tip_rack_50ul_3.traverse(
            batch_size=lh.backend.num_channels,
            direction="down_left"
        ) 
    
    dest_tip_spot_iterator = iter(
        [tip_spot for column in dest_tip_spot_chunked for tip_spot in column]
    )
    
    n = 60
    random_source_numbers = random.sample(range(96), k=n)
    
    source_tip_spot_chunked =  divide_list_into_chunks(
        list_l=[tip_rack_50ul_1.children[idx] for idx in sorted(random_source_numbers)],
        chunk_size = lh.backend.num_channels
    )
    
    for source_tip_spots in source_tip_spot_chunked:
    
        destination_tip_spots = [next(dest_tip_spot_iterator) for idx in source_tip_spots]
    
        await lh.pick_up_tips(
            source_tip_spots,
        )
    
        if script_mode == "simulation":
            time.sleep(1)
    
        await lh.drop_tips(
            destination_tip_spots,
            )
    # # if script_mode == "simulation":
    # _ = [tip_rack_1000ul_1.children[idx].tracker.remove_tip() for idx in random_source_numbers]
    # _ = [tip_rack_1000ul_1.children[idx].tracker.commit() for idx in random_source_numbers]

In [7]:
if liquid_handler_choice == "ot2" :

    from pylabrobot.resources import (
        opentrons_96_filtertiprack_1000ul,
        opentrons_96_filtertiprack_20ul
    )

    tip_rack_1000ul_3 = opentrons_96_filtertiprack_1000ul(name="tip_rack_1000ul_3", with_tips=False)
    tip_rack_1000ul_2 = opentrons_96_filtertiprack_1000ul(name="tip_rack_1000ul_2")
    tip_rack_1000ul_1 = opentrons_96_filtertiprack_1000ul(name="tip_rack_1000ul_1")

    lh.deck.assign_child_at_slot(tip_rack_1000ul_3, slot=8)
    lh.deck.assign_child_at_slot(tip_rack_1000ul_2, slot=5)
    lh.deck.assign_child_at_slot(tip_rack_1000ul_1, slot=2)

    tip_rack_20ul_3 = opentrons_96_filtertiprack_20ul(name="tip_rack_20ul_3", with_tips=False)
    tip_rack_20ul_2 = opentrons_96_filtertiprack_20ul(name="tip_rack_20ul_2")
    tip_rack_20ul_1 = opentrons_96_filtertiprack_20ul(name="tip_rack_20ul_1")

    lh.deck.assign_child_at_slot(tip_rack_20ul_3, slot=9)
    lh.deck.assign_child_at_slot(tip_rack_20ul_2, slot=6)
    lh.deck.assign_child_at_slot(tip_rack_20ul_1, slot=3)

    # Move 20ul tips in random source location to empty tip_rack, filled "down_left"
    dest_tip_spot_chunked = tip_rack_20ul_3.traverse(
            batch_size=lh.backend.num_channels,
            direction="down_left"
        ) 
    
    dest_tip_spot_iterator = iter(
        [tip_spot for column in dest_tip_spot_chunked for tip_spot in column]
    )
    
    n = 60
    random_source_numbers = random.sample(range(96), k=n)
    
    source_tip_spot_chunked =  divide_list_into_chunks(
        list_l=[tip_rack_20ul_1.children[idx] for idx in sorted(random_source_numbers)],
        chunk_size = lh.backend.num_channels
    )
    
    for source_tip_spots in source_tip_spot_chunked:
    
        destination_tip_spots = [next(dest_tip_spot_iterator) for idx in source_tip_spots]
    
        await lh.pick_up_tips(
            source_tip_spots,
        )
    
        if script_mode == "simulation":
            time.sleep(1)
    
        await lh.drop_tips(
            destination_tip_spots,
            )

In [8]:
if liquid_handler_choice == "evo150" :

    from pylabrobot.resources import (
        opentrons_96_filtertiprack_1000ul,
        opentrons_96_filtertiprack_20ul
    )

    # TODO: generate EVO150 showcase

In [9]:
dest_tip_spot_chunked = tip_rack_1000ul_3.traverse(
        batch_size=lh.backend.num_channels,
        direction="down_left"
    )

dest_tip_spot_iterator = iter(
    [tip_spot for column in dest_tip_spot_chunked for tip_spot in column]
)

n = 40
random_source_numbers = random.sample(range(96), k=n)

source_tip_spot_chunked =  divide_list_into_chunks(
    list_l=[tip_rack_1000ul_1.children[idx] for idx in sorted(random_source_numbers)],
    chunk_size = lh.backend.num_channels
)

for source_tip_spots in source_tip_spot_chunked:

    destination_tip_spots = [next(dest_tip_spot_iterator) for idx in source_tip_spots]

    await lh.pick_up_tips(
        source_tip_spots,
    )

    if script_mode == "simulation":
        time.sleep(0.5)

    await lh.drop_tips(
        destination_tip_spots,
        )

# if script_mode == "simulation":
#     _ = [tip_rack_1000ul_1.children[idx].tracker.remove_tip() for idx in numbers]
#     _ = [tip_rack_1000ul_1.children[idx].tracker.commit() for idx in numbers]

In [10]:
n = 30
# random_source_numbers = random.sample(range(96), k=n)

source_tip_spot_chunked =  divide_list_into_chunks(
    list_l=[tip_rack_1000ul_2.children[idx] for idx in sorted(random_source_numbers)],
    chunk_size = lh.backend.num_channels
)

for source_tip_spots in source_tip_spot_chunked:

    destination_tip_spots = [next(dest_tip_spot_iterator) for idx in source_tip_spots]

    await lh.pick_up_tips(
        source_tip_spots,
    )

    if script_mode == "simulation":
        time.sleep(0.5)

    await lh.drop_tips(
        destination_tip_spots,
        )

# if script_mode == "simulation":
#     _ = [tip_rack_1000ul_1.children[idx].tracker.remove_tip() for idx in numbers]
#     _ = [tip_rack_1000ul_1.children[idx].tracker.commit() for idx in numbers]

## Consolidate tip inventory

In [11]:
lh.backend.num_channels

8

In [12]:
await lh.consolidate_tip_inventory(
    # lh=lh,
    ignore_tiprack_list = ["teaching_tip_rack"]
)

Consolidating:
 - tip_rack_1000ul_3, tip_rack_1000ul_1, tip_rack_1000ul_2
     - tip transfer cycle: 0 / 7
     - tip transfer cycle: 1 / 7
     - tip transfer cycle: 2 / 7
     - tip transfer cycle: 3 / 7
     - tip transfer cycle: 4 / 7
     - tip transfer cycle: 5 / 7
     - tip transfer cycle: 6 / 7
     - tip transfer cycle: 7 / 7
Consolidating:
 - tip_rack_50ul_3, tip_rack_50ul_1
     - tip transfer cycle: 0 / 4
     - tip transfer cycle: 1 / 4
     - tip transfer cycle: 2 / 4
     - tip transfer cycle: 3 / 4
     - tip transfer cycle: 4 / 4


# Liquid handler shut down

In [13]:
[lh.head[idx].has_tip for idx in range(8)]

[False, False, False, False, False, False, False, False]

In [14]:
if script_mode == 'execution':
    await lh.backend.move_all_channels_in_z_safety()
    if not lh.backend.core_parked: # return grippers
        await lh.backend.put_core()
    # discard tips if any are present
    has_tip_check = [lh.head[idx].has_tip for idx in range(8)]
    if any(has_tip_check):
        await lh.discard_tips()
    await lh.backend.spread_pip_channels()
    # Stop temperature control
    try:
        await lh.backend.stop_temperature_control_at_hhs(1)
    except Exception as e:
        print(f"An error occurred while stopping temperature control:\n{e}")
     # Stop temperature control
    try:
        await lh.backend.stop_temperature_control_at_hhc(2)
    except Exception as e:
        print(f"An error occurred while stopping temperature control:\n{e}")

await lh.stop()
await vis.stop()

An error occurred while stopping temperature control:
'STARBackend' object has no attribute 'stop_temperature_control_at_hhs'
An error occurred while stopping temperature control:
'STARBackend' object has no attribute 'stop_temperature_control_at_hhc'


In [15]:
import random
import matplotlib.pyplot as plt

# Parameters
total_length = 182
num_tips = 106

# Generate current tip list
random.seed(42)
current_tips_list = all_tip_presence_list

# Generate target tip list
target_tips_list = target_tips_list

# Compute movement list
tip_movement_list = [
    c - t for c, t in zip(current_tips_list, target_tips_list)
]

# Stack lists for visualization
tips_matrix = [current_tips_list, target_tips_list, tip_movement_list]

# Convert to 2D list of floats for imshow
tips_matrix_float = [[float(x) for x in row] for row in tips_matrix]

# Plot using matplotlib
fig, ax = plt.subplots(figsize=(12, 3))
cax = ax.imshow(tips_matrix_float, cmap="bwr", aspect="auto", vmin=-1, vmax=1)
ax.set_yticks([0, 1, 2])
ax.set_yticklabels(["Current", "Target", "Movement"])
ax.set_xticks([])
ax.set_title("Tip Movement Overview (No NumPy)")
plt.colorbar(cax, orientation='vertical', label='Tip Delta')
plt.tight_layout()
plt.show()


NameError: name 'all_tip_presence_list' is not defined