# 📘 Word → CSV → YAML → RST 全流程流水线 v6
**针对 `AT_Commands.docx` 的增强：**
- ✅ 纯 `xml.etree.ElementTree` 顺序解析 Word，规避第三方依赖限制。
- ✅ 自动抽取「命令格式 / 参数 / 示例 / 响应」并构建结构化 JSON（参数/示例/响应）。
- ✅ CSV ➜ YAML ➜ RST 全链路保留 `valmap`、响应列表、命令分组，适配复杂嵌套表格。


In [None]:
# Step 0 — 安装依赖（若环境尚未安装）
!pip install -q pandas pyyaml jinja2


## Step 1 — Word → CSV（支持嵌套表格 + 示例/响应抽取）


In [None]:
import json, re
from pathlib import Path
from zipfile import ZipFile
import xml.etree.ElementTree as ET

import pandas as pd

# Directory the notebook is executed from; every other path is derived from it.
PROJECT_ROOT = Path.cwd()
# Locations probed, in priority order, for the input Word document.
WORD_CANDIDATES = [
    folder / "AT_Commands.docx"
    for folder in (
        PROJECT_ROOT,
        PROJECT_ROOT / "data",
        PROJECT_ROOT.parent / "before" / "Convertion1",
        PROJECT_ROOT.parent / "before" / "Convertion1 copy",
    )
]


def locate_docx():
    """Return the first candidate path that points at an existing file.

    The entries of ``WORD_CANDIDATES`` are probed in list order. Only regular
    files qualify: a directory that happens to be named ``AT_Commands.docx``
    would pass a plain existence check and then fail later when it is opened
    as a zip archive.

    Raises:
        FileNotFoundError: if none of the candidates is a file.
    """
    for path in WORD_CANDIDATES:
        if path.is_file():
            return path
    raise FileNotFoundError("未找到 AT_Commands.docx，请确认路径。")


# Resolve the input document when the cell runs; locate_docx() raises
# FileNotFoundError if no candidate path matches.
IN_WORD = locate_docx()
# Output directory for the extracted CSV; created on demand.
CSV_DIR = PROJECT_ROOT / "data"
CSV_DIR.mkdir(exist_ok=True)
CSV_OUT = CSV_DIR / "extracted_commands_v6.csv"

# Echo the resolved input and output paths.
print(f"📄 输入 Word: {IN_WORD}")
print(f"💾 CSV 输出: {CSV_OUT}")

# Prefix map passed to ElementTree find/findall so "w:..." paths resolve to the
# WordprocessingML namespace.
NS = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}

# A command heading paragraph: "AT<name>" followed by an ASCII or full-width
# colon and a title. Group 1 is the command token, group 2 the title.
CMD_PATTERN = re.compile(r"^(AT[\w\+\-\*#]+)\s*[：:：]\s*(.+)$")
# Paragraph texts (exact match) announcing that the next table is a
# parameter table / command-format table / example table respectively.
PARAM_LABELS = {"参数", "参数说明", "参数表"}
FORMAT_LABELS = {"命令格式", "命令格式说明", "命令语法"}
EXAMPLE_LABELS = {"示例", "示例命令", "例子", "使用示例"}
# Substrings whose presence in the first cells of a row marks it as a header
# row. Matching in looks_like_header is a case-sensitive substring test.
HEADER_KEYWORDS = [
    "参数",
    "名称",
    "Name",
    "描述",
    "说明",
    "取值",
    "value",
    "key",
    "含义",
    "类型",
    "命令",
    "响应",
    "示例",
    "例",
]


def iter_blocks(docx_path: Path):
    """Yield the top-level paragraphs and tables of a .docx in document order.

    Reads ``word/document.xml`` from the archive and walks the direct children
    of ``w:body``. Each yielded item is a 3-tuple:

    * ``("p", text, element)`` for a paragraph, where *text* is the stripped
      concatenation of all ``w:t`` runs below it;
    * ``("tbl", "", element)`` for a table.

    Children with any other tag are skipped.
    """
    with ZipFile(docx_path) as archive:
        document_xml = archive.read("word/document.xml")
    body = ET.fromstring(document_xml).find("w:body", NS)
    for node in body:
        # Tags look like "{namespace}local"; keep only the local part.
        local_name = node.tag.split("}")[-1]
        if local_name == "p":
            pieces = [run.text for run in node.findall(".//w:t", NS) if run.text]
            yield ("p", "".join(pieces).strip(), node)
        elif local_name == "tbl":
            yield ("tbl", "", node)


