## 1. 获取各个图片块的 Base64

https://codebeautify.org/base64-to-image-converter

In [42]:
import os
import json
import base64
import requests
from typing import Dict, Any, List, Tuple

BASE_URL = "http://192.168.18.124:8080"  # backend address
# The key is taken from the RAGFLOW_API_KEY environment variable; the literal
# is the fallback used when that variable is unset.
# NOTE(review): a key literal is committed in source here — consider removing it.
API_KEY = os.getenv("RAGFLOW_API_KEY", "ragflow-ViYWE4MzM4OGEwNDExZjA5N2UyMjJiY2")
TARGET_DATASET_NAME = "test"

# URL template for fetching one image by id; any trailing "/" on BASE_URL is
# stripped so the joined path never contains "//".
IMAGE_URL_TEMPLATE = BASE_URL.rstrip("/") + "/v1/document/image/{image_id}"

# Headers for the JSON endpoints.
HEADERS_JSON = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}
# Headers for the binary image download (no Content-Type).
HEADERS_BIN = {
    "Authorization": f"Bearer {API_KEY}"
}

OUTPUT_DIR = "image_output"
MAX_BASE64_LEN = 800_000   # threshold: an image is saved only when its Base64 length is < 800000 characters

# Maps a response media type to the file extension used when saving.
EXT_MAP = {
    "image/jpeg": "jpg",
    "image/jpg": "jpg",
    "image/png": "png",
    "image/gif": "gif",
    "image/webp": "webp",
    "image/bmp": "bmp",
    "image/tiff": "tiff",
}

def get_dataset_id_by_name(name: str) -> str:
    """Return the id of the dataset whose name equals *name* exactly.

    Only the first page (up to 50 entries) of the name-filtered dataset
    listing is inspected.

    Raises:
        requests.HTTPError: the listing request returned an error status.
        RuntimeError: the response body's ``code`` field is not 0.
        ValueError: no returned dataset carries exactly this name.
    """
    query = {"name": name, "page": 1, "page_size": 50}
    response = requests.get(
        f"{BASE_URL}/api/v1/datasets",
        headers=HEADERS_JSON,
        params=query,
        timeout=15,
    )
    response.raise_for_status()

    payload = response.json()
    if payload.get("code") != 0:
        raise RuntimeError(f"List datasets error: {payload}")

    # First dict entry with an exact name match wins; non-dict entries are ignored.
    match = next(
        (
            entry
            for entry in payload.get("data", [])
            if isinstance(entry, dict) and entry.get("name") == name
        ),
        None,
    )
    if match is None:
        raise ValueError(f"Dataset '{name}' 不存在")
    return match["id"]

def list_documents(dataset_id: str) -> List[Dict[str, Any]]:
    """Fetch every document record of a dataset, following pagination.

    Pages of 50 are requested until a page comes back with fewer than 50
    dict entries (an empty page included). Each page's entries are read from
    the ``docs`` key, falling back to ``documents``; non-dict entries are
    dropped.

    Raises:
        requests.HTTPError: a page request returned an error status.
        RuntimeError: a response body's ``code`` field is not 0.
    """
    endpoint = f"{BASE_URL}/api/v1/datasets/{dataset_id}/documents"
    per_page = 50
    page_no = 1
    collected: List[Dict[str, Any]] = []

    while True:
        response = requests.get(
            endpoint,
            headers=HEADERS_JSON,
            params={"page": page_no, "page_size": per_page},
            timeout=30,
        )
        response.raise_for_status()

        payload = response.json()
        if payload.get("code") != 0:
            raise RuntimeError(f"List documents error: {payload}")

        data = payload.get("data", {}) or {}
        raw_entries = data.get("docs") or data.get("documents") or []
        page_docs = [entry for entry in raw_entries if isinstance(entry, dict)]

        collected.extend(page_docs)
        # A short (or empty) page means there is nothing further to fetch.
        if len(page_docs) < per_page:
            break
        page_no += 1

    return collected

def list_chunks(dataset_id: str, document_id: str, page_size: int = 500) -> List[Dict[str, Any]]:
    """Fetch every chunk record of one document, following pagination.

    Pages of *page_size* are requested until a page yields fewer than
    *page_size* dict entries. Non-dict entries in a page are dropped before
    that comparison is made.

    Raises:
        requests.HTTPError: a page request returned an error status.
        RuntimeError: a response body's ``code`` is not 0, or its ``chunks``
            value is not a list.
    """
    endpoint = f"{BASE_URL}/api/v1/datasets/{dataset_id}/documents/{document_id}/chunks"
    collected: List[Dict[str, Any]] = []
    page_no = 1

    while True:
        response = requests.get(
            endpoint,
            headers=HEADERS_JSON,
            params={"page": page_no, "page_size": page_size},
            timeout=60,
        )
        response.raise_for_status()

        payload = response.json()
        if payload.get("code") != 0:
            raise RuntimeError(f"List chunks error: {payload}")

        data = payload.get("data", {}) or {}
        raw_chunks = data.get("chunks") or []
        if not isinstance(raw_chunks, list):
            raise RuntimeError(f"Unexpected chunks type: {type(raw_chunks)}")

        page_chunks = [item for item in raw_chunks if isinstance(item, dict)]
        collected.extend(page_chunks)

        if len(page_chunks) < page_size:
            break
        page_no += 1

    return collected

