# 统一测试


In [9]:
import requests
import base64
import json
from pathlib import Path
import os
import time

class AITester:
    def __init__(self):
        self.base_url = "http://localhost:8000/AI_ALL/"
        self.session = requests.Session()
        
        # 测试资源URL
        self.test_urls = {
            'image': 'http://obs.roseyy.cn/test.jpg',
            'audio': 'https://obs.roseyy.cn/test.wav',
            'video': 'https://obs.roseyy.cn/test.mp4'
        }
        
        # 本地测试文件路径
        self.local_files = {
            'image': r'C:\Users\58300\Desktop\rag\test.jpg',
            'audio': r'C:\Users\58300\Desktop\rag\test.wav',
            'video': r'C:\Users\58300\Desktop\rag\test.mp4'
        }
        
        # 测试结果统计
        self.test_results = {
            'success': [],
            'failed': []
        }
        
        # 创建测试文件
        self.init_test_files()
    
    def init_test_files(self):
        """初始化测试文件"""
        test_dir = Path("test_files")
        test_dir.mkdir(exist_ok=True)
        
        # 创建测试文本文件
        test_txt = test_dir / "test.txt"
        if not test_txt.exists():
            with open(test_txt, 'w', encoding='utf-8') as f:
                f.write("这是一个测试文档。\n用于测试长文本理解功能。\n包含多行文本内容。")
            print(f"创建测试文本: {test_txt}")

    def test_model(self, model_name, test_type, content, file_path=None, url=None):
        """通用模型测试方法"""
        test_id = f"{model_name}-{test_type}"
        data = {
            'model': model_name,
            'text': content
        }
        
        if url:
            data['url'] = url
            
        files = None
        if file_path:
            try:
                files = {'file': open(file_path, 'rb')}
            except Exception as e:
                print(f"打开文件失败: {e}")
                self.test_results['failed'].append((test_id, str(e)))
                return
        
        print(f"\n测试 {model_name} {test_type}模型...")
        try:
            response = self.session.post(self.base_url, data=data, files=files, stream=True)
            self._process_response(response)
            self.test_results['success'].append(test_id)
        except Exception as e:
            print(f"请求错误: {e}")
            self.test_results['failed'].append((test_id, str(e)))
        finally:
            if files:
                files['file'].close()

    def _process_response(self, response):
        """处理流式响应"""
        if response.status_code == 200:
            for line in response.iter_lines():
                if line:
                    content = line.decode('utf-8')
                    type_data = content.split(':', 1)
                    if len(type_data) == 2:
                        content_type, data = type_data
                        print(f"{content_type}: {data}")
        else:
            print(f"错误: {response.status_code}")
            print(response.text)

    def test_cogview(self, prompt, size="1024x1024"):
        """测试文生图功能"""
        test_id = "cogview-4-文生图"
        endpoint = "http://localhost:8000/GLM-Cog/"
        
        payload = {
            "model": "cogview-4",
            "prompt": prompt,
            "size": size
        }
        
        print(f"\n=== 测试 GLM-Cog ===")
        try:
            response = self.session.post(endpoint, json=payload, headers={"Content-Type": "application/json"})
            result = response.json()
            
            if response.status_code == 200:
                print("生成的图片URL:")
                for idx, image in enumerate(result.get('data', [])):
                    print(f"图片 {idx + 1}: {image.get('url', '无URL')}")
                self.test_results['success'].append(test_id)
            else:
                error = result.get('error', '未知错误')
                print(f"错误: {error}")
                self.test_results['failed'].append((test_id, error))
        except Exception as e:
            print(f"请求失败: {e}")
            self.test_results['failed'].append((test_id, str(e)))

    def test_cogvideo(self, prompt, image_path=None):
        """测试视频生成功能"""
        test_id = f"cogvideox-{'图生' if image_path else '文生'}视频"
        endpoint = "http://localhost:8000/GLM-CogVideo/"
        
        payload = {
            "model": "cogvideox",
            "prompt": prompt,
            "quality": "quality",
            "with_audio": True,
            "size": "720x480",
            "fps": 30
        }

        if image_path:
            try:
                with open(image_path, 'rb') as f:
                    image_data = base64.b64encode(f.read()).decode('utf-8')
                    payload["image_url"] = f"data:image/jpeg;base64,{image_data}"
            except Exception as e:
                print(f"处理图片失败: {e}")
                self.test_results['failed'].append((test_id, str(e)))
                return

        print(f"\n=== 测试 {test_id} ===")
        try:
            response = self.session.post(endpoint, json=payload, headers={"Content-Type": "application/json"})
            if response.status_code != 200:
                error = response.json().get('error', '未知错误')
                print(f"生成请求失败: {error}")
                self.test_results['failed'].append((test_id, error))
                return

            task_id = response.json().get('task_id')
            if not task_id:
                print("未获取到任务ID")
                self.test_results['failed'].append((test_id, "未获取到任务ID"))
                return

            print(f"获取到任务ID: {task_id}")
            result = self.check_video_result(task_id, endpoint)
            
            if result:
                print(f"视频URL: {result.get('video_url')}")
                print(f"封面URL: {result.get('cover_url')}")
                self.test_results['success'].append(test_id)
            else:
                self.test_results['failed'].append((test_id, "生成失败"))
                
        except Exception as e:
            print(f"请求失败: {e}")
            self.test_results['failed'].append((test_id, str(e)))

    def check_video_result(self, task_id, endpoint, interval=20, max_retries=120):
        """检查视频生成结果"""
        for _ in range(max_retries):
            try:
                check_payload = {
                    "action": "check_status",
                    "task_id": task_id
                }
                
                response = self.session.post(endpoint, json=check_payload, headers={"Content-Type": "application/json"})
                if response.status_code != 200:
                    print(f"检查任务状态失败: {response.json().get('error', '未知错误')}")
                    continue
                    
                result = response.json()
                status = result.get('task_status')
                print(f"当前状态: {status}")
                
                if status == "SUCCESS":
                    return {
                        "video_url": result.get('video_result', [{}])[0].get('url'),
                        "cover_url": result.get('video_result', [{}])[0].get('cover_image_url')
                    }
                elif status in ["FAIL", "failed", "error"]:
                    return None
                    
                time.sleep(interval)
                
            except Exception as e:
                print(f"检查结果失败: {e}")
                time.sleep(interval)
        
        print("等待超时")
        return None

    def test_long_text(self, file_paths, question):
        """测试长文本理解"""
        test_id = "qwen-long-长文理解"
        print(f"\n=== 测试 {test_id} ===")
        
        try:
            # 读取并合并所有文件内容
            combined_text = ""
            for file_path in file_paths:
                with open(file_path, 'r', encoding='utf-8') as f:
                    combined_text += f.read() + "\n\n"
            
            # 修改消息格式
            messages = [{
                "role": "user",
                "content": question + "\n\n" + combined_text
            }]
            
            data = {
                'model': 'qwen-long',
                'messages': json.dumps(messages)
            }
            
            response = self.session.post(self.base_url, data=data, stream=True)
            if response.status_code == 200:
                print("模型回复：")
                for line in response.iter_lines(decode_unicode=True):
                    if line and line.startswith('text:'):
                        print(line[5:])
                self.test_results['success'].append(test_id)
            else:
                error = response.text
                print(f"错误: {error}")
                self.test_results['failed'].append((test_id, error))
                
        except Exception as e:
            print(f"测试失败: {e}")
            self.test_results['failed'].append((test_id, str(e)))

    def print_test_results(self):
        """打印测试结果统计"""
        print("\n=== 测试结果统计 ===")
        print(f"总计测试: {len(self.test_results['success']) + len(self.test_results['failed'])} 项")
        
        if self.test_results['success']:
            print(f"\n成功: {len(self.test_results['success'])} 项")
            print("成功的测试:")
            for test_id in self.test_results['success']:
                print(f"  - {test_id}")
        
        if self.test_results['failed']:
            print(f"\n失败: {len(self.test_results['failed'])} 项")
            print("失败的测试:")
            for test_id, error in self.test_results['failed']:
                print(f"  - {test_id}: {error}")

