In [1]:
# Import necessary libraries
from docling.document_converter import DocumentConverter
from docling.chunking import HybridChunker
import os
import json

# Report where the notebook is running and which PDF files sit next to it
working_dir = os.getcwd()
print(f"Current working directory: {working_dir}")
pdf_names = [entry for entry in os.listdir('.') if entry.endswith('.pdf')]
print(f"Available files: {pdf_names}")


  from .autonotebook import tqdm as notebook_tqdm


当前工作目录: /Users/user/Claims_poc/OCR
可用文件: ['TALR7983-0923-accelerated-protection-pds-8-sep-2023.pdf', 'TAL_AcceleratedProtection_2022-08-05.pdf']


In [None]:
def convert_and_chunk_document(source_path, chunk_size=768, chunk_overlap=50):
    """Convert a document with Docling and split the result into chunks.

    Args:
        source_path: Source handed to ``DocumentConverter.convert`` (a PDF
            file name in this notebook).
        chunk_size: Value forwarded to ``HybridChunker`` as ``chunk_size``.
        chunk_overlap: Value forwarded to ``HybridChunker`` as ``overlap_size``.

    Returns:
        tuple: ``(doc, chunks)`` where ``doc`` is ``result.document`` and
        ``chunks`` is a list of the chunk objects produced by the chunker.
    """

    # Document conversion
    print(f"Converting document: {source_path}")
    converter = DocumentConverter()
    result = converter.convert(source_path)
    doc = result.document

    # Configure HybridChunker - this is the core!
    # NOTE(review): the documented HybridChunker options are tokenizer /
    # max_tokens / merge_peers; confirm that the keyword arguments below are
    # actually honoured by the installed docling version.
    chunker = HybridChunker(
        chunk_size=chunk_size,           # Target chunk size
        overlap_size=chunk_overlap,      # Overlap size to maintain context continuity
        split_by_page=True,              # Respect page boundaries
        respect_section_boundaries=True  # Respect section boundaries
    )

    # Execute chunking. chunk() is documented as returning an iterator, so it
    # is materialized here: len() below and the indexing done by the later
    # cells both need a real sequence.
    chunks = list(chunker.chunk(doc))

    print(f"Conversion completed! Generated {len(chunks)} chunks")

    return doc, chunks

# Example usage: convert the sample PDF when it is present in the working directory
source_file = "TALR7983-0923-accelerated-protection-pds-8-sep-2023.pdf"

if not os.path.exists(source_file):
    # Nothing to convert; tell the reader how to point the notebook at a file
    print(f"File {source_file} does not exist, please check the file path")
    print("You can place any PDF file in the current directory and modify the source_file variable")
else:
    doc, chunks = convert_and_chunk_document(
        source_file,
        chunk_size=768,
        chunk_overlap=50,
    )
    print(f"\nSuccessfully processed document, generated {len(chunks)} chunks")


正在转换文档: TALR7983-0923-accelerated-protection-pds-8-sep-2023.pdf




In [None]:
def display_chunks(chunks, start_idx=0, count=3):
    """Print a preview of a contiguous range of chunks.

    Args:
        chunks: Iterable of objects exposing a ``text`` string attribute.
        start_idx: Zero-based index of the first chunk to show. Negative
            values are treated as 0.
        count: Maximum number of chunks to show.

    Returns:
        None. Everything is written to stdout; chunk text longer than 300
        characters is truncated in the preview.
    """
    # Accept any iterable (including a lazy iterator) and make it indexable.
    chunks = list(chunks)
    total = len(chunks)

    # `chunks` is a parameter and therefore always bound, so instead of
    # checking for the name we check that the requested range is non-empty.
    first = max(start_idx, 0)
    last = min(first + count, total)
    if first >= last:
        print(f"No chunks to display (total {total} chunks)")
        return

    print(f"Displaying chunks {first+1} to {last}")
    print(f"Total {total} chunks\n")

    for i in range(first, last):
        chunk = chunks[i]
        print(f"{'='*60}")
        print(f"Chunk {i+1}/{total} (length: {len(chunk.text)} characters)")
        print(f"{'='*60}")

        # Display content (limit length for readability)
        content = chunk.text.strip()
        if len(content) > 300:
            print(content[:300] + "\n...(content truncated, showing first 300 characters)")
        else:
            print(content)
        print()

