# Executing Gcode Sequences with Priorities

In [1]:
from asyncio import PriorityQueue
import asyncio
import time
from rich import print
from dataclasses import dataclass, field
from enum import Enum, auto, StrEnum
from typing import Optional, Dict, Self, Generic, TypeVar, List, Iterator, NamedTuple, Protocol, Tuple
from collections import namedtuple

## Types

### Commands

In [2]:
class EmitsGcode(Protocol):
    def gcode(self) -> str:
        ...

class CommandType(Enum):
    GO_HOME = auto()
    GO_XY = auto()


class CommandInfo(NamedTuple):
    gcode: str
    fields: List[str]
    description: Optional[str] = None


Numeric = TypeVar("Numeric", int, float)


class Coordinate(NamedTuple):
    x: Optional[Numeric] = None
    y: Optional[Numeric] = None
    z: Optional[Numeric] = None

    def gcode(self) -> Iterator[str]:
        return " ".join([f"{v}" for _, v in self._asdict().items() if v is not None])


### Predefined Co-ordinates

In [3]:
_command_to_info: Dict[CommandType, CommandInfo] = {
    CommandType.GO_HOME: CommandInfo("G28", [], "Go to home position"),
    CommandType.GO_XY: CommandInfo("G1", ["x", "y", "z"], "Go to specified XY position"),
}

_hardcoded_locations: Dict[str, Coordinate] = {
    "tiprack": Coordinate(20, 100, 20),
    "trash": Coordinate(0, 0, 0),
    "home": Coordinate(100, 100, 0),
}


class NamedLocation(StrEnum):
    TIPRACK = "tiprack"
    TRASH = "trash"
    HOME = "home"


def HardcodedLocation(name: NamedLocation) -> Coordinate:
    return _hardcoded_locations[name]


### Command Sequences

In [4]:
@dataclass
class GcodeCommand(EmitsGcode):
    type: CommandType
    arg: Optional[Coordinate] = None
    info: Optional[CommandInfo] = field(init=False)

    def __post_init__(self):
      self.info = _command_to_info[self.type]

      try:
        if self.arg is not None:
          assert len(self.arg) == len(_command_to_info[self.type].fields)
        else:
          assert len(_command_to_info[self.type].fields) == 0
      except AssertionError:
        raise ValueError(
            f"Command {self.type} expects {len(_command_to_info[self.type].fields)} arguments, but got {len(self.arg)}")

    def gcode(self) -> str:
        return f"{_command_to_info[self.type].gcode} {self.arg.gcode() if self.arg else ''}"


@dataclass
class CommandSequence(EmitsGcode):
    seq: list[GcodeCommand] = field(default_factory=list[GcodeCommand])
    name: str = ""

    def __len__(self) -> int:
        return len(self.seq)

    def __add__(self, gc: GcodeCommand) -> Self:
        self.seq.append(gc)
        return self

    def __iter__(self) -> Iterator[GcodeCommand]:
        return iter(self.seq)

    def gcode(self) -> str:
        return "\n".join([gc.gcode() for gc in self.seq])


### Predefined Command Sequences

In [5]:
pick_tip = CommandSequence([
    GcodeCommand(
        type=CommandType.GO_XY,
        arg=HardcodedLocation(NamedLocation.TIPRACK)),
    GcodeCommand(
        type=CommandType.GO_XY,
        arg=HardcodedLocation(NamedLocation.HOME)),
    GcodeCommand(
        type=CommandType.GO_HOME
    ),
], name="Pick Tip")


go_home = CommandSequence(
    [GcodeCommand(type=CommandType.GO_HOME)], name="Go Home")

# Tasks