def main():
    tester = AITester()
    
    try:
        # 测试GLM-4
        print("\n=== 测试 GLM-4 ===")
        tester.test_model('glm-4', '对话',
                         "你好，请介绍一下你自己")

        # 测试GLM-4V
        print("\n=== 测试 GLM-4V ===")
        tester.test_model('glm-4v-plus', '图像URL',
                         "这张图片是什么内容？",
                         url=tester.test_urls['image'])
        tester.test_model('glm-4v-plus', '图像文件',
                         "描述这张图片",
                         file_path=tester.local_files['image'])

        # 测试文生图
        tester.test_cogview("一只可爱的小猫咪")
        
        # 测试视频生成
        tester.test_cogvideo("一朵盛开的红玫瑰")  # 文生视频
        tester.test_cogvideo(
            "让这张图片动起来",
            r"C:\Users\58300\Desktop\rag\test.jpg"  # 图生视频
        )
        
        # 测试GLM-4-Voice
        print("\n=== 测试 GLM-4-Voice ===")
        tester.test_model('glm-4-voice', '语音',
                         "你好，请把这段话转成语音")
        tester.test_model('glm-4-voice', '语音识别',
                         "请识别这段音频",
                         file_path=tester.local_files['audio'])

        # 测试QwenChat
        print("\n=== 测试 QwenChat ===")
        tester.test_model('qwen2.5-1.5b-instruct', '对话',
                         "你好，请介绍一下你自己")

        # 测试QwenOCR
        print("\n=== 测试 QwenOCR ===")
        tester.test_model('qwen-vl-ocr', 'OCR',
                         "识别图片中的文字",
                         file_path=tester.local_files['image'])

        # 测试Qwenomni
        print("\n=== 测试 Qwenomni ===")
        tester.test_model('qwen-omni-turbo', '多模态',
                         "这张图片是什么内容？",
                         file_path=tester.local_files['image'])

        # 测试QwenAudio
        print("\n=== 测试 QwenAudio ===")
        tester.test_model('qwen-audio-turbo-latest', '音频理解',
                         "这段音频说了什么？",
                         file_path=tester.local_files['audio'])
        
        # 测试长文本理解
        md_files = [
            r"C:\Users\58300\Desktop\rag\1.md",
            r"C:\Users\58300\Desktop\rag\2.md",
            r"C:\Users\58300\Desktop\rag\3.md"
        ]
        tester.test_long_text(md_files, "1.md的标题是，2.md的标题是，3.md的表态是")
        
    finally:
        tester.print_test_results()

if __name__ == "__main__":
    main()


=== 测试 GLM-4 ===

