In [None]:
%%capture
!pip install gradio transformers torch
!pip install -U bitsandbytes

In [None]:
import gradio as gr
import torch
import json
import re
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"


# 4-bit NF4 quantization (double quantization, float16 compute) used for the
# Qwen model below.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# Load the tool-calling ("Hammer") model in float16 and move it to `device`.
hammer_tokenizer = AutoTokenizer.from_pretrained("beyoru/Calling")
hammer_model = AutoModelForCausalLM.from_pretrained(
    "beyoru/Calling",
    torch_dtype=torch.float16,
).to(device)

# Load the Qwen chat model with 4-bit quantization.
# A bitsandbytes-quantized model is placed through `device_map` at load time
# rather than with `.to(device)` afterwards: calling `.to()` on a quantized
# model is rejected by Transformers with older bitsandbytes releases.
qwen_tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-7B-Instruct")
qwen_model = AutoModelForCausalLM.from_pretrained(
    "Qwen/Qwen2.5-7B-Instruct",
    quantization_config=bnb_config,
    device_map=device,
)

# Tool (function) definitions handed to the Hammer tokenizer's chat template
# via `tools=tools`. Each entry has a name, a description and a JSON-Schema
# style "parameters" object listing properties and the required ones.
tools = [
    {
        # Requires both the recipient and the message text.
        "name": "send_message",
        "description": "Send a message to other user",
        "parameters": {
            "type": "object",
            "properties": {
                "recipient": {"type": "string", "description": "Username of recipient"},
                "message": {"type": "string", "description": "Message content"},
            },
            "required": ["recipient", "message"]
        }
    },
    {
        # Every profile field is optional (empty "required" list).
        "name": "update_profile",
        "description": "Update user profile information such as name, username, bio and email",
        "parameters": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "New name of the user"},
                "username": {"type": "string", "description": "New username"},
                "email": {"type": "string", "description": "New email address"},
                "bio": {"type": "string", "description": "New bio text"},
            },
            "required": []
        }
    }
]

def parse_timer_command(message):
    """Extract the delay in minutes from a Vietnamese timer command.

    Looks for the pattern ``sau <number> phút`` ("after <number> minutes")
    anywhere in ``message``.

    Args:
        message: The user's text.

    Returns:
        The number of minutes as an ``int``, or ``0`` when the message does
        not contain such a pattern.
    """
    # Search for the complete pattern instead of splitting on the first
    # "sau": that word can also appear earlier in the sentence (for example
    # "sau khi ăn, sau 10 phút"), which made the split-based parse return 0.
    match = re.search(r"sau\s*(\d+)\s*phút", message)
    return int(match.group(1)) if match else 0

def extract_json_from_response(response):
    """Try to extract a JSON value from a model response.

    When the response contains a Markdown code fence (optionally tagged
    ``json``), the content of the first fence is parsed; otherwise the whole
    response is parsed as JSON.

    Args:
        response: Raw text produced by the model.

    Returns:
        The decoded JSON value, or ``None`` when ``response`` is not a string
        or does not hold valid JSON.
    """
    if not isinstance(response, str):
        return None

    # Prefer the content of a fenced block; fall back to the full text.
    fenced = re.search(r'```(?:json)?\n?(.*?)\n?```', response, re.DOTALL)
    payload = fenced.group(1) if fenced else response

    try:
        return json.loads(payload.strip())
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass; RecursionError covers
        # pathologically nested input.
        return None

