In [None]:
# Required imports for the Power BI automation script
import os
import cv2
import base64
import json
import re
import time
import numpy as np
import pyautogui
import matplotlib.pyplot as plt
from PIL import Image
from anthropic import Anthropic
import mss
import easyocr
import warnings
import itertools
import io

# Configuration and setup
warnings.filterwarnings("ignore")
pyautogui.FAILSAFE = False

In [None]:
# Initialize Anthropic client with API key from environment
ANTHROPIC_CLIENT = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
MODEL_NAME = "claude-3-7-sonnet-20250219"

# Constants for icon folder and screen resolution
ICONS_DIRECTORY = "icons"
SCREEN_WIDTH = 1920
SCREEN_HEIGHT = 1080
MATCH_MATCH_THRESHOLD = 0.9

# Initialize Easyperform_ocr reader for English text detection
OCR_READER = easyocr.Reader(['en'], gpu=True)

In [None]:
def find_all_icons(icons_folder="icons", MATCH_THRESHOLD=0.9):
    """
    Match all icons in the icons_folder against a screenshot and return their center positions.
    
    Args:
        icons_folder (str): Path to the folder containing icon images (default: "icons").
        MATCH_THRESHOLD (float): Minimum correlation coefficient for a match (default: 0.8).
    
    Returns:
        list: List of dictionaries in the format {"icon_name": str, "x": int, "y": int}.
    """
    # Step 1: Take screenshot
    with mss.mss() as sct:
        screenshot = sct.grab(sct.monitors[1])
        img = np.array(screenshot)
        main_image = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)

    if main_image is None:
        raise FileNotFoundError("Could not load screenshot.")

    main_gray = cv2.cvtColor(main_image, cv2.COLOR_BGR2GRAY)
    matched_icons = []

    # Step 3: Iterate through all files in the icons folder
    for icon_file in os.listdir(icons_folder):
        if not icon_file.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
            continue

        icon_name = os.path.splitext(icon_file)[0]
        icon_path = os.path.join(icons_folder, icon_file)

        template = cv2.imread(icon_path)
        if template is None:
            print(f"Warning: Could not load icon: {icon_path}")
            continue

        template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)
        t_height, t_width = template_gray.shape[:2]

        res = cv2.matchTemplate(main_gray, template_gray, cv2.TM_CCOEFF_NORMED)
        loc = np.where(res >= MATCH_THRESHOLD)

        for pt in zip(*loc[::-1]):
            center_x = int(pt[0] + t_width // 2)
            center_y = int(pt[1] + t_height // 2)
            matched_icons.append({
                "icon_name": icon_name,
                "x": center_x,
                "y": center_y
            })

    return matched_icons

In [None]:
def perform_ocr():
    """
    Extract text and icons from a screenshot with their positions.
    
    Returns:
        str: Concatenated string of detected text and icons in format {text, x, y} and {icon_name, x, y}.
    """
    detected_items = []

    # Step 1: Take screenshot
    with mss.mss() as sct:
        screenshot = sct.grab(sct.monitors[1])
        img = np.array(screenshot)
        image = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)

    # Run perform_ocr
    results = OCR_READER.readtext(image, detail=1)

    for bbox, detected_text, _ in results:
        # Calculate center of bounding box
        x_coords = [point[0] for point in bbox]
        y_coords = [point[1] for point in bbox]
        center_x = int(sum(x_coords) / len(x_coords))
        center_y = int(sum(y_coords) / len(y_coords))

        # Format as {text, x, y}
        detected_items.append(f"{{{detected_text}, {center_x}, {center_y}}}")

    icons = find_all_icons(icons_folder="icons", MATCH_THRESHOLD=0.9)
    results = ''.join(detected_items) + json.dumps(icons)

    return results

def find_and_click_item(item_name):
    """
    Locate and click an item by its text using OCR.
    
    Args:
        item_name (str): Text of the item to click.
    
    Returns:
        str: Result message indicating success or failure.
    """
    print(f"Looking for: '{item_name}'")

    # Take screenshot
    with mss.mss() as sct:
        screenshot = sct.grab(sct.monitors[1])
        img = np.array(screenshot)
        image = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)

    # Run perform_ocr
    results = OCR_READER.readtext(image, detail=1)

    perform_ocr_lines = []
    for bbox, text, conf in results:
        y_center = (bbox[0][1] + bbox[2][1]) / 2
        cleaned_text = text.strip()
        perform_ocr_lines.append({
            'text': cleaned_text,
            'bbox': bbox,
            'y_center': y_center
        })

    item_name_lower = item_name.strip().lower()

    # === Stage 1: Try individual match ===
    for line in perform_ocr_lines:
        if item_name_lower in line['text'].lower():
            x_coords = [pt[0] for pt in line['bbox']]
            y_coords = [pt[1] for pt in line['bbox']]
            center_x = int(sum(x_coords) / len(x_coords))
            center_y = int(sum(y_coords) / len(y_coords))

            print(f"\n Found direct match:\nText: '{line['text']}'\nClicking at (x={center_x}, y={center_y})")
            pyautogui.moveTo(center_x, center_y, duration=2)
            pyautogui.click(center_x, center_y, clicks=2, interval=0.25)
            pyautogui.moveTo(SCREEN_WIDTH/2, SCREEN_HEIGHT/2) 
            return f"Item '{item_name}' clicked at (x={center_x}, y={center_y})"

    # === Stage 2: Try combining nearby lines ===
    for group_size in [2, 3]:
        for combo in itertools.combinations(perform_ocr_lines, group_size):
            y_positions = [line['y_center'] for line in combo]
            if max(y_positions) - min(y_positions) > 40:
                continue  # Skip if lines are too far apart vertically

            combined_text = " ".join(line['text'] for line in combo)
            combined_bbox = []
            for line in combo:
                combined_bbox.extend(line['bbox'])

            if item_name_lower in combined_text.lower():
                x_coords = [pt[0] for pt in combined_bbox]
                y_coords = [pt[1] for pt in combined_bbox]
                center_x = int(sum(x_coords) / len(x_coords))
                center_y = int(sum(y_coords) / len(y_coords))

                print(f"\n Found combined match:\nCombined Text: '{combined_text}'\nClicking at (x={center_x}, y={center_y})")
                pyautogui.moveTo(center_x, center_y, duration=2)
                pyautogui.click(center_x, center_y, clicks=2, interval=0.25)
                pyautogui.moveTo(SCREEN_WIDTH/2, SCREEN_HEIGHT/2) 
                return f"Item '{item_name}' clicked at (x={center_x}, y={center_y})"

    print(f"\n Item '{item_name}' not found")
    return f"Item '{item_name}' not found"

