In [5]:
import re
import json
import csv
import os
from typing import List, Dict, Optional
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    """A single chunk of document text together with its metadata.

    After construction, ``metadata`` always contains the keys
    ``page_number``, ``side``, ``chunk_id``, ``word_count`` (whitespace-split
    token count of ``content``) and ``char_count`` (``len(content)``), in
    addition to any entries supplied by the caller. Derived keys overwrite
    caller-supplied entries of the same name.
    """
    content: str
    page_number: int
    side: str  # side label of the page section, e.g. 'LEFT' or 'RIGHT'
    chunk_id: str
    metadata: Optional[Dict] = None
    
    def __post_init__(self):
        # Work on a private copy: updating the caller's dict in place would
        # leak the derived keys back to the caller and make chunks that were
        # built from the same dict overwrite each other's metadata.
        self.metadata = dict(self.metadata) if self.metadata is not None else {}
        
        # Add basic metadata
        self.metadata.update({
            'page_number': self.page_number,
            'side': self.side,
            'chunk_id': self.chunk_id,
            'word_count': len(self.content.split()),
            'char_count': len(self.content)
        })

class DocumentPageChunker:
    """
    Chunks documents based on page markers for RAG applications.

    A page marker has the form ``--- Page <number> (<SIDE>) ---`` where SIDE
    is LEFT, MIDDLE or RIGHT. The text between one marker and the next (or
    the end of the document) becomes one chunk.
    """
    
    def __init__(self):
        # Regex pattern to match page markers:
        # group 1 = page number, group 2 = side label
        self.page_pattern = r'--- Page (\d+) \((LEFT|MIDDLE|RIGHT)\) ---'
    
    def chunk_document(self, text: str, 
                      min_chunk_length: int = 50,
                      include_empty_chunks: bool = False) -> List[DocumentChunk]:
        """
        Chunk document by page markers.
        
        Text that precedes the first marker is not part of any chunk. Chunk
        ids are built as ``chunk_<page>_<side>`` and are not de-duplicated,
        so a marker that occurs twice yields two chunks with the same id.
        
        Args:
            text: The document text to chunk
            min_chunk_length: Minimum character length (after stripping) for a
                non-empty chunk to be included
            include_empty_chunks: Whether to include sections with no content.
                When True, empty sections are kept regardless of
                min_chunk_length.
            
        Returns:
            List of DocumentChunk objects in document order. If the text
            contains no page marker at all, a single chunk holding the whole
            stripped text (page 1, side 'UNKNOWN') is returned without any
            filtering.
        """
        chunks = []
        
        # Find all page markers with their positions
        matches = list(re.finditer(self.page_pattern, text))
        
        if not matches:
            # If no page markers found, return the entire text as a single chunk
            return [DocumentChunk(
                content=text.strip(),
                page_number=1,
                side='UNKNOWN',
                chunk_id='chunk_1_UNKNOWN'
            )]
        
        # Process each page section
        for i, match in enumerate(matches):
            page_num = int(match.group(1))
            side = match.group(2)
            
            # The section starts right after this marker...
            start_pos = match.end()
            
            # ...and ends where the next marker begins, or at the end of the
            # document for the last marker.
            if i + 1 < len(matches):
                end_pos = matches[i + 1].start()
            else:
                end_pos = len(text)
            
            # Extract and clean the content
            content = text[start_pos:end_pos].strip()
            
            if not content:
                # Empty sections are governed by include_empty_chunks only.
                # Running them through the length filter as well would drop
                # them whenever min_chunk_length > 0 and make the flag useless.
                if not include_empty_chunks:
                    continue
            elif len(content) < min_chunk_length:
                continue
            
            chunks.append(DocumentChunk(
                content=content,
                page_number=page_num,
                side=side,
                chunk_id=f"chunk_{page_num}_{side}"
            ))
        
        return chunks
    
    def chunk_from_file(self, file_path: str, **kwargs) -> List[DocumentChunk]:
        """
        Chunk document directly from a file.
        
        Args:
            file_path: Path to the text file (read as UTF-8)
            **kwargs: Additional arguments passed to chunk_document
            
        Returns:
            List of DocumentChunk objects
            
        Raises:
            FileNotFoundError: If file_path does not exist
            Exception: If the file cannot be read for any other reason; the
                original error is attached as ``__cause__``
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
        except FileNotFoundError as e:
            raise FileNotFoundError(f"File not found: {file_path}") from e
        except Exception as e:
            raise Exception(f"Error reading file {file_path}: {str(e)}") from e
        
        # Chunking happens outside the try block so that its errors (for
        # example an unexpected keyword argument) are not misreported as
        # file-reading failures.
        return self.chunk_document(text, **kwargs)
    
    def get_chunks_summary(self, chunks: List[DocumentChunk]) -> Dict:
        """
        Get summary statistics about the chunks.
        
        Args:
            chunks: List of DocumentChunk objects
            
        Returns:
            Dictionary with summary statistics. For an empty list only
            ``{'total_chunks': 0}`` is returned.
        """
        if not chunks:
            return {'total_chunks': 0}
        
        pages = set(chunk.page_number for chunk in chunks)
        sides = set(chunk.side for chunk in chunks)
        word_counts = [chunk.metadata['word_count'] for chunk in chunks]
        char_counts = [chunk.metadata['char_count'] for chunk in chunks]
        
        return {
            'total_chunks': len(chunks),
            'unique_pages': len(pages),
            'page_range': f"{min(pages)}-{max(pages)}" if pages else "N/A",
            'sides_present': list(sides),
            'avg_word_count': sum(word_counts) / len(word_counts),
            'avg_char_count': sum(char_counts) / len(char_counts),
            'min_word_count': min(word_counts),
            'max_word_count': max(word_counts),
            'total_words': sum(word_counts)
        }
    
    def export_chunks_for_rag(self, chunks: List[DocumentChunk], 
                             format_type: str = 'dict') -> List:
        """
        Export chunks in a format suitable for RAG systems.
        
        Args:
            chunks: List of DocumentChunk objects
            format_type: Export format:
                'dict'       -> dicts with 'id', 'content', 'metadata'
                                (the metadata dict is the chunk's own object,
                                not a copy)
                'text'       -> plain list of content strings
                'structured' -> flat dicts with 'id', 'text', 'page', 'side',
                                'word_count', 'source'
            
        Returns:
            List of chunks in the specified format
            
        Raises:
            ValueError: If format_type is not one of the supported formats
        """
        if format_type == 'dict':
            return [
                {
                    'id': chunk.chunk_id,
                    'content': chunk.content,
                    'metadata': chunk.metadata
                }
                for chunk in chunks
            ]
        
        elif format_type == 'text':
            return [chunk.content for chunk in chunks]
        
        elif format_type == 'structured':
            return [
                {
                    'id': chunk.chunk_id,
                    'text': chunk.content,
                    'page': chunk.page_number,
                    'side': chunk.side,
                    'word_count': chunk.metadata['word_count'],
                    'source': f"Page {chunk.page_number} ({chunk.side})"
                }
                for chunk in chunks
            ]
        
        # Falling off the end would return None and surface later as a
        # confusing TypeError in the caller; fail here with a clear message.
        raise ValueError(
            f"Unsupported format_type: {format_type!r} "
            "(expected 'dict', 'text', or 'structured')"
        )
        
    def save_chunks(self, chunks: List[DocumentChunk], 
                    output_dir: str = "chunks_output",
                    formats: List[str] = ['json', 'jsonl', 'csv', 'txt']) -> Dict[str, str]:
        """
        Save chunks to files in multiple formats for RAG systems.
        
        All files are written as UTF-8 and use the 'structured' export
        layout. Format names that are not recognised are ignored.
        
        Args:
            chunks: List of DocumentChunk objects
            output_dir: Directory to save files (created if missing)
            formats: List of formats to save ('json', 'jsonl', 'csv', 'txt', 'markdown').
                The default list is only read, never modified.
            
        Returns:
            Dictionary with format -> file_path mappings. For 'txt' the value
            is the directory that holds one file per chunk.
        """
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)
        
        saved_files = {}
        
        # Prepare data for export
        chunk_data = self.export_chunks_for_rag(chunks, format_type='structured')
        
        # Save as JSON (single file with all chunks)
        if 'json' in formats:
            json_path = os.path.join(output_dir, 'chunks.json')
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(chunk_data, f, indent=2, ensure_ascii=False)
            saved_files['json'] = json_path
        
        # Save as JSONL (one JSON object per line - great for streaming)
        if 'jsonl' in formats:
            jsonl_path = os.path.join(output_dir, 'chunks.jsonl')
            with open(jsonl_path, 'w', encoding='utf-8') as f:
                for chunk in chunk_data:
                    f.write(json.dumps(chunk, ensure_ascii=False) + '\n')
            saved_files['jsonl'] = jsonl_path
        
        # Save as CSV (easy to import into databases)
        if 'csv' in formats:
            csv_path = os.path.join(output_dir, 'chunks.csv')
            # newline='' lets the csv module control line endings itself
            with open(csv_path, 'w', newline='', encoding='utf-8') as f:
                # With no chunks there is no row to take the header from, so
                # the file is created but left empty.
                if chunk_data:
                    fieldnames = list(chunk_data[0].keys())
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(chunk_data)
            saved_files['csv'] = csv_path
        
        # Save as separate text files (useful for vector databases)
        if 'txt' in formats:
            txt_dir = os.path.join(output_dir, 'txt_files')
            os.makedirs(txt_dir, exist_ok=True)
            for chunk in chunk_data:
                # The file name is the chunk id, so chunks sharing an id
                # overwrite each other.
                filename = f"{chunk['id']}.txt"
                txt_path = os.path.join(txt_dir, filename)
                with open(txt_path, 'w', encoding='utf-8') as f:
                    f.write(f"Source: {chunk['source']}\n")
                    f.write(f"Page: {chunk['page']}, Side: {chunk['side']}\n")
                    f.write(f"Word Count: {chunk['word_count']}\n")
                    f.write("-" * 50 + "\n")
                    f.write(chunk['text'])
            saved_files['txt'] = txt_dir
        
        # Save as Markdown (great for documentation and readability)
        if 'markdown' in formats:
            md_path = os.path.join(output_dir, 'chunks.md')
            with open(md_path, 'w', encoding='utf-8') as f:
                f.write("# Document Chunks\n\n")
                for chunk in chunk_data:
                    f.write(f"## {chunk['id']}\n\n")
                    f.write(f"**Source:** {chunk['source']}  \n")
                    f.write(f"**Word Count:** {chunk['word_count']}  \n\n")
                    f.write(chunk['text'])
                    f.write("\n\n---\n\n")
            saved_files['markdown'] = md_path
        
        return saved_files

# Convenience functions for easy importing and usage
def chunk_file(file_path: str, min_chunk_length: int = 50, 
               include_empty_chunks: bool = False) -> List[DocumentChunk]:
    """
    Read a text file and split it into page chunks.
    
    Thin wrapper that builds a DocumentPageChunker and forwards both
    filtering options to its chunk_from_file method.
    
    Args:
        file_path: Path to your text file
        min_chunk_length: Minimum character length for chunks
        include_empty_chunks: Whether to include empty sections
        
    Returns:
        List of DocumentChunk objects
    """
    filter_options = {
        'min_chunk_length': min_chunk_length,
        'include_empty_chunks': include_empty_chunks,
    }
    return DocumentPageChunker().chunk_from_file(file_path, **filter_options)

def chunk_text(text: str, min_chunk_length: int = 50, 
               include_empty_chunks: bool = False) -> List[DocumentChunk]:
    """
    Split an in-memory document string into page chunks.
    
    Thin wrapper that builds a DocumentPageChunker and forwards both
    filtering options to its chunk_document method.
    
    Args:
        text: Document text to chunk
        min_chunk_length: Minimum character length for chunks  
        include_empty_chunks: Whether to include empty sections
        
    Returns:
        List of DocumentChunk objects
    """
    filter_options = {
        'min_chunk_length': min_chunk_length,
        'include_empty_chunks': include_empty_chunks,
    }
    return DocumentPageChunker().chunk_document(text, **filter_options)

def get_rag_chunks(file_path: str, format_type: str = 'structured') -> List[Dict]:
    """
    Chunk a file with the default filters and export the result for RAG.
    
    Args:
        file_path: Path to your text file
        format_type: 'dict', 'text', or 'structured'
        
    Returns:
        List of chunks ready for RAG system
    """
    page_chunks = chunk_file(file_path)
    return DocumentPageChunker().export_chunks_for_rag(
        page_chunks,
        format_type=format_type,
    )

def save_chunks_to_files(file_path: str, output_dir: str = "chunks_output", 
                        formats: List[str] = ['json', 'jsonl', 'csv']) -> Dict[str, str]:
    """
    Chunk a file with the default filters and write the chunks to disk.
    
    Args:
        file_path: Path to your text file
        output_dir: Directory to save chunks
        formats: List of formats ('json', 'jsonl', 'csv', 'txt', 'markdown')
        
    Returns:
        Dictionary with format -> file_path mappings
    """
    page_chunker = DocumentPageChunker()
    page_chunks = page_chunker.chunk_from_file(file_path)
    export_options = {'output_dir': output_dir, 'formats': formats}
    return page_chunker.save_chunks(page_chunks, **export_options)

def create_rag_dataset(file_path: str, output_dir: str = "rag_dataset") -> str:
    """
    Create a complete RAG dataset from your document.
    
    Chunks the file, saves the chunks as JSON, JSONL, CSV, per-chunk text
    files and Markdown, and writes a README.md into the same directory with
    summary statistics, the list of generated files and loading examples.
    Progress is reported with print().
    
    Args:
        file_path: Path to your text file
        output_dir: Directory to save the dataset
        
    Returns:
        Path to the created dataset directory (output_dir as passed in)
    """
    print(f"Creating RAG dataset from: {file_path}")
    
    # Chunk the document
    chunker = DocumentPageChunker()
    chunks = chunker.chunk_from_file(file_path)
    
    print(f"Found {len(chunks)} chunks")
    
    # Save in all formats
    saved_files = chunker.save_chunks(
        chunks, 
        output_dir=output_dir,
        formats=['json', 'jsonl', 'csv', 'txt', 'markdown']
    )
    
    # Get summary statistics
    summary = chunker.get_chunks_summary(chunks)
    
    # Create a README file
    readme_path = os.path.join(output_dir, 'README.md')
    with open(readme_path, 'w', encoding='utf-8') as f:
        f.write("# RAG Dataset\n\n")
        f.write(f"Generated from: `{file_path}`\n\n")
        f.write(f"Total chunks: {len(chunks)}\n\n")
        
        f.write("## Dataset Statistics\n\n")
        for key, value in summary.items():
            f.write(f"- **{key.replace('_', ' ').title()}**: {value}\n")
        
        # The loop variable is deliberately not called file_path: reusing that
        # name would silently replace the function's own parameter.
        f.write("\n## Available Formats\n\n")
        for format_name, saved_path in saved_files.items():
            f.write(f"- **{format_name.upper()}**: `{saved_path}`\n")
        
        # Paths are embedded with !r so the generated snippet is a valid
        # Python string literal even when the path contains backslashes or
        # quotes. The files are written as UTF-8, so the examples read them
        # back with the same encoding.
        f.write("\n## Usage Examples\n\n")
        f.write("### Load JSON chunks\n")
        f.write("```python\n")
        f.write("import json\n")
        f.write(f"with open({saved_files['json']!r}, 'r', encoding='utf-8') as f:\n")
        f.write("    chunks = json.load(f)\n")
        f.write("```\n\n")
        
        f.write("### Load JSONL chunks (streaming)\n")
        f.write("```python\n")
        f.write("import json\n")
        f.write("chunks = []\n")
        f.write(f"with open({saved_files['jsonl']!r}, 'r', encoding='utf-8') as f:\n")
        f.write("    for line in f:\n")
        f.write("        chunks.append(json.loads(line))\n")
        f.write("```\n")
    
    print("\nRAG dataset created successfully!")
    print(f"Location: {output_dir}")
    print("\nFiles created:")
    for format_name, saved_path in saved_files.items():
        print(f"  {format_name.upper()}: {saved_path}")
    print(f"  README: {readme_path}")
    
    return output_dir

# # Example usage
# def example_usage():
#     """Shows how to use the chunker with your file and save results"""
    
#     # Replace with your actual file path
#     file_path = 'paste.txt'
    
#     try:
#         # Method 1: Simple save in multiple formats
#         print("=== METHOD 1: Save chunks in multiple formats ===")
#         saved_files = save_chunks_to_files(
#             file_path, 
#             output_dir="my_chunks",
#             formats=['json', 'jsonl', 'csv']
#         )
#         print("Saved files:", saved_files)
        
#         # Method 2: Create complete RAG dataset
#         print("\n=== METHOD 2: Create complete RAG dataset ===")
#         dataset_dir = create_rag_dataset(file_path, output_dir="tbi_rag_dataset")
        
#         # Method 3: Load and use the chunks
#         print("\n=== METHOD 3: Load saved chunks ===")
#         with open(os.path.join(dataset_dir, 'chunks.json'), 'r') as f:
#             loaded_chunks = json.load(f)
        
#         print(f"Loaded {len(loaded_chunks)} chunks")
#         if loaded_chunks:
#             print("First chunk example:")
#             first_chunk = loaded_chunks[0]
#             print(f"  ID: {first_chunk['id']}")
#             print(f"  Source: {first_chunk['source']}")
#             print(f"  Text preview: {first_chunk['text'][:100]}...")
            
#     except FileNotFoundError:
#         print(f"File not found: {file_path}")
#         print("Make sure your file path is correct!")
#     except Exception as e:
#         print(f"Error: {e}")

# if __name__ == "__main__":
#     example_usage()

In [13]:
# # Save this code as 'document_chunker.py'
# from document_chunker import create_rag_dataset, save_chunks_to_files, chunk_file
article = 'scn'
# Method 1: build the complete RAG dataset (recommended).
# Input text and output folder both live under '<article>_output/'.
dataset_dir = create_rag_dataset(
    file_path=f'{article}_output/{article}_text.txt',
    output_dir=f'{article}_output/chunks/',
)

# # Method 2: Save in specific formats
# saved_files = save_chunks_to_files(
#     'paste.txt', 
#     output_dir='chunks_output',
#     formats=['json', 'jsonl', 'csv']
# )

# # Method 3: Just get chunks without saving
# chunks = chunk_file('paste.txt')
# print(f"Found {len(chunks)} chunks")

Creating RAG dataset from: scn_output/scn_text.txt
Found 23 chunks

RAG dataset created successfully!
Location: scn_output/chunks/

Files created:
  JSON: scn_output/chunks/chunks.json
  JSONL: scn_output/chunks/chunks.jsonl
  CSV: scn_output/chunks/chunks.csv
  TXT: scn_output/chunks/txt_files
  MARKDOWN: scn_output/chunks/chunks.md
  README: scn_output/chunks/README.md
