In [None]:
# 检测 chat_rag.py 文件
file_path = "../app_image.py"
!flake8 {file_path} --max-line-length=240
!pylint {file_path}

In [None]:
%%writefile ..\app.py
# pylint: disable=no-member  # Project structure requires dynamic path handling
"""
For more information on `huggingface_hub` Inference API support
please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
"""
import os
import sys
import gradio as gr
import cv2
from pyzbar.pyzbar import decode
import numpy as np
from google import genai
from google.genai import types
from dotenv import load_dotenv

# ===== 2. 初始化配置 =====
# 获取当前文件所在目录的绝对路径
if "__file__" in globals():
    current_dir = os.path.dirname(os.path.abspath(__file__))
    root_dir = os.path.normpath(os.path.join(current_dir, ".."))
else:
    # 在 Jupyter Notebook 环境中
    current_dir = os.getcwd()
    current_dir = os.path.join(current_dir, "..")
    root_dir = os.path.normpath(os.path.join(current_dir))

current_dir = os.path.normpath(current_dir)
sys.path.append(current_dir)

with open(
    os.path.join(current_dir, "system_role_prompt.md"), "r", encoding="utf-8"
) as f:
    system_role = f.read()


BEGIN_PROMPT = """
总结一下最新的内容
"""


CONFIRM_PROMPT = """
对比一下两个版本的差异
"""

load_dotenv(dotenv_path=os.path.join(current_dir, ".env"))  # current_dir + "\.env")
api_key = os.getenv("GEMINI_API_KEY")
gemini_client = None
if api_key:
    gemini_client = genai.Client(api_key=api_key)

MODEL_NAME = "gemini-2.0-flash-exp"


# Common safety settings for all requests
def get_safety_settings():
    return [
        types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="OFF"),
        types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="OFF"),
        types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="OFF"),
        types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="OFF"),
        types.SafetySetting(category="HARM_CATEGORY_CIVIC_INTEGRITY", threshold="OFF")
    ]


def format_content(role: str, text: str) -> types.Content:
    """格式化单条消息内容"""
    return types.Content(
        role=role,
        parts=[types.Part(text=text)]
    )


def respond(
    message,
    history: list[tuple[str, str]],
    use_system_message,
):
    # 构建对话历史
    def build_contents(message=None, before_message=None):
        contents = []
        for val in history:
            if val["content"] == "开始":
                context = BEGIN_PROMPT
            elif val["content"] == "确认":
                context = CONFIRM_PROMPT
            else:
                context = val["content"]
            contents.append(format_content(
                val["role"],
                context
            ))

        if before_message:
            contents.append(format_content(
                "assistant",
                before_message
            ))

        if message:
            contents.append(format_content(
                "user",
                message
            ))
        return contents
    if message == "开始" and not history:
        message = BEGIN_PROMPT

    if message == "确认" and len(history) == 2:
        message = CONFIRM_PROMPT

    if message:
        # 处理普通消息
        contents = build_contents(message)
        response = ""
        if use_system_message:
            config = types.GenerateContentConfig(
                system_instruction=system_role,
                safety_settings=get_safety_settings(),
            )
        else:
            config = types.GenerateContentConfig(
                safety_settings=get_safety_settings(),
            )
        for chunk in gemini_client.models.generate_content_stream(
            model=MODEL_NAME,
            contents=contents,
            config=config
        ):
            if chunk.text:  # Check if chunk.text is not None
                response += chunk.text
                yield response


def get_gradio_version():
    return gr.__version__


game_state = {"gr_version": get_gradio_version()}  # 示例 game_state