In [None]:
def find_and_click_icon(icon_name, MATCH_THRESHOLD=0.9):
    """
    Locate and click an icon by matching it against a screenshot.
    
    Args:
        icon_name (str): Name of the icon to click.
        match_threshold (float): Minimum correlation coefficient for a match.
    
    Returns:
        str: Result message indicating success or failure.
    """
    # Take screenshot
    with mss.mss() as sct:
        screenshot = sct.grab(sct.monitors[1])
        img = np.array(screenshot)
        main_image = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)

    icon_path = os.path.join(os.getcwd(), ICONS_DIRECTORY, icon_name + ".png")

    # Load template
    template = cv2.imread(icon_path)

    if main_image is None or template is None:
        raise FileNotFoundError("Could not load screenshot or icon image.")

    # Convert to grayscal
    main_gray = cv2.cvtColor(main_image, cv2.COLOR_BGR2GRAY)
    template_gray = cv2.cvtColor(template, cv2.COLOR_BGR2GRAY)

    # Template matching
    res = cv2.matchTemplate(main_gray, template_gray, cv2.TM_CCOEFF_NORMED)
    min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(res)

    if max_val >= MATCH_THRESHOLD:
        # Get dimensions of the template
        t_height, t_width = template_gray.shape[:2]

        # Calculate center coordinates
        center_x = max_loc[0] + t_width // 2
        center_y = max_loc[1] + t_height // 2
        
        print(f"Found icon '{icon_name}' at (x={center_x}, y={center_y})")
        pyautogui.moveTo(center_x, center_y, duration=2)
        pyautogui.click(center_x, center_y)
        pyautogui.moveTo(SCREEN_WIDTH/2, SCREEN_HEIGHT/2) 
        return f"Icon '{icon_name}' c/2licked at (x={center_x}, y={center_y})"
    else:
        print(f"Icon '{icon_name}' not found")
        return f"Icon '{icon_name}' not found"  # No match

