In [3]:
pip install scikit-image


Collecting scikit-image
  Downloading scikit_image-0.25.2-cp311-cp311-macosx_12_0_arm64.whl.metadata (14 kB)
Collecting networkx>=3.0 (from scikit-image)
  Downloading networkx-3.5-py3-none-any.whl.metadata (6.3 kB)
Collecting imageio!=2.35.0,>=2.33 (from scikit-image)
  Downloading imageio-2.37.0-py3-none-any.whl.metadata (5.2 kB)
Collecting tifffile>=2022.8.12 (from scikit-image)
  Downloading tifffile-2025.5.26-py3-none-any.whl.metadata (32 kB)
Collecting lazy-loader>=0.4 (from scikit-image)
  Downloading lazy_loader-0.4-py3-none-any.whl.metadata (7.6 kB)
Downloading scikit_image-0.25.2-cp311-cp311-macosx_12_0_arm64.whl (13.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.2/13.2 MB[0m [31m16.3 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hDownloading imageio-2.37.0-py3-none-any.whl (315 kB)
Downloading lazy_loader-0.4-py3-none-any.whl (12 kB)
Downloading networkx-3.5-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [3

In [12]:
"""
gui.py

Gradio GUI definition -- provides a user interface with insight into the current ASR results, current language plan
being executed, visual state (from camera), and more.
"""

import queue
import time
from typing import List, Tuple
from skimage import data


import gradio as gr
import numpy as np

# from vsandbox.util.advance import trigger

def trigger():
    """Placeholder for the commented-out `vsandbox.util.advance.trigger` import; only prints a reminder."""
    reminder = "⚠️ Dummy trigger called. Replace with actual implementation."
    print(reminder)

class DummyGlobalState:
    """
    In-memory stand-in for the shared global state consumed by `GUI`.

    Exposes the `lock_get` / `lock_set` accessors that the GUI calls, backed by a plain dict (no locking is
    performed here).
    """

    def __init__(self, state_machine=None):
        """
        Build the default state dictionary.

        :param state_machine: Object stored under the "state_machine" key. When omitted, falls back to the
                              `state_machine` attribute of a module-level `gui` object if one exists (the
                              previous implicit behavior), otherwise to None.
        """
        if state_machine is None:
            # Backward-compatible fallback: earlier versions read the module-level `gui` directly, which
            # raised NameError whenever this class was instantiated before `gui` existed.
            state_machine = getattr(globals().get("gui"), "state_machine", None)

        self.state = {
            "teach_mode": False,
            "teaching_stack": [],
            "save_teach_episode": False,
            "teach_prims": [],
            "display_teach_prims": [],
            "state_machine": state_machine,
        }

    def lock_get(self, key):
        """Return the value stored under `key`, or None when the key is absent."""
        return self.state.get(key)

    def lock_set(self, key, value):
        """Store `value` under `key`, creating the key if needed."""
        self.state[key] = value

class GUI:
    def __init__(
        self,
        click_queue: queue.Queue,
        prim_functions: List = None,
        teach_lang=True,
        smooth=False,
        point_only=False,
    ) -> None:
        """Initializes a GUI with the given sandbox specification, and thread-safe queue(s) to write/read events."""
        self.click_queue, self.global_state = click_queue, None

        # Pointer to State Machine (thread-safe)
        self.state_machine, self.teach_lang, self.smooth, self.point_only = None, teach_lang, smooth, point_only
        self.state_machine = DummyStateMachine("TEACH_PERCEPTION_MODULE")
        if prim_functions is not None:
            self.prim_functions = [a["function"]["name"] for a in prim_functions]

        # Initialize Instance Variables
        self.command, self.high_plan, self.low_plan = "", [" "], [[" ", 0]]

        # # testing
        # self.low_plan = [['pickup ribbon roll', 0], ['pickup2 ribbon roll', 1], ['pickup2 ribbon roll', 1]]
        # self.high_plan = ['pickup ribbon roll', 'pickup4 ribbon roll']

        self.display_step, self.primitive_step, self.teaching_stack, self.display_teach_prims = 1, 1, [], []

        # # testing
        # self.teaching_stack, self.display_teach_prims = ["smell"], ["smell ribbon", "move"]

        self.image, self.command_history = np.zeros((256, 256, 3), dtype=np.uint8), []
        self.image = data.astronaut()
        self.update_freq = 0.1
        self.output_message = ""
        # self.output_counter = 0
        self.teach_on = False
        self.teach_counter = None
        self.known_primitives = []

        # Tracker for User Clicks
        self.clicked_point = None

        # Gradio GUI Attributes
        self.custom_text_lg = None
        self.row_css = """
            .row_container {
                height: 16vh !important;
            }
            gradio-app > .gradio-container {
                max-width: 100% !important;
            }
            """
        # self.initialize()

    def clear_clicks(self):
        with self.click_queue.mutex:
            self.click_queue.queue.clear()
        self.clicked_point = None
        self.obj_click = None

    def initialize(self):
        from gradio.themes.utils.sizes import Size

        self.custom_text_lg = Size(
            name="text_lg",
            xxs="14px",
            xs="16px",
            sm="18px",
            md="24px",
            lg="26px",
            xl="28px",
            xxl="32px",
        )
        self.build_gui()

    def add_global_state(self, global_state) -> None:
        self.global_state = global_state

    def get_command(self) -> str:
        return self.command

    def get_output(self) -> str:
        if self.state_machine is None:
            self.output_message = "### Setting up Robot"
        else:
            if self.global_state.lock_get("teach_mode") and not self.teach_on:
                if self.teach_counter is None:
                    self.teach_counter = time.time()
                if time.time() - self.teach_counter > 4:
                    self.teach_on = True

                self.output_message = "# Beginning Teaching"
            elif self.state_machine.current_state.value == "GET_USER_INPUT":
                self.output_message = "### Waiting for Command"
            elif self.state_machine.current_state.value == "TEACH_PERCEPTION_MODULE" and self.click_queue.empty():
                self.output_message = "# Click on Image to Teach Object"
            elif self.command != "":
                self.output_message = "### Executing"

            if not self.global_state.lock_get("teach_mode"):
                self.teach_on = False
                self.teach_counter = None

        return self.output_message

    @staticmethod
    def parse_highlighted_plan(
        plan: List[str],
        highlight_step: int = 0,
        label_name: str = "",
        prim_plan: List[str] = None,
        prim_step: int = None,
        teach: bool = False,
    ) -> List[Tuple[str, str]]:
        """
        Given a plan as a list of primitives to execute, returns a List[Tuple[str, str]] following the format expected
        by gr.HighlightedText (https://www.gradio.app/docs/highlightedtext).

        Specifically, each Tuple consists of (character: str, label: str) where `label` is element of "color_map" passed
        to the gr.HighlightedText element; in our case, the only label is the empty string ("" --> `green`).
        """
        if prim_plan is not None:
            prim_dict = {}
            for prim_num, prim in enumerate(prim_plan):
                prim_key = str(prim[1])
                if prim_key not in prim_dict.keys():
                    prim_dict[prim_key] = []
                prim_dict[prim_key].append([prim[0], prim_num])

        highlighted_plan = []
        for step, instruction in enumerate(plan):
            foldout_prims = []
            should_highlight = step == highlight_step
            if prim_plan is not None:
                if str(step) in prim_dict.keys():
                    foldout_prims = prim_dict[str(step)]

            show_foldout_prims = len(foldout_prims) > 1 or teach or (len(foldout_prims) == 1 and not foldout_prims[0][0] == instruction)
            # if len(foldout_prims) > 1 and not teach:
            if show_foldout_prims and not teach:
                should_highlight = False  # highlight the low level step that we are on instead

            for character in instruction:
                highlighted_plan.append((character, label_name if should_highlight else None))

            # Newline Separate each Instruction
            if len(plan) > 1 or show_foldout_prims:
                highlighted_plan.append(("\n", None))

            if prim_plan is not None:
                if show_foldout_prims:
                    for prim_instruction in foldout_prims:
                        if len(prim_instruction[0].replace(" ", "")) > 0:
                            highlight_prim = prim_step == prim_instruction[1]
                            highlighted_plan.append((" - ", label_name if highlight_prim else None))
                            for prim_character in prim_instruction[0]:
                                highlighted_plan.append((prim_character, label_name if highlight_prim else None))

                            highlighted_plan.append(("\n", None))

        return highlighted_plan

    def get_highlighted_plan(self) -> List[Tuple[str, str]]:
        return self.parse_highlighted_plan(
            self.high_plan, self.display_step, prim_plan=self.low_plan, prim_step=self.primitive_step
        )

    def get_teaching_stack(self) -> List[Tuple[str, str]]:
        if len(self.teaching_stack) == 0:
            return self.parse_highlighted_plan([""], -1, label_name="")

        # Otherwise --> indicate `TEACHING`
        if len(self.display_teach_prims) > 0:
            split_teach_prims = [[a, 0] for a in self.display_teach_prims]
        else:
            split_teach_prims = []

        teaching_highlighted = self.parse_highlighted_plan(
            self.teaching_stack, 0, prim_plan=split_teach_prims, label_name="TEACH", teach=True
        )
        return teaching_highlighted

    def get_img(self) -> np.ndarray:
        # import cv2
        # self.image = cv2.imread("/Users/jennifergrannen/Documents/Stanford/iliad/vocal_sand/keypoints/data/4_26_pics_full/00000.jpg")
        return self.image

    def get_command_history(self) -> str:
        if len(self.command_history) == 1 and self.command_history[0][0] != "-":
            self.command_history[0] = "- " + self.command_history[0]
        return "\n\n- ".join(self.command_history)

    def get_known_prims(self) -> str:
        highlighted_actions = []
        for action in self.known_primitives:
            if self.prim_functions is not None:
                if action not in self.prim_functions:
                    labelled = True
                else:
                    labelled = False
            else:
                labelled = False
            for charac in action:
                highlighted_actions.append((charac, "" if labelled else None))
            highlighted_actions.append((",", None))
            highlighted_actions.append((" ", None))
        return highlighted_actions[:-2]

    def dummy_clicked(self, b) -> None:

        state = self.global_state.lock_get("state_machine")
        print("CALLING HERE", state.current_state.id)

        if state.current_state.id == "RUN_PERCEPTION_MODULE" or state.current_state.id == "TEACH_PERCEPTION_MODULE":
            new_cancel_vision = gr.update(interactive=True)
            new_start_teach = gr.update(interactive=False)
            return new_start_teach, new_cancel_vision

            # new_cancel_vision = gr.Button("Re-Teach Object?", interactive=True)
            # new_cancel_vision.click(fn=self.reteach_vision)
            # new_start_teach = gr.Button("Teach New Plan?", interactive=False)
            # new_start_teach.click(fn=self.start_teaching)

        elif state.current_state.id == "IMPLICIT_TEACH_LANGUAGE_MODULE":
            print("IN HERE?")
            new_start_teach = gr.update(interactive=True)
            new_cancel_vision = gr.update(interactive=False)

            new_cancel_vision = gr.Button("Re-Teach Object?", interactive=False)
            # new_cancel_vision.click(fn=self.reteach_vision)
            new_start_teach = gr.Button("Teach New Plan?", interactive=True)
            # new_start_teach.click(fn=self.start_teaching)
            return new_start_teach, new_cancel_vision

        else:
            new_start_teach = gr.update(interactive=False)
            new_cancel_vision = gr.update(interactive=False)
            return new_start_teach, new_cancel_vision


            # new_cancel_vision = gr.Button("Re-Teach Object?", interactive=False)
            # # new_cancel_vision.click(fn=self.reteach_vision)
            # new_start_teach = gr.Button("Teach New Plan?", interactive=False)
            # # new_start_teach.click(fn=self.start_teaching)

        # return new_start_teach, new_cancel_vision


    def cancel_teaching(self) -> None:
        if self.click_queue.empty():
            self.click_queue.put_nowait(("click", None))

        self.global_state.lock_set("teach_mode", False)
        self.global_state.lock_set("teaching_stack", self.teaching_stack[1:])
        self.global_state.lock_set("save_teach_episode", False)
        self.global_state.lock_set("teach_prims", [])
        self.global_state.lock_set("display_teach_prims", [])

    def reteach_vision(self) -> None:
        self.global_state.lock_set("reteach_object", True)
        state = self.global_state.lock_get("state_machine")
        if state.current_state.id == "RUN_PERCEPTION_MODULE":
            state.teach()
            trigger()

        elif state.current_state.id == "TEACH_PERCEPTION_MODULE":
            state.retry()
            trigger()

    def start_teaching(self) -> None:
        state = self.global_state.lock_get("state_machine")
        if state.current_state.id == "IMPLICIT_TEACH_LANGUAGE_MODULE":
            state.fail()
            trigger()

    def finish_teaching(self) -> None:
        state = self.global_state.lock_get("state_machine")
        if state.current_state.id == "GET_USER_INPUT":
            self.global_state.lock_set("teach_mode", False)
            self.global_state.lock_set("save_teach_episode", True)
            state.teach()
            trigger()

    def build_gui(self) -> None:
        """
        Assemble the Gradio Blocks interface and store it in `self.demo`.

        Layout (wide left column + narrow right column):
          - "I heard:" textbox (current command) next to the "Output" status line
          - "Known Actions" highlighted list
          - "This is my plan" panel, plus the optional "Currently Teaching" panel (`self.teach_lang`) with its
            "Finish and Save" / "Cancel Teaching" buttons
          - vision image, clickable unless `self.point_only` is set
          - "Command History" textbox (right column)

        Components are bound to the `get_*` methods and refreshed every `self.update_freq` seconds via
        `every=`. The "Teach New Plan?" / "Re-Teach Object?" buttons are only created when `self.smooth` is set.
        """

        def click_handler(image: gr.Image, event: gr.SelectData) -> np.ndarray:
            """Parse (x, y) coordinate from `click` --> put  in `self.click_queue` (for Perception Teaching)."""
            if event is None:
                self.output_message = "Click event error"
                return self.image

            # Otherwise, parse `event` and invoke `put`
            x, y = event.index[0], event.index[1]
            if x is not None and y is not None:
                self.clicked_point = (x, y)

                # Only use last click: drop one stale event if present. `get_nowait` cannot block, unlike an
                # `empty()` check followed by `get()` when another thread drains the queue in between.
                try:
                    self.click_queue.get_nowait()
                except queue.Empty:
                    pass
                self.click_queue.put_nowait(("click", self.clicked_point))

                self.output_message = f"Clicked at {self.clicked_point}"
                return self.image

            self.output_message = "Out of Bounds"
            return self.image

        with gr.Blocks(theme=gr.themes.Soft(text_size=self.custom_text_lg), css=self.row_css) as self.demo:
            with gr.Row():
                with gr.Column(scale=5):

                    # Row 1: recognized command + status output
                    with gr.Row():
                        with gr.Column(scale=2):
                            with gr.Group():
                                gr.Markdown('<p style="font-size: 40px; font-weight: bold; margin: 0px;"> &nbsp; I heard:</p>')
                                text = gr.Textbox(
                                    self.get_command,
                                    label="",
                                    show_label=False,
                                    container=False,
                                    lines=2,
                                    scale=2,
                                    every=self.update_freq,
                                    min_width=500,
                                )
                        with gr.Column(scale=1):
                            gr.Markdown('<p style="font-size: 30px; font-weight: bold; margin: 0px;"> &nbsp;Output:</p>')
                            gr.Markdown(self.get_output, every=self.update_freq)

                    # Row 2: known primitives ("" label --> green)
                    with gr.Group():
                        with gr.Row():
                            with gr.Column(scale=1):
                                gr.Markdown('<p style="font-size: 30px; font-weight: bold; margin: 0px;"> &nbsp;Known Actions:</p>')
                            with gr.Column(scale=3):
                                gr.HighlightedText(
                                    self.get_known_prims,
                                    label="",
                                    show_label=False,
                                    color_map={"": "green"},
                                    combine_adjacent=True,
                                    show_legend=False,
                                    every=self.update_freq,
                                )

                    # Row 3: language panels (plan / teaching) + vision panel
                    with gr.Row():
                        with gr.Column(elem_classes=["col_lang"], scale=1):
                            with gr.Group():
                                gr.Markdown("### &nbsp; This is my plan:")
                                gr.HighlightedText(
                                    self.get_highlighted_plan,
                                    label="",
                                    show_label=False,
                                    color_map={"": "green"},
                                    combine_adjacent=True,
                                    show_legend=False,
                                    every=self.update_freq,
                                )
                                if self.smooth:
                                    start_teach_button = gr.Button("Teach New Plan?")
                                    start_teach_button.click(fn=self.start_teaching)

                            if self.teach_lang:
                                with gr.Group(visible=True) as self.teaching_stack_group:
                                    gr.Markdown("### &nbsp; Currently Teaching")
                                    gr.HighlightedText(
                                        self.get_teaching_stack,
                                        label="",
                                        show_label=False,
                                        # `get_teaching_stack` labels spans "TEACH"; that key must be present
                                        # for the highlight to be green ("Teaching" kept for compatibility).
                                        color_map={"Teaching": "green", "TEACH": "green"},
                                        combine_adjacent=True,
                                        show_legend=False,
                                        every=self.update_freq,
                                    )
                                    self.finish_teach_button = gr.Button("Finish and Save")
                                    self.finish_teach_button.click(fn=self.finish_teaching)

                                    self.cancel_teach_button = gr.Button("Cancel Teaching")
                                    self.cancel_teach_button.click(fn=self.cancel_teaching)

                        with gr.Column(scale=3):
                            vision_img = gr.Image(
                                self.get_img,
                                type="numpy",
                                show_label=False,
                                show_download_button=False,
                                interactive=False,
                                min_width=200,
                                every=self.update_freq,
                            )

                            if self.smooth:
                                cancel_vision_button = gr.Button("Re-Teach Object?")
                                cancel_vision_button.click(fn=self.reteach_vision)

                with gr.Column(scale=1):
                    with gr.Group():
                        gr.Markdown("### &nbsp; Command History")
                        gr.Textbox(self.get_command_history, label="", show_label=False, autoscroll=True, lines=20, every=self.update_freq)

            # Click Callback for "Perception" Teaching (logs x/y coordinate on image)
            if not self.point_only:
                vision_img.select(click_handler, inputs=[vision_img], outputs=[vision_img])




    def gui(self):
        print("Starting Gradio UI")
        self.demo.queue().launch()


if __name__ == "__main__":
    # `queue` is already imported at the top of the cell; no local re-import needed.
    gui_click_queue = queue.Queue()

    # The module-level name `gui` must exist before DummyGlobalState() is built, because its constructor
    # looks up `gui.state_machine`.
    gui = GUI(click_queue=gui_click_queue)

    # Attach the global state before initialize(), using the GUI's own setter.
    gui.add_global_state(DummyGlobalState())
    gui.initialize()
    gui.gui()

Starting Gradio UI
* Running on local URL:  http://127.0.0.1:7865
* To create a public link, set `share=True` in `launch()`.