# System prompt placed as the first message of every conversation built in
# chat_with_ai.
# NOTE(review): the prompt text contains typos/grammar slips ("flatform",
# "a assistant"). It is sent to the model verbatim, so confirm before rewording.
SYSTEM_MEM = """\
You are Mem a assistant live in a social media flatform your task is assistant user for their request
"""
def chat_with_qwen(messages):
    """Generate a chat reply with the Qwen model.

    Used as the fallback when the Hammer model does not produce a valid
    tool call.

    Args:
        messages: Conversation as a list of message dicts (``"role"`` and
            ``"content"`` keys), rendered through the tokenizer's chat
            template.

    Returns:
        The decoded text of the newly generated tokens only.
    """
    # return_dict=True yields the attention mask together with input_ids.
    # Passing only input_ids makes generate() warn that the mask cannot be
    # inferred because the pad token equals the EOS token.
    inputs = qwen_tokenizer.apply_chat_template(
        messages,
        add_generation_prompt=True,
        return_dict=True,
        return_tensors="pt",
    ).to(qwen_model.device)

    outputs = qwen_model.generate(**inputs, max_new_tokens=512)

    # generate() returns prompt + continuation; keep only the continuation.
    prompt_length = inputs["input_ids"].shape[1]
    return qwen_tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)


def chat_with_ai(message, history):
    """Answer a user message with a Hammer tool call or a Qwen chat reply.

    The Hammer model is asked first, seeing only the latest ``message`` plus
    the tool definitions. If its output parses to at least one call naming a
    tool defined in ``tools``, those calls are returned as a JSON string.
    Otherwise (no valid call, or any error while running Hammer) the full
    conversation is sent to Qwen.

    Args:
        message: The new user message.
        history: Previous turns as ``(user_msg, ai_msg)`` pairs.

    Returns:
        Either a JSON-encoded list of tool calls or Qwen's text reply.
    """
    messages = [{"role": "system", "content": SYSTEM_MEM}]

    # Rebuild the conversation from the chat history.
    for user_msg, ai_msg in history:
        messages.append({"role": "user", "content": user_msg})
        if not ai_msg:
            continue

        assistant_turn = {"role": "assistant", "content": ai_msg}
        try:
            parsed = json.loads(ai_msg)
        except (TypeError, ValueError):
            parsed = None  # ordinary text reply
        if isinstance(parsed, dict):
            parsed = [parsed]
        # Tool-call replies are stored as a JSON list of call dicts (see the
        # json.dumps below). Other JSON-parsable text such as "42" is a plain
        # reply and must not be tagged as a tool call.
        if isinstance(parsed, list) and parsed and all(isinstance(c, dict) for c in parsed):
            assistant_turn["tool_calls"] = parsed
        messages.append(assistant_turn)

    # Add the new message.
    messages.append({"role": "user", "content": message})

    # Accept exactly the tools that are defined, so that every defined tool
    # (including update_profile) is usable and undefined names are rejected.
    tool_names = [tool["name"] for tool in tools]

    # Try the Hammer model first.
    try:
        hammer_inputs = hammer_tokenizer.apply_chat_template(
            [{"role": "user", "content": message}],
            tools=tools,
            add_generation_prompt=True,
            return_dict=True,
            return_tensors="pt"
        ).to(hammer_model.device)

        hammer_outputs = hammer_model.generate(**hammer_inputs, max_new_tokens=256)
        # Decode only the tokens generated after the prompt.
        prompt_length = hammer_inputs["input_ids"].shape[1]
        hammer_response = hammer_tokenizer.decode(
            hammer_outputs[0][prompt_length:], skip_special_tokens=True
        )

        print("Hammer response:", hammer_response)  # Debug

        # Parse tool calls; a single call is normalized to a one-item list.
        tool_calls = extract_json_from_response(hammer_response)
        if isinstance(tool_calls, dict):
            tool_calls = [tool_calls]

        if isinstance(tool_calls, list):
            valid_calls = [
                call for call in tool_calls
                if isinstance(call, dict) and call.get("name") in tool_names
            ]
            if valid_calls:
                return json.dumps(valid_calls, ensure_ascii=False)

    except Exception as e:
        # Best effort: any Hammer failure falls through to the Qwen fallback.
        print("Hammer error:", e)  # Debug

    # Hammer produced no valid result, so use Qwen.
    print("Falling back to Qwen")  # Debug
    return chat_with_qwen(messages)