def cell_lines(tc):
    """Return the non-empty paragraph texts of a table cell, one per paragraph.

    Only paragraphs that are direct children of the cell are visited, so text
    living inside a nested table is not included. Each paragraph's text is the
    stripped concatenation of its ``w:t`` runs.
    """
    paragraph_texts = (
        "".join(run.text for run in para.findall(".//w:t", NS) if run.text).strip()
        for para in tc.findall("w:p", NS)
    )
    return [line for line in paragraph_texts if line]


def cell_text(tc):
    """Return the cell's non-empty paragraph texts joined with newlines."""
    # The separator must be written as an escape: a raw line break inside a
    # single-quoted literal is a syntax error.
    return "\n".join(cell_lines(tc))


def table_rows(tbl):
    """Return one ``(cell_elements, cell_texts)`` pair per direct row of *tbl*.

    ``cell_elements`` are the row's direct ``w:tc`` children and ``cell_texts``
    the matching ``cell_text`` strings, so both lists have the same length.
    """
    cells_per_row = (tr.findall("w:tc", NS) for tr in tbl.findall("w:tr", NS))
    return [
        (cells, [cell_text(cell) for cell in cells])
        for cells in cells_per_row
    ]


def looks_like_header(row):
    """Heuristically decide whether *row* (a list of cell texts) is a header.

    Only the first three cells are inspected. A row containing both ``<`` and
    ``>`` is treated as data (placeholders such as ``<mode>``); otherwise the
    row is a header when any ``HEADER_KEYWORDS`` entry occurs as a substring.
    """
    head = "".join(row[:3])
    has_placeholder = "<" in head and ">" in head
    if has_placeholder:
        return False
    for keyword in HEADER_KEYWORDS:
        if keyword in head:
            return True
    return False


def parse_nested_valmap(tc):
    """Build a ``{key: value}`` map from the tables nested inside cell *tc*.

    Every descendant table is scanned. Rows without any text are dropped, a
    leading header row (per ``looks_like_header``) is skipped, the first cell
    becomes the key and the remaining non-empty cells are joined with
    ``" | "`` as the value. Rows with an empty key are ignored; a key seen
    again later overwrites the earlier value.
    """
    mapping = {}
    for nested in tc.findall(".//w:tbl", NS):
        text_rows = [
            [cell_text(cell) for cell in tr.findall("w:tc", NS)]
            for tr in nested.findall("w:tr", NS)
        ]
        text_rows = [row for row in text_rows if any(col.strip() for col in row)]
        if not text_rows:
            continue
        if looks_like_header(text_rows[0]):
            text_rows = text_rows[1:]
        for row in text_rows:
            key = row[0].strip() if row else ""
            if not key:
                continue
            mapping[key] = " | ".join(col.strip() for col in row[1:] if col.strip())
    return mapping


def parse_enum_map_fuzzy(text):
    """Best-effort parse of free text into a ``{key: meaning}`` mapping.

    The text is split into items on newlines, commas and semicolons (ASCII or
    full-width). Each item is then interpreted as:

    * ``key:value`` / ``key：value`` — split on the first colon only, so the
      value may itself contain colons;
    * otherwise ``key <sep> value`` where ``<sep>`` is one of
      ``->``, ``→``, ``=>``, ``=``, ``-``, ``—``, ``~``, ``到``, ``至``.

    Items matching neither form are ignored.

    Args:
        text: Raw cell text; ``None`` or an empty string yields ``{}``.

    Returns:
        dict mapping stripped keys to stripped values, in order of appearance.
    """
    if not text:
        return {}
    mapping = {}
    # "\n" must be spelled as an escape; a raw line break inside the pattern
    # literal does not compile.
    for item in re.split(r"[\n,，;；]+", text):
        item = item.strip()
        if not item:
            continue
        if "：" in item or ":" in item:
            # maxsplit is passed by keyword: the positional form is deprecated.
            k, v = re.split(r"[:：]", item, maxsplit=1)
            mapping[k.strip()] = v.strip()
        else:
            m = re.match(r"^(\S+)\s*(?:->|→|=>|=|-|—|~|到|至)\s*(.+)$", item)
            if m:
                mapping[m.group(1).strip()] = m.group(2).strip()
    return mapping


