<a href="https://colab.research.google.com/github/FlyAIBox/LLM-101/blob/main/chapter03-llm-deploy/vllm/deepseek_r1_distill_qwen_fast_api.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 基于vLLM部署DeepSeek-R1-Distill-Qwen-1.5B
## 📖 关于 DeepSeek R1 蒸馏版 Qwen 1.5B 模型

### 🧠 模型特点
- **模型名称**: `deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B`
- **参数规模**: 15亿参数，适合在显存有限的GPU（如Nvidia T4/4090） 上运行
- **推理能力**: 继承了 DeepSeek R1 的强大推理能力
- **蒸馏技术**: 通过知识蒸馏获得更小但高效的模型

### 🔍 模型优势
1. **轻量化**: 15亿参数，内存占用小
2. **高效推理**: 优化的推理速度
3. **强大能力**: 保持了大模型的推理能力
4. **免费部署**: 适合在 Colab T4 GPU 上运行

## 🎯 实验目标

本实验旨在帮助大模型技术初学者：

### 📚 学习内容
1. **环境准备**: 了解如何检查和配置 Colab 环境
2. **依赖安装**: 学习安装 VLLM、FastAPI 等关键库
3. **模型部署**: 掌握使用 VLLM 部署大语言模型的方法
4. **API 开发**: 创建 RESTful API 接口服务
5. **实时交互**: 实现流式输出和实时对话功能

### 💰 成本优势
- **完全免费**: 使用 Google Colab 免费 T4 GPU (15GB 显存)
- **零配置**: 无需本地环境配置，浏览器即可运行
- **即开即用**: 一键启动，快速体验大模型部署

### 🚀 期望收获
通过本实验，您将掌握：
- 大语言模型的基本部署流程
- VLLM 推理引擎的使用方法
- FastAPI Web 服务的开发技巧
- 模型 API 的设计和实现

## 🔧 第一步：环境信息检查

在开始部署模型之前，我们需要了解当前的运行环境。这个步骤非常重要，因为：

### 🎯 检查目的
1. **硬件确认**: 确保有足够的 GPU 显存运行模型
2. **系统兼容**: 验证操作系统和 Python 版本
3. **资源评估**: 了解可用的 CPU、内存和存储空间
4. **环境配置**: 检查 CUDA 版本和相关依赖

### 📊 检查内容
- **操作系统**: Linux 发行版和版本
- **CPU 信息**: 处理器型号和核心数
- **内存状态**: 总内存和可用内存
- **GPU 配置**: 显卡型号和显存大小
- **CUDA 版本**: 深度学习框架支持
- **Python 环境**: 解释器版本
- **磁盘空间**: 可用存储空间

In [3]:
# 🔍 环境信息检查脚本
#
# 本脚本的作用：
# 1. 安装 pandas 库用于数据表格展示
# 2. 检查系统的各项配置信息
# 3. 生成详细的环境报告表格
#
# 对于初学者来说，这个步骤帮助您：
# - 了解当前运行环境的硬件配置
# - 确认是否满足模型运行的最低要求
# - 学习如何通过代码获取系统信息

# 安装 pandas 库 - 用于创建和展示数据表格
# pandas 是 Python 中最流行的数据处理和分析库
!pip install pandas==2.2.2

import platform # 导入 platform 模块以获取系统信息
import os # 导入 os 模块以与操作系统交互
import subprocess # 导入 subprocess 模块以运行外部命令
import pandas as pd # 导入 pandas 模块，通常用于数据处理，这里用于创建表格
import shutil # 导入 shutil 模块以获取磁盘空间信息

