"""Word → CSV → YAML → RST pipeline v6 for ``AT_Commands.docx``.

Converted from the original Jupyter notebook into an importable script
(the ``!pip install -q pandas pyyaml jinja2`` shell cell was dropped —
install those packages separately if missing).

Stages:
  1. Word → CSV: parse ``word/document.xml`` in order with only
     ``xml.etree.ElementTree`` (no python-docx), extracting command
     format / parameter / example / response tables — including nested
     value-map tables — into one CSV row per AT command.
  2. CSV → YAML: re-expand the structured JSON columns into a YAML list.
  3. YAML → RST: render one reStructuredText page per command plus a
     toctree ``index.rst``.

Fixes vs. the notebook source:
  - Several ``"\\n"`` string literals had been mangled into literals
    containing a raw newline (a ``SyntaxError``); restored in
    ``cell_text``, ``parse_enum_map_fuzzy`` and ``extract_word_to_csv``.
  - Removed no-op ``.replace("", "")`` calls (the character to strip was
    lost in the source; the call as written did nothing).
  - ``yaml``/``jinja2`` are imported lazily so the module can be imported
    without them installed.
  - Pipeline execution moved under ``if __name__ == "__main__":`` so
    importing the module has no filesystem side effects.
"""

import json
import re
import xml.etree.ElementTree as ET
from pathlib import Path
from types import SimpleNamespace
from zipfile import ZipFile

import pandas as pd

PROJECT_ROOT = Path.cwd()

# Candidate locations for the input document, tried in order.
WORD_CANDIDATES = [
    PROJECT_ROOT / "AT_Commands.docx",
    PROJECT_ROOT / "data" / "AT_Commands.docx",
    PROJECT_ROOT.parent / "before" / "Convertion1" / "AT_Commands.docx",
    PROJECT_ROOT.parent / "before" / "Convertion1 copy" / "AT_Commands.docx",
]

# WordprocessingML namespace used by every element lookup below.
NS = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}

# "AT+FOO: title" heading; accepts an ASCII (:) or fullwidth (U+FF1A) colon.
CMD_PATTERN = re.compile(r"^(AT[\w\+\-\*#]+)\s*[:\uff1a]\s*(.+)$")

# Paragraph labels announcing which kind of table follows.
PARAM_LABELS = {"参数", "参数说明", "参数表"}
FORMAT_LABELS = {"命令格式", "命令格式说明", "命令语法"}
EXAMPLE_LABELS = {"示例", "示例命令", "例子", "使用示例"}

# Substrings that mark a table row as a header row.
HEADER_KEYWORDS = [
    "参数",
    "名称",
    "Name",
    "描述",
    "说明",
    "取值",
    "value",
    "key",
    "含义",
    "类型",
    "命令",
    "响应",
    "示例",
    "例",
]


def locate_docx() -> Path:
    """Return the first existing candidate path for ``AT_Commands.docx``.

    Raises:
        FileNotFoundError: if none of ``WORD_CANDIDATES`` exists.
    """
    for path in WORD_CANDIDATES:
        if path.exists():
            return path
    raise FileNotFoundError("未找到 AT_Commands.docx,请确认路径。")


def iter_blocks(docx_path: Path):
    """Yield ``(kind, text, element)`` for each top-level body child.

    ``kind`` is ``"p"`` (paragraph, with its concatenated stripped text)
    or ``"tbl"`` (table, with empty text). Order is document order, which
    the caller's state machine relies on.
    """
    with ZipFile(docx_path) as zf:
        xml_bytes = zf.read("word/document.xml")
    root = ET.fromstring(xml_bytes)
    body = root.find("w:body", NS)
    for child in body:
        tag = child.tag.split("}")[-1]
        if tag == "p":
            text = "".join(t.text for t in child.findall(".//w:t", NS) if t.text)
            yield ("p", text.strip(), child)
        elif tag == "tbl":
            yield ("tbl", "", child)


