In [50]:
# standard imports
from pylabrobot.liquid_handling.backends.backend import LiquidHandlerBackend
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.liquid_handling.backends import ChatterBoxBackend
from pylabrobot.resources.opentrons import OTDeck
from pylabrobot.visualizer.visualizer import Visualizer

import os
import numpy as np
import pandas as pd
import random
import math
from typing import Iterator, Tuple, List, AsyncIterator
import asyncio
import sys

# resources for deck setup
from pylabrobot.resources import (
    Deck,
    set_tip_tracking,

    set_volume_tracking,
    corning_96_wellplate_360ul_flat,
    opentrons_96_tiprack_1000ul,
)



In [51]:
set_tip_tracking(enabled = True)
set_volume_tracking(enabled = True)


In [14]:
async def visualize_deck(deck: Deck,
                         backend: LiquidHandlerBackend):
    try:
        lh = LiquidHandler(backend=backend, deck=deck)
        vis = Visualizer(resource = lh)
        await lh.setup()
        await vis.setup()
        return lh
    except Exception as e:
        print(f"Error! Got excpetion: {e}")

In [17]:
# @aidan-baydush [Aidan Baydush] @ ... input name bowen
import os
import numpy as np
import pandas as pd
import random
import math
from typing import Iterator, Tuple, List, AsyncIterator
import asyncio
import sys

from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.liquid_handling.backends.backend import LiquidHandlerBackend
from pylabrobot.liquid_handling.backends import ChatterBoxBackend
from pylabrobot.resources import Plate, corning_96_wellplate_360ul_flat, OTDeck, Deck, opentrons_96_tiprack_300ul, axygen_1_reservoir_90ml, set_tip_tracking, set_volume_tracking, set_cross_contamination_tracking, TipRack
from pylabrobot.visualizer.visualizer import Visualizer
import pylabrobot.resources.functional as F


# some function headers
def set_deck() -> Deck:
    deck = OTDeck(name='deck')
    plate = corning_96_wellplate_360ul_flat(name='game')
    deck.assign_child_at_slot(plate, 1)
    tip_racks = [opentrons_96_tiprack_300ul(f'tip_rack_{i}') for i in range(9)]
    for i in range(9):
        # puts tip racks in 3-11
        deck.assign_child_at_slot(tip_racks[i], i+3)
    res = VWR_1_troughplate_195000uL_Ub('res')
    deck.assign_child_at_slot(res, 2)

    return deck


def get_tip_generator(deck: Deck) -> AsyncIterator:
    # get all tip spots
    tip_racks = [deck.get_resource(f'tip_rack_{i}') for i in range(9)]

    tip_spots = F.get_all_tip_spots(tip_racks)

    # build a linear generator to get all new spots (from module 2)
    linear_generator = F.linear_tip_spot_generator(
        tip_spots=tip_spots,                      # the list of tip spots to use
        repeat=True,                              # repeat the tip spots if they run out
    )
    return linear_generator


