## Windows omni_env

## 00 Install & Import Libraries

In [None]:
import torch
import json
import random
import io
import ast
import xml.etree.ElementTree as ET
import os.path as osp
import math
import numpy as np
import base64

from icecream import ic
from transformers import (
    Qwen2_5_VLForConditionalGeneration,
    AutoTokenizer,
    AutoProcessor,
    BitsAndBytesConfig,
    TextStreamer
)
from qwen_vl_utils import process_vision_info, smart_resize
from PIL import Image, ImageDraw, ImageFont, ImageColor
from IPython.display import display, Markdown
from matplotlib import pyplot as plt

from qwen_agent.llm.fncall_prompts.nous_fncall_prompt import (
    NousFnCallPrompt,
    Message,
    ContentItem,
)
from utils.agent_function_call import MobileUse

## 01 Import Model

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# bitsandbytes quantisation settings used when the checkpoint is loaded:
# 4-bit NF4 weights, double quantisation enabled, float16 compute dtype.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type='nf4',
    bnb_4bit_compute_dtype=torch.float16,
)

In [None]:
# Local directory holding the Qwen2.5-VL-3B-Instruct checkpoint.
model_path = './00_Model/Qwen2.5-VL-3B-Instruct'

# Load the model with the 4-bit bitsandbytes config defined above.
# NOTE: there is deliberately no trailing `.to(...)` call here. With
# `device_map = 'auto'` the weights are already placed on the available
# device(s) during loading, and moving a bitsandbytes-quantised / accelerate-
# dispatched model afterwards is at best redundant and, depending on the
# installed transformers / bitsandbytes versions, emits a warning or raises.
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    model_path,
    quantization_config = bnb_config,
    device_map = 'auto',
)

# The processor bundles the tokenizer and the image pre-processor for this checkpoint.
processor = AutoProcessor.from_pretrained(model_path)

## 02 Define Inference Function

In [None]:
def draw_point(image: Image.Image, point: list, color = None):
    """Blend a semi-transparent filled circle onto a picture.

    Args:
        image: Picture to annotate. A converted copy is composited, and the
            result is returned as a new image.
        point: ``(x, y)`` centre of the circle, in pixel coordinates of ``image``.
        color: Optional colour string understood by ``ImageColor.getrgb``
            (e.g. ``'green'`` or ``'#00ff00'``). Any other value, or a string
            that cannot be parsed, falls back to red.

    Returns:
        An RGB image with the marker drawn at ``point``. The circle radius is
        5% of the shorter image side and its alpha value is 128.
    """
    fallback = (255, 0, 0, 128)  # red, alpha 128 (about 50% opacity)

    if isinstance(color, str):
        try:
            # getrgb may return RGB or RGBA (e.g. for '#rrggbbaa'); keep only
            # the RGB channels so that appending the alpha always yields a
            # valid 4-tuple instead of a 5-tuple.
            color = ImageColor.getrgb(color)[:3] + (128,)
        except ValueError:
            color = fallback
    else:
        color = fallback

    # Draw on a fully transparent layer so the circle can be alpha-blended
    # over the picture instead of overwriting its pixels.
    overlay = Image.new('RGBA', image.size, (255, 255, 255, 0))
    overlay_draw = ImageDraw.Draw(overlay)
    radius = min(image.size) * 0.05
    x, y = point

    overlay_draw.ellipse(
        [(x - radius, y - radius), (x + radius, y + radius)],
        fill=color,
    )

    # alpha_composite requires both layers to be RGBA.
    image = image.convert('RGBA')
    combined = Image.alpha_composite(image, overlay)

    return combined.convert('RGB')