def parse_format_table(tbl):
    """Split a command-format table into ``(types, formats, responses)``.

    Blank rows and header rows are skipped. For each remaining row, column 0
    is the command type, column 1 the command syntax (with ``<CR>`` and the
    full-width period removed) and column 2 the response; empty cells are not
    collected, so the three lists may differ in length.
    """
    types, formats, responses = [], [], []
    for _cells, texts in table_rows(tbl):
        is_blank = not any(t.strip() for t in texts)
        if is_blank or looks_like_header(texts):
            continue
        # Pad so rows with fewer than three cells read as empty strings.
        typ, cmd, resp = ([t.strip() for t in texts[:3]] + ["", "", ""])[:3]
        if typ:
            types.append(typ)
        if cmd:
            formats.append(cmd.replace("<CR>", "").replace("。", "").strip())
        if resp:
            responses.append(resp)
    return types, formats, responses


def parse_param_table(tbl):
    """Parse a parameter table into ``[{"name", "desc", "valmap"}, ...]``.

    Blank rows and header rows are skipped. Column 0 is the parameter name and
    column 1 its description. The value map is looked up in this order, taking
    the first non-empty result:

    1. a table nested in column 2;
    2. fuzzy parsing of the column 2 text;
    3. a table nested in column 1;
    4. fuzzy parsing of the description text.

    Rows with fewer than two cells (e.g. a merged row) are handled without
    indexing past the end of the cell list.
    """
    params = []
    for cells, texts in table_rows(tbl):
        if not any(t.strip() for t in texts):
            continue
        if looks_like_header(texts):
            continue
        name = texts[0].strip() if texts else ""
        desc = texts[1].strip() if len(texts) > 1 else ""
        valmap = {}
        if len(cells) > 2:
            valmap = parse_nested_valmap(cells[2])
            if not valmap:
                valmap = parse_enum_map_fuzzy(texts[2] if len(texts) > 2 else "")
        if not valmap:
            # Guard the index: a single-cell row has no description cell.
            nested = parse_nested_valmap(cells[1]) if len(cells) > 1 else {}
            valmap = nested or parse_enum_map_fuzzy(desc)
        params.append({"name": name, "desc": desc, "valmap": valmap})
    return params


def parse_example_table(tbl):
    """Parse an example table into ``[{"cmd": ..., "resp": ...}, ...]``.

    Blank rows are dropped. The first remaining row is skipped when it looks
    like a header or mentions 命令 / 示例 / 响应. For each data row:

    * the command is the first line of column 0 (``<CR>`` removed);
    * the response is the lines of column 1 joined with ``" | "``; if column 1
      has no lines, the remaining lines of column 0 are used instead;
    * non-empty texts of any further columns are appended to the response.
    """
    populated = [pair for pair in table_rows(tbl) if any(t.strip() for t in pair[1])]
    if not populated:
        return []

    first_texts = populated[0][1]
    joined_first = "".join(first_texts)
    skip_first = looks_like_header(first_texts) or any(
        key in joined_first for key in ["命令", "示例", "响应"]
    )
    data_rows = populated[1:] if skip_first else populated

    examples = []
    for cells, texts in data_rows:
        first_cell = cell_lines(cells[0]) if len(cells) > 0 else []
        second_cell = cell_lines(cells[1]) if len(cells) > 1 else []

        if first_cell:
            cmd = first_cell[0]
        elif len(texts) > 0:
            cmd = texts[0].strip()
        else:
            cmd = ""

        # Prefer the response column; fall back to extra lines under the command.
        resp = " | ".join(second_cell or first_cell[1:])
        if not resp:
            resp = texts[1].strip() if len(texts) > 1 else ""

        trailing = [t.strip() for t in texts[2:] if t.strip()]
        if trailing:
            resp = " | ".join(([resp] if resp else []) + trailing)

        examples.append({"cmd": cmd.replace("<CR>", "").strip(), "resp": resp})
    return examples