# 获取 CPU 信息的函数，包括核心数量
def get_cpu_info():
    cpu_info = "" # 初始化 CPU 信息字符串
    physical_cores = "N/A"
    logical_cores = "N/A"

    if platform.system() == "Windows": # 如果是 Windows 系统
        cpu_info = platform.processor() # 使用 platform.processor() 获取 CPU 信息
        try:
            # 获取 Windows 上的核心数量 (需要 WMI)
            import wmi
            c = wmi.WMI()
            for proc in c.Win32_Processor():
                physical_cores = proc.NumberOfCores
                logical_cores = proc.NumberOfLogicalProcessors
        except:
            pass # 如果 WMI 不可用，忽略错误

    elif platform.system() == "Darwin": # 如果是 macOS 系统
        # 在 macOS 上使用 sysctl 命令获取 CPU 信息和核心数量
        os.environ['PATH'] = os.environ['PATH'] + os.pathsep + '/usr/sbin' # 更新 PATH 环境变量
        try:
            process_brand = subprocess.Popen(['sysctl', "machdep.cpu.brand_string"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_brand, stderr_brand = process_brand.communicate()
            cpu_info = stdout_brand.decode().split(': ')[1].strip() if stdout_brand else "Could not retrieve CPU info"

            process_physical = subprocess.Popen(['sysctl', "hw.physicalcpu"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_physical, stderr_physical = process_physical.communicate()
            physical_cores = stdout_physical.decode().split(': ')[1].strip() if stdout_physical else "N/A"

            process_logical = subprocess.Popen(['sysctl', "hw.logicalcpu"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout_logical, stderr_logical = process_logical.communicate()
            logical_cores = stdout_logical.decode().split(': ')[1].strip() if stdout_logical else "N/A"

        except:
            cpu_info = "Could not retrieve CPU info"
            physical_cores = "N/A"
            logical_cores = "N/A"

    else:  # Linux 系统
        try:
            # 在 Linux 上读取 /proc/cpuinfo 文件获取 CPU 信息和核心数量
            with open('/proc/cpuinfo') as f:
                physical_cores_count = 0
                logical_cores_count = 0
                cpu_info_lines = []
                for line in f:
                    if line.startswith('model name'): # 查找以 'model name'开头的行
                        if not cpu_info: # 只获取第一个 model name
                            cpu_info = line.split(': ')[1].strip()
                    elif line.startswith('cpu cores'): # 查找以 'cpu cores' 开头的行
                        physical_cores_count = int(line.split(': ')[1].strip())
                    elif line.startswith('processor'): # 查找以 'processor' 开头的行
                        logical_cores_count += 1
                physical_cores = str(physical_cores_count) if physical_cores_count > 0 else "N/A"
                logical_cores = str(logical_cores_count) if logical_cores_count > 0 else "N/A"
                if not cpu_info:
                     cpu_info = "Could not retrieve CPU info"

        except:
            cpu_info = "Could not retrieve CPU info"
            physical_cores = "N/A"
            logical_cores = "N/A"

    return f"{cpu_info} ({physical_cores} physical cores, {logical_cores} logical cores)" # 返回 CPU 信息和核心数量


# 获取内存信息的函数
def get_memory_info():
    mem_info = "" # 初始化内存信息字符串
    if platform.system() == "Windows":
        # 在 Windows 上不容易通过标准库获取，需要外部库或 PowerShell
        mem_info = "Requires external tools on Windows" # 设置提示信息
    elif platform.system() == "Darwin": # 如果是 macOS 系统
        # 在 macOS 上使用 sysctl 命令获取内存大小
        process = subprocess.Popen(['sysctl', "hw.memsize"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 运行 sysctl 命令
        stdout, stderr = process.communicate() # 获取标准输出和标准错误
        mem_bytes = int(stdout.decode().split(': ')[1].strip()) # 解析输出，获取内存大小（字节）
        mem_gb = mem_bytes / (1024**3) # 转换为 GB
        mem_info = f"{mem_gb:.2f} GB" # 格式化输出
    else:  # Linux 系统
        try:
            # 在 Linux 上读取 /proc/meminfo 文件获取内存信息
            with open('/proc/meminfo') as f:
                total_mem_kb = 0
                available_mem_kb = 0
                for line in f:
                    if line.startswith('MemTotal'): # 查找以 'MemTotal' 开头的行
                        total_mem_kb = int(line.split(':')[1].strip().split()[0]) # 解析行，获取总内存（KB）
                    elif line.startswith('MemAvailable'): # 查找以 'MemAvailable' 开头的行
                         available_mem_kb = int(line.split(':')[1].strip().split()[0]) # 解析行，获取可用内存（KB）

                if total_mem_kb > 0:
                    total_mem_gb = total_mem_kb / (1024**2) # 转换为 GB
                    mem_info = f"{total_mem_gb:.2f} GB" # 格式化输出总内存
                    if available_mem_kb > 0:
                        available_mem_gb = available_mem_kb / (1024**2)
                        mem_info += f" (Available: {available_mem_gb:.2f} GB)" # 添加可用内存信息
                else:
                     mem_info = "Could not retrieve memory info" # 如果读取文件出错，设置错误信息

        except:
            mem_info = "Could not retrieve memory info" # 如果读取文件出错，设置错误信息
    return mem_info # 返回内存信息

# 获取 GPU 信息的函数，包括显存
def get_gpu_info():
    try:
        # 尝试使用 nvidia-smi 获取 NVIDIA GPU 信息和显存
        result = subprocess.run(['nvidia-smi', '--query-gpu=name,memory.total', '--format=csv,noheader'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            gpu_lines = result.stdout.strip().split('\n') # 解析输出，获取 GPU 名称和显存
            gpu_info_list = []
            for line in gpu_lines:
                name, memory = line.split(', ')
                gpu_info_list.append(f"{name} ({memory})") # 格式化 GPU 信息
            return ", ".join(gpu_info_list) if gpu_info_list else "NVIDIA GPU found, but info not listed" # 返回 GPU 信息或提示信息
        else:
             # 尝试使用 lshw 获取其他 GPU 信息 (需要安装 lshw)
            try:
                result_lshw = subprocess.run(['lshw', '-C', 'display'], capture_output=True, text=True)
                if result_lshw.returncode == 0: # 如果命令成功执行
                     # 简单解析输出中的 product 名称和显存
                    gpu_info_lines = []
                    current_gpu = {}
                    for line in result_lshw.stdout.splitlines():
                        if 'product:' in line:
                             if current_gpu:
                                 gpu_info_lines.append(f"{current_gpu.get('product', 'GPU')} ({current_gpu.get('memory', 'N/A')})")
                             current_gpu = {'product': line.split('product:')[1].strip()}
                        elif 'size:' in line and 'memory' in line:
                             current_gpu['memory'] = line.split('size:')[1].strip()

                    if current_gpu: # 添加最后一个 GPU 的信息
                        gpu_info_lines.append(f"{current_gpu.get('product', 'GPU')} ({current_gpu.get('memory', 'N/A')})")

                    return ", ".join(gpu_info_lines) if gpu_info_lines else "GPU found (via lshw), but info not parsed" # 如果找到 GPU 但信息无法解析，设置提示信息
                else:
                    return "No GPU found (checked nvidia-smi and lshw)" # 如果两个命令都找不到 GPU，设置提示信息
            except FileNotFoundError:
                 return "No GPU found (checked nvidia-smi, lshw not found)" # 如果找不到 lshw 命令，设置提示信息
    except FileNotFoundError:
        return "No GPU found (nvidia-smi not found)" # 如果找不到 nvidia-smi 命令，设置提示信息


# 获取 CUDA 版本的函数
def get_cuda_version():
    try:
        # 尝试使用 nvcc --version 获取 CUDA 版本
        result = subprocess.run(['nvcc', '--version'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            for line in result.stdout.splitlines():
                if 'release' in line: # 查找包含 'release' 的行
                    return line.split('release ')[1].split(',')[0] # 解析行，提取版本号
        return "CUDA not found or version not parsed" # 如果找不到 CUDA 或版本无法解析，设置提示信息
    except FileNotFoundError:
        return "CUDA not found" # 如果找不到 nvcc 命令，设置提示信息

# 获取 Python 版本的函数
def get_python_version():
    return platform.python_version() # 获取 Python 版本

# 获取 Conda 版本的函数
def get_conda_version():
    try:
        # 尝试使用 conda --version 获取 Conda 版本
        result = subprocess.run(['conda', '--version'], capture_output=True, text=True)
        if result.returncode == 0: # 如果命令成功执行
            return result.stdout.strip() # 返回 Conda 版本
        return "Conda not found or version not parsed" # 如果找不到 Conda 或版本无法解析，设置提示信息
    except FileNotFoundError:
        return "Conda not found" # 如果找不到 conda 命令，设置提示信息

# 获取物理磁盘空间信息的函数
def get_disk_space():
    try:
        total, used, free = shutil.disk_usage("/") # 获取根目录的磁盘使用情况
        total_gb = total / (1024**3) # 转换为 GB
        used_gb = used / (1024**3) # 转换为 GB
        free_gb = free / (1024**3) # 转换为 GB
        return f"Total: {total_gb:.2f} GB, Used: {used_gb:.2f} GB, Free: {free_gb:.2f} GB" # 格式化输出
    except Exception as e:
        return f"Could not retrieve disk info: {e}" # 如果获取信息出错，设置错误信息

# 获取环境信息
os_name = platform.system() # 获取操作系统名称
os_version = platform.release() # 获取操作系统版本
if os_name == "Linux":
    try:
        # 在 Linux 上尝试获取发行版和版本
        lsb_info = subprocess.run(['lsb_release', '-a'], capture_output=True, text=True)
        if lsb_info.returncode == 0: # 如果命令成功执行
            for line in lsb_info.stdout.splitlines():
                if 'Description:' in line: # 查找包含 'Description:' 的行
                    os_version = line.split('Description:')[1].strip() # 提取描述信息作为版本
                    break # 找到后退出循环
                elif 'Release:' in line: # 查找包含 'Release:' 的行
                     os_version = line.split('Release:')[1].strip() # 提取版本号
                     # 尝试获取 codename
                     try:
                         codename_info = subprocess.run(['lsb_release', '-c'], capture_output=True, text=True)
                         if codename_info.returncode == 0:
                             os_version += f" ({codename_info.stdout.split(':')[1].strip()})" # 将 codename 添加到版本信息中
                     except:
                         pass # 如果获取 codename 失败则忽略

    except FileNotFoundError:
        pass # lsb_release 可能未安装，忽略错误

full_os_info = f"{os_name} {os_version}" # 组合完整的操作系统信息
cpu_info = get_cpu_info() # 调用函数获取 CPU 信息和核心数量
memory_info = get_memory_info() # 调用函数获取内存信息
gpu_info = get_gpu_info() # 调用函数获取 GPU 信息和显存
cuda_version = get_cuda_version() # 调用函数获取 CUDA 版本
python_version = get_python_version() # 调用函数获取 Python 版本
conda_version = get_conda_version() # 调用函数获取 Conda 版本
disk_info = get_disk_space() # 调用函数获取物理磁盘空间信息


# 创建用于存储数据的字典
env_data = {
    "项目": [ # 项目名称列表
        "操作系统",
        "CPU 信息",
        "内存信息",
        "GPU 信息",
        "CUDA 信息",
        "Python 版本",
        "Conda 版本",
        "物理磁盘空间" # 添加物理磁盘空间
    ],
    "信息": [ # 对应的信息列表
        full_os_info,
        cpu_info,
        memory_info,
        gpu_info,
        cuda_version,
        python_version,
        conda_version,
        disk_info # 添加物理磁盘空间信息
    ]
}

# 创建一个 pandas DataFrame
df = pd.DataFrame(env_data)

# 打印表格
print("### 环境信息") # 打印标题
print(df.to_markdown(index=False)) # 将 DataFrame 转换为 Markdown 格式并打印，不包含索引

### 环境信息
| 项目         | 信息                                                               |
|:-------------|:-------------------------------------------------------------------|
| 操作系统     | Linux Ubuntu 22.04.4 LTS                                           |
| CPU 信息     | Intel(R) Xeon(R) CPU @ 2.00GHz (1 physical cores, 2 logical cores) |
| 内存信息     | 12.67 GB (Available: 11.29 GB)                                     |
| GPU 信息     | Tesla T4 (15360 MiB)                                               |
| CUDA 信息    | 12.5                                                               |
| Python 版本  | 3.11.13                                                            |
| Conda 版本   | Conda not found                                                    |
| 物理磁盘空间 | Total: 112.64 GB, Used: 47.93 GB, Free: 64.69 GB                   |


## 📦 第二步：安装依赖包

现在我们需要安装运行模型所需的关键 Python 包：

### 🔧 核心依赖说明

#### 1. **FastAPI (0.116.0)**
- **作用**: 现代化的 Python Web 框架
- **用途**: 创建 RESTful API 接口服务
- **特点**: 自动生成 API 文档，支持异步处理

#### 2. **nest-asyncio (1.6.0)**
- **作用**: 允许在已有事件循环中运行异步代码
- **用途**: 解决 Jupyter 环境中的异步兼容问题
- **重要性**: 确保 FastAPI 在 Colab 中正常运行

#### 3. **pyngrok (7.2.12)**
- **作用**: Python 版本的 ngrok 客户端
- **用途**: 创建公网隧道，让外部访问本地服务
- **场景**: 将 Colab 中的 API 服务暴露给外部

#### 4. **uvicorn (0.35.0)**
- **作用**: 高性能的 ASGI 服务器
- **用途**: 运行 FastAPI 应用程序
- **特点**: 支持异步处理，性能优异

#### 5. **vllm (0.9.2)**
- **作用**: 高性能大语言模型推理引擎
- **用途**: 加载和运行 DeepSeek 模型
- **优势**: 内存高效，推理速度快

### ⚡ 安装过程
下面的命令会安装所有必需的依赖包，请耐心等待安装完成。

In [4]:
# 📦 批量安装依赖包
#
# 这里使用 pip install 命令一次性安装所有必需的包
# 使用 \ 符号可以将长命令分成多行，提高可读性
#
# 安装过程可能需要几分钟时间，请耐心等待
# 如果出现版本冲突，系统会自动处理依赖关系

!pip install \
    fastapi==0.116.0 \
    nest-asyncio==1.6.0 \
    pyngrok==7.2.12 \
    uvicorn==0.35.0 \
    vllm==0.9.2



## 🚀 第三步：启动 VLLM 模型服务

现在我们将使用 VLLM 在后台启动 DeepSeek R1 蒸馏版模型服务。

### 🎯 VLLM 服务启动说明

#### 🔍 模型选择
- **模型来源**: [Hugging Face DeepSeek AI](https://huggingface.co/deepseek-ai/DeepSeek-R1#3-model-downloads)
- **当前模型**: `deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B`
- **参数规模**: 15亿参数，适合 T4 GPU 运行
- **替换选项**: 可以替换为其他 DeepSeek R1 系列模型

#### ⚙️ VLLM 参数解释
- `serve`: VLLM 的服务模式命令
- `--trust-remote-code`: 允许执行远程代码（模型配置）
- `--dtype half`: 使用半精度浮点数，节省显存
- `--max-model-len 16384`: 最大序列长度为 16K tokens
- `--tensor-parallel-size 1`: 使用单卡推理

#### 🔄 后台运行
模型将在后台启动，不会阻塞当前进程，这样我们可以继续执行其他代码。


In [5]:
# 🚀 启动 VLLM 模型服务
#
# 这个单元格的作用：
# 1. 导入必要的 Python 模块
# 2. 配置模型参数
# 3. 使用 subprocess 在后台启动 VLLM 服务

import os
import subprocess

# 📝 可选：配置 Hugging Face 镜像源
# 如果在中国大陆访问 Hugging Face 较慢，可以启用下面这行
# os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

# 🎯 模型配置
# 指定要使用的模型名称
# 这里使用的是 DeepSeek R1 的蒸馏版本，参数量为 15亿
model = 'deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B'

# 🔧 启动 VLLM 服务器
# 使用 subprocess.Popen 在后台启动服务，这样不会阻塞当前进程
print(f"🚀 正在启动 VLLM 服务，模型: {model}")
print("⏳ 首次运行需要下载模型，请耐心等待...")

vllm_process = subprocess.Popen([
    'vllm',                      # VLLM 命令
    'serve',                     # 服务模式
    model,                       # 模型名称
    '--trust-remote-code',       # 信任远程代码
    '--dtype', 'half',           # 使用半精度浮点数
    '--max-model-len', '16384',  # 最大序列长度
    '--tensor-parallel-size', '1' # 单卡推理
], stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True)

print("✅ VLLM 服务启动命令已执行，正在后台加载模型...")
print("📡 服务将在 http://localhost:8000 上运行")

🚀 正在启动 VLLM 服务，模型: deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B
⏳ 首次运行需要下载模型，请耐心等待...
✅ VLLM 服务启动命令已执行，正在后台加载模型...
📡 服务将在 http://localhost:8000 上运行


## 🔍 第四步：监控 VLLM 服务状态

由于 VLLM 在后台运行，我们需要监控其启动状态。

### 🎯 监控的重要性

#### 🔄 为什么需要监控？
- **异步启动**: VLLM 在后台启动，需要时间加载模型
- **状态确认**: 确保服务正常运行后再进行后续操作
- **错误诊断**: 及时发现和处理启动过程中的问题
- **资源管理**: 监控进程状态，避免资源泄漏

#### ⏱️ 启动时间说明
- **首次运行**: 需要下载模型文件，可能需要 5-10 分钟
- **后续运行**: 模型已缓存，启动时间约 1-2 分钟
- **检查频率**: 每 5 秒检查一次服务状态

#### 🚦 状态检查机制
- **健康检查**: 通过 HTTP 请求检查服务是否可用
- **进程监控**: 监控 VLLM 进程的运行状态
- **日志输出**: 显示启动过程中的关键信息

In [6]:
# 🔍 VLLM 服务监控函数
#
# 这个单元格定义了两个重要的监控函数：
# 1. check_vllm_status: 检查 VLLM 服务是否可用
# 2. monitor_vllm_process: 持续监控 VLLM 进程状态

import requests
import time
from typing import Tuple
import sys

def check_vllm_status(url: str = "http://localhost:8000/health") -> bool:
    """
    🏥 检查 VLLM 服务器健康状态

    参数:
        url: 健康检查的 URL 地址

    返回:
        bool: True 表示服务正常，False 表示服务不可用

    工作原理:
        向 VLLM 的健康检查端点发送 GET 请求
        如果返回 200 状态码，说明服务正常运行
    """
    try:
        response = requests.get(url, timeout=5)
        return response.status_code == 200
    except requests.exceptions.ConnectionError:
        return False
    except requests.exceptions.Timeout:
        return False
    except Exception:
        return False

def monitor_vllm_process(vllm_process: subprocess.Popen, check_interval: int = 5) -> Tuple[bool, str, str]:
    """
    📊 监控 VLLM 进程的启动状态

    参数:
        vllm_process: VLLM 进程对象
        check_interval: 检查间隔时间（秒）

    返回:
        Tuple[bool, str, str]: (是否成功, 标准输出, 标准错误)

    工作流程:
        1. 循环检查进程是否还在运行
        2. 定期检查服务健康状态
        3. 输出进程的日志信息
        4. 返回最终状态
    """
    print("🔍 开始 VLLM 服务器监控...")
    print("⏳ 正在等待服务启动，请耐心等待...")

    while vllm_process.poll() is None:  # 当进程仍在运行时
        # 检查服务是否已经可用
        if check_vllm_status():
            print("✅ VLLM 服务器已启动并运行！")
            print("🎉 服务地址: http://localhost:8000")
            return True, "", ""

        print("⏳ 等待 VLLM 服务器启动...")
        time.sleep(check_interval)

        # 检查并输出进程日志
        if vllm_process.stdout and vllm_process.stdout.readable():
            try:
                stdout = vllm_process.stdout.read1(1024).decode('utf-8')
                if stdout.strip():
                    print("📝 标准输出:", stdout.strip())
            except Exception:
                pass

        if vllm_process.stderr and vllm_process.stderr.readable():
            try:
                stderr = vllm_process.stderr.read1(1024).decode('utf-8')
                if stderr.strip():
                    print("⚠️ 标准错误:", stderr.strip())
            except Exception:
                pass

    # 如果到达这里，进程已结束（可能是错误）
    print("❌ VLLM 进程已结束")
    try:
        stdout, stderr = vllm_process.communicate(timeout=5)
        return False, stdout.decode('utf-8'), stderr.decode('utf-8')
    except Exception:
        return False, "", "进程通信超时"

In [7]:
# 🚀 执行 VLLM 服务监控
#
# 这个单元格的作用：
# 1. 调用监控函数，等待 VLLM 服务启动
# 2. 处理启动成功和失败的情况
# 3. 支持用户中断操作

print("🎯 开始监控 VLLM 服务启动状态...")
print("💡 提示：首次运行可能需要 5-10 分钟下载模型")
print("⌨️  按 Ctrl+C 可以中断监控（但不会停止 VLLM 服务）")

try:
    # 调用监控函数，等待服务启动
    success, stdout, stderr = monitor_vllm_process(vllm_process)

    if not success:
        print("\n❌ VLLM 服务器启动失败！")
        print("\n📋 完整标准输出:")
        print(stdout)
        print("\n🚨 完整标准错误:")
        print(stderr)
        print("\n🔧 可能的解决方案:")
        print("1. 检查 GPU 内存是否足够")
        print("2. 确认模型名称是否正确")
        print("3. 重新运行安装依赖包的单元格")
        sys.exit(1)
    else:
        print("\n🎉 VLLM 服务启动成功！")
        print("📡 API 服务地址: http://localhost:8000")
        print("📚 API 文档地址: http://localhost:8000/docs")
        print("✅ 现在可以继续运行后续单元格")

except KeyboardInterrupt:
    print("\n⚠️ 用户中断监控")
    print("💡 注意：VLLM 服务仍在后台运行")
    print("🔄 如果需要停止 VLLM 服务，请重启 Colab 运行时")

    # 可选：强制停止 VLLM 进程
    # 取消下面的注释可以在中断时停止服务
    # print("🛑 正在停止 VLLM 服务...")
    # vllm_process.terminate()
    # try:
    #     vllm_process.wait(timeout=5)
    #     print("✅ VLLM 服务已停止")
    # except subprocess.TimeoutExpired:
    #     vllm_process.kill()
    #     print("⚡ 强制终止 VLLM 服务")

    # 输出最终日志信息
    try:
        stdout, stderr = vllm_process.communicate(timeout=2)
        if stdout:
            print("\n📝 最终标准输出:")
            print(stdout.decode('utf-8'))
        if stderr:
            print("\n⚠️ 最终标准错误:")
            print(stderr.decode('utf-8'))
    except:
        print("📝 无法获取最终日志")

    sys.exit(0)

🎯 开始监控 VLLM 服务启动状态...
💡 提示：首次运行可能需要 5-10 分钟下载模型
⌨️  按 Ctrl+C 可以中断监控（但不会停止 VLLM 服务）
🔍 开始 VLLM 服务器监控...
⏳ 正在等待服务启动，请耐心等待...
⏳ 等待 VLLM 服务器启动...
📝 标准输出: INFO 07-13 05:43:42 [__init__.py:244] Automatically detected platform cuda.
⚠️ 标准错误: 2025-07-13 05:43:32.340136: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1752385412.599609    2600 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1752385412.667315    2600 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-07-13 05:43:33.198960: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the fo

In [10]:
# 🧪 模型推理测试函数
#
# 这个单元格定义了两个核心函数：
# 1. ask_model: 发送问题并获取完整回答
# 2. stream_llm_response: 实现流式响应功能

import requests
import json
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from fastapi.responses import StreamingResponse

# 📝 定义请求数据模型
class QuestionRequest(BaseModel):
    """
    API 请求的数据模型

    属性:
        question (str): 用户提出的问题

    说明:
        使用 Pydantic 模型确保数据类型安全
        后续 FastAPI 会自动验证请求数据
    """
    question: str

def ask_model(question: str):
    """
    🤖 向 VLLM 模型发送问题并获取完整回答

    参数:
        question (str): 用户提出的问题

    返回:
        dict: 包含模型回答的 JSON 响应

    工作流程:
        1. 构造符合 OpenAI API 格式的请求
        2. 发送 POST 请求到 VLLM 服务
        3. 处理响应并返回结果
    """
    # VLLM 的 OpenAI 兼容 API 端点
    url = "http://localhost:8000/v1/chat/completions"

    # 设置请求头
    headers = {"Content-Type": "application/json"}

    # 构造请求数据（OpenAI 格式）
    data = {
        "model": model,  # 使用全局模型变量
        "messages": [
            {
                "role": "user",
                "content": question
            }
        ],
        "max_tokens": 2048,  # 最大生成长度
        "temperature": 0.7,  # 生成的随机性
        "top_p": 0.9         # 核采样参数
    }

    try:
        # 发送请求
        response = requests.post(url, headers=headers, json=data, timeout=60)
        response.raise_for_status()  # 检查 HTTP 错误
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"❌ 请求失败: {e}")
        return None

def stream_llm_response(question: str):
    """
    🌊 流式响应生成器 - 实时获取模型输出

    参数:
        question (str): 用户提出的问题

    生成:
        str: 逐行返回模型的生成内容

    特点:
        - 实时显示生成过程
        - 降低等待时间
        - 提供更好的用户体验
    """
    url = "http://localhost:8000/v1/chat/completions"
    headers = {"Content-Type": "application/json"}

    # 启用流式传输
    data = {
        "model": model,
        "messages": [{"role": "user", "content": question}],
        "stream": True,      # 🔥 关键：启用流式传输
        "max_tokens": 2048,
        "temperature": 0.7
    }

    try:
        with requests.post(url, headers=headers, json=data, stream=True, timeout=60) as response:
            response.raise_for_status()

            for line in response.iter_lines():
                if line:
                    # OpenAI 风格的流式响应以 "data: " 为前缀
                    decoded_line = line.decode("utf-8")
                    if decoded_line.startswith("data: "):
                        decoded_line = decoded_line[6:]  # 移除 "data: " 前缀
                    yield decoded_line + "\n"
    except requests.exceptions.RequestException as e:
        yield f"❌ 流式请求失败: {e}\n"

# 🧪 测试基础推理功能
print("🧪 测试模型推理功能...")
print("📝 发送测试问题: 法国的首都是什么？")

try:
    result = ask_model("法国的首都是什么？")
    if result:
        print("\n✅ 模型推理成功！")
        print("📋 完整响应:")
        print(json.dumps(result, indent=2, ensure_ascii=False))

        # 提取并显示模型回答
        if "choices" in result and len(result["choices"]) > 0:
            answer = result["choices"][0]["message"]["content"]
            print(f"\n🤖 模型回答: {answer}")
    else:
        print("❌ 模型推理失败")
except Exception as e:
    print(f"❌ 测试过程中出现错误: {e}")

print("\n" + "="*50)
print("✅ 推理函数定义完成，可以继续下一步！")


🧪 测试模型推理功能...
📝 发送测试问题: 法国的首都是什么？

✅ 模型推理成功！
📋 完整响应:
{
  "id": "chatcmpl-784243ed466f4e79b8ee2467f1e8823a",
  "object": "chat.completion",
  "created": 1752386516,
  "model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "reasoning_content": null,
        "content": "嗯，用户问的是法国的首都是什么。首先，我得确认一下，法国的首都确实是巴黎。对吧？我记得法国的首都叫做巴黎，是位于南洋岛上的大城市。那巴黎的地理位置呢，是不是东边靠近巴黎海，西边靠近巴黎市区？还有，巴黎是不是最大的城市，人口多少呢？我记得好像是超过三千万人，挺大的。\n\n然后，用户可能是在问历史上的首都，或者现代的首都。如果是历史上的，可能需要提到法国的辉煌历史，比如巴黎圣母院、巴黎圣母院等等。而现代的，可能更多是集中在中心地带，比如中心区、CBD，或者更远的地方，比如巴黎郊外的国家公园。\n\n我还得想想，用户为什么会问这个问题。可能是在学习历史，或者在旅行，或者只是好奇。不管怎样，回答的时候要简洁明了，涵盖基本信息和一些历史背景，这样用户就能全面了解了。\n\n另外，用户可能还想知道巴黎有什么特色，比如美食、文化活动，或者在不同场合下的首都位置。所以，在回答中，可以提到这些方面，让用户有更多的了解。\n\n总结一下，我需要先确认巴黎是法国的首都，然后提供一些基本信息，比如地理位置、人口，以及现代和历史上的首都情况。这样用户就能得到全面的解答了。\n</think>\n\n是的，法国的首都（capital）是巴黎。巴黎是法国的首都，也是世界上人口最多的城市之一，人口超过三千万人。它位于南洋岛的西岸，东临巴黎海，西靠巴黎市区，地理位置优越。巴黎以其深厚的历史和文化价值而闻名，是法国的首都，也是全球重要的商业和旅游中心之一。",
        "tool_

## 🌐 第六步：创建 FastAPI Web 服务

现在我们将创建一个 FastAPI Web 服务，将 VLLM 模型封装成易于使用的 REST API。

### 🎯 FastAPI 服务说明

#### 🔧 核心功能
1. **RESTful API**: 提供标准的 HTTP 接口
2. **自动文档**: 自动生成 Swagger UI 文档
3. **数据验证**: 使用 Pydantic 进行请求验证
4. **异步支持**: 支持高并发请求处理

#### 📡 API 端点设计
- **根路径** (`/`): 健康检查端点
- **生成回答** (`/api/v1/generate-response`): 获取完整回答
- **流式回答** (`/api/v1/generate-response-stream`): 实时流式输出

#### 🔒 CORS 配置
- 允许跨域访问，支持前端调用
- 支持所有 HTTP 方法和头部
- 便于与不同前端框架集成

### 🚀 服务特点
- **高性能**: 基于 ASGI 的异步框架
- **易用性**: 简洁的 API 设计
- **可扩展**: 支持添加更多功能
- **标准化**: 遵循 REST API 设计规范

In [11]:
# 🌐 创建 FastAPI Web 服务
#
# 这个单元格的作用：
# 1. 初始化 FastAPI 应用
# 2. 配置 CORS 跨域支持
# 3. 定义 API 端点和路由

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import nest_asyncio
from pyngrok import ngrok
import uvicorn

# 🚀 创建 FastAPI 应用实例
app = FastAPI(
    title="DeepSeek R1 API 服务",
    description="基于 VLLM 的 DeepSeek R1 蒸馏版模型 API 服务",
    version="1.0.0",
    docs_url="/docs",  # Swagger UI 文档地址
    redoc_url="/redoc"  # ReDoc 文档地址
)

# 🔒 配置 CORS 跨域中间件
# 允许前端应用从不同域名访问 API
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],        # 允许所有域名（生产环境应限制）
    allow_credentials=True,     # 允许携带凭据
    allow_methods=['*'],        # 允许所有 HTTP 方法
    allow_headers=['*'],        # 允许所有请求头
)

# 🏠 根路径 - 健康检查端点
@app.get('/')
async def root():
    """
    🏥 健康检查端点

    返回:
        dict: 包含服务状态信息
    """
    return {
        'status': 'healthy',
        'message': 'DeepSeek R1 API 服务正在运行',
        'model': model,
        'version': '1.0.0'
    }

# 🤖 生成完整回答的 API 端点
@app.post("/api/v1/generate-response")
def generate_response(request: QuestionRequest):
    """
    📝 生成完整回答的 API 端点

    参数:
        request (QuestionRequest): 包含用户问题的请求对象

    返回:
        dict: 包含模型回答的响应

    异常:
        HTTPException: 当模型推理失败时抛出 500 错误
    """
    try:
        print(f"📝 收到问题: {request.question}")

        # 调用模型推理函数
        response = ask_model(request.question)

        if response is None:
            raise HTTPException(status_code=500, detail="模型推理失败")

        print("✅ 推理完成")
        return {"response": response}

    except Exception as e:
        print(f"❌ 推理过程中出现错误: {str(e)}")
        raise HTTPException(status_code=500, detail=f"推理失败: {str(e)}")

# 🌊 流式响应的 API 端点
@app.post("/api/v1/generate-response-stream")
def stream_response(request: QuestionRequest):
    """
    🌊 流式响应 API 端点

    参数:
        request (QuestionRequest): 包含用户问题的请求对象

    返回:
        StreamingResponse: 实时流式响应

    特点:
        - 实时返回生成内容
        - 降低用户等待时间
        - 提供更好的交互体验
    """
    try:
        print(f"🌊 收到流式请求: {request.question}")

        # 调用流式响应生成器
        response_generator = stream_llm_response(request.question)

        return StreamingResponse(
            response_generator,
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Access-Control-Allow-Origin": "*"
            }
        )

    except Exception as e:
        print(f"❌ 流式响应过程中出现错误: {str(e)}")
        raise HTTPException(status_code=500, detail=f"流式响应失败: {str(e)}")

print("✅ FastAPI 应用创建完成！")
print("📚 API 文档将在启动后访问: http://localhost:8081/docs")
print("🔄 准备启动 Web 服务...")

✅ FastAPI 应用创建完成！
📚 API 文档将在启动后访问: http://localhost:8081/docs
🔄 准备启动 Web 服务...


## 🌐 第六步：Ngrok 注册与配置

### 🎯 什么是 Ngrok？

Ngrok 是一个强大的内网穿透工具，可以将本地运行的服务暴露到公网上，让外部用户可以访问。在我们的场景中，它可以让其他人通过公网 URL 访问您在 Colab 中部署的模型 API。

### 🔧 Ngrok 的作用

#### 📡 核心功能
1. **内网穿透**: 将本地服务映射到公网域名
2. **HTTPS 支持**: 自动提供 HTTPS 加密连接
3. **域名分配**: 分配一个临时的公网域名
4. **流量监控**: 提供请求日志和监控功能

#### 🎯 使用场景
- **API 分享**: 与团队成员分享 API 接口
- **远程测试**: 在不同设备上测试服务
- **演示展示**: 向客户展示项目效果
- **Webhook 接收**: 接收第三方服务的回调

### 📝 Ngrok 注册流程

#### 步骤 1：访问官网注册
1. 打开 Ngrok 官网：[https://ngrok.com/](https://ngrok.com/)
2. 点击右上角的 **"Sign up"** 按钮
3. 选择注册方式：
   - **GitHub 账号**: 推荐，一键登录
   - **Google 账号**: 方便快捷
   - **邮箱注册**: 传统方式

#### 步骤 2：验证邮箱（如果使用邮箱注册）
1. 填写邮箱地址和密码
2. 查收验证邮件
3. 点击邮件中的验证链接

#### 步骤 3：完成账号设置
1. 填写基本信息（可选）
2. 选择使用目的（个人/商业）
3. 完成注册流程

### 🔑 获取 Authtoken

#### 方法 1：Dashboard 获取
1. 登录后进入 [Ngrok Dashboard](https://dashboard.ngrok.com/)
2. 在左侧导航栏找到 **"Your Authtoken"** 或 **"Getting Started"**
3. 复制显示的 authtoken（格式类似：`1ABC2def3GHI4jkl5MNO6pqr7STU8vwx9YZ`）

#### 方法 2：直接访问链接
访问：[https://dashboard.ngrok.com/get-started/your-authtoken](https://dashboard.ngrok.com/get-started/your-authtoken)

### ⚠️ Authtoken 安全提示

#### 🔒 安全注意事项
1. **保密性**: Token 相当于您的账号密码，不要公开分享
2. **定期更换**: 建议定期重置 token 以确保安全
3. **权限控制**: 免费账号有使用限制，付费账号功能更多
4. **监控使用**: 定期检查 Dashboard 中的使用情况

#### 📊 免费账号限制
- **并发隧道**: 1个
- **连接数**: 40个/分钟
- **域名**: 随机分配
- **会话时长**: 8小时

### 🛠️ Token 配置方法

下面的单元格将演示如何配置您的 authtoken：

In [12]:
# 🔑 配置 Ngrok Authtoken
#
# ⚠️ 重要提示：请将下面的 YOUR_AUTHTOKEN_HERE 替换为您从 Ngrok Dashboard 获取的真实 token
#
# 🔗 获取 token 的步骤：
# 1. 访问：https://dashboard.ngrok.com/get-started/your-authtoken
# 2. 登录您的 Ngrok 账号
# 3. 复制显示的 authtoken
# 4. 替换下面代码中的 YOUR_AUTHTOKEN_HERE

# 📝 示例 token 格式（请替换为您的真实 token）：
# YOUR_AUTHTOKEN = "1ABC2def3GHI4jkl5MNO6pqr7STU8vwx9YZ"

# 🚨 请在下面填入您的真实 authtoken
YOUR_AUTHTOKEN = "2zo0ukGSh7rLCwIJLcyEEqQEcyC_5DPqPXokBtt3MbofiUTJ5"  # 👈 请替换这里

# 验证 token 是否已设置
if YOUR_AUTHTOKEN == "YOUR_AUTHTOKEN_HERE":
    print("❌ 请先设置您的 Ngrok authtoken！")
    print("📝 步骤：")
    print("1. 访问 https://dashboard.ngrok.com/get-started/your-authtoken")
    print("2. 登录并复制您的 authtoken")
    print("3. 将上面的 YOUR_AUTHTOKEN_HERE 替换为您的真实 token")
    print("4. 重新运行此单元格")
else:
    # 配置 authtoken
    import subprocess
    result = subprocess.run(['ngrok', 'config', 'add-authtoken', YOUR_AUTHTOKEN],
                          capture_output=True, text=True)
    if result.returncode == 0:
        print("✅ Ngrok authtoken 配置成功！")
        print("🎉 现在可以创建公网隧道了")
    else:
        print(f"❌ 配置失败: {result.stderr}")
        print("💡 请检查 token 是否正确")

✅ Ngrok authtoken 配置成功！
🎉 现在可以创建公网隧道了


### 🌐 创建 Ngrok 隧道和启动服务

现在我们将同时：
1. 创建 Ngrok 隧道，将本地服务暴露到公网
2. 启动 FastAPI 服务，提供 API 接口

#### 🔄 执行顺序
- 先创建 Ngrok 隧道（获取公网 URL）
- 然后启动 FastAPI 服务（在指定端口运行）
- 外部用户可以通过公网 URL 访问 API

#### 💡 使用提示
- 隧道创建成功后会显示公网 URL
- 请保存这个 URL，用于外部访问
- 服务启动后会阻塞当前单元格，这是正常现象


In [13]:
# 🌐 创建 Ngrok 隧道
#
# 这个单元格的作用：
# 1. 设置 FastAPI 服务的端口
# 2. 创建 Ngrok 隧道连接到该端口
# 3. 获取公网访问 URL

import time
from pyngrok import ngrok

# 📡 设置服务端口
port = 8081
print(f"🚀 准备在端口 {port} 上启动服务...")

try:
    # 🌐 创建 Ngrok 隧道
    print("🔗 正在创建 Ngrok 隧道...")

    # 创建 HTTP 隧道
    public_url = ngrok.connect(port).public_url

    print("✅ Ngrok 隧道创建成功！")
    print(f"🌍 公网访问地址: {public_url}")
    print(f"🔗 本地地址: http://127.0.0.1:{port}")
    print()
    print("📚 API 文档地址:")
    print(f"   • Swagger UI: {public_url}/docs")
    print(f"   • ReDoc: {public_url}/redoc")
    print()
    print("🧪 API 端点:")
    print(f"   • 健康检查: {public_url}/")
    print(f"   • 生成回答: {public_url}/api/v1/generate-response")
    print(f"   • 流式回答: {public_url}/api/v1/generate-response-stream")
    print()
    print("💡 提示：请保存上面的公网地址，用于外部访问")

    # 🔍 显示 Ngrok 监控信息
    tunnels = ngrok.get_tunnels()
    if tunnels:
        print("\n📊 当前活跃隧道:")
        for tunnel in tunnels:
            print(f"   • {tunnel.name}: {tunnel.public_url} -> {tunnel.config['addr']}")

except Exception as e:
    print(f"❌ 创建 Ngrok 隧道失败: {e}")
    print("🔧 可能的解决方案:")
    print("1. 检查 authtoken 是否正确配置")
    print("2. 确认网络连接正常")
    print("3. 检查是否超出免费版限制")
    print("4. 重新运行 authtoken 配置单元格")

    # 显示当前配置的 authtoken（部分遮蔽）
    try:
        import subprocess
        result = subprocess.run(['ngrok', 'config', 'check'], capture_output=True, text=True)
        if result.returncode == 0:
            print("\n📋 当前 Ngrok 配置状态: 正常")
        else:
            print(f"\n❌ Ngrok 配置检查失败: {result.stderr}")
    except:
        print("\n⚠️ 无法检查 Ngrok 配置状态")

🚀 准备在端口 8081 上启动服务...
🔗 正在创建 Ngrok 隧道...
✅ Ngrok 隧道创建成功！
🌍 公网访问地址: https://66bd8a6a43be.ngrok-free.app
🔗 本地地址: http://127.0.0.1:8081

📚 API 文档地址:
   • Swagger UI: https://66bd8a6a43be.ngrok-free.app/docs
   • ReDoc: https://66bd8a6a43be.ngrok-free.app/redoc

🧪 API 端点:
   • 健康检查: https://66bd8a6a43be.ngrok-free.app/
   • 生成回答: https://66bd8a6a43be.ngrok-free.app/api/v1/generate-response
   • 流式回答: https://66bd8a6a43be.ngrok-free.app/api/v1/generate-response-stream

💡 提示：请保存上面的公网地址，用于外部访问

📊 当前活跃隧道:
   • http-8081-b5d7580a-0cc9-4dec-bf0d-236665d58ac5: https://66bd8a6a43be.ngrok-free.app -> http://localhost:8081


In [None]:
# 🚀 启动 FastAPI Web 服务
#
# 这个单元格的作用：
# 1. 应用 nest_asyncio 解决 Jupyter 环境的异步问题
# 2. 使用 uvicorn 启动 FastAPI 应用
# 3. 在指定端口上运行 Web 服务，通过 Ngrok 隧道对外提供服务

print("🚀 启动 FastAPI Web 服务...")
print(f"📡 本地服务端口: {port}")
if 'public_url' in globals():
    print(f"🌍 公网访问地址: {public_url}")
    print(f"📚 API 文档: {public_url}/docs")
else:
    print("⚠️ 未检测到 Ngrok 隧道，请先运行上一个单元格")

print()
print("🎯 服务功能:")
print("   • 健康检查 API")
print("   • 模型问答 API（同步）")
print("   • 模型问答 API（流式）")
print("   • 自动生成的 API 文档")
print()
print("💡 提示：")
print("   • 服务启动后会阻塞当前单元格（这是正常现象）")
print("   • 可以在新标签页中访问 API 文档进行测试")
print("   • 按 Ctrl+C 或中断内核可以停止服务")
print("   • 停止服务后 Ngrok 隧道也会关闭")

# 应用 nest_asyncio 以在 Jupyter 环境中运行异步代码
nest_asyncio.apply()

try:
    # 启动 FastAPI 应用
    # host="0.0.0.0" 允许外部访问（通过 Ngrok 隧道）
    # port=port 使用之前定义的端口
    print(f"\n🔄 正在启动服务...")
    uvicorn.run(app, host="0.0.0.0", port=port)
except KeyboardInterrupt:
    print("\n🛑 服务已停止")
    print("💡 如需重新启动，请重新运行此单元格")
except Exception as e:
    print(f"\n❌ 服务启动失败: {e}")
    print("🔧 可能的解决方案:")
    print("1. 检查端口是否被占用")
    print("2. 确认 VLLM 服务正在运行")
    print("3. 重新运行依赖安装单元格")
finally:
    # 清理 Ngrok 隧道
    try:
        ngrok.kill()
        print("🧹 Ngrok 隧道已清理")
    except:
        pass

INFO:     Started server process [706]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)


🚀 启动 FastAPI Web 服务...
📡 本地服务端口: 8081
🌍 公网访问地址: https://66bd8a6a43be.ngrok-free.app
📚 API 文档: https://66bd8a6a43be.ngrok-free.app/docs

🎯 服务功能:
   • 健康检查 API
   • 模型问答 API（同步）
   • 模型问答 API（流式）
   • 自动生成的 API 文档

💡 提示：
   • 服务启动后会阻塞当前单元格（这是正常现象）
   • 可以在新标签页中访问 API 文档进行测试
   • 按 Ctrl+C 或中断内核可以停止服务
   • 停止服务后 Ngrok 隧道也会关闭

🔄 正在启动服务...
INFO:     23.142.200.79:0 - "GET /docs HTTP/1.1" 200 OK
INFO:     23.142.200.79:0 - "GET /openapi.json HTTP/1.1" 200 OK
📝 收到问题: 呦呦鹿鸣
✅ 推理完成
INFO:     23.142.200.79:0 - "POST /api/v1/generate-response HTTP/1.1" 200 OK
🌊 收到流式请求: 大模型学习，第一步做什么
INFO:     23.142.200.79:0 - "POST /api/v1/generate-response-stream HTTP/1.1" 200 OK


## 🧪 第八步：API 使用示例

服务启动成功后，您可以通过多种方式调用 API。

### 📡 API 调用方式

#### 1. 🌐 浏览器访问
- **本地访问**:
  - API 文档: `http://localhost:8081/docs` (Swagger UI)
  - 健康检查: `http://localhost:8081/`
- **公网访问**（通过 Ngrok）:
  - API 文档: `https://您的ngrok地址/docs`
  - 健康检查: `https://您的ngrok地址/`

#### 2. 📱 命令行调用 (cURL)
使用 cURL 命令行工具测试 API 接口

#### 3. 🐍 Python 调用
使用 requests 库或其他 HTTP 客户端

#### 4. 🌍 移动端/远程访问
通过 Ngrok 提供的公网 URL，可以在任何设备上访问

### 🔧 请求格式说明
- **Content-Type**: `application/json`
- **请求体**: JSON 格式，包含 `question` 字段
- **响应**: JSON 格式，包含模型回答

### 🌐 Ngrok 公网访问优势

#### ✨ 主要优势
1. **跨设备访问**: 手机、平板、其他电脑都可以访问
2. **团队协作**: 团队成员可以直接测试您的 API
3. **真实环境**: 模拟真实的网络环境和延迟
4. **HTTPS 支持**: 自动提供 SSL 加密，安全可靠

#### 📱 使用场景
- **移动端测试**: 在手机上直接测试 API
- **远程演示**: 向客户或同事展示项目
- **前端集成**: 前端开发者可以直接调用 API
- **第三方集成**: 支持 Webhook 等第三方服务

### 💡 使用提示
- 优先使用 Ngrok 提供的 HTTPS 地址
- 本地测试可以使用 localhost 地址
- 注意请求和响应的 JSON 格式
- 可以通过 Swagger UI 进行交互式测试
- Ngrok 地址每次重启都会变化，注意更新

### 📱 cURL 命令示例

#### 🌐 本地访问 - 生成完整回答
```bash
curl --location 'http://localhost:8081/api/v1/generate-response' \
--header 'Content-Type: application/json' \
--data '{
    "question": "巴黎在哪里？"
}'
```

#### 🌍 公网访问示例（需要替换为实际的 ngrok 地址）
```bash
curl --location 'https://你的ngrok地址/api/v1/generate-response' \
--header 'Content-Type: application/json' \
--data '{
    "question": "巴黎在哪里？"
}'
```

#### 🌊 流式响应示例
```bash
curl --location 'http://localhost:8081/api/v1/generate-response-stream' \
--header 'Content-Type: application/json' \
--data '{
    "question": "请详细介绍一下人工智能的发展历史"
}'
```

#### 🔧 参数说明
- `--location`: 跟随 HTTP 重定向
- `--header`: 设置请求头，指定内容类型
- `--data`: 发送 JSON 格式的请求体

## 📋 API 响应示例

### 🤖 完整响应格式

当您调用 `/api/v1/generate-response` 端点时，会收到如下格式的 JSON 响应：

#### 📊 响应结构说明
- **id**: 请求的唯一标识符
- **object**: 响应对象类型
- **created**: 响应创建时间戳
- **model**: 使用的模型名称
- **choices**: 模型生成的选择列表
  - **index**: 选择的索引
  - **message**: 消息内容
    - **role**: 角色（assistant）
    - **content**: 模型生成的回答
  - **finish_reason**: 完成原因（stop 表示正常结束）
- **usage**: 令牌使用统计
  - **prompt_tokens**: 输入令牌数
  - **completion_tokens**: 生成令牌数
  - **total_tokens**: 总令牌数

In [None]:
{
  "response": {
    "id": "chatcmpl-13e29c35212b486ead18d91aa0668886",
    "object": "chat.completion",
    "created": 1752386782,
    "model": "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    "choices": [
      {
        "index": 0,
        "message": {
          "role": "assistant",
          "reasoning_content": null,
          "content": "好，用户发来“呦呦鹿鸣”这个词，看起来像是在玩手机的语音合成功能。我应该回复用户一个友好又有趣的回应，比如“嗯，看到你这么说，我好像也听到了，鹿鸣的声音很温柔啊！”这样既回应了他们的提问，又让语气更生动。另外，可以用一些轻松的语气，让用户觉得有趣又不觉得压力。可能用户想了解这句话的含义，或者只是想在聊天。所以，我需要保持自然，不显得太生硬。另外，可能用户是想测试一下语音合成的功能，或者是想了解一些有趣的话题。不管怎样，回应要友好，同时带点趣味，让用户感到愉快。\n</think>\n\n嗯，看到你这么说，我好像也听到了，鹿鸣的声音很温柔啊！",
          "tool_calls": []
        },
        "logprobs": null,
        "finish_reason": "stop",
        "stop_reason": null
      }
    ],
    "usage": {
      "prompt_tokens": 9,
      "total_tokens": 176,
      "completion_tokens": 167,
      "prompt_tokens_details": null
    },
    "prompt_logprobs": null,
    "kv_transfer_params": null
  }
}

### 🌊 流式响应格式

当您调用 `/api/v1/generate-response-stream` 端点时，会收到一系列 JSON 对象，每个对象代表生成过程中的一个步骤：

#### 📡 流式响应特点
- **实时性**: 逐步返回生成内容，无需等待完整回答
- **低延迟**: 用户可以立即看到模型开始生成
- **更好体验**: 适合长文本生成和实时对话

#### 🔄 流式数据格式
每行数据都是一个独立的 JSON 对象，包含：
- **id**: 请求标识符（整个流中保持一致）
- **object**: "chat.completion.chunk"
- **created**: 时间戳
- **model**: 模型名称
- **choices**: 当前生成的内容块
  - **index**: 选择索引
  - **delta**: 增量内容
    - **content**: 新生成的文本片段
  - **finish_reason**: 结束原因（null 表示继续，"stop" 表示结束）

#### 💡 使用建议
- 适合需要实时反馈的应用场景
- 可以实现打字机效果的用户界面
- 对于长文本生成特别有用

In [None]:
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"嗯"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"，"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"我现在"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"在"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"学习"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"大"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"模型"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"，"},"logprobs":null,"finish_reason":null}]}
{"id":"chatcmpl-72594106be2541269cc68e8b37123051","object":"chat.completion.chunk","created":1752386825,"model":"deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B","choices":[{"index":0,"delta":{"content":"比如"},"logprobs":null,"finish_reason":null}]}
...
[DONE]


### Kill the VLLM

In [None]:
vllm_process.terminate()
vllm_process.wait()  # Wait for process to terminate