# Documentation helper

There are three main packages or domains in this solution: 

* Rules
* Dita/XML
* LLM

Linguistic-rule tagging is proposed as the mechanism for verifying the validity of the generated manual.

In [1]:
%pip install -qU "langchain[aws]" python-dotenv lxml atlassian-python-api

from dotenv import load_dotenv
load_dotenv()

Note: you may need to restart the kernel to use updated packages.


True

In [6]:
import boto3
from botocore.config import Config

# Configure the Bedrock client with a read timeout of 3600 seconds
config = Config(read_timeout=3600, region_name="us-east-1")

bedrock_client = boto3.client("bedrock-runtime", config=config)

# The rule set and the forbidden-word list are kept as raw JSON text because
# they are interpolated as-is into the prompt. JSON files are UTF-8, so read
# them with an explicit encoding instead of the platform's locale default.
with open("json/rules.json", encoding="utf-8") as file:
    rules = file.read()

with open("json/bad_words.json", encoding="utf-8") as file:
    bad_words = file.read()

def base_prompt(job, rules, bad_words, instructions):
    """Build the shared prompt text: job line, rules, forbidden words, instructions.

    Each argument is formatted into its own section; the returned string
    ends with a newline.
    """
    sections = (
        f"Your job is to {job} based on the following",
        "Rules:",
        f"{rules}",
        "",
        "Forbidden words:",
        f"{bad_words}",
        "",
        "Instructions:",
        f"{instructions}",
        "",  # yields the trailing newline
    )
    return "\n".join(sections)

In [6]:
from botocore.exceptions import ClientError
import json
from enum import Enum


class DocumentType(str, Enum):
    """File formats a source document can be declared as.

    Members are also ``str`` instances; generate_dita places the member in
    the ``format`` field of the document block it sends to the model.
    """
    # NOTE(review): presumably mirrors the formats accepted by the Bedrock
    # Converse document block — confirm against the API reference.
    HTML = "html"
    MARKDOWN = "md"
    PDF = "pdf"
    DOCX = "docx"
    DOC = "doc"
    XLSX = "xlsx"
    XLS = "xls"
    CSV = "csv"
    TXT = "txt"


async def generate_dita(document: bytes, file_name: str, rules: str, bad_words: str, model_id: str, document_type: DocumentType = DocumentType.TXT):
    """Ask a Bedrock model to turn a document into rule-compliant DITA.

    Args:
        document: Raw bytes of the source document.
        file_name: Name given to the document block sent to the model.
        rules: Rule set text embedded in the system prompt.
        bad_words: Forbidden-word list text embedded in the system prompt.
        model_id: Model ID or inference-profile ARN passed as ``modelId``.
        document_type: Declared format of ``document``.

    Returns:
        dict: The response returned by ``bedrock_client.converse``.
    """
    import asyncio

    instructions = """
    - Check the document for compliance with the provided rules.
    - Identify any violations of the rules.
    - Suggest corrections for the identified violations.
    - Ensure the output is in DITA format.

    Answer only in DITA format, without any additional text or explanations. If needed, do multiple files to cover all content.
    """

    system_role = f"You are a technical editor that generates dita files based on given rules.\n\n{base_prompt(job='generate DITA file', rules=rules, bad_words=bad_words, instructions=instructions)}"

    document_reference = {
        "format": document_type,
        "name": file_name,
        "source": {
            "bytes": document,
        }
    }

    user_prompt = "Read the document and generate the DITA file with the necessary corrections based on the provided rules and forbidden words."

    # Converse takes sampling settings through `inferenceConfig` with
    # camelCase keys; the system prompt travels separately via `system`.
    # These settings were previously built but never sent with the request.
    inference_config = {
        "temperature": 0.2,
        "maxTokens": 5000,
    }

    # boto3 calls are blocking; run the request in a worker thread so awaiting
    # this coroutine does not stall the event loop for the whole generation.
    return await asyncio.to_thread(
        bedrock_client.converse,
        modelId=model_id,
        system=[{"text": system_role}],
        inferenceConfig=inference_config,
        messages=[
            {
                "role": "user",
                "content": [
                    {"text": user_prompt},
                    {"document": document_reference},
                ],
            }
        ],
    )

    # except (ClientError, Exception) as e:
    # print(f"ERROR: Can't invoke '{model_id}'. Reason: {e}")

