In [1]:
import cv2
from PIL import Image
import numpy as np
import tensorflow as tf
import serial
import time
import threading

In [None]:
# Among Us
print(chr(sum(range(ord(min(str(not())))))))

In [3]:
model_path = "./converted_savedmodel/model.savedmodel"
label_path = "./converted_savedmodel/labels.txt"

In [4]:
model = tf.saved_model.load(model_path)
with open(label_path, 'r') as f:
  labels = f.read().splitlines()

In [None]:
first_key = lambda d: list(d.keys())[0]
signature = model.signatures["serving_default"]
model_input = first_key(signature.structured_input_signature[1])
model_output = first_key(signature.structured_outputs)
print("Model Input:", model_input)
print("Model Output:", model_output)

In [6]:
import matplotlib.pyplot as plt

In [7]:
def predict_image(frame):
  img = Image.fromarray(frame)
  left = (img.width - img.height) / 2
  top = (img.height - img.height) / 2
  right = (img.width + img.height) / 2
  bottom = (img.height + img.height) / 2
  img = img.crop((left, top, right, bottom)).resize((224, 224))
  img_array = np.array(img) / 255.0
  img_array = img_array[..., ::-1]
  
  plt.imshow(img_array)
  plt.show()
  
  img_array = img_array.astype(np.float32)
  img_array = np.expand_dims(img_array, axis=0)
  try:
    infer = model.signatures['serving_default']
    predictions = infer(**{ model_input: tf.constant(img_array) })
    predicted_scores = predictions[model_output].numpy()
    print(predicted_scores)
    return np.argmax(predicted_scores)
  except Exception as e:
    print("Exception: ", e)
    return None

In [8]:
class Camera:
  def __init__(self, camera_id, window_name="Camera Feed"):
    self.camera_id = camera_id
    self.window_name = window_name
  
  def start(self):
    self.video_capture = cv2.VideoCapture(self.camera_id)
    self.loop_enabled = True
    self.loop_thread = threading.Thread(target=self.loop)
    self.loop_thread.start()
    
  def loop(self):
    while self.loop_enabled:
      ret, frame = self.video_capture.read()
      if not ret:
        print("Failed to capture frame.")
      self.frame = frame
      cv2.imshow(self.window_name, frame)
      
      if cv2.waitKey(1) & 0xFF == ord('q'):
        self.stop()
    self.video_capture.release()
    cv2.destroyAllWindows()
  
  def stop(self):
    self.loop_enabled = False
  
  def capture(self):
    return self.frame


In [9]:
opcode = {
  "pusher_push":    0b01000000,
  "conveyor_off":   0b01000010,
  "conveyor_on":    0b01000011,
  "sorter_default": 0b01100000,
  "sorter_label_0": 0b01100000,
  "sorter_label_1": 0b01100001,
  "sorter_label_2": 0b01100010,
  "sorter_label_3": 0b01100011,
}

In [10]:
class Conveyor:
  def __init__(self, port_name, camera_id):
    self.port_name = port_name
    self.camera_id = camera_id
    
  def initialize(self):
    self.serial_port = serial.Serial(self.port_name, 9600)
    time.sleep(2)
    
    self.camera = Camera(self.camera_id)
    self.camera.start()
  
  def execute_sequence(self):
    try:
      def send_command(cmd):
        val = opcode[cmd]
        byte = bytes([val])
        self.serial_port.write(byte)
        print(f"Sent {hex(val)} ({cmd})")
      
      send_command("pusher_push")
      time.sleep(self.pusher_activation_duration)
      
      send_command("conveyor_on")
      time.sleep(self.conveyor_to_camera_duration)
      send_command("conveyor_off")
      
      frame = self.camera.capture()
      print("Captured")
      prediction = predict_image(frame)
      print(f"Prediction: #{prediction} ({labels[prediction]})")
      if   prediction == 0: send_command("sorter_label_0")
      elif prediction == 1: send_command("sorter_label_1")
      elif prediction == 2: send_command("sorter_label_2")
      elif prediction == 3: send_command("sorter_label_3")
      else: send_command("sorter_label_0")
      
      send_command("conveyor_on")
      time.sleep(self.camera_to_sorter_duration)
      send_command("conveyor_off")
    except Exception as e:
      print(e)
      self.stop()
  
  def stop(self):
    self.camera.stop()
    self.serial_port.close()

In [None]:
conveyor = Conveyor("COM3", 0)
conveyor.pusher_activation_duration = 1.6
conveyor.conveyor_to_camera_duration = 1
conveyor.camera_processing_delay = 0.2
conveyor.camera_to_sorter_duration = 2

conveyor.initialize()
for i in range(4):
  conveyor.execute_sequence()
conveyor.stop()