# OWG Evaluation Pipeline
Author: Vanessa

In [None]:
cd ..

In [None]:
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib.patches import FancyBboxPatch
from pprint import pprint

# helper function
%matplotlib inline
def display_image(path_or_array, size=(10, 10)):
    """Show an image inline with matplotlib, with the axes hidden.

    Args:
        path_or_array: Either a ``str`` path to an image file, which is
            opened and converted to RGB, or any object ``plt.imshow`` accepts
            directly (e.g. an image array), which is displayed unchanged.
        size: Figure size forwarded to ``plt.figure(figsize=...)``.

    Note:
        ``np``, ``Image`` and ``plt`` are expected to already be available in
        the notebook namespace; this function does not import them itself.
    """
    if isinstance(path_or_array, str):
        # Fixed: the original read from an undefined name ``image_path``.
        # The ``with`` block closes the file handle; ``convert`` reads the
        # pixel data, so the resulting array stays valid after the close.
        with open(path_or_array, 'rb') as image_file:
            image = np.asarray(Image.open(image_file).convert("RGB"))
    else:
        image = path_or_array

    plt.figure(figsize=size)
    plt.imshow(image)
    plt.axis('off')
    plt.show()

## Set Up PyBullet Environment

In [None]:
from owg_robot.env import *
from owg_robot.camera import Camera
from owg_robot.objects import YcbObjects

In [None]:
#p.disconnect()

# Load camera and env.
# CAM_X / CAM_Y / CAM_Z are not defined in this notebook; presumably they come
# from the `from owg_robot.env import *` star import -- TODO confirm.
center_x, center_y, center_z = CAM_X, CAM_Y, CAM_Z
# NOTE(review): the positional arguments look like (camera position, look-at
# target, near plane, far plane, image size, field of view) -- confirm against
# the Camera signature in owg_robot.camera.
camera = Camera((center_x, center_y, center_z), (center_x, center_y, 0.785), 0.2, 2.0, (448, 448), 40)
# Asset paths are relative, so this cell must run from the repository root.
env = Environment(camera, vis=True, asset_root='./owg_robot/assets', debug=False, finger_length=0.06)

# Load objects. The fixed seed is passed to YcbObjects; presumably it makes
# the shuffle below reproducible -- verify in owg_robot.objects.
objects = YcbObjects('./owg_robot/assets/ycb_objects',
                    mod_orn=['ChipsCan', 'MustardBottle', 'TomatoSoupCan'],
                    mod_stiffness=['Strawberry'],
                    seed=42
)
objects.shuffle_objects()

# Only the first n_objects names (after shuffling) are placed in the scene.
n_objects = 12

for obj_name in objects.obj_names[:n_objects]:
    # get_obj_info returns (path, mod_orn, mod_stiffness) for the named object;
    # these are forwarded, together with the name, to the environment loader.
    path, mod_orn, mod_stiffness = objects.get_obj_info(obj_name)
    env.load_isolated_obj(path, obj_name, mod_orn, mod_stiffness)
# Run a few simulation steps after loading (presumably to let objects settle).
env.dummy_simulation_steps(10)

In [None]:
import os
import sys

# Make the third-party grconvnet sources importable BEFORE importing them.
# The directory is resolved relative to the working directory (the repository
# root, like every other relative path in this notebook) instead of a
# user-specific absolute path, so the notebook runs on any checkout.
_grconvnet_dir = os.path.abspath(os.path.join('third_party', 'grconvnet'))
if _grconvnet_dir not in sys.path:
    # Guarded so that re-running the cell does not add duplicate entries.
    sys.path.append(_grconvnet_dir)

from third_party.grconvnet import *
from owg.utils.grasp import Grasp2D

# `load_grasp_generator` is not defined in this notebook; presumably it is
# provided by the star import above -- TODO confirm.
grasp_generator = load_grasp_generator(camera)