In [None]:
def open_pbi():
    """
    Open Power BI Desktop application.
    
    Returns:
        str: Confirmation message.
    """
    pyautogui.hotkey("win", "s") # Open Windows search
    time.sleep(1)
    pyautogui.typewrite("Power BI Desktop")
    time.sleep(1)
    pyautogui.press("enter")
    time.sleep(5) # Wait for Power BI to open
    return "PowerBI is now open"

In [None]:
tools = [
    {
        "name": "open_pbi",
        "description": "This tool is exclusively for opening the Power BI app and must not be used to open any other application.",
        "input_schema": {
            "type": "object",
            "properties": {},
            "required": []
         }
    },
    {
        "name": "find_and_click_item",
        "description": "This tool is exclusively for clicking or double-clicking any item, including button, menu, and files, and must not be used for any other tasks.",
        "input_schema": {
            "type": "object",
            "properties": {
                 "item_name": {
                    "type": "string",
                    "description": "The text of an item, including button, menu, and files, which needs to be clicked or double-clicked"
                }
            },
            "required": ["item_name"]
         }
    },
    {
        "name": "find_and_click_icon",
        "description": "This tool is exclusively for clicking or double-clicking any icon, including visualisations icons, chevron right, right arrow, rectangle, and checkmark icons, and must not be used for any other tasks.",
        "input_schema": {
            "type": "object",
            "properties": {
                 "icon_name": {
                    "type": "string",
                    "description": "The name of an icon, including visualisations icons, which needs to be clicked or double-clicked. The icon names can be 'bar chart' , 'column chart' , 'chevron right' , 'right arrow', 'rectangle', or 'checkmark'",
                }
            },
            "required": ["icon_name"]
         }
    },
    {
        "name": "perform_ocr",
        "description": "This tool is exclusively for extracting and returning a string of all text and icons detected in the screenshot and their position and must not be used for any other tasks. The format of each extracted text and its position is {text, x, y}. The format of each detected icon and its position is {icon_name, x, y}",
        "input_schema": {
            "type": "object",
            "properties": {},
            "required": []
         }
    }
]

In [None]:
def process_tool_call(tool_name, tool_input):
    """
    Process a tool call from Claude.
    
    Args:
        tool_name (str): Name of the tool to execute.
        tool_input (dict): Input parameters for the tool.
    
    Returns:
        str: Result of the tool execution.
    """
    if tool_name == "open_pbi":
        return open_pbi()
    elif tool_name == "find_and_click_item":
        return find_and_click_item(tool_input["item_name"])
    elif tool_name == "find_and_click_icon":
        return find_and_click_icon(tool_input["icon_name"])
    elif tool_name == "perform_ocr":
        return perform_ocr()

In [None]:
def extract_reply(text):
    """
    Extract reply content from Claude's response.
    
    Args:
        text (str): Response text containing reply tags.
    
    Returns:
        str: Extracted reply content or None if not found.
    """
    pattern = r'<reply>(.*?)</reply>'
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1)
    else:
        return None    
    
