In [1]:
!pip install Requests==2.32.3 transformers==4.51.3 qwen-vl-utils==0.0.11 bitsandbytes==0.45.5 accelerate==1.6.0 python-dotenv gradio sentencepiece Pillow requests einops #torch

Collecting transformers==4.51.3
  Downloading transformers-4.51.3-py3-none-any.whl.metadata (38 kB)
Collecting qwen-vl-utils==0.0.11
  Downloading qwen_vl_utils-0.0.11-py3-none-any.whl.metadata (6.3 kB)
Collecting bitsandbytes==0.45.5
  Downloading bitsandbytes-0.45.5-py3-none-manylinux_2_24_x86_64.whl.metadata (5.0 kB)
Collecting accelerate==1.6.0
  Downloading accelerate-1.6.0-py3-none-any.whl.metadata (19 kB)
Collecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Collecting av (from qwen-vl-utils==0.0.11)
  Downloading av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.6 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch<3,>=2.0->bitsandbytes==0.45.5)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch<3,>=2.0->bitsandbytes==0.45.5)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x8

In [None]:
import gradio as gr
import logging
import re
from PIL import Image
import requests
from io import BytesIO
import base64
import os
import uuid
import json
import time
from threading import Thread, Lock
import csv
import datetime
import torch
from transformers import AutoProcessor, TextIteratorStreamer, Qwen2_5_VLForConditionalGeneration
import numpy # For qwen_model_handler

# --- Global Configuration ---
LOGGING_LEVEL = os.getenv("LOGGING_LEVEL", "INFO").upper()
FLASK_PORT = int(os.getenv("FLASK_PORT", 7860))
FLASK_DEBUG = os.getenv("FLASK_DEBUG", "True").lower() == 'true'
PRELOAD_DEFAULT_MODEL = os.getenv("PRELOAD_DEFAULT_MODEL", "false").lower() == "true"

# --- Master User Credentials (Hardcoded for this request) ---
# IMPORTANT: For production, use environment variables for these!
MASTER_USERNAME = "sakib@xyz.com"
MASTER_PASSWORD = "master"


# --- Model Configurations ---
MODEL_CONFIGS = [
    {
        "id": "unsloth/Qwen2.5-VL-3B-Instruct-unsloth-bnb-4bit",
        "display_name": "Qwen2.5-VL 3B (Instruct, 4-bit Unsloth)",
    },
    {
        "id": "unsloth/Qwen2.5-VL-7B-Instruct-unsloth-bnb-4bit",
        "display_name": "Qwen2.5-VL 7B (Instruct, 4-bit Unsloth)",
    },
    {
        "id": "unsloth/Qwen2.5-VL-32B-Instruct-unsloth-bnb-4bit",
        "display_name": "Qwen2.5-VL 32B (Instruct, 4-bit Unsloth)",
    }
]

DEFAULT_MODEL_CONFIG = MODEL_CONFIGS[0]
DEFAULT_MODEL_ID = DEFAULT_MODEL_CONFIG["id"]
DEFAULT_MODEL_DISPLAY_NAME = DEFAULT_MODEL_CONFIG["display_name"]

AVAILABLE_MODEL_IDS = [config["id"] for config in MODEL_CONFIGS]
AVAILABLE_MODEL_DISPLAY_NAMES = [config["display_name"] for config in MODEL_CONFIGS]
MODEL_DISPLAY_NAME_TO_ID_MAP = {config["display_name"]: config["id"] for config in MODEL_CONFIGS}

MAX_HISTORY_LENGTH = int(os.getenv("MAX_HISTORY_LENGTH", 10))
DEFAULT_TEMPERATURE = float(os.getenv("DEFAULT_TEMPERATURE", 0.1))