In [None]:
def setup_grasps(env, grasp_generator, visualise_grasps=False):
    """Predict grasps for every object in ``env`` and store them on the env.

    The camera image is captured once via ``env.camera.get_cam_img()``. If the
    grasp generator's ``IMG_WIDTH`` differs from the camera width, the RGB and
    depth images and each object mask are resized to
    ``(IMG_WIDTH, IMG_WIDTH)`` before prediction, and entries 0, 1, 3 and 4 of
    every predicted grasp rectangle are scaled back to the camera width
    afterwards (entry 2 is passed on unchanged as ``theta``). Each rectangle is
    then wrapped in a ``Grasp2D`` and stored with ``env.set_obj_grasps``.

    Args:
        env: Environment exposing ``camera``, ``obj_ids``, ``set_obj_grasps``,
            ``get_obj_grasps``, ``draw_predicted_grasp``, ``remove_drawing``
            and ``dummy_simulation_steps``.
        grasp_generator: Object exposing ``IMG_WIDTH`` and
            ``predict_grasp_from_mask``.
        visualise_grasps: When true, draw the stored grasps of every object in
            a random colour, keep them on screen for one second, then remove
            the drawings and run 10 dummy simulation steps.

    Note:
        The camera width is used for both ``W`` and ``H`` of ``Grasp2D``, i.e.
        the camera image is treated as square.
    """
    # Fixed: the original read the notebook-global ``camera`` here although the
    # image itself comes from ``env.camera``; use the env's camera throughout
    # so the function no longer depends on a hidden global.
    cam_width = env.camera.width

    rgb, depth, seg = env.camera.get_cam_img()
    img_size = grasp_generator.IMG_WIDTH
    needs_resize = img_size != cam_width  # loop-invariant, computed once
    if needs_resize:
        rgb = cv2.resize(rgb, (img_size, img_size))
        depth = cv2.resize(depth, (img_size, img_size))

    for obj_id in env.obj_ids:
        mask = seg == obj_id
        if needs_resize:
            mask = np.array(Image.fromarray(mask).resize((img_size, img_size), Image.LANCZOS))
        grasps, grasp_rects = grasp_generator.predict_grasp_from_mask(
            rgb,
            depth,
            mask,
            n_grasps=5,
            show_output=True,
        )
        if needs_resize:
            # Map the rectangle entries back to the original image size,
            # modifying each rectangle in place.
            for rect in grasp_rects:
                for k in (0, 1, 4, 3):
                    rect[k] = int(rect[k] / img_size * cam_width)
        grasp_rects = [
            Grasp2D.from_vector(
                x=g[1], y=g[0], w=g[4], h=g[3], theta=g[2],
                W=cam_width, H=cam_width, normalized=False, line_offset=5,
            )
            for g in grasp_rects
        ]
        env.set_obj_grasps(obj_id, grasps, grasp_rects)

    if visualise_grasps:
        line_ids = []
        for obj_id in env.obj_ids:
            grasps = env.get_obj_grasps(obj_id)
            color = np.random.rand(3).tolist()
            for grasp in grasps:
                # draw_predicted_grasp returns the updated list of line ids.
                line_ids = env.draw_predicted_grasp(grasp, color=color, lineIDs=line_ids)

        # Leave the drawings visible briefly before cleaning them up.
        time.sleep(1)
        env.remove_drawing(line_ids)
        env.dummy_simulation_steps(10)

In [None]:
# Run and visualize grasps -- check the result in your PyBullet client window
setup_grasps(env, grasp_generator, visualise_grasps=True)

In [None]:
# Snapshot the current observation; later cells read its 'image' and 'seg'
# entries.
obs = env.get_obs()

# Map every object id in the environment to its grasp rectangles (presumably
# the ones stored by setup_grasps via env.set_obj_grasps -- run that first).
all_grasp_rects = {k: env.get_obj_grasp_rects(k) for k in env.obj_ids }

In [None]:
# # Helper function for dev to reload modules
# import  importlib
# import owg.visual_prompt
# importlib.reload(owg.visual_prompt)
# from owg.visual_prompt import VisualPrompterGrounding, VisualPrompterPlanning, VisualPrompterGraspRanking

## Referring Segmentation

In [None]:
from getpass import getpass

# To enter a real key interactively instead, uncomment:
# os.environ['OPENAI_API_KEY'] = getpass()

# Fixed: the original unconditionally overwrote OPENAI_API_KEY with the
# placeholder "test", clobbering any real key already exported in the
# environment. The placeholder is now only used when no key is configured.
os.environ.setdefault('OPENAI_API_KEY', "test")
openai_api_key = os.environ['OPENAI_API_KEY']

In [None]:
from owg.visual_prompt import VisualPrompterGrounding, VisualPrompterPlanning, VisualPrompterGraspRanking

In [None]:
config_path = 'config/pyb/OWG_mod.yaml'

In [None]:
grounder = VisualPrompterGrounding(config_path, debug=True)

In [None]:
# Pull the camera image and the segmentation map out of the observation.
image, seg = obs['image'], obs['seg']
# np.unique returns the sorted distinct values in seg; [1:] drops the smallest
# one. NOTE(review): presumably that value is the background / non-object id --
# confirm, otherwise a real object is silently skipped.
obj_ids = np.unique(seg)[1:]
# One boolean mask per remaining id, stacked along a new leading axis.
all_masks = np.stack([seg == objID for objID in obj_ids])
# Masks and their ids, passed as `data` to the prompters below.
marker_data = {'masks': all_masks, 'labels': obj_ids}