def download_image(image_id: str, session: requests.Session, retry: int = 2) -> Tuple[bytes, str]:
    """Download one image and return ``(content_bytes, content_type)``.

    The returned content type is the lower-cased ``Content-Type`` header
    exactly as sent by the server (any ``; charset=...`` parameters are kept).

    Args:
        image_id: Id substituted into ``IMAGE_URL_TEMPLATE``.
        session: Session used for the request.
        retry: Number of additional attempts after the first one fails.

    Raises:
        RuntimeError: every attempt failed, or the server answered 404. The
            underlying error is attached as ``__cause__`` and embedded in the
            message.
    """
    url = IMAGE_URL_TEMPLATE.format(image_id=image_id)
    last_err = None
    for _ in range(retry + 1):
        try:
            resp = session.get(url, headers=HEADERS_BIN, timeout=30)
            if resp.status_code == 404:
                # A missing image will not appear on a retry; stop immediately
                # instead of burning the remaining attempts.
                last_err = FileNotFoundError(f"image 404 {image_id}")
                break
            resp.raise_for_status()
            ct = resp.headers.get("Content-Type", "").lower()
            if not ct.startswith("image/"):
                raise RuntimeError(f"非 image/ 响应 (Content-Type={ct})")
            return resp.content, ct
        except Exception as e:
            # Best-effort retry: remember the failure and try again.
            last_err = e
    raise RuntimeError(f"下载图片失败 image_id={image_id} err={last_err}") from last_err

def ensure_dir(path: str):
    """Create directory *path* (including missing parents) unless it already exists."""
    if os.path.isdir(path):
        return
    os.makedirs(path, exist_ok=True)

def save_image(chunk_id: str, content: bytes, content_type: str) -> str:
    """Write *content* to ``OUTPUT_DIR/<chunk_id>.<ext>`` and return that path.

    The extension is looked up in ``EXT_MAP`` by media type; unknown types get
    ``bin``. Header parameters and letter case are ignored for the lookup, so
    a value such as ``"image/png; charset=binary"`` still maps to ``png``.

    Args:
        chunk_id: Used as the file's base name.
        content: Raw image bytes.
        content_type: ``Content-Type`` value reported for the image.
    """
    # Keep only the "type/subtype" part: drop any ";param=value" suffix.
    media_type = content_type.split(";", 1)[0].strip().lower()
    ext = EXT_MAP.get(media_type, "bin")
    filepath = os.path.join(OUTPUT_DIR, f"{chunk_id}.{ext}")
    with open(filepath, "wb") as f:
        f.write(content)
    return filepath

