In [2]:
import json
from collections import defaultdict
from typing import Dict, List, Union

In [3]:
import json
from collections import defaultdict
from typing import Dict, List, Union

def analyze_json_structure(file_path: str):
    """
    多层JSON结构分析器
    返回：
    - 结构树
    - 键路径统计
    - 错误日志
    """
    structure_tree = defaultdict(int)
    key_paths = defaultdict(list)
    errors = []

    def recursive_parser(data: Union[Dict, List], current_path: str = "root", depth: int = 0):
        if isinstance(data, dict):
            for key, value in data.items():
                full_path = f"{current_path}.{key}" if current_path != "root" else key
                # 记录当前层级的键
                structure_tree[(key, depth)] += 1  
                
                # 对值进行递归解析
                if isinstance(value, (dict, list)):
                    recursive_parser(value, full_path, depth + 1)
                else:
                    # 记录叶子节点的值类型和示例
                    key_paths[full_path].append({
                        "depth": depth,
                        "value_type": type(value).__name__,
                        "example_value": str(value)[:50] + '...' if value is not None else "null"
                    })
                    
        elif isinstance(data, list):
            # 标记数组层级（关键修改）
            structure_tree[("[]", depth)] += 1
            
            # 遍历数组元素（保持当前层级）
            for index, item in enumerate(data):
                # 对每个元素递归解析（深度+1）
                recursive_parser(item, f"{current_path}[]", depth + 1)  
                
                
    with open(file_path, 'r', encoding='utf-8') as f:
        # 跳过空行
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue
            try:
                # 先解析外层结构
                outer_data = json.loads(line)
                
                # 关键修复：二次解析content字段
                if "content" in outer_data and isinstance(outer_data["content"], str):
                    try:
                        # 去除首尾多余引号（如果有）
                        content_str = outer_data["content"].strip('"')
                        inner_data = json.loads(content_str)
                        recursive_parser(outer_data)  # 解析外层结构
                        recursive_parser(inner_data, current_path="$.content", depth=1)  # 解析内层结构
                    except json.JSONDecodeError as e:
                        errors.append(f"Content解析失败（行 {line_num}）: {str(e)}")
                else:
                    recursive_parser(outer_data)
            except Exception as e:
                errors.append({
                    "line": line_num,
                    "error": str(e),
                    "raw_line": line[:100] + '...' if len(line) > 100 else line
                })
                print(f"第 {line_num} 行发生其他错误: {str(e)}，原始行内容: {line}")

    return structure_tree, key_paths, errors

def visualize_structure(structure_tree: dict, key_paths: dict, max_level=5):
    """可视化输出结构"""
    print("\n层级结构树：")
    level_map = defaultdict(list)
    for (key, depth), count in structure_tree.items():
        level_map[depth].append((key, count))

    for depth in sorted(level_map):
        if depth > max_level:
            continue
        indent = "  " * depth
        print(f"\n层级 {depth}:")
        for key, count in sorted(level_map[depth], key=lambda x: -x[1]):
            print(f"{indent}├── {key} ({count}次)")

    print("\n关键路径示例：")
    for path, info in list(key_paths.items())[:5]:
        print(f"\n路径: {path}")
        print(f"  深度范围: {min(i['depth'] for i in info)}-{max(i['depth'] for i in info)}")
        print(f"  值类型示例: {info[0]['value_type']}")
        print(f"  样例值: {info[0]['example_value']}")

