In [1]:
# !pip install pycapnp
# !pip install typer
# !pip install ipywidgets

In [2]:
import asyncio
import zmq.asyncio

import os
import numpy as np
import logging
from pathlib import Path

import capnp
import typer
import zmq.asyncio
import time
from deepdrrzmq.utils.zmq_util import zmq_poll_latest

from deepdrrzmq.utils.typer_util import unwrap_typer_param
from deepdrrzmq.utils.server_util import make_response, DeepDRRServerException, messages
import random
import string

import threading
from IPython.display import display
import ipywidgets as widgets
import time
from threading import Event
import asyncio

In [3]:
rep_port=40100
pub_port=40101
sub_port=40102

In [4]:
context = zmq.asyncio.Context()
# context = zmq.Context()
context.__enter__()

sub_socket = context.socket(zmq.SUB)
sub_socket.hwm = 10000

pub_socket = context.socket(zmq.PUB)
pub_socket.hwm = 10000

# addr = "192.158.1.54"
addr = "localhost"
pub_socket.connect(f"tcp://{addr}:{pub_port}") # make variable
sub_socket.connect(f"tcp://{addr}:{sub_port}")
# pub_socket.connect(f"tcp:10.0.0.7") # make variable
# sub_socket.connect(f"tcp:10.0.0.7")

# sub_socket.subscribe(b"")
sub_socket.subscribe(b"/replayd/")

