In [1]:
#!pip install "transformers==4.53.0"

In [2]:
import re, os, json
import base64
import torch
from typing import List, Dict, Any

from pathlib import Path
from PIL import Image
from transformers import AutoTokenizer, AutoModel, AutoImageProcessor
import gc

2025-08-20 00:19:56.799374: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1755649196.810653    7055 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1755649196.814066    7055 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-08-20 00:19:56.826982: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
MODEL_DIR = "xlangai/OpenCUA-7B"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

IMAGE_DIR = Path("./images/1921c65e-38c4-4aec-a298-f7d8a8635197")

FOLDER_NAME = IMAGE_DIR.name
RESULT_DIR = Path("results") / FOLDER_NAME
RESULT_DIR.mkdir(parents=True, exist_ok=True)

CAPS_JSON = RESULT_DIR / f"{FOLDER_NAME}_captions.json"
ACTS_JSON = RESULT_DIR / f"{FOLDER_NAME}_actions.json"

DO_ACTIONS = True

In [4]:
def encode_image(path) -> str:
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode()

def free_memory():
    try:
        torch.cuda.empty_cache()
    except Exception:
        pass
    gc.collect()

_num_re = re.compile(r'(\d+)')
def natural_key(path: Path):
    return [int(t) if t.isdigit() else t.lower() for t in _num_re.split(path.name)]

def collect_images(root: Path) -> List[Path]:
    exts = {".png", ".jpg", ".jpeg"}
    return sorted([p for p in root.glob("*") if p.suffix.lower() in exts], key=natural_key)

URL_RE = re.compile(r'\bhttps?://[^\s]+|\bwww\.[^\s]+\b', re.IGNORECASE)
def extract_urls(text: str) -> List[str]:
    return URL_RE.findall(text or "")

def clamp01(x: float) -> float:
    try:
        v = float(x)
    except Exception:
        return 0.0
    return 0.0 if v < 0 else 1.0 if v > 1 else v

def normalize_xy(xy, W, H):
    """
    [x,y]가 픽셀이면 0..1로 변환, 이미 비율이면 클램프만
    """
    if isinstance(xy, (list, tuple)) and len(xy) == 2:
        x, y = xy
        if isinstance(x, (int, float)) and isinstance(y, (int, float)):
            if x > 1 or y > 1:  # 픽셀
                return [round(clamp01(x / max(W, 1)), 4), round(clamp01(y / max(H, 1)), 4)]
            return [round(clamp01(x), 4), round(clamp01(y), 4)]
    return None

def parse_actions_json(text: str):
    try:
        obj = json.loads(text)
    except json.JSONDecodeError:
        return None
    # [[{...}]] 형태 평탄화
    if isinstance(obj, list) and len(obj) == 1 and isinstance(obj[0], list):
        obj = obj[0]
    return obj if isinstance(obj, list) else None

def history_to_text(history: List[Dict[str, Any]], max_steps: int = 5) -> str:
    """
    최근 max_steps 단계만 간단 요약 텍스트로.
    history 항목: {"caption": str, "actions": list[dict]}
    """
    lines = []
    take = history[-max_steps:]
    for i, h in enumerate(take, 1):
        caps = (h.get("caption") or "")[:120].replace("\n", " ")
        acts = h.get("actions") or []
        # 간단 요약
        act_summ = []
        for a in acts:
            if not isinstance(a, dict): continue
            t = a.get("type")
            if t in ("click", "move", "drag"):
                act_summ.append(f'{t}@{a.get("xy", "")}')
            elif t == "type":
                txt = str(a.get("text",""))[:40]
                act_summ.append(f'type("{txt}")')
            elif t == "key":
                act_summ.append(f'key{a.get("keys", [])}')
            else:
                act_summ.append(t or "?")
        lines.append(f"- Step{-len(take)+i+len(history)}: caption={caps} | actions=[{', '.join(act_summ)}]")
    return "\n".join(lines) if lines else "(no history)"

def history_has_url_enter(history: List[Dict[str, Any]], url: str) -> bool:
    """
    과거 스텝에 url을 입력(type)하고 enter까지 친 기록이 있는가?
    """
    typed = False
    for h in history:
        acts = h.get("actions") or []
        for a in acts:
            if not isinstance(a, dict): continue
            if a.get("type") == "type" and isinstance(a.get("text"), str) and url in a["text"]:
                typed = True
            if typed and a.get("type") == "key" and "enter" in (a.get("keys") or []):
                return True
    return False

