In [1]:
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QPushButton, QVBoxLayout, QLabel, QCheckBox, QSpinBox
import serial
import time
import numpy as np
from datetime import datetime, timedelta
import random
import itertools

# UI 

In [2]:
class MotorControlerUI(QWidget): #this ui code should be cleaned up
    """Manual control panel for six stepper motors (S1..S6).

    The widget never talks to the hardware itself. Every click appends a list
    describing the request to ``self.ui_events``; its first element is a type
    tag ("1B", "AB", "PB", "SP" or "TP") and its last element a readable log
    message. ``handleUIEvents`` drains that queue.
    """

    # Upper bound (ms) of the time spin box. The chosen time is transmitted via
    # num_to_hex(), which rejects anything above 32767, so larger values would
    # raise in the control loop instead of being sent.
    MAX_TIME_MS = 32767

    # Pair button caption -> (index into pair_spinboxs, motor indices it drives)
    PAIRS = {
        "S1-6": (0, (0, 5)),
        "S2-5": (1, (1, 4)),
        "S3-4": (2, (2, 3)),
    }

    def __init__(self):
        super().__init__()
        # Create the queue before the widgets so no slot can run without it
        self.ui_events = []
        self.initUI()

    def initUI(self):
        """Create and place all widgets using fixed pixel positions."""
        # One row per motor; the other columns are derived from these anchors
        button_pos =[(50,50),(50,100),(50,150),(50,200),(50,250),(50,300)]
        label_pos = [(x*4,y) for x,y in button_pos]
        spinbox_pos = [(x*6,y) for x,y in button_pos]

        # Create buttons s1 to s6
        self.buttons = []
        for i,(x,y) in enumerate(button_pos, start=1):
            button = QPushButton(f'S{i}', self)
            button.clicked.connect(self.buttonClicked)
            button.move(x,y)
            self.buttons.append(button)

        #labels for s1->s6 (show the accumulated position, start at 0)
        self.labels = []
        for i,(x,y) in enumerate(label_pos, start=1):
            label = QLabel(f"Label{i}",self)
            label.move(x,y)
            label.setText("0")
            label.setFixedWidth(50)
            self.labels.append(label)

        #spinBox for s1->s6
        self.spinboxs = []
        for i,(x,y) in enumerate(spinbox_pos, start=1):
            spinbox = QSpinBox(self)
            spinbox.setObjectName(f"s{i}")
            spinbox.setRange(-500,500)
            spinbox.setFixedWidth(150)
            spinbox.move(x,y)
            self.spinboxs.append(spinbox)

        #buton pairs (1-6,2-5,3-4)
        self.pair_buttons = []
        pair_button_pos = [(x*10,y) for x,y in button_pos[:3]]
        pair_button_names = ["S1-6","S2-5","S3-4"]
        for name,(x,y) in zip(pair_button_names, pair_button_pos):
            button = QPushButton(name, self)
            button.clicked.connect(self.pairButtonCliked)
            button.move(x,y)
            self.pair_buttons.append(button)

        #spinbox pairs (1-6,2-5,3-4)
        self.pair_spinboxs = []
        pair_spinbox_pos = [(x*12,y) for x,y in button_pos[:3]]
        for name,(x,y) in zip(pair_button_names, pair_spinbox_pos):
            spinbox = QSpinBox(self)
            spinbox.setObjectName(name)
            spinbox.setRange(-500,500)
            spinbox.setFixedWidth(150)
            spinbox.move(x,y)
            self.pair_spinboxs.append(spinbox)

        #time input (capped so the value always fits the message encoding)
        self.timeInput = QSpinBox(self)
        self.timeInput.setObjectName("timeInput")
        self.timeInput.setRange(700,self.MAX_TIME_MS)
        self.timeInput.setFixedWidth(150)
        self.timeInput.move(spinbox_pos[5][0],spinbox_pos[5][1]+100)

        #time input explanation
        self.labelTime = QLabel("Time pr. instruction",self)
        self.labelTime.move(label_pos[5][0]-50,label_pos[5][1]+100)

        #spinbox header
        self.labelInfo = QLabel("- = ClockWise",self)
        self.labelInfo.move(spinbox_pos[0][0],spinbox_pos[0][1]-25)

        #label header
        self.labelPos = QLabel("MotorPosition",self)
        self.labelPos.move(label_pos[0][0],label_pos[0][1]-25)

        #all motors input
        self.allButton = QPushButton(f'S1-6', self)
        self.allButton.clicked.connect(self.allButtonClicked)
        self.allButton.move(button_pos[5][0],button_pos[5][1]+100)

        #start protocol
        self.startProt = QPushButton("StartProtocol",self)
        self.startProt.clicked.connect(self.startProtocol)
        self.startProt.move(button_pos[5][0],button_pos[5][1]+200)

        #reset position labels button

        #TESTPERSON SETUP
        #Start Testperson button
        self.startTP = QPushButton("StartTP",self)
        self.startTP.clicked.connect(self.startTestperson)
        self.startTP.move(button_pos[0][0]*16,button_pos[0][1])

        #Select testperson spinbox
        self.testperson = QSpinBox(self)
        self.testperson.setObjectName("SelectTP")
        self.testperson.setRange(0,9)
        self.testperson.setFixedWidth(150)
        self.testperson.move(button_pos[1][0]*16,button_pos[1][1])

        #Select model spinbox
        self.model = QSpinBox(self)
        self.model.setObjectName("SelectModel")
        self.model.setRange(0,9)
        self.model.setFixedWidth(150)
        self.model.move(button_pos[2][0]*16,button_pos[2][1])

        #Select amount of iterations spinbox
        self.repeats = QSpinBox(self)
        # Own object name; it previously duplicated the testperson box's "SelectTP"
        self.repeats.setObjectName("SelectRepeats")
        self.repeats.setRange(1,9)
        self.repeats.setFixedWidth(150)
        self.repeats.move(button_pos[3][0]*16,button_pos[3][1])

        #Show movment counter label
        self.ctrLabel = QLabel("CTR",self)
        self.ctrLabel.move(button_pos[4][0]*16,button_pos[4][1])

        self.setWindowTitle('Motor controler')
        self.setGeometry(300, 300, 1100, 600)  # Set window position and size

    #UPDATES
    def updateLabels(self,mc): #pass array of motor change values to update labels
        """Add the six relative changes in ``mc`` to the six position labels."""
        for label, change in zip(self.labels, mc[:6]):
            label.setText(str(int(label.text())+change))

    def updateCtr(self,ctr): #update the ctr
        """Show ``ctr[0]`` in the counter label."""
        self.ctrLabel.setText(f"CTR: {ctr[0]}")

    #UI EVENTS
    def buttonClicked(self): #button click (adds spinbox to label)
        """Queue a "1B" event: move one motor by the value in its spin box."""
        sender = self.sender()
        # Button captions are "S1".."S6"; the number is the 1-based motor index
        index = int(sender.text()[1:]) - 1
        change_val = self.spinboxs[index].value()
        label = self.labels[index]
        cur_val = int(label.text())
        new_val = change_val+cur_val
        label.setText(str(new_val))
        #get time
        time_val = self.timeInput.value()

        self.ui_events.append(["1B",index,change_val,time_val,f'Button {sender.text()} clicked. With Value {change_val}: {cur_val} -> {new_val}'])

    def allButtonClicked(self): #button for running all motors at the same time
        """Queue an "AB" event: move every motor by its own spin box value."""
        time_val = self.timeInput.value()
        mc = [spinbox.value() for spinbox in self.spinboxs] #value from each of the spin boxes
        self.updateLabels(mc)

        self.ui_events.append(["AB",mc,time_val,f"Allbutton {self.sender().text()} clicked. With Values {mc}'"])

    def startProtocol(self): #start a relative move protocol
        """Queue an "SP" event asking the main loop to start the protocol."""
        self.ui_events.append(["SP",f"StartProtocol {self.sender().text()} clicked."])

    def pairButtonCliked(self): #control motors pairwise
        """Queue a "PB" event: move the two motors of the clicked pair equally."""
        sender = self.sender().text()
        time_val = self.timeInput.value()
        mc = [0,0,0,0,0,0]
        pair = self.PAIRS.get(sender)
        if pair is not None: # an unknown caption leaves every change at 0
            box_index, motors = pair
            val = self.pair_spinboxs[box_index].value()
            for motor in motors:
                mc[motor] = val

        self.updateLabels(mc)
        self.ui_events.append(["PB",mc,time_val,f"Pairbutton {sender} clicked. With Values {mc}'"])

    def startTestperson(self): #start a test person movement
        """Queue a "TP" event carrying testperson, model and repeat count."""
        sender = self.sender().text()
        tp_val = self.testperson.value()
        model_val = self.model.value()
        repeats = self.repeats.value()

        self.ui_events.append(["TP",tp_val,model_val,repeats,f"TestPerson {sender} clicked."])