测试 glm-4 对话模型...
text: 你好
text: ，
text: 我是
text: 智
text: 谱
text: 清
text: 言
text: ，
text: 是
text: 清华大学
text:  K
text: EG
text:  实
text: 验
text: 室
text: 和
text: 智
text: 谱
text:  AI
text:  公司
text: 于
text:  
text: 202
text: 3
text:  年
text: 共同
text: 训练
text: 的语言
text: 模型
text: 。
text: 我的
text: 目标
text: 是通过
text: 回答
text: 用户
text: 提出
text: 的问题
text: 来
text: 帮助他们
text: 解决问题
text: 。
text: 由于
text: 我是一个
text: 计算机
text: 程序
text: ，
text: 所以
text: 我没有
text: 自我
text: 意识
text: ，
text: 也不能
text: 像
text: 人类
text: 一样
text: 感知
text: 世界
text: 。
text: 我只能
text: 通过
text: 分析
text: 我所
text: 学
text: 到的
text: 信息
text: 来
text: 回答
text: 问题
text: 。
text: 

=== 测试 GLM-4V ===

测试 glm-4v-plus 图像URL模型...

=== 测试结果统计 ===
总计测试: 1 项

成功: 1 项
成功的测试:
  - glm-4-对话


KeyboardInterrupt: 

# glm接口方法


In [7]:
from zhipuai import ZhipuAI
import os
client = ZhipuAI(api_key="98fd7efde5b04a9ab8ded261b0cd54e5.iSTkJXlCzxR1rMC9") 
response_text = ""  # 用于存储合并后的结果

response = client.chat.completions.create(
    model="glm-4-flash",
    messages=[
        {"role": "user", "content": "你是谁？"},
    ],
)

# 打印合并后的完整文本
print(response)

ModuleNotFoundError: No module named 'zhipuai'

# glm 使用url方式


In [8]:
import requests
import json

# 配置 API 密钥和 URL
api_key = "98fd7efde5b04a9ab8ded261b0cd54e5.iSTkJXlCzxR1rMC9"
url = "https://open.bigmodel.cn/api/paas/v4/chat/completions"

# 设置请求头和请求体
response = client.chat.completions.create(
    model="glm-4-flash",
    messages=[
        {"role": "user", "content": "我得了抑郁症，我该怎么办？"},
    ],
    tools=[
        {
            "type": "retrieval",
            "retrieval": {
                "knowledge_id": "1880239333915635712",
                "prompt_template": "从文档\n\"\"\"\n{{knowledge}}\n\"\"\"\n中找问题\n\"\"\"\n{{question}}\n\"\"\"\n的答案，找到答案就仅使用文档语句回答问题，找不到答案就用自身知识回答并且告诉用户该信息不是来自文档。\n不要复述问题，直接开始回答。"
            }
        }
    ],
    stream=True,
)

# 遍历响应中的每个 chunk，累加 content 到 response_text
for chunk in response:
    response_text += chunk.choices[0].delta.content

# 打印合并后的完整文本
print(response_text)

NameError: name 'client' is not defined

# 测试GLM-4

In [1]:
import requests
 
API_URL = "http://localhost:8000/GLM-4/"
HEADERS = {"Content-Type": "application/json"}
 
def simple_test(question, model="glm-4-flash"):
    payload = {
        "question": question,
        "model": model 
    }
    try:
        response = requests.post(API_URL,  json=payload, headers=HEADERS)
        return response.json()   # 直接返回原始 JSON 数据
    except Exception as e:
        return {"error": f"请求失败: {str(e)}"}  # 返回错误信息的 JSON 格式
 
if __name__ == "__main__":
    # 简单测试
    result = simple_test("你是谁")
    print(result)  # 打印返回的 JSON 数据
    
    # 测试其他模型（如需）
    # result = simple_test("描述这张图片内容", model="glm-4v")
    # print(result)

{'choices': [{'finish_reason': 'stop', 'index': 0, 'message': {'content': '我是一个人工智能助手，名为 ChatGLM，是基于清华大学 KEG 实验室和智谱 AI 公司于 2024 年共同训练的语言模型开发的。我的任务是针对用户的问题和要求提供适当的答复和支持。', 'role': 'assistant'}}], 'created': 1739764414, 'id': '20250217115332f69ffded3f984280', 'model': 'glm-4-flash', 'request_id': '20250217115332f69ffded3f984280', 'usage': {'completion_tokens': 46, 'prompt_tokens': 7, 'total_tokens': 53}}


# 测试GLM-4V


In [None]:
import requests
import base64
import os

API_URL = "http://localhost:8000/GLM-4V/"
HEADERS = {"Content-Type": "application/json"}