DEFAULT_LOAD_PARAMS = {
    "load_in_4bit": True,
    "bnb_4bit_quant_type": os.getenv("DEFAULT_BNB_4BIT_QUANT_TYPE", "nf4"),
    "bnb_4bit_use_double_quant": os.getenv("DEFAULT_BNB_4BIT_USE_DOUBLE_QUANT", "true").lower() == 'true',
    "bnb_4bit_compute_dtype_str": os.getenv("DEFAULT_BNB_4BIT_COMPUTE_DTYPE", "auto"),
    "llm_int8_enable_fp32_cpu_offload": os.getenv("DEFAULT_LLM_INT8_ENABLE_FP32_CPU_OFFLOAD", "true").lower() == 'true',
}

logging.basicConfig(level=getattr(logging, LOGGING_LEVEL, logging.INFO),
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("gradio_qwen_vl_app")

LOGS_DIRECTORY = os.path.join(os.getcwd(), 'logs')
if not os.path.exists(LOGS_DIRECTORY):
    try: os.makedirs(LOGS_DIRECTORY); logger.info(f"Created logs directory: {LOGS_DIRECTORY}")
    except OSError as e: logger.error(f"Failed to create logs directory {LOGS_DIRECTORY}: {e}", exc_info=True)

USERS_FILE = os.path.join(os.getcwd(), 'users.csv')
USAGE_LOG_FILE = os.path.join(LOGS_DIRECTORY, 'usage_log_gradio.csv')
USAGE_LOG_HEADERS = [
    "timestamp", "username", "request_uuid", "model_id",
    "temperature", "system_prompt_key_used", "prompt_text", "image_provided",
    "image_details", "ai_response_length", "ai_response_preview", "ttft_ms",
    "generation_time_ms", "total_stream_handler_time_ms", "error_message",
    "load_params_used"
]
usage_log_lock = Lock()
loaded_users = {}

DEFAULT_USERS_CSV_CONTENT = """id,name,email,password,role
1,Admin User,admin,adminpass,admin
2,Test User,user,userpass,user
"""

def load_users_csv():
    global loaded_users
    if not os.path.exists(USERS_FILE):
        logger.info(f"{USERS_FILE} not found. Creating with default users.")
        try:
            with open(USERS_FILE, 'w', newline='', encoding='utf-8') as f:
                f.write(DEFAULT_USERS_CSV_CONTENT)
            logger.info(f"Created default {USERS_FILE}")
        except Exception as e:
            logger.error(f"Failed to create default {USERS_FILE}: {e}")
            return # Return if file creation fails

    # Reset loaded_users before attempting to load, in case this is a reload
    current_loaded_users = {}
    logger.info(f"Attempting to load users from {USERS_FILE}...")
    try:
        with open(USERS_FILE, mode='r', newline='', encoding='utf-8-sig') as f:
            reader = csv.DictReader(f)
            for row in reader:
                username = row.get('email', '').strip()
                password = row.get('password', '').strip()
                if username and password:
                    current_loaded_users[username.lower()] = {"password": password, "name": row.get('name', username)}
                else:
                    logger.warning(f"  Skipped user row due to missing username/password: {row}")
        loaded_users = current_loaded_users # Assign only after successful load
        logger.info(f"Loaded {len(loaded_users)} users from CSV.")
    except Exception as e:
        logger.error(f"Error loading users from {USERS_FILE}: {e}", exc_info=True)
        # Do not clear loaded_users here, might still have previously loaded ones if this is a failed reload


def gradio_auth_fn(username, password):
    # 1. Check for Master User
    if username.lower() == MASTER_USERNAME.lower() and password == MASTER_PASSWORD:
        logger.info(f"Master user '{username}' authenticated successfully.")
        return True

    # 2. If not master user, proceed with CSV-based authentication
    # Ensure CSV users are loaded if not already.
    # Note: `loaded_users` might be empty if CSV loading failed or CSV is empty.
    if not loaded_users and os.path.exists(USERS_FILE): # Attempt to load if empty and file exists
        load_users_csv()

    user_info = loaded_users.get(username.lower())
    if user_info and user_info["password"] == password:
        logger.info(f"User '{username}' from CSV authenticated successfully.")
        return True

    logger.warning(f"Authentication failed for user '{username}'. User info found in CSV: {bool(user_info)}")
    return False


def init_usage_log_csv():
    with usage_log_lock:
        if not os.path.exists(USAGE_LOG_FILE):
            try:
                with open(USAGE_LOG_FILE, mode='w', newline='', encoding='utf-8') as f:
                    csv.writer(f).writerow(USAGE_LOG_HEADERS)
            except Exception as e: logger.error(f"Failed to create usage log file {USAGE_LOG_FILE}: {e}", exc_info=True)

def log_usage_data_gradio(**kwargs):
    with usage_log_lock:
        row_to_write = [kwargs.get(header, "") for header in USAGE_LOG_HEADERS]
        try:
            with open(USAGE_LOG_FILE, mode='a', newline='', encoding='utf-8') as f:
                csv.writer(f).writerow(row_to_write)
        except Exception as e: logger.error(f"Failed to write to usage log {USAGE_LOG_FILE}: {e}", exc_info=True)

logger_qwen_vl_utils = logging.getLogger(__name__ + ".qwen_vl_utils")

def _load_image_from_url(url):
    try:
        if url.startswith('data:image'):
            header, encoded = url.split(',', 1)
            image_data = base64.b64decode(encoded)
            img = Image.open(BytesIO(image_data))
        elif url.startswith('http://') or url.startswith('https://'):
            response = requests.get(url, stream=True, timeout=15)
            response.raise_for_status()
            img = Image.open(BytesIO(response.content))
        else: # Assume local file path
            if os.path.exists(url):
                img = Image.open(url)
            else:
                logger_qwen_vl_utils.warning(f"Local file not found or unsupported scheme: {url[:60]}...")
                return None
        if img.mode != 'RGB': img = img.convert('RGB')
        return img
    except Exception as e:
        logger_qwen_vl_utils.error(f"Error loading image {url[:60]}...: {e}")
        return None

def extract_pil_images_from_messages(messages):
    pil_images = []
    if not messages: return pil_images
    for msg in messages:
        if msg.get("role") == "user" and isinstance(msg.get("content"), list):
            for item in msg["content"]:
                if item.get("type") == "image_url":
                    url_data = item.get("image_url")
                    url = url_data.get("url") if isinstance(url_data, dict) else url_data
                    if url:
                        img = _load_image_from_url(url)
                        if img: pil_images.append(img)
    return pil_images

DEFAULT_SYSTEM_PROMPTS = {
    "ocr_digit_only": "You are an expert Optical Character Recognition (OCR) assistant. Your sole task is to meticulously extract ONLY THE DIGITS (0-9) visible in the provided image. Present these digits clearly, for example, separated by spaces or newlines. If no digits are found, explicitly state 'No digits found'. Do not provide any other text, explanation, or commentary. If no image is provided, state 'Please provide an image for digit extraction.'",
    "ocr_general": "You are an expert Optical Character Recognition (OCR) assistant. Your primary task is to meticulously extract ALL text, numbers, and symbols visible in any provided image or described scene. Transcribe the text exactly as it appears. Only output the extracted text. If no image is clearly referenced or uploaded, state that you need an image or image URL to perform OCR.",
    "ocr_receipt": "You are an expert OCR assistant specializing in receipts. Extract all items, quantities, and prices. Also identify the store name, date, and total amount. Present the information in a structured format if possible.",
    "chat_general_helper": "You are a helpful AI assistant. Analyze the provided image and respond to the user's query."
}
PRIMARY_DEFAULT_SYSTEM_PROMPT_KEY = "ocr_digit_only"
PRIMARY_DEFAULT_SYSTEM_PROMPT = DEFAULT_SYSTEM_PROMPTS[PRIMARY_DEFAULT_SYSTEM_PROMPT_KEY]

logger_qwen_model_handler = logging.getLogger(__name__ + ".qwen_model_handler")
loaded_models_cache = {}

def get_dtype_from_string(dtype_str: str):
    if dtype_str == "bfloat16": return torch.bfloat16
    if dtype_str == "float16": return torch.float16
    if dtype_str == "float32": return torch.float32
    return "auto"

def get_model_and_processor(model_id: str, load_in_4bit: bool, bnb_4bit_quant_type: str, bnb_4bit_use_double_quant: bool, bnb_4bit_compute_dtype_str: str, llm_int8_enable_fp32_cpu_offload: bool):
    param_tuple = (model_id, f"4bit-{load_in_4bit}", f"quant-{bnb_4bit_quant_type}", f"doubleq-{bnb_4bit_use_double_quant}", f"compute-{bnb_4bit_compute_dtype_str}", f"offload-{llm_int8_enable_fp32_cpu_offload}")
    cache_key = "_".join(param_tuple)
    if cache_key in loaded_models_cache: return loaded_models_cache[cache_key]
    logger_qwen_model_handler.info(f"Initiating load for model '{model_id}'. Cache key: {cache_key}")
    model_kwargs = {"trust_remote_code": True}
    if torch.cuda.is_available():
        model_kwargs["device_map"] = {"": torch.cuda.current_device()}
        actual_compute_dtype = get_dtype_from_string(bnb_4bit_compute_dtype_str)
        if actual_compute_dtype == "auto": actual_compute_dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
        model_kwargs.update({"load_in_4bit": True, "bnb_4bit_quant_type": bnb_4bit_quant_type, "bnb_4bit_use_double_quant": bnb_4bit_use_double_quant, "bnb_4bit_compute_dtype": actual_compute_dtype})
        if "llm_int8_enable_fp32_cpu_offload" in DEFAULT_LOAD_PARAMS: model_kwargs["llm_int8_enable_fp32_cpu_offload"] = llm_int8_enable_fp32_cpu_offload
    else:
        logger_qwen_model_handler.warning(f"CUDA NOT available. Model '{model_id}' will be loaded on CPU in float32.")
        model_kwargs.update({"device_map": "cpu", "torch_dtype": torch.float32})
    try:
        model = Qwen2_5_VLForConditionalGeneration.from_pretrained(model_id, **model_kwargs)
        processor = AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
        loaded_models_cache[cache_key] = (model, processor)
        logger_qwen_model_handler.info(f"Successfully loaded: {model_id}")
        return model, processor
    except Exception as e:
        logger_qwen_model_handler.error(f"Error loading model '{model_id}': {e}", exc_info=True)
        if cache_key in loaded_models_cache: del loaded_models_cache[cache_key]
        raise e

def generate_chat_response_stream(model_id_param: str, messages_for_model, temperature, load_in_4bit: bool, bnb_4bit_quant_type: str, bnb_4bit_use_double_quant: bool, bnb_4bit_compute_dtype_str: str, llm_int8_enable_fp32_cpu_offload: bool, max_new_tokens=2048):
    model, processor = get_model_and_processor(model_id_param, load_in_4bit, bnb_4bit_quant_type, bnb_4bit_use_double_quant, bnb_4bit_compute_dtype_str, llm_int8_enable_fp32_cpu_offload)
    pil_images = extract_pil_images_from_messages(messages_for_model)
    try:
        text_prompt = processor.apply_chat_template(messages_for_model, tokenize=False, add_generation_prompt=True)
    except Exception as e: raise ValueError(f"Error preparing prompt: {e}.")
    try:
        inputs = processor(text=[text_prompt], images=pil_images or None, return_tensors="pt", padding=True).to(model.device)
    except Exception as e: raise ValueError(f"Error in processor call: {e}.")
    streamer = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True, clean_up_tokenization_spaces=True)
    pad_token_id = processor.tokenizer.pad_token_id or processor.tokenizer.eos_token_id
    if pad_token_id is None:
        im_end_id = processor.tokenizer.convert_tokens_to_ids("<|im_end|>")
        if im_end_id != processor.tokenizer.unk_token_id: pad_token_id = im_end_id
        else: raise ValueError(f"Tokenizer for {model_id_param} missing critical token IDs.")
    eos_token_id_gen = processor.tokenizer.eos_token_id
    if isinstance(eos_token_id_gen, list):
        im_end_id = processor.tokenizer.convert_tokens_to_ids("<|im_end|>")
        eos_token_id_gen = im_end_id if im_end_id != processor.tokenizer.unk_token_id and im_end_id in eos_token_id_gen else eos_token_id_gen[0]
    if eos_token_id_gen is None: eos_token_id_gen = pad_token_id
    generation_kwargs = dict(**inputs, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=temperature > 0.01, temperature=max(temperature, 0.01), top_p=0.8 if temperature > 0.01 else None, pad_token_id=pad_token_id, eos_token_id=eos_token_id_gen)
    Thread(target=model.generate, kwargs=generation_kwargs).start()
    buffer = ""
    for chunk in streamer:
        if chunk: buffer += chunk
        if ' ' in buffer or '\n' in buffer or len(buffer) > 5: yield buffer; buffer = ""
    if buffer: yield buffer

