In [2]:
import json
import os
import copy

class TaskPerformanceEvaluator:
    """Collect interactive human judgements about one agent run and save a summary.

    The evaluator tracks, per run:
      * which rubrics have been marked as satisfied (plus a snapshot of that
        state each time ``update_rubrics_satisfaction`` is called),
      * how many actions / reflections were judged and how many were correct,
      * an error-recovery score and early-termination info derived from the
        logged steps (see ``compute_error_recovery`` / ``check_early_termination``).

    All ``update_*`` methods block on ``input()`` and are meant to be driven by
    a human in an interactive session.
    """

    # Finish flags that are reported as an early termination.
    _EARLY_TERMINATION_FLAGS = (
        "max_iteration",
        "max_consecutive_failures",
        "max_repetitive_actions",
    )

    def __init__(self, rubrics):
        """Create an evaluator for a sequence of rubric descriptions.

        Args:
            rubrics: Indexable sequence of rubrics; each rubric is addressed
                by its position (0-based) in all prompts and outputs.
        """
        self.rubrics = dict(enumerate(rubrics))
        self.rubrics_satisfaction = {i: False for i in self.rubrics}
        self.rubrics_satisfaction_history = []  # satisfaction per action step
        self.action_count = 0
        self.correct_action_count = 0
        self.reflection_count = 0
        self.correct_reflection_count = 0

        self.error_recovery_score = None

        self.early_termination = None
        self.early_termination_reason = None

    def _print_rubric_status(self, header):
        """Print ``header`` followed by one status line per rubric."""
        print(header, flush=True)
        for i, rubric in self.rubrics.items():
            emoji = "✅" if self.rubrics_satisfaction[i] else "❌"
            print(f"{i}: {rubric} | Satisfied: {self.rubrics_satisfaction[i]} {emoji}", flush=True)
        print("\n")

    def _parse_rubric_indices(self, res):
        """Parse a comma-separated answer into rubric indices.

        Whitespace around each token is ignored.

        Returns:
            A list of int indices, or ``None`` if any token is not the index
            of an existing rubric (nothing should be applied in that case).
        """
        valid = {str(i): i for i in self.rubrics}
        indices = []
        for token in res.split(","):
            token = token.strip()
            if token not in valid:
                return None
            indices.append(valid[token])
        return indices

    def update_rubrics_satisfaction(self):
        """Ask which rubrics became satisfied and record a snapshot of the state.

        Shows the current status, then prompts until the answer is either
        'n' / empty (no change) or a fully valid comma-separated list of
        rubric indices. Rubrics can only be switched to satisfied, never back.
        Exactly one snapshot is appended to ``rubrics_satisfaction_history``
        per call.
        """
        self._print_rubric_status("### Rubric Status ###")
        while True:
            res = input("Any new rubrics are satisfied? If yes Enter the index or indices seperated by comma of the rubrics. Otherwise, enter 'n'.")

            # no new rubrics are satisfied
            if res.strip().lower() in ("n", ""):
                print("No change to rubric status.", flush=True)
                break

            # new rubrics are satisfied
            indices = self._parse_rubric_indices(res)
            if indices is None:
                # Reject the whole answer and ask again, so that a typo can
                # neither be silently recorded nor partially applied.
                print(f"Invalid input \"{res}\". Please enter the index or indices of the rubrics.", flush=True)
                continue
            for ind in indices:
                self.rubrics_satisfaction[ind] = True
            self._print_rubric_status("### Updated Rubric Status ###")
            break
        self.rubrics_satisfaction_history.append(copy.deepcopy(self.rubrics_satisfaction))

    @staticmethod
    def _ask_yes_no(prompt):
        """Prompt until the user answers 'y' or 'n'; return the answer as a bool.

        Raises:
            SystemExit: If the user types 'stop'.
        """
        while True:
            answer = input(prompt).strip().lower()
            if answer == 'y':
                return True
            if answer == 'n':
                return False
            if answer == 'stop':
                raise SystemExit("Terminated by user.")
            print("Invalid input. Please enter 'y' or 'n'.")

    def update_action_count(self):
        """Ask whether the last action was correct and update the action counters.

        Raises:
            SystemExit: If the user types 'stop'.
        """
        is_correct = self._ask_yes_no("Action is correct? Enter 'y' for yes, 'n' for no")
        self.action_count += 1
        if is_correct:
            self.correct_action_count += 1

    def update_reflection_count(self):
        """Ask whether the last reflection was correct and update the reflection counters.

        Raises:
            SystemExit: If the user types 'stop'.
        """
        is_correct = self._ask_yes_no("Reflection is correct? Enter 'y' for yes, 'n' for no")
        self.reflection_count += 1
        if is_correct:
            self.correct_reflection_count += 1

    def compute_error_recovery(self, steps):
        """Compute the fraction of logged errors that were followed by a non-error.

        Only steps whose 'operation' is 'action_reflection' are considered.
        Such a step counts as an error when the first two characters of its
        'outcome' contain "B" or "C". An error counts as recovered when the
        next reflection step is not an error, so in a run of consecutive
        errors only the last one can be recovered.

        ``error_recovery_score`` is left untouched when no error is found.

        Args:
            steps: Iterable of step dicts from the run log.
        """
        prev_is_error = False
        errors = 0
        recovered_errors = 0
        for step in steps:
            if step.get('operation') != 'action_reflection':
                continue
            outcome = step["outcome"][:2]
            if "B" in outcome or "C" in outcome:
                errors += 1
                prev_is_error = True
            else:
                if prev_is_error:
                    recovered_errors += 1
                prev_is_error = False
        if errors != 0:
            self.error_recovery_score = recovered_errors / errors

    def check_early_termination(self, steps):
        """Flag the run as terminated early based on the last logged step.

        If the last step is a 'finish' operation whose 'finish_flag' is one of
        the known limits, ``early_termination`` is set to True and the flag is
        stored as ``early_termination_reason``. Otherwise both stay unchanged.

        Args:
            steps: Sequence of step dicts from the run log; may be empty.
        """
        if not steps:
            return
        last_step = steps[-1]
        if last_step.get('operation') != 'finish':
            return
        # .get(): a finish step without a flag is treated as a normal finish.
        finish_flag = last_step.get('finish_flag')
        if finish_flag in self._EARLY_TERMINATION_FLAGS:
            self.early_termination = True
            self.early_termination_reason = finish_flag

    def save_evaluation(self, output_path, steps=None):
        """Write the collected evaluation to ``output_path`` as JSON.

        When ``steps`` is given and non-empty, the error-recovery score and
        early-termination info are derived from it first. Nothing is written
        (and a message is printed) if no action has been judged yet.

        Ratios whose denominator is zero ('reflection_accuracy' when no
        reflection was judged, 'satisfactory_score' when there are no
        rubrics) are stored as ``None`` instead of raising ZeroDivisionError.

        Args:
            output_path: Destination file path; overwritten if it exists.
            steps: Optional sequence of step dicts from the run log.
        """
        if steps:
            self.compute_error_recovery(steps)
            self.check_early_termination(steps)
        if self.action_count == 0:
            print("No action decision made. Evaluation not saved.")
            return

        reflection_accuracy = None
        if self.reflection_count != 0:
            reflection_accuracy = self.correct_reflection_count / self.reflection_count
        satisfactory_score = None
        if self.rubrics_satisfaction:
            satisfactory_score = sum(self.rubrics_satisfaction.values()) / len(self.rubrics_satisfaction)

        output = {
            "rubrics": self.rubrics,
            "rubrics_satisfaction": self.rubrics_satisfaction,
            "rubrics_satisfaction_history": self.rubrics_satisfaction_history,
            "action_count": self.action_count,
            "correct_action_count": self.correct_action_count,
            "reflection_count": self.reflection_count,
            "correct_reflection_count": self.correct_reflection_count,
            "action_accuracy": self.correct_action_count / self.action_count,
            "reflection_accuracy": reflection_accuracy,
            "perfectly_done": all(self.rubrics_satisfaction.values()),
            "satisfactory_score": satisfactory_score,
            "error_recovery_score": self.error_recovery_score,
            "early_termination": self.early_termination,
            "early_termination_reason": self.early_termination_reason,
        }
        with open(output_path, "w") as f:
            json.dump(output, f, indent=4)