In [6]:
@dataclass
class TaskStats:
    created_at: float = field(default_factory=time.time)
    started_at: float = field(init=False, repr=False)
    completed_at: float = field(init=False, repr=False)
    duration: float = field(init=False, repr=False)
    elapsed: float = field(init=False, repr=False)

    def start(self) -> None:
        self.started_at = time.time()

    def complete(self) -> None:
        self.completed_at = time.time()
        self.duration = self.completed_at - self.started_at
        self.elapsed = self.completed_at - self.created_at

    def __str__(self) -> str:
        msg: str = f"Duration: {self.duration:.2f}s"
        if self.elapsed - self.duration > 0.5:
          msg += f", Waited For: +{self.elapsed - self.duration:.2f}s"
        return msg


class Tool(Enum):
    NONE = auto()
    PIPETTE = auto()
    DISK_PNP = auto()


@dataclass
class GcodeTask:
    seq: CommandSequence
    priority: int = 7
    stats: TaskStats = field(default_factory=TaskStats)
    needs_tool: Tool = field(default=Tool.NONE)


async def execute_seq(cmd_seq: CommandSequence, *, verbose: bool = False) -> None:
  [print("⏳ Execution Start") if verbose else None]

  try:
    for index, gc in enumerate(cmd_seq):
      if verbose:
        msg: str = f"   ↓ Executing #{index+1}: {gc.gcode():12}"
        msg += f" > Intent='{gc.info.description}'"
        print(msg)

      await asyncio.sleep(0.1)
    [print("✅ Execution Complete") if verbose else None]
  except (KeyboardInterrupt) as e:
    print(f"Execution Failed\n{e}")
  finally:
    [print("---") if verbose else None]


async def execute_task(task: GcodeTask, *, verbose: bool = False) -> None:
    print(f"Task: priority={task.priority}, steps={len(task.seq)}")

    task.stats.start()
    await execute_seq(task.seq, verbose=verbose)
    task.stats.complete()

    print(f" ↳ {task.seq.name}")
    print(f"   ↳ {task.stats}")


# Prioritising Tasks

## Approach 1: Single Priority Based Queue

In [34]:
async def CmdExecute(taskQueue: PriorityQueue[GcodeTask]):
  while True:
    prio, task = await taskQueue.get()
    print("pending task count: {}, is empty?: {}, needs: {}".format(
      taskQueue.qsize(), 
      taskQueue.empty(),
      task.needs_tool))
    await execute_task(task, verbose=False)
    taskQueue.task_done()


async def CmdStream(taskQueue: PriorityQueue[GcodeTask]):
  gtasklist: List[GcodeTask] = [
      GcodeTask(seq=pick_tip, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=pick_tip, priority=4, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=3, needs_tool=Tool.NONE),
      GcodeTask(seq=pick_tip, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=pick_tip, priority=6, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=2, needs_tool=Tool.NONE),
      GcodeTask(seq=pick_tip, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=2, needs_tool=Tool.NONE),
      GcodeTask(seq=pick_tip, priority=1, needs_tool=Tool.PIPETTE),
  ]

  for task in gtasklist:
    await taskQueue.put((task.priority, task))
    await asyncio.sleep(0.05)
  await taskQueue.join()


GcodeTaskQueue: PriorityQueue[GcodeTask] = PriorityQueue(maxsize=100)

In [None]:
loop = asyncio.get_event_loop()
# loop.set_debug(True)


verbose: bool = True


async def main():
  async with asyncio.TaskGroup() as tg:
    tg.create_task(CmdStream(GcodeTaskQueue))
    tg.create_task(CmdExecute(GcodeTaskQueue))
  print("Done")


try:
  loop.create_task(main())
except (KeyboardInterrupt, asyncio.CancelledError):
  print(f"Exited with {GcodeTaskQueue.qsize()} tasks remaining")
  if verbose:
    print(GcodeTaskQueue._queue)