In [None]:
def mobile_agent(user_query, screenshot):
    """Ask the model for the next mobile action on a screenshot and visualise it.

    The function builds a function-calling prompt around ``MobileUse``, runs
    generation (streaming the text as it is produced), parses the
    ``<tool_call>`` JSON from the reply and displays the screenshot. For a
    ``click`` action a green marker is drawn at the predicted coordinate.

    Args:
        user_query: Text of the task, optionally followed by the history of
            steps already performed.
        screenshot: Path of the screenshot image file.

    Raises:
        ValueError: If the reply contains no ``<tool_call>`` block, or if its
            payload is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass).
    """
    # The resolution of the device will be written into the system prompt.
    dummy_image = Image.open(screenshot)
    image_processor = processor.image_processor
    resized_height, resized_width = smart_resize(
        dummy_image.height,
        dummy_image.width,
        factor = image_processor.patch_size * image_processor.merge_size,
        min_pixels = image_processor.min_pixels,
        max_pixels = image_processor.max_pixels,
    )

    # The tool is configured with the *resized* resolution, so that is the
    # screen size the model is told about.
    mobile_use = MobileUse(
        cfg = {'display_width_px' : resized_width, 'display_height_px' : resized_height}
    )

    # Build messages
    prompt_builder = NousFnCallPrompt()
    message = prompt_builder.preprocess_fncall_messages(
        messages = [
            Message(role = 'system', content = [ContentItem(text = 'You are a helpful assistant.')]),
            Message(role = 'user', content = [
                ContentItem(text = user_query),
                ContentItem(image = f"file://{screenshot}")
            ]),
        ],
        functions = [mobile_use.function],
        lang = None,
    )
    message = [msg.model_dump() for msg in message]

    text = processor.apply_chat_template(message, tokenize = False, add_generation_prompt = True)
    print('text', text)

    # Send the inputs to wherever the model actually lives instead of a
    # hard-coded 'cuda' string.
    inputs = processor(
        text = [text], images = [dummy_image], padding = True, return_tensors = 'pt'
    ).to(model.device)

    streamer = TextStreamer(processor.tokenizer, skip_special_tokens = True, skip_prompt = True)

    output_ids = model.generate(**inputs, max_new_tokens = 2048, streamer = streamer)
    # Strip the prompt tokens so that only the newly generated part is decoded.
    generated_ids = [
        full_ids[len(prompt_ids):]
        for prompt_ids, full_ids in zip(inputs.input_ids, output_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids, skip_special_tokens = True, clean_up_tokenization_spaces = True
    )[0]
    print(output_text)

    # Qwen performs the action through a function call wrapped in
    # <tool_call>...</tool_call>. Parse it without depending on the exact
    # newlines around the tags, and fail with a clear message if it is absent.
    _, open_tag, remainder = output_text.partition('<tool_call>')
    if not open_tag:
        raise ValueError(f'No <tool_call> block found in model output: {output_text!r}')
    action = json.loads(remainder.partition('</tool_call>')[0].strip())

    # As an example, we visualize the "click" action by drawing a green circle
    # onto the image. The marker is drawn on the resized image because the
    # model was given the resized resolution above, so its coordinates are
    # expected to be in that space rather than in original-image pixels.
    display_image = dummy_image.resize((resized_width, resized_height))
    arguments = action.get('arguments') or {}
    if arguments.get('action') == 'click' and 'coordinate' in arguments:
        display_image = draw_point(display_image, arguments['coordinate'], color = 'green')
    display(display_image)

## 03 Run Inference

### 03.01 Mobile Use

In [None]:
%%time

# Path of the English example screenshot passed to the agent.
screenshot = './00_Dataset/mobile_en_example.png'

# The operation history can be organized as Step x: [action]; Step x+1: [action]...
# Here the query is followed by one already-performed step (opening "File Manager").
user_query = 'The user query:  Open the file manager app and view the au_uu_SzH3yR2.mp3 file in MUSIC Folder\nTask progress (You have done the following operation on the current device): Step 1: {"name": "mobile_use", "arguments": {"action": "open", "text": "File Manager"}}; '

# Run one agent step on the screenshot.
mobile_agent(user_query, screenshot)

### 03.02 Chinese App & Query

In [None]:
%%time

# Path of the Chinese-language example screenshot passed to the agent.
screenshot = './00_Dataset/mobile_zh_example.jpg'

# The operation history can be organized as Step x: [action]; Step x+1: [action]...
# No previous steps are listed for this query.
user_query = 'The user query:在盒马中,打开购物车，结算（到付款页面即可） (You have done the following operation on the current device):'

# Run one agent step on the screenshot.
mobile_agent(user_query, screenshot)