def test_glm4v():
    # 固定测试内容
    test_data = {
        "text": "这是一段测试文本",
        "image_url": "http://obs.roseyy.cn/test.jpg",
        "video_url": "https://obs.roseyy.cn/test.mp4",
        "image_base64_path": "C:/Users/58300/Desktop/rag/test.jpg",
        "video_base64_path": "C:/Users/58300/Desktop/rag/test.mp4"
    }

    # 构建消息内容
    content = []
    
    # 场景1: 测试图片URL-支持glm-4v-plus-0111、glm-4v-plus 、glm-4v、glm-4v-flash
    content.append({
        "type": "image_url",
        "image_url": {
            "url": test_data["image_url"]
        }
    })
    content.append({
        "type": "text",
        "text": "请描述这张图片"
    })

    # 或者场景2: 测试视频URL-支持glm-4v
    # content.append({
    #     "type": "video_url",
    #     "video_url": {
    #         "url": test_data["video_url"]
    #     }
    # })
    # content.append({
    #     "type": "text",
    #     "text": "请描述这个视频的内容"
    # })

    # 或者场景3: 测试Base64图片-支持glm-4v-plus-0111、glm-4v-plus 、glm-4v、glm-4v-flash
    # with open(test_data["image_base64_path"], "rb") as f:
    #     img_base64 = base64.b64encode(f.read()).decode('utf-8')
    # content.append({
    #     "type": "image_url",
    #     "image_url": {
    #         "url": f"data:image/jpeg;base64,{img_base64}"
    #     }
    # })
    # content.append({
    #     "type": "text",
    #     "text": "分析这张图片"
    # })


    # # 或者场景4: 测试Base64视频-支持glm-4v-plus-0111、glm-4v-plus 、glm-4v、glm-4v-flash
    # with open(test_data["video_base64_path"], "rb") as f:
    #     video_base64 = base64.b64encode(f.read()).decode('utf-8')
    # content.append({
    #     "type": "video_url",
    #     "video_url": {
    #         "url": f"data:video/mp4;base64,{video_base64}"
    #     }
    # })
    # content.append({
    #     "type": "text",
    #     "text": "请描述这个视频"
    # })

    payload = {
        "model": "glm-4v-flash",  # 随时替换模型
        "messages": [{
            "role": "user",
            "content": content
        }]
    }

    try:
        response = requests.post(API_URL, json=payload, headers=HEADERS)
        result = response.json()
        
        print("\n=== 测试结果 ===")
        print(f"状态码: {response.status_code}")
        if response.status_code != 200:
            print(f"错误信息: {result.get('error', '未知错误')}")
        else:
            print("响应内容:")
            print(result.get('choices', [{}])[0].get('message', {}).get('content', '无内容'))
        
    except Exception as e:
        print(f"请求失败: {str(e)}")

if __name__ == "__main__":
    test_glm4v()

# 测试文生图glm-cogview

In [None]:
import requests

API_URL = "http://localhost:8000/GLM-Cog/"
HEADERS = {"Content-Type": "application/json"}

def test_cogview():
    # 测试数据
    test_cases = [
        {
            "model": "cogview-3-flash",#可用模型cogview-4；cogview-3-flash
            "prompt": "一只可爱的小猫咪",
            "size": "1024x1024"
        },
        # 可以添加更多测试用例
    ]
    
    for case in test_cases:
        print(f"\n测试用例: {case['prompt']}")
        
        payload = {
            "model": case["model"],
            "prompt": case["prompt"],
            "size": case["size"]
        }
        
        try:
            response = requests.post(API_URL, json=payload, headers=HEADERS)
            result = response.json()
            
            print("\n=== 测试结果 ===")
            print(f"状态码: {response.status_code}")
            if response.status_code != 200:
                print(f"错误信息: {result.get('error', '未知错误')}")
            else:
                print("生成的图片URL:")
                # 根据实际返回格式调整获取图片URL的方式
                images = result.get('data', [])
                for idx, image in enumerate(images):
                    print(f"图片 {idx + 1}: {image.get('url', '无URL')}")
            
        except Exception as e:
            print(f"请求失败: {str(e)}")

if __name__ == "__main__":
    test_cogview() 

# 文生视频GLM-cogvideox 测试

In [5]:
import base64
import time
import requests

API_URL = "http://localhost:8000/GLM-CogVideo/"
HEADERS = {"Content-Type": "application/json"}

def test_generate_video(prompt, image_input=None, model="cogvideox-flash"):
    """
    测试视频生成
    :param prompt: 文本描述
    :param image_input: 图片输入（可选，支持URL或本地路径）
    :param model: 模型名称
    :return: 生成的视频URL
    """
    # 处理图片输入
    image_url = None
    if image_input:
        if image_input.startswith(('http://', 'https://')):
            image_url = image_input
        else:
            # 处理本地文件
            try:
                with open(image_input, 'rb') as f:
                    image_url = f"data:image/jpeg;base64,{base64.b64encode(f.read()).decode()}"
            except Exception as e:
                print(f"处理图片失败: {e}")
                return None

    # 构建请求数据
    payload = {
        "model": model,
        "prompt": prompt,
        "image_url": image_url,
        "quality": "quality",
        "with_audio": True,
        "size": "720x480",
        "fps": 30
    }

    try:
        # 发起生成请求
        response = requests.post(API_URL, json=payload, headers=HEADERS)
        if response.status_code != 200:
            print(f"生成请求失败: {response.json().get('error', '未知错误')}")
            return None
            
        task_id = response.json().get('task_id')
        if not task_id:
            print("未获取到任务ID")
            return None
            
        print(f"获取到任务ID: {task_id}")
        
        # 轮询检查结果
        return check_video_result(task_id)
        
    except Exception as e:
        print(f"请求失败: {e}")
        return None

def check_video_result(task_id, interval=20, max_retries=120):
    """检查视频生成结果"""
    for _ in range(max_retries):
        try:
            # 构建状态查询请求
            check_payload = {
                "action": "check_status",
                "task_id": task_id
            }
            
            response = requests.post(API_URL, json=check_payload, headers=HEADERS)
            if response.status_code != 200:
                print(f"检查任务状态失败: {response.json().get('error', '未知错误')}")
                continue
                
            result = response.json()
            status = result.get('task_status')
            print(f"当前状态: {status}")
            
            if status == "SUCCESS":
                video_result = result.get('video_result', [])
                if not video_result:
                    print("无视频结果")
                    return None
                    
                first_video = video_result[0]
                return {
                    "video_url": first_video.get('url'),
                    "cover_url": first_video.get('cover_image_url')
                }
            elif status in ["FAIL", "failed", "error"]:
                print("视频生成失败")
                return None
                
            time.sleep(interval)
            
        except Exception as e:
            print(f"检查结果失败: {e}")
            time.sleep(interval)
    
    print("等待超时")
    return None