def process_qr_frame(frame):
    """处理视频帧，检测和解码二维码"""
    if frame is None:
        return frame, game_state

    # 转换图像格式确保兼容性
    if isinstance(frame, np.ndarray):
        img = frame.copy()
    else:
        img = np.array(frame).copy()

    # 检测二维码
    decoded_objects = decode(img)

    if decoded_objects:
        # 获取二维码数据
        qr_data = decoded_objects[0].data.decode('utf-8')

        # 解析二维码数据
        qr_info = {"设定": qr_data}

        game_state.update(qr_info)

        # 在图像上绘制识别框和状态
        points = decoded_objects[0].polygon
        if points:
            # 绘制绿色边框表示成功识别
            pts = np.array(points, np.int32)
            pts = pts.reshape((-1, 1, 2))
            cv2.polylines(img, [pts], True, (0, 255, 0), 2)

            # 添加文本显示已更新
            cv2.putText(img, "QR Code Updated!", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    return img, game_state


def export_chat(history):
    export_string = ""
    for item in history:
        user_msg = item[0]['content']
        bot_msg = item[1]
        export_string += f"User: {user_msg}\nBot: {bot_msg}\n\n"
    return export_string


def get_chat_history(chatbot_component):
    return chatbot_component.load_history()


with gr.Blocks(theme="soft") as demo:
    if gemini_client:
        chatbot = gr.ChatInterface(
            respond,
            title="知识助理",
            type="messages",
            additional_inputs=[
                gr.Checkbox(value=False, label="Use system message"),
            ],

        )
    else:
        gr.Markdown("Gemini API key not found. Please check your .env file.")

    with gr.Accordion("查看状态", open=False):
        game_info_image = gr.Image(label="二维码设定",
                                   webcam_constraints={
                                        "video": {
                                            "facingMode": {"ideal": "environment"}
                                        }
                                    })
        output_img = gr.Image(label="识别结果")
        game_state_output = gr.JSON(value=game_state)  # 初始显示 game_state

        # 设置实时流处理
        game_info_image.upload(
            fn=process_qr_frame,
            inputs=[game_info_image],
            outputs=[output_img, game_state_output],
            show_progress=False,
        )
        # 设置实时流处理
        game_info_image.stream(
            fn=process_qr_frame,
            inputs=[game_info_image],
            outputs=[output_img, game_state_output],
            show_progress=False,
            stream_every=0.5  # 每0.5秒处理一次
        )

if __name__ == "__main__":
    cert_file = os.path.join(current_dir, "localhost+1.pem")
    key_file = os.path.join(current_dir, "localhost+1-key.pem")

    if os.path.exists(cert_file) and os.path.exists(key_file):
        demo.launch(
            server_name="0.0.0.0",
            ssl_certfile=cert_file,
            ssl_keyfile=key_file
        )
    else:
        demo.launch(server_name="0.0.0.0")


In [None]:
%%writefile ..\app_image.py
# pylint: disable=no-member  # Project structure requires dynamic path handling
"""
whisk逆向图片生成
"""
import os
import sys
import hashlib
import json
from datetime import datetime
import gradio as gr

# ===== 2. 初始化配置 =====
# 获取当前文件所在目录的绝对路径
if "__file__" in globals():
    current_dir = os.path.dirname(os.path.abspath(__file__))
    root_dir = os.path.normpath(os.path.join(current_dir, ".."))
else:
    # 在 Jupyter Notebook 环境中
    current_dir = os.getcwd()
    current_dir = os.path.join(current_dir, "..")
    root_dir = os.path.normpath(os.path.join(current_dir))

current_dir = os.path.normpath(current_dir)
sys.path.append(current_dir)

from Module.Common.scripts.llm.utils.google_whisk import (
    generate_image_base64,
    generate_caption,
    generate_image_fx,
    generate_story_board,
    DEFAULT_STYLE_PROMPT_DICT,
)


class AuthConfig:
    """认证配置"""
    def __init__(self):
        self.cookies = config.get("cookies", "")
        self.auth_token = config.get("auth_token", "")


with open(os.path.join(current_dir, "auth_config.json"), 'r', encoding='utf-8') as f:
    config = json.load(f)

auth_config = AuthConfig()

# 缓存文件路径
CACHE_DIR = "cache"
CAPTION_CACHE_FILE = os.path.join(current_dir, CACHE_DIR, "image_caption_cache.json")
STORY_CACHE_FILE = os.path.join(current_dir, CACHE_DIR, "story_prompt_cache.json")

IMAGE_CACHE_DIR = os.path.join(current_dir, CACHE_DIR, "image")
# 创建缓存目录
os.makedirs(os.path.join(current_dir, CACHE_DIR), exist_ok=True)
os.makedirs(IMAGE_CACHE_DIR, exist_ok=True)


# 加载缓存
def load_cache():
    """从文件加载缓存"""
    global image_caption_cache, story_prompt_cache

    if os.path.exists(CAPTION_CACHE_FILE):
        with open(CAPTION_CACHE_FILE, 'r', encoding='utf-8') as cache_file:
            image_caption_cache = json.load(cache_file)
    else:
        image_caption_cache = {}

    if os.path.exists(STORY_CACHE_FILE):
        with open(STORY_CACHE_FILE, 'r', encoding='utf-8') as cache_file:
            story_prompt_cache = json.load(cache_file)
    else:
        story_prompt_cache = {}


# 保存缓存
def save_cache():
    """保存缓存到文件"""
    with open(CAPTION_CACHE_FILE, 'w', encoding='utf-8') as cache_file:
        json.dump(image_caption_cache, cache_file, ensure_ascii=False, indent=2)
    with open(STORY_CACHE_FILE, 'w', encoding='utf-8') as cache_file:
        json.dump(story_prompt_cache, cache_file, ensure_ascii=False, indent=2)


# 初始加载缓存
load_cache()


def get_cached_caption(image_base64: str):
    """获取缓存的图片描述"""
    if not image_base64:
        return None

    # 直接使用base64字符串计算哈希值
    hash_key = hashlib.md5(image_base64.encode()).hexdigest()
    return image_caption_cache.get(hash_key)


def cache_caption(image_base64: str, caption: str):
    """缓存图片描述"""
    if not image_base64 or not caption:
        return

    hash_key = hashlib.md5(image_base64.encode()).hexdigest()
    image_caption_cache[hash_key] = caption
    save_cache()


def get_cached_story_prompt(caption: str, style_key: str, additional_text: str):
    """获取缓存的故事提示词"""
    if not caption:
        return None

    # 使用所有输入参数组合生成缓存键
    cache_key = hashlib.md5(f"{caption}_{style_key}_{additional_text}".encode()).hexdigest()
    return story_prompt_cache.get(cache_key)


def cache_story_prompt(caption: str, style_key: str, additional_text: str, prompt: str):
    """缓存故事提示词"""
    if not caption or not prompt:
        return

    cache_key = hashlib.md5(f"{caption}_{style_key}_{additional_text}".encode()).hexdigest()
    story_prompt_cache[cache_key] = prompt
    save_cache()


# 在 demo 定义之前添加函数
def generate_images(image_input, style_key, additional_text):
    """处理图片生成请求"""
    try:
        # 1. 如果没有上传图片，直接返回
        if image_input is None:
            return None, None

        # 直接使用文件路径生成base64
        image_base64 = generate_image_base64(image_input)

        # 2. 检查缓存中是否有caption
        caption = get_cached_caption(image_base64)
        caption_cache_used = caption is not None
        if caption is None:
            # 如果缓存中没有，则生成新的caption
            caption = generate_caption(image_base64, cookies=auth_config.cookies)
            if caption:
                cache_caption(image_base64, caption)

        # 2. 获取风格提示词
        style_prompt = DEFAULT_STYLE_PROMPT_DICT.get(style_key, "")

        # 3. 检查story prompt缓存
        final_prompt = get_cached_story_prompt(caption, style_key, additional_text)
        story_cache_used = final_prompt is not None
        if final_prompt is None:
            # 如果缓存中没有，则生成新的story prompt
            style_prompt = DEFAULT_STYLE_PROMPT_DICT.get(style_key, "")
            final_prompt = generate_story_board(
                characters=[caption] if caption else [],
                style_prompt=style_prompt,
                additional_input=additional_text,
                cookies=auth_config.cookies
            )
            if final_prompt:
                cache_story_prompt(caption, style_key, additional_text, final_prompt)

        # 打印日志
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{current_time}] 生成图片 | 素材提示词缓存: {'启用' if caption_cache_used else '未启用'} | "
              f"最终提示词缓存: {'启用' if story_cache_used else '未启用'} | "
              f"最终Prompt: \n{final_prompt}")

        # 4. 生成图片
        if final_prompt:
            image_files = generate_image_fx(
                prompt=final_prompt,
                auth_token=auth_config.auth_token,
                output_prefix=(
                    IMAGE_CACHE_DIR +
                    r"\generated_image_" +
                    datetime.now().strftime("%Y%m%d")
                ),
                image_number=2
            )
            # 获取第一张和第二张生成的图片
            first_image = image_files[0] if image_files else None
            second_image = image_files[1] if len(image_files) > 1 else None
            return first_image, second_image
        return None, None

    except Exception as e:
        print(f"Error generating images: {e}")
        return None, None


# 在 demo 定义中添加新的界面
with gr.Blocks(theme="soft") as demo:
    with gr.Row():
        # 左侧输入区域
        with gr.Column(scale=1):
            input_image = gr.Image(
                label="上传图片",
                type="filepath",
                height=300  # 限制图片显示高度
            )
            style_dropdown = gr.Dropdown(
                choices=list(DEFAULT_STYLE_PROMPT_DICT.keys()),
                value=list(DEFAULT_STYLE_PROMPT_DICT.keys())[0],
                label="选择风格"
            )
            additional_text_ui = gr.Textbox(
                label="补充提示词",
                placeholder="请输入额外的提示词...",
                lines=3
            )
            generate_btn = gr.Button("生成图片")

        # 右侧输出区域
        with gr.Column(scale=2):
            output_image1 = gr.Image(label="生成结果 1")
            output_image2 = gr.Image(label="生成结果 2")

    generate_btn.click(
        fn=generate_images,
        inputs=[input_image, style_dropdown, additional_text_ui],
        outputs=[output_image1, output_image2]
    )

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=8765,
        ssl_verify=False,
        share=True,
        allowed_paths=[IMAGE_CACHE_DIR]
    )