def chat():
    """
    Main chat function to interact with the user and process Power BI automation tasks.
    """
    system_prompt = """
    You are an agent designed to help users create Power BI dashboards. Be helpful and concise in your responses.
    You have access to tools, but only use them when necessary. Only interact with Power BI — do not open or interact with any other apps, even if requested.
    You can select, click, or double-click items (e.g., buttons, menus, files) using the find_and_click_item tool, but before doing so, first check if the item exists in the extracted text from the screenshot using the perform_ocr tool.
    To click or double-click icons, you can use a dedicated tool by passing the icon name. Supported icon names include "bar chart",  "column chart", "right arrow", "rectangle", and "checkmark".
    To choose a column from imported data, click the 'rectangle' icon next to the column name. Confirm the selection by verifying that the rectangle turns into a 'checkmark' icon.
    Consider all the columns from imported data to generate any chart. Ensure that you find a 'checkmark' in front of each column.
    Do not ask the user for permission to click or double-click items or icons.
    Each time you respond:
    First, think through your response.
    Then, write the user-facing reply enclosed in <reply></reply> tags for easy parsing.
    """
    user_message = input("\nUser: ")

    messages = [
        {
            "role": "system",
            "content": system_prompt,
            "cache_control": {"type": "default"}  # This caches only the system prompt
        },
        {
            "role": "user",
            "content": user_message
        }
    ]
    
    messages = [{"role": "user", "content": user_message}]
    while True:
        if user_message == "quit":
            break
        #If the last message is from the assistant, 
        # get another input from the user
        if messages[-1].get("role") == "assistant":
            user_message = input("\nUser: ")
            messages.append({"role": "user", "content": user_message})

        #Send a request to Claude
        response = ANTHROPIC_CLIENT.messages.create(
            model=MODEL_NAME,
            system=system_prompt,
            max_tokens=4096,
            temperature = 0.0,
            tools=tools,
            messages=messages
        )
        # Update messages to include Claude's response
        messages.append(
            {"role": "assistant", "content": response.content}
        )

        
        # Manage OCR tool calls to remove the old OCR content for token efficiency 
        perform_ocr_candidates = {}  # id -> block
        perform_ocr_positions = []   # list of (index in messages, block)
        
        for i, msg in enumerate(messages):
            if msg['role'] == 'assistant' and isinstance(msg['content'], list):
                for block in msg['content']:
                    if getattr(block, 'type', None) == 'tool_use' and getattr(block, 'name', None) == 'perform_ocr':
                        perform_ocr_id = getattr(block, 'id', None)
                        if perform_ocr_id:
                            perform_ocr_candidates[perform_ocr_id] = block
                            perform_ocr_positions.append((i, perform_ocr_id))
        
        # Determine which perform_ocr ID is the last one
        last_perform_ocr_id = perform_ocr_positions[-1][1] if perform_ocr_positions else None
        
        # Identify perform_ocr tool_use IDs that actually received tool_result responses
        used_perform_ocr_ids = set()
        for msg in messages:
            content = msg.get('content')
            if isinstance(content, list):
                for item in content:
                    if isinstance(item, dict) and item.get('type') == 'tool_result':
                        tool_use_id = item.get('tool_use_id')
                        if tool_use_id in perform_ocr_candidates:
                            used_perform_ocr_ids.add(tool_use_id)
        
        #  Remove perform_ocr tool_use blocks, except the last one (for token efficiency)
        for msg in messages:
            if msg['role'] == 'assistant' and isinstance(msg['content'], list):
                msg['content'] = [
                    block for block in msg['content']
                    if not (
                        getattr(block, 'type', None) == 'tool_use' and
                        getattr(block, 'name', None) == 'perform_ocr' and
                        getattr(block, 'id', None) in used_perform_ocr_ids and
                        getattr(block, 'id', None) != last_perform_ocr_id
                    )
                ]
        
        # Remove user tool_result messages for all perform_ocrs *except* the last one (for token efficiency)
        filtered_messages = []
        for msg in messages:
            content = msg.get('content')
            if isinstance(content, list):
                is_older_perform_ocr_tool_result = any(
                    isinstance(item, dict) and
                    item.get('type') == 'tool_result' and
                    item.get('tool_use_id') in used_perform_ocr_ids and
                    item.get('tool_use_id') != last_perform_ocr_id
                    for item in content
                )
                if is_older_perform_ocr_tool_result:
                    continue  # skip this message
            filtered_messages.append(msg)
        
        # Final update
        messages = filtered_messages

        #If Claude stops because it wants to use a tool:
        if response.stop_reason == "tool_use":
            #Naive approach assumes only 1 tool is called at a time
            tool_use = response.content[-1] 
            tool_name = tool_use.name
            tool_input = tool_use.input
            print(f"=====Claude wants to use the {tool_name} tool=====")


            #Actually run the underlying tool functionality on our db
            tool_result = process_tool_call(tool_name, tool_input)

            #Add our tool_result message:
            messages.append(
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use.id,
                            "content": str(tool_result) }
                    ],
                },
            )
        else: 
            #If Claude does NOT want to use a tool, 
            #just print out the text reponse
            model_reply = extract_reply(response.content[0].text)
            print("\nAcme Co Support: " + f"{model_reply}" )

        #print(messages)

In [None]:
# start chatting with the agent
chat()