if __name__ == "__main__":
    # # 测试用例1：仅文本生成
    # print("\n=== 测试用例1：仅文本生成 ===")
    # result1 = test_generate_video(
    #     prompt="红色玫瑰",
    #     model="cogvideox-flash"
    # )
    # print("生成结果:", result1)
    
    # # 测试用例2：图片URL生成
    # print("\n=== 测试用例2：图片URL生成 ===")
    # result2 = test_generate_video(
    #     prompt="让这朵花绽放",
    #     image_input="http://obs.roseyy.cn/test.jpg",
    #     model="cogvideox-flash"
    # )
    # print("生成结果:", result2)
    
    # 测试用例3：本地图片生成
    print("\n=== 测试用例3：本地图片生成 ===")
    result3 = test_generate_video(
        prompt="让图片动起来",
        image_input="C:/Users/58300/Desktop/rag/test.jpg",
        model="cogvideox-flash"
    )
    print("生成结果:", result3) 


=== 测试用例3：本地图片生成 ===
获取到任务ID: 36781736753330461-8960016563948002395
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable
检查任务状态失败: 'NoneType' object is not iterable


KeyboardInterrupt: 

# GLM-4-VOICE智能对话测试

In [None]:
import base64
import wave
import requests
import time
from zhipuai import ZhipuAI

API_URL = "http://localhost:8000/GLM-4-Voice/"
HEADERS = {"Content-Type": "application/json"}

def check_server():
    """检查服务器是否运行"""
    try:
        # 改用我们已有的API端点来检查
        response = requests.get("http://localhost:8000/GLM-4-Voice/")
        # 即使返回405（因为只支持POST），也说明服务器在运行
        return response.status_code in [200, 404, 405]
    except requests.exceptions.ConnectionError:
        return False

def wait_for_server(timeout=30):
    """等待服务器启动"""
    start_time = time.time()
    while time.time() - start_time < timeout:
        if check_server():
            print("服务器已就绪")
            return True
        print("等待服务器启动...")
        time.sleep(2)
    return False

def save_audio_as_wav(audio_data, filepath):
    """
    将音频数据保存为WAV文件
    :param audio_data: 音频数据（字节流）
    :param filepath: 保存路径
    """
    with wave.open(filepath, 'wb') as wav_file:
        wav_file.setnchannels(1)   # 单声道
        wav_file.setsampwidth(2)   # 每个样本2字节
        wav_file.setframerate(44100)   # 采样率44100 Hz
        wav_file.writeframes(audio_data)
    print(f"音频已保存到 {filepath}")

def test_voice_generation(prompt=None, audio_path=None, model="glm-4-voice"):
    """
    测试语音生成
    :param prompt: 文本提示
    :param audio_path: 音频文件路径（可选）
    :param model: 模型名称
    :return: 生成的文本和音频
    """
    # 检查服务器状态
    if not wait_for_server():
        print("无法连接到服务器，请确保Django服务已启动")
        return None

    # 构建消息内容
    content = []
    
    # 添加文本内容
    if prompt:
        content.append({
            "type": "text",
            "text": prompt
        })
    
    # 处理音频输入
    if audio_path:
        try:
            # 读取音频文件并转换为Base64编码
            with open(audio_path, "rb") as audio_file:
                audio_data = audio_file.read()
                audio_base64 = base64.b64encode(audio_data).decode("utf-8")
            
            # 获取音频格式
            audio_format = "wav" if audio_path.lower().endswith(".wav") else "mp3"
            
            content.append({
                "type": "input_audio",
                "input_audio": {
                    "data": audio_base64,
                    "format": audio_format
                }
            })
            
        except Exception as e:
            print(f"处理音频文件失败: {e}")
            return None
    
    # 构建请求数据
    payload = {
        "model": model,
        "messages": [{
            "role": "user",
            "content": content
        }],
        "temperature": 0.8,
        "top_p": 0.6,
        "max_tokens": 1024,
        "do_sample": True,
        "stream": False
    }
    
    try:
        # 发起请求
        response = requests.post(API_URL, json=payload, headers=HEADERS)
        if response.status_code != 200:
            print(f"请求失败: {response.json().get('error', '未知错误')}")
            return None
        
        result = response.json()
        
        # 如果有音频数据，保存为文件
        if "choices" in result and result["choices"]:
            choice = result["choices"][0]
            if "message" in choice and "audio" in choice["message"]:
                try:
                    audio_data = base64.b64decode(choice["message"]["audio"]["data"])
                    save_audio_as_wav(audio_data, "output.wav")
                except Exception as e:
                    print(f"保存音频失败: {e}")
        
        return result
        
    except Exception as e:
        print(f"请求失败: {e}")
        return None

if __name__ == "__main__":
    # # 测试用例1：仅文本生成
    # print("\n=== 测试用例1：仅文本生成 ===")
    # result1 = test_voice_generation(
    #     prompt="给我讲个笑话",
    #     model="glm-4-voice"
    # )
    # print("生成结果:", result1)
    
    # 测试用例2：文本和音频生成
    print("\n=== 测试用例2：文本和音频生成 ===")
    result2 = test_voice_generation(
        prompt="这段话说得怎么样？",
        audio_path="D:/python代码/rvc/sound/limei/output2/limei.wav_0000000000_0000206080.wav",
        model="glm-4-voice"
    )
    print("生成结果:", result2) 

# coze_chat模型体测试

In [1]:
import requests