class Dino:
    def __init__(self, tip_gen: AsyncIterator, deck: Deck, lh: LiquidHandler, plate: Plate, res):
        self.board: List[str] = []            # contains occupied wells
        self.tip_gen = tip_gen,
        self.deck = deck
        self.lh = lh
        self.plate = plate
        self.res = res
        self.dino_pos: List[str] = []
        self.obstacle_vol = 50
        self.dino_vol = 100
        self.score = 0

    async def setup(self):
        '''
            Adds dino to plate.
        '''
        await self._pipette_res_to_well(dest='G2', vols=self.dino_vol)
        await self._pipette_res_to_well(dest='H2', vols=self.dino_vol)
        self.dino_pos.append('G2')
        self.dino_pos.append('H2')

    async def _pipette_plate_to_plate(self, dest: str, source: str, vols: int):
        '''
            Method to pipette from the plate to the plate
        '''
        await self.lh.pick_up_tips(tip_spots=[await anext(self.tip_gen)], use_channels=[0])
        await self.lh.aspirate(resources=[self.plate.get_well(source)], vols=[vols], use_channels=[0])
        await self.lh.dispense(resources=[self.plate.get_well(dest)], vols=[vols], use_channels=[0])
        await self.lh.return_tips()
        self.board.append(dest)
        self.board.remove(source)

    async def _pipette_res_to_well(self, dest: str, vols: int):
        '''
            Method to pipette from the resivoir to the well. 
        '''
        await self.lh.pick_up_tips(tip_spots=[await anext(self.tip_gen)], use_channels=[0])
        await self.lh.aspirate(resources=[self.res.get_well('A1')], vols=[vols], use_channels=[0])
        await self.lh.dispense(resources=[self.plate.get_well(dest)], vols=[vols], use_channels=[0])
        await self.lh.return_tips()
        self.board.append(dest)

    async def _pipette_well_to_res(self, source: str, vols: int):
        '''
            Method to pipette from plate to the resivoir
        '''
        await self.lh.pick_up_tips(tip_spots=[await anext(self.tip_gen)], use_channels=[0])
        await self.lh.aspirate(resources=[self.plate.get_well(source)], vols=[vols], use_channels=[0])
        await self.lh.dispense(resources=[self.res.get_well('A1')], vols=[vols], use_channels=[0])
        await self.lh.return_tips()
        self.board.remove(source)

    async def push_board(self):
        no_dino = self.board.copy()
        for pos in self.dino_pos:                                           # dont move dino!
            no_dino.remove(pos)
        for pos in no_dino:
            if pos[-1] == '1':                                              # remove from board
                await self._pipette_well_to_res(source=pos, vols=self.obstacle_vol)
                self.score += 1
            else:
                # collision detection
                if f'{pos[0]}{int(pos[-1])-1}' in self.dino_pos:
                    self.game_over()                                        # end game
                # else, push all board obstacles over
                await self._pipette_plate_to_plate(source=pos, vols=self.obstacle_vol, dest=f'{pos[0]}{int(pos[-1])-1}')

    async def dino_jump(self):
        await self._pipette_plate_to_plate(dest='F2', source='H2', vols=self.dino_vol)
        self.dino_pos.remove('H2')
        self.dino_pos.append('F2')

    async def dino_squat(self):
        await self._pipette_well_to_res(source='G2', vols=self.dino_vol)
        self.dino_pos.remove('G2')

    async def dino_return(self):
        if self.dino_pos == ['F2', 'G2']:
            await self._pipette_plate_to_plate(dest='H2', source='F2', vols=self.dino_vol)
        elif self.dino_pos == ['H2']:
            await self._pipette_res_to_well(dest='G2', vols=self.dino_vol)
        self.dino_pos = ['H2', 'G2']

    async def new_obstacle(self):
        obstacle_type = int(random.random() * 3)
        loc = {0: 'H12', 1: 'G12', 2: 'F12'}
        await self._pipette_res_to_well(dest=loc[obstacle_type], vols=self.obstacle_vol)

    def game_over(self):
        print('the game has ended. thank you for playing')
        print(f'score: {self.score}')
        sys.exit(0)


def run(dino_obj, turns):
    asyncio.run(dino_obj.setup())
    for i in range(turns):
        # user input
        input = input(
            'your move: s is for squat, w is for jump, and space is for staying.')
        match input:
            case 's':
                asyncio.run(dino_obj.dino_squat())
            case 'w':
                asyncio.run(dino_obj.dino_jump())
            case ' ':
                asyncio.run(dino_obj.dino_return())
        # now push board and every 3rd push add a new obstacle
        asyncio.run(dino_obj.push_board())
        if i % 3 == 0:
            asyncio.run(dino_obj.new_obstacle())
        # now change the score
        asyncio.run(display_number(lh, dino_obj.score))
    sys.exit()


if __name__ == '__main__':
    deck = set_deck()
    tip_gen = get_tip_generator(deck=deck)
    lh = LiquidHandler(backend=LiquidHandlerChatterboxBackend, deck=deck)
    dino = Dino(tip_gen=tip_gen, deck=deck, lh=lh,
                plate=deck.get_resource('game'), res=deck.get_resource('res'))
    N = 100
    run(dino, N)


