In [1]:
# !pip install pycapnp
# !pip install typer
# !pip install ipywidgets

In [2]:
# Packages
import asyncio
import zmq.asyncio
import ipywidgets as widgets
from IPython.display import display
import numpy as np
import time

# Utils
from deepdrrzmq.utils import zmq_util, typer_util, server_util, config_util
from deepdrrzmq.utils.zmq_util import zmq_no_linger_context
from deepdrrzmq.utils.typer_util import unwrap_typer_param
from deepdrrzmq.utils.server_util import messages, make_response, DeepDRRServerException
from deepdrrzmq.utils.config_util import load_config

In [3]:
class ZMQAsyncMsgHandler:
    """
    Manage the setup and configuration of ZMQ publisher/subscriber sockets for communication over TCP. 
    It uses the asyncio variant of ZMQ for asynchronous operations.
    """
    def __init__(self):
        """
        Initializes the ZMQ connection handler with the address and port numbers for REP, PUB, and SUB sockets.

        :param addr: The address of the ZMQ server.
        :param rep_port: The port number for REP (reply) socket connections.
        :param pub_port: The port number for PUB (publish) socket connections.
        :param sub_port: The port number for SUB (subscribe) socket connections.
        """
        self.config = load_config()
        self.addr = self.config['network']['addr_localhost'] # 'addr_localhost' or 'addr_ip'
        self.rep_port = self.config['network']['rep_port']
        self.pub_port = self.config['network']['pub_port']
        self.sub_port = self.config['network']['sub_port']
        print(self.addr, self.rep_port, self.pub_port, self.sub_port)
        
        self.context = zmq.asyncio.Context() # zmq.Context()
        self.setup()

    def setup(self):
        """
        Sets up ZMQ PUB and SUB sockets for communication.
        """
        # Setup the PUB socket
        self.pub_socket = self.context.socket(zmq.PUB)                # Create a PUB socket
        self.pub_socket.hwm = 10000                                   # Set the high water mark (HWM) for message buffering
        self.pub_socket.connect(f"tcp://{self.addr}:{self.pub_port}") # Connect the PUB socket to the specified address and port
        # Setup the SUB socket
        self.sub_socket = self.context.socket(zmq.SUB)                # Create a SUB socket
        self.sub_socket.hwm = 10000                                   # Set the high water mark (HWM) for message buffering
        self.sub_socket.connect(f"tcp://{self.addr}:{self.sub_port}") # Connect the SUB socket to the specified address and port
        self.sub_socket.subscribe(b"/loggerd/")                       # Subscribe to the "/loggerd/" topic


In [4]:
ZMQAMH = ZMQAsyncMsgHandler()

localhost 40120 40121 40122


In [5]:
record_button = widgets.Button(description='New Session', disabled=True, button_style='', icon='circle')
stop_button = widgets.Button(description='Stop', disabled=True, button_style='', icon='square')

session_id = widgets.Text(value='None', placeholder='', description='Session ID:', disabled=True)
record_status = widgets.Text(value='None', placeholder='', description='Recording:', disabled=True)

async def f():
    last_heard_time = 0

    while True:
        latest_msgs = {}

        try:
            # for i in range(1000):
            #     topic, data = ZMQAMH.sub_socket.recv_multipart(flags=zmq.NOBLOCK)
            #     latest_msgs[topic] = data
            topic, data = await ZMQAMH.sub_socket.recv_multipart()
            latest_msgs[topic] = data
        except zmq.ZMQError:
            print(f"ZMQ Error exception")
            pass

        for topic, data in latest_msgs.items():
            if topic == b"/loggerd/status/":
                last_heard_time = time.time()
                record_button.disabled = False
                stop_button.disabled = False
                with messages.LoggerStatus.from_bytes(data) as msg:
                    session_id.value = msg.sessionId if msg.recording else "no session"
                    record_status.value = str(msg.recording)
                    record_button.button_style = 'danger' if msg.recording else ''

        if time.time() - last_heard_time > 3:
            session_id.value = "Disconnected"
            record_status.value = "Disconnected"
            record_button.disabled = True
            stop_button.disabled = True

        await asyncio.sleep(0.1)