# If chunks exist, display the first 3 chunks.
# The membership test already covers the "conversion cell not run yet" case.
# No NameError handler is used on purpose: catching it here would hide a
# genuine NameError raised inside display_chunks behind a misleading
# "run the conversion first" message.
if 'chunks' in globals():
    display_chunks(chunks, start_idx=0, count=3)
else:
    print("Please run the document conversion code above first")


In [None]:
def analyze_chunks(chunks):
    """Print length and content-type statistics for a collection of chunks.

    Args:
        chunks: Iterable of objects exposing a ``text`` string attribute.

    Returns:
        list[int]: Character length of each chunk's text, in input order.
        An empty list is returned (after a short notice) when there are no
        chunks, instead of failing on the average / min / max.
    """
    # Accept any iterable (including a lazy iterator); it is traversed twice below.
    chunks = list(chunks)
    total = len(chunks)
    chunk_lengths = [len(chunk.text) for chunk in chunks]

    print("📊 Chunking Statistics:")
    print(f"├── Total chunks: {total}")
    if total == 0:
        # Averages, extremes and percentages are undefined without data.
        print("└── No chunks to analyze")
        return chunk_lengths

    print(f"├── Average length: {sum(chunk_lengths) / total:.0f} characters")
    print(f"├── Maximum length: {max(chunk_lengths)} characters")
    print(f"├── Minimum length: {min(chunk_lengths)} characters")

    # Length distribution analysis
    short_count = sum(1 for length in chunk_lengths if length < 200)
    medium_count = sum(1 for length in chunk_lengths if 200 <= length <= 800)
    long_count = sum(1 for length in chunk_lengths if length > 800)

    print(f"├── Short chunks (<200 chars): {short_count} ({short_count/total*100:.1f}%)")
    print(f"├── Medium chunks (200-800 chars): {medium_count} ({medium_count/total*100:.1f}%)")
    print(f"└── Long chunks (>800 chars): {long_count} ({long_count/total*100:.1f}%)")

    # Content type analysis (text heuristics: a pipe character or the word
    # "Table" counts as a table; a line starting with -, * or • counts as a list)
    table_chunks = sum(1 for chunk in chunks if '|' in chunk.text or 'Table' in chunk.text)
    list_chunks = sum(
        1 for chunk in chunks
        if any(line.strip().startswith(('-', '*', '•'))
               for line in chunk.text.split('\n'))
    )

    print(f"\n📋 Content Types:")
    print(f"├── Chunks with tables: {table_chunks}")
    print(f"└── Chunks with lists: {list_chunks}")

    return chunk_lengths

# If chunks exist, perform analysis.
# The membership test handles the "conversion cell not run yet" case; a
# NameError handler is deliberately not used because it would mask real
# NameErrors raised inside analyze_chunks.
if 'chunks' in globals():
    chunk_lengths = analyze_chunks(chunks)
else:
    print("Please run the document conversion code first")


In [None]:
def compare_chunking_strategies(doc):
    """Chunk ``doc`` with three preset configurations and print a comparison.

    Args:
        doc: Document object passed to ``HybridChunker.chunk`` for every strategy.

    Returns:
        dict: Maps each strategy name to a dict with the keys ``chunks``
        (list of chunk objects), ``count``, ``avg_length`` (0.0 when the
        strategy produced no chunks) and ``lengths`` (per-chunk text lengths).
    """

    strategies = [
        {"name": "Small chunks (fine-grained retrieval)", "chunk_size": 256, "overlap": 25},
        {"name": "Medium chunks (balanced performance)", "chunk_size": 512, "overlap": 50},
        {"name": "Large chunks (more context)", "chunk_size": 1024, "overlap": 100},
    ]

    results = {}

    print("🔄 Testing different chunking strategies...")

    for strategy in strategies:
        # NOTE(review): the documented HybridChunker options are tokenizer /
        # max_tokens / merge_peers; confirm these keyword arguments are
        # honoured by the installed docling version, otherwise all three
        # strategies will produce identical results.
        chunker = HybridChunker(
            chunk_size=strategy["chunk_size"],
            overlap_size=strategy["overlap"],
            split_by_page=True,
            respect_section_boundaries=True
        )

        # chunk() is documented as returning an iterator; materialize it so
        # that len() works and the chunks can be stored and reused.
        chunks = list(chunker.chunk(doc))
        lengths = [len(chunk.text) for chunk in chunks]
        # Guard the average so a strategy yielding no chunks does not abort the run.
        avg_length = sum(lengths) / len(lengths) if lengths else 0.0

        results[strategy["name"]] = {
            "chunks": chunks,
            "count": len(chunks),
            "avg_length": avg_length,
            "lengths": lengths
        }

        print(f"✓ {strategy['name']}: {len(chunks)} chunks, average length {avg_length:.0f} characters")

    # Display comparison table
    print(f"\n📊 Strategy Comparison:")
    print(f"{'Strategy':<35} {'Count':<8} {'Avg Length':<12} {'Min Length':<12} {'Max Length':<12}")
    print("-" * 85)

    for name, result in results.items():
        lengths = result["lengths"]
        # default=0 keeps the table printable for a strategy with no chunks
        shortest = min(lengths, default=0)
        longest = max(lengths, default=0)
        print(f"{name:<35} {result['count']:<8} {result['avg_length']:<12.0f} {shortest:<12} {longest:<12}")

    return results