#Handle events
def handleUIEvents(ui_widget,mc,timeval,protocol,tp,mc_idx,repeats):
    while ui_widget.ui_events:
        event = ui_widget.ui_events.pop(0)
        print(event)

        match event[0]: #TODO: make a standard structure for events (but good enough for now)
            case "1B": #1:idx 2:changeVal 3: time val   #1 button clicked
                mc[event[1]] = event[2]
                timeval[0] = event[3]
            case "AB": #1: mcVal #2:time val            #All button clicked
                for i in range(6):
                    mc[i] = event[1][i]
                timeval[0] = event[2]
            case "SP":                                  #start protocol cliced
                protocol[0] = True
            case "PB": #1:mc 2:time                     #pair button clicked
                for i in range(6):
                    mc[i] = event[1][i]
                timeval[0] = event[2]
            case "TP": #1:tpval, 2:model val 3:repeats  #test person clicked
                tp[0] = True
                mc_idx[0] = event[1]
                mc_idx[1] = event[2]
                repeats[0] = event[3]
            case _:
                print(f"Unknown first event {event[0]}")

    app.processEvents()


# Serial protocol

In [3]:
#initial protocol from https://forum.arduino.cc/t/serial-input-basics-updated/382007/2
# and https://forum.arduino.cc/t/pc-arduino-comms-using-python-updated/574496 