def test_coze_chat():
    """测试COZE对话"""
    API_URL = "http://localhost:8000/CozeChat/"
    HEADERS = {"Content-Type": "application/json"}
    
    # 测试数据 - 需要提供question和user_id
    payload = {
        "question": "你是谁？",
        "user_id": "123"  # 用户标识必须提供
        # api_token和bot_id有默认值，可以省略
    }
    
    try:
        # 发起请求
        response = requests.post(API_URL, json=payload, headers=HEADERS)
        if response.status_code != 200:
            print(f"请求失败: {response.json().get('error', '未知错误')}")
            return None
            
        # 处理响应
        result = response.json()
        print("对话内容:", result.get("content"))
        print("Token用量:", result.get("token_count"))
        return result
        
    except Exception as e:
        print(f"请求失败: {e}")
        return None

if __name__ == "__main__":
    test_coze_chat() 

对话内容: 我是豆包呀，能陪你解答各种问题和提供各种信息呢~ 有什么事都可以问我哦。
Token用量: 1266


# Qwen大语言模型测试

In [1]:
import requests
import json
from pathlib import Path

# 测试基础配置
BASE_URL = "http://localhost:8000"
HEADERS = {"Content-Type": "application/json"}

def test_qwen_chat():
    """测试通义千问单轮对话"""
    url = f"{BASE_URL}/QwenChat/"
    
    # 获取用户输入
    content = input("请输入您的问题: ")
    system_role = input("请输入角色设定(直接回车使用默认'你是一名专业的心理医生'): ") or "你是一名专业的心理医生"
    model = input("请输入模型名称(直接回车使用默认'deepseek-r1-distill-qwen-1.5b'): ") or "deepseek-r1-distill-qwen-1.5b"
    
    # 构建请求数据
    payload = {
        "content": content,
        "system_role": system_role,
        "model": model
    }
    
    try:
        print("\n助手: ", end='', flush=True)
        response = requests.post(url, data=payload)
        for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
            if chunk:
                print(chunk, end='', flush=True)
        print("\n")
        return True
    except Exception as e:
        print(f"请求失败: {e}")
        return False

def test_qwen_chat_file():
    """测试通义千问长文本对话"""
    url = f"{BASE_URL}/QwenChatFile/"
    
    # 获取用户输入
    file_path = input("请输入文件路径: ")
    question = input("请输入您的问题: ")
    # 长文本对话固定使用qwen-long模型
    
    try:
        with open(file_path, 'rb') as f:
            files = {'file': (Path(file_path).name, f)}
            data = {'question': question}
            
            response = requests.post(url, files=files, data=data)
            
            if response.status_code == 200:
                result = response.json()
                print("\n助手:", result.get('response'))
                return result
            else:
                print(f"请求失败: {response.status_code}")
                print(f"错误信息: {response.text}")
                return None
    except Exception as e:
        print(f"请求失败: {e}")
        return None

def test_qwen_chat_toke():
    """测试通义千问多轮对话"""
    url = f"{BASE_URL}/QwenChatToke/"
    session = requests.Session()
    
    # 获取模型名称
    model = input("请输入模型名称(直接回车使用默认'qwen2.5-1.5b-instruct'): ") or "qwen2.5-1.5b-instruct"
    print("开始多轮对话 (输入'退出'结束对话)")
    
    while True:
        user_input = input("\n用户: ")
        if user_input.lower() in ['退出', 'quit', 'exit', '0']:
            break
            
        try:
            response = session.post(
                url, 
                data={
                    "content": user_input,
                    "model": model
                }, 
                stream=True
            )
            
            print("助手: ", end='', flush=True)
            for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
                if chunk:
                    print(chunk, end='', flush=True)
            print()
            
        except Exception as e:
            print(f"请求失败: {e}")
            break

def main():
    while True:
        print("\n=== 通义千问测试程序 ===")
        print("1. 单轮对话测试")
        print("2. 长文本对话测试")
        print("3. 多轮对话测试")
        print("0. 退出程序")
        
        choice = input("\n请选择测试类型(0-3): ")
        
        if choice == '1':
            test_qwen_chat()
        elif choice == '2':
            test_qwen_chat_file()
        elif choice == '3':
            test_qwen_chat_toke()
        elif choice == '0':
            break
        else:
            print("无效的选择，请重试")

if __name__ == "__main__":
    main()


=== 通义千问测试程序 ===
1. 单轮对话测试
2. 长文本对话测试
3. 多轮对话测试
0. 退出程序

助手: 你好！虽然我是你好！虽然我是由深度求索你好！虽然我是由深度求索 dzxl20你好！虽然我是由深度求索 dzxl2023 人工智能你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何心理或心理学方面的你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何心理或心理学方面的疑问，我很乐意你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何心理或心理学方面的疑问，我很乐意为你解答！无论是你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何心理或心理学方面的疑问，我很乐意为你解答！无论是关于心理评估、你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何心理或心理学方面的疑问，我很乐意为你解答！无论是关于心理评估、人格测试、心理你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何心理或心理学方面的疑问，我很乐意为你解答！无论是关于心理评估、人格测试、心理再生研究，还是你好！虽然我是由深度求索 dzxl2023 人工智能助手 内容安全 设置起来的，但我完全愿意为你解决任何相关问题。如果你有任何心理或心理学方面的疑问，我很乐意为你解答！无论是关于心理评估、人格测试、心

# Qwen 全模态理解Omni+OCR提取文字VL

In [1]:
import requests
import json
from pathlib import Path
import re
import base64
import numpy as np
import soundfile as sf
import pyaudio
import time

# 测试基础配置
BASE_URL = "http://localhost:8000"
HEADERS = {"Content-Type": "application/json"}