def cell_lines(tc):
    """Return the non-empty, stripped paragraph texts of a table cell."""
    lines = []
    for p in tc.findall("w:p", NS):
        text = "".join(t.text for t in p.findall(".//w:t", NS) if t.text)
        text = text.strip()
        if text:
            lines.append(text)
    return lines


def cell_text(tc):
    """Return a cell's paragraphs joined with newlines.

    Fixed: the original contained a string literal with a raw newline in
    place of ``"\\n"`` (a SyntaxError).
    """
    return "\n".join(cell_lines(tc))


def table_rows(tbl):
    """Return ``[(cells, texts)]`` for each ``w:tr`` of a table."""
    rows = []
    for tr in tbl.findall("w:tr", NS):
        cells = tr.findall("w:tc", NS)
        rows.append((cells, [cell_text(tc) for tc in cells]))
    return rows


def looks_like_header(row):
    """Heuristic: does this row of texts look like a table header?

    Rows whose first three cells contain ``<...>`` placeholders are data
    rows by definition, even if they also contain a header keyword.
    """
    joined = "".join(row[:3])
    if "<" in joined and ">" in joined:
        return False
    return any(key in joined for key in HEADER_KEYWORDS)


def parse_nested_valmap(tc):
    """Extract ``{key: value}`` pairs from tables nested inside a cell.

    First column of each nested row is the key; remaining non-empty
    columns are joined with `` | `` as the value. A leading header row
    (per ``looks_like_header``) is skipped.
    """
    valmap = {}
    for tbl in tc.findall(".//w:tbl", NS):
        nested_rows = []
        for tr in tbl.findall("w:tr", NS):
            cells = tr.findall("w:tc", NS)
            row_text = [cell_text(c) for c in cells]
            if not any(x.strip() for x in row_text):
                continue
            nested_rows.append(row_text)
        if not nested_rows:
            continue
        start = 1 if looks_like_header(nested_rows[0]) else 0
        for r in nested_rows[start:]:
            key = (r[0] if len(r) > 0 else "").strip()
            val = " | ".join(c.strip() for c in r[1:] if c.strip())
            if key:
                valmap[key] = val
    return valmap


def parse_enum_map_fuzzy(text):
    """Best-effort parse of free-form ``key: value`` enumerations.

    Items are split on newlines and ASCII/CJK commas and semicolons,
    then parsed either as ``key:value`` (ASCII or fullwidth colon) or as
    ``key <arrow/dash/range-word> value``. Returns ``{}`` for empty input.

    Fixed: the original split pattern contained a raw newline inside the
    regex literal (a SyntaxError); the separator class is reconstructed
    with explicit escapes. NOTE(review): the exact CJK separators in the
    original were garbled — confirm against the source document.
    """
    if not text:
        return {}
    items = re.split(r"[\n,\uff0c\u3001;\uff1b]+", text)
    mapping = {}
    for item in items:
        item = item.strip()
        if not item:
            continue
        if ":" in item or "\uff1a" in item:
            k, v = re.split(r"[:\uff1a]", item, maxsplit=1)
            mapping[k.strip()] = v.strip()
        else:
            m = re.match(r"^(\S+)\s*(?:->|→|=>|=|-|—|~|到|至)\s*(.+)$", item)
            if m:
                mapping[m.group(1).strip()] = m.group(2).strip()
    return mapping


def parse_format_table(tbl):
    """Parse a command-format table into ``(types, formats, responses)``.

    Column layout assumed: type | command format | response. Empty and
    header rows are skipped.
    """
    types, formats, responses = [], [], []
    for cells, texts in table_rows(tbl):
        if not any(t.strip() for t in texts):
            continue
        if looks_like_header(texts):
            continue
        typ = texts[0].strip() if len(texts) > 0 else ""
        cmd = texts[1].strip() if len(texts) > 1 else ""
        resp = texts[2].strip() if len(texts) > 2 else ""
        if typ:
            types.append(typ)
        if cmd:
            # Strip ideographic full stops from command strings.
            # (The original also had a no-op .replace("", "") here.)
            formats.append(cmd.replace("。", "").strip())
        if resp:
            responses.append(resp)
    return types, formats, responses