# Framing characters: every message on the wire is wrapped as <payload>
startMarker, endMarker = '<', '>'

# Receiver state kept between calls of recvLikeArduino()
dataBuf = ""
dataStarted = messageComplete = False

#========================
#========================
    # the functions

def setupSerial(baudRate, serialPortName):
    """Open the serial port into the global ``serialPort`` and wait for the Arduino.

    The port is opened non-blocking (``timeout=0``) with RTS/CTS flow control.
    Returns once ``waitForArduino()`` has returned.
    """
    global  serialPort
    serialPort = serial.Serial(
        port=serialPortName,
        baudrate=baudRate,
        timeout=0,
        rtscts=True,
    )
    banner = "Serial port " + serialPortName
    banner += " opened  Baudrate " + str(baudRate)
    print(banner)
    waitForArduino()

#========================

def sendToArduino(stringToSend):
    """Wrap ``stringToSend`` in the start/end markers and write it to the port."""
    global startMarker, endMarker, serialPort

    framed = "".join((startMarker, stringToSend, endMarker))
    payload = framed.encode('utf-8') # the serial port takes bytes, not str
    serialPort.write(payload)

#==================

def recvLikeArduino():
    """Consume at most one byte from the serial port and assemble framed messages.

    Characters between ``startMarker`` and ``endMarker`` are collected in the
    global ``dataBuf``; anything outside a frame is dropped.

    Returns the message text (without markers) on the call that consumes the
    end marker, and the sentinel string "XXX" on every other call.
    """
    global startMarker, endMarker, serialPort, dataStarted, dataBuf, messageComplete

    if serialPort.inWaiting() > 0 and not messageComplete:
        # One byte is decoded at a time, so a byte >= 0x80 (line noise, or part
        # of a multi-byte character) can never be valid UTF-8 on its own.
        # Strict decoding would raise UnicodeDecodeError and abort the caller's
        # loop; replace such bytes instead.
        x = serialPort.read().decode("utf-8", errors="replace")

        if dataStarted:
            if x != endMarker:
                dataBuf = dataBuf + x
            else:
                dataStarted = False
                messageComplete = True
        elif x == startMarker:
            # A start marker begins a fresh message
            dataBuf = ''
            dataStarted = True

    if messageComplete:
        messageComplete = False
        return dataBuf
    return "XXX"