def unique(seq):
    """Return the truthy items of *seq* without duplicates, first-seen order kept."""
    kept = []
    for candidate in seq:
        if not candidate or candidate in kept:
            continue
        kept.append(candidate)
    return kept


def _finish_command(cmd, commands):
    """Collapse ``description_lines`` into ``description`` and store *cmd*."""
    cmd["description"] = "\n".join(cmd.get("description_lines", [])).strip()
    commands.append(cmd)


def extract_word_to_csv(docx_path: Path, csv_out: Path):
    """Extract AT command records from a Word document and write them as CSV.

    The document is consumed block by block (see ``iter_blocks``):

    * a paragraph matching ``CMD_PATTERN`` starts a new command record;
    * a paragraph equal to a format / parameter / example label marks the
      kind of the *next* table;
    * any other paragraph is recorded as the group name while no command has
      been seen yet, and as a description line of the current command
      afterwards;
    * a table is parsed according to the pending label and attached to the
      current command; tables without a pending label, or appearing before
      the first command, are ignored.

    Args:
        docx_path: Path of the ``.docx`` file to read.
        csv_out: Destination CSV path (written as UTF-8 with BOM).

    Returns:
        The ``pandas.DataFrame`` that was written, one row per command. The
        column set is fixed, so a header row is written even when no command
        was found.
    """
    commands = []
    current = None
    current_group = ""
    expect = None

    for kind, text, elem in iter_blocks(docx_path):
        if kind == "p":
            txt = text.strip()
            if not txt:
                continue
            m = CMD_PATTERN.match(txt)
            if m:
                if current is not None:
                    _finish_command(current, commands)
                current = {
                    "name": m.group(1).strip(),
                    "title": m.group(2).strip(),
                    "description_lines": [],
                    "types": [],
                    "formats": [],
                    "responses": [],
                    "parameters": [],
                    "examples": [],
                    "group": current_group,
                }
                expect = None
                continue
            if txt in FORMAT_LABELS:
                expect = "format"
                continue
            if txt in PARAM_LABELS:
                expect = "param"
                continue
            if txt in EXAMPLE_LABELS:
                expect = "example"
                continue
            if current is None:
                # Free text before the first command names the group.
                current_group = txt
            else:
                current["description_lines"].append(txt)
        else:
            if current is not None:
                if expect == "format":
                    t, f, r = parse_format_table(elem)
                    current["types"].extend(t)
                    current["formats"].extend(f)
                    current["responses"].extend(r)
                elif expect == "param":
                    params = parse_param_table(elem)
                    if params:
                        current["parameters"] = params
                elif expect == "example":
                    current["examples"].extend(parse_example_table(elem))
            # A label applies to exactly one table, whether or not it was used.
            expect = None

    if current is not None:
        _finish_command(current, commands)

    columns = [
        "命令",
        "命令标题",
        "命令类型",
        "命令格式",
        "示例命令",
        "示例响应",
        "示例JSON",
        "功能描述",
        "命令分组",
        "响应JSON",
        "备注",
        "参数JSON",
    ]
    rows = []
    for cmd in commands:
        types = unique(cmd["types"])
        # Fall back to the bare command name when no format table was found.
        formats = unique(cmd["formats"]) or [cmd["name"]]
        examples = cmd["examples"]
        responses = unique(cmd["responses"])
        rows.append({
            "命令": cmd["name"],
            "命令标题": cmd["title"],
            "命令类型": ";".join(types),
            "命令格式": " | ".join(formats),
            "示例命令": " | ".join(e["cmd"] for e in examples if e["cmd"]),
            "示例响应": " | ".join(e["resp"] for e in examples if e["resp"]),
            "示例JSON": json.dumps(examples, ensure_ascii=False),
            "功能描述": cmd.get("description", ""),
            "命令分组": cmd.get("group", ""),
            "响应JSON": json.dumps(responses, ensure_ascii=False),
            "备注": "",
            "参数JSON": json.dumps(cmd.get("parameters", []), ensure_ascii=False),
        })

    # Explicit columns keep the header row present even for an empty result,
    # so the CSV stays readable by the next pipeline step.
    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(csv_out, index=False, encoding="utf-8-sig")
    print(f"✅ 已解析 {len(df)} 条命令 → {csv_out}")
    return df