ImportError: cannot import name 'axygen_1_reservoir_90ml' from 'pylabrobot.resources' (/Users/aidanbaydush/anaconda3/envs/590-dino/lib/python3.14/site-packages/pylabrobot/resources/__init__.py)

In [53]:
# create a list of all tip positions in order
tip_positions = []
for row in 'ABCDEFGH':
    for col in range(1, 13):
        tip_positions.append(f'{row}{col}')

# track which tip we're on
current_tip_index = 0
current_rack_index = 0

async def get_next_tip_spot(lh):
    global current_tip_index, current_rack_index
    
    tip_racks = [lh.deck.get_resource("tip_rack_0"),
                 lh.deck.get_resource("tip_rack_1"),
                 lh.deck.get_resource("tip_rack_2")]
    
    current_rack = tip_racks[current_rack_index]
    tip_position = tip_positions[current_tip_index]
    tip_spot = current_rack[tip_position]  
    
    current_tip_index += 1
    if current_tip_index >= 96:
        current_tip_index = 0
        current_rack_index += 1
    
    return tip_spot

In [None]:
def get_digit_wells(digit_value, col_start):
    """
    Get wells for a digit spanning 3 columns (col_start to col_start+2)
    Rows A-E form the 7-segment display
    """
    col_left = col_start
    col_mid = col_start + 1
    col_right = col_start + 2
    
    segments = {
        0: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_left}', f'C{col_left}', f'D{col_left}',  # left side
            f'B{col_right}', f'C{col_right}', f'D{col_right}',  # right side
            f'E{col_left}', f'E{col_mid}', f'E{col_right}'],  # bottom
        
        1: [f'A{col_right}', f'B{col_right}', f'C{col_right}', 
            f'D{col_right}', f'E{col_right}'],  # right side only
        
        2: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_right}', f'C{col_right}',  # top-right
            f'C{col_mid}',  # middle
            f'D{col_left}', f'E{col_left}',  # bottom-left
            f'E{col_left}', f'E{col_mid}', f'E{col_right}'],  # bottom
        
        3: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_right}', f'C{col_right}', f'D{col_right}', f'E{col_right}',  # right side
            f'C{col_mid}',  # middle
            f'E{col_left}', f'E{col_mid}'],  # bottom
        
        4: [f'A{col_left}', f'B{col_left}', f'C{col_left}',  # top-left
            f'C{col_mid}',  # middle
            f'A{col_right}', f'B{col_right}', f'C{col_right}', f'D{col_right}', f'E{col_right}'],  # right side
        
        5: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_left}', f'C{col_left}',  # top-left
            f'C{col_mid}',  # middle
            f'D{col_right}', f'E{col_right}',  # bottom-right
            f'E{col_left}', f'E{col_mid}'],  # bottom
        
        6: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_left}', f'C{col_left}', f'D{col_left}', f'E{col_left}',  # left side
            f'C{col_mid}',  # middle
            f'D{col_right}', f'E{col_right}',  # bottom-right
            f'E{col_mid}'],  # bottom
        
        7: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_right}', f'C{col_right}', f'D{col_right}', f'E{col_right}'],  # right side
        
        8: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_left}', f'B{col_right}',  # row B sides
            f'C{col_left}', f'C{col_mid}', f'C{col_right}',  # row C (middle segment)
            f'D{col_left}', f'D{col_right}',  # row D sides
            f'E{col_left}', f'E{col_mid}', f'E{col_right}'],  # bottom
        
        9: [f'A{col_left}', f'A{col_mid}', f'A{col_right}',  # top
            f'B{col_left}', f'B{col_right}',  # row B both sides
            f'C{col_left}', f'C{col_mid}', f'C{col_right}',  # middle
            f'D{col_right}', f'E{col_right}'],  # right side bottom
    }
    
    return segments[digit_value]

def get_all_digit_wells(col_start):
    return [f"{row}{col_start + 1}" for row in ['A', 'B', 'C', 'D', 'E']]