# Load the source manual as raw bytes ("rb"); the bytes are handed unchanged to the generation call below.
# NOTE(review): the recorded cell output shows FileNotFoundError for this path — confirm where the
# .docx actually lives (a Word file under txt/ looks suspicious).
with open("txt/PMD_PRISM.IO_Digital_IO_Management.docx", "rb") as file:
    manual = file.read()

async def generate_corrected_dita(*args):
    """Generate a corrected DITA rendition of a document.

    Thin wrapper that forwards its positional arguments to ``generate_dita``
    (document bytes, file name, rules, forbidden words, model id and
    document type) and returns that call's response. An empty body would
    return None, which callers cannot subscript for ``"usage"``.
    """
    return await generate_dita(*args)

print("Generating response")
# Top-level await: this only works in a notebook / async REPL, not in a plain script.
# NOTE(review): the model is addressed through a hard-coded inference-profile ARN
# (region and account id included) — consider moving it to configuration.
resp = await generate_corrected_dita(manual, "IO Digital IO Management", rules, bad_words, "arn:aws:bedrock:us-east-1:473326111529:inference-profile/us.anthropic.claude-sonnet-4-20250514-v1:0", DocumentType.DOCX)

# Keep the "usage" entry of the response for inspection.
usage = resp["usage"]

FileNotFoundError: [Errno 2] No such file or directory: 'txt/PMD_PRISM.IO_Digital_IO_Management.docx'

In [7]:
async def correct_dita(document: str, file_name: str, rules: str, bad_words: str, model_id: str):
    """
    Ask a Bedrock model to correct an existing DITA file against the rules.

    Args:
        document (str): The DITA content to be corrected, sent inline as text.
        file_name (str): The name of the file being processed (currently
            unused; kept so the signature stays stable for callers).
        rules (str): The rules to check against.
        bad_words (str): The forbidden words to avoid.
        model_id (str): The model ID for the inference.

    Returns:
        dict: The response from the Bedrock client.
    """
    import asyncio

    instructions = """
    - Modify the text only when necessary.
    - Mark all corrections using the following syntax:
        - ~~Original~~ → **Corrected** (Reason)
    - Do not add new sections unless explicitly required."""

    # The job text is spliced into "Your job is to {job} based on ...", so it
    # must be in the infinitive form to read correctly.
    system_role = f"You are a technical editor that verifies dita files based on given rules.\n\n{base_prompt(job='correct the DITA file', rules=rules, bad_words=bad_words, instructions=instructions)}"
    user_prompt = "Read the document and correct it based on the provided rules and forbidden words."

    # Converse takes sampling settings through `inferenceConfig` with
    # camelCase keys; the system prompt travels separately via `system`.
    # These settings were previously built but never sent with the request.
    inference_config = {
        "temperature": 0.2,
        "maxTokens": 5000,
    }

    # boto3 calls are blocking; run the request in a worker thread so awaiting
    # this coroutine does not stall the event loop for the whole generation.
    return await asyncio.to_thread(
        bedrock_client.converse,
        modelId=model_id,
        system=[{"text": system_role}],
        inferenceConfig=inference_config,
        messages=[
            {
                "role": "user",
                "content": [
                    {"text": user_prompt},
                    {"text": document},
                ],
            }
        ],
    )

    
# Read the DITA topic as text; correct_dita sends it to the model inline as a text block.
# NOTE(review): no encoding is given, so the platform default is used — confirm the DITA files
# are readable that way (XML is typically UTF-8).
with open("dita/parameters.dita") as file:
    dita = file.read()