# Run Step 1 and keep the resulting DataFrame for inspection.
df_csv = extract_word_to_csv(IN_WORD, CSV_OUT)
# Last expression of the cell: previews the first rows of the result.
df_csv.head()


## Step 2 — CSV → YAML（保持结构化参数/示例/响应）


In [None]:
import json
import pandas as pd
import yaml

# The YAML file is written into the same directory as the CSV.
YAML_OUT = CSV_DIR / "all_commands_v6.yaml"


def csv_to_yaml(csv_path: Path, yaml_path: Path):
    """Convert the command CSV into a structured YAML document.

    Each CSV row becomes one entry under the top-level ``commands`` list. The
    JSON columns (示例JSON / 参数JSON / 响应JSON) are decoded back into lists,
    命令类型 is split on ``;`` and 命令格式 on ``|``. Any parameter whose
    ``valmap`` is not a dict gets an empty dict instead.

    Args:
        csv_path: CSV file to read.
        yaml_path: Destination YAML file (UTF-8).

    Raises:
        json.JSONDecodeError: if one of the JSON columns holds invalid JSON.
    """
    # keep_default_na=False: otherwise pandas turns literal cell texts such as
    # "NA", "N/A" or "NULL" into NaN and fillna("") would silently erase them.
    # utf-8-sig matches the BOM-prefixed encoding the CSV is written with and
    # also reads plain UTF-8.
    df = pd.read_csv(
        csv_path,
        dtype=str,
        keep_default_na=False,
        encoding="utf-8-sig",
    ).fillna("")
    cmds = []
    for _, row in df.iterrows():
        examples = json.loads(row.get("示例JSON", "[]") or "[]")
        params = json.loads(row.get("参数JSON", "[]") or "[]")
        responses = json.loads(row.get("响应JSON", "[]") or "[]")
        for p in params:
            if not isinstance(p.get("valmap"), dict):
                p["valmap"] = {}
        entry = {
            "command": row.get("命令", ""),
            "title": row.get("命令标题", ""),
            "group": row.get("命令分组", ""),
            "type": [t.strip() for t in row.get("命令类型", "").split(";") if t.strip()],
            "formats": [f.strip() for f in row.get("命令格式", "").split("|") if f.strip()],
            "responses": responses,
            "parameters": params,
            "examples": [{"cmd": ex.get("cmd", ""), "resp": ex.get("resp", "")} for ex in examples],
            "description": row.get("功能描述", ""),
            "notes": row.get("备注", ""),
        }
        cmds.append(entry)
    with open(yaml_path, "w", encoding="utf-8") as f:
        # sort_keys=False keeps the key order of `entry` in the output.
        yaml.safe_dump({"commands": cmds}, f, allow_unicode=True, sort_keys=False)
    print(f"✅ 已生成 YAML → {yaml_path}")


# Run Step 2: convert the CSV at CSV_OUT into the YAML file at YAML_OUT.
csv_to_yaml(CSV_OUT, YAML_OUT)


## Step 3 — YAML → RST（命令详情文档渲染）


In [None]:
from types import SimpleNamespace

from jinja2 import Template
import yaml

# Output directory for the generated .rst files; created together with any
# missing parent directories.
RST_DIR = PROJECT_ROOT / "data" / "rst_output_v6"
RST_DIR.mkdir(parents=True, exist_ok=True)

