In [34]:

##################### ARDUINOLIB STARTS HERE ###########################
from serial import Serial
from collections import deque

class Arduino:
  """
  A wrapper class around a serial port with some helpers for easy communication to/from an Arduino.

  Example:

  ```python
  with Arduino('/dev/com.whatever', baudRate=115200) as arduino:
    for line in arduino.lines():
      print("Got line from Arduino: " + line)
      arduino.write(55) # Writes 0x55 as a byte. Also accepts raw bytes or strings.
  ```
  """

  def __init__(self, port, baudRate=9600, logging=False, num_params=8, preamble=[0xFF, 0xAA, 0xFF, 0xAA]):
    """
    Connect to an Arduino
    """
    self.logging = logging
    self.serial_port = Serial(port, baudRate, timeout=0)
    self.preamble = preamble
    self.parameters = [0] * num_params
    self.buffer = list()

    for byte in preamble:
      assert int(byte) == byte and byte >= 0 and byte <= 255

  def _log(self, *args, **kwargs):
    """
    Helper method that acts like `print`, when logging=True but does nothing otherwise.
    """
    if self.logging:
      print("[Arduino]", *args, **kwargs)
  
  # We want to just pass through to the serial port's context manager
  def __enter__(self):
    """
    When using an Arduino as a context manager, the Arduino will intelligently open/close the serial
    port upon entering/exiting the context manager, including doing so multiple times.
    """
    self._log("Entering Arduino context manager, connecting serial port...")
    self.serial_port.__enter__()

    self.serial_port.flush()

    # But return self so you can do `with Arduino(...) as arduino:`
    return self

  def __exit__(self, __exc_type, __exc_value, __traceback):
    """
    When using an Arduino as a context manager, the Arduino will intelligently open/close the serial
    port upon entering/exiting the context manager, including doing so multiple times.
    """
    self._log("Exiting Arduino context manager, disconnecting serial port...")
    return self.serial_port.__exit__(__exc_type, __exc_value, __traceback)
  
  # # NB: Calling lines() or packets() more than once is undefined behavior
  # def lines(self, drain_first=True):
  #   """
  #   Return an iterator that yields each line the Arduino sends over the Serial connection.

  #   If drain_first is True, any serial data already received and buffered but not yet processed will
  #   be erased.

  #   NOTE: This iterator will block while waiting for a line
  #   NOTE: Calling this method more than once, or calling it after packets() has been called, is
  #         undefined behavior.
  #   """
  #   if drain_first:
  #     self.serial_port.reset_input_buffer()

  #   while True:
  #     # NOTE: technically this would get rid of leading spaces too if that was something you cared about
  #     line = self.serial_port.readline().decode('ascii').strip()
  #     if len(line) > 0:
  #       self._log(f"Received Line: {line}")
  #       yield line

  # def packets(self, drain_first=True):
  #   """
  #   Return an iterator that yields each packet the Arduino sends over the Serial connection.

  #   A packet is defined as a newline-terminated, comma-separated list of integers. In other words,
  #   this method expects that your Arduino writes data over serial that looks like this: `1,2,3\n`.

  #   If drain_first is True, any serial data already received and buffered but not yet processed will
  #   be erased.

  #   NOTE: This iterator will block while waiting for a line
  #   NOTE: Calling this method more than once, or calling it after lines() has been called, is
  #         undefined behavior.
  #   """
  #   for line in self.lines(drain_first=drain_first):
  #     packet = tuple(int(data) for data in line.split(','))
  #     self._log(f"Received Packet: {packet}")
  #     yield packet

  def write(self, data):
    """
    Write data to the Arduino over Serial. Data should be a list of integers 0-255.
    """
    self._log(f"Writing data: {data}")

    if not isinstance(data, list):
      if isinstance(data, int):
        return self.write([data])
      else:
        raise Exception("Please pass a list of ints to Arduino.write!")
    else:
      for byte in data:
        assert byte >= 0 and byte <= 255 and int(byte) == 0
        self.serial_port.write(bytearray([byte]))
  
  def fill_buffer(self):
    # read all available bytes (100000 is an arbitrarily large number, it will return less)
    self.buffer.extend(self.serial_port.read(100000))

  @property
  def has_enough_data(self):
    return len(self.buffer) >= (len(self.parameters) * 2 + len(self.preamble))
  
  def tick(self):
    self.fill_buffer()

    while self.has_enough_data and not self.is_preamble_next():
      self.buffer.pop(0)

    if not self.has_enough_data:
      return

    assert self.is_preamble_next()
    del self.buffer[:len(self.preamble)]

    # print(self.buffer)
    
    while len(self.buffer) >= 2 and not self.is_preamble_next(strict=False):
      idx = self.buffer.pop(0) % 128
      val = self.buffer.pop(0)

      assert idx >= 0 and idx <= len(self.parameters)
      assert val >= 0 and val <= 255 and int(val) == val

      if self.parameters[idx] != val:
        print(f"{idx} -> {val}")

      self.parameters[idx] = val
    
  def is_preamble_next(self, strict=True):
    if len(self.buffer) < len(self.preamble):
      if strict:
        return False
      else:
        return self.buffer == self.preamble[:len(self.buffer)]
    else:
      return self.buffer[:len(self.preamble)] == self.preamble
  
  def set_param(self, idx, val):
    assert idx >= 0 and idx <= len(self.parameters) and int(idx) == idx
    assert val >= 0 and val <= 255 and int(val) == val
    self.serial_port.write(bytearray([0xA0]))
    self.serial_port.write(bytearray([idx]))
    self.serial_port.write(bytearray([val]))
    self.serial_port.write(bytearray([0xAF]))
    self.serial_port.flush()