def truncate_qwen_history(history_list: list, max_pairs: int):
    system_prompt_message = history_list[0] if history_list and history_list[0].get("role") == "system" else None
    conv_history = [m for m in history_list if m.get("role") != "system"]
    num_messages_to_keep = max_pairs * 2
    if len(conv_history) > num_messages_to_keep:
        conv_history = conv_history[-num_messages_to_keep:]
    return ([system_prompt_message] + conv_history) if system_prompt_message else conv_history

# --- Helper to prepare Qwen messages from Gradio input ---
def prepare_qwen_messages_for_model(
    gradio_message_dict: dict,
    gradio_chat_history: list,
    system_prompt_text: str,
    request_uuid: str,
    forget_history: bool,
    system_prompt_key: str
) -> tuple[list, str | None]:

    qwen_messages = [{"role": "system", "content": system_prompt_text}]
    logged_image_path_current_for_logging = None

    if not forget_history:
        for turn in gradio_chat_history:
            user_turn_dict, assistant_turn_str = turn
            user_content_parts = []
            history_user_text = user_turn_dict.get("text", "")
            history_user_files = user_turn_dict.get("files", [])

            for file_path in history_user_files:
                user_content_parts.append({"type": "image_url", "image_url": {"url": file_path}})
            if history_user_text:
                user_content_parts.append({"type": "text", "text": history_user_text})

            if user_content_parts:
                qwen_messages.append({"role": "user", "content": user_content_parts})
            if assistant_turn_str:
                 qwen_messages.append({"role": "assistant", "content": [{"type": "text", "text": assistant_turn_str}]})

    current_user_prompt_text_from_ui = gradio_message_dict.get("text", "") if gradio_message_dict else ""
    current_user_files_gr_paths = gradio_message_dict.get("files", []) if gradio_message_dict else []

    current_qwen_user_content_parts = []
    image_part_added_for_current_turn = False

    if current_user_files_gr_paths:
        gradio_temp_image_path = current_user_files_gr_paths[0]
        try:
            current_qwen_user_content_parts.append({"type": "image_url", "image_url": {"url": gradio_temp_image_path}})
            image_part_added_for_current_turn = True
            pil_image_to_log = _load_image_from_url(gradio_temp_image_path)
            if pil_image_to_log:
                temp_img_filename = f"log_img_{request_uuid}.png"
                persistent_log_path = os.path.join(LOGS_DIRECTORY, temp_img_filename)
                pil_image_to_log.save(persistent_log_path)
                logged_image_path_current_for_logging = persistent_log_path
            else:
                logger.warning(f"Could not load PIL from Gradio temp path {gradio_temp_image_path} for logging.")
        except Exception as e:
            logger.error(f"Error processing current image {gradio_temp_image_path}: {e}")
            current_qwen_user_content_parts.append({"type": "text", "text": f"[Error processing uploaded image: {e}]"})

    if current_user_prompt_text_from_ui.strip():
        current_qwen_user_content_parts.append({"type": "text", "text": current_user_prompt_text_from_ui.strip()})
    elif image_part_added_for_current_turn:
        if system_prompt_key == "ocr_digit_only":
            current_qwen_user_content_parts.append({"type": "text", "text": "ocr digit"})
            logger.info(f"User Request {request_uuid}: ocr_digit_only active with image and no UI text. Using 'ocr digit' as effective prompt.")
        else:
            current_qwen_user_content_parts.append({"type": "text", "text": "Describe the image."})
            logger.info(f"User Request {request_uuid}: Image present with no UI text. Using 'Describe the image.' as effective prompt.")

    if any(p.get("type") == "image_url" or (p.get("type") == "text" and p.get("text", "").strip()) for p in current_qwen_user_content_parts):
        qwen_messages.append({"role": "user", "content": current_qwen_user_content_parts})

    return qwen_messages, logged_image_path_current_for_logging

