# BME 590 - Workshop 5 - Interfacing with Peripherals and Synthetic Data
**Professor:** Emma Chory, Ph.D.

**Authors:** 
Rick Wierenga, Joe Laforet, Stefan Golas, Ben Perry

---

### Usage Note
**Reminder** - You should be running this notebook **locally** on **VS Code** not navigating it through **GitHub**.

**Reminder 2** - Remember to run `git pull` before copying this notebook so you get the most recent class updates. See [Section 6 of the class README](https://github.com/chory-lab/bme590-fall-2025#step-6-updating-to-the-latest-version)

---

### Types of Peripherals

Peripherals are machines that are wired into a liquid handling set-up. They can really be anything, such as:

- Robotic arms

- Centrifuges

- Heater-Shakers

- Plate Sealers

- PCR Machines

- Microscopes

- Plate Readers

- Storage

- Refrigeration Units

You have actually already interfaced with one of these peripherals, the robotic gripper. Unlike the gripper, however, many peripherals **generate data** in the form of absorbance values, fluorescence values, microscope images, quantification cycle counts, chromatograns, and more.

While only some of these devices are explictly controllable via PLR, many can be controlled from an external library or set of customized backends that convert high level Python commands to firmware commands.

Like workshop 3, we will **simulate** using these devices through Python classes. We are particularly interested in the ability to **simulate data**.

Being able to simulate data is critical in laboratory robotics methods because:

- It can be used to fully simulate a workflow **prior** to purchasing expensive equipment or making critcial errors.

- It can be used to test the **extreme limits** of a workflow, such as when an assay produces unexpected data, null values, or maxes out.

- It enables testing of **data processing pipelines** which may or may not include **AI methods** to analyze images and other results.

To make the Peripheral accessible by our code, we will make a dummy class. Use the following code as inspiration should you wish to make a Peripheral for a different machine. The general idea is that the Peripheral is an object that produces formatted data. Most times, you will have to then perform some data processing to format the data correctly, and then apply a processing pipeline (like making a line of best fit for protein concentration or image analysis pipeline to get cell confluency), as most relevant quantities aren't directly measurable.

### Plate Reader Example
A plate reader (also called a microplate reader or microplate spectrophotometer) is a laboratory instrument used to detect and measure biological, chemical, or physical reactions that occur in microplates. Some common modalities of measurement are:

- **Absorbance:** Measures how much light is absorbed by a sample (common in enzyme assays and cell growth measurements).

- **Fluorescence:** Detects light emitted by fluorescent molecules (used in DNA/RNA or protein quantification).

- **Luminescence:** Measures light produced by chemical reactions (used in ATP or reporter gene assays).

Let's showcase how we would simulate the data produced by a plate reader using a Python class.

First, let's import our libraries.


In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw

from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.liquid_handling.backends.backend import LiquidHandlerBackend
from pylabrobot.liquid_handling.backends import LiquidHandlerChatterboxBackend
from pylabrobot.resources import Plate, corning_96_wellplate_360ul_flat, OTDeck, Deck, opentrons_96_tiprack_300ul, axygen_1_reservoir_90ml, set_tip_tracking, set_volume_tracking, set_cross_contamination_tracking
from pylabrobot.visualizer.visualizer import Visualizer

Enable our trackers to throw errors and run visualizer correctly

In [None]:
set_tip_tracking(enabled = True)
set_volume_tracking(enabled = True)
set_cross_contamination_tracking(enabled = True)

Create our standard visualize deck function

In [None]:
async def visualize_deck(deck: Deck,
                         backend: LiquidHandlerBackend):
    try:
        lh = LiquidHandler(backend=backend, deck=deck)
        vis = Visualizer(resource = lh)
        await lh.setup()
        await vis.setup()
        return lh
    except Exception as e:
        print(f"Error! Got excpetion: {e}")

Now let's define a `PlateReader` class.

In [None]:
class PlateReader:

    # define the constructor which can take in a place to save the absorbance data
    def __init__(self,
                 save_folder_path: str | os.PathLike):
        self.save_folder = save_folder_path


    # create a function to simulate reading a 96-well plate and saving the absorbance values to file.
    def read_plate(self,
                   plate: Plate, # for now, this argument doesn't do anything since we are hard-coding a 96 well plate.
                   filename: str | os.PathLike = "workshop_5_data.csv"):

        # list to save data
        data = []

        # for each row and column in the data, create psuedo-data from normal distribution
        # centered at loc and with standard deviation of scale
        for row in range(8):
            row_data = []
            for col in range(12):
                absorbance_value = np.random.normal(loc=0.5,
                                                    scale=0.1)
                row_data.append(absorbance_value)
            data.append(row_data)

        # Write all generated data to a csv file
        df = pd.DataFrame(data,
                          index = [chr(65 + i) for i in range(8)],
                          columns=[str(i + 1) for i in range(12)])
        save_path = os.path.join(self.save_folder, filename)
        df.to_csv(save_path, index=True, header=True)
        print(f"Wrote data to {save_path}!")

Now let's initialize our plate reader

In [None]:
# define the save path
cwd = os.getcwd()
save_path = os.path.join(os.path.dirname(cwd), "workshop_5_plate_reader_output")

# create the save path, if it does not exist
os.makedirs(save_path, exist_ok = True)

# create plate reader with this folder
reader = PlateReader(save_folder_path = save_path)

Now, let's create a 96-well plate as an example.

In [None]:
# We'll demonstrate functionality with both a 96 and 384 well plate
plate = corning_96_wellplate_360ul_flat("plate")

Since the peripherals aren't implemented in the visualizer yet, we won't be able to see the movement of the plates to the PlateReader. That's okay, though. We can use our imaginations and pass in the plate to our reader.

Let's simulate reading this plate with our plate reader.

In [None]:
# d the plate and save the data to file
reader.read_plate(
    plate = plate,
    filename = "96_well_plate.csv"
)

One of the key ideas of using the PlateReader is that it will only give us a raw signal of absorbance. This is **NOT** the same as concentration.

Let's look at our absorbance values

In [None]:
print("Raw Absorbance")
df_path = os.path.join(reader.save_folder, "96_well_plate.csv")
pd.read_csv(df_path,
            index_col = 0)

In order to convert these **absorbance** values to **concentration**, we should make use of the Beer-Lambert law:

According to the Beer-Lambert Law, the concentration of a compound has a linear correlation to its absorbance at a defined wavelength at a constant pathlength:

$$ A = \epsilon \cdot C \cdot d$$

where:

- $A$ = Abosrbance

- $\epsilon$ = Extinction (Molar Absorption) Coefficient

- $C$ = Concentration

- $d$ = Pathlength (in cm)

When measuring, e.g., nucleic acid (DNA, RNA) absorbance on a 96-well plate, the liquid pathlength is not fixed to 1 cm, so the absorbance values cannot be used as
such for concentration calculation. Therefore, the true liquid pathlength of each well must be known before the BeerLambert Law can be applied.

One easy solution to this is to calculate the concentration is to simply use a series of **concentration standards**, where several samples of known concentration are used to create a line of best fit. This method avoids path-length correction and enables conversion of abosrbance to concentration.

**Note -** If you would like to read more, see this [guide](https://documents.thermofisher.com/TFS-Assets/LCD/Application-Notes/AN-SkanIT-Microplate-Based-Pathlength-Correction-Technical-Note-EN.pdf) by Thermo Scientific

For now, let's simulate creating this line of best fit with a function

In [None]:
# Linear equation parameters (example values)
slope = 100.0  # Example slope
intercept = 0.0  # Example intercept

# function to simulate and calculate the concentration of a sample
def read_and_calculate_concentrations(file_path: str | os.PathLike,
                                      slope: float,
                                      intercept: float):
    # read data
    df = pd.read_csv(file_path, index_col=0)

    # for each row and column
    for row in df.index:
        for col in df.columns:

            # convert the absorbance to concentration
            absorbance = df.at[row, col]
            concentration = absorbance * slope + intercept
            df.at[row, col] = concentration
            
    return df

# Process the data
conc_df = read_and_calculate_concentrations(df_path, slope, intercept)

Now we have a dataframe of the concentration of sample in each well. Let's visualize this

In [None]:
print("Concentration of Sample (ng/uL)")
conc_df

We now have a dataframe of the concentrations of our sample in each well. It is a common operation to take a plate of randomly distributed samples and do a dilution to make everything a uniform concentration.

Recall the dilution equation:

$$C_1V_1 = C_2V_2$$

where:

- $C_1$ - Concentration of solution 1.

- $V_1$ - Volume of solution 1.

- $C_2$ - Concentration of solution 2.

- $V_2$ - Volume of solution 2.

Let's create a function that can implement this calculation for our plate.

In [None]:
# C1 * V1 = C2 * V2
def calculate_water_volume(target_conc: float,
                           initial_volume: float,
                           initial_conc: float):

    # calculate final volume needed
    final_volume = (initial_conc * initial_volume) / target_conc

    # calculate volume to add to read final_volume
    volume_of_diluent = final_volume - initial_volume

    return volume_of_diluent

# Method to convert the original dataframe into the amount of water needed to dilute each well to a target concentraiton.
def apply_dilution(conc_df: pd.DataFrame,
                   target_conc: float,
                   initial_volume: float):

    # copy the dataframe so we don't overwrite the original
    dilution_df = conc_df.copy()

    # for each row and col
    for row in conc_df.index:
        for col in conc_df.columns:

            # conver the concentration to the amount dilution needd
            initial_conc = conc_df.at[row, col]
            volume_of_water = calculate_water_volume(target_conc, initial_volume, initial_conc)
            dilution_df.at[row, col] = volume_of_water

    return dilution_df

Now let's run and print the results of this function

In [None]:
# Target final concentration and initial volume
target_conc = 20  # ng/uL
initial_volume = 50  # uL

water_to_add = apply_dilution(conc_df, target_conc, initial_volume)
print("Volume of water to add (uL)")
water_to_add

If we change the target concentraiton to a higher value, let's see what happens to the volume to add.

In [None]:
target_conc = 50  # ng/uL
initial_volume = 50  # uL

water_to_add = apply_dilution(conc_df, target_conc, initial_volume)
print("Volume of water to add (uL)")
water_to_add

Notice that some of these volumes to add are actually **negative**, which means there is not enough **protein** itself to reach the desired concentration.

Since you can't easily remove the solvent from the wells (as it will contain the protein of interest) by pipetting, you should consider that when making a liquid handling protocol.

### Cell Counter

While tabular data is created by a plate reader, sometimes you may have interest in simulating **image data** such as microscopy images of cell cultures, which is critical to estimating cell confluency. A good introduction to the importance of cell culture comes from [this article](https://www.thermofisher.com/blog/life-in-the-lab/how-to-measure-cell-confluency/#h-what-is-cell-confluency-nbsp) from ThermoFisher:

>**Cell confluency** is a routine measurement used to track cell proliferation during cell culture. It is not the absolute cell number, but rather the percentage of culture dish or flask area covered by adherent cells.
>
>Cell confluency is a crucial parameter that helps researchers determine timing for passaging, transfecting, or harvesting for downstream applications like drug treatments, cell therapies, induced pluripotent stem cell (iPSC) work, differentiation experiments, and more. Essentially, it’s a non-negotiable step in maintaining healthy cell cultures and ensuring accurate experimental results.
>
>Overconfluency in particular can lead to cell stress or death as culture nutrients deplete, and cells begin competing for physical space. Cell behavior in a crowded environment may misrepresent natural gene expression, growth, or morphologies. Lysing cells will also release cytotoxin debris, and overconfluent cell cultures in general are more susceptible to fungal and bacterial contamination – issues that can irretrievably ruin your culture.

Let's create a method for **simulating images** for cell culture microscopy. Let's define a `CellImager`


In [None]:
class CellImager:

    # constructor to create the imager
    def __init__(self,
                 image_output_dir: str | os.PathLike):

        # we need to point the cell imager to a directory since it will be saving many files
        self.image_output_dir = image_output_dir

    # method to generate cell images
    def _generate_cell_image(self,
                            confluency: float):
        """
        Creates a single synthetic image by iteratively drawing until the target confluency is reached.
        """
        
        # create gray background 256x256 image
        img = Image.new('RGB', [256, 256], color = "lightgrey")
        draw = ImageDraw.Draw(img)

        # target confluency as total percent of pixels
        total_pixels = 256 * 256
        target_pixel_count = total_pixels * (confluency / 100.0)
        
        # draw circular cells until max attempts or target pixels are covered
        current_cell_pixels = 0
        max_attempts = int(target_pixel_count * 5) # Safety break
        attempts = 0

        while current_cell_pixels < target_pixel_count and attempts < max_attempts:

            # get random x, y coords
            x = np.random.randint(0, 256)
            y = np.random.randint(0, 256)
            
            # if the pixel is still the background color
            if img.getpixel((x, y)) == (211, 211, 211):

                # draw the circle
                radius = np.random.randint(2, 5)
                draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill="red")
                current_cell_pixels += np.pi * radius**2
            
            # increment attempts
            attempts += 1
            
        return img

    # given an input plate, capture an image for each well
    def capture_images_for_plate(self,
                                 plate: Plate):
        """
        Function to actually capture image for a plate
        """

        # create folder to store images
        if not os.path.exists(self.image_output_dir):
            os.makedirs(self.image_output_dir)
            print(f"Created directory: {self.image_output_dir}")

        # capture images for each well
        image_paths = {}
        for well in plate.get_all_items():

            # get a random confluence value between 10 - 120%
            random_confluency = np.random.uniform(low = 10.0,
                                                  high = 120.0)
            
            # generate an image for that well
            img = self._generate_cell_image(confluency=random_confluency)

            # write the image to file
            file_path = os.path.join(self.image_output_dir, f"well_{well.name}.png")
            img.save(file_path)
            
            # update the returned mapping to contain the file for the well in question
            image_paths[well.name] = file_path
            
        print(f"Finished capturing {len(image_paths)} images.")
        return image_paths

Now let's instantiate the image capturer.

In [None]:
# make path for imager results
cwd = os.getcwd()
save_path = os.path.join(os.path.dirname(cwd), "workshop_5_imager_data")

# create the imager
imager = CellImager(image_output_dir = save_path)

Let's "image' a cell culture plate

In [None]:
# call the image function to capture the images.
image_paths = imager.capture_images_for_plate(plate)

Let's visualize some of these images that the plate reader has captured.

In [None]:
# these imports allow us to display images in markdown output (jupyter notebook)
from IPython.display import display, Markdown
from IPython.display import Image as IMG

# function to display images
def display_images(image_dir):

    # for each image in the target dir
    png_files = [f for f in os.listdir(image_dir) if f.endswith('.png')]

    # show the first 3
    for i, filename in enumerate(png_files[:3]):
        file_path = os.path.join(image_dir, filename)
        display(Markdown(f"#### {filename}"))
        display(IMG(filename=file_path))
        if i < 2:
            display(Markdown("---"))

Run the function. You should see several simulated "cell images" where the cells are red.

In [None]:
display_images(imager.image_output_dir)

Great. Now let's create a function that can estimate the cell confluency, given an image.

In [None]:
def analyze_confluency_from_image(file_path: str | os.PathLike,
                                  bg_color_rgb: tuple[int] = (211, 211, 211)):

    # open the image
    img = Image.open(file_path).convert('RGB')
    
    # convert to a numpy array
    img_array = np.array(img)
    
    # create a boolean mask where True means the pixel is NOT the background color
    is_cell_mask = ~np.all(img_array == bg_color_rgb, axis=-1)
    
    # calculate confluency by counting the is_cell_mask / the total pixels
    cell_pixel_count = np.sum(is_cell_mask)
    total_pixel_count = img.width * img.height
    confluency = (cell_pixel_count / total_pixel_count) * 100
    
    # round result to two decimal points
    return round(confluency, 2)

Great. Now remember our cell imager returned to us the saved location of each well and its name in the form of a dictionary. Let's look at some of the items in this dictionary:

In [None]:
print(image_paths.keys()) # keys are the well names
print(image_paths.values()) # the values are the file paths to the image of the well in the key

Let's write a **for** loop that can iterate over these items, and for each well calculate a binary yes or no value if that cell is ready for passaging.

In [None]:
def predict_cell_confluency(image_paths: dict, confluency_threshold: float):

    # for each well and image location in our returned data
    for well_name, image_path in image_paths.items():

        # calcualte it's confluency
        measured_confluency = analyze_confluency_from_image(image_path)

        # using a threshold value of our choice, print whether or not that cell is ready for passaging.
        print(f"Well {well_name}: Measured Confluency = {measured_confluency}%")
        if measured_confluency > confluency_threshold:
            print(f"  -> Decision: Well {well_name} is ready for passaging.")
        else:
            print(f"  -> Decision: Well {well_name} needs more incubation time.")

predict_cell_confluency(image_paths, 80.00)

## Exercises

---

### **Exercise 1.** Using Plate Reader (60 pts)

**1.A.** Implement plate reader

Our plate reader in the prior problem was hard coded to only handle 96-well plates. Fill in the methods below to create a `GeneralPlateReader` class which can operate on any size plate.

You should implement

- `read_plate` - Input a plate and parameters for an absorbance distribution and create a `self.df` attribute with the return dataframe.

- `calculate_concentrations` - Uses the `self.df` attribute, and inputs a slope and intercept for a simulated line of best fit from a standard curve to update the `self.df` attribute to contain the values in concentrations.

- `calculate_water_volume` - Inputs a target concentration, initial volume, and initial concentration to calculate the water needed to add to a given well.

- `apply_dilutions` - Uses the `calculate_water_volume` method and the `self.df` attribute to convert all the concentrations of `self.df` into the volume needed to add to each well.

**Note -** Comment your code and submit it as `exercise_1A.txt`

In [None]:
class GeneralPlateReader:

    # define the constructor which can take in a place to save the absorbance data
    def __init__(self,
                 save_folder_path: str | os.PathLike):
        self.save_folder = save_folder_path


    # create a function to simulate reading a 96-well plate and saving the absorbance values to file.
    def read_plate(self,
                   plate: Plate,
                   mean_abs: float,
                   abs_stdev: float):
        
        ... # YOUR CODE HERE
        # assign output as df attribute of class
        self.df = df

    def calculate_concentrations(self,
                                 slope: float,
                                 intercept: float):

        ... # YOUR CODE HERE

    def calculate_water_volume(self,
                               target_conc: float,
                               initial_volume: float,
                               initial_conc: float):

        ... # YOUR CODE HERE
        return volume_of_diluent

    # Method to convert the original dataframe into the amount of water needed to dilute each well to a target concentraiton.
    def apply_dilution(self,
                       target_conc: float,
                       initial_volume: float):

        ... # YOUR CODE HERE

# instantiate plate reader and example plate
reader = GeneralPlateReader(save_folder_path = save_path)
plate = corning_96_wellplate_360ul_flat("plate")

# read the plate and print results
reader.read_plate(plate, 1, 0.5)
print(f"---PLATE READING RESULTS---")
print(reader.df)
print()

# calculate the concentrations and print results
reader.calculate_concentrations(100, 0)
print(f"---CONCENTRATION CALCULATION RESULTS---")
print(reader.df)
print()

# apply the dilutions and print results
reader.apply_dilution(40, 20)
print(f"---DILUTION CALCULATION RESULTS---")
print(reader.df)
print()

**1.B.** 20 ng / uL Dilutions

Using your `GeneralPlateReader` class, you will need to implement a liquid handling protocol which starts with a DNA plate, tips, and a reservoir of water, reads the plate, and then using the calculated volumes to pipette, performs the dilutions. Your code should contain:

- Relevant imports to execute your script

- A `make_deck()` function which inputs:

    - A `plate_type` to create either a 96 well or 24 well plate for dilutions

    - An `initial_vol` argument to set the initial volume of DNA on the DNA plate.

    - The function should then:

        - Create an OpenTrons Deck

        - Assign the DNA plate to the deck

        - Assign tips to the deck

        - Set the initial DNA liquid levels

        - Set the initial dilution reservoir level of water to 90 mL

- A `remove_plate_and_read()` function that simulates a gripper moving your plate to a plate reader and returning data. We have already implemented this function for you.

- A helper function to perform a **mixing** operation and a helper function to create the **tip generator**.

- A `pipette_vols()` function that inputs a row and column to move diluent from the diluent reservoir to the DNA plate at the row-column position.

Using these functions, write a liquid handling protocol that:

1. Set up the OT-2 Deck

2. Reads the plate using the defined function

3. Pipettes the correct volume to dilute each sample to 20 ng / uL

Make sure to **record a GIF** including the deck setup and pipetting operations. Use the following settings for plate reading:

- mean_abs = 0.5

- abs_stdev = 0.1

- slope = 100

- intercept = 0

- target_conc = 20

- initial_volume = 50

**Note -** Comment your code and submit it as `exercise_1B.txt` and `exercise_1B.gif`

In [None]:
# Imports
import time
from pylabrobot.resources import Resource
import ... # YOUR CODE HERE

def make_deck(plate_type = "96_well", initial_vol = 50):

    if plate_type == "96_well":
        dna_plate = ... # YOUR CODE HERE
    elif plate_type == "24_well":
        dna_plate = ... # YOUR CODE HERE

    ... # YOUR CODE HERE
    return deck

def remove_plate_and_read(lh,
                          plate_name: str,
                          plate_slot: int,
                          mean_abs: float, 
                          abs_stdev: float,
                          slope: float, 
                          intercept: float,
                          target_conc: float,
                          initial_volume: float):

    # get plate and "move it" to plate reader
    plate = lh.get_resource(plate_name)
    lh.deck.unassign_child_resource(plate)

    # read the plate
    reader.read_plate(plate, mean_abs, abs_stdev)
    time.sleep(5) # simulate plate reading time

    # calculate the concentrations
    reader.calculate_concentrations(slope, intercept)

    # calculat the amount of water to add
    reader.apply_dilution(target_conc, initial_volume)

    # put the plate back onto the deck
    lh.deck.assign_child_at_slot(plate, plate_slot)
    return reader.df

def prepare_tip_generator(
        tip_racks
):
    ... # YOUR CODE HERE

async def mix(
        lh,
        plate,
        well,
        n_mixes,
        vol
):
    ... # YOUR CODE HERE

async def pipette_vols(lh: LiquidHandler,
                 diluent_reservoir: Resource,
                 dna_plate: Plate,
                 tip_generator,
                 row: str,
                 col: str,
                 volume: float):

    ... # YOUR CODE HERE

async def run_protocol_1B(lh):

    ... # YOUR CODE HERE
    vol_df = remove_plate_and_read(...)

    # for each row and col, pipette the volumes into the dna plate
    ... # YOUR CODE HERE

# run the protocol
deck = make_deck()
lh = await visualize_deck(deck, LiquidHandlerChatterboxBackend())
await run_protocol_1B(lh)

**1.C.** 40 ng / uL diliutions

Repeat the dilution experiment above, but this time, implement a function called `pipette_vols_careful()`, which implements a logical check to skip the well if:

- The volume to pipette is negative, or...

- The volume is greater than the biggest tip capacity, or...

- The volume to pipette would cause the current volume in the well to exceed its maximum capacity.

Using these functions, write a liquid handling protocol that:

1. Set up the OT-2 Deck

2. Reads the plate using the defined function

3. Pipettes the correct volume to dilute each sample to 40 ng / uL

Make sure to **record a GIF** including the deck setup and pipetting operations. Use the following settings for plate reading:

- mean_abs = 0.5

- abs_stdev = 0.1

- slope = 100

- intercept = 0

- target_conc = 40

- initial_volume = 2000

**Note -** You may have to use larger tips (i.e. adjust your make_deck function). Comment your code and submit it as `exercise_1C.txt` and `exercise_1C.gif`

In [None]:
async def pipette_vols_careful(lh: LiquidHandler,
                 diluent_reservoir: Resource,
                 dna_plate: Plate,
                 tip_generator,
                 row: str,
                 col: str,
                 volume: float):

    ... # YOUR CODE HERE

async def run_protocol_1C(lh):

    ... # YOUR CODE HERE
    vol_df = remove_plate_and_read(...)

    ... # YOUR CODE HERE

# run the protocol
deck = make_deck(plate_type = "24_well", initial_vol = 2000)
lh = await visualize_deck(deck, LiquidHandlerChatterboxBackend())
await run_protocol_1C(lh)

### **Exercise 2.** Cell Counter + Passaging (40 pts)

**2.A.** Modified Cell Counter

We will be creating a modified version of the cell counter which is able to simulate confluency **over several days** of cell growth. Below, we provide a template `GrowthSimulatedCellImager` class with some methods defined. You will need to implement the following methods:

- `__init__()`

    - This function should now take in new growth simulation parameters and assign them to the `self` attribute of the class.

    - Also you should create a `self.confluent_wells` attribute that is a python `set()` that can store the wells that are currently confluent.

- `_simulate_confluency()`

    - This method hsould take in an argument named `time_point` to simulate the base confluency with the following equation:

        - $B = I \cdot r^t$ where $I$ is the initial confluency, $r$ is the growth rate and $t$ is the time point.

        - $B_{noise} = B + C_{noise}$ and $C_{noise}$ is drawn from a uniform distribution $\left[-\text{cell variability}, +\text{cell variability}\right]$

        - Use [np.clip](https://numpy.org/doc/2.1/reference/generated/numpy.clip.html) to clip the value beetween $0$ and `max_confluency`

- `capture_images_for_plate()`

    - Same as the prior capture image function, but should check if a well is already in the `self.confluent_wells` attribute

- `predict_cell_confluency()`

    - For each well name and image path in the input `well_data` attribute and `confluency_threshold`,  add to the `self.confluent_wells` attribute wells above the `confluency_threshold` paramter.

    - For each well ready for passaging, return it as a `list()` so your liquid handler knows which wells to passage.

**Note -** Comment your code and submit it as `exercise_2A.txt`

In [None]:
class GrowthSimulatedCellImager:

    # constructor with variables for growth rate and confluency measurements
    def __init__(self,
                 image_output_dir: str | os.PathLike,
                 initial_confluency: float = 10.0,
                 growth_rate: float = 1.6,
                 max_confluency: float = 120.0,
                 per_well_variability: float = 30.0
                ):

        ... # YOUR CODE HERE

        # way to track current wells
        self.confluent_wells = ... # YOUR CODE HERE

    # simulate growth over time
    def _simulate_confluency(self, time_point: int) -> float:

        base_confluency = ... # YOUR CODE HERE
        noise = np.random.uniform(...) # YOUR CODE HERE
        simulated_confluency = base_confluency + noise # YOUR CODE HERE
        
        # clamp the value between 0 and the maximum confluency
        return np.clip(simulated_confluency, 0, self.max_confluency)

    def _generate_cell_image(self,
                             confluency: float):
        
        # draw background image
        img = Image.new('RGB', [256, 256], color = "lightgrey")
        draw = ImageDraw.Draw(img)

        # target confluency
        total_pixels = 256 * 256
        target_pixel_count = total_pixels * (confluency / 100.0)
        
        # keep drawing cells until confluency reached or break condition reached
        current_cell_pixels = 0
        max_attempts = int(target_pixel_count * 5) + 500 # safety break
        attempts = 0

        # draw cells until confluency reached
        while current_cell_pixels < target_pixel_count and attempts < max_attempts:

            # get random x, y coords
            x = np.random.randint(0, 256)
            y = np.random.randint(0, 256)
            
            # if the pixel is still the background color
            if img.getpixel((x, y)) == (211, 211, 211):

                # draw the circle
                radius = np.random.randint(2, 5)
                draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill="red")
                current_cell_pixels += np.pi * radius**2
            
            # increment attempts
            attempts += 1
            
        return img

    def capture_images_for_plate(self,
                                 plate: Plate,
                                 time_point: int
                                ):

        # create folder to store images
        if not os.path.exists(self.image_output_dir):
            os.makedirs(self.image_output_dir)
            print(f"Created directory: {self.image_output_dir}")

        # dictionary to hold data
        well_data_from_run = {}
        
        # capture images for each well
        for well in plate.get_all_items():

            # if the well name is already confluent, skip the image
            well_name = well.name.split("_")[-1]
            if ... # YOUR CODE HERE

            # otherwise, image the cell
            simulated_confluency = self._simulate_confluency(time_point=time_point)
            img = self._generate_cell_image(confluency=simulated_confluency)
            file_path = os.path.join(self.image_output_dir, f"well_{well_name}_t{time_point}.png")
            img.save(file_path)

            # save the image path
            well_data_from_run[well_name] = file_path
            
        return well_data_from_run

    def analyze_confluency_from_image(self,
                                      file_path: str | os.PathLike,
                                      bg_color_rgb: tuple[int] = (211, 211, 211)):
        """
        Analyzes the actual pixel-based confluency from a saved image.
        (This function is unchanged)
        """
        # open the image
        img = Image.open(file_path).convert('RGB')
        
        # convert to a numpy array
        img_array = np.array(img)
        
        # Create a boolean mask where True means the pixel is NOT the background color
        is_cell_mask = ~np.all(img_array == bg_color_rgb, axis=-1)
        
        # Calculate confluency
        cell_pixel_count = np.sum(is_cell_mask)
        total_pixel_count = img.width * img.height
        confluency = (cell_pixel_count / total_pixel_count) * 100
        
        return round(confluency, 2)

    def predict_cell_confluency(self,
                                well_data,
                                confluency_threshold: float
                               ):

        ready_for_passaging = []
        
        print(f"\n--- Confluency Analysis (Threshold: {confluency_threshold}%) ---")

        # for each well in the dictionary
        for well_name, image_path in well_data.items():
            ... # YOUR CODE HERE
            
            # if confluent, add to confluent wells
            if ... # YOUR CODE HERE
                print(f"  -> Decision: Well {well_name} is ready for passaging.")
                self.confluent_wells.add(...) # YOUR CODE HERE
                ready_for_passaging.append(...) # YOUR CODE HERE
            
            # Otherwise, skip
            else:
                print(f"  -> Decision: Well {well_name} needs more incubation time.")
                
        # return list of wells ready for passaging
        return ...

**2.B.** Cell Passaging Liquid Handling

Using your `GrowthSimulatedCellImager` class, you will need to implement a liquid handling protocol which starts with a cell culture plate, tips, and reservoirs of PBS, trypsin, and media.

The protocol should then simulate **5 days** of cell growth, imaging the wells on the plate that are still growing each day, and then passaging any wells that need passaging. Your script should include:

- Relevant imports to execute your script

- A `make_deck()` function which creates a deck with:

    - An OpenTrons Deck

    - A cell culture plate

    - A destination plate

    - Enough tip racks (may need 7 or 8 here)

    - A 6-well reservoir with enough PBS, Media, and Trypsin to complete the protocol

- A `prepare_new_plate_media()` function which pipettes fresh starting media (180 uL per well) from the media reservoir to the destination plate.

- A `passage_well()` function that inputs the well name to passage along with other parameters and:

    - Aspirates all media (200 uL) from the well

    - Washes the well with 100 uL of PBS

    - Detaches the cells with 50 uL of Trypsin

    - Inactivates the trypsin with 50 uL of cell media

    - Aspirates the trypsin-cell-media mixture and moves it to the corresponding well on the destination plate at a 1:10 ratio (20 uL in 180 uL)

- A `simulate_imaging()` function that simulates a gripper moving your plate to a cell imager, and returns a dictionary of well_id to image path data of the wells imaged (or returns None if all wells imaged are confluent). We implement this for you.

- A helper function to perform a **mixing** operation and a helper function to create the **tip generator**.

Using these functions, write a liquid handling protocol called `run_protocol_2B()` that:

- Get's all relevant resources from the deck.

- Instantiates a `GrowthSimulatedCellImager` class.

- Prepares the destination plate with fresh media in all wells.

- For each day in 5 days, the protocol should:

    - Call the `simulate_imaging` function to get the wells images for wells not passaged so far.

    - Using these images, call `predict_cell_confluency()` on your imager to get the wells to passage, if any.

    - For each well in the wells to passage, call your `passage_well()` function to passage that well to the new destination plate at a 1:10 ratio.

    - If no new wells exist in the `simulate_imaging()` function, break the loop early.

    - Use the default class parameters we define in **Exercise 2A.** Use **80%** as the confluence threshold for passaging.

**Note -** Comment your code and submit it as `exercise_2B.txt` and `exercise_2B.gif`

In [None]:
from pylabrobot.resources import Porvair_6_reservoir_47ml_Vb

def make_deck_2B():
    ... # YOUR CODE HERE
    return deck

def prepare_tip_generator(
        tip_racks
):
    ... # YOUR CODE HERE

async def mix(
        lh,
        plate,
        well,
        n_mixes,
        vol
):
    ... # YOUR CODE HERE

async def prepare_new_plate_media(lh: LiquidHandler,
                                  media_reservoir: Resource,
                                  media_well: str,
                                  target_plate: Plate,
                                  volume: float,
                                  tip_gen):
    ... # YOUR CODE HERE

async def passage_well(lh: LiquidHandler,
                       well_name: str,
                       tip_gen,
                       reservoir_plate: Plate,
                       media_well: str,
                       pbs_well: str,
                       trypsin_well: str,
                       cell_plate: Plate,
                       destination_plate: Plate):

    # 1. Aspirate Media
    ... # YOUR CODE HERE

    # 2. Wash with PBS
    ... # YOUR CODE HERE

    # 3. Detach with Trypsin
    ... # YOUR CODE HERE

    # 4. Neutralize and 5. Transfer (split)
    ... # YOUR CODE HERE
    
    # Transfer for a 1:10 split (20uL from 100uL total, into 180uL new media)
    ... # YOUR CODE HERE

async def simulate_imaging(lh, plate_name, plate_slot, imager, day):

    # get plate and "move it" to cell imager
    plate = lh.get_resource(plate_name)
    lh.deck.unassign_child_resource(plate)
    time.sleep(5)

    # capture plate images
    new_well_data = imager.capture_images_for_plate(
        plate=plate,
        time_point=day
    )

    # put the plate back onto the deck
    lh.deck.assign_child_at_slot(plate, plate_slot)

    return new_well_data

async def run_protocol_2B(lh, num_days=5):

    ... # YOUR CODE HERE
    imager = ... # YOUR CODE HERE

    # prepare new plate
    await prepare_new_plate_media(...)
    
    for day in range(1, num_days + 1):
        new_well_data = await simulate_imaging(...)

        if ... # YOUR CODE HERE
            print("All wells are confluent. Experiment complete.")
            break
        ... # YOUR CODE HERE

deck = make_deck_2B()
lh = await visualize_deck(deck, LiquidHandlerChatterboxBackend())
await run_protocol_2B(lh)

---

#### Conclusion

That's all for workshop 5! Double check that you have submitted a `.txt` file for every problem, a `.gif` file for problems 1B, 1C, and 2B. You should have submitted:

- `exercise_1A.txt` with the code for exercise 1A.

- `exercise_1B.txt` with the code for exercise 1B.

- `exercise_1B.gif` showing the full deck setup and protocol of exercise 1B

- `exercise_1C.txt` with the code for exercise 1C.

- `exercise_1C.gif` showing the full deck setup and protocol of exercise 1C

- `exercise_2A.txt` with the code for exercise 2A.

- `exercise_2B.txt` with the code for exercise 2B.

- `exercise_2B.gif` showing the full deck setup and protocol of exercise 2B

**EVERY CODE BLOCK SHOULD HAVE WELL-WRITTEN COMMENTS**

If you are still feeling unsure on deck setup, please **reach out to the teaching team**, contact info for whom can be found in the .`README.md` file on the [class GitHub](https://github.com/chory-lab/bme590-fall-2025)

---