def parse_param_table(tbl):
    """Parse a parameter table into ``[{name, desc, valmap}]``.

    The value map is tried in order: nested table in column 3 → fuzzy
    enum parse of column 3 text → nested table in column 2 → fuzzy enum
    parse of the description itself.
    """
    params = []
    for cells, texts in table_rows(tbl):
        if not any(t.strip() for t in texts):
            continue
        if looks_like_header(texts):
            continue
        name = texts[0].strip() if len(texts) > 0 else ""
        desc = texts[1].strip() if len(texts) > 1 else ""
        valmap = {}
        if len(cells) > 2:
            valmap = parse_nested_valmap(cells[2])
            if not valmap:
                valmap = parse_enum_map_fuzzy(texts[2] if len(texts) > 2 else "")
        if not valmap:
            valmap = parse_nested_valmap(cells[1]) or parse_enum_map_fuzzy(desc)
        params.append({"name": name, "desc": desc, "valmap": valmap})
    return params


def parse_example_table(tbl):
    """Parse an example table into ``[{cmd, resp}]``.

    Column 1 holds the command (extra lines in that cell are treated as
    fallback response text); column 2 and any further columns are joined
    with `` | `` into the response.
    """
    rows = [(cells, texts) for cells, texts in table_rows(tbl) if any(t.strip() for t in texts)]
    if not rows:
        return []
    header_text = "".join(rows[0][1])
    start = 1 if looks_like_header(rows[0][1]) or any(key in header_text for key in ["命令", "示例", "响应"]) else 0
    examples = []
    for cells, texts in rows[start:]:
        cmd_lines = cell_lines(cells[0]) if len(cells) > 0 else []
        resp_lines = cell_lines(cells[1]) if len(cells) > 1 else []
        cmd = cmd_lines[0] if cmd_lines else (texts[0].strip() if len(texts) > 0 else "")
        extra_cmd_resp = cmd_lines[1:]
        resp_parts = resp_lines or extra_cmd_resp
        resp = " | ".join(resp_parts)
        if not resp:
            resp = texts[1].strip() if len(texts) > 1 else ""
        more = [part.strip() for part in texts[2:] if part.strip()]
        if more:
            resp = " | ".join([p for p in ([resp] if resp else []) + more])
        # (The original also had a no-op .replace("", "") on cmd here.)
        examples.append({"cmd": cmd.strip(), "resp": resp})
    return examples


def unique(seq):
    """Order-preserving dedupe that also drops falsy items."""
    seen = []
    for item in seq:
        if item and item not in seen:
            seen.append(item)
    return seen