# --- Gradio Chat Callback Function (adapted for gr.ChatInterface) ---
def handle_chat_submit(
    gradio_message_dict: dict,
    gradio_chat_history: list,
    request: gr.Request,
    model_display_name_dd: str,
    system_prompt_key_dd: str,
    temperature_slider: float,
    forget_history_flag: bool,
    load_in_4bit_cb: bool,
    bnb_4bit_quant_type_dd: str,
    bnb_4bit_use_double_quant_cb: bool,
    bnb_4bit_compute_dtype_str_dd: str
):
    request_uuid = str(uuid.uuid4())[:8]
    username = request.username if request and request.username else "anonymous_gradio_user"
    actual_model_id = MODEL_DISPLAY_NAME_TO_ID_MAP.get(model_display_name_dd, DEFAULT_MODEL_ID)
    system_prompt_text = DEFAULT_SYSTEM_PROMPTS.get(system_prompt_key_dd, PRIMARY_DEFAULT_SYSTEM_PROMPT)

    current_text = gradio_message_dict.get("text", "") if gradio_message_dict else ""
    current_files = gradio_message_dict.get("files", []) if gradio_message_dict else []
    if not current_text.strip() and not current_files:
        logger.info(f"Request {request_uuid}: No input text or files provided.")
        yield "Please provide some input or an image."
        return

    messages_for_model, logged_image_path = prepare_qwen_messages_for_model(
        gradio_message_dict,
        gradio_chat_history,
        system_prompt_text,
        request_uuid,
        forget_history_flag,
        system_prompt_key_dd
    )

    if len(messages_for_model) < 2 or messages_for_model[-1].get("role") != "user":
        logger.warning(f"Request {request_uuid}: No valid user message constructed to send to model. messages_for_model: {messages_for_model}")
        yield "Could not form a valid message to send. Please try again with text or an image."
        return

    messages_for_model = truncate_qwen_history(messages_for_model, MAX_HISTORY_LENGTH)

    user_prompt_text_for_log = ""
    if messages_for_model and messages_for_model[-1]["role"] == "user":
        content_list = messages_for_model[-1]["content"]
        for item in content_list:
            if item["type"] == "text":
                user_prompt_text_for_log = item["text"]
                break

    current_load_params = {
        "load_in_4bit": load_in_4bit_cb, "bnb_4bit_quant_type": bnb_4bit_quant_type_dd,
        "bnb_4bit_use_double_quant": bnb_4bit_use_double_quant_cb,
        "bnb_4bit_compute_dtype_str": bnb_4bit_compute_dtype_str_dd,
        "llm_int8_enable_fp32_cpu_offload": DEFAULT_LOAD_PARAMS["llm_int8_enable_fp32_cpu_offload"]
    }
    log_payload = {
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), "username": username,
        "request_uuid": request_uuid, "model_id": actual_model_id, "temperature": temperature_slider,
        "system_prompt_key_used": system_prompt_key_dd, "prompt_text": user_prompt_text_for_log,
        "image_provided": bool(logged_image_path), "image_details": logged_image_path or "none",
        "load_params_used": json.dumps(current_load_params)
    }

    full_ai_response = ""; stream_start_time = time.monotonic(); ttft_ms = -1; gen_time_ms = -1; first_chunk_time = None
    try:
        stream_gen = generate_chat_response_stream(
            model_id_param=actual_model_id, messages_for_model=messages_for_model, temperature=temperature_slider,
            **current_load_params
        )
        for chunk in stream_gen:
            if first_chunk_time is None: first_chunk_time = time.monotonic(); ttft_ms = round((first_chunk_time - stream_start_time) * 1000)
            if chunk:
                full_ai_response += chunk
                yield full_ai_response

        if first_chunk_time: gen_time_ms = round((time.monotonic() - first_chunk_time) * 1000)
        else:
            ttft_ms = round((time.monotonic() - stream_start_time) * 1000); gen_time_ms = 0

        if not full_ai_response:
             logger.info(f"Model returned an empty response for request {request_uuid}. This might be intended.")

        log_payload.update({
            "ai_response_length": len(full_ai_response), "ai_response_preview": full_ai_response[:200],
            "ttft_ms": ttft_ms, "generation_time_ms": gen_time_ms,
            "total_stream_handler_time_ms": round((time.monotonic() - stream_start_time) * 1000), "error_message": ""
        })
    except Exception as e:
        logger.error(f"Error during model generation for {request_uuid}: {e}", exc_info=True)
        error_msg = f"ERROR: {str(e)[:200]}"
        yield error_msg
        log_payload.update({ "error_message": str(e)[:200],
                             "total_stream_handler_time_ms": round((time.monotonic() - stream_start_time) * 1000),
                             "ttft_ms": ttft_ms if ttft_ms != -1 else log_payload["total_stream_handler_time_ms"],
                             "generation_time_ms": gen_time_ms if gen_time_ms != -1 else 0 })
    finally:
        log_usage_data_gradio(**log_payload)

