In [14]:
import cv2
from pathlib import Path
# If you want to read a video file set the path under path, 
# If you want to use a specific image set use_video to False and give the image path under path
#TODO: Set the path to the video file or image file
use_video = True
path = "/home/ademi/hermes/data/open_oven_20241022_preprocessed/demonstration_0/cam_3_rgb_video.mp4"

#TODO: If you have an open teach pickle you can use that instead of the video/image, set use_pickle to True and set the path/pixel_key
use_pickle = False
pickle_path = "path/to/pickle"
pixel_key = "pixels"

#TODO: Set the task name here -- this will be used to save the output
task_name = "open_oven_1022"

if use_pickle:
    import pickle
    with open(pickle_path, 'rb') as f:
        data = pickle.load(f)
    img = data['observations'][0][pixel_key][0]
    use_video = False
else:
    img = None
    if use_video:
        cap = cv2.VideoCapture(path)
        while(cap.isOpened()):
            ret, frame = cap.read()
            img = frame.copy()
            break
    else:
        img = cv2.imread(path)

    # We flip here because CV2 reads in as BGR
    img = img[:,:,::-1]

#TODO: If its hard to see the image, you can increase the size_multiplier, this won't affect the selected coordinates
size_multiplier = 1

coordinates_path = f"../../coordinates"

In [15]:
%gui asyncio

import os
from PIL import Image

import numpy as np
from IPython.display import display, Javascript
import ipywidgets as widgets
from ipycanvas import Canvas, hold_canvas
import pickle

import io
import asyncio
import logging

# Define an async function to wait for button click
async def wait_for_click(button):
    # Create a future object
    future = asyncio.Future()
    # Define the click event handler
    def on_button_clicked(b):
        future.set_result(None)
    # Attach the event handler to the button
    button.on_click(on_button_clicked)
    # Wait until the future is set
    await future

class Points():
    def __init__(self, env_name, img, coordinates_path, size_multiplier=1):
        logging.getLogger().setLevel(logging.DEBUG)
        logging.info("Starting the Points class")
        self.img = img
        self.size_multiplier = size_multiplier
        self.coordinates_path = coordinates_path
        self.env_name = env_name

        # Save the image to a bytes buffer
        image = Image.fromarray(self.img)
        size = img.shape
        image = image.resize((size[1] * self.size_multiplier, size[0] * self.size_multiplier))
        buffer = io.BytesIO()
        image.save(buffer, format='PNG')
        buffer.seek(0)

        # Create an IPyWidgets Image widget
        self.canvas = Canvas(width=size[1] * self.size_multiplier, height=size[0] * self.size_multiplier)
        # Define the size of each cell

        self.canvas.put_image_data(np.array(image), 0, 0)

        # Display coordinates
        coords_label = widgets.Label(value="Click on the image to select the coordinates")

        # Define the click event handler
        self.coords = []
        def on_click(x, y):
            coords_label.value = f"Coordinates: ({x}, {y})"
            self.coords.append((0, x, y))

            with hold_canvas(self.canvas):
                self.canvas.put_image_data(np.array(image), 0, 0)  # Redraw the original image

                self.canvas.fill_style = 'red'
                for coord in self.coords:
                    x, y = coord[1] // self.size_multiplier, coord[2] // self.size_multiplier
                    self.canvas.fill_circle(x, y, 2)

        # Connect the click event to the handler
        self.canvas.on_mouse_down(on_click)

        self.button = widgets.Button(description="Save Points")

        # Display the widgets
        self.vbox = widgets.VBox([self.canvas, coords_label, self.button])

        # # Display the widget
        display(self.vbox)

    def on_done(self):
        logging.info("saving")
        Path(self.coordinates_path + "/coords/").mkdir(parents=True, exist_ok=True)
        with open(self.coordinates_path + "/coords/" + self.env_name + ".pkl", 'wb') as f:
            try:
                pickle.dump(self.coords, f)
            except Exception as e:
                logging.info(e)
        Path(self.coordinates_path + "/images/").mkdir(parents=True, exist_ok=True)
        with open(self.coordinates_path + "/images/" + self.env_name + ".png", 'wb') as f:
            try:
                image = Image.fromarray(self.img)
                image.save(f)
            except Exception as e:
                logging.info(e)
        logging.info("saved")

In [16]:
async def f():
    point = Points(task_name, img, coordinates_path, size_multiplier)
    x = await wait_for_click(point.button)
    point.vbox.close()
    point.canvas.close()
    point.on_done()
asyncio.ensure_future(f())

<Task pending name='Task-9' coro=<f() running at /tmp/ipykernel_1077460/3187731821.py:1>>

INFO:root:Starting the Points class


VBox(children=(Canvas(height=720, width=1280), Label(value='Click on the image to select the coordinates'), Bu…

DEBUG:Comm:handle_msg[6508f4a7f223420baf43c98c0453b0a3]({'header': {'date': datetime.datetime(2024, 10, 23, 0, 4, 26, 110000, tzinfo=tzutc()), 'msg_id': '4f1c01cd-0c7f-4953-8d04-6e9028557500', 'msg_type': 'comm_msg', 'session': 'c1aae978-562e-43d5-a099-32f4f321e0f2', 'username': '803ece22-63dd-4462-8db3-eca9f0747131', 'version': '5.2'}, 'msg_id': '4f1c01cd-0c7f-4953-8d04-6e9028557500', 'msg_type': 'comm_msg', 'parent_header': {}, 'metadata': {}, 'content': {'comm_id': '6508f4a7f223420baf43c98c0453b0a3', 'data': {'method': 'custom', 'content': {'event': 'client_ready'}}}, 'buffers': []})
DEBUG:Comm:handle_msg[6508f4a7f223420baf43c98c0453b0a3]({'header': {'date': datetime.datetime(2024, 10, 23, 0, 4, 26, 520000, tzinfo=tzutc()), 'msg_id': '7a5d2f22-4fab-47c6-86b8-df8bda3a45b9', 'msg_type': 'comm_msg', 'session': 'c1aae978-562e-43d5-a099-32f4f321e0f2', 'username': '803ece22-63dd-4462-8db3-eca9f0747131', 'version': '5.2'}, 'msg_id': '7a5d2f22-4fab-47c6-86b8-df8bda3a45b9', 'msg_type': 'comm