def play_audio(audio_string):
    """播放音频响应"""
    if audio_string:
        try:
            wav_bytes = base64.b64decode(audio_string)
            audio_np = np.frombuffer(wav_bytes, dtype=np.int16)
            sf.write('audio_response.wav', audio_np, samplerate=24000)
            
            # 播放音频
            p = pyaudio.PyAudio()
            stream = p.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
            stream.write(audio_np.tobytes())
            time.sleep(0.8)
            stream.stop_stream()
            stream.close()
            p.terminate()
        except Exception as e:
            print(f"音频播放失败: {e}")

def test_qwen_ocr():
    """测试通义千问OCR功能"""
    url = f"{BASE_URL}/QwenOCR/"
    
    # 获取用户输入
    file_path = input("请输入图片路径: ")
    question = input("请输入问题（直接回车默认提取所有文字）: ") or "提取所有图中文字"
    
    try:
        with open(file_path, 'rb') as f:
            files = {'file': (Path(file_path).name, f)}
            data = {'question': question}
            
            response = requests.post(url, files=files, data=data)
            
            if response.status_code == 200:
                result = response.json()
                print("\n识别结果:", result.get('response'))
                return result
            else:
                print(f"请求失败: {response.status_code}")
                print(f"错误信息: {response.text}")
                return None
    except Exception as e:
        print(f"请求失败: {e}")
        return None

def test_qwen_omni():
    """测试通义千问多模态对话"""
    url = f"{BASE_URL}/Qwenomni/"
    session = requests.Session()  # 创建会话以保持对话历史
    
    # 获取语音选项
    print("\n=== 选择AI助手音色 ===")
    print("1. 邻家女孩 (Cherry)")
    print("2. 职场小美女 (Serena)")
    print("3. 邻家大哥哥 (Ethan)")
    print("4. 御姐 (Chelsie)")
    voice_choice = input("请选择 (1-4): ")
    
    voices = ["Cherry", "Serena", "Ethan", "Chelsie"]
    if voice_choice not in ['1', '2', '3', '4']:
        print("无效的选择")
        return None
    
    voice = voices[int(voice_choice) - 1]
    
    while True:  # 多轮对话循环
        # 获取输入类型
        print("\n=== 选择输入类型 ===")
        print("1. 纯文本对话")
        print("2. 多媒体分析 (URL或本地文件)")
        choice = input("请选择功能 (1-2): ")
        
        try:
            response = None
            
            if choice == '1':
                text = input("请输入文本：")
                data = {
                    'type': 'text',
                    'text': text,
                    'voice': voice
                }
                response = session.post(url, data=data, stream=True)
                
            elif choice == '2':
                path = input("请输入URL或本地文件路径：")
                text = input("请输入相关问题（直接回车使用默认问题）：") or "请描述内容"
                
                if is_url(path):
                    # URL方式处理
                    file_type = get_file_type(path)
                    if not file_type:
                        print("不支持的文件类型")
                        continue
                        
                    data = {
                        'type': file_type,
                        'text': text,
                        'voice': voice,
                        'url': path
                    }
                    response = session.post(url, data=data, stream=True)
                else:
                    # 本地文件处理
                    file_type = get_file_type(path)
                    if not file_type:
                        print("不支持的文件类型")
                        continue
                    
                    try:
                        with open(path, 'rb') as f:
                            files = {'file': (Path(path).name, f.read())}
                            data = {
                                'type': file_type,
                                'text': text,
                                'voice': voice
                            }
                            response = session.post(url, data=data, files=files, stream=True)
                    except IOError as e:
                        print(f"文件读取错误: {e}")
                        continue
            else:
                print("无效的选择")
                continue
                
            if not response:
                print("请求未初始化")
                continue
                
            print("\nAI助手: ", end='', flush=True)
            
            # 处理响应
            if response.status_code == 200:
                audio_string = ""
                
                for line in response.iter_lines(decode_unicode=True):
                    if line:
                        if line.startswith('audio:'):
                            audio_string += line[6:]
                        elif line.startswith('text:'):
                            text_content = line[5:]
                            print(text_content, end='', flush=True)
                
                print("\n")
                # 播放音频响应
                if audio_string:
                    play_audio(audio_string)
                
                # 继续对话
                while True:
                    text = input("\n请输入问题（或输入'q'退出）: ")
                    if text.lower() == 'q':
                        return
                    
                    # 发送纯文本对话请求
                    data = {
                        'type': 'text',
                        'text': text,
                        'voice': voice
                    }
                    response = session.post(url, data=data, stream=True)
                    
                    print("\nAI助手: ", end='', flush=True)
                    audio_string = ""
                    
                    for line in response.iter_lines(decode_unicode=True):
                        if line:
                            if line.startswith('audio:'):
                                audio_string += line[6:]
                            elif line.startswith('text:'):
                                text_content = line[5:]
                                print(text_content, end='', flush=True)
                    
                    print("\n")
                    if audio_string:
                        play_audio(audio_string)
            else:
                print(f"请求失败: {response.status_code}")
                print(f"错误信息: {response.text}")
                break
                
        except Exception as e:
            print(f"请求失败: {e}")
            break

def is_url(string):
    """检查是否是URL"""
    url_pattern = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )
    return bool(url_pattern.match(string))

def get_file_type(path):
    """根据文件扩展名判断文件类型"""
    ext = path.lower().split('.')[-1]
    if ext in ['jpg', 'jpeg', 'png', 'gif']:
        return 'image'
    elif ext in ['mp3', 'wav', 'ogg']:
        return 'audio'
    elif ext in ['mp4', 'avi', 'mov']:
        return 'video'
    return None