async def display_number(lh, number: int):
    if number < 0 or number > 9999:
        raise ValueError("Number must be between 0 and 9999")
    
    dino_plate = lh.deck.get_resource("dino_plate")
    trough = lh.deck.get_resource("trough")
    
    digits_str = str(number).zfill(4)[::-1]
    digit_positions = [10, 7, 4, 1]
    
    current_display = [set() for _ in range(4)]
    if hasattr(lh, '_current_display'):
        current_display = lh._current_display
    
    for idx, (digit_char, col_start) in enumerate(zip(digits_str, digit_positions)):
        if number < 10 ** idx:
            continue
            
        digit = int(digit_char)
        new_wells = set(get_digit_wells(digit, col_start))
        old_wells = current_display[idx]
        
        wells_to_remove = old_wells - new_wells
        wells_to_add = new_wells - old_wells
        
        for well_id in wells_to_remove:
            tip_spot = await get_next_tip_spot(lh)
            await lh.pick_up_tips(tip_spots=tip_spot, use_channels=[0])
            await lh.aspirate(dino_plate[well_id], vols=[200])  
            await lh.dispense(trough["A1"], vols=[200])  
            await lh.discard_tips()
        
        for well_id in wells_to_add:
            tip_spot = await get_next_tip_spot(lh)
            await lh.pick_up_tips(tip_spots=tip_spot, use_channels=[0])
            await lh.aspirate(trough["A1"], vols=[200])  
            await lh.dispense(dino_plate[well_id], vols=[200])  
            await lh.discard_tips()
        
        current_display[idx] = new_wells
    
    lh._current_display = current_display

In [55]:
async def make_dini_ot2():
    deck = OTDeck()
    deck.assign_child_at_slot(corning_96_wellplate_360ul_flat(name=f"dino_plate"), 5)
    deck.assign_child_at_slot(agilent_1_reservoir_290ml(name="trough"), 6)
    
    tip_rack_slots = [1, 2, 3]
    for i, tip_slot in enumerate(tip_rack_slots):
        deck.assign_child_at_slot(opentrons_96_tiprack_1000ul(name=f"tip_rack_{i}"), tip_slot)
    
    return deck

# call function
deck = await make_dini_ot2()
lh = await visualize_deck(deck, LiquidHandlerChatterboxBackend())

# set liquid after lh is created
trough = lh.deck.get_resource("trough")
trough.get_item('A1').tracker.set_liquids([(None, 290000)])

Setting up the liquid handler.
Resource deck was assigned to the liquid handler.
Resource trash_container was assigned to the liquid handler.
Resource dino_plate was assigned to the liquid handler.
Resource trough was assigned to the liquid handler.
Resource tip_rack_0 was assigned to the liquid handler.
Resource tip_rack_1 was assigned to the liquid handler.
Resource tip_rack_2 was assigned to the liquid handler.
Websocket server started at ws://127.0.0.1:2134
File server started at http://127.0.0.1:1350


In [56]:
import time
for i in range(21):
    print(f"Displaying: {i}")
    await display_number(lh, i)
    time.sleep(1)

Displaying: 0
Displaying: 1
Picking up tips:
pip#  resource             offset           tip type     max volume (µL)  fitting depth (mm)   tip length (mm)  filter    
  p0: tip_rack_0_A1        0,0,0            Tip          1000.0           7.95                 88               No        
Aspirating:
pip#  vol(ul)  resource             offset           flow rate  blowout    lld_z       
  p0: 200.0    trough_A1            0,0,0            None       None       None       
Dispensing:
pip#  vol(ul)  resource             offset           flow rate  blowout    lld_z       
  p0: 200.0    dino_plate_E12       0,0,0            None       None       None       
Dropping tips:
pip#  resource             offset           tip type     max volume (µL)  fitting depth (mm)   tip length (mm)  filter    
  p0: trash                0,0.0,0          Tip          1000.0           7.95                 88               No        
Picking up tips:
pip#  resource             offset           tip type     