print("Generating response")
# Top-level await: this only works in a notebook / async REPL, not in a plain script.
resp = await correct_dita(dita, "parameters.dita", rules, bad_words, "arn:aws:bedrock:us-east-1:473326111529:inference-profile/us.anthropic.claude-sonnet-4-20250514-v1:0")

# Keep the "usage" entry of the response for inspection.
usage = resp["usage"]

Generating response


## BitBucket connector

This connector is needed to retrieve the DITA files and to publish changes through pull requests.

In [4]:
from atlassian import Bitbucket
import os

# Bitbucket client. The URL and credentials are read from environment variables;
# os.getenv returns None for any variable that is not set, so a missing value is
# passed through to the client as None.
connector = Bitbucket(
    url=os.getenv("BITBUCKET_URL"),
    username=os.getenv("BITBUCKET_USERNAME"),
    password=os.getenv("BITBUCKET_PASSWORD")
)

## XML/DITA Parser

This parser is responsible for extracting and validating the DITA syntax, although the validation step may already have been performed by the technical writer if they are using Oxygen.

In [2]:
from lxml import etree

# Element names accepted as the root of a DITA file by the root-tag check.
ROOT_TAGS = ["concept", "topic", "task", "reference", "glossentry"]
# Global lists meant to collect reported errors and warnings.
ERRORS, WARNINGS = [], []

def prettyprint(element, **kwargs):
    """Print an lxml element as indented XML, without an extra trailing newline.

    Args:
        element: The element (or tree) to serialise.
        **kwargs: Extra options forwarded to ``etree.tostring``.
    """
    xml = etree.tostring(element, pretty_print=True, **kwargs)
    # tostring returns bytes unless a unicode encoding was requested, in which
    # case it already returns str. Decode bytes with the encoding that produced
    # them rather than always assuming UTF-8.
    if isinstance(xml, bytes):
        encoding = kwargs.get("encoding")
        xml = xml.decode(encoding if isinstance(encoding, str) else "utf-8")
    print(xml, end='')



In [5]:
# Load and parse the DITA file, then grab its root element.
tree = etree.parse("./docs/accessing_the_device.dita")
root = tree.getroot()

# First check: the root element must be one of the names listed in ROOT_TAGS; if it is not, print a warning.
# NOTE(review): the warning is only printed, not appended to WARNINGS — confirm that is intended.
if root.tag not in ROOT_TAGS:
    print(f"Warning: Root tag '{root.tag}' is not in the list of allowed root tags: {ROOT_TAGS}")


In [6]:
import sys, json, argparse, pathlib, re
from lxml import etree
from enum import Enum
import yaml

# Global collectors: add_issue appends to them and main() puts them in the final report.
ERRORS, WARNINGS = [], []

class IssueSeverity(str, Enum):
    """Severity level attached to each reported issue (members are also ``str`` instances)."""
    ERROR = "ERROR"
    WARN = "WARN"

def add_issue(sev: IssueSeverity, file, line, code, msg):
    """Record one finding in ERRORS or WARNINGS.

    Args:
        sev: Severity; ERROR goes to ERRORS, anything else to WARNINGS.
        file: Path of the offending file (stored as a string).
        line: Line number of the finding, or None when unknown.
        code: Short machine-readable issue code.
        msg: Human-readable description.
    """
    record = dict(severity=sev, file=str(file), line=line, code=code, message=msg)
    if sev == IssueSeverity.ERROR:
        ERRORS.append(record)
    else:
        WARNINGS.append(record)

def well_formed(path: pathlib.Path):
    """Return True when *path* parses as XML.

    On an XML syntax error an XML_NOT_WELLFORMED error is recorded (with
    the first component of the exception's ``position`` as the line, when
    available) and False is returned.
    """
    try:
        etree.parse(str(path))
    except etree.XMLSyntaxError as exc:
        position = getattr(exc, 'position', (None,))
        add_issue(IssueSeverity.ERROR, path, position[0], "XML_NOT_WELLFORMED", str(exc))
        return False
    else:
        return True