In [5]:
class ReplayControllerInterface:
    def __init__(self):
        self.enable_button = widgets.Button(disabled=False, button_style='', icon='check')
        self.disable_button = widgets.Button(disabled=False, button_style='', icon='close')
        self.record_button = widgets.Button(description='New Session', disabled=True, button_style='', icon='circle')
        self.play_button = widgets.Button(layout=widgets.Layout(width='40px'), disabled=False, button_style='', icon='play')
        self.pause_button = widgets.Button(layout=widgets.Layout(width='40px'), disabled=False, button_style='', icon='pause')
        self.session_id = widgets.Text(value='None', placeholder='', description='Selected:', disabled=True, layout=widgets.Layout(width='400px'))
        self.duration_display = widgets.Text(value='None', placeholder='', description='Duration:', disabled=True)
        self.playback_status = widgets.Text(value='None', placeholder='', description='Status:', disabled=True)
        self.log_dropdown = widgets.Dropdown(options=['not loaded'], description='Log file:', disabled=False)
        self.change_log_button = widgets.Button(description='Load Log', disabled=False, button_style='', icon='cloud-upload')
        self.loop_checkbox = widgets.Checkbox(value=False, description='Loop', disabled=False, indent=False)
        self.scrub_slider = widgets.FloatSlider(
            value=7.5,
            min=0,
            max=10.0,
            step=0,
            # description='Test:',
            disabled=False,
            continuous_update=False,
            orientation='horizontal',
            readout=True,
            readout_format='.1f',
            layout=widgets.Layout(width='500px')
        )
        self.log_dropdown_init = False
        self.last_heard_time = 0
        self.last_code_scrub_slider_value = None
        self.replayd_enabled = False
        self.no_log_loaded = True
        self.start_time = 0

    async def start(self):
        await asyncio.gather(
            self.publish(),
            self.listen(),
            self.ui_update(),
        )

    async def publish(self):
        while True:
            # if not self.log_dropdown_init:
            #     pub_socket.send_multipart([b"/replayd/in/listrequest/", b""])
            
            self.disconnected = time.time() - self.last_heard_time > 5

            if self.disconnected:
                self.session_id.value = "Disconnected"
                self.playback_status.value = "Disconnected"
                self.duration_display.value = "Disconnected"
                self.play_button.button_style =  ''
                self.pause_button.button_style = ''

            self.play_button.disabled = self.disconnected or not self.replayd_enabled or self.no_log_loaded
            self.pause_button.disabled = self.disconnected or not self.replayd_enabled or self.no_log_loaded
            self.scrub_slider.disabled = self.disconnected or not self.replayd_enabled or self.no_log_loaded
            self.log_dropdown.disabled = self.disconnected or not self.replayd_enabled
            self.loop_checkbox.disabled = self.disconnected or not self.replayd_enabled

            # print(f"{self.disconnected=} {self.replayd_enabled=} {self.no_log_loaded=}")
            
            await asyncio.sleep(.1)

    async def listen(self):

        while True:
            latest_msgs = await zmq_poll_latest(sub_socket)

            for topic, data in latest_msgs.items():
                if topic == b"/replayd/status/":
                    self.last_heard_time = time.time()
                    
                    with messages.ReplayerStatus.from_bytes(data) as msg:
                        self.no_log_loaded = msg.logId == ""

                        new_slider_val = msg.time - msg.startTime
                        self.replayd_enabled = msg.enabled
                        self.last_code_scrub_slider_value = new_slider_val
                        self.scrub_slider.value = new_slider_val
                        self.start_time = msg.startTime
                        self.scrub_slider.max = msg.endTime - msg.startTime
                        self.scrub_slider.min = 0
                        self.session_id.value = msg.logId if not self.no_log_loaded else "Not Loaded"
                        
                        self.playback_status.value = "Playing" if msg.playing else "Paused"

                        self.enable_button.button_style = 'success' if self.replayd_enabled else ''
                        self.disable_button.button_style = 'danger' if not self.replayd_enabled else ''

                        
                        self.play_button.button_style = 'success' if msg.playing and not self.no_log_loaded else ''
                        self.pause_button.button_style = 'danger' if not msg.playing and not self.no_log_loaded else ''

                        self.loop_checkbox.value = msg.loop



                        self.duration_display.value = f"{msg.endTime - msg.startTime:.2f} s" if not self.no_log_loaded else "Not Loaded"

                elif topic == b"/replayd/list/":
                    with messages.LogList.from_bytes(data) as msg:
                        log_list = [x.id for x in msg.logs]
                        self.log_dropdown.options = reversed(log_list)
                        self.log_dropdown_init = True 
                        # print(f"got list {log_list}")


            await asyncio.sleep(0)

    def on_play_button_clicked(self, b):
        pub_socket.send_multipart([b"/replayd/in/start/", b""])
        print("play button clicked")

    def on_pause_button_clicked(self, b):
        pub_socket.send_multipart([b"/replayd/in/stop/", b""])
        print("pause button clicked")

    def on_selected_log_changed(self, b):
        print("selected log changed")
        msg = messages.LoadLogRequest.new_message()
        msg.logId = self.log_dropdown.value
        msg.autoplay = True
        msg.loop = True
        pub_socket.send_multipart([b"/replayd/in/load/", msg.to_bytes()])

    def on_loop_checkbox_changed(self, b):
        pub_socket.send_multipart([b"/replayd/in/listrequest/", b""])

        print(f"loop checkbox changed to {self.loop_checkbox.value}")
        msg = messages.BoolValue.new_message()
        msg.value = self.loop_checkbox.value
        pub_socket.send_multipart([b"/replayd/in/loop/", msg.to_bytes()])

    def on_scrub_slider_changed(self, b):
        if abs(self.scrub_slider.value - self.last_code_scrub_slider_value) > 0.01:
            # print(b)
            # print(f"{self.scrub_slider.value=} {self.last_code_scrub_slider_value=} {self.scrub_slider.value - self.last_code_scrub_slider_value=}")
            msg = messages.Float64Value.new_message()
            # msg.value = self.start_time
            msg.value = self.scrub_slider.value + self.start_time
            pub_socket.send_multipart([b"/replayd/in/scrub/", msg.to_bytes()])
            print(f"scrub slider changed to {self.scrub_slider.value}")

    def on_enable_click(self, b):
        pub_socket.send_multipart([b"/replayd/in/enable/", b""])

    def on_disable_click(self, b):
        pub_socket.send_multipart([b"/replayd/in/disable/", b""])

    async def ui_update(self):
        self.enable_button.on_click(self.on_enable_click)
        self.disable_button.on_click(self.on_disable_click)
        self.play_button.on_click(self.on_play_button_clicked)
        self.pause_button.on_click(self.on_pause_button_clicked)
        self.change_log_button.on_click(self.on_selected_log_changed)
        self.loop_checkbox.observe(self.on_loop_checkbox_changed, names='value')
        self.scrub_slider.observe(self.on_scrub_slider_changed, names='value')
        display(widgets.VBox([
            widgets.HBox([
                self.enable_button,
                self.disable_button,
            ]),
            widgets.HBox([
                self.log_dropdown,
                self.change_log_button,
                self.session_id,    
                self.playback_status,

            ]),
            widgets.HBox([
                self.play_button,
                self.pause_button,
                self.scrub_slider,
                self.loop_checkbox,
                self.duration_display,
            ])
        ]))



try:
    main_future.cancel()
    del main_future
    del interface

    print("Please restart the kernel to run this cell again, interface may be buggy after this")
except:
    pass

interface = ReplayControllerInterface()
main_future = asyncio.ensure_future(interface.start())
#await main_future


VBox(children=(HBox(children=(Button(icon='check', style=ButtonStyle()), Button(icon='close', style=ButtonStyl…

play button clicked
pause button clicked
scrub slider changed to 0.0
play button clicked
selected log changed
play button clicked
pause button clicked
scrub slider changed to 369.9445813390815
play button clicked