def extract_word_to_csv(docx_path: Path, csv_out: Path) -> pd.DataFrame:
    """Walk the document, build one row per AT command, write a CSV.

    State machine: a paragraph matching ``CMD_PATTERN`` starts a new
    command; label paragraphs (``FORMAT/PARAM/EXAMPLE_LABELS``) announce
    how the *next* table should be parsed; any other paragraph is a
    description line (or, before the first command, the current group
    heading). Returns the resulting DataFrame.
    """
    commands = []
    current = None
    current_group = ""
    expect = None  # which parser the next table goes through

    for kind, text, elem in iter_blocks(docx_path):
        if kind == "p":
            txt = text.strip()
            if not txt:
                continue
            m = CMD_PATTERN.match(txt)
            if m:
                if current:
                    # Fixed: join was a mangled raw-newline literal.
                    current["description"] = "\n".join(current.get("description_lines", [])).strip()
                    commands.append(current)
                current = {
                    "name": m.group(1).strip(),
                    "title": m.group(2).strip(),
                    "description_lines": [],
                    "types": [],
                    "formats": [],
                    "responses": [],
                    "parameters": [],
                    "examples": [],
                    "group": current_group,
                }
                expect = None
                continue
            if txt in FORMAT_LABELS:
                expect = "format"
                continue
            if txt in PARAM_LABELS:
                expect = "param"
                continue
            if txt in EXAMPLE_LABELS:
                expect = "example"
                continue
            if current is None:
                current_group = txt
            else:
                current["description_lines"].append(txt)
        else:
            if current is None:
                expect = None
                continue
            if expect == "format":
                t, f, r = parse_format_table(elem)
                current["types"].extend(t)
                current["formats"].extend(f)
                current["responses"].extend(r)
                expect = None
            elif expect == "param":
                params = parse_param_table(elem)
                if params:
                    current["parameters"] = params
                expect = None
            elif expect == "example":
                ex = parse_example_table(elem)
                if ex:
                    current["examples"].extend(ex)
                expect = None

    if current:
        current["description"] = "\n".join(current.get("description_lines", [])).strip()
        commands.append(current)

    rows = []
    for cmd in commands:
        types = unique(cmd["types"])
        formats = unique(cmd["formats"]) or [cmd["name"]]
        examples = cmd["examples"]
        responses = unique(cmd["responses"])
        rows.append({
            "命令": cmd["name"],
            "命令标题": cmd["title"],
            "命令类型": ";".join(types),
            "命令格式": " | ".join(formats),
            "示例命令": " | ".join(e["cmd"] for e in examples if e["cmd"]),
            "示例响应": " | ".join(e["resp"] for e in examples if e["resp"]),
            "示例JSON": json.dumps(examples, ensure_ascii=False),
            "功能描述": cmd.get("description", ""),
            "命令分组": cmd.get("group", ""),
            "响应JSON": json.dumps(responses, ensure_ascii=False),
            "备注": "",
            "参数JSON": json.dumps(cmd.get("parameters", []), ensure_ascii=False),
        })

    df = pd.DataFrame(rows)
    # utf-8-sig so Excel opens the Chinese headers correctly.
    df.to_csv(csv_out, index=False, encoding="utf-8-sig")
    print(f"✅ 已解析 {len(df)} 条命令 → {csv_out}")
    return df


def csv_to_yaml(csv_path: Path, yaml_path: Path) -> None:
    """Re-expand the CSV's JSON columns and dump a structured YAML file."""
    import yaml  # lazy: PyYAML is only needed for this stage

    df = pd.read_csv(csv_path, dtype=str).fillna("")
    cmds = []
    for _, row in df.iterrows():
        examples = json.loads(row.get("示例JSON", "[]") or "[]")
        params = json.loads(row.get("参数JSON", "[]") or "[]")
        responses = json.loads(row.get("响应JSON", "[]") or "[]")
        for p in params:
            if not isinstance(p.get("valmap"), dict):
                p["valmap"] = {}
        entry = {
            "command": row.get("命令", ""),
            "title": row.get("命令标题", ""),
            "group": row.get("命令分组", ""),
            "type": [t.strip() for t in row.get("命令类型", "").split(";") if t.strip()],
            "formats": [f.strip() for f in row.get("命令格式", "").split("|") if f.strip()],
            "responses": responses,
            "parameters": params,
            "examples": [{"cmd": ex.get("cmd", ""), "resp": ex.get("resp", "")} for ex in examples],
            "description": row.get("功能描述", ""),
            "notes": row.get("备注", ""),
        }
        cmds.append(entry)
    with open(yaml_path, "w", encoding="utf-8") as f:
        yaml.safe_dump({"commands": cmds}, f, allow_unicode=True, sort_keys=False)
    print(f"✅ 已生成 YAML → {yaml_path}")