# --- Gradio UI Definition ---
with gr.Blocks(theme=gr.themes.Soft(), title="Qwen-VL Chat Studio") as demo:
    gr.Markdown("# Qwen-VL Unsloth Demo Studio")
    gr.Markdown("### Instructions\nType your message and/or drop an image into the chat input. Use Shift+Enter for a new line in the input box.")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Chat Configuration")
            model_id_dropdown = gr.Dropdown(
                AVAILABLE_MODEL_DISPLAY_NAMES, value=DEFAULT_MODEL_DISPLAY_NAME, label="Select Model"
            )
            system_prompt_dropdown = gr.Dropdown(
                list(DEFAULT_SYSTEM_PROMPTS.keys()), value=PRIMARY_DEFAULT_SYSTEM_PROMPT_KEY, label="System Prompt"
            )
            temperature_slider = gr.Slider(0.0, 2.0, value=DEFAULT_TEMPERATURE, step=0.1, label="Temperature")

            forget_history_checkbox = gr.Checkbox(
                label="Forget Previous Chat History (Context)",
                value=True,
                info="If checked, the model will not use past conversation turns for context."
            )

            with gr.Accordion("Advanced Model Loading", open=False):
                load_in_4bit_checkbox = gr.Checkbox(value=DEFAULT_LOAD_PARAMS["load_in_4bit"], label="Load in 4-bit", interactive=False)
                bnb_4bit_quant_type_dropdown = gr.Dropdown(["nf4", "fp4"], value=DEFAULT_LOAD_PARAMS["bnb_4bit_quant_type"], label="4-bit Quant Type")
                bnb_4bit_use_double_quant_checkbox = gr.Checkbox(value=DEFAULT_LOAD_PARAMS["bnb_4bit_use_double_quant"], label="Use Double Quantization")
                bnb_4bit_compute_dtype_str_dropdown = gr.Dropdown(["auto", "bfloat16", "float16", "float32"], value=DEFAULT_LOAD_PARAMS["bnb_4bit_compute_dtype_str"], label="4-bit Compute DType")

        with gr.Column(scale=1):
            chat_interface = gr.ChatInterface(
                fn=handle_chat_submit,
                additional_inputs=[
                    model_id_dropdown, system_prompt_dropdown, temperature_slider,
                    forget_history_checkbox,
                    load_in_4bit_checkbox, bnb_4bit_quant_type_dropdown,
                    bnb_4bit_use_double_quant_checkbox, bnb_4bit_compute_dtype_str_dropdown
                ],
                title=None,
                chatbot=gr.Chatbot(
                    label="Chat Window",
                    height=650,
                    show_copy_button=True,
                    avatar_images=(os.path.join(os.getcwd(),"user_avatar.png"),
                                   os.path.join(os.getcwd(),"bot_avatar.png"))
                ),
                textbox=gr.MultimodalTextbox(
                    file_types=["image"],
                    placeholder="Type your message or drop an image here... (Shift+Enter for new line)",
                    label=None
                )
            )