In [None]:
# Show the visual prompt that the grounder builds from the image and markers.
# A copy of the image is passed so the original stays untouched.
# NOTE: `visual_promppt` is misspelled (sic); the name is kept as-is because
# it is a notebook global.
visual_promppt, _ = grounder.prepare_image_prompt(image.copy(), marker_data)
# The last element of the returned prompt is displayed (presumably the marked
# image -- verify against VisualPrompterGrounding.prepare_image_prompt).
marked_image_grounding = visual_promppt[-1]
display_image(marked_image_grounding, (6,6))

In [None]:
# VLM request: ground a free-form user instruction to object(s) in the scene.
# Alternative queries are kept commented out for quick switching.
# user_query = "I want to cut some paper"
user_query = "I want to remove some nails"
# user_query = "I want to play tennis"

# The request returns four values; target_mask, target_ids and metadata_ground
# are used by later cells (display, planning, experiment tracking).
dets, target_mask, target_ids, metadata_ground = grounder.request(text_query=user_query,image=image.copy(),data=marker_data)

In [None]:
# Take the first grounded id as the target (assumes a single correct object).
# NOTE(review): indexing [0] presumably fails if the grounder returned no
# targets -- check that target_ids is non-empty before relying on this cell.
target_id = target_ids[0] # assume single correct object
print(target_id)
# Display the mask returned by the grounder for the target.
display_image(target_mask, (6,6))

## Grasp Planning

In [None]:
planner = VisualPrompterPlanning(config_path, debug=True)

In [None]:
# Ask the planner for a plan concerning the grounded target.
# NOTE(review): text_query receives target_id (an object id), not a
# natural-language string -- confirm this is what
# VisualPrompterPlanning.request expects.
plan, metadata_plan = planner.request(text_query=target_id,
                                    image=image.copy(),
                                    data=marker_data)
# The whole plan is kept rather than only its first step;
# execute_and_track_actions accepts either a single dict or a list of dicts.
# action = plan[0]
action = plan

In [None]:
action

## Grasp Ranking

In [None]:
grasp_ranker = VisualPrompterGraspRanking(config_path, debug=True)

In [None]:
plt.imshow(seg)  # Visualize segmentation
plt.show()
print("Unique segmentation values:", np.unique(seg))


In [None]:
# Diagnostics: compare the ids extracted from the segmentation map with the
# ids the environment reports, to spot objects missing from the camera view.
print("Extracted obj_ids from seg:", obj_ids)
print("Expected obj_ids from env:", env.obj_ids)
print("Number of extracted obj_ids:", len(obj_ids))

# Raw unique values of seg (includes the value dropped when building obj_ids).
# The env ids are printed a second time here for side-by-side comparison.
print("Unique object IDs in segmentation:", np.unique(seg))
print("Expected object IDs from env:", env.obj_ids)

# Last expression of the cell: the notebook echoes the mask stack's shape.
all_masks.shape

### Iterable Ranking for Multiple Actions

In [None]:
from datetime import datetime
from owg_mod.tracker import GraspStatsTracker
# from owg_mod.mlflow_logger import MLflowLogger

tracker = GraspStatsTracker()

# mlogger = MLflowLogger(experiment_name="OWG_LLM_Tracing")
# mlogger.start_run(run_name="grasp_experiment")

In [None]:
def execute_and_track_actions(actions, env, image, all_masks, all_grasp_rects, obj_ids, grasp_ranker, tracker):
    """Rank grasps for each planned action, execute it, and record the result.

    Args:
        actions: A single action dict or an iterable of action dicts. Each
            dict is read for ``'input'`` (object id) and ``'action'``
            (``'remove'`` or ``'pick'``) and gains a ``'grasps'`` entry holding
            the ranked grasp indices.
        env: Environment used to execute the grasp and step the simulation.
        image: Scene image; a copy is handed to the ranker for every call.
        all_masks: Per-object masks, indexed in the same order as ``obj_ids``.
        all_grasp_rects: Mapping from object id to its grasp rectangles.
        obj_ids: Array of object ids aligned with ``all_masks``.
        grasp_ranker: Prompter providing ``prepare_image_prompt`` and
            ``request``.
        tracker: Receives one ``record_grasp`` call and one ``set_metadata``
            call (``module_name="ranker"``) per executed action.

    Actions with an unrecognised ``'action'`` value are reported with a
    message and skipped (after ranking, before execution and logging).
    """
    action_list = [actions] if isinstance(actions, dict) else actions

    for act in action_list:
        obj_id = act['input']
        candidate_rects = all_grasp_rects[obj_id]
        # Row of all_masks that belongs to this object id.
        mask_row = np.where(obj_ids == obj_id)[0][0]
        ranking_input = {'grasps': candidate_rects, 'mask': all_masks[mask_row]}

        # Show the visual prompt the ranker works from.
        prompt_images, _ = grasp_ranker.prepare_image_prompt(image.copy(), ranking_input)
        display_image(prompt_images[-1], (12, 6))

        # Rank the candidate grasps and remember the ordering on the action.
        _, _, ranked_indices, metadata_rank = grasp_ranker.request(image.copy(), ranking_input)
        act['grasps'] = ranked_indices

        # Pick the env routine matching the action type.
        if act['action'] == 'remove':
            run_grasp = env.put_obj_in_free_space
        elif act['action'] == 'pick':
            run_grasp = env.put_obj_in_tray
        else:
            print(f"Unknown action type: {act['action']}")
            continue
        success_grasp, _success_target, num_attempts = run_grasp(obj_id, grasp_indices=act['grasps'])

        # Advance the simulation before reading the final object position.
        for _ in range(30):
            env.step_simulation()

        final_position = env.get_obj_pos(obj_id)
        extra = {"timestamp": datetime.now().isoformat()}
        tracker.record_grasp(
            success=success_grasp,
            object_id=obj_id,
            position=final_position,
            retries=num_attempts - 1,
            grasp_index=act['grasps'],
            additional_info=extra,
        )
        # (MLflow metric logging for success rate is currently disabled.)

        tracker.set_metadata(metadata_rank, module_name="ranker")
        # (MLflow logging of ranker entropy/confidence is currently disabled.)