# Build the Gradio chat UI: a chatbot pane, a message textbox and a button
# for clearing the history.
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 AI Assistant với Hammer và Qwen")
    chatbot = gr.Chatbot(height=500)
    msg = gr.Textbox(label="Nhập tin nhắn")
    clear = gr.Button("Xóa lịch sử")

    def respond(message, chat_history):
        """Handle a submitted message.

        Args:
            message: Text taken from the textbox.
            chat_history: Current chatbot value, a list of
                ``(user_message, bot_message)`` pairs.

        Returns:
            ``("", chat_history)``: an empty string to reset the textbox and
            the history extended with the new exchange.
        """
        bot_message = chat_with_ai(message, chat_history)
        chat_history.append((message, bot_message))  # Record the exchange in the chat history
        return "", chat_history

    # Submitting the textbox runs respond with (msg, chatbot) as inputs and
    # writes its two return values back to (msg, chatbot).
    msg.submit(respond, [msg, chatbot], [msg, chatbot])
    # The clear button sets the chatbot value to None.
    clear.click(lambda: None, None, chatbot, queue=False)

# Start the app with a public share link and debug mode enabled.
demo.launch(share=True, debug=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/7.16k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/605 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/613 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/912 [00:00<?, ?B/s]

Unrecognized keys in `rope_scaling` for 'rope_type'='yarn': {'original_max_position_embeddings'}


model.safetensors.index.json:   0%|          | 0.00/35.6k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/4.96G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/1.21G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/243 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/7.30k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/661 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/35.6k [00:00<?, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/3.97G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/2.20G [00:00<?, ?B/s]

Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/242 [00:00<?, ?B/s]

  chatbot = gr.Chatbot(height=500)


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://a45ffc66e649db6c60.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.


In [None]:
import gradio as gr
import torch
import json
import re
import ast
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Device configuration: use the GPU when CUDA is available, else the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Quantization configuration: 4-bit NF4 with double quantization and float16
# compute.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.float16,
)

# NOTE(review): model loading is commented out in this cell, yet the functions
# below reference hammer_tokenizer, hammer_model, qwen_tokenizer and
# qwen_model. Those names must therefore already exist in the session
# (presumably from an earlier cell) -- confirm before running this cell alone.
# # Initialize models
# hammer_tokenizer = AutoTokenizer.from_pretrained("beyoru/Calling")
# hammer_model = AutoModelForCausalLM.from_pretrained(
#     "beyoru/Calling",
#     torch_dtype=torch.float16,
#     device_map="auto"
# )

# qwen_tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-7B-Instruct")
# qwen_model = AutoModelForCausalLM.from_pretrained(
#     "Qwen/Qwen2.5-7B-Instruct",
#     quantization_config=bnb_config,
#     device_map="auto"
# )

# Tool definitions: passed to the Hammer tokenizer's chat template and used by
# is_valid_tool_call to check tool names and required arguments. Each entry
# has a name, a description and a JSON-Schema style "parameters" object.
tools = [
    {
        # Requires both the recipient and the message text.
        "name": "send_message",
        "description": "Send a message to other user",
        "parameters": {
            "type": "object",
            "properties": {
                "recipient": {"type": "string", "description": "Username of recipient"},
                "message": {"type": "string", "description": "Message content"},
            },
            "required": ["recipient", "message"]
        }
    },
    {
        # Every profile field is optional (empty "required" list).
        "name": "update_profile",
        "description": "Update user profile information such as name, username, bio and email",
        "parameters": {
            "type": "object",
            "properties": {
                "name": {"type": "string", "description": "New name of the user"},
                "username": {"type": "string", "description": "New username"},
                "email": {"type": "string", "description": "New email address"},
                "bio": {"type": "string", "description": "New bio text"},
            },
            "required": []
        }
    }
]

# System prompt (Vietnamese) placed as the first message of every conversation
# built in chat_with_ai. Roughly: "You are Mem, a virtual assistant on a
# social network platform. Your job is to talk with users and answer common
# questions. You are funny, lively, always cheerful and sociable, and use
# cheerful language."
SYSTEM_MEM = """\
Bạn là Mem một trợ lý ảo trên một nền tảng mạng xã hội. Nhiệm vụ của bạn là giao tiếp với người dùng và giải đáp các thắc mắc phổ biến.

Bạn vui tính hoạt bát và luôn vui vẻ, hòa đồng sử dụng từ ngữ vui vẻ
"""

def _parse_json_or_literal(text):
    """Decode ``text`` as JSON, then as a Python literal; ``None`` on failure."""
    text = text.strip()
    try:
        return json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError subclass; try the literal form.
        pass
    try:
        # Handles Python-style output (single quotes, True/False/None).
        # literal_eval evaluates literals only, never arbitrary code.
        return ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None


def extract_json_from_response(response):
    """Extract structured data from a model response.

    When the response contains a Markdown code fence (optionally tagged
    ``json``), only the content of the first fence is parsed; otherwise the
    whole response is parsed. The text is decoded as JSON first and, failing
    that, as a Python literal.

    Args:
        response: Raw text produced by the model.

    Returns:
        The decoded value, or ``None`` when ``response`` is not a string or
        cannot be decoded.
    """
    if not isinstance(response, str):
        return None

    fenced = re.search(r'```(?:json)?\n?(.*?)\n?```', response, re.DOTALL)
    return _parse_json_or_literal(fenced.group(1) if fenced else response)

def chat_with_qwen(messages):
    """Generate a chat reply with the Qwen model (the fallback path).

    Args:
        messages: Conversation as a list of message dicts, rendered through
            the tokenizer's chat template.

    Returns:
        The generated reply text, or a fixed Vietnamese apology message when
        anything goes wrong during tokenization or generation.
    """
    try:
        # return_dict=True yields the attention mask together with input_ids,
        # so generate() does not have to guess it (pad token equals EOS here).
        inputs = qwen_tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_dict=True,
            return_tensors="pt",
        ).to(qwen_model.device)

        outputs = qwen_model.generate(
            **inputs,
            max_new_tokens=512,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
        )

        # generate() returns prompt + continuation; keep only the continuation.
        prompt_length = inputs["input_ids"].shape[1]
        response = qwen_tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        # Cut at an end-of-turn marker in case one remains in the decoded text.
        return response.split("<|im_end|>")[0].strip()
    except Exception as e:
        # UI boundary: report the failure and return a user-facing apology.
        print(f"Qwen error: {e}")
        return "Xin lỗi, tôi gặp sự cố khi xử lý yêu cầu của bạn."