In [5]:
print("Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR, trust_remote_code=True)
model = AutoModel.from_pretrained(
    MODEL_DIR,
    torch_dtype=torch.bfloat16,
    device_map="auto",
    trust_remote_code=True
)
image_processor = AutoImageProcessor.from_pretrained(MODEL_DIR, trust_remote_code=True)
print("Model loaded on device:", device)

Loading model...


Loading checkpoint shards:   0%|          | 0/28 [00:00<?, ?it/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


Model loaded on device: cuda


In [6]:
def generate_image_caption(img_path: str) -> str:
    image = Image.open(img_path).convert('RGB')
    processed = image_processor(image, return_tensors="pt")
    pixel_values = processed['pixel_values'].to(device=device, dtype=torch.bfloat16)
    grid_thws = processed.get('image_grid_thw')
    if grid_thws is not None:
        grid_thws = grid_thws.to(device=device)

    messages = [
        {"role": "system", "content": "You are a helpful assistant that describes GUI screenshots."},
        {"role": "user", "content": [
            {"type": "image", "image": f"data:image/png;base64,{encode_image(img_path)}"},
            {"type": "text",  "text": "Describe the content of this screenshot briefly."},
        ]},
    ]
    input_ids = tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True)
    input_ids = torch.tensor([input_ids], device=device)

    gen_kwargs = dict(
        input_ids=input_ids,
        pixel_values=pixel_values,
        max_new_tokens=128,
        temperature=0.7,
    )
    if grid_thws is not None:
        gen_kwargs["grid_thws"] = grid_thws

    out_ids = model.generate(**gen_kwargs)
    out_ids = out_ids[:, input_ids.shape[1]:]
    text = tokenizer.batch_decode(out_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)[0].strip()
    free_memory()
    return text

In [7]:
def generate_actions(img_path: str, instruction: str, caption: str, history: List[Dict[str, Any]]) -> str:
    """
    히스토리(이전 캡션/액션 요약)를 주고 '다음 단계'만 JSON으로 생성.
    - 좌표는 반드시 0..1 (소수점 4자리)
    - instruction 내 URL은 '한 번만' type -> enter (히스토리에 있으면 반복 금지)
    """
    with Image.open(img_path) as im:
        W, H = im.size

    urls = extract_urls(instruction)
    url_hint = f"EXTRACTED_URLS: {urls}" if urls else "EXTRACTED_URLS: []"
    already_navigated = False
    if urls:
        already_navigated = history_has_url_enter(history, urls[0])

    hist_text = history_to_text(history, max_steps=6)

    rules = f"""
You MUST output ONLY a JSON array of the NEXT steps (not the whole task).
Use these fields:
- "type": one of ["move","click","double_click","right_click","drag","scroll","type","key","wait"]
- "xy": [x_rel,y_rel] in [0,1], 4 decimals (pointer actions only)
- "delta": for scroll, e.g., {{"dx":0,"dy":-0.2}}
- "text": for type
- "keys": for key (e.g., ["enter"])

Rules:
- Use ONLY relative coordinates (0..1). NEVER absolute pixels.
- Do NOT repeat past actions unless strictly needed.
- { "If instruction includes a URL, and it has NOT been typed+entered before, include a 'type' with the exact URL then a 'key' with ['enter']." if not already_navigated else "URL typing+enter already done earlier. Do NOT type the URL again."}
- Return ONLY JSON. No explanations.
- Prefer concise steps (<= 3) for the next move.

{url_hint}
Image size hint: width={W}, height={H}.

History (most recent last):
{hist_text}

Current screen description:
{caption}

Instruction (global goal):
{instruction}
""".strip()

    messages = [
        {"role": "system", "content": "You are a reliable GUI agent. Output ONLY JSON of the NEXT steps using relative coordinates."},
        {"role": "user", "content": [
            {"type": "image", "image": f"data:image/png;base64,{encode_image(img_path)}"},
            {"type": "text",  "text": rules}
        ]},
    ]

    input_ids = tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True)
    input_ids = torch.tensor([input_ids], device=device)

    processed = image_processor(Image.open(img_path).convert('RGB'), return_tensors="pt")
    pixel_values = processed['pixel_values'].to(device=device, dtype=torch.bfloat16)
    grid_thws = processed.get('image_grid_thw')
    if grid_thws is not None:
        grid_thws = grid_thws.to(device=device)

    gen_kwargs = dict(
        input_ids=input_ids,
        pixel_values=pixel_values,
        max_new_tokens=384,
        temperature=0.0,
    )
    if grid_thws is not None:
        gen_kwargs["grid_thws"] = grid_thws

    out_ids = model.generate(**gen_kwargs)
    out_ids = out_ids[:, input_ids.shape[1]:]
    text = tokenizer.batch_decode(out_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True)[0].strip()

    # ---- 후처리: JSON 파싱 & 좌표 정규화 ----
    steps = parse_actions_json(text) or []
    norm_steps = []
    for s in steps:
        if not isinstance(s, dict): continue
        rec = {"type": s.get("type","")}
        if "xy" in s:
            xy = normalize_xy(s["xy"], W, H)
            if xy: rec["xy"] = xy
        if "delta" in s: rec["delta"] = s["delta"]
        if "text"  in s: rec["text"]  = s["text"]
        if "keys"  in s: rec["keys"]  = s["keys"]
        if rec["type"]: norm_steps.append(rec)

    # URL 중복 방지(히스토리에 이미 url+enter가 있으면 제거)
    if urls and already_navigated:
        url0 = urls[0]
        filtered = []
        for a in norm_steps:
            if a.get("type") == "type" and isinstance(a.get("text"), str) and url0 in a["text"]:
                continue
            if a.get("type") == "key" and "enter" in (a.get("keys") or []):
                continue
            filtered.append(a)
        norm_steps = filtered

    free_memory()
    return json.dumps(norm_steps, ensure_ascii=False)