def check_href_exists(doc_path, tree, docs_root):
    """Check that every local ``href`` in *tree* points at something that exists.

    Args:
        doc_path: Path of the document being checked (used to resolve
            relative hrefs and as the file reported in issues).
        tree: Parsed lxml tree of that document.
        docs_root: Resolved documentation root; targets outside it are rejected.

    Issues recorded (all as errors, at the referencing element's line):
        PATH_ESCAPE: the target resolves outside ``docs_root``.
        MISSING_TARGET: the target file does not exist.
        TARGET_NOT_XML: a fragment was given but the target cannot be parsed.
        MISSING_FRAGMENT: the fragment id was not found in the target.
    """
    # Parsed target trees, keyed by resolved path (None = unparseable), so a
    # file referenced many times is only parsed once.
    parsed_targets = {}

    # href on elements like <xref> <image> <link> etc.
    for el in tree.xpath('//*[@href]'):
        href = el.get("href")
        # Skip empty hrefs and anything with a URI scheme (http://, mailto:, ...).
        # Requiring two or more scheme characters keeps Windows drive letters local.
        if not href or re.match(r'^[A-Za-z][A-Za-z0-9+.\-]+:', href):
            continue

        file_part, _, frag = href.partition('#')

        if file_part:
            # Resolve the file part only: the fragment is not part of the path.
            target = (doc_path.parent / file_part).resolve()
            # prevent escaping docs root
            if docs_root not in target.parents and target != docs_root:
                add_issue(IssueSeverity.ERROR, doc_path, el.sourceline, "PATH_ESCAPE", f"path escapes docs root: {href}")
                continue
            if not target.exists():
                add_issue(IssueSeverity.ERROR, doc_path, el.sourceline, "MISSING_TARGET", f"file not found: {file_part}")
                continue
            if not frag:
                continue
            if target not in parsed_targets:
                try:
                    parsed_targets[target] = etree.parse(str(target))
                except Exception:
                    parsed_targets[target] = None
            ttree = parsed_targets[target]
            if ttree is None:
                add_issue(IssueSeverity.ERROR, doc_path, el.sourceline, "TARGET_NOT_XML", f"target not parseable: {file_part}")
                continue
        else:
            # "#id" refers to the document being checked, which is already parsed.
            if not frag:
                continue
            ttree = tree

        # DITA fragments are "topicid" or "topicid/elementid" ("." = same topic).
        # Ids are passed as XPath variables so quotes in them cannot break the query.
        topic_id, _, element_id = frag.partition('/')
        if not element_id:
            found = ttree.xpath("//*[@id=$topic_id]", topic_id=topic_id)
        elif topic_id == ".":
            found = ttree.xpath("//*[@id=$element_id]", element_id=element_id)
        else:
            found = ttree.xpath(
                "//*[@id=$topic_id]//*[@id=$element_id]",
                topic_id=topic_id,
                element_id=element_id,
            )
        if not found:
            add_issue(IssueSeverity.ERROR, doc_path, el.sourceline, "MISSING_FRAGMENT", f"#{frag} not found in {file_part or doc_path.name}")

def is_svg(path: pathlib.Path) -> bool:
    """Tell whether *path* has an ``.svg`` extension, ignoring letter case."""
    extension = path.suffix
    return ".svg" == extension.lower()