In [None]:
execute_and_track_actions(
    actions=action,  # single dict or list of dicts
    env=env,
    image=image,
    all_masks=all_masks,
    all_grasp_rects=all_grasp_rects,
    obj_ids=obj_ids,
    grasp_ranker=grasp_ranker,
    tracker=tracker
)

## Experiment Tracking

In [None]:
# Helper function for dev to reload modules
# import importlib
# import owg_mod.tracker
# importlib.reload(owg_mod.tracker)

# from owg_mod.tracker import GraspStatsTracker

In [None]:
# Set metadata (if your class expects a single dict, combine them first)
# tracker.set_metadata({
#     "grounding": metadata_ground,
#     "planning": metadata_plan,
# })

# Attach the grounder's request metadata to the tracker. Requires the
# grounding request cell to have been run (it defines metadata_ground).
tracker.set_metadata(metadata_ground, module_name="grounder")
# Log LLM uncertainty to MLflow (disabled)
# mlogger.log_metrics({
#     "grounder_entropy": metadata_ground.get("entropy", -1),
#     "grounder_confidence": metadata_ground.get("confidence", -1),
# })

# Same for the planner; metadata_plan comes from the planning request cell.
# The ranker's metadata is set inside execute_and_track_actions.
tracker.set_metadata(metadata_plan, module_name="planner")
# mlogger.log_metrics({
#     "planner_entropy": metadata_plan.get("entropy", -1),
#     "planner_confidence": metadata_plan.get("confidence", -1),
# })

# Set model settings and variants: one entry per prompter module, keyed by
# the same module names used for the metadata above.
tracker.set_model_settings({
    "grounder": grounder.get_model_params(),
    "ranker": grasp_ranker.get_model_params(),
    "planner": planner.get_model_params()
})

tracker.set_prompt_variants({
    "grounder": grounder.get_variants(),
    "ranker": grasp_ranker.get_variants(),
    "planner": planner.get_variants()
})

# (Optional) Tag with date, model type, or prompt variant
# mlogger.log_params({
#     "grounder_model": grounder.model_name,
#     "ranker_model": grasp_ranker.model_name,
#     "planner_model": planner.model_name,
#     "prompt_variants": tracker.get_prompt_variants()
# })

# Print report
print("Overall success rate:", tracker.get_success_rate())
print("Grasp log:", tracker.get_log())
# The summary is fetched once and reused for both printing and saving.
summary = tracker.get_summary()
print("Experiment summary:", summary)
tracker.save_uncertainty_log(summary)
# Fixed mojibake: the check mark had been stored as "âœ…" (UTF-8 bytes decoded
# as cp1252).
print("✅ Uncertainty log saved to logs/uncertainty_logs.jsonl")

In [None]:
tracker.get_success_rate_per_object()

In [None]:
# p.disconnect()

## Visualization

In [None]:
# from owg_mod.experiment_viz import ExperimentVisualizer

# visualizer = ExperimentVisualizer(tracker)
# visualizer.generate_full_report(output_dir="experiment_results")

In [None]:
# # # Helper function for dev to reload modules
# import importlib
# import owg_mod.experiment_visualizer
# importlib.reload(owg_mod.experiment_visualizer)
# from owg_mod.experiment_visualizer import ExperimentVisualizer

In [None]:
# # Save metadata + logs to MLflow
# summary_dict = tracker.get_summary()
# mlogger.log_dict(summary_dict, artifact_file="experiment_summary.json")

# # Save plots as artifacts
# import glob
# for fig_path in glob.glob("experiment_results/*.png"):
#     mlogger.log_artifact(fig_path, artifact_path="plots")

# # Close MLflow run
# mlogger.end_run()