# Control Centre

In [1]:
import time
import serial
import numpy as np
import sys
import os
import asyncio
from multiprocessing import Process
import threading
from scipy import constants
import itertools

import binascii
from datetime import datetime
import logging

#from gsioc import gsioc_Protocol, ensure_xy_position_will_be_reached

## Get started with GSIOC

To reset the COM port:
1) Open device manager and disable COM1
2) Turn machine off and on
3) Enable COM1 in device manager

Use ser.close() here to close the serial port

NOTE: Python seems to have problems when the GX-241 is in COM1 (motherboard), it will connect but cannot send or recieve. This is easily fixed by using a USB to serial adapter instead.

In [2]:
from equip_class import gsioc_Protocol

In [3]:
# Set up the serial connection
PORT1 = 'COM6'
ser = serial.Serial(PORT1, 19200, 8, "N", 1, 0.1)

In [4]:
# Create an instance of the gsioc_Protocol class
g = gsioc_Protocol(ser, 'GX-241 II', 30)

In [5]:
g.connect()

[32m2025-02-11 11:03:00.583[0m | [34m[1mDEBUG   [0m | [36mequip_class[0m:[36mverify_open[0m:[36m172[0m - [34m[1mCheck if port is open.[0m
[32m2025-02-11 11:03:01.095[0m | [34m[1mDEBUG   [0m | [36mequip_class[0m:[36miCommand[0m:[36m240[0m - [34m[1mSending immediate command % to device.[0m
[32m2025-02-11 11:03:03.080[0m | [34m[1mDEBUG   [0m | [36mequip_class[0m:[36miCommand[0m:[36m269[0m - [34m[1mSending immediate command complete.[0m
[32m2025-02-11 11:03:03.083[0m | [34m[1mDEBUG   [0m | [36mequip_class[0m:[36miCommand[0m:[36m278[0m - [34m[1mreceived bytearray(b'GX-241 II v2.0.2.5') as response.[0m
[32m2025-02-11 11:03:03.286[0m | [34m[1mDEBUG   [0m | [36mequip_class[0m:[36mconnect[0m:[36m224[0m - [34m[1mConnected to device 30[0m


AttributeError: module 'datetime' has no attribute 'now'

In [None]:
g.iCommand('%')

In [None]:
g.iCommand('e')

In [None]:
g.bCommand('e[n]')

In [6]:
g.bCommand('H')

[32m2025-02-11 10:55:24.884[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m171[0m - [34m[1mSending buffered command "H" to device.[0m
[32m2025-02-11 10:55:24.885[0m | [1mINFO    [0m | [36mgsioc[0m:[36mbCommand[0m:[36m176[0m - [1mGSIOC <<< H[0m
[32m2025-02-11 10:55:24.990[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m195[0m - [34m[1mready signal: b'\n'[0m
[32m2025-02-11 10:55:24.990[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m204[0m - [34m[1mStarting to send buffered command.[0m
[32m2025-02-11 10:55:24.991[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m215[0m - [34m[1m10[0m
[32m2025-02-11 10:55:24.992[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m220[0m - [34m[1mWrites buffered Command character b'H' to the device GX-241 II.[0m
[32m2025-02-11 10:55:24.993[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m222[0m - [34m[1m

bytearray(b'\nH\r')

In [None]:
g.iCommand('P')

In [None]:
g.iCommand('X')

In [8]:
g.bCommand('H')

[32m2025-02-11 10:55:31.747[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m171[0m - [34m[1mSending buffered command "H" to device.[0m
[32m2025-02-11 10:55:31.748[0m | [1mINFO    [0m | [36mgsioc[0m:[36mbCommand[0m:[36m176[0m - [1mGSIOC <<< H[0m
[32m2025-02-11 10:55:31.860[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m195[0m - [34m[1mready signal: b'\n'[0m
[32m2025-02-11 10:55:31.860[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m204[0m - [34m[1mStarting to send buffered command.[0m
[32m2025-02-11 10:55:31.861[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m215[0m - [34m[1m10[0m
[32m2025-02-11 10:55:31.862[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m220[0m - [34m[1mWrites buffered Command character b'H' to the device GX-241 II.[0m
[32m2025-02-11 10:55:31.864[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m222[0m - [34m[1m

bytearray(b'\nH\r')

In [7]:
g.bCommand('X62.3/146')

[32m2025-02-11 10:55:28.492[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m171[0m - [34m[1mSending buffered command "X62.3/146" to device.[0m
[32m2025-02-11 10:55:28.493[0m | [1mINFO    [0m | [36mgsioc[0m:[36mbCommand[0m:[36m176[0m - [1mGSIOC <<< X62.3/146[0m
[32m2025-02-11 10:55:28.597[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m195[0m - [34m[1mready signal: b'\n'[0m
[32m2025-02-11 10:55:28.598[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m204[0m - [34m[1mStarting to send buffered command.[0m
[32m2025-02-11 10:55:28.598[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m215[0m - [34m[1m10[0m
[32m2025-02-11 10:55:28.599[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m220[0m - [34m[1mWrites buffered Command character b'X' to the device GX-241 II.[0m
[32m2025-02-11 10:55:28.599[0m | [34m[1mDEBUG   [0m | [36mgsioc[0m:[36mbCommand[0m:[36m222

bytearray(b'\nX62.3/146\r')

To find the correct Z depth max!

In [None]:
g.bCommand('X61/245.5')

In [None]:
def g_inject():
    g.bCommand('X146.5/0')
    g.bCommand('Z105')

In [None]:
g_inject()

In [None]:
g.bCommand('X70/0')

In [None]:
g.bCommand('Z20')

In [None]:
g.bCommand('H')

In [None]:
# Create an instance of the gsioc_Protocol class
g_dim = gsioc_Protocol(ser, 'GX D Inject', 3)

In [None]:
g_dim.connect()

In [None]:
g_dim.iCommand('%')

In [None]:
 g_dim.bCommand('VI')

In [None]:
g_dim.bCommand('VL')

In [None]:
g_dim.iCommand('X')

In [None]:
def pos_change():
    if g_dim.iCommand('X') == 'L' :
        g_dim.bCommand('VI')
    else :
        g_dim.bCommand('VL')

In [None]:
pos_change()

## Basic method byte by byte

In [None]:
ser.isOpen()

In [None]:
# 1. Imports and Initial Setup
# Done above

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

In [None]:
class DeviceController:
    def __init__(self, serial_connection, max_repeats=100):
        self.serial = serial_connection
        self.max_repeats = max_repeats
        self.connection_repeats = max_repeats

    # 3. Method Definition
    def iCommand(self, commandstring):
        logger.debug(f'Sending immediate command {commandstring} to device.')
        
        # Convert to binary
        command = binascii.a2b_qp(commandstring)
        logger.debug(f'Converted command to binary: {command}')

        # Write command
        self.serial.flushInput()
        self.serial.write(command)
        logger.debug('Command written to serial port.')

        # Retrieve Response
        resp = bytearray()
        while True:
            resp_raw = self.serial.read(10)    # Will return empty array after timeout
            logger.debug(f'Received raw response: {resp_raw}')

            if len(resp_raw) == 0:
                logger.debug(f'No response received. Connection repeats left: {self.connection_repeats}')
                if self.connection_repeats > 0:
                    self.connection_repeats -= 1
                    logger.error(f'Attempt {self.max_repeats - self.connection_repeats}/{self.max_repeats}: sent Immediate Command {commandstring}, in binascii: {command}')
                    return self.iCommand(commandstring)  # Safe recursion
                else:
                    logger.critical('No response from device after maximum retries.')
                    raise Exception(str(datetime.datetime.now()) + " No response from device")

            resp.extend(resp_raw)
            logger.debug(f'Current response buffer: {resp}')

            # Extended ASCII represents end of message
            if resp[-1] > 127:
                resp[-1] -= 128
                logger.debug(f'End of message detected, adjusted last byte: {resp[-1]}')
                logger.debug('Sending immediate command complete.')
                break

            # Write Acknowledge to Device to signal next byte can be retrieved
            else:
                self.serial.flushInput()
                self.serial.write(bytes.fromhex("06"))
                logger.debug('Acknowledgement sent to device.')

        logger.debug(f'Received full response: {resp}')
        return resp.decode("ascii")

In [None]:
# 4. Testing and Debugging
# Create an instance of DeviceController
device = DeviceController(ser)

# Example command to send (replace with actual command for your device)
command_string = '%'

# Call the iCommand method
try:
    response = device.iCommand(command_string)
    print(f'Response from device: {response}')
except Exception as e:
    print(f'Error: {e}')

In [None]:
ser.close()

## Try Run and Platform Setup

This seems a little complex as is, too many unneeded things bundled in run.py. Try to extract the relevant functions and start again.

In [None]:
from platform_setup_new import *

In [None]:
rack_position_offset_x=92       #distance in mm between rack_position=1 and =2 on x-axis
rack_position_offset_y=0        #distance in mm between rack_position=1 and =2 on y-axis

############################# RACK 1 DEFINITION #################################

# From platform_setup.py 
rack1 = Rack([4,16], 7.5, 39.5, 18.5, 13.75, 65) # groundlevel_height assumed the minimum Z

#  array_dimensions, offset_x, offset_y=offset_y, vial2vial_x, vial2vial_y, groundlevel_height

# Previous vial2vialx = (2.11+15.6)
# Previous vial2vial7 = (2.72+15.6+0.35)

array_order1 = np.array([      #user is obliged to define a integer number i>=1 for each vial in the rack in ascending order 
    [1, 2, 3, 4],
    [5, 6, 7, 8],
    [9, 10,11,12],
    [13,14,15,16],
    [17,18,19,20],
    [21,22,23,24],     
    [25,26,27,28],     
    [29,30,31,32],
    [33,34,35,36],
    [37,38,39,40],
    [41,42,43,44],
    [45,46,47,48],
    [49,50,51,52],
    [53,54,55,56],
    [57,58,59,60],
    [61,62,63,64]        
    ])
    
rack_pos1=1

global rack1_commands

# Not sure what rack_position_offset_x/y are for x=92 and y=0

rack1_commands = Rackcommands(rack1, array_order1, rack_pos1, rack_position_offset_x, rack_position_offset_y)

global vial_selfmade

vial_selfmade = Vial(1.5, 1, 33, 31.08)

In [None]:
# It works!
thing = rack1_commands.get_xy_command(2)
thing
g.bCommand(thing[0])

In [None]:
g.bCommand('H')

## SPKA code - to put in a function

In [None]:
# Define kinetic screen
no_sub_A = 2      # First Column
no_sub_B = 1      # Second Column
no_cat = 0        # Third Column
no_add = 0        # Fourth Column

# Create lists for each
sub_A_list = [1 + 4 * i for i in range(no_sub_A)]
sub_B_list = [2 + 4 * i for i in range(no_sub_B)] if no_sub_B > 0 else []
cat_list = [3 + 4 * i for i in range(no_cat)] if no_cat > 0 else []
add_list = [4 + 4 * i for i in range(no_add)] if no_add > 0 else []

#print(sub_A_array, sub_B_array, cat_array)

In [None]:
# Generate all possible combinations
all_combinations = list(itertools.product(sub_A_list, sub_B_list, cat_list, add_list))

# Reorder combinations so that all combinations with the same elements
# in sub_B_array and cat_array are grouped together
spka_combinations = []

# Iterate through all unique combinations of sub_B_array and cat_array
# This will go through all sub_A first, then sub_B, then cat
for a in sub_A_list:
    if sub_B_list:
        for b in sub_B_list:
            if cat_list:  # Check if cat_list is not empty
                for c in cat_list:
                    if add_list:  # Only include d if add_list is not empty
                        for d in add_list:
                            spka_combinations.append((a, b, c, d))
                    else:
                        spka_combinations.append((a, b, c))  # Append without d
            else:  # If cat_list is empty
                if add_list:
                    for d in add_list:
                        spka_combinations.append((a, b, d))  # Append without c
                else:
                    spka_combinations.append((a, b))  # Append only a and b
    else:  # If sub_B_list is empty
        if cat_list:
            for c in cat_list:
                if add_list:
                    for d in add_list:
                        spka_combinations.append((a, c, d))  # Append without b
                else:
                    spka_combinations.append((a, c))  # Append only a and c
        else:  # If both sub_B_list and cat_list are empty
            if add_list:
                for d in add_list:
                    spka_combinations.append((a, d))  # Append only a and d
            else:
                spka_combinations.append((a,))  # Append only a

# Display the combinations
spka_combinations

In [None]:
# Iterate over each tuple in the list (each separate kinetic profile)
for tuple in spka_combinations:
    
    # Iterate over each number in the tuple (each reagent in an experiment)
    for element in tuple:
        
        # Find the xy position of the vial        
        vial_xy_pos = rack1_commands.get_xy_command(element)
        
        ##### TO DO #####
        
        # Display vial number - must be a two digit integer
        g.bCommand(f"W{element:02}")
        
        # Go to Vial
        g.bCommand(vial_xy_pos[0])

        # Needle Down - define this distance somewhere above!
        g.bCommand('Z85')

        # Run the pump
        # This will be a while away...
        # How to make the autosampler idle while this is happening?

        # Needle Up
        g.bCommand('Z120')

        # Log the vial and the amount taken
        # Add a logger...
        print(vial_xy_pos)

        # Create air gap
        # Create a tiny airgap with the pump, will this be necessary?
        
    # Go to the DIM and inject
    g_inject()
    
    # Blank the display
    g.bCommand('WBB')
    
    # For now, let's just go to Home instead of the DIM
    g_inject()

g.bCommand('H')