In [3]:
from PIL import Image
import matplotlib.pyplot as plt
from time import sleep
from IPython.display import clear_output

def show_pre_cur_screenshot(cur, prev=None):
    """Display the current screenshot, next to the previous one when available.

    Args:
        cur: Image to show as the current screenshot.
        prev: Optional image to show to the left of ``cur``. When omitted,
            only the current screenshot is drawn in a single figure.
    """
    if prev is None:
        # Single-panel layout: only the current screenshot.
        plt.figure(figsize=(5, 5))
        plt.imshow(cur)
        plt.title("Current Screenshot")
        plt.axis('off')
        plt.show()
        return

    # Two-panel layout: previous on the left, current on the right.
    _, axes = plt.subplots(1, 2, figsize=(10, 5))
    panels = (
        (prev, "Previous Screenshot"),
        (cur, "Current Screenshot"),
    )
    for panel_index, (image, caption) in enumerate(panels):
        panel = axes[panel_index]
        panel.imshow(image)
        panel.set_title(caption)
        panel.axis('off')
    plt.show()

def _reset_notebook_display():
    """Pause briefly, then clear the cell output and close all figures."""
    sleep(1)
    clear_output(wait=True)
    plt.close('all')
    plt.pause(0.1)


def eval_loop_on_task(log_dir, all_rubrics):
    """Replay one logged agent run and interactively collect its evaluation.

    Reads ``steps.json`` from ``log_dir`` and walks through the steps:
      * 'perception'        -> load the screenshot named by the step from
                               ``<log_dir>/screenshots`` and, from the second
                               one on, show it next to the previous one;
      * 'notetaking'        -> print the step's 'important_notes';
      * 'action'            -> print 'action_object' and 'action_thought';
      * 'action_reflection' -> print 'outcome', then ask the user to judge the
                               action, the reflection and the rubric status;
      * 'finish' or the last step -> ask for a final rubric check.
    The result is written to ``<log_dir>/evaluation.json``. Runs that already
    have that file are skipped.

    Args:
        log_dir: Directory of a single run (contains ``steps.json`` and a
            ``screenshots`` sub-directory).
        all_rubrics: Mapping from task id to a dict with a 'rubrics' entry.
    """
    output_eval_json = os.path.join(log_dir, "evaluation.json")
    if os.path.exists(output_eval_json):
        print("INFO: evaluation already exists in", output_eval_json)
        return

    # Context manager so the log file handle is closed right after parsing.
    with open(os.path.join(log_dir, "steps.json")) as f:
        steps = json.load(f)
    if not steps:
        # steps[0] below carries the task id and instruction; nothing to do without it.
        print("INFO: no steps found in", log_dir, flush=True)
        return

    task_id = steps[0]['task_id']
    rubrics = all_rubrics[task_id]['rubrics']
    screenshot_dir = os.path.join(log_dir, "screenshots")
    evaluator = TaskPerformanceEvaluator(rubrics=rubrics)
    print("Rubrics:", evaluator.rubrics)
    task = steps[0]['instruction']

    ### start eval loop
    prev_screen_shot = None
    current_screen_shot = None
    last_index = len(steps) - 1
    print("==========================================\n")
    for si, step in enumerate(steps):
        # .get(): steps without an 'operation' key are skipped instead of raising.
        operation = step.get('operation')

        if operation == "perception":
            screenshot_basename = os.path.basename(step['screenshot'])
            new_screen_shot = Image.open(os.path.join(screenshot_dir, screenshot_basename))
            if current_screen_shot is None:
                # The very first screenshot is only stored, not displayed.
                current_screen_shot = new_screen_shot
            else:
                prev_screen_shot = current_screen_shot
                current_screen_shot = new_screen_shot
                show_pre_cur_screenshot(current_screen_shot, prev=prev_screen_shot)

        elif operation == "notetaking":
            current_note = step['important_notes']
            print(f"%%% Current Note %%%: {current_note}", flush=True)
            print("-----", flush=True)

        elif operation == "action":
            current_action = step['action_object']
            current_action_thought = step['action_thought']
            print(f"%%% Current Action %%%: {current_action}", flush=True)
            print(f"%%% Current Action Thought %%%: {current_action_thought}", flush=True)
            print("-----", flush=True)

        elif operation == "action_reflection":
            print("Task:", task, flush=True)
            print("\n")
            current_reflection = step['outcome']
            print(f"%%% Current Action Reflection %%%: {current_reflection}", flush=True)
            print("\n")
            print("*** Eval action...")
            evaluator.update_action_count()
            print("*** Eval action reflection...", flush=True)
            evaluator.update_reflection_count()
            print("*** Update rubric satisfaction...", flush=True)
            evaluator.update_rubrics_satisfaction()
            print("==========================================\n")
            _reset_notebook_display()

        # Deliberately a separate `if`: the last step gets a final rubric
        # check in addition to its own handling above.
        if operation == "finish" or si == last_index:
            print(f"%%% final check on rubrics...", flush=True)
            if current_screen_shot is not None:
                # A run without any perception step has nothing to display.
                show_pre_cur_screenshot(current_screen_shot, prev=prev_screen_shot)
            evaluator.update_rubrics_satisfaction()
            print("###########################################\n")
            _reset_notebook_display()

    evaluator.save_evaluation(output_eval_json, steps=steps)
    # save_evaluation declines to write when no action was judged, so only
    # report success if the file is really there.
    if os.path.exists(output_eval_json):
        print("INFO: evaluation saved to", output_eval_json, flush=True)
    else:
        print("INFO: evaluation not saved for", log_dir, flush=True)