##################### ARDUINOLIB ENDS HERE ###########################


In [7]:
def pairs(iterable):
	iterator = iter(iterable)
	try:
		while True:
			yield next(iterator), next(iterator)
	except StopIteration:
		pass

def multinext(iterator, n):
	items = []
	while len(items) < n:
		items.append(next(iterator))
	return tuple(items)

In [3]:
class Config:
	def __init__(self, on=100, off=1000):
		self.on = on
		self.off = off

	def send(self, arduino):
		arduino.serial_port.write((0 << 8) + self.on)
		arduino.serial_port.write((1 << 8) + self.off)
	
	def __repr__(self):
		return f"Config(on={self.on}, off={self.off})"

In [38]:
with Arduino('/dev/cu.usbmodem14301', logging=True) as arduino:
	i = 0
	while True:
		arduino.tick()

		if i > 25 and i % 10 == 0:
			arduino.set_param(3, i % 100)

		i += 1

[Arduino] Entering Arduino context manager, connecting serial port...
3 -> 40
3 -> 0
3 -> 60
3 -> 90
3 -> 40
3 -> 80
3 -> 20
3 -> 60
3 -> 10
3 -> 50
3 -> 90
3 -> 30
3 -> 70
3 -> 20
3 -> 60
3 -> 0
3 -> 40
3 -> 80
3 -> 20
3 -> 70
3 -> 10
3 -> 50
3 -> 90
3 -> 30
3 -> 80
3 -> 20
3 -> 60
3 -> 0
3 -> 40
3 -> 90
3 -> 30
3 -> 70
3 -> 20
3 -> 60
3 -> 0
3 -> 40
3 -> 80
3 -> 20
3 -> 60
3 -> 0
3 -> 50
3 -> 90
3 -> 40
3 -> 80
3 -> 20
3 -> 60
3 -> 80
3 -> 40
3 -> 20
3 -> 70
3 -> 40
3 -> 80
3 -> 20
3 -> 70
3 -> 10
3 -> 50
3 -> 90
3 -> 30
3 -> 70
3 -> 20
3 -> 60
3 -> 0
3 -> 40
3 -> 80
3 -> 30
[Arduino] Exiting Arduino context manager, disconnecting serial port...


KeyboardInterrupt: 

In [58]:
def do_not_run_this_cell():
	with Arduino('/dev/cu.usbmodem14301', logging=True) as arduino:
		conf_mostlyoff = Config(on=5, off=6)
	
		read = lambda: arduino.serial_port.read(size=1)
	
		params = [-1] * 8
	
		while True:
			align_state = 0
			while True:
				data = read()
				if len(data) == 0: continue
				if align_state in [0, 2]:
					if data == b'\xFF':
						align_state += 1
					else:
						align_state = 0
						continue
				elif align_state in [1, 3]:
					if data == b'\xAA':
						align_state += 1
						if align_state == 4:
							break
					else:
						align_state = 0
						continue
					
			alldata = []
	
			while len(alldata) < 16:
				data = read()
				if len(data) == 0: continue
				alldata.append(data)
		
			for idx, value in pairs(alldata):
				idx_int = list(idx)[0] // 128
				val_int = list(value)[0]
				if idx_int >= len(params):
					print(f"ERROR: Index out of bounds: {idx_int}. Realigning...")
					continue
				if val_int != params[idx_int]:
					params[idx_int] = val_int
					print(f"{idx_int:02} -> {val_int:02X}")
				else:
					if idx_int == 7:
						# arduino.serial_port.write(bytearray([0, 123]))
						arduino.serial_port.write(bytearray([1]))
						arduino.serial_port.write(bytearray([5]))
	
	
	
	
		

[Arduino] Entering Arduino context manager, connecting serial port...
00 -> 00
01 -> AA
[Arduino] Exiting Arduino context manager, disconnecting serial port...


KeyboardInterrupt: 

In [None]:
#with Arduino('/dev/cu.usbmodem142201') as arduino:
#	conf_mostlyon = Config(on=1000, off=100)
#	conf_mostlyon.send(ardunio)