def sniff_svg(path: pathlib.Path) -> bool:
    """Run basic sanity checks on an SVG file and record what they find.

    Checks, in order: the file parses as XML (SVG_NOT_WELLFORMED), the root
    element is ``svg`` (SVG_ROOT_INVALID), no ``<script>`` elements
    (SVG_SCRIPT_FORBIDDEN), no ``href``/``xlink:href`` pointing at an
    external URL (SVG_EXTERNAL_REF_FORBIDDEN, warning), and a ``viewBox`` or
    both ``width`` and ``height`` on the root (SVG_DIMENSIONS_WEAK, warning).

    Returns:
        False when the file is not well-formed or its root is not ``svg``;
        True otherwise, even if script/reference/dimension issues were recorded.
    """
    try:
        tree = etree.parse(str(path))
    except etree.XMLSyntaxError as e:
        add_issue(IssueSeverity.ERROR, path, getattr(e, 'position', (None,))[0], "SVG_NOT_WELLFORMED", str(e))
        return False
    root = tree.getroot()
    # Compare the local name: real SVG roots are namespaced
    # ("{http://www.w3.org/2000/svg}svg"), and a suffix test on the raw tag
    # would also accept names that merely end in "svg".
    if etree.QName(root).localname.lower() != "svg":
        add_issue(IssueSeverity.ERROR, path, 1, "SVG_ROOT_INVALID", "root element is not <svg>")
        return False
    # disallow scripts and external refs
    xlink_ns = "http://www.w3.org/1999/xlink"
    # local-name() matches <script> whatever its namespace, and the xlink prefix
    # must be bound explicitly or lxml rejects the expression.
    suspects = root.xpath(
        ".//*[local-name()='script'] | .//*[@href or @xlink:href]",
        namespaces={"xlink": xlink_ns},
    )
    for bad in suspects:
        if etree.QName(bad).localname == 'script':
            add_issue(IssueSeverity.ERROR, path, bad.sourceline, "SVG_SCRIPT_FORBIDDEN", "<script> not allowed")
            continue
        href = bad.get('href') or bad.get(f'{{{xlink_ns}}}href')
        if href and re.match(r'^\w+://', href):
            add_issue(IssueSeverity.WARN, path, bad.sourceline, "SVG_EXTERNAL_REF_FORBIDDEN", f"external ref: {href}")
    # viewBox/size
    if not (root.get('viewBox') or (root.get('width') and root.get('height'))):
        add_issue(IssueSeverity.WARN, path, 1, "SVG_DIMENSIONS_WEAK", "missing viewBox and dimensions")
    return True

def walk_and_validate(cfg):
    """Validate every document and SVG image below the configured docs root.

    ``cfg['docs_root']`` is resolved to an absolute path. All ``*.dita``
    files are validated first, then all ``*.xml`` files, and finally every
    ``*.svg`` file is checked, whether or not a document references it.
    """
    docs_root = pathlib.Path(cfg['docs_root']).resolve()
    for pattern in ("*.dita", "*.xml"):
        for document in docs_root.rglob(pattern):
            validate_single(document, docs_root)
    # image sweep (including non-referenced in v1)
    for image in docs_root.rglob("*.svg"):
        sniff_svg(image)

def validate_single(path, docs_root):
    """Validate one XML/DITA file: well-formedness first, then its hrefs.

    Nothing further is checked when the file is not well-formed (that
    failure has already been recorded by ``well_formed``).
    """
    if well_formed(path):
        parsed = etree.parse(str(path))
        check_href_exists(path, parsed, docs_root)
        # (Optional) hook: DTD/RNG/Schematron validation here

def main():
    """Load ``config.yaml``, run the validation sweep and print a JSON report.

    The report contains the collected errors and warnings plus a summary
    with their counts.
    """
    # Use a context manager so the config file handle is closed right after parsing.
    with open("config.yaml") as config_file:
        cfg = yaml.safe_load(config_file)
    walk_and_validate(cfg)
    report = {"errors": ERRORS, "warnings": WARNINGS, "summary": {
        "error_count": len(ERRORS), "warning_count": len(WARNINGS)}}
    print(json.dumps(report, indent=2))

main()

{
  "errors": [],
  "summary": {
    "error_count": 0,
  }
}