def preload_models_on_startup():
    if PRELOAD_DEFAULT_MODEL:
        logger.info(f"Attempting to preload default model: {DEFAULT_MODEL_ID}...")
        try:
            load_params_for_preload = { k: DEFAULT_LOAD_PARAMS[k] for k in [
                "load_in_4bit", "bnb_4bit_quant_type", "bnb_4bit_use_double_quant",
                "bnb_4bit_compute_dtype_str", "llm_int8_enable_fp32_cpu_offload"
            ]}
            get_model_and_processor(DEFAULT_MODEL_ID, **load_params_for_preload)
            logger.info(f"Default model {DEFAULT_MODEL_ID} preloading initiated/completed.")
        except Exception as e:
            logger.error(f"Failed to preload default model {DEFAULT_MODEL_ID}: {e}", exc_info=True)
    else:
        logger.info(f"PRELOAD_DEFAULT_MODEL is '{PRELOAD_DEFAULT_MODEL}'. Skipping model preloading.")

# Initialize CSVs and attempt to load users early
# This ensures `loaded_users` is populated before `gradio_auth_fn` might be called
# by Gradio framework if auth is enabled.
if not os.path.exists(LOGS_DIRECTORY):
    try: os.makedirs(LOGS_DIRECTORY); logger.info(f"Created logs directory: {LOGS_DIRECTORY}")
    except OSError as e: logger.error(f"Failed to create logs directory {LOGS_DIRECTORY}: {e}")
