In [None]:
!pip install opentrons matplotlib
#!pip install --no-cache-dir https://github.com/Bioprotocols/labop/archive/main.zip

In [None]:
import opentrons.simulate as simulate
from opentrons.protocols import bundle
import io
import sys

import os
import tempfile
import sbol3
import labop
import tyto
import uml
import json
import rdflib as rdfl
from IPython.display import Markdown, IFrame
# %matplotlib inline
from IPython.display import Image

from labop.execution_engine import ExecutionEngine
# from labop_check.labop_check import check_doc
from labop_convert.markdown.markdown_specialization import MarkdownSpecialization
from labop_convert.opentrons.opentrons_specialization import OT2Specialization

%load_ext autoreload
%autoreload 2

In [None]:
def make_demo_script(filename):
  protocol = io.StringIO()


  protocol.write("""
  from opentrons import protocol_api

  # metadata
  metadata = {
      'protocolName': 'My Protocol',
      'author': 'Name <opentrons@example.com>',
      'description': 'Simple protocol to get started using the OT-2',
      'apiLevel': '2.12'
  }



  # protocol run function
  def run(protocol: protocol_api.ProtocolContext):

      # labware
      plate = protocol.load_labware('corning_96_wellplate_360ul_flat', location='1')
      tiprack = protocol.load_labware('opentrons_96_tiprack_300ul', location='2')

      # pipettes
      left_pipette = protocol.load_instrument(
          'p300_single', mount='left', tip_racks=[tiprack])

      # commands
      left_pipette.pick_up_tip()
      left_pipette.aspirate(100, plate['A1'])
      left_pipette.dispense(100, plate['B2'])
      left_pipette.drop_tip()
  """)

  with open(filename, "w") as f:
      f.write(protocol.getvalue())
  return filename

In [None]:
#############################################
# set up the document
print('Setting up document')
doc = sbol3.Document()
sbol3.set_namespace('https://bbn.com/scratch/')

#############################################
# Import the primitive libraries
print('Importing libraries')
labop.import_library('liquid_handling')
print('... Imported liquid handling')
labop.import_library('plate_handling')
print('... Imported plate handling')
labop.import_library('spectrophotometry')
print('... Imported spectrophotometry')
labop.import_library('sample_arrays')
print('... Imported sample arrays')

protocol = labop.Protocol("OT2_demo")
protocol.name = "OT2 demo"
doc.add(protocol)

CONT_NS = rdfl.Namespace('https://sift.net/container-ontology/container-ontology#')
OM_NS = rdfl.Namespace('http://www.ontology-of-units-of-measure.org/resource/om-2/')
PREFIX_MAP = json.dumps({"cont": CONT_NS, "om": OM_NS})

# plate = protocol.load_labware('corning_96_wellplate_360ul_flat', location='1')
plate_spec = labop.ContainerSpec('sample_plate', name='sample plate', 
                                 queryString='cont:Corning96WellPlate360uLFlat', 
                                 prefixMap=PREFIX_MAP)
plate = protocol.primitive_step('EmptyContainer', specification=plate_spec)
load_plate = protocol.primitive_step('LoadRackOnInstrument', rack=plate_spec, coordinates='1')

# tiprack = protocol.load_labware('opentrons_96_tiprack_300ul', location='2')
tiprack_spec = labop.ContainerSpec('tiprack', queryString='cont:Opentrons96TipRack300uL', prefixMap=PREFIX_MAP)
tiprack = protocol.primitive_step('LoadRackOnInstrument', rack=tiprack_spec, coordinates='2')

# left_pipette = protocol.load_instrument(
#          'p300_single', mount='left', tip_racks=[tiprack])
p300 = sbol3.Agent('p300_single', name='P300 Single')
doc.add(p300)
left_pipette = protocol.primitive_step('ConfigureRobot', instrument=p300, mount="left")

# left_pipette.pick_up_tip()
# left_pipette.aspirate(100, plate['A1'])
# left_pipette.dispense(100, plate['B2'])
# left_pipette.drop_tip()
source_well = protocol.primitive_step('PlateCoordinates', source=plate.output_pin('samples'), coordinates="A1")
dest_well = protocol.primitive_step('PlateCoordinates', source=plate.output_pin('samples'), coordinates="B2")
pip1 = protocol.primitive_step("Transfer", 
                               source=source_well.output_pin('samples'), 
                               destination=dest_well.output_pin('samples'), 
                               amount=sbol3.Measure(100, tyto.OM.microliter)
                               )
protocol.to_dot().render(filename=protocol.display_name, format="png")
Image(protocol.display_name+".png")

In [None]:
filename="ot2_demo_labop"
automated_script = filename+".py"
agent = sbol3.Agent("ot2_machine", name='OT2 machine')
ee = ExecutionEngine(specializations=[OT2Specialization(filename)])
parameter_values = []
execution = ee.execute(protocol, agent, id="test_execution")

#v = doc.validate()
#assert len(v) == 0, "".join(f'\n {e}' for e in v)

doc.write('ot2_demo.ttl', file_format='ttl')

In [None]:
# TODO Put into labop as utility
def run_ot2_sim(filename):
    sys.argv=[]
    """Run the simulation"""
    # parser = argparse.ArgumentParser(
    #     prog="opentrons_simulate", description="Simulate an OT-2 protocol"
    # )
    # parser = simulate.get_arguments(parser)

    # args = parser.parse_args()
    # Try to migrate api v1 containers if needed

    #duration_estimator = DurationEstimator() if args.estimate_duration else None  # type: ignore[no-untyped-call]
    duration_estimator = None

    with open(filename, "r") as protocol:
        runlog, maybe_bundle = simulate.simulate(
            protocol,
            # args.protocol.name,
            # getattr(args, "custom_labware_path", []),
            # getattr(args, "custom_data_path", []) + getattr(args, "custom_data_file", []),
            # duration_estimator=duration_estimator,
            # hardware_simulator_file_path=getattr(args, "custom_hardware_simulator_file"),
            # log_level=args.log_level,
        )

    if maybe_bundle:
        bundle_name = "bundle" # getattr(args, "bundle", None)
        # if bundle_name == args.protocol.name:
        #     raise RuntimeError("Bundle path and input path must be different")
        bundle_dest = simulate._get_bundle_dest(
            bundle_name, "PROTOCOL.ot2.zip", "my_protocol" #args.protocol.name
        )
        if bundle_dest:
            bundle.create_bundle(maybe_bundle, bundle_dest)

    # if args.output == "runlog":
    print(simulate.format_runlog(runlog))

    if duration_estimator:
        duration_seconds = duration_estimator.get_total_duration()
        hours = int(duration_seconds / 60 / 60)
        minutes = int((duration_seconds % (60 * 60)) / 60)
        print("--------------------------------------------------------------")
        print(f"Estimated protocol duration: {hours}h:{minutes}m")
        print("--------------------------------------------------------------")
        print("WARNING: Protocol duration estimation is an experimental feature")

    return runlog

In [None]:
manual_script = "manual_script.py"
run_ot2_sim(make_demo_script(manual_script))
run_ot2_sim(automated_script)

In [None]:
# TODO add Ludox protocol
# TODO add PCR protocol