if __name__ == "__main__":
    file_path = "output_0.txt"

    try:
        tree_stats, path_stats, errors = analyze_json_structure(file_path)

        visualize_structure(tree_stats, path_stats)

        print("\n错误日志：")
        if errors:
            for err in errors[:3]:
                print(f"行 {err['line']}: {err['error']}")
                print(f"  内容: {err['raw_line']}")
        else:
            print("未发现解析错误")

        # 生成分析报告
        with open("json_structure_report.md", "w", encoding="utf-8") as f:
            f.write("# JSON结构分析报告\n\n")
            f.write("## 层级统计\n")
            f.write("| 层级 | 键名 | 出现次数 |\n")
            f.write("|------|------|----------|\n")
            for (key, depth), count in sorted(tree_stats.items(), key=lambda x: (x[0][1], -x[1])):
                f.write(f"| {depth} | `{key}` | {count} |\n")

            f.write("\n## 关键路径\n")
            f.write("| 路径 | 最小深度 | 最大深度 | 类型示例 |\n")
            f.write("|------|----------|----------|----------|\n")
            for path, info in list(path_stats.items())[:20]:
                f.write(f"| `{path}` | {min(i['depth'] for i in info)} | {max(i['depth'] for i in info)} | {info[0]['value_type']} |\n")

        print("\n已生成Markdown格式报告: json_structure_report.md")

    except FileNotFoundError:
        print(f"错误：文件 {file_path} 不存在")


层级结构树：

层级 0:
├── content (3713次)

层级 1:
  ├── [] (3713次)

层级 2:
    ├── ctl_id (306815次)
    ├── ctl_name (306815次)
    ├── ctl_type (306815次)
    ├── link_type (306815次)
    ├── ctl_type_attr (306815次)
    ├── val_str (306815次)
    ├── val_score (306815次)
    ├── val_date (306815次)
    ├── val_code (306815次)
    ├── val_signurl (306815次)
    ├── std_code (306815次)
    ├── std_version (306815次)
    ├── std_inner_code (306815次)
    ├── std_inner_version (306815次)
    ├── val_extend (306815次)
    ├── edit_code (447次)
    ├── edit_name (447次)

关键路径示例：