def export_all_image_chunks(dataset_name: str):
    """Export every image chunk of a dataset to ``OUTPUT_DIR`` and write a report.

    For each document of the dataset, chunks whose ``doc_type_kwd`` is
    ``"image"`` are collected. Each chunk's image is downloaded (once per
    distinct ``image_id``) and saved under the chunk id, unless its Base64
    length would reach ``MAX_BASE64_LEN``. A ``summary.json`` with counters and
    per-chunk details is written to ``OUTPUT_DIR``, and the counters are
    printed.

    Args:
        dataset_name: Exact name of the dataset to export.

    Raises:
        Whatever ``get_dataset_id_by_name``, ``list_documents`` or
        ``list_chunks`` raise; individual image download failures are counted
        and printed instead of aborting the run.
    """
    ensure_dir(OUTPUT_DIR)

    dataset_id = get_dataset_id_by_name(dataset_name)
    docs = list_documents(dataset_id)
    if not docs:
        print("无文档，结束。")
        return

    # image_id -> (bytes, content type); several chunks may share one image.
    # Note that downloaded bytes stay in memory for the whole run.
    image_cache: Dict[str, Tuple[bytes, str]] = {}

    total_img_chunk = 0
    downloaded_image_ids = 0
    saved_count = 0
    skipped_big = 0
    failed = 0
    summary = []

    # Largest raw byte count whose Base64 form stays within the threshold
    # (rounded down); reported for information only.
    max_bytes_allowed = (MAX_BASE64_LEN // 4) * 3

    # The context manager closes the session's pooled connections when done,
    # including when an exception aborts the export.
    with requests.Session() as session:
        for doc in docs:
            doc_id = doc.get("id")
            if not doc_id:
                # Without an id the chunks URL cannot be built.
                continue

            chunks = list_chunks(dataset_id, doc_id)
            img_chunks = [c for c in chunks if c.get("doc_type_kwd") == "image"]
            if not img_chunks:
                continue
            print(f"文档 {doc_id} 图片块数: {len(img_chunks)}")

            for c in img_chunks:
                chunk_id = c.get("id")
                image_id = c.get("image_id")
                if not chunk_id or not image_id:
                    continue

                total_img_chunk += 1

                if image_id not in image_cache:
                    try:
                        img_bytes, ct = download_image(image_id, session)
                    except Exception as e:
                        failed += 1
                        print(f"[失败] chunk_id={chunk_id} image_id={image_id} err={e}")
                        continue
                    image_cache[image_id] = (img_bytes, ct)
                    downloaded_image_ids += 1

                img_bytes, ct = image_cache[image_id]
                # Length of the padded Base64 encoding, computed without encoding.
                b64_len = ((len(img_bytes) + 2) // 3) * 4

                if b64_len >= MAX_BASE64_LEN:
                    skipped_big += 1
                    summary.append({
                        "chunk_id": chunk_id,
                        "image_id": image_id,
                        "status": "skipped_big",
                        "base64_len": b64_len,
                        "bytes_len": len(img_bytes)
                    })
                    continue

                path = save_image(chunk_id, img_bytes, ct)
                saved_count += 1
                summary.append({
                    "chunk_id": chunk_id,
                    "image_id": image_id,
                    "status": "saved",
                    "file": path,
                    "base64_len": b64_len,
                    "bytes_len": len(img_bytes)
                })

    report_path = os.path.join(OUTPUT_DIR, "summary.json")
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump({
            "dataset": dataset_name,
            "total_image_chunks": total_img_chunk,
            "distinct_image_ids_downloaded": downloaded_image_ids,
            "saved": saved_count,
            "skipped_big": skipped_big,
            "failed": failed,
            "max_base64_len_threshold": MAX_BASE64_LEN,
            "approx_max_bytes_allowed": max_bytes_allowed,
            "details": summary
        }, f, ensure_ascii=False, indent=2)

    print("\n==== 完成 ====")
    print(f"图片类型 chunk 总数: {total_img_chunk}")
    print(f"下载唯一 image_id 数: {downloaded_image_ids}")
    print(f"成功保存: {saved_count}")
    print(f"因 Base64 长度 >= {MAX_BASE64_LEN} 跳过: {skipped_big}")
    print(f"失败: {failed}")
    print(f"详情见: {report_path}")

# Script entry point: export the image chunks of the configured dataset.
if __name__ == "__main__":
    export_all_image_chunks(TARGET_DATASET_NAME)


文档 84777bde8a0a11f085c722bcecfb00ba 图片块数: 32
文档 846b70968a0a11f085c722bcecfb00ba 图片块数: 48

==== 完成 ====
图片类型 chunk 总数: 80
下载唯一 image_id 数: 80
成功保存: 80
因 Base64 长度 >= 800000 跳过: 0
失败: 0
详情见: image_output/summary.json


In [40]:
import os
import json
import requests
from typing import Dict, Optional, Generator

BASE_URL = "http://192.168.18.124:8080"  # backend address
AGENT_ID = "12a59aca8a0c11f0b18822bcecfb00ba"  # agent used by simple_test()
# The key is taken from the RAGFLOW_API_KEY environment variable; the literal
# is the fallback used when that variable is unset.
# NOTE(review): a key literal is committed in source here — consider removing it.
API_KEY = os.getenv("RAGFLOW_API_KEY", "ragflow-ViYWE4MzM4OGEwNDExZjA5N2UyMjJiY2")

# Headers sent with every request in this cell.
HEADERS_JSON = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

def create_session(agent_id: str, begin_vars: Optional[Dict[str, str]] = None, user_id: Optional[str] = None) -> str:
    """
    Explicitly create an agent session and return its id.

    The original author marks this endpoint as deprecated; it remains useful
    when the agent's Begin component has required variables to fill in.

    begin_vars: sent as the JSON body, e.g. {"var1":"你好", "var2":"世界"};
                an empty object is sent when omitted.
    user_id:    added as a query parameter only when truthy.

    Raises RuntimeError when the response's "code" field is not 0.
    """
    query = {"user_id": user_id} if user_id else {}
    endpoint = f"{BASE_URL}/api/v1/agents/{agent_id}/sessions"
    response = requests.post(
        endpoint,
        headers=HEADERS_JSON,
        params=query,
        json=begin_vars or {},
        timeout=30,
    )
    result = response.json()
    if result.get("code") != 0:
        raise RuntimeError(f"Create session failed: {result}")
    return result["data"]["id"]

def converse(
    agent_id: str,
    question: str,
    session_id: Optional[str] = None,
    inputs: Optional[Dict[str, str]] = None,
    debug: bool = False
) -> Dict:
    """
    Non-streaming conversation: send one question, return the reply's "data".

    inputs: extra top-level fields merged into the request body, e.g. Begin
            variables for the first turn: {"var1":"你好","var2":"世界"}.
            Keys in inputs override same-named fields already in the body.
    debug:  print the outgoing JSON body before sending.

    Raises ValueError for an empty question and RuntimeError when the
    response is not JSON or its "code" field is not 0.
    """
    if not question:
        raise ValueError("question 不能为空")

    endpoint = f"{BASE_URL}/api/v1/agents/{agent_id}/converse"

    # Insertion order is question, session_id, then inputs.
    body: Dict = {"question": question}
    body.update({"session_id": session_id} if session_id else {})
    body.update(inputs or {})  # merged directly, as the Begin component expects (if the API needs it)

    if debug:
        print("REQUEST:", json.dumps(body, ensure_ascii=False))

    response = requests.post(endpoint, headers=HEADERS_JSON, json=body, timeout=120)
    try:
        result = response.json()
    except Exception:
        raise RuntimeError(f"Non-JSON response: {response.text}")

    if result.get("code") != 0:
        raise RuntimeError(f"Converse failed: {result}")
    return result["data"]

def converse_stream(
    agent_id: str,
    question: str,
    session_id: Optional[str] = None,
    inputs: Optional[Dict[str, str]] = None
) -> Generator[Dict, None, None]:
    """
    Streaming conversation (Server-Sent-Events style).

    Yields the "data" dict of each successfully parsed ``data:`` line, in
    arrival order. This is a generator, so nothing — including the
    empty-question check — runs until iteration starts.

    inputs: extra top-level fields merged into the request body; keys in
            inputs override same-named fields already in the body.

    Lines that are empty, lack the ``data:`` prefix, or do not parse as JSON
    are skipped. Events that are not JSON objects, or whose "data" is not a
    dict, are skipped too, so consumers can rely on the declared Dict type.

    Raises ValueError for an empty question and RuntimeError for a non-200
    HTTP status or an event whose "code" field is not 0.
    """
    if not question:
        raise ValueError("question 不能为空")
    url = f"{BASE_URL}/api/v1/agents/{agent_id}/converse"
    payload = {
        "question": question,
        "stream": True  # assumed to be required to request streaming; drop if the backend detects it itself
    }
    if session_id:
        payload["session_id"] = session_id
    if inputs:
        payload.update(inputs)

    with requests.post(url, headers=HEADERS_JSON, json=payload, stream=True, timeout=300) as r:
        if r.status_code != 200:
            raise RuntimeError(f"HTTP {r.status_code}: {r.text}")
        # Decode the stream as UTF-8 explicitly. Left alone, requests falls
        # back to ISO-8859-1 for text/* responses without a charset (garbling
        # non-ASCII text) or yields undecoded bytes when it has no encoding.
        r.encoding = "utf-8"
        for line in r.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data:"):
                continue
            raw = line[5:].strip()
            if raw in ("[DONE]", "DONE"):
                break
            try:
                obj = json.loads(raw)
            except json.JSONDecodeError:
                # Unparseable lines are ignored rather than aborting the stream.
                continue
            if not isinstance(obj, dict):
                continue
            # Expected shape: {"code":0,"data":{...}}
            if obj.get("code") != 0:
                raise RuntimeError(f"stream error: {obj}")
            data = obj.get("data")
            if isinstance(data, dict):
                yield data

def simple_test():
    """
    Demo run against AGENT_ID:
    1. (optional) create a session first, when the Begin component has
       required variables — disabled unless need_begin_vars is set to True
    2. ask one question in streaming mode and print each "answer" piece as
       it arrives
    """
    # Flip to True and adjust the variables below (var1/var2 are placeholders
    # to be replaced per the actual DSL) if Begin has required fields.
    need_begin_vars = False

    session_id = None
    if need_begin_vars:
        begin_vars = {"var1": "你好", "var2": "世界"}
        session_id = create_session(AGENT_ID, begin_vars=begin_vars)
        print("创建会话 session_id =", session_id)

    print("\n===== 流式 =====")
    stream = converse_stream(
        AGENT_ID,
        question="回答：你能做什么？",
        session_id=session_id
    )
    for chunk in stream:
        # Emit each non-empty piece immediately, without a newline.
        piece = chunk.get("answer")
        if not piece:
            continue
        print(piece, end="", flush=True)
    print("\n(流式结束)")

if __name__ == "__main__":
    # Quick run of the demo.
    simple_test()



===== 流式 =====

(流式结束)