try:
    ui_update_task.cancel()
except:
    pass
ui_update_task = asyncio.create_task(f())

async def on_record_button_clicked(b):
    await ZMQAMH.pub_socket.send_multipart([b"/loggerd/start/", b""])
    print(f"Record button clicked, b = {b}")
    
async def on_stop_button_clicked(b):
    await ZMQAMH.pub_socket.send_multipart([b"/loggerd/stop/", b""])
    print(f"Stop button clicked, b = {b}")

colortoggle = widgets.ToggleButtons(
    options=[('Opaque', 0), ('Transparent' ,1)],
    description='Patient Material:',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
)

viewindicator = widgets.ToggleButtons(
    options=[('On',True), ('Off',False)],
    description='View Indicator ON/OFF:',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
)

selfviewindicator = widgets.ToggleButtons(
    options=[('On',True), ('Off',False)],
    description='View Indicator Self-Select ON/OFF:',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
)

Flippatient = widgets.ToggleButtons(
    options=[('Original',True), ('Flipped',False)],
    description='Flip Patient:',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
)

kwireindicator = widgets.ToggleButtons(
    options=[('On',True), ('Off',False)],
    description='Kwire Error Indicator ON/OFF:',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
)

kwireerrorselection = widgets.ToggleButtons(
    options=[('On',True), ('Off',False)],
    description='Kwire Error Indicator Self-Selection',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
)


corridorselection= widgets.Select(
    options=[('Left S1', 0), ('Left Ramus', 1), ('L Ramus Short', 2),('L Teardrop', 3),('Right S1', 4), ('Right Ramus', 5), ('Right Ramus Short', 6),('Right Teardrop', 7),('S 2', 8),('S 3', 9),('None',100)],
    value=2,
    description='Corridor:',
)


updatesetting = widgets.Button(
    description='Update Setting',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click me',
    icon='check' # (FontAwesome names without the `fa-` prefix)
)

viewindicatorselect = widgets.SelectMultiple(
    options=['Anteroposterior_View', 'Lateral_View', 'Inlet_View', 'Outlet_View', 'Obturator_Oblique_Left/Iliac_Oblique_Right', 'Obturator_Oblique_Right/Iliac_Oblique_Left', 'Obturator_Oblique_Inlet_Left/Iliac_Oblique_Inlet_Right','Obturator_Oblique_Outlet_Left/Iliac_Oblique_Outlet_Right','Obturator_Oblique_Inlet_Right/Iliac_Oblique_Inlet_Left','Obturator_Oblique_Outlet_Right/Iliac_Oblique_Outlet_Left', 'Teardrop_Left_View','Teardrop_Right_View','None'],
    value=['Lateral_View'],
    #rows=10,
    description='View Indicator Selection:',
    disabled=False
)

left_box = widgets.VBox([colortoggle,Flippatient])
middle_box = widgets.VBox([viewindicator,selfviewindicator,viewindicatorselect])
right_box = widgets.VBox([kwireindicator,kwireerrorselection,corridorselection])
display(widgets.HBox([left_box,middle_box ,right_box,updatesetting]))

async def updatesetting_clicked(b):
    response_topic = "/mp/setting/webgui/SettingManager/"
    msg = messages.SycnedSetting.new_message()
    msg.timestamp = msg.timestamp = float(np.float64(time.time()).item())
    msg.clientId = 'webgui'
    uiControlVar = msg.setting.init("uiControl")
    uiControlVar.patientMaterial = colortoggle.value
    uiControlVar.flippatient = Flippatient.value #need add      
    uiControlVar.carmIndicator = viewindicator.value
    uiControlVar.viewIndicatorselfselect = selfviewindicator.value #need add
    uiControlVar.annotationError = viewindicatorselect.value#for different view selection
    uiControlVar.corridorIndicator =kwireindicator.value
    uiControlVar.webcorridorerrorselect = kwireerrorselection.value
    uiControlVar.webcorridorselection = corridorselection.value
    await ZMQAMH.pub_socket.send_multipart([response_topic.encode(), msg.to_bytes()])
    print(f"Update Setting button clicked, b = {b}")