#==================

def waitForArduino():
    """Poll the port until a message containing 'Arduino is ready' arrives.

    Every complete message received while waiting is printed. Polling also
    consumes whatever bytes were still pending from an earlier session.
    """
    print("Waiting for Arduino to reset")

    while True:
        reply = recvLikeArduino()
        if reply != 'XXX': # 'XXX' means "nothing complete yet"
            print(reply)
        if "Arduino is ready" in reply:
            return

# Messaging protocol

In [None]:
#<msg> (where "<" starts a msg and ">" ends 1)

#start <
#first char identifies the type of msg. M=relative move, G=goto move (not implemented yet)
#next 6x4 hex chars are the step counts: one signed 16-bit value (2 bytes) per stepper
#next 4 hex chars (2 bytes) are the amount of time to complete the moves
#end >

def num_to_hex(num):
  """Encode ``num`` as 4 lowercase hex characters (16-bit two's complement).

  Raises TypeError when ``num`` is outside -32768..32767.
  """
  if num < -32768 or num > 32767:
    raise TypeError(f"Number {num} is does not fit withing 4 char hex")

  # Masking to 16 bits yields the two's complement pattern for negative values
  wrapped = num & 0xFFFF
  digits = hex(wrapped)[2:] # drop the "0x" prefix
  return digits.zfill(4)

def hex_to_num(hex_string):
  """Decode 4 hex characters (16-bit two's complement) into a signed int.

  Raises TypeError when ``hex_string`` is not a string of exactly 4 characters
  (the error used to be constructed but never raised, so malformed input was
  silently decoded). ``int()`` raises ValueError for non-hex characters.
  """
  if not isinstance(hex_string, str) or len(hex_string) != 4:
    raise TypeError(f"Input must be a string of 4 characters")

  num = int(hex_string, 16)
  # Values with the top bit set represent negative numbers
  if num >= 0x8000:
      num -= 1 << 16
  return num

#convert array of relative motor steps to a string that can be sent to the arduino
#current types: (R) and M
def encode_motor_msg(motor_steps,timer,msg_type):
  """Build the payload ``<type><6 x 4 hex chars><4 hex chars>`` for the Arduino.

  motor_steps -- six relative step counts, each encoded with num_to_hex()
  timer       -- duration of the move, encoded with num_to_hex()
  msg_type    -- "M" or "R"

  Raises TypeError for an unknown ``msg_type`` or when ``motor_steps`` does not
  hold exactly six values (both checks used to build the error without raising
  it, so malformed messages were sent). num_to_hex() raises TypeError for
  values outside its range.
  """
  if msg_type not in ["M","R"]:
    raise TypeError(f"Unknown msg_type: {msg_type}")

  if len(motor_steps) != 6:
    raise TypeError(f"Expected 6 motor values got: {motor_steps}")

  motor_hex = "".join(num_to_hex(ms) for ms in motor_steps)
  time_hex = num_to_hex(timer)

  return msg_type + motor_hex + time_hex

#decode motor message

def decode_motor_msg(motor_msg):
  """Split an encoded motor message into ``(msg_type, motor_vals, timer)``.

  Character 0 is the type, characters 1..24 are six 4-char hex fields and
  characters 25..28 are the timer field; every field goes through hex_to_num().
  """
  msg_type = motor_msg[0]
  motor_vals = [hex_to_num(motor_msg[start:start+4]) for start in range(1,25,4)]
  timer = hex_to_num(motor_msg[25:29])
  return msg_type, motor_vals, timer


#How to use:
#numbers = [-1234, 32767, 9012, 3456, -7890, 1234]
#for i in numbers:
#  hexval = num_to_hex(i)
#  print(hexval,hex_to_num(hexval))
#fullEncode = encode_motor_msg(numbers,1000,"M")
#print(fullEncode,decode_motor_msg(fullEncode))

# Protocols

In [None]:
# Define the ranges for each number
# Candidate values per motor S1..S6: -20, 0 and 20
ranges = [range(-20, 21, 20) for _ in range(6)]