# If doc exists, perform strategy comparison.
# The membership test handles the "conversion cell not run yet" case; a
# NameError handler is deliberately not used because it would mask real
# NameErrors raised inside compare_chunking_strategies.
if 'doc' in globals():
    strategy_results = compare_chunking_strategies(doc)
else:
    print("Please run the document conversion code first")


In [None]:
def save_chunks_to_files(chunks, base_filename):
    """Write the chunks to a JSON file and a human-readable text report.

    Args:
        chunks: Iterable of objects exposing a ``text`` string attribute.
        base_filename: Path prefix; ``_chunks.json`` and ``_chunks.txt`` are
            appended to it to form the two output file names.

    Returns:
        tuple[str, str]: ``(json_filename, txt_filename)`` of the files written.
    """
    list_markers = ("-", "*", "•")

    # Prepare structured data
    chunk_data = []
    for i, chunk in enumerate(chunks):
        text = chunk.text
        has_pipe = "|" in text
        # Computed once and reused for both has_lists and chunk_type
        has_lists = any(line.strip().startswith(list_markers)
                        for line in text.split("\n"))
        if has_pipe:
            chunk_type = "table"
        elif has_lists:
            chunk_type = "list"
        else:
            chunk_type = "text"

        chunk_data.append({
            "id": i,
            "text": text.strip(),
            "length": len(text),
            "word_count": len(text.split()),
            "has_tables": has_pipe or "Table" in text,
            "has_lists": has_lists,
            "metadata": {
                "chunk_type": chunk_type
            }
        })

    # Guard the average so an empty chunk collection still produces both files.
    total = len(chunk_data)
    average_size = sum(c['length'] for c in chunk_data) / total if total else 0

    # Save JSON format (for programmatic processing)
    json_filename = f"{base_filename}_chunks.json"
    with open(json_filename, 'w', encoding='utf-8') as f:
        json.dump(chunk_data, f, ensure_ascii=False, indent=2)

    # Save readable format (for human review).
    # Real newlines are written here; the doubled backslashes used previously
    # put a literal backslash followed by "n" into the report.
    txt_filename = f"{base_filename}_chunks.txt"
    with open(txt_filename, 'w', encoding='utf-8') as f:
        f.write(f"Document Chunking Results\n{'='*50}\n\n")
        f.write(f"Original document: {base_filename}\n")
        f.write(f"Total chunks: {total}\n")
        f.write(f"Average chunk size: {average_size:.0f} characters\n\n")

        for chunk_info in chunk_data:
            f.write(f"--- Chunk {chunk_info['id'] + 1} [{chunk_info['metadata']['chunk_type']}] ---\n")
            f.write(f"Length: {chunk_info['length']} characters, {chunk_info['word_count']} words\n")
            if chunk_info['has_tables']:
                f.write("✓ Contains tables\n")
            if chunk_info['has_lists']:
                f.write("✓ Contains lists\n")
            f.write(f"Content:\n{chunk_info['text']}\n\n")

    print(f"💾 Chunking results saved:")
    print(f"├── JSON format: {json_filename}")
    print(f"└── Text format: {txt_filename}")

    return json_filename, txt_filename

# If chunks exist, save results.
# The membership tests handle the "conversion cell not run yet" case; a
# NameError handler is deliberately not used because it would mask real
# NameErrors raised inside save_chunks_to_files.
if 'chunks' in globals() and 'source_file' in globals():
    base_name = os.path.splitext(source_file)[0]
    json_file, txt_file = save_chunks_to_files(chunks, base_name)