updatesetting.on_click(lambda b: asyncio.create_task(updatesetting_clicked(b)))
record_button.on_click(lambda b: asyncio.create_task(on_record_button_clicked(b)))
record_button.on_click(lambda b: asyncio.create_task(updatesetting_clicked(b)))
stop_button.on_click(lambda b: asyncio.create_task(on_stop_button_clicked(b)))



display(widgets.HBox([record_button, stop_button, session_id, record_status]))

HBox(children=(VBox(children=(ToggleButtons(description='Patient Material:', options=(('Opaque', 0), ('Transpa…

HBox(children=(Button(description='New Session', disabled=True, icon='circle', style=ButtonStyle()), Button(de…

Update Setting button clicked, b = Button(description='Update Setting', icon='check', style=ButtonStyle(), tooltip='Click me')
Update Setting button clicked, b = Button(description='Update Setting', icon='check', style=ButtonStyle(), tooltip='Click me')
Record button clicked, b = Button(description='New Session', icon='circle', style=ButtonStyle())
Update Setting button clicked, b = Button(description='New Session', icon='circle', style=ButtonStyle())
Stop button clicked, b = Button(description='Stop', icon='square', style=ButtonStyle())
Record button clicked, b = Button(description='New Session', icon='circle', style=ButtonStyle())
Update Setting button clicked, b = Button(description='New Session', icon='circle', style=ButtonStyle())
Stop button clicked, b = Button(description='Stop', icon='square', style=ButtonStyle())
Record button clicked, b = Button(description='New Session', icon='circle', style=ButtonStyle())
Update Setting button clicked, b = Button(description='New Session', 

In [6]:
# class ControlPanel:
#     def __init__(self, zmq_handler):
#         self.zmq_handler = zmq_handler
#         self._create_widgets()
#         self._setup_layout()
#         self._bind_events()

#     def _create_widgets(self):
#         # bot
#         self.record_button = widgets.Button(description='New Session', disabled=True, icon='circle')
#         self.stop_button = widgets.Button(description='Stop', disabled=True, icon='square')
#         self.session_id = widgets.Text(value='None', description='Session ID:', disabled=True)
#         self.record_status = widgets.Text(value='None', description='Recording:', disabled=True)
#         self.
#         # add other widgets...
#         # self.colortoggle, self.viewindicator...etc

#     def _setup_layout(self):
#         # 組織 widgets 的佈局
#         control_box = widgets.HBox([self.record_button, self.stop_button, self.session_id, self.record_status])
#         display(control_box)
#         # 顯示其他 widgets 的佈局

#     def _bind_events(self):
#         self.record_button.on_click(self.on_record_button_clicked)
#         self.stop_button.on_click(self.on_stop_button_clicked)
#         # 綁定其他事件

#     async def ui_update_loop(self):
#         last_heard_time = 0
#         while True:
#             # UI 更新邏輯，例如接收消息並更新 UI
#             await asyncio.sleep(0.1)

#     def on_record_button_clicked(self, b):
#         self.zmq_handler.pub_socket.send_multipart([b"/loggerd/start/", b""])

#     def on_stop_button_clicked(self, b):
#         self.zmq_handler.pub_socket.send_multipart([b"/loggerd/stop/", b""])
#         # 處理其他按鈕點擊事件

  self._get_loop()


In [7]:
# async def main():
#     zmq_handler = ZMQAsyncMsgHandler()  # 初始化 ZMQ handler
#     control_panel = ControlPanel(zmq_handler)  # 創建控制面板
#     await control_panel.ui_update_loop()  # 啟動 UI 更新循環

In [8]:
# if __name__ == '__main__':
#     asyncio.run(main())