def is_valid_tool_call(tool_call, tools):
    """Validate a parsed tool call against the tool definitions.

    A call is valid when it is a dict whose ``"name"`` matches a tool in
    ``tools`` and whose ``"arguments"`` contain every parameter that tool
    lists as required. Arguments may be a dict, or a string holding JSON or
    a Python dict literal.

    Args:
        tool_call: The candidate call, normally ``{"name": ..., "arguments": ...}``.
        tools: Tool definitions, each with a ``"name"`` and an optional
            ``"parameters"`` dict that may hold a ``"required"`` list.

    Returns:
        ``True`` if the call is valid, ``False`` otherwise.
    """
    if not isinstance(tool_call, dict):
        return False

    # One lookup both checks that the tool exists and fetches its definition.
    tool_name = tool_call.get("name")
    tool_def = next((t for t in tools if t["name"] == tool_name), None)
    if tool_def is None:
        return False

    arguments = tool_call.get("arguments", {})
    if isinstance(arguments, str):
        try:
            arguments = json.loads(arguments)
        except ValueError:
            try:
                arguments = ast.literal_eval(arguments)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return False

    # Only a mapping can carry named arguments. Without this check, None or a
    # number raised TypeError in the membership test below, or passed
    # vacuously for tools with no required parameters.
    if not isinstance(arguments, dict):
        return False

    required_params = tool_def.get("parameters", {}).get("required", [])
    return all(param in arguments for param in required_params)

def _assistant_message(ai_msg):
    """Turn a stored assistant reply into a chat message, tagging tool calls."""
    turn = {"role": "assistant", "content": ai_msg}
    try:
        parsed = json.loads(ai_msg)
        calls = [parsed] if isinstance(parsed, dict) else parsed
        if isinstance(calls, list) and any(is_valid_tool_call(call, tools) for call in calls):
            turn["tool_calls"] = calls
    except (TypeError, ValueError):
        # Not JSON (an ordinary text reply) or a malformed call: keep the
        # plain assistant message.
        pass
    return turn