In [19]:
### all rubrics batch v1
# Load the rubrics for every task of batch v1; the context manager closes the
# file as soon as it has been parsed.
with open("/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/data/batch_v1/rubrics/batch_v1_rubrics.json") as f:
    all_rubrics = json.load(f)


from glob import glob
### Mobile Agent v2 Eval ###
# Exactly one scenario_dir should be active at a time; the others are kept for
# switching between scenarios.
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/mobile_agent_v2/scenario_1_batch_v1.json-individual" # done
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/mobile_agent_v2/scenario_2_batch_v1.json-individual" # done
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/mobile_agent_v2/scenario_3_batch_v1.json-individual" # done
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/mobile_agent_v2/scenario_4_batch_v1.json-individual" # done
scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/mobile_agent_v2/scenario_5_batch_v1.json-individual" # done
# Every sub-directory of the scenario is one run; stray files are skipped
# because eval_loop_on_task expects a directory containing steps.json.
log_dirs = sorted(p for p in glob(os.path.join(scenario_dir, "*")) if os.path.isdir(p))
for log_dir in log_dirs:
    print("Evaluating:", log_dir)
    eval_loop_on_task(log_dir, all_rubrics)


INFO: evaluation saved to /Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/mobile_agent_v2/scenario_5_batch_v1.json-individual/5_things_to_do_la/evaluation.json