else:
    print("Please run the document conversion code first")


In [None]:
def analyze_claims_content(chunks):
    """Classify chunks by claims, financial, legal and table keywords.

    Matching is a case-insensitive substring test of each keyword against the
    chunk text; a chunk may land in several categories.

    Args:
        chunks: Iterable of objects exposing a ``text`` string attribute.

    Returns:
        dict: Keys ``claims_related``, ``financial_info``, ``legal_terms``,
        ``tables_and_data`` and ``general_text``. Each value is a list of
        dicts with ``index``, ``length``, ``preview`` (first 80 characters)
        and ``keywords_found``. For the three keyword categories
        ``keywords_found`` holds only that category's matches; for
        ``tables_and_data`` it holds all matches; for ``general_text`` it is
        empty.
    """

    # Define keyword categories
    claims_keywords = [
        'claim', 'policy', 'premium', 'coverage', 'deductible',
        'benefit', 'exclusion', 'liability', 'settlement', 'payout',
        'insured', 'policyholder', 'beneficiary'
    ]

    financial_keywords = [
        '$', 'amount', 'cost', 'fee', 'rate', 'percentage', '%',
        'sum', 'limit', 'maximum', 'minimum'
    ]

    legal_keywords = [
        'terms', 'conditions', 'clause', 'provision', 'agreement',
        'contract', 'obligation', 'responsibility', 'liable'
    ]

    results = {
        'claims_related': [],
        'financial_info': [],
        'legal_terms': [],
        'tables_and_data': [],
        'general_text': []
    }

    print("🔍 Analyzing Claims document content...")

    for i, chunk in enumerate(chunks):
        text = chunk.text
        text_lower = text.lower()

        # Classify chunks: a non-empty match list means the category applies
        found_claims = [kw for kw in claims_keywords if kw in text_lower]
        found_financial = [kw for kw in financial_keywords if kw in text_lower]
        found_legal = [kw for kw in legal_keywords if kw in text_lower]
        has_tables = '|' in text or 'Table' in text

        base_info = {
            'index': i,
            'length': len(text),
            'preview': text[:80] + '...' if len(text) > 80 else text,
        }

        # Each entry gets its own keyword list. Sharing one list between
        # shallow copies made every category report the other categories'
        # keywords as well.
        if found_claims:
            results['claims_related'].append(dict(base_info, keywords_found=found_claims))

        if found_financial:
            results['financial_info'].append(dict(base_info, keywords_found=found_financial))

        if found_legal:
            results['legal_terms'].append(dict(base_info, keywords_found=found_legal))

        if has_tables:
            results['tables_and_data'].append(
                dict(base_info, keywords_found=found_claims + found_financial + found_legal)
            )

        if not (found_claims or found_financial or found_legal or has_tables):
            results['general_text'].append(dict(base_info, keywords_found=[]))

    # Print statistics (real newlines; the doubled backslashes used previously
    # printed a literal backslash followed by "n")
    print(f"\n📋 Claims Content Analysis Results:")
    print(f"├── Claims-related chunks: {len(results['claims_related'])}")
    print(f"├── Financial info chunks: {len(results['financial_info'])}")
    print(f"├── Legal terms chunks: {len(results['legal_terms'])}")
    print(f"├── Tables/data chunks: {len(results['tables_and_data'])}")
    print(f"└── General text chunks: {len(results['general_text'])}")

    # Show some key chunk previews
    if results['claims_related']:
        print(f"\n🏷️  Claims-related chunk examples:")
        for chunk_info in results['claims_related'][:2]:
            print(f"  Chunk {chunk_info['index']+1}: {chunk_info['preview']}")

    if results['financial_info']:
        print(f"\n💰 Financial info chunk examples:")
        for chunk_info in results['financial_info'][:2]:
            print(f"  Chunk {chunk_info['index']+1}: {chunk_info['preview']}")

    return results

# If chunks exist, perform Claims analysis.
# The membership test handles the "conversion cell not run yet" case; a
# NameError handler is deliberately not used because it would mask real
# NameErrors raised inside analyze_claims_content.
if 'chunks' in globals():
    claims_analysis = analyze_claims_content(chunks)
else:
    print("Please run the document conversion code first")