def chat_with_ai(message, history):
    """Answer a user message with a Hammer tool call or a Qwen chat reply.

    The whole conversation plus the tool definitions is sent to the Hammer
    model first. If its output parses to at least one call accepted by
    ``is_valid_tool_call``, the valid calls are returned as a JSON string.
    Otherwise (no valid call, or any error while running Hammer) the same
    conversation is answered by Qwen.

    Args:
        message: The new user message.
        history: Previous turns as ``(user_msg, ai_msg)`` pairs.

    Returns:
        Either a JSON-encoded list of tool calls or Qwen's text reply.
    """
    messages = [{"role": "system", "content": SYSTEM_MEM}]

    # Add chat history
    for user_msg, ai_msg in history:
        messages.append({"role": "user", "content": user_msg})
        if ai_msg:
            messages.append(_assistant_message(ai_msg))

    messages.append({"role": "user", "content": message})

    try:
        # Try Hammer first
        hammer_inputs = hammer_tokenizer.apply_chat_template(
            messages,
            tools=tools,
            add_generation_prompt=True,
            return_dict=True,
            return_tensors="pt"
        ).to(hammer_model.device)

        hammer_outputs = hammer_model.generate(
            **hammer_inputs,
            max_new_tokens=512,
            do_sample=True,
            temperature=0.7
        )
        # Decode only the tokens generated after the prompt.
        prompt_length = hammer_inputs["input_ids"].shape[1]
        hammer_response = hammer_tokenizer.decode(
            hammer_outputs[0][prompt_length:],
            skip_special_tokens=True
        )

        print("Hammer raw response:", repr(hammer_response))  # Debug raw response

        # Parse tool calls
        tool_calls = extract_json_from_response(hammer_response)
        print("Parsed tool calls:", tool_calls)  # Debug parsed tool calls

        if tool_calls:
            # Normalize to a list; any other parsed type holds no calls.
            if isinstance(tool_calls, dict):
                tool_calls = [tool_calls]
            elif not isinstance(tool_calls, list):
                tool_calls = []

            # Validate tool calls
            valid_calls = [call for call in tool_calls if is_valid_tool_call(call, tools)]
            print("Valid tool calls:", valid_calls)  # Debug valid calls

            if valid_calls:
                return json.dumps(valid_calls, ensure_ascii=False)

    except Exception as e:
        # Best effort: any Hammer failure falls through to the Qwen fallback.
        print(f"Hammer error: {e}")

    # Fall back to Qwen if Hammer fails or no valid tool calls
    print("Falling back to Qwen")
    return chat_with_qwen(messages)

# Gradio interface: a chatbot pane, a message textbox and a button for
# clearing the history.
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 AI Assistant với Hammer và Qwen")
    chatbot = gr.Chatbot(height=500)
    msg = gr.Textbox(label="Nhập tin nhắn", placeholder="Nhập tin nhắn của bạn ở đây...")
    clear = gr.Button("Xóa lịch sử")

    def respond(message, chat_history):
        """Handle a submitted message.

        Args:
            message: Text taken from the textbox.
            chat_history: Current chatbot value, a list of
                ``(user_message, bot_message)`` pairs.

        Returns:
            ``("", chat_history)``: an empty string to reset the textbox and
            the history extended with the new exchange.
        """
        bot_message = chat_with_ai(message, chat_history)
        # Record the exchange so it is rendered and passed back on the next turn.
        chat_history.append((message, bot_message))
        return "", chat_history

    # Submitting the textbox runs respond with (msg, chatbot) as inputs and
    # writes its two return values back to (msg, chatbot).
    msg.submit(respond, [msg, chatbot], [msg, chatbot])
    # The clear button sets the chatbot value to None.
    clear.click(lambda: None, None, chatbot, queue=False)

# Start the app with a public share link and debug mode enabled.
demo.launch(share=True, debug=True)