def main():
    while True:
        print("\n=== 通义千问高级功能测试 ===")
        print("1. OCR文字识别测试")
        print("2. 多模态对话测试")
        print("0. 退出程序")
        
        choice = input("\n请选择测试类型(0-2): ")
        
        if choice == '1':
            test_qwen_ocr()
        elif choice == '2':
            test_qwen_omni()
        elif choice == '0':
            break
        else:
            print("无效的选择，请重试")

if __name__ == "__main__":
    main()

ModuleNotFoundError: No module named 'soundfile'

# Qwen 音频理解

In [2]:
import requests
import json
from pathlib import Path
import base64
import re

# 测试基础配置
BASE_URL = "http://localhost:8000"
HEADERS = {"Content-Type": "application/json"}

def test_audio():
    """测试通义千问音频理解"""
    url = f"{BASE_URL}/QwenAudio/"
    session = requests.Session()  # 创建会话以保持对话历史
    
    print("欢迎使用音频理解系统！")
    print("您可以上传音频文件（URL或本地路径），并提出相关问题。")
    
    while True:
        # 1. 用户上传音频文件
        audio_source = input("\n请输入音频文件的URL或本地路径: ").strip()
        if not audio_source:
            print("音频文件不能为空，请重新输入！")
            continue
        
        # 2. 用户输入问题
        question = input("请输入您想问的问题（如果为空，则默认总结音频内容）: ").strip()
        
        try:
            if audio_source.startswith(('http://', 'https://')):
                # URL方式
                data = {
                    'audio_source': audio_source,
                    'question': question
                }
                response = session.post(url, data=data, stream=True)
            else:
                # 本地文件方式
                try:
                    with open(audio_source, 'rb') as f:
                        files = {'file': (Path(audio_source).name, f)}
                        data = {'question': question}
                        response = session.post(url, data=data, files=files, stream=True)
                except IOError as e:
                    print(f"文件读取错误: {e}")
                    continue
            
            # 处理响应
            if response.status_code == 200:
                print("\n模型回复：", end='', flush=True)
                for line in response.iter_lines(decode_unicode=True):
                    if line:
                        if line.startswith('text:'):
                            print(line[5:], end='', flush=True)
                print("\n")
                
                # 多轮对话
                while True:
                    follow_up = input("\n是否继续提问？（输入问题继续，输入'exit'结束对话）: ").strip()
                    if follow_up.lower() == 'exit':
                        return
                    
                    if follow_up:
                        data = {
                            'question': follow_up
                        }
                        response = session.post(url, data=data, stream=True)
                        
                        if response.status_code == 200:
                            print("\n模型回复：", end='', flush=True)
                            for line in response.iter_lines(decode_unicode=True):
                                if line and line.startswith('text:'):
                                    print(line[5:], end='', flush=True)
                            print("\n")
                        else:
                            print(f"请求失败: {response.status_code}")
                            print(f"错误信息: {response.text}")
                            break
                    else:
                        print("问题不能为空，请重新输入！")
            else:
                print(f"请求失败: {response.status_code}")
                print(f"错误信息: {response.text}")
                
        except Exception as e:
            print(f"请求失败: {e}")
            continue

def main():
    while True:
        print("\n=== 通义千问功能测试 ===")
        print("1. 音频理解测试")
        print("0. 退出程序")
        
        choice = input("\n请选择测试类型(0-1): ")
        
        if choice == '1':
            test_audio()
        elif choice == '0':
            break
        else:
            print("无效的选择，请重试")

if __name__ == "__main__":
    main()


=== 通义千问功能测试 ===
1. 音频理解测试
0. 退出程序
欢迎使用音频理解系统！
您可以上传音频文件（URL或本地路径），并提出相关问题。

模型回复：[{'text': '当然'}][{'text': '，'}][{'text': '生意'}][{'text': '好'}][{'text': '听'}][{'text': '。'}][]

问题不能为空，请重新输入！
问题不能为空，请重新输入！
问题不能为空，请重新输入！
问题不能为空，请重新输入！
问题不能为空，请重新输入！
请求失败: 400
错误信息: {"error": "\u672a\u63d0\u4f9b\u97f3\u9891\u6570\u636e"}
音频文件不能为空，请重新输入！


# 测试服务器地址

In [None]:
import mysql.connector 
from mysql.connector  import Error
 
def connect_to_database():
    try:
        # 配置数据库连接信息 
        connection = mysql.connector.connect( 
            host="123.249.67.69",       # 数据库服务器地址
            port=3306,                 # 数据库端口（默认为 3306）
            user="dailyfresh",           # 数据库用户名
            password="Yd011987..",       # 数据库密码
            database="dailyfresh"    # 要连接的数据库名称 
        )
 
        if connection.is_connected(): 
            print("成功连接到数据库！")
 
            # 获取数据库版本信息 
            db_info = connection.get_server_info() 
            print(f"数据库版本: {db_info}")
 
            # 创建游标对象
            cursor = connection.cursor() 
 
            # 执行 SQL 查询 
            cursor.execute("SELECT  DATABASE();")
            record = cursor.fetchone() 
            print(f"当前使用的数据库: {record[0]}")
 
    except Error as e:
        print(f"连接数据库时出错: {e}")
 
    finally:
        # 关闭数据库连接
        if connection.is_connected(): 
            cursor.close() 
            connection.close() 
            print("数据库连接已关闭。")
 
# 调用函数
if __name__ == "__main__":
    connect_to_database()