In [37]:
### all rubrics batch v1
# Load the rubrics for every task of batch v1; the context manager closes the
# file as soon as it has been parsed.
with open("/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/data/batch_v1/rubrics/batch_v1_rubrics.json") as f:
    all_rubrics = json.load(f)

from glob import glob
### Agent E Eval ###

# Exactly one scenario_dir must be active. With all of them commented out this
# cell would silently reuse whatever scenario_dir an earlier cell left behind
# (or fail with NameError on a fresh kernel).
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/agent_E/scenario_1_batch_v1.json-individual"
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/agent_E/scenario_2_batch_v1.json-individual"
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/agent_E/scenario_3_batch_v1.json-individual"
# scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/agent_E/scenario_4_batch_v1.json-individual"
scenario_dir = "/Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/agent_E/scenario_5_batch_v1.json-individual"

# Every sub-directory of the scenario is one run; stray files are skipped
# because eval_loop_on_task expects a directory containing steps.json.
log_dirs = sorted(p for p in glob(os.path.join(scenario_dir, "*")) if os.path.isdir(p))
for log_dir in log_dirs:
    print("Evaluating:", log_dir)
    eval_loop_on_task(log_dir, all_rubrics)

INFO: evaluation saved to /Users/wangz3/Desktop/vlm_agent_project/MobileAgent/Mobile-Agent-v2/logs/agent_E/scenario_5_batch_v1.json-individual/5_things_to_do_la/evaluation.json