# Jinja2 template that renders one command into one RST page. It expects a
# single variable `cmd` exposing: command, title, group, type, formats,
# responses, parameters (items with name / desc / valmap), examples (items
# with cmd / resp), description and notes.
# The literal is a regular (non-raw) string, so Python turns each \u2014 into
# an em dash before Jinja sees the text. .lstrip() removes the newline that
# follows the opening quotes.
TEMPLATE_STR = """
{{ cmd.command }}
{{ "=" * cmd.command|length }}

**Title**: {{ cmd.title }}
{% if cmd.group %}**Group**: {{ cmd.group }}{% endif %}
**Types**: {{ cmd.type|join(", ") if cmd.type else "\u2014" }}

Formats::
{% if cmd.formats %}
{% for fmt in cmd.formats %}
   {{ fmt }}
{% endfor %}
{% else %}
   \u2014
{% endif %}
{% if cmd.responses %}
Responses::
{% for resp in cmd.responses %}
   {{ resp }}
{% endfor %}

{% endif %}
Parameters
----------
.. list-table::
   :header-rows: 1
   :widths: 18 34 48

   * - Name
     - Description
     - Values
{% for p in cmd.parameters %}
   * - {{ p.name }}
     - {{ p.desc or "\u2014" }}
     - {% if p.valmap %}
       .. list-table::
          :header-rows: 1
          :widths: 20 40

          * - Key
            - Value
{% for k, v in p.valmap.items() %}
          * - {{ k }}
            - {{ v }}
{% endfor %}
       {% else %}N/A{% endif %}
{% endfor %}

Examples
--------
{% if cmd.examples %}
{% for ex in cmd.examples %}
* ``{{ ex.cmd }}`` → {{ ex.resp or "\u2014" }}
{% endfor %}
{% else %}
* 无示例
{% endif %}

**Description**: {{ cmd.description or "\u2014" }}
{% if cmd.notes %}**Notes**: {{ cmd.notes }}{% endif %}
""".lstrip()

# Compiled once at cell execution and reused for every command.
RST_TMPL = Template(TEMPLATE_STR)


def yaml_to_rst(yaml_path: Path, rst_dir: Path):
    """Render every command of the YAML file into its own RST page.

    For each entry under ``commands`` a ``<command>.rst`` file is written to
    *rst_dir* using ``RST_TMPL``; missing or null list fields are treated as
    empty and a non-dict ``valmap`` as ``{}``. Afterwards an ``index.rst``
    with a ``toctree`` listing all generated pages is written.

    Args:
        yaml_path: YAML file holding a top-level ``commands`` list.
        rst_dir: Existing directory that receives the ``.rst`` files.

    Returns:
        List of generated file names (without directory), in input order.
    """
    with open(yaml_path, "r", encoding="utf-8") as src:
        document = yaml.safe_load(src) or {}

    written = []
    for entry in document.get("commands", []):
        param_objs = []
        for raw in entry.get("parameters", []) or []:
            raw_valmap = raw.get("valmap", {})
            param_objs.append(
                SimpleNamespace(
                    name=raw.get("name", ""),
                    desc=raw.get("desc", ""),
                    valmap=raw_valmap if isinstance(raw_valmap, dict) else {},
                )
            )
        example_objs = [
            SimpleNamespace(cmd=item.get("cmd", ""), resp=item.get("resp", ""))
            for item in entry.get("examples", []) or []
        ]
        cmd_obj = SimpleNamespace(
            command=entry.get("command", ""),
            title=entry.get("title", ""),
            group=entry.get("group", ""),
            type=entry.get("type", []) or [],
            formats=entry.get("formats", []) or [],
            responses=entry.get("responses", []) or [],
            parameters=param_objs,
            examples=example_objs,
            description=entry.get("description", ""),
            notes=entry.get("notes", ""),
        )

        # Render before opening the target so a template error leaves no file.
        page = RST_TMPL.render(cmd=cmd_obj)
        target = rst_dir / f"{cmd_obj.command}.rst"
        with open(target, "w", encoding="utf-8") as dst:
            dst.write(page)
        written.append(target.name)

    index_lines = [
        "AT Commands Manual",
        "==================",
        "",
        ".. toctree::",
        "   :maxdepth: 1",
        "",
    ]
    index_lines.extend(f"   {Path(name).stem}" for name in written)
    with open(rst_dir / "index.rst", "w", encoding="utf-8") as dst:
        dst.write("\n".join(index_lines))

    print(f"✅ 共生成 {len(written)} 个 RST 文件 → {rst_dir}")
    return written


# Run Step 3 and print the names of the first five generated pages.
sample_files = yaml_to_rst(YAML_OUT, RST_DIR)
print("📄 示例 RST：", sample_files[:5])