init_usage_log_csv() # Initialize usage log first
load_users_csv()     # Then load users (which might create users.csv)


if __name__ == '__main__':
    user_avatar_path = os.path.join(os.getcwd(), "user_avatar.png")
    bot_avatar_path = os.path.join(os.getcwd(), "bot_avatar.png")
    if not os.path.exists(user_avatar_path):
        try: Image.new('RGB', (80, 80), color = (70, 130, 180)).save(user_avatar_path)
        except Exception as e: logger.error(f"Failed to create dummy user_avatar: {e}")
    if not os.path.exists(bot_avatar_path):
        try: Image.new('RGB', (80, 80), color = (34, 139, 34)).save(bot_avatar_path)
        except Exception as e: logger.error(f"Failed to create dummy bot_avatar: {e}")

    preload_models_on_startup()
    logger.info("Starting Gradio Qwen-VL Chat App with Master User and Side-by-Side Layout...")

    # Auth configuration: Enable auth if master user is set OR if CSV users are loaded
    auth_enabled = bool(MASTER_USERNAME and MASTER_PASSWORD) or bool(loaded_users)
    auth_config_fn = gradio_auth_fn if auth_enabled else None

    if not auth_enabled:
        logger.warning("Authentication is disabled. Neither master user credentials are set nor were any users loaded from CSV.")
    else:
        logger.info(f"Authentication enabled. Master user configured: {bool(MASTER_USERNAME and MASTER_PASSWORD)}. CSV users loaded: {len(loaded_users)}")


    demo.queue().launch(
        share=True, debug=FLASK_DEBUG, auth=auth_config_fn,
        server_port=FLASK_PORT, server_name="0.0.0.0"
    )

  chatbot=gr.Chatbot(


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://6e30051a203037e385.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/4.20k [00:00<?, ?B/s]

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


model.safetensors:   0%|          | 0.00/3.79G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/238 [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/575 [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


tokenizer_config.json:   0%|          | 0.00/5.80k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/11.4M [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/605 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/614 [00:00<?, ?B/s]

chat_template.jinja:   0%|          | 0.00/1.02k [00:00<?, ?B/s]

chat_template.json:   0%|          | 0.00/1.05k [00:00<?, ?B/s]