# Cartesian product of the six ranges: as tuples and as mutable lists
combinations = list(itertools.product(*ranges))
comb_list = [list(combo) for combo in combinations]
print(len(comb_list))

In [6]:
#function for repeatin protocol (does not create copies, uses same ref)
def repeatProtocol(prot,amount):
  """Return ``prot`` repeated ``amount`` times, computed as ``prot*amount``.

  For a list of moves the result is a longer list whose entries are the very
  same inner list objects as in ``prot``; no copies are made, so mutating one
  move changes every repetition of it.
  """
  return prot*amount

#Transform exact positions into relative positions
def calcRelativePos(cur,moves):
  """Convert a sequence of absolute 6-motor positions into relative moves.

  cur   -- position before the first move
  moves -- absolute target positions, visited in order

  Returns ``(rel_moves, cur_poses)``: the per-step differences, and the
  position reached after each step (the same objects that are in ``moves``).
  """
  rel_moves = []
  cur_poses = []
  previous = cur
  for target in moves:
    rel_moves.append([target[i] - previous[i] for i in range(6)])
    cur_poses.append(target)
    previous = target # the next difference is taken from here
  return rel_moves, cur_poses

In [9]:
# Grid search: visit every combination in comb_list, expressed as relative
# moves starting from position [0,0,0,0,0,0]
prot_20_gridsearch, _ = calcRelativePos([0,0,0,0,0,0],comb_list)

# Hand-written base protocols. Each entry is one relative move for the six
# motors; every protocol below sums to zero, so it ends where it started.
# all motors +50, then -50
prot_ur = [[50,50,50,50,50,50],[-50,-50,-50,-50,-50,-50]]
# all motors +100, -100, -100, +100
prot_full = [[100,100,100,100,100,100],[-100,-100,-100,-100,-100,-100],[-100,-100,-100,-100,-100,-100],[100,100,100,100,100,100]] 
# same pattern as prot_full, but only motors 3 and 4 move
prot_34 = [[0,0,100,100,0,0],[0,0,-100,-100,0,0],[0,0,-100,-100,0,0],[0,0,100,100,0,0]]
# all motors -100, then +100
prot_dr = [[-100,-100,-100,-100,-100,-100],[100,100,100,100,100,100]]
# motors 3 and 4 only: +100, then -100
prot_34_down = [[0,0,100,100,0,0],[0,0,-100,-100,0,0]]


# Repeated versions (repeatProtocol reuses the same inner lists).
# NOTE(review): prot_updwon_10 and prot_dr_10 repeat 50 times despite the
# "_10" suffix; the names are kept because other cells refer to them.
prot_full_10 = repeatProtocol(prot_full,10)
prot_updwon_10 = repeatProtocol(prot_ur,50)
prot_34_10 = repeatProtocol(prot_34,10)
prot_dr_10 = repeatProtocol(prot_dr,50)
prot_34_down_50 = repeatProtocol(prot_34_down,50)

# Testperson data -> protocol

In [10]:
#check size of relative moves
def inspect_tp_vals(tp,min_val,max_val):
  """Print the per-column MAX/MIN of ``tp`` and every out-of-range row.

  A row is printed once, followed by its index, when at least one of its
  values is above ``max_val`` or below ``min_val``.
  """
  values = np.array(tp)
  print(f"MAX {np.max(values,axis=0)} MIN {np.min(values,axis=0)}")
  for row_idx,row in enumerate(tp):
    if any(v > max_val or v < min_val for v in row):
      print(row,row_idx)

In [None]:
#load model predictions from /Predictions/
path = r"Predictions\Other\5_step_isolation_input.npy"
predictions = np.load(path)
# Turn the stored rows into plain lists, then into relative moves from all-zero
rel_preds, _ = calcRelativePos([0]*6,[list(row) for row in predictions])

In [None]:
#inspect testperson prediction
# print(predictions.shape)

# for t in range(predictions.shape[0]):
#   l = list(map(list,predictions[t]))
#   rels, _ = calcRelativePos([0,0,0,0,0,0],l)
#   print(t,np.array(rels).shape,np.array(l).shape)
#   model0_rel.append(rels)

# print(np.array(model0_rel).shape)

