In [1]:
from ctypes import *
import logging
import numpy as np
import time
import os

from hamamatsu import Hamamatsu
from pxi import PXI
from pxierrors import IMAQError

In [2]:
class FakePXI(PXI):
    """Stub ``PXI`` whose constructor only assigns plain attributes.

    ``PXI.__init__`` is not called from here: the constructor below just
    attaches a logger and default values. The three flag properties are
    re-declared as simple pass-throughs to their underscore-prefixed fields.
    """

    def __init__(self, address = None):
        """Set every attribute to its default; ``address`` is accepted but unused."""
        # The logger name is the string form of the class object, level DEBUG.
        # No file handler is attached.
        stub_logger = logging.getLogger(str(type(self)))
        stub_logger.setLevel(logging.DEBUG)
        self.logger = stub_logger

        # Backing fields for the properties defined further down.
        self._stop_connections = False
        self._reset_connection = False
        self._exit_measurement = False

        self.cycle_continuously = False
        self.return_data = ""
        self.return_data_queue = ""
        self.measurement_timeout = 0
        self.keylisten_thread = None
        # NOTE: no command_queue is created here (the Queue(0) line was left disabled).
        self.element_tags = []  # for debugging
        self.devices = []

    @property
    def stop_connections(self) -> bool:
        """Current value of the ``_stop_connections`` field."""
        return self._stop_connections

    @stop_connections.setter
    def stop_connections(self, value):
        # Stored as-is; no validation or side effects.
        self._stop_connections = value

    @property
    def reset_connection(self) -> bool:
        """Current value of the ``_reset_connection`` field."""
        return self._reset_connection

    @reset_connection.setter
    def reset_connection(self, value):
        # Stored as-is; no validation or side effects.
        self._reset_connection = value

    @property
    def exit_measurement(self) -> bool:
        """Current value of the ``_exit_measurement`` field."""
        return self._exit_measurement

    @exit_measurement.setter
    def exit_measurement(self, value):
        # Stored as-is; no validation or side effects.
        self._exit_measurement = value

In [3]:
def bare_hamamatsu_init(HM: Hamamatsu):
    """Open the "img0" interface and then a session on ``HM.session``.

    Only these two calls are made; nothing is returned.
    """
    # Read the session attribute once and reuse it for both calls.
    cam_session = HM.session
    cam_session.open_interface("img0")
    cam_session.open_session()
    
def write_serial(HM: Hamamatsu, command):
    """Pass ``command`` to ``HM.session.hamamatsu_serial`` and return its result."""
    cam_session = HM.session
    result = cam_session.hamamatsu_serial(command)
    return result

In [4]:
# Build a Hamamatsu around the stub PXI, then open its interface and session.
hamamatsu = Hamamatsu(FakePXI())
try:
    bare_hamamatsu_init(hamamatsu)
except IMAQError as imaq_err:
    # Error code -1074396995 is tolerated (reported below as an already-open
    # session); every other IMAQ error propagates unchanged.
    if imaq_err.error_code != -1074396995:
        raise
    print("whoops! we already have a session!")

In [None]:
def LL_write_serial(HM: "Hamamatsu", command, expected_response=None, timeout=10000):
    """Write ``command`` to the camera's serial line via IMAQ and read the reply.

    Low-level variant that calls ``imgSessionSerialWrite`` and
    ``imgSessionSerialRead`` on ``HM.session.imaq`` directly.

    Args:
        HM: object whose ``session`` provides ``imaq``, ``session_id``,
            ``check`` and ``logger``.
        command: text to send (``str``). It is UTF-8 encoded before writing.
        expected_response: text the reply is compared against. Defaults to
            ``command`` itself (the reply is expected to echo the command).
            Pass ``"Nothing"`` to skip the comparison.
        timeout: value handed to both IMAQ calls (presumably milliseconds;
            confirm against the driver documentation).

    Returns:
        ``(error_code, response)``. ``response`` is the bytes read, up to the
        first NUL. If the write reports a non-zero code and ``session.check``
        does not raise, ``(error_code, "Error")`` is returned instead.
    """
    session = HM.session
    imaq = session.imaq
    if expected_response is None:
        expected_response = command

    # The driver takes a byte buffer. A wide-character buffer would interleave
    # NUL bytes into the command on the wire.
    payload = command.encode("utf-8")
    c_cmd = create_string_buffer(payload)
    # Declare exactly the payload length (without the trailing NUL that
    # create_string_buffer appends) so the driver never reads past the buffer.
    write_size = c_uint32(len(payload))

    error_code = imaq.imgSessionSerialWrite(
        session.session_id,  # SESSION_ID
        c_cmd,               # void*
        byref(write_size),   # uInt32*
        c_uint32(timeout),   # uInt32
    )

    if error_code != 0:
        session.check(error_code, f"IMAQ serial write command {command}")
        return error_code, "Error"

    # The size passed to the driver must match the buffer actually allocated,
    # otherwise the read can overflow it.
    read_size = c_uint32(100)
    str_bf = create_string_buffer(read_size.value)
    print(f"buffer size = {read_size.value}")
    error_code = imaq.imgSessionSerialRead(
        session.session_id,  # SESSION_ID
        str_bf,              # void*
        byref(read_size),    # uInt32*
        c_int32(timeout),    # Int32
    )

    if error_code != 0:
        session.check(error_code, f"IMAQ serial read command {command}")

    enc_rsp = str_bf.value
    if expected_response != "Nothing":
        # Compare bytes with bytes; a str never equals a bytes object.
        enc_exp_rsp = expected_response.encode("utf-8")
        if enc_exp_rsp != enc_rsp:
            msg = f"Serial write {command}.\n Expected Response {enc_exp_rsp} got {enc_rsp}\n"
            session.logger.warning(msg)
    return error_code, enc_rsp

# Exercise the low-level helper with the carriage-return-terminated "AMD N"
# command; the returned (error_code, response) tuple is the cell's output.
LL_write_serial(hamamatsu, "AMD N\r")


In [None]:
# Stand-alone serial read with no preceding write: pulls up to 100 bytes from
# the camera's serial line (presumably to pick up or flush a leftover reply;
# confirm).
try:
    session = hamamatsu.session
    bf_size = c_uint32(100)
    timeout = 10000
    # The buffer is allocated with the same size that is declared to the driver.
    str_bf = create_string_buffer(b"", bf_size.value)
    print(f"buffer size = {bf_size.value}")
    error_code = session.imaq.imgSessionSerialRead(
        session.session_id,  # SESSION_ID
        str_bf,           # void*
        byref(bf_size),   # uInt32*
        c_int32(timeout)  # Int32
    )

    if error_code != 0:
        session.check(error_code, "IMAQ serial read")
    # Show what came back so the read is not silently discarded.
    print(f"error_code = {error_code}, response = {str_bf.value!r}")
except IMAQError as e:
    # Still best-effort (the cell does not abort), but report the failure
    # instead of swallowing it.
    print(f"IMAQ serial read failed: {e}")

In [None]:
#hamamatsu.close()