路径: content
  深度范围: 0-0
  值类型示例: str
  样例值: [{"ctl_id":"1","ctl_name":"职业","ctl_type":"drop","...

路径: $.content[].ctl_id
  深度范围: 2-2
  值类型示例: str
  样例值: 1...

路径: $.content[].ctl_name
  深度范围: 2-2
  值类型示例: str
  样例值: 职业...

路径: $.content[].ctl_type
  深度范围: 2-2
  值类型示例: str
  样例值: drop...

路径: $.content[].link_type
  深度范围: 2-2
  值类型示例: str
  样例值: checkboxlastedit...

错误日志：


TypeError: string indices must be integers

In [4]:
import json

result = []

with open('output_0.txt', 'r', encoding='utf-8') as file:
    for line_num, line in enumerate(file, 1):
        #跳过空行
        if not line:
            continue
        try:
            # 解析单行JSON
            record = json.loads(line.strip())
            
            # 处理content字段的特殊情况
            raw_content = record.get('content', [])
            if isinstance(raw_content, str):
                try:
                    content_items = json.loads(raw_content)  # 二次解析
                except json.JSONDecodeError:
                    print(f"第{line_num}行: content字段包含无效JSON数组")
                    content_items = []
            else:
                content_items = raw_content  # 直接使用数组
            
            # 提取字段
            extracted_data = {}
            for item in content_items:
                if isinstance(item, dict):  # 确保是字典类型
                    ctl_name = item.get('ctl_name')
                    val_str = item.get('val_str', '')
                    if ctl_name:
                        extracted_data[ctl_name] = val_str
            
            result.append(extracted_data)
            
        except json.JSONDecodeError:
            print(f"第{line_num}行: 无效的JSON格式")
            continue

# 保存结果
with open('output.json', 'w', encoding='utf-8') as outfile:
    json.dump(result, outfile, ensure_ascii=False, indent=2)

print(f"成功处理{len(result)}条记录")

第2行: 无效的JSON格式
第4行: 无效的JSON格式
第6行: 无效的JSON格式
第8行: 无效的JSON格式
第10行: 无效的JSON格式
第12行: 无效的JSON格式
第14行: 无效的JSON格式
第16行: 无效的JSON格式
第18行: 无效的JSON格式
第20行: 无效的JSON格式
第22行: 无效的JSON格式
第24行: 无效的JSON格式
第26行: 无效的JSON格式
第28行: 无效的JSON格式
第30行: 无效的JSON格式
第32行: 无效的JSON格式
第34行: 无效的JSON格式
第36行: 无效的JSON格式
第38行: 无效的JSON格式
第40行: 无效的JSON格式
第42行: 无效的JSON格式
第44行: 无效的JSON格式
第46行: 无效的JSON格式
第48行: 无效的JSON格式
第50行: 无效的JSON格式
第52行: 无效的JSON格式
第54行: 无效的JSON格式
第56行: 无效的JSON格式
第58行: 无效的JSON格式
第60行: 无效的JSON格式
第62行: 无效的JSON格式
第64行: 无效的JSON格式
第66行: 无效的JSON格式
第68行: 无效的JSON格式
第70行: 无效的JSON格式
第72行: 无效的JSON格式
第74行: 无效的JSON格式
第76行: 无效的JSON格式
第78行: 无效的JSON格式
第80行: 无效的JSON格式
第82行: 无效的JSON格式
第84行: 无效的JSON格式
第86行: 无效的JSON格式
第88行: 无效的JSON格式
第90行: 无效的JSON格式
第92行: 无效的JSON格式
第94行: 无效的JSON格式
第96行: 无效的JSON格式
第98行: 无效的JSON格式
第100行: 无效的JSON格式
第102行: 无效的JSON格式
第104行: 无效的JSON格式
第106行: 无效的JSON格式
第108行: 无效的JSON格式
第110行: 无效的JSON格式
第112行: 无效的JSON格式
第114行: 无效的JSON格式
第116行: 无效的JSON格式
第118行: 无效的JSON格式
第120行: 无效的JSON格式
第122行: 无效的JSON格式
第124行: 无效的JSON格式

In [None]:
import json
from collections import defaultdict

def check_format_consistency(input_file):
    # 读取结果文件
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    # 按病历名称分组
    format_dict = defaultdict(list)
    for idx, record in enumerate(data):
        # 获取病历名称（兼容不同位置的情况）
        form_name = record.get("病历名称") or record.get("ctl_name") or "未知病历"
        format_dict[form_name].append((idx, record))
    
    # 校验结果存储
    report = {
        "total_forms": len(format_dict),
        "consistent_forms": 0,
        "inconsistent_forms": [],
        "details": {}
    }

    # 格式比对
    for form_name, records in format_dict.items():
        # 获取首个记录的键集合作为基准
        base_keys = set(records[0][1].keys())
        is_consistent = True
        diffs = []

        # 遍历后续记录
        for rec_idx, record in records[1:]:
            current_keys = set(record.keys())
            
            # 比较键集合差异
            missing_keys = base_keys - current_keys
            extra_keys = current_keys - base_keys
            
            if missing_keys or extra_keys:
                is_consistent = False
                diffs.append({
                    "record_index": rec_idx,
                    "missing_keys": list(missing_keys),
                    "extra_keys": list(extra_keys)
                })

        # 记录结果
        if is_consistent:
            report["consistent_forms"] += 1
            report["details"][form_name] = {"status": "一致", "total_records": len(records)}
        else:
            report["inconsistent_forms"].append(form_name)
            report["details"][form_name] = {
                "status": "不一致",
                "base_format": list(base_keys),
                "total_records": len(records),
                "differences": diffs
            }

    return report

# 使用示例
if __name__ == "__main__":
    result = check_format_consistency("output.json")
    
    # 控制台输出摘要
    print(f"总病历类型: {result['total_forms']}")
    print(f"一致病历数: {result['consistent_forms']}")
    print(f"不一致病历数: {len(result['inconsistent_forms'])}")
    
    # 生成详细报告
    with open("format_validation_report.json", "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)
    
    print("详细报告已保存至 format_validation_report.json")

总病历类型: 103
一致病历数: 54
不一致病历数: 49
详细报告已保存至 format_validation_report.json