# for i in range(len(model0_rel)):
#   print(f"######## tp {i} ########")
#   #print(type(arramodel0_rel[i][0]))
#   inspect_tp_vals(model0_rel[i],-20,20)

# #inspect_tp_vals(rel_iso,-20,20)

# Main loop


In [15]:
setupSerial(1_000_000,"COM3")

# State shared with handleUIEvents(); the lists are mutated in place there
motor_vals = [0] * 6 #current motor positions
motor_change = [0] * 6 #Changes for next motor iteration
time_val = [1] # time pointer

def calcMotorvals(cur,new):
    """Return the element-wise sum of ``cur`` and ``new`` as a new list.

    The result is as long as the shorter of the two inputs.
    """
    summed = []
    for position, delta in zip(cur,new):
        summed.append(position + delta)
    return summed

#OWN PROTOCOL SETUP
# One-element lists are used where handleUIEvents() must be able to change the
# value in place; plain names are only touched by the main loop.
protocol_running = [False] #protocol pointer
protocol_ctr = 0 #counter for where in protocl we are
protocol_delay = 2 #in seconds
protocol_mc = prot_updwon_10 #Specify protocol (eg. grid search or testperson)
protocol_prevtime = 0 # time of the previous protocol step; 0 means "not started"
protocol_maxctr = len(protocol_mc) # number of steps in the selected protocol
# Subtracted from the delay (in ms) to get the move time sent with each step
protocol_protection = 100 #protection against overflow in ms

#TESTPERSON PROTOCOL SETUP (Not implemented yet)
tp_running = [False]
tp_ctr = 0 # index of the next step within the current iteration
tp_delay = 0.8 #time between tp moves
# The main loop reads tp_mc[tp_mc_idx[0]][tp_mc_idx[1]] and logs the two
# entries as "Model" and "TP" respectively
tp_mc_idx = [0,0] #pointer to model number,testperson id
# Left empty here, so nothing can be selected until it is filled
tp_mc = []  #mc for all models/testpersons
tp_prevtime = 0 # time of the previous testperson step; 0 means "not started"
tp_maxctr = 0 #needs to be set depending on protocol
tp_repeat = [1] # remaining iterations; the run stops when an iteration ends at 1
# Subtracted from tp_delay (in ms) to get the move time sent with each step
tp_protection = 50 #protection against overflow in ms