pending task count: [1;36m0[0m, is empty?: [3;92mTrue[0m, needs: Tool.PIPETTE
Task: [33mpriority[0m=[1;36m7[0m, [33msteps[0m=[1;36m3[0m
 ↳ Pick Tip
   ↳ Duration: [1;36m0.[0m30s
pending task count: [1;36m4[0m, is empty?: [3;91mFalse[0m, needs: Tool.NONE
Task: [33mpriority[0m=[1;36m2[0m, [33msteps[0m=[1;36m1[0m
 ↳ Go Home
   ↳ Duration: [1;36m0.[0m10s
pending task count: [1;36m5[0m, is empty?: [3;91mFalse[0m, needs: Tool.NONE
Task: [33mpriority[0m=[1;36m2[0m, [33msteps[0m=[1;36m1[0m
 ↳ Go Home
   ↳ Duration: [1;36m0.[0m10s
pending task count: [1;36m5[0m, is empty?: [3;91mFalse[0m, needs: Tool.PIPETTE
Task: [33mpriority[0m=[1;36m1[0m, [33msteps[0m=[1;36m3[0m
 ↳ Pick Tip
   ↳ Duration: [1;36m0.[0m30s, Waited For: +[1;36m0.[0m51s
pending task count: [1;36m4[0m, is empty?: [3;91mFalse[0m, needs: Tool.NONE
Task: [33mpriority[0m=[1;36m3[0m, [33msteps[0m=[1;36m1[0m
 ↳ Go Home
   ↳ Duration: [1;36m0.[0m10s, Waited For: +[1

## Approach 2: Priority Queue per Tool

### Switching Costs and Discounted Priority

In [66]:
switching_cost: Dict[Tuple[Tool, Tool], float] = {
  (Tool.NONE, Tool.PIPETTE): 1.0,
  (Tool.PIPETTE, Tool.NONE): 1.0,
  (Tool.NONE, Tool.DISK_PNP): 1.0,
  (Tool.DISK_PNP, Tool.NONE): 1.0,
  (Tool.PIPETTE, Tool.DISK_PNP): 1.0,
  (Tool.DISK_PNP, Tool.PIPETTE): 1.0,
  (Tool.PIPETTE, Tool.PIPETTE): 0.0,
  (Tool.DISK_PNP, Tool.DISK_PNP): 0.0,
  (Tool.NONE, Tool.NONE): 0.0,
}

def discounted_priority(
  task: GcodeTask, 
  current_tool: Tool, 
  index: int, *, 
  discount_factor: float = 1.1) -> int:
  '''
  Calculate the priority of a task, taking into account the cost of switching tools.
  # Example:
  ```
  task = GcodeTask(seq=pick_tip, priority=4, needs_tool=Tool.PIPETTE)
  current_tool = Tool.NONE
  index = 5
  discounted_priority(task, current_tool, index)
  ```
  '''
  cost = int(switching_cost[(current_tool, task.needs_tool)] * index * discount_factor)
  print(cost)
  return task.priority + cost

In [67]:
task = GcodeTask(seq=pick_tip, priority=4, needs_tool=Tool.PIPETTE)
current_tool = Tool.NONE
index = 3
discounted_priority(task, current_tool, index)

[1;36m3[0m


7

In [None]:
class DiscountedPriorityQueue(PriorityQueue):
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.current_tool: Tool = Tool.NONE
    self.index: int = 0

  async def put(self, item: Tuple[int, GcodeTask]) -> None:
    prio, task = item
    prio = discounted_priority(task, self.current_tool, self.index)
    self.index += 1
    # self.current_tool = task.needs_tool
    await super().put((prio, task))

  async def get(self) -> Tuple[int, GcodeTask]:
    prio, task = await super().get()
    self.current_tool = task.needs_tool
    return prio, task

In [68]:
from asyncio import Queue

async def CmdExecute(taskQueue: Queue[GcodeTask]):
  while True:
    current_tool = Tool.NONE
    task = await taskQueue
    # print(task)
    
    print("pending task count: {}, is empty?: {}, needs: {}".format(
        taskQueue.qsize(),
        taskQueue.empty(),
        task.needs_tool))    

    # update priorities
    if task.needs_tool != current_tool:
      current_tool = task.needs_tool
      for index, task in enumerate(taskQueue._queue):
        task.priority = discounted_priority(task, current_tool, index)
      # print(taskQueue)

    await execute_task(task, verbose=False)

    taskQueue.task_done()


async def CmdStream(taskQueue: Queue[GcodeTask]):
  gtasklist: List[GcodeTask] = [
      GcodeTask(seq=pick_tip, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=1, needs_tool=Tool.NONE),
      GcodeTask(seq=pick_tip, priority=2, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=2, needs_tool=Tool.NONE),
      GcodeTask(seq=pick_tip, priority=6, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=2, needs_tool=Tool.NONE),
      GcodeTask(seq=pick_tip, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=2, needs_tool=Tool.NONE),
      GcodeTask(seq=pick_tip, priority=1, needs_tool=Tool.PIPETTE),
      GcodeTask(seq=go_home, priority=2, needs_tool=Tool.NONE),
  ]

  for task in gtasklist:
    await taskQueue.put(task)
    await asyncio.sleep(0.05)
  await taskQueue.join()


GcodeTaskQueue: Queue[GcodeTask] = Queue(maxsize=100)


Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/satyamtiwary/Library/Python/3.11/lib/python/site-packages/ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
  File "/Users/satyamtiwary/Library/Python/3.11/lib/python/site-packages/traitlets/config/application.py", line 1041, in launch_instance
    app.start()
  File "/Users/satyamtiwary/Library/Python/3.11/lib/python/site-packages/ipykernel/kernelapp.py", line 711, in start
    self.io_loop.start()
  File "/Users/satyamtiwary/Library/Python/3.11/lib/python/site-packages/tornado/platform/asyncio.py", line 215, in start
    self.asyncio_loop.run_forever()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 607, in run_forever
    self._run_once()
  File "/Library/Frameworks/Python.framework/Versi

In [None]:
loop = asyncio.get_event_loop()
loop.set_debug(True)


verbose: bool = True


async def main():
  async with asyncio.TaskGroup() as tg:
    tg.create_task(CmdStream(GcodeTaskQueue))
    tg.create_task(CmdExecute(GcodeTaskQueue))
  print("Done")


try:
  loop.create_task(main())
except (KeyboardInterrupt, asyncio.CancelledError):
  print(f"Exited with {GcodeTaskQueue.qsize()} tasks remaining")
  if verbose:
    print(GcodeTaskQueue._queue)


pending task count: [1;36m0[0m, is empty?: [3;92mTrue[0m, needs: Tool.PIPETTE
Task: [33mpriority[0m=[1;36m7[0m, [33msteps[0m=[1;36m3[0m
 ↳ Pick Tip
   ↳ Duration: [1;36m0.[0m30s
pending task count: [1;36m4[0m, is empty?: [3;91mFalse[0m, needs: Tool.NONE
Task: [33mpriority[0m=[1;36m1[0m, [33msteps[0m=[1;36m1[0m
 ↳ Go Home
   ↳ Duration: [1;36m0.[0m10s
pending task count: [1;36m5[0m, is empty?: [3;91mFalse[0m, needs: Tool.PIPETTE
[1;36m0[0m
[1;36m0[0m
[1;36m2[0m
[1;36m0[0m
[1;36m4[0m
Task: [33mpriority[0m=[1;36m6[0m, [33msteps[0m=[1;36m1[0m
 ↳ Go Home
   ↳ Duration: [1;36m0.[0m10s
pending task count: [1;36m6[0m, is empty?: [3;91mFalse[0m, needs: Tool.NONE
Task: [33mpriority[0m=[1;36m2[0m, [33msteps[0m=[1;36m1[0m
 ↳ Go Home
   ↳ Duration: [1;36m0.[0m10s, Waited For: +[1;36m0.[0m52s
pending task count: [1;36m5[0m, is empty?: [3;91mFalse[0m, needs: Tool.PIPETTE
[1;36m0[0m
[1;36m0[0m
[1;36m2[0m
[1;36m0[0m
[1;36m