# Jinja2 template for one command page. ``\u2014`` renders as an em dash.
TEMPLATE_STR = """
{{ cmd.command }}
{{ "=" * cmd.command|length }}

**Title**: {{ cmd.title }}
{% if cmd.group %}**Group**: {{ cmd.group }}{% endif %}
**Types**: {{ cmd.type|join(", ") if cmd.type else "\u2014" }}

Formats::
{% if cmd.formats %}
{% for fmt in cmd.formats %}
    {{ fmt }}
{% endfor %}
{% else %}
    \u2014
{% endif %}
{% if cmd.responses %}
Responses::
{% for resp in cmd.responses %}
    {{ resp }}
{% endfor %}

{% endif %}
Parameters
----------
.. list-table::
   :header-rows: 1
   :widths: 18 34 48

   * - Name
     - Description
     - Values
{% for p in cmd.parameters %}
   * - {{ p.name }}
     - {{ p.desc or "\u2014" }}
     - {% if p.valmap %}
       .. list-table::
          :header-rows: 1
          :widths: 20 40

          * - Key
            - Value
{% for k, v in p.valmap.items() %}
          * - {{ k }}
            - {{ v }}
{% endfor %}
       {% else %}N/A{% endif %}
{% endfor %}

Examples
--------
{% if cmd.examples %}
{% for ex in cmd.examples %}
* ``{{ ex.cmd }}`` → {{ ex.resp or "\u2014" }}
{% endfor %}
{% else %}
* 无示例
{% endif %}

**Description**: {{ cmd.description or "\u2014" }}
{% if cmd.notes %}**Notes**: {{ cmd.notes }}{% endif %}
""".lstrip()


def yaml_to_rst(yaml_path: Path, rst_dir: Path):
    """Render one ``<command>.rst`` per YAML entry plus an ``index.rst``.

    Returns the list of generated per-command file names.
    """
    from jinja2 import Template  # lazy: jinja2 only needed for this stage

    tmpl = Template(TEMPLATE_STR)
    with open(yaml_path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f) if (yaml := __import__("yaml")) else {}
    data = data or {}
    cmds = data.get("commands", [])
    generated = []
    for cmd in cmds:
        parameters = cmd.get("parameters", []) or []
        examples = cmd.get("examples", []) or []
        responses = cmd.get("responses", []) or []
        # SimpleNamespace gives the template attribute access (cmd.title etc.).
        cmd_obj = SimpleNamespace(
            command=cmd.get("command", ""),
            title=cmd.get("title", ""),
            group=cmd.get("group", ""),
            type=cmd.get("type", []) or [],
            formats=cmd.get("formats", []) or [],
            responses=responses,
            parameters=[
                SimpleNamespace(
                    name=p.get("name", ""),
                    desc=p.get("desc", ""),
                    valmap=p.get("valmap", {}) if isinstance(p.get("valmap", {}), dict) else {},
                )
                for p in parameters
            ],
            examples=[
                SimpleNamespace(cmd=ex.get("cmd", ""), resp=ex.get("resp", ""))
                for ex in examples
            ],
            description=cmd.get("description", ""),
            notes=cmd.get("notes", ""),
        )
        rst_text = tmpl.render(cmd=cmd_obj)
        out_path = rst_dir / f"{cmd_obj.command}.rst"
        with open(out_path, "w", encoding="utf-8") as f:
            f.write(rst_text)
        generated.append(out_path.name)
    index_lines = [
        "AT Commands Manual",
        "==================",
        "",
        ".. toctree::",
        "   :maxdepth: 1",
        "",
    ]
    for name in generated:
        index_lines.append(f"   {Path(name).stem}")
    with open(rst_dir / "index.rst", "w", encoding="utf-8") as f:
        f.write("\n".join(index_lines))
    print(f"✅ 共生成 {len(generated)} 个 RST 文件 → {rst_dir}")
    return generated


def main() -> None:
    """Run the full Word → CSV → YAML → RST pipeline."""
    in_word = locate_docx()
    csv_dir = PROJECT_ROOT / "data"
    csv_dir.mkdir(exist_ok=True)
    csv_out = csv_dir / "extracted_commands_v6.csv"
    yaml_out = csv_dir / "all_commands_v6.yaml"
    rst_dir = csv_dir / "rst_output_v6"
    rst_dir.mkdir(parents=True, exist_ok=True)

    print(f"📄 输入 Word: {in_word}")
    print(f"💾 CSV 输出: {csv_out}")

    df_csv = extract_word_to_csv(in_word, csv_out)
    print(df_csv.head())
    csv_to_yaml(csv_out, yaml_out)
    sample_files = yaml_to_rst(yaml_out, rst_dir)
    print("📄 示例 RST:", sample_files[:5])


if __name__ == "__main__":
    main()