if __name__ == '__main__':
    # Reuse the existing QApplication when this cell is executed again in the
    # same kernel instead of constructing a second one.
    app = QApplication.instance() or QApplication(sys.argv)
    mcUI = MotorControlerUI()
    mcUI.show()

    try:
        # Main loop. It used to be `while True`, which could only be stopped by
        # interrupting the kernel and made the trailing sys.exit(app.exec_())
        # unreachable. It now ends when the window is closed; Qt events are
        # pumped inside handleUIEvents().
        while mcUI.isVisible():
            # Receive and handle messages from the microcontroller 
            arduinoReply = recvLikeArduino() # check for a reply
            if not (arduinoReply == 'XXX'):
                print ("Time %s  Reply %s" %(time.time(), arduinoReply))

            # Handle UI events
            handleUIEvents(mcUI,motor_change,time_val,protocol_running,tp_running,tp_mc_idx,tp_repeat)

            # Respond to the microcontroller
            if motor_change != [0,0,0,0,0,0] and not protocol_running[0]: #control with UI
                motor_vals = calcMotorvals(motor_vals,motor_change) #update total motor values
                enc_msg = encode_motor_msg(motor_change,time_val[0],"M")
                sendToArduino(enc_msg)
                print("motorchange:",motor_change,"msg:", enc_msg, "time:", time_val)

                # Rebind to a fresh list so the sent move is not repeated
                motor_change = [0,0,0,0,0,0]

            elif protocol_running[0]: #protocol handling
                t = time.time()
                if protocol_prevtime == 0: # first pass after start: begin timing now
                    protocol_prevtime = t
                if t - protocol_prevtime > protocol_delay: #send next move after certain time
                    # Advance by exactly one period (not to `t`) so the schedule
                    # does not drift. Plain addition replaces the former
                    # datetime round trip, which went through local time.
                    protocol_prevtime += protocol_delay
                    step = protocol_mc[protocol_ctr]
                    motor_vals = calcMotorvals(motor_vals,step)

                    mcUI.updateLabels(step) #update labels in UI

                    # The move gets slightly less time than the period between steps
                    enc_msg = encode_motor_msg(step,int(protocol_delay*1000-protocol_protection),"M")
                    sendToArduino(enc_msg)

                    print(protocol_prevtime,f"protocol iteration {protocol_ctr}",step)
                    protocol_ctr += 1

                if protocol_ctr == protocol_maxctr: #reset after protocol is finished
                    protocol_running[0] = False
                    protocol_ctr = 0
                    protocol_prevtime = 0
                    print("protocol finished")

            elif tp_running[0]: #test person handling (not fully implemented)
                t = time.time()

                # Look the selection up defensively: tp_mc may be empty or the
                # chosen indices may not exist, which used to crash the whole
                # loop with an IndexError.
                try:
                    tp_moves = tp_mc[tp_mc_idx[0]][tp_mc_idx[1]]
                except IndexError:
                    tp_moves = []
                if len(tp_moves) == 0:
                    print(f"No testperson data for Model {tp_mc_idx[0]} TP {tp_mc_idx[1]}")
                    tp_running[0] = False
                    tp_ctr = 0
                    continue

                if tp_prevtime == 0: #handle first start
                    tp_prevtime = time.time()

                if tp_ctr == 0: #handele each new iteration
                    tp_maxctr = len(tp_moves)
                    print(f"Starting Model {tp_mc_idx[0]} TP {tp_mc_idx[1]} Iteration {tp_repeat[0]}")

                if t - tp_prevtime > tp_delay: #send next move after certain time
                    tp_prevtime = t

                    next_motorval = tp_moves[tp_ctr]
                    motor_vals = calcMotorvals(motor_vals,next_motorval)

                    mcUI.updateLabels(next_motorval) #update UI labels
                    mcUI.updateCtr([tp_ctr]) #update ctr

                    enc_msg = encode_motor_msg(next_motorval,int(tp_delay*1000-tp_protection),"M")
                    sendToArduino(enc_msg)

                    print(tp_prevtime,f"M: {tp_mc_idx[0]} TP: {tp_mc_idx[1]} I: {tp_repeat[0]} step: {tp_ctr} mc: {next_motorval}")
                    tp_ctr += 1

                if tp_ctr == tp_maxctr: #reset protocol / start next iteration
                    tp_ctr = 0
                    if tp_repeat[0] == 1: #check if all iterations are finished
                        tp_running[0] = False
                    else:
                        print(f"Finished Model {tp_mc_idx[0]} TP {tp_mc_idx[1]} Iteration {tp_repeat[0]}")
                        tp_repeat[0] = tp_repeat[0] - 1
    finally:
        # Release the port (also on a kernel interrupt) so that setupSerial()
        # can open it again on the next run
        serialPort.close()


Serial port COM3 opened  Baudrate 1000000
Waiting for Arduino to reset
Arduino is ready
['AB', [-50, -50, -50, -50, -50, -50], 2000, "Allbutton S1-6 clicked. With Values [-50, -50, -50, -50, -50, -50]'"]
motorchange: [-50, -50, -50, -50, -50, -50] msg: Mffceffceffceffceffceffce07d0 time: [2000]
Time 1717492252.8531232  Reply first: M
Time 1717492252.8591266  Reply This just in ... Mffceffceffceffceffceffce07d0   M   -50   -50   -50   -50   -50   -50   2000   18091
Time 1717492252.8611298  Reply Start driving18091
Time 1717492254.8587165  Reply Finished driving 20092 S1done S2done S3done S4done S5done S6done
['AB', [-50, -50, -50, -50, -50, -50], 2000, "Allbutton S1-6 clicked. With Values [-50, -50, -50, -50, -50, -50]'"]
motorchange: [-50, -50, -50, -50, -50, -50] msg: Mffceffceffceffceffceffce07d0 time: [2000]
Time 1717492262.6918695  Reply first: M
Time 1717492262.6978693  Reply This just in ... Mffceffceffceffceffceffce07d0   M   -50   -50   -50   -50   -50   -50   2000   27928
Time