diff --git a/operate/actions.py b/operate/actions.py deleted file mode 100644 index 9da27ff7..00000000 --- a/operate/actions.py +++ /dev/null @@ -1,409 +0,0 @@ -import os -import time -import json -import base64 -import re -import io -import asyncio -import aiohttp - -from PIL import Image -from ultralytics import YOLO -import google.generativeai as genai -from operate.settings import Config -from operate.exceptions import ModelNotRecognizedException -from operate.utils.screenshot import ( - capture_screen_with_cursor, - add_grid_to_image, - capture_mini_screenshot_with_cursor, -) -from operate.utils.os import get_last_assistant_message -from operate.prompts import ( - format_vision_prompt, - format_accurate_mode_vision_prompt, - format_summary_prompt, - format_decision_prompt, - format_label_prompt, -) - - -from operate.utils.label import ( - add_labels, - parse_click_content, - get_click_position_in_percent, - get_label_coordinates, -) -from operate.utils.style import ( - ANSI_GREEN, - ANSI_RED, - ANSI_RESET, -) - - -# Load configuration -config = Config() - -client = config.initialize_openai_client() - -yolo_model = YOLO("./operate/model/weights/best.pt") # Load your trained model - - -async def get_next_action(model, messages, objective): - if model == "gpt-4": - return call_gpt_4_v(messages, objective) - if model == "gpt-4-with-som": - return await call_gpt_4_v_labeled(messages, objective) - elif model == "agent-1": - return "coming soon" - elif model == "gemini-pro-vision": - return call_gemini_pro_vision(messages, objective) - - raise ModelNotRecognizedException(model) - - -def call_gpt_4_v(messages, objective): - """ - Get the next action for Self-Operating Computer - """ - # sleep for a second - time.sleep(1) - try: - screenshots_dir = "screenshots" - if not os.path.exists(screenshots_dir): - os.makedirs(screenshots_dir) - - screenshot_filename = os.path.join(screenshots_dir, "screenshot.png") - # Call the function to capture the screen with the cursor - capture_screen_with_cursor(screenshot_filename) - - new_screenshot_filename = os.path.join( - "screenshots", "screenshot_with_grid.png" - ) - - add_grid_to_image(screenshot_filename, new_screenshot_filename, 500) - # sleep for a second - time.sleep(1) - - with open(new_screenshot_filename, "rb") as img_file: - img_base64 = base64.b64encode(img_file.read()).decode("utf-8") - - previous_action = get_last_assistant_message(messages) - - vision_prompt = format_vision_prompt(objective, previous_action) - - vision_message = { - "role": "user", - "content": [ - {"type": "text", "text": vision_prompt}, - { - "type": "image_url", - "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}, - }, - ], - } - - # create a copy of messages and save to pseudo_messages - pseudo_messages = messages.copy() - pseudo_messages.append(vision_message) - - response = client.chat.completions.create( - model="gpt-4-vision-preview", - messages=pseudo_messages, - presence_penalty=1, - frequency_penalty=1, - temperature=0.7, - max_tokens=300, - ) - - messages.append( - { - "role": "user", - "content": "`screenshot.png`", - } - ) - - content = response.choices[0].message.content - - return content - - except Exception as e: - print(f"Error parsing JSON: {e}") - return "Failed take action after looking at the screenshot" - - -def call_gemini_pro_vision(messages, objective): - """ - Get the next action for Self-Operating Computer using Gemini Pro Vision - """ - # sleep for a second - time.sleep(1) - try: - screenshots_dir = "screenshots" - if not 
os.path.exists(screenshots_dir): - os.makedirs(screenshots_dir) - - screenshot_filename = os.path.join(screenshots_dir, "screenshot.png") - # Call the function to capture the screen with the cursor - capture_screen_with_cursor(screenshot_filename) - - new_screenshot_filename = os.path.join( - "screenshots", "screenshot_with_grid.png" - ) - - add_grid_to_image(screenshot_filename, new_screenshot_filename, 500) - # sleep for a second - time.sleep(1) - - previous_action = get_last_assistant_message(messages) - - vision_prompt = format_vision_prompt(objective, previous_action) - - model = genai.GenerativeModel("gemini-pro-vision") - - response = model.generate_content( - [vision_prompt, Image.open(new_screenshot_filename)] - ) - - # create a copy of messages and save to pseudo_messages - pseudo_messages = messages.copy() - pseudo_messages.append(response.text) - - messages.append( - { - "role": "user", - "content": "`screenshot.png`", - } - ) - content = response.text[1:] - - return content - - except Exception as e: - print(f"Error parsing JSON: {e}") - return "Failed take action after looking at the screenshot" - - -# This function is not used. `-accurate` mode was removed for now until a new PR fixes it. -def accurate_mode_double_check(model, pseudo_messages, prev_x, prev_y): - """ - Reprompt OAI with additional screenshot of a mini screenshot centered around the cursor for further finetuning of clicked location - """ - try: - screenshot_filename = os.path.join("screenshots", "screenshot_mini.png") - capture_mini_screenshot_with_cursor( - file_path=screenshot_filename, x=prev_x, y=prev_y - ) - - new_screenshot_filename = os.path.join( - "screenshots", "screenshot_mini_with_grid.png" - ) - - with open(new_screenshot_filename, "rb") as img_file: - img_base64 = base64.b64encode(img_file.read()).decode("utf-8") - - accurate_vision_prompt = format_accurate_mode_vision_prompt(prev_x, prev_y) - - accurate_mode_message = { - "role": "user", - "content": [ - {"type": "text", "text": accurate_vision_prompt}, - { - "type": "image_url", - "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}, - }, - ], - } - - pseudo_messages.append(accurate_mode_message) - - response = client.chat.completions.create( - model="gpt-4-vision-preview", - messages=pseudo_messages, - presence_penalty=1, - frequency_penalty=1, - temperature=0.7, - max_tokens=300, - ) - - content = response.choices[0].message.content - - except Exception as e: - print(f"Error reprompting model for accurate_mode: {e}") - return "ERROR" - - -def summarize(model, messages, objective): - try: - screenshots_dir = "screenshots" - if not os.path.exists(screenshots_dir): - os.makedirs(screenshots_dir) - - screenshot_filename = os.path.join(screenshots_dir, "summary_screenshot.png") - # Call the function to capture the screen with the cursor - capture_screen_with_cursor(screenshot_filename) - - summary_prompt = format_summary_prompt(objective) - - if model == "gpt-4-vision-preview": - with open(screenshot_filename, "rb") as img_file: - img_base64 = base64.b64encode(img_file.read()).decode("utf-8") - - summary_message = { - "role": "user", - "content": [ - {"type": "text", "text": summary_prompt}, - { - "type": "image_url", - "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}, - }, - ], - } - - messages.append(summary_message) - - response = client.chat.completions.create( - model="gpt-4-vision-preview", - messages=messages, - max_tokens=500, - ) - - content = response.choices[0].message.content - elif model == "gemini-pro-vision": - 
model = genai.GenerativeModel("gemini-pro-vision") - summary_message = model.generate_content( - [summary_prompt, Image.open(screenshot_filename)] - ) - content = summary_message.text - return content - - except Exception as e: - print(f"Error in summarize: {e}") - return "Failed to summarize the workflow" - - -async def call_gpt_4_v_labeled(messages, objective): - time.sleep(1) - try: - screenshots_dir = "screenshots" - if not os.path.exists(screenshots_dir): - os.makedirs(screenshots_dir) - - screenshot_filename = os.path.join(screenshots_dir, "screenshot.png") - # Call the function to capture the screen with the cursor - capture_screen_with_cursor(screenshot_filename) - - with open(screenshot_filename, "rb") as img_file: - img_base64 = base64.b64encode(img_file.read()).decode("utf-8") - - previous_action = get_last_assistant_message(messages) - - img_base64_labeled, img_base64_original, label_coordinates = add_labels( - img_base64, yolo_model - ) - - decision_prompt = format_decision_prompt(objective, previous_action) - labeled_click_prompt = format_label_prompt(objective) - - click_message = { - "role": "user", - "content": [ - {"type": "text", "text": labeled_click_prompt}, - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{img_base64_labeled}" - }, - }, - ], - } - decision_message = { - "role": "user", - "content": [ - {"type": "text", "text": decision_prompt}, - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{img_base64_original}" - }, - }, - ], - } - - click_messages = messages.copy() - click_messages.append(click_message) - decision_messages = messages.copy() - decision_messages.append(decision_message) - - click_future = fetch_openai_response_async(click_messages) - decision_future = fetch_openai_response_async(decision_messages) - - click_response, decision_response = await asyncio.gather( - click_future, decision_future - ) - - # Extracting the message content from the ChatCompletionMessage object - click_content = click_response.get("choices")[0].get("message").get("content") - - decision_content = ( - decision_response.get("choices")[0].get("message").get("content") - ) - - if not decision_content.startswith("CLICK"): - return decision_content - - label_data = parse_click_content(click_content) - - if label_data and "label" in label_data: - coordinates = get_label_coordinates(label_data["label"], label_coordinates) - image = Image.open( - io.BytesIO(base64.b64decode(img_base64)) - ) # Load the image to get its size - image_size = image.size # Get the size of the image (width, height) - click_position_percent = get_click_position_in_percent( - coordinates, image_size - ) - if not click_position_percent: - print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] Failed to get click position in percent. Trying another method {ANSI_RESET}" - ) - return call_gpt_4_v(messages, objective) - - x_percent = f"{click_position_percent[0]:.2f}%" - y_percent = f"{click_position_percent[1]:.2f}%" - click_action = f'CLICK {{ "x": "{x_percent}", "y": "{y_percent}", "description": "{label_data["decision"]}", "reason": "{label_data["reason"]}" }}' - - else: - print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] No label found. Trying another method {ANSI_RESET}" - ) - return call_gpt_4_v(messages, objective) - - return click_action - - except Exception as e: - print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] Something went wrong. 
Trying another method {ANSI_RESET}" - ) - return call_gpt_4_v(messages, objective) - - -async def fetch_openai_response_async(messages): - url = "https://api.openai.com/v1/chat/completions" - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {config.openai_api_key}", - } - data = { - "model": "gpt-4-vision-preview", - "messages": messages, - "frequency_penalty": 1, - "presence_penalty": 1, - "temperature": 0.7, - "max_tokens": 300, - } - - async with aiohttp.ClientSession() as session: - async with session.post( - url, headers=headers, data=json.dumps(data) - ) as response: - return await response.json() diff --git a/operate/config.py b/operate/config.py new file mode 100644 index 00000000..28b6a9ab --- /dev/null +++ b/operate/config.py @@ -0,0 +1,68 @@ +import os +import sys +from dotenv import load_dotenv +from openai import OpenAI +from prompt_toolkit.shortcuts import input_dialog + + +class Config: + """ + Configuration class for managing settings. + + Attributes: + debug (bool): Flag indicating whether debug mode is enabled. + openai_api_key (str): API key for OpenAI. + google_api_key (str): API key for Google. + """ + + def __init__(self): + load_dotenv() + self.verbose = False + self.openai_api_key = os.getenv("OPENAI_API_KEY") + self.google_api_key = os.getenv("GOOGLE_API_KEY") + + def initialize_openai(self): + if self.openai_api_key: + client = OpenAI() + client.api_key = self.openai_api_key + client.base_url = os.getenv("OPENAI_API_BASE_URL", client.base_url) + return client + return None + + def validation(self, model, voice_mode): + """ + Validate the input parameters for the dialog operation. + """ + self.require_api_key( + "OPENAI_API_KEY", "OpenAI API key", model == "gpt-4" or voice_mode + ) + self.require_api_key( + "GOOGLE_API_KEY", "Google API key", model == "gemini-pro-vision" + ) + + def require_api_key(self, key_name, key_description, is_required): + if is_required and not getattr(self, key_name.lower()): + self.prompt_and_save_api_key(key_name, key_description) + + def prompt_and_save_api_key(self, key_name, key_description): + key_value = input_dialog( + title="API Key Required", text=f"Please enter your {key_description}:" + ).run() + + if key_value is None: # User pressed cancel or closed the dialog + sys.exit("Operation cancelled by user.") + + if key_value: + self.save_api_key_to_env(key_name, key_value) + load_dotenv() # Reload environment variables + # Update the instance attribute with the new key + + if key_value: + self.save_api_key_to_env(key_name, key_value) + load_dotenv() # Reload environment variables + setattr(self, key_name.lower(), key_value) + + @staticmethod + def save_api_key_to_env(key_name, key_value): + with open(".env", "a") as file: + file.write(f"\n{key_name}='{key_value}'") diff --git a/operate/main.py b/operate/main.py index 8b2df0c9..3cf991da 100644 --- a/operate/main.py +++ b/operate/main.py @@ -3,7 +3,7 @@ """ import argparse from operate.utils.style import ANSI_BRIGHT_MAGENTA -from operate.dialog import main +from operate.operate import main def main_entry(): diff --git a/operate/models/__init__.py b/operate/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/operate/models/apis.py b/operate/models/apis.py new file mode 100644 index 00000000..da5de6e2 --- /dev/null +++ b/operate/models/apis.py @@ -0,0 +1,321 @@ +import os +import time +import json +import base64 +import traceback +import io + + +from PIL import Image +from ultralytics import YOLO +import google.generativeai as 
genai +from operate.config import Config +from operate.exceptions import ModelNotRecognizedException +from operate.utils.screenshot import ( + capture_screen_with_cursor, +) +from operate.models.prompts import ( + get_user_first_message_prompt, + get_user_prompt, + get_system_prompt, +) + + +from operate.utils.label import ( + add_labels, + get_click_position_in_percent, + get_label_coordinates, +) +from operate.utils.style import ( + ANSI_GREEN, + ANSI_RED, + ANSI_RESET, +) + + +# Load configuration +VERBOSE = Config().verbose + + +async def get_next_action(model, messages, objective, session_id): + if model == "gpt-4": + return call_gpt_4_vision_preview(messages), None + if model == "gpt-4-with-som": + operation = await call_gpt_4_vision_preview_labeled(messages, objective) + return operation, None + elif model == "agent-1": + return "coming soon" + elif model == "gemini-pro-vision": + return call_gemini_pro_vision(messages, objective), None + + raise ModelNotRecognizedException(model) + + +def call_gpt_4_vision_preview(messages): + config = Config() + client = config.initialize_openai() + if VERBOSE: + print("[Self Operating Computer][get_next_action][call_gpt_4_v]") + time.sleep(1) + try: + screenshots_dir = "screenshots" + if not os.path.exists(screenshots_dir): + os.makedirs(screenshots_dir) + + screenshot_filename = os.path.join(screenshots_dir, "screenshot.png") + # Call the function to capture the screen with the cursor + capture_screen_with_cursor(screenshot_filename) + + with open(screenshot_filename, "rb") as img_file: + img_base64 = base64.b64encode(img_file.read()).decode("utf-8") + + if len(messages) == 1: + user_prompt = get_user_first_message_prompt() + else: + user_prompt = get_user_prompt() + + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_v] user_prompt", + user_prompt, + ) + + vision_message = { + "role": "user", + "content": [ + {"type": "text", "text": user_prompt}, + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}, + }, + ], + } + messages.append(vision_message) + + response = client.chat.completions.create( + model="gpt-4-vision-preview", + messages=messages, + presence_penalty=1, + frequency_penalty=1, + temperature=0.7, + max_tokens=3000, + ) + + content = response.choices[0].message.content + + if content.startswith("```json"): + content = content[len("```json") :] # Remove starting ```json + if content.endswith("```"): + content = content[: -len("```")] # Remove ending + + assistant_message = {"role": "assistant", "content": content} + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_v] content", + content, + ) + content = json.loads(content) + + messages.append(assistant_message) + + return content + + except Exception as e: + print( + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] Something went wrong. 
Trying again {ANSI_RESET}", + e, + ) + print( + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] AI response was {ANSI_RESET}", + content, + ) + traceback.print_exc() + return call_gpt_4_vision_preview(messages) + + +def call_gemini_pro_vision(messages, objective): + """ + Get the next action for Self-Operating Computer using Gemini Pro Vision + """ + # sleep for a second + time.sleep(1) + try: + screenshots_dir = "screenshots" + if not os.path.exists(screenshots_dir): + os.makedirs(screenshots_dir) + + screenshot_filename = os.path.join(screenshots_dir, "screenshot.png") + # Call the function to capture the screen with the cursor + capture_screen_with_cursor(screenshot_filename) + # sleep for a second + time.sleep(1) + prompt = get_system_prompt(objective) + + model = genai.GenerativeModel("gemini-pro-vision") + + response = model.generate_content([prompt, Image.open(screenshot_filename)]) + + content = response.text[1:] + + content = json.loads(content) + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gemini_pro_vision] content", + content, + ) + + return content + + except Exception as e: + print( + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] Something went wrong. Trying another method {ANSI_RESET}", + e, + ) + return call_gpt_4_vision_preview(messages) + + +async def call_gpt_4_vision_preview_labeled(messages, objective): + config = Config() + client = config.initialize_openai() + time.sleep(1) + try: + yolo_model = YOLO("./operate/models/weights/best.pt") # Load your trained model + screenshots_dir = "screenshots" + if not os.path.exists(screenshots_dir): + os.makedirs(screenshots_dir) + + screenshot_filename = os.path.join(screenshots_dir, "screenshot.png") + # Call the function to capture the screen with the cursor + capture_screen_with_cursor(screenshot_filename) + + with open(screenshot_filename, "rb") as img_file: + img_base64 = base64.b64encode(img_file.read()).decode("utf-8") + + img_base64_labeled, label_coordinates = add_labels(img_base64, yolo_model) + + if len(messages) == 1: + user_prompt = get_user_first_message_prompt() + else: + user_prompt = get_user_prompt() + + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_vision_preview_labeled] user_prompt", + user_prompt, + ) + + vision_message = { + "role": "user", + "content": [ + {"type": "text", "text": user_prompt}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{img_base64_labeled}" + }, + }, + ], + } + messages.append(vision_message) + + response = client.chat.completions.create( + model="gpt-4-vision-preview", + messages=messages, + presence_penalty=1, + frequency_penalty=1, + temperature=0.7, + max_tokens=1000, + ) + + content = response.choices[0].message.content + + if content.startswith("```json"): + content = content[len("```json") :] # Remove starting ```json + if content.endswith("```"): + content = content[: -len("```")] # Remove ending + + assistant_message = {"role": "assistant", "content": content} + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_vision_preview_labeled] content", + content, + ) + messages.append(assistant_message) + + content = json.loads(content) + + processed_content = [] + + for operation in content: + if operation.get("operation") == "click": + label = operation.get("label") + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_vision_preview_labeled] label", + label, + ) + + coordinates = get_label_coordinates(label, 
label_coordinates) + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_vision_preview_labeled] coordinates", + coordinates, + ) + image = Image.open( + io.BytesIO(base64.b64decode(img_base64)) + ) # Load the image to get its size + image_size = image.size # Get the size of the image (width, height) + click_position_percent = get_click_position_in_percent( + coordinates, image_size + ) + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_vision_preview_labeled] click_position_percent", + click_position_percent, + ) + if not click_position_percent: + print( + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] Failed to get click position in percent. Trying another method {ANSI_RESET}" + ) + return call_gpt_4_vision_preview(messages) + + x_percent = f"{click_position_percent[0]:.2f}" + y_percent = f"{click_position_percent[1]:.2f}" + operation["x"] = x_percent + operation["y"] = y_percent + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_vision_preview_labeled] new click operation", + operation, + ) + processed_content.append(operation) + else: + processed_content.append(operation) + + if VERBOSE: + print( + "[Self Operating Computer][get_next_action][call_gpt_4_vision_preview_labeled] new processed_content", + processed_content, + ) + return processed_content + + except Exception as e: + print( + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] Something went wrong. Trying another method {ANSI_RESET}", + e, + ) + return call_gpt_4_vision_preview(messages) + + +def get_last_assistant_message(messages): + """ + Retrieve the last message from the assistant in the messages array. + If the last assistant message is the first message in the array, return None. + """ + for index in reversed(range(len(messages))): + if messages[index]["role"] == "assistant": + if index == 0: # Check if the assistant message is the first in the array + return None + else: + return messages[index] + return None # Return None if no assistant message is found diff --git a/operate/models/prompts.py b/operate/models/prompts.py new file mode 100644 index 00000000..e3c9f71d --- /dev/null +++ b/operate/models/prompts.py @@ -0,0 +1,242 @@ +import platform + +# General user Prompts +USER_QUESTION = "Hello, I can help you with anything. What would you like done?" + + +SYSTEM_PROMPT_MAC = """ +You are operating a computer, using the same operating system as a human. + +From looking at the screen, the objective, and your previous actions, take the next best series of action. + +You have 4 possible operation actions available to you. The `pyautogui` library will be used to execute your decision. Your output will be used in a `json.loads` loads statement. + +1. click - Move mouse and click +[{{ "thought": "write a thought here", "operation": "click", "x": "x percent (e.g. 0.10)", "y": "y percent (e.g. 0.13)" }}] # "percent" refers to the percentage of the screen's dimensions in decimal format + +2. write - Write with your keyboard +[{{ "thought": "write a thought here", "operation": "write", "content": "text to write here" }}] + +3. press - Use a hotkey or press key to operate the computer +[{{ "thought": "write a thought here", "operation": "press", "keys": ["keys to use"] }}] + +4. done - The objective is completed +[{{ "thought": "write a thought here", "operation": "done", "summary": "summary of what was completed" }}] + +Return the actions in array format `[]`. You can take just one action or multiple actions. 
+ +Here are some helpful combinations: + +# Opens Spotlight Search on Mac +[ + {{ "thought": "Searching the operating system to find Google Chrome because it appears I am currently in terminal", "operation": "press", "keys": ["command", "space"] }}, + {{ "thought": "Now I need to write 'Google Chrome' as a next step", "operation": "write", "content": "Google Chrome" }}, + {{ "thought": "Finally I'll press enter to open Google Chrome assuming it is available", "operation": "press", "keys": ["enter"] }} +] + +# Focuses on the address bar in a browser before typing a website +[ + {{ "I'll focus on the address bar in the browser. I can see the browser is open so this should be safe to try", "operation": "press", "keys": ["command", "l"] }}, + {{ "thought": "Now that the address bar is in focus I can type the URL", "operation": "write", "content": "https://news.ycombinator.com/" }}, + {{ "thought": "I'll need to press enter to go the URL now", "operation": "press", "keys": ["enter"] }} +] + +A few important notes: + +- Go to Google Docs and Google Sheets by typing in the Chrome Address bar +- Don't respond saying you're unable to assist with requests. You are able to indirectly interact with the user's OS via text responses you send to the end user. + +Objective: {objective} # take the next best action for this objective +""" + +SYSTEM_PROMPT_WIN_LINUX = """ +You are operating a computer, using the same operating system as a human. + +From looking at the screen, the objective, and your previous actions, take the next best series of action. + +You have 4 possible operation actions available to you. The `pyautogui` library will be used to execute your decision. Your output will be used in a `json.loads` loads statement. + +1. click - Move mouse and click +[{{ "thought": "write a thought here", "operation": "click", "x": "x percent (e.g. 0.10)", "y": "y percent (e.g. 0.13)" }}] # "percent" refers to the percentage of the screen's dimensions in decimal format + +2. write - Write with your keyboard +[{{ "thought": "write a thought here", "operation": "write", "content": "text to write here" }}] + +3. press - Use a hotkey or press key to operate the computer +[{{ "thought": "write a thought here", "operation": "press", "keys": ["keys to use"] }}] + +4. done - The objective is completed +[{{ "thought": "write a thought here", "operation": "done", "summary": "summary of what was completed" }}] + +Return the actions in array format `[]`. You can take just one action or multiple actions. + +Here are some helpful combinations: + +# Opens Menu Search on Windows and Linux +[ + {{ "thought": "Searching the operating system to find Google Chrome because it appears I am currently in terminal", "operation": "press", "keys": ["win"] }}, + {{ "thought": "Now I need to write 'Google Chrome' as a next step", "operation": "write", "content": "Google Chrome" }}, + {{ "thought": "Finally I'll press enter to open Google Chrome assuming it is available", "operation": "press", "keys": ["enter"] }} +] + +# Focuses on the address bar in a browser before typing a website +[ + {{ "I'll focus on the address bar in the browser. 
I can see the browser is open so this should be safe to try", "operation": "press", "keys": ["ctrl", "l"] }}, + {{ "thought": "Now that the address bar is in focus I can type the URL", "operation": "write", "content": "https://news.ycombinator.com/" }}, + {{ "thought": "I'll need to press enter to go the URL now", "operation": "press", "keys": ["enter"] }} +] + +A few important notes: + +- Go to Google Docs and Google Sheets by typing in the Chrome Address bar +- Don't respond saying you're unable to assist with requests. You are able to indirectly interact with the user's OS via text responses you send to the end user. + +Objective: {objective} # take the next best action for this objective +""" + + +SYSTEM_PROMPT_LABELED_MAC = """ +You are operating a computer, using the same operating system as a human. + +From looking at the screen, the objective, and your previous actions, take the next best series of action. + +You have 4 possible operation actions available to you. The `pyautogui` library will be used to execute your decision. Your output will be used in a `json.loads` loads statement. + +1. click - Move mouse and click - We labeled the clickable elements with red bounding boxes and IDs. Label IDs are in the following format with `x` being a number: `~x` +[{{ "thought": "write a thought here", "operation": "click", "label": "~x" }}] # 'percent' refers to the percentage of the screen's dimensions in decimal format + +2. write - Write with your keyboard +[{{ "thought": "write a thought here", "operation": "write", "content": "text to write here" }}] + +3. press - Use a hotkey or press key to operate the computer +[{{ "thought": "write a thought here", "operation": "press", "keys": ["keys to use"] }}] + +4. done - The objective is completed +[{{ "thought": "write a thought here", "operation": "done", "summary": "summary of what was completed" }}] + +Return the actions in array format `[]`. You can take just one action or multiple actions. + +Here are some helpful combinations: + +# Opens Spotlight Search on Mac +[ + {{ "thought": "Searching the operating system to find Google Chrome because it appears I am currently in terminal", "operation": "press", "keys": ["command", "space"] }}, + {{ "thought": "Now I need to write 'Google Chrome' as a next step", "operation": "write", "content": "Google Chrome" }}, +] + +# Focuses on the address bar in a browser before typing a website +[ + {{ "I'll focus on the address bar in the browser. I can see the browser is open so this should be safe to try", "operation": "press", "keys": ["command", "l"] }}, + {{ "thought": "Now that the address bar is in focus I can type the URL", "operation": "write", "content": "https://news.ycombinator.com/" }}, + {{ "thought": "I'll need to press enter to go the URL now", "operation": "press", "keys": ["enter"] }} +] + +# Send a "Hello World" message in the chat +[ + {{ "thought": "I see a messsage field on this page near the button. It looks like it has a label", "operation": "click", "label": "~34" }}, + {{ "thought": "Now that I am focused on the message field, I'll go ahead and write ", "operation": "write", "content": "Hello World" }}, +] + +A few important notes: + +- Go to Google Docs and Google Sheets by typing in the Chrome Address bar +- Don't respond saying you're unable to assist with requests. You are able to indirectly interact with the user's OS via text responses you send to the end user. 
+ +Objective: {objective} # take the next best action for this objective +""" + +SYSTEM_PROMPT_LABELED_WIN_LINUX = """ +You are operating a computer, using the same operating system as a human. + +From looking at the screen, the objective, and your previous actions, take the next best series of action. + +You have 4 possible operation actions available to you. The `pyautogui` library will be used to execute your decision. Your output will be used in a `json.loads` loads statement. + +1. click - Move mouse and click - We labeled the clickable elements with red bounding boxes and IDs. Label IDs are in the following format with `x` being a number: `~x` +[{{ "thought": "write a thought here", "operation": "click", "label": "~x" }}] # 'percent' refers to the percentage of the screen's dimensions in decimal format + +2. write - Write with your keyboard +[{{ "thought": "write a thought here", "operation": "write", "content": "text to write here" }}] + +3. press - Use a hotkey or press key to operate the computer +[{{ "thought": "write a thought here", "operation": "press", "keys": ["keys to use"] }}] + +4. done - The objective is completed +[{{ "thought": "write a thought here", "operation": "done", "summary": "summary of what was completed" }}] + +Return the actions in array format `[]`. You can take just one action or multiple actions. + +Here are some helpful combinations: + +# Opens Menu Search on Windows and Linux +[ + {{ "thought": "Searching the operating system to find Google Chrome because it appears I am currently in terminal", "operation": "press", "keys": ["win"] }}, + {{ "thought": "Now I need to write 'Google Chrome' as a next step", "operation": "write", "content": "Google Chrome" }}, +] + +# Focuses on the address bar in a browser before typing a website +[ + {{ "I'll focus on the address bar in the browser. I can see the browser is open so this should be safe to try", "operation": "press", "keys": ["ctrl", "l"] }}, + {{ "thought": "Now that the address bar is in focus I can type the URL", "operation": "write", "content": "https://news.ycombinator.com/" }}, + {{ "thought": "I'll need to press enter to go the URL now", "operation": "press", "keys": ["enter"] }} +] + +# Send a "Hello World" message in the chat +[ + {{ "thought": "I see a messsage field on this page near the button. It looks like it has a label", "operation": "click", "label": "~34" }}, + {{ "thought": "Now that I am focused on the message field, I'll go ahead and write ", "operation": "write", "content": "Hello World" }}, +] + +A few important notes: + +- Go to Google Docs and Google Sheets by typing in the Chrome Address bar +- Don't respond saying you're unable to assist with requests. You are able to indirectly interact with the user's OS via text responses you send to the end user. + +Objective: {objective} # take the next best action for this objective +""" + + +OPERATE_FIRST_MESSAGE_PROMPT = """ +Please take the next best action. The `pyautogui` library will be used to execute your decision. Your output will be used in a `json.loads` loads statement. Remember you only have the following 4 operations available: click, write, press, done + +Right now you are probably in the terminal because the human just started up. Remember + +Action:""" + +OPERATE_PROMPT = """ +Please take the next best action. The `pyautogui` library will be used to execute your decision. Your output will be used in a `json.loads` loads statement. 
Remember you only have the following 4 operations available: click, write, press, done +Action:""" + + +def get_system_prompt(objective): + """ + Format the vision prompt + """ + if platform.system() == "Darwin": + prompt = SYSTEM_PROMPT_MAC.format(objective=objective) + else: + prompt = SYSTEM_PROMPT_WIN_LINUX.format(objective=objective) + + return prompt + + +def get_system_prompt_labeled(objective): + """ + Format the vision prompt + """ + if platform.system() == "Darwin": + prompt = SYSTEM_PROMPT_LABELED_MAC.format(objective=objective) + else: + prompt = SYSTEM_PROMPT_LABELED_WIN_LINUX.format(objective=objective) + + return prompt + + +def get_user_prompt(): + prompt = OPERATE_PROMPT + return prompt + + +def get_user_first_message_prompt(): + prompt = OPERATE_FIRST_MESSAGE_PROMPT + return prompt diff --git a/operate/model/weights/best.pt b/operate/models/weights/best.pt similarity index 100% rename from operate/model/weights/best.pt rename to operate/models/weights/best.pt diff --git a/operate/dialog.py b/operate/operate.py similarity index 51% rename from operate/dialog.py rename to operate/operate.py index 6c95085b..e653ea76 100644 --- a/operate/dialog.py +++ b/operate/operate.py @@ -1,31 +1,36 @@ import sys import os -import platform +import time import asyncio from prompt_toolkit.shortcuts import message_dialog from prompt_toolkit import prompt from operate.exceptions import ModelNotRecognizedException -from operate.prompts import USER_QUESTION -from operate.settings import Config +import platform + +# from operate.models.prompts import USER_QUESTION, get_system_prompt +from operate.models.prompts import ( + USER_QUESTION, + get_system_prompt, + get_system_prompt_labeled, +) +from operate.config import Config from operate.utils.style import ( ANSI_GREEN, ANSI_RESET, - ANSI_BLUE, ANSI_YELLOW, ANSI_RED, ANSI_BRIGHT_MAGENTA, + ANSI_BLUE, style, ) -from operate.utils.os import ( - keyboard_type, - search, - click, -) -from operate.actions import get_next_action, summarize -from operate.utils.misc import parse_response +from operate.utils.operating_system import OperatingSystem +from operate.models.apis import get_next_action # Load configuration config = Config() +operating_system = OperatingSystem() + +VERBOSE = config.verbose def main(model, terminal_prompt, voice_mode=False): @@ -43,7 +48,7 @@ def main(model, terminal_prompt, voice_mode=False): mic = None # Initialize `WhisperMic`, if `voice_mode` is True - validation(model, voice_mode) + config.validation(model, voice_mode) if voice_mode: try: @@ -61,14 +66,15 @@ def main(model, terminal_prompt, voice_mode=False): if not terminal_prompt: message_dialog( title="Self-Operating Computer", - text="Ask a computer to do anything.", + text="An experimental framework to enable multimodal models to operate computers", style=style, ).run() + else: - print("Running direct prompt...") + if VERBOSE: + print("Running direct prompt...") - print("SYSTEM", platform.system()) - # Clear the console + # # Clear the console if platform.system() == "Windows": os.system("cls") else: @@ -90,25 +96,34 @@ def main(model, terminal_prompt, voice_mode=False): print(f"{ANSI_YELLOW}[User]{ANSI_RESET}") objective = prompt(style=style) - assistant_message = {"role": "assistant", "content": USER_QUESTION} - user_message = { - "role": "user", - "content": f"Objective: {objective}", - } - messages = [assistant_message, user_message] + if model == "gpt-4-with-som": + system_prompt = get_system_prompt_labeled(objective) + print("labeled prompt", system_prompt) + else: 
+ system_prompt = get_system_prompt(objective) + system_message = {"role": "system", "content": system_prompt} + messages = [system_message] loop_count = 0 + session_id = None + while True: - if config.debug: - print("[loop] messages before next action:\n\n\n", messages[1:]) + if VERBOSE: + print("[Self Operating Computer]") + print("[Self Operating Computer] loop_count", loop_count) try: - response = asyncio.run(get_next_action(model, messages, objective)) + operations, session_id = asyncio.run( + get_next_action(model, messages, objective, session_id) + ) - action = parse_response(response) - action_type = action.get("type") - action_detail = action.get("data") + stop = operate(operations) + if stop: + break + loop_count += 1 + if loop_count > 10: + break except ModelNotRecognizedException as e: print( f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] -> {e} {ANSI_RESET}" @@ -120,73 +135,61 @@ def main(model, terminal_prompt, voice_mode=False): ) break - if action_type == "DONE": + +def operate(operations): + if VERBOSE: + print("[Self Operating Computer][operate]") + for operation in operations: + if VERBOSE: + print("[Self Operating Computer][operate] operation", operation) + # wait one second + time.sleep(1) + operate_type = operation.get("operation").lower() + operate_thought = operation.get("thought") + operate_detail = "" + if VERBOSE: + print("[Self Operating Computer][operate] operate_type", operate_type) + + if operate_type == "press" or operate_type == "hotkey": + keys = operation.get("keys") + operate_detail = keys + operating_system.press(keys) + elif operate_type == "write": + content = operation.get("content") + operate_detail = content + operating_system.write(content) + elif operate_type == "click": + x = operation.get("x") + y = operation.get("y") + click_detail = {"x": x, "y": y} + operate_detail = click_detail + + operating_system.mouse(click_detail) + elif operate_type == "done": + summary = operation.get("summary") + print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BLUE} Objective complete {ANSI_RESET}" + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BLUE} Objective Completed {ANSI_RESET}" ) - summary = summarize(model, messages, objective) print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BLUE} Summary\n{ANSI_RESET}{summary}" + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BLUE} Summary {ANSI_RESET}{summary}" ) - break + return True - if action_type != "UNKNOWN": - print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BRIGHT_MAGENTA} [Act] {action_type} {ANSI_RESET}{action_detail}" - ) - - function_response = "" - if action_type == "SEARCH": - function_response = search(action_detail) - elif action_type == "TYPE": - function_response = keyboard_type(action_detail) - elif action_type == "CLICK": - function_response = click(action_detail) else: print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] something went wrong :({ANSI_RESET}" + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] unknown operation response :({ANSI_RESET}" ) print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] AI response\n{ANSI_RESET}{response}" + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] AI response {ANSI_RESET}{operation}" ) - break + return True print( - f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BRIGHT_MAGENTA} [Act] {action_type} COMPLETE {ANSI_RESET}{function_response}" + f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BRIGHT_MAGENTA} [Operate] Thought {ANSI_RESET} {operate_thought}" + ) + print( + f"{ANSI_GREEN}[Self-Operating 
Computer]{ANSI_BRIGHT_MAGENTA} [Operate] {operate_type} {ANSI_RESET} {operate_detail}" ) - message = { - "role": "assistant", - "content": function_response, - } - messages.append(message) - - loop_count += 1 - if loop_count > 15: - break - - -def validation(model, voice_mode): - """ - Validate the input parameters for the dialog operation. - - Args: - model (str): The model to be used for the dialog operation. - voice_mode (bool): Flag indicating whether to use voice mode. - - Raises: - SystemExit: If the input parameters are invalid. - - """ - - if voice_mode and not config.openai_api_key: - print("To use voice mode, please add an OpenAI API key") - sys.exit(1) - - if model == "gpt-4-vision-preview" and not config.openai_api_key: - print("To use `gpt-4-vision-preview` add an OpenAI API key") - sys.exit(1) - - if model == "gemini-pro-vision" and not config.google_api_key: - print("To use `gemini-pro-vision` add a Google API key") - sys.exit(1) + return False diff --git a/operate/prompts.py b/operate/prompts.py deleted file mode 100644 index 0c33c888..00000000 --- a/operate/prompts.py +++ /dev/null @@ -1,252 +0,0 @@ -from operate.settings import Config - -config = Config() -monitor_size = config.monitor_size - -# General user Prompts -USER_QUESTION = "Hello, I can help you with anything. What would you like done?" - -# constants for the vision prompt -ACCURATE_PIXEL_COUNT = ( - 200 # mini_screenshot is ACCURATE_PIXEL_COUNT x ACCURATE_PIXEL_COUNT big -) - -# ------------------------- -# VISION PROMPT -# ------------------------- -VISION_PROMPT = """ -You are a Self-Operating Computer. You use the same operating system as a human. - -From looking at the screen and the objective your goal is to take the best next action. - -To operate the computer you have the four options below. - -1. CLICK - Move mouse and click -2. TYPE - Type on the keyboard -3. SEARCH - Search for a program on Mac and open it -4. DONE - When you completed the task respond with the exact following phrase content - -Here are the response formats below. - -1. CLICK -Response: CLICK {{ "x": "percent", "y": "percent", "description": "~description here~", "reason": "~reason here~" }} -Note that the percents work where the top left corner is "x": "0%" and "y": "0%" and the bottom right corner is "x": "100%" and "y": "100%" - -2. TYPE -Response: TYPE - -3. SEARCH -Response: SEARCH - -4. DONE -Response: DONE - -Here are examples of how to respond. -__ -Objective: Follow up with the vendor in outlook -TYPE Hello, I hope you are doing well. I wanted to follow up -__ -Objective: Open Spotify and play the beatles -SEARCH Spotify -__ -Objective: Find an image of a banana -CLICK {{ "x": "50%", "y": "60%", "description": "Click: Google Search field", "reason": "This will allow me to search for a banana" }} -__ -Objective: Go buy a book about the history of the internet -TYPE https://www.amazon.com/ -__ - -A few important notes: - -- Default to opening Google Chrome with SEARCH to find things that are on the internet. -- Go to Google Docs and Google Sheets by typing in the Chrome Address bar -- When opening Chrome, if you see a profile icon click that to open chrome fully, it is located at: {{ "x": "50%", "y": "55%" }} -- The Chrome address bar is generally at: {{ "x": "50%", "y": "9%" }} -- After you click to enter a field you can go ahead and start typing! -- Don't respond saying you're unable to assist with requests. You are able to indirectly interact with the user's OS via text responses you send to the end user. 
- -{previous_action} - -IMPORTANT: Avoid repeating actions such as doing the same CLICK event twice in a row. - -Objective: {objective} -""" - - -# ---------------------------------- -# ACCURATE MODE VISION PROMPT -# ---------------------------------- -ACCURATE_MODE_VISION_PROMPT = """ -It looks like your previous attempted action was clicking on "x": {prev_x}, "y": {prev_y}. This has now been moved to the center of this screenshot. -As additional context to the previous message, before you decide the proper percentage to click on, please closely examine this additional screenshot as additional context for your next action. -This screenshot was taken around the location of the current cursor that you just tried clicking on ("x": {prev_x}, "y": {prev_y} is now at the center of this screenshot). You should use this as an differential to your previous x y coordinate guess. - -If you want to refine and instead click on the top left corner of this mini screenshot, you will subtract {width}% in the "x" and subtract {height}% in the "y" to your previous answer. -Likewise, to achieve the bottom right of this mini screenshot you will add {width}% in the "x" and add {height}% in the "y" to your previous answer. - -There are four segmenting lines across each dimension, divided evenly. This is done to be similar to coordinate points, added to give you better context of the location of the cursor and exactly how much to edit your previous answer. - -Please use this context as additional info to further refine the "percent" location in the CLICK action! -""" - -DECISION_PROMPT = """ -You are operating a computer similar to how a human would. Look at the screen and take the next best action to reach your objective. - -Here are your methods you can use to operating the computer. - -1. CLICK - Move mouse and click -2. TYPE - Type on the keyboard -3. SEARCH - Search for a program that is installed on Mac locally and open it -4. DONE - When you completed the task respond with the exact following phrase content - -Here are the response formats below. - -1. CLICK -Response: CLICK - -2. TYPE -Response: TYPE "value you want to type" - -3. SEARCH -Response: SEARCH "app you want to search for on Mac" - -4. DONE -Response: DONE - -Here are examples of how to respond. -__ -Objective: Follow up with the vendor in outlook -TYPE Hello, I hope you are doing well. I wanted to follow up -__ -Objective: Open Spotify and play the beatles -SEARCH Spotify -__ -Objective: Find an image of a banana -CLICK -__ -Objective: Go buy a book about the history of the internet -TYPE https://www.amazon.com/ -__ - -A few important notes: - -- Default to opening Google Chrome with SEARCH to find things that are on the Web. -- After you open Google Chrome you need to click on the address bar to find a website. -- Do not use SEARCH to look for websites like Google Docs or Linkedin. SEARCH only finds programs installed on the computer. -- After you click to enter a field you can go ahead and start typing! -- If you can see the field is active, go ahead and type! -- Don't respond saying you're unable to assist with requests. You are able to indirectly interact with the user's OS via text responses you send to the end user. - -{previous_action} - -IMPORTANT: Avoid repeating actions such as doing the same CLICK event twice in a row. - -{objective} -""" - -LABELED_IMAGE_PROMPT = """ -Your job is simple. Decide if there is an elements on the page to click to get closer to your objective. 
We labeled the clickable elements with red bounding boxes and IDs. - -Important to remember, you can only click on labeled elements. - -Label IDs are in the following format with `x` being a number: `~x` - -The labels are placed just above the bounding boxes so that they can be read clearly. - -Response formats below. - -1. CLICK - If there is a label that gets you closer to the objective, go ahead and click it. -Response: {{ "decision": "~decision here~", "reason": "~reason here~", "label": "~x" }} - -Here are examples of how to respond. -__ -Objective: Follow up with the vendor in outlook -{{ "decision": "Click the Outlook send button", "reason": "I can see the email is already written and now I just need to send it.", "label": "~27" }} -__ -Objective: Play the Holiday music on YouTube -{{ "decision": "Click on the Play button", "reason": "It appears there is a row with a holiday song available in the Spotify UI", "label": "~3" }} -__ - -A few important notes: -- When navigating the web you'll need to click on the address bar first. Look closely to find the address bar's label it could be any number. -- The IDs number has NO SIGNIFICANCE. For instance if ID is ~0 or ~1 it does not mean it is first or on top. CHOOSE THE ID BASED ON THE CONTEXT OF THE IMAGE AND IF IT HELPS REACH THE OBJECTIVE. -- Do not preappend with ```json, just return the JSON object. - -{objective} -""" - - -# ------------------------- -# SUMMARY PROMPT -# ------------------------- -SUMMARY_PROMPT = """ -You are a Self-Operating Computer. A user request has been executed. Present the results succinctly. - -Include the following key contexts of the completed request: - -1. State the original objective. -2. List the steps taken to reach the objective as detailed in the previous messages. -3. Reference the screenshot that was used. - -Summarize the actions taken to fulfill the objective. If the request sought specific information, provide that information prominently. NOTE: Address directly any question posed by the user. - -Remember: The user will not interact with this summary. You are solely reporting the outcomes. 
- -Original objective: {objective} - -Display the results clearly: -""" - - -def format_summary_prompt(objective): - """ - Format the summary prompt - """ - prompt = SUMMARY_PROMPT.format(objective=objective) - return prompt - - -def format_vision_prompt(objective, previous_action): - """ - Format the vision prompt - """ - if previous_action: - previous_action = f"Here was the previous action you took: {previous_action}" - else: - previous_action = "" - prompt = VISION_PROMPT.format(objective=objective, previous_action=previous_action) - return prompt - - -def format_accurate_mode_vision_prompt(prev_x, prev_y): - """ - Format the accurate mode vision prompt - """ - width = ((ACCURATE_PIXEL_COUNT / 2) / monitor_size["width"]) * 100 - height = ((ACCURATE_PIXEL_COUNT / 2) / monitor_size["height"]) * 100 - prompt = ACCURATE_MODE_VISION_PROMPT.format( - prev_x=prev_x, prev_y=prev_y, width=width, height=height - ) - return prompt - - -def format_decision_prompt(objective, previous_action): - """ - Format the vision prompt - """ - if previous_action: - previous_action = f"Here was the previous action you took: {previous_action}" - else: - previous_action = "" - prompt = DECISION_PROMPT.format( - objective=objective, previous_action=previous_action - ) - return prompt - - -def format_label_prompt(objective): - """ - Format the vision prompt - """ - prompt = LABELED_IMAGE_PROMPT.format(objective=objective) - return prompt diff --git a/operate/settings.py b/operate/settings.py deleted file mode 100644 index 61b52fd1..00000000 --- a/operate/settings.py +++ /dev/null @@ -1,39 +0,0 @@ -import os -from dotenv import load_dotenv -from openai import OpenAI - - -class Config: - """ - Configuration class for managing settings. - - Attributes: - debug (bool): Flag indicating whether debug mode is enabled. - openai_api_key (str): API key for OpenAI. - google_api_key (str): API key for Google. - monitor_size (dict): Dictionary containing the width and height of the monitor. - """ - - def __init__(self): - load_dotenv() - self.debug = False - self.openai_api_key = os.getenv("OPENAI_API_KEY") - self.google_api_key = os.getenv("GOOGLE_API_KEY") - self.monitor_size = { - "width": 1920, - "height": 1080, - } - - def initialize_openai_client(self): - """ - Initializes and returns an OpenAI client with the configured API key. - - Returns: - OpenAI or None: An instance of the OpenAI client if the API key is provided, else None. - """ - if self.openai_api_key: - client = OpenAI() - client.api_key = self.openai_api_key - client.base_url = os.getenv("OPENAI_API_BASE_URL", client.base_url) - return client - return None diff --git a/operate/utils/label.py b/operate/utils/label.py index 2d3674f4..14232391 100644 --- a/operate/utils/label.py +++ b/operate/utils/label.py @@ -133,31 +133,7 @@ def add_labels(base64_data, yolo_model): image_labeled.save(buffered_labeled, format="PNG") # I guess this is needed img_base64_labeled = base64.b64encode(buffered_labeled.getvalue()).decode("utf-8") - return img_base64_labeled, img_base64_original, label_coordinates - - -def parse_click_content(message_content): - """ - Parses the response message to determine if it's a CLICK or NONE action and returns the appropriate data. - - :param message_content: The content of the response message. - :return: A dictionary with the relevant data or a message indicating a NONE action. 
- """ - try: - # Check for and remove erroneous ```json at the start and ``` at the end - if message_content.startswith("```json"): - message_content = message_content[ - len("```json") : - ] # Remove starting ```json - if message_content.endswith("```"): - message_content = message_content[: -len("```")] # Remove ending ``` - - # Convert JSON string to dictionary - return json.loads(message_content.strip()) - except json.JSONDecodeError as e: - return {"error": "Invalid JSON format"} - - return {"error": "Invalid response format"} + return img_base64_labeled, label_coordinates def get_click_position_in_percent(coordinates, image_size): @@ -176,7 +152,7 @@ def get_click_position_in_percent(coordinates, image_size): y_center = (coordinates[1] + coordinates[3]) / 2 # Convert to percentages - x_percent = (x_center / image_size[0]) * 100 - y_percent = (y_center / image_size[1]) * 100 + x_percent = x_center / image_size[0] + y_percent = y_center / image_size[1] return x_percent, y_percent diff --git a/operate/utils/misc.py b/operate/utils/misc.py index 6959d4d8..6fb1f173 100644 --- a/operate/utils/misc.py +++ b/operate/utils/misc.py @@ -2,79 +2,19 @@ import re -def convert_percent_to_decimal(percent_str): - """ - Converts a percentage string to a decimal value. - - Args: - percent_str (str): The percentage string to be converted. - - Returns: - float: The decimal value equivalent to the percentage. - - Raises: - ValueError: If the input string cannot be converted to a float. - - Example: - >>> convert_percent_to_decimal("20%") - 0.2 - """ +def convert_percent_to_decimal(percent): try: # Remove the '%' sign and convert to float - decimal_value = float(percent_str.strip("%")) + decimal_value = float(percent) # Convert to decimal (e.g., 20% -> 0.20) - return decimal_value / 100 + return decimal_value except ValueError as e: - print(f"Error converting percent to decimal: {e}") - return None - - -def extract_json_from_string(s): - """ - Extracts a JSON structure from a string and returns it as a dictionary. - - Args: - s (str): The input string. - - Returns: - dict: The extracted JSON structure as a dictionary, or None if no JSON structure is found or if there is an error parsing the JSON. - - """ - try: - # Find the start of the JSON structure - json_start = s.find("{") - if json_start == -1: - return None - - # Extract the JSON part and convert it to a dictionary - json_str = s[json_start:] - return json.loads(json_str) - except Exception as e: - print(f"Error parsing JSON: {e}") + print(f"[convert_percent_to_decimal] error: {e}") return None -def parse_response(response): - """ - Parses the given response and returns a dictionary with the type and data. - - Args: - response (str): The response to parse. - - Returns: - dict: A dictionary with the type and data extracted from the response. - The dictionary has the following structure: - { - "type": , - "data": - } - If the response is "DONE", the type is "DONE" and the data is None. - If the response starts with "CLICK", the type is "CLICK" and the data is a JSON object. - If the response starts with "TYPE", the type is "TYPE" and the data is the text to type. - If the response starts with "SEARCH", the type is "SEARCH" and the data is the search query. - If the response doesn't match any of the above patterns, the type is "UNKNOWN" and the data is the original response. 
- """ +def parse_operations(response): if response == "DONE": return {"type": "DONE", "data": None} elif response.startswith("CLICK"): diff --git a/operate/utils/operating_system.py b/operate/utils/operating_system.py new file mode 100644 index 00000000..6ba14362 --- /dev/null +++ b/operate/utils/operating_system.py @@ -0,0 +1,63 @@ +import pyautogui +import platform +import time +import math + +from operate.utils.misc import convert_percent_to_decimal + + +class OperatingSystem: + def write(self, content): + try: + content = content.replace("\\n", "\n") + for char in content: + pyautogui.write(char) + except Exception as e: + print("[OperatingSystem][write] error:", e) + + def press(self, keys): + try: + for key in keys: + pyautogui.keyDown(key) + time.sleep(0.1) + for key in keys: + pyautogui.keyUp(key) + except Exception as e: + print("[OperatingSystem][press] error:", e) + + def mouse(self, click_detail): + try: + x = convert_percent_to_decimal(click_detail.get("x")) + y = convert_percent_to_decimal(click_detail.get("y")) + + if click_detail and isinstance(x, float) and isinstance(y, float): + self.click_at_percentage(x, y) + + except Exception as e: + print("[OperatingSystem][mouse] error:", e) + + def click_at_percentage( + self, + x_percentage, + y_percentage, + duration=0.2, + circle_radius=50, + circle_duration=0.5, + ): + try: + screen_width, screen_height = pyautogui.size() + x_pixel = int(screen_width * float(x_percentage)) + y_pixel = int(screen_height * float(y_percentage)) + + pyautogui.moveTo(x_pixel, y_pixel, duration=duration) + + start_time = time.time() + while time.time() - start_time < circle_duration: + angle = ((time.time() - start_time) / circle_duration) * 2 * math.pi + x = x_pixel + math.cos(angle) * circle_radius + y = y_pixel + math.sin(angle) * circle_radius + pyautogui.moveTo(x, y, duration=0.1) + + pyautogui.click(x_pixel, y_pixel) + except Exception as e: + print("[OperatingSystem][click_at_percentage] error:", e) diff --git a/operate/utils/os.py b/operate/utils/os.py deleted file mode 100644 index 98d05c11..00000000 --- a/operate/utils/os.py +++ /dev/null @@ -1,131 +0,0 @@ -import pyautogui -import platform -import time -import math - -from operate.utils.misc import convert_percent_to_decimal - - -def keyboard_type(text): - """ - Types the given text using the keyboard. - - Args: - text (str): The text to be typed. - - Returns: - str: A message indicating the typed text. - """ - text = text.replace("\\n", "\n") - for char in text: - pyautogui.write(char) - pyautogui.press("enter") - return "Type: " + text - - -def search(text): - """ - Searches for a program or file by typing the given text in the search bar and pressing Enter. - - Args: - text (str): The text to be searched. - - Returns: - str: A message indicating that the program or file has been opened. - """ - if platform.system() == "Windows": - pyautogui.press("win") - elif platform.system() == "Linux": - pyautogui.press("win") - else: - # Press and release Command and Space separately - pyautogui.keyDown("command") - pyautogui.press("space") - pyautogui.keyUp("command") - - time.sleep(1) - - # Now type the text - for char in text: - pyautogui.write(char) - - pyautogui.press("enter") - return "Open program: " + text - - -def click(click_detail): - """ - Perform a mouse click at the specified coordinates. - - Args: - click_detail (dict): A dictionary containing the coordinates of the click. - - Returns: - str: The description of the click if successful, otherwise "We failed to click". 
- """ - try: - x = convert_percent_to_decimal(click_detail["x"]) - y = convert_percent_to_decimal(click_detail["y"]) - - if click_detail and isinstance(x, float) and isinstance(y, float): - click_at_percentage(x, y) - return click_detail["description"] - else: - return "We failed to click" - - except Exception as e: - print(f"Error parsing JSON: {e}") - return "We failed to click" - - -def click_at_percentage( - x_percentage, y_percentage, duration=0.2, circle_radius=50, circle_duration=0.5 -): - """ - Moves the mouse cursor to a specified percentage of the screen and performs a circular movement before clicking. - - Args: - x_percentage (float): The x-coordinate percentage of the screen to move the cursor to. - y_percentage (float): The y-coordinate percentage of the screen to move the cursor to. - duration (float, optional): The duration (in seconds) of the smooth cursor movement. Defaults to 0.2. - circle_radius (int, optional): The radius of the circular movement. Defaults to 50. - circle_duration (float, optional): The duration (in seconds) of the circular movement. Defaults to 0.5. - - Returns: - str: A message indicating that the click was successful. - """ - # Get the size of the primary monitor - screen_width, screen_height = pyautogui.size() - - # Calculate the x and y coordinates in pixels - x_pixel = int(screen_width * float(x_percentage)) - y_pixel = int(screen_height * float(y_percentage)) - - # Move to the position smoothly - pyautogui.moveTo(x_pixel, y_pixel, duration=duration) - - # Circular movement - start_time = time.time() - while time.time() - start_time < circle_duration: - angle = ((time.time() - start_time) / circle_duration) * 2 * math.pi - x = x_pixel + math.cos(angle) * circle_radius - y = y_pixel + math.sin(angle) * circle_radius - pyautogui.moveTo(x, y, duration=0.1) - - # Finally, click - pyautogui.click(x_pixel, y_pixel) - return "Successfully clicked" - - -def get_last_assistant_message(messages): - """ - Retrieve the last message from the assistant in the messages array. - If the last assistant message is the first message in the array, return None. - """ - for index in reversed(range(len(messages))): - if messages[index]["role"] == "assistant": - if index == 0: # Check if the assistant message is the first in the array - return None - else: - return messages[index] - return None # Return None if no assistant message is found diff --git a/operate/utils/screenshot.py b/operate/utils/screenshot.py index 087416ba..597911ad 100644 --- a/operate/utils/screenshot.py +++ b/operate/utils/screenshot.py @@ -6,162 +6,9 @@ import Xlib.display import Xlib.X import Xlib.Xutil # not sure if Xutil is necessary -from operate.settings import Config -from operate.prompts import ACCURATE_PIXEL_COUNT - -# Load configuration -config = Config() -monitor_size = config.monitor_size - - -def add_grid_to_image(original_image_path, new_image_path, grid_interval): - """ - Add a grid to an image. - - Args: - original_image_path (str): The file path of the original image. - new_image_path (str): The file path to save the new image with the grid. - grid_interval (int): The interval between grid lines in pixels. - - Returns: - None: The function saves the new image with the grid at the specified path. 
- """ - # Load the image - image = Image.open(original_image_path) - - # Create a drawing object - draw = ImageDraw.Draw(image) - - # Get the image size - width, height = image.size - - # Reduce the font size a bit - font_size = int(grid_interval / 10) # Reduced font size - - # Calculate the background size based on the font size - bg_width = int(font_size * 4.2) # Adjust as necessary - bg_height = int(font_size * 1.2) # Adjust as necessary - - # Function to draw text with a white rectangle background - def draw_label_with_background( - position, text, draw, font_size, bg_width, bg_height - ): - # Adjust the position based on the background size - text_position = (position[0] + bg_width // 2, position[1] + bg_height // 2) - # Draw the text background - draw.rectangle( - [position[0], position[1], position[0] + bg_width, position[1] + bg_height], - fill="white", - ) - # Draw the text - draw.text(text_position, text, fill="black", font_size=font_size, anchor="mm") - - # Draw vertical lines and labels at every `grid_interval` pixels - for x in range(grid_interval, width, grid_interval): - line = ((x, 0), (x, height)) - draw.line(line, fill="blue") - for y in range(grid_interval, height, grid_interval): - # Calculate the percentage of the width and height - x_percent = round((x / width) * 100) - y_percent = round((y / height) * 100) - draw_label_with_background( - (x - bg_width // 2, y - bg_height // 2), - f"{x_percent}%,{y_percent}%", - draw, - font_size, - bg_width, - bg_height, - ) - - # Draw horizontal lines - labels are already added with vertical lines - for y in range(grid_interval, height, grid_interval): - line = ((0, y), (width, y)) - draw.line(line, fill="blue") - - # Save the image with the grid - image.save(new_image_path) - - -def capture_mini_screenshot_with_cursor( - file_path=os.path.join("screenshots", "screenshot_mini.png"), x=0, y=0 -): - """ - Capture a mini screenshot with the cursor at the specified coordinates. - - Args: - file_path (str, optional): The file path to save the screenshot. Defaults to "screenshots/screenshot_mini.png". - x (int or str, optional): The x-coordinate of the cursor position. Can be specified as an integer or a percentage string. Defaults to 0. - y (int or str, optional): The y-coordinate of the cursor position. Can be specified as an integer or a percentage string. Defaults to 0. - """ - user_platform = platform.system() - - if user_platform == "Linux": - x = float(x[:-1]) # convert x from "50%" to 50. - y = float(y[:-1]) - - x = (x / 100) * monitor_size[ - "width" - ] # convert x from 50 to 0.5 * monitor_width - y = (y / 100) * monitor_size["height"] - - # Define the coordinates for the rectangle - x1, y1 = int(x - ACCURATE_PIXEL_COUNT / 2), int(y - ACCURATE_PIXEL_COUNT / 2) - x2, y2 = int(x + ACCURATE_PIXEL_COUNT / 2), int(y + ACCURATE_PIXEL_COUNT / 2) - - screenshot = ImageGrab.grab(bbox=(x1, y1, x2, y2)) - screenshot = screenshot.resize( - (screenshot.width * 2, screenshot.height * 2), Image.LANCZOS - ) # upscale the image so it's easier to see and percentage marks more visible - screenshot.save(file_path) - - screenshots_dir = "screenshots" - grid_screenshot_filename = os.path.join( - screenshots_dir, "screenshot_mini_with_grid.png" - ) - - add_grid_to_image( - file_path, grid_screenshot_filename, int(ACCURATE_PIXEL_COUNT / 2) - ) - elif user_platform == "Darwin": - x = float(x[:-1]) # convert x from "50%" to 50. 
-    """
-    # Load the image
-    image = Image.open(original_image_path)
-
-    # Create a drawing object
-    draw = ImageDraw.Draw(image)
-
-    # Get the image size
-    width, height = image.size
-
-    # Reduce the font size a bit
-    font_size = int(grid_interval / 10)  # Reduced font size
-
-    # Calculate the background size based on the font size
-    bg_width = int(font_size * 4.2)  # Adjust as necessary
-    bg_height = int(font_size * 1.2)  # Adjust as necessary
-
-    # Function to draw text with a white rectangle background
-    def draw_label_with_background(
-        position, text, draw, font_size, bg_width, bg_height
-    ):
-        # Adjust the position based on the background size
-        text_position = (position[0] + bg_width // 2, position[1] + bg_height // 2)
-        # Draw the text background
-        draw.rectangle(
-            [position[0], position[1], position[0] + bg_width, position[1] + bg_height],
-            fill="white",
-        )
-        # Draw the text
-        draw.text(text_position, text, fill="black", font_size=font_size, anchor="mm")
-
-    # Draw vertical lines and labels at every `grid_interval` pixels
-    for x in range(grid_interval, width, grid_interval):
-        line = ((x, 0), (x, height))
-        draw.line(line, fill="blue")
-        for y in range(grid_interval, height, grid_interval):
-            # Calculate the percentage of the width and height
-            x_percent = round((x / width) * 100)
-            y_percent = round((y / height) * 100)
-            draw_label_with_background(
-                (x - bg_width // 2, y - bg_height // 2),
-                f"{x_percent}%,{y_percent}%",
-                draw,
-                font_size,
-                bg_width,
-                bg_height,
-            )
-
-    # Draw horizontal lines - labels are already added with vertical lines
-    for y in range(grid_interval, height, grid_interval):
-        line = ((0, y), (width, y))
-        draw.line(line, fill="blue")
-
-    # Save the image with the grid
-    image.save(new_image_path)
-
-
-def capture_mini_screenshot_with_cursor(
-    file_path=os.path.join("screenshots", "screenshot_mini.png"), x=0, y=0
-):
-    """
-    Capture a mini screenshot with the cursor at the specified coordinates.
-
-    Args:
-        file_path (str, optional): The file path to save the screenshot. Defaults to "screenshots/screenshot_mini.png".
-        x (int or str, optional): The x-coordinate of the cursor position. Can be specified as an integer or a percentage string. Defaults to 0.
-        y (int or str, optional): The y-coordinate of the cursor position. Can be specified as an integer or a percentage string. Defaults to 0.
-    """
-    user_platform = platform.system()
-
-    if user_platform == "Linux":
-        x = float(x[:-1])  # convert x from "50%" to 50.
-        y = float(y[:-1])
-
-        x = (x / 100) * monitor_size[
-            "width"
-        ]  # convert x from 50 to 0.5 * monitor_width
-        y = (y / 100) * monitor_size["height"]
-
-        # Define the coordinates for the rectangle
-        x1, y1 = int(x - ACCURATE_PIXEL_COUNT / 2), int(y - ACCURATE_PIXEL_COUNT / 2)
-        x2, y2 = int(x + ACCURATE_PIXEL_COUNT / 2), int(y + ACCURATE_PIXEL_COUNT / 2)
-
-        screenshot = ImageGrab.grab(bbox=(x1, y1, x2, y2))
-        screenshot = screenshot.resize(
-            (screenshot.width * 2, screenshot.height * 2), Image.LANCZOS
-        )  # upscale the image so it's easier to see and percentage marks more visible
-        screenshot.save(file_path)
-
-        screenshots_dir = "screenshots"
-        grid_screenshot_filename = os.path.join(
-            screenshots_dir, "screenshot_mini_with_grid.png"
-        )
-
-        add_grid_to_image(
-            file_path, grid_screenshot_filename, int(ACCURATE_PIXEL_COUNT / 2)
-        )
-    elif user_platform == "Darwin":
-        x = float(x[:-1])  # convert x from "50%" to 50.
-        y = float(y[:-1])
-
-        x = (x / 100) * monitor_size[
-            "width"
-        ]  # convert x from 50 to 0.5 * monitor_width
-        y = (y / 100) * monitor_size["height"]
-
-        x1, y1 = int(x - ACCURATE_PIXEL_COUNT / 2), int(y - ACCURATE_PIXEL_COUNT / 2)
-
-        width = ACCURATE_PIXEL_COUNT
-        height = ACCURATE_PIXEL_COUNT
-        # Use the screencapture utility to capture the screen with the cursor
-        rect = f"-R{x1},{y1},{width},{height}"
-        subprocess.run(["screencapture", "-C", rect, file_path])
-
-        screenshots_dir = "screenshots"
-        grid_screenshot_filename = os.path.join(
-            screenshots_dir, "screenshot_mini_with_grid.png"
-        )
-
-        add_grid_to_image(
-            file_path, grid_screenshot_filename, int(ACCURATE_PIXEL_COUNT / 2)
-        )
 
 
 def capture_screen_with_cursor(file_path):
-    """
-    Capture the screen with the cursor and save it to the specified file path.
-
-    Args:
-        file_path (str): The file path where the screenshot will be saved.
-
-    Raises:
-        None
-
-    Returns:
-        None
-    """
     user_platform = platform.system()
 
     if user_platform == "Windows":
@@ -171,8 +18,6 @@ def capture_screen_with_cursor(file_path):
         # Use xlib to prevent scrot dependency for Linux
         screen = Xlib.display.Display().screen()
         size = screen.width_in_pixels, screen.height_in_pixels
-        monitor_size["width"] = size[0]
-        monitor_size["height"] = size[1]
        screenshot = ImageGrab.grab(bbox=(0, 0, size[0], size[1]))
        screenshot.save(file_path)
    elif user_platform == "Darwin":  # (Mac OS)