In [8]:
def process_all_images(image_root: Path, instruction: str):
    images = collect_images(image_root)
    print(f"Found {len(images)} image(s) under {image_root.resolve()}")

    captions_acc: List[Dict[str, Any]] = []
    actions_acc:  List[Dict[str, Any]] = []
    history:      List[Dict[str, Any]] = []  # [{caption, actions(list[dict])}, ...]

    for img_path in images:
        rel = img_path.relative_to(image_root)
        print(f"\n[Caption] {rel}")
        caption = generate_image_caption(str(img_path))
        captions_acc.append({"image": str(img_path), "caption": caption})
        CAPS_JSON.write_text(json.dumps(captions_acc, indent=2, ensure_ascii=False), encoding="utf-8")

        if DO_ACTIONS:
            print(f"[Actions] {rel}")
            actions_json_str = generate_actions(str(img_path), instruction, caption, history)

            # 저장용/히스토리용 파싱 (파싱 실패 시 빈 리스트)
            try:
                parsed_actions = json.loads(actions_json_str)
                if not isinstance(parsed_actions, list):
                    parsed_actions = []
            except json.JSONDecodeError:
                parsed_actions = []

            actions_acc.append({
                "image": str(img_path),
                "instruction": instruction,
                "caption": caption,
                "actions": parsed_actions
            })
            ACTS_JSON.write_text(json.dumps(actions_acc, indent=2, ensure_ascii=False), encoding="utf-8")

            # 히스토리 업데이트
            history.append({
                "caption": caption,
                "actions": parsed_actions
            })

In [9]:
%%time
instruction = "www.xxx.com 웹 사이트에 접속해줘"
process_all_images(IMAGE_DIR, instruction)

The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.


Found 5 image(s) under /home/jovyan/images/1921c65e-38c4-4aec-a298-f7d8a8635197

[Caption] 1921c65e-38c4-4aec-a298-f7d8a8635197_1.jpg


The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
The `seen_tokens` attribute is deprecated and will be removed in v4.41. Use the `cache_position` model input instead.
The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.


[Actions] 1921c65e-38c4-4aec-a298-f7d8a8635197_1.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.



[Caption] 1921c65e-38c4-4aec-a298-f7d8a8635197_2.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.


[Actions] 1921c65e-38c4-4aec-a298-f7d8a8635197_2.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.



[Caption] 1921c65e-38c4-4aec-a298-f7d8a8635197_3.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.


[Actions] 1921c65e-38c4-4aec-a298-f7d8a8635197_3.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.



[Caption] 1921c65e-38c4-4aec-a298-f7d8a8635197_4.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.


[Actions] 1921c65e-38c4-4aec-a298-f7d8a8635197_4.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.



[Caption] 1921c65e-38c4-4aec-a298-f7d8a8635197_5.jpg


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.
The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:151644 for open-end generation.


[Actions] 1921c65e-38c4-4aec-a298-f7d8a8635197_5.jpg
CPU times: user 24.8 s, sys: 1.36 s, total: 26.1 s
Wall time: 24.7 s
