In [None]:
# Install required packages
!pip install chardet pandas numpy

# Import necessary libraries
import os
import chardet
import json
import re
from typing import List, Optional
from collections import Counter
import pandas as pd
import numpy as np
from google.colab import files
import io

# Test imports
print("Setup complete! All required packages installed.")


In [None]:
def create_sample_files():
    """Create the sample documents used by the loader demonstrations.

    Writes three UTF-8 text files into ``rag_sample_data/`` (relative to the
    current working directory), creating the directory when it is missing:

    * ``business_memo.txt`` - a short memo with bulleted and numbered lists
    * ``technical_doc.txt`` - API documentation containing a JSON snippet
    * ``multilingual.txt`` - a style-guide reminder labelled English, 中文,
      日本語 and हिंदी

    Files that already exist are overwritten, so the function can be re-run.
    """

    # Create directory for our samples. os.makedirs replaces the IPython-only
    # ``!mkdir -p`` so the function also runs as plain Python.
    os.makedirs('rag_sample_data', exist_ok=True)

    # Create business memo
    business_content = """
    QUARTERLY BUSINESS REVIEW
    Date: February 9, 2025
    Department: Engineering

    PERFORMANCE METRICS:
    • Project completion rate: 95%
    • Code quality score: 9.2/10
    • Customer satisfaction: 4.8/5

    ACTION ITEMS:
    1. Review Q2 objectives
    2. Update team KPIs
    3. Schedule stakeholder meeting
    """

    # The memo contains non-ASCII bullets, so the encoding is stated
    # explicitly instead of relying on the platform default.
    with open('rag_sample_data/business_memo.txt', 'w', encoding='utf-8') as f:
        f.write(business_content)

    # Create technical documentation
    technical_content = """
    API DOCUMENTATION
    ================

    Endpoint: /api/v1/documents
    Method: POST

    Request Format:
    {
        "document_id": "string",
        "content": "string",
        "metadata": {
            "author": "string",
            "date": "ISO-8601 timestamp"
        }
    }
    """

    with open('rag_sample_data/technical_doc.txt', 'w', encoding='utf-8') as f:
        f.write(technical_content)

    # Create multilingual content
    multilingual_content = """
    Global Documentation Guidelines
    ============================
    English: Please follow the style guide
    中文: 请遵守文体指南
    日本語: スタイルガイドに従ってください
    हिंदी: कृपया स्टाइल गाइड का पालन करें
    """

    with open('rag_sample_data/multilingual.txt', 'w', encoding='utf-8') as f:
        f.write(multilingual_content)

# Create sample files
create_sample_files()

# Verify creation
!ls -l rag_sample_data/

In [11]:
class EnhancedTextLoader:
    """
    A comprehensive text loader designed for RAG systems.
    Handles encoding detection, metadata extraction, and content cleaning.

    Typical use is ``content, metadata = EnhancedTextLoader(path).load()``.
    """

    # Encoding used when detection cannot name one.
    DEFAULT_ENCODING = 'utf-8'

    def __init__(self, file_path: str, encoding: Optional[str] = None):
        """Initialize the loader with a file path and optional encoding.

        Args:
            file_path: Path of the text file to load.
            encoding: Encoding used to decode the file. When omitted, it is
                detected from the file's bytes the first time load() runs.
        """
        self.file_path = file_path
        self.encoding = encoding
        self.metadata = {}

    def detect_encoding(self) -> str:
        """
        Automatically detect file encoding.

        Reads the whole file as bytes and asks chardet for its best guess.

        Returns:
            The detected encoding name, or DEFAULT_ENCODING when chardet
            reports no encoding.
        """
        with open(self.file_path, 'rb') as file:
            raw_data = file.read()
        detected = chardet.detect(raw_data)['encoding']
        # Without this fallback a None result would make open() silently use
        # the platform's locale encoding.
        return detected or self.DEFAULT_ENCODING

    def extract_metadata(self, content: str) -> dict:
        """Extract useful metadata about the document.

        Args:
            content: The document text the statistics are computed from.

        Returns:
            A dict with the file name, on-disk size in bytes, the loader's
            current encoding (None if not yet resolved), line/word/character
            counts of ``content``, the average line length and whether any
            non-ASCII character is present.
        """
        lines = content.split('\n')
        return {
            'filename': os.path.basename(self.file_path),
            'file_size': os.path.getsize(self.file_path),
            'encoding': self.encoding,
            'line_count': len(lines),
            'word_count': len(content.split()),
            'char_count': len(content),
            # str.split always yields at least one element, so the division
            # is safe even for empty content.
            'avg_line_length': sum(len(line) for line in lines) / len(lines),
            'has_unicode': not content.isascii(),
        }

    def clean_text(self, content: str) -> str:
        """Clean text while preserving document structure.

        Removes NUL characters, normalizes line endings to ``\\n``, strips
        trailing whitespace, and replaces each leading whitespace character
        with a single space. Whitespace-only lines are kept as empty lines so
        paragraph breaks survive.
        """
        # Remove null bytes (other control characters are left untouched)
        content = content.replace('\x00', '')

        # Normalize line endings
        content = content.replace('\r\n', '\n').replace('\r', '\n')

        cleaned_lines = []
        for line in content.splitlines():
            stripped = line.strip()
            if stripped:
                # Preserve the indentation depth, expressed in spaces
                indent = len(line) - len(line.lstrip())
                cleaned_lines.append(' ' * indent + stripped)
            else:
                cleaned_lines.append('')

        return '\n'.join(cleaned_lines)

    def load(self) -> tuple[Optional[str], Optional[dict]]:
        """
        Load and process the text file.

        Detects the encoding when none was given, reads and cleans the text,
        and stores the extracted metadata on ``self.metadata``.

        Returns:
            Tuple of (cleaned_content, metadata). On any failure an error
            message is printed and ``(None, None)`` is returned instead of
            raising.
        """
        try:
            # Detect encoding if not specified
            if not self.encoding:
                self.encoding = self.detect_encoding()

            # Read the file
            with open(self.file_path, 'r', encoding=self.encoding) as file:
                content = file.read()

            cleaned_content = self.clean_text(content)
            self.metadata = self.extract_metadata(cleaned_content)

            return cleaned_content, self.metadata

        except Exception as e:
            # Deliberately broad: callers rely on the (None, None) signal
            # rather than on exceptions.
            print(f"Error loading file {self.file_path}: {str(e)}")
            return None, None

In [None]:
# Test the complete implementation
def test_loader():
    """Load each sample document and print its metadata plus a short preview."""
    sample_files = (
        "rag_sample_data/business_memo.txt",
        "rag_sample_data/technical_doc.txt",
        "rag_sample_data/multilingual.txt",
    )
    separator = "-" * 50

    for path in sample_files:
        print(f"\nProcessing: {path}")
        text, info = EnhancedTextLoader(path).load()

        # load() signals failure by returning None for the content.
        if text is None:
            print(f"Failed to load {path}")
            continue

        print("\nMetadata:")
        for field, field_value in info.items():
            print(f"{field}: {field_value}")

        print("\nFirst 100 characters of content:")
        print(text[:100])
        print(separator)

# Run the test
test_loader()

In [None]:
def handle_different_encodings():
    """Demonstrate handling of different text encodings.

    Writes a small UTF-8 file containing accented and CJK characters, loads it
    once with automatic encoding detection and once with an explicit
    encoding, and prints the detected encoding and whether both loads
    produced the same text.
    """

    # Create test files with different encodings
    test_content = "This is a test file with special characters: é, ñ, 漢"
    test_path = 'rag_sample_data/utf8_test.txt'

    # UTF-8 encoded file
    with open(test_path, 'w', encoding='utf-8') as f:
        f.write(test_content)

    # Try loading without specifying encoding
    loader = EnhancedTextLoader(test_path)
    content, _ = loader.load()
    # load() stores the encoding it resolved on the loader itself; reading it
    # from there also works when loading failed and the metadata is None.
    print("Auto-detected encoding:", loader.encoding or 'unknown')

    # Try loading with explicit encoding
    loader_explicit = EnhancedTextLoader(test_path, encoding='utf-8')
    content_explicit, _ = loader_explicit.load()
    # Two failed loads (None == None) must not be reported as a match.
    results_match = content is not None and content == content_explicit
    print("Explicit encoding results match:", results_match)

# Test encoding handling
handle_different_encodings()

In [None]:
def test_structure_preservation():
    """Demonstrate how document structure is preserved.

    Writes a nested, indented outline to disk, loads it through
    EnhancedTextLoader and prints the cleaned text so the retained
    indentation can be inspected.
    """

    # Create a test file with specific formatting
    structured_content = """
    SECTION 1:
        • First bullet point
        • Second bullet point
            - Sub bullet

    SECTION 2:
        1. Numbered item
        2. Another item
           More details here
    """

    # The bullets are non-ASCII, so the encoding is stated explicitly rather
    # than left to the platform default.
    with open('rag_sample_data/structured_test.txt', 'w', encoding='utf-8') as f:
        f.write(structured_content)

    # Load and verify structure preservation
    loader = EnhancedTextLoader('rag_sample_data/structured_test.txt')
    content, _ = loader.load()
    print("Original structure maintained:")
    print(content)

# Test structure preservation
test_structure_preservation()

In [None]:
def demonstrate_metadata_extraction():
    """Show comprehensive metadata extraction.

    Loads each sample document and prints every metadata entry the loader
    produced. A document that fails to load is reported and skipped instead
    of aborting the demonstration.
    """

    test_files = [
        'rag_sample_data/business_memo.txt',
        'rag_sample_data/technical_doc.txt',
        'rag_sample_data/multilingual.txt'
    ]

    for file_path in test_files:
        loader = EnhancedTextLoader(file_path)
        _, metadata = loader.load()

        print(f"\nMetadata for {os.path.basename(file_path)}:")
        # load() returns (None, None) on failure; iterating None would raise.
        if metadata is None:
            print("(unavailable - file could not be loaded)")
            continue

        for key, value in metadata.items():
            print(f"{key}: {value}")

# Test metadata extraction
demonstrate_metadata_extraction()

In [None]:
def test_error_handling():
    """Show that failed loads are reported via a None result, not an exception."""

    # Case 1: the path does not exist.
    missing_text, _ = EnhancedTextLoader('non_existent.txt').load()
    print("Non-existent file handled gracefully:", missing_text is None)

    # Case 2: the file holds bytes that are not valid text.
    corrupt_path = 'rag_sample_data/corrupt.txt'
    with open(corrupt_path, 'wb') as handle:
        handle.write(b'\x80\x81\x82\x83')  # Invalid bytes

    corrupt_text, _ = EnhancedTextLoader(corrupt_path).load()
    print("Corrupt file handled gracefully:", corrupt_text is None)

# Test error handling
test_error_handling()

In [None]:
def comprehensive_testing():
    """Run comprehensive tests with various document types.

    Loads every regular file found in ``rag_sample_data`` with
    EnhancedTextLoader and prints how many loads passed or failed, followed
    by one line per problem. A load counts as passed when it yields non-empty
    content and metadata.
    """
    sample_dir = 'rag_sample_data'

    # Test results container
    test_results = {
        'total_tests': 0,
        'passed': 0,
        'failed': 0,
        'issues': []
    }

    # os.listdir returns names in arbitrary order; sorting keeps the report
    # reproducible between runs.
    for file_name in sorted(os.listdir(sample_dir)):
        file_path = f'{sample_dir}/{file_name}'

        # Only regular files can be loaded; skip sub-directories and the like.
        if not os.path.isfile(file_path):
            continue

        test_results['total_tests'] += 1
        try:
            content, metadata = EnhancedTextLoader(file_path).load()
        except Exception as e:
            test_results['failed'] += 1
            test_results['issues'].append(f"Error processing {file_name}: {str(e)}")
            continue

        if content and metadata:
            test_results['passed'] += 1
        else:
            test_results['failed'] += 1
            test_results['issues'].append(f"Failed to load {file_name}")

    # Print test results
    print("\nTest Results:")
    print(f"Total Tests: {test_results['total_tests']}")
    print(f"Passed: {test_results['passed']}")
    print(f"Failed: {test_results['failed']}")
    if test_results['issues']:
        print("\nIssues Found:")
        for issue in test_results['issues']:
            print(f"- {issue}")

# Run comprehensive tests
comprehensive_testing()

**Practical Exercises**

In [None]:
class HierarchicalTextLoader(EnhancedTextLoader):
    """
    Enhanced loader that understands document hierarchies.
    Extends EnhancedTextLoader with section detection capabilities.
    """
    def extract_hierarchy(self, content: str) -> dict:
        """Extract document hierarchy from markdown-style headers.

        A line whose first non-blank character is ``#`` is a header; the
        number of leading ``#`` characters is its level. Level-1 headers open
        a section. Any deeper header found inside a section is recorded as a
        subsection of it (subsections are kept flat, whatever their depth).

        Args:
            content: Document text, possibly indented.

        Returns:
            ``{section_title: {'content': [...], 'subsections':
            {subsection_title: [...]}}}``. Non-blank text lines are stored
            unchanged under the most recent subsection, or under the
            section's ``'content'`` when no subsection has started yet.
            Text and deeper headers that appear before the first level-1
            header are ignored.
        """
        sections = {}
        current_section = None      # title of the open level-1 section
        current_subsection = None   # title of the latest deeper header in it

        for line in content.split('\n'):
            stripped = line.strip()
            if not stripped:
                continue

            if stripped.startswith('#'):
                # Work on the stripped line: headers are often indented, and
                # str.strip('#') on the raw line would leave the hashes in
                # place behind the leading spaces.
                without_hashes = stripped.lstrip('#')
                level = len(stripped) - len(without_hashes)
                title = without_hashes.strip()

                if level == 1:
                    # setdefault keeps earlier text if a title repeats
                    sections.setdefault(title, {'content': [], 'subsections': {}})
                    current_section = title
                    current_subsection = None
                elif current_section is not None:
                    sections[current_section]['subsections'].setdefault(title, [])
                    current_subsection = title
            elif current_section is not None:
                section = sections[current_section]
                if current_subsection is None:
                    section['content'].append(line)
                else:
                    section['subsections'][current_subsection].append(line)

        return sections

def test_hierarchical_loader():
    """Write a small multi-level document, load it, and print its hierarchy as JSON."""
    doc_path = 'rag_sample_data/hierarchical_doc.txt'

    # Sample document with three header levels
    sample_document = """
    # Main Title

    ## Section 1
    This is the first section content.

    ### Subsection 1.1
    Deeper level content here.

    ## Section 2
    Another main section.

    ### Subsection 2.1
    More content here.
    """

    with open(doc_path, 'w') as out_file:
        out_file.write(sample_document)

    # Load the document, then derive the section tree from the cleaned text
    hierarchical_loader = HierarchicalTextLoader(doc_path)
    text, _ = hierarchical_loader.load()
    section_tree = hierarchical_loader.extract_hierarchy(text)

    print("Document Hierarchy:")
    print(json.dumps(section_tree, indent=2))

# Run the test
test_hierarchical_loader()

In [None]:
class MetadataEnhancedLoader(EnhancedTextLoader):
    """
    Text loader that augments the base metadata with simple content statistics.
    """

    @staticmethod
    def _mean_word_count(chunks):
        """Return the average number of whitespace-separated words per chunk (0 if none)."""
        if not chunks:
            return 0
        total_words = sum(len(chunk.split()) for chunk in chunks)
        return total_words / len(chunks)

    def analyze_content_patterns(self, content: str) -> dict:
        """Compute sentence, paragraph and word-frequency statistics for ``content``.

        Sentences are the non-blank pieces between ``.`` characters and
        paragraphs the non-blank pieces between blank lines. Returns a dict
        with the rounded average words per sentence and per paragraph, the
        paragraph count, and the ten most frequent lower-cased words.
        """
        # Pieces between full stops, trimmed, blanks dropped
        sentences = [piece for piece in map(str.strip, content.split('.')) if piece]

        # Blocks separated by an empty line, blanks dropped
        paragraphs = [block for block in content.split('\n\n') if block.strip()]

        # Ten most frequent whitespace-separated tokens, case-insensitive
        top_words = Counter(content.lower().split()).most_common(10)

        return {
            'avg_sentence_length': round(self._mean_word_count(sentences), 2),
            'avg_paragraph_length': round(self._mean_word_count(paragraphs), 2),
            'paragraph_count': len(paragraphs),
            'common_words': dict(top_words)
        }

    def load(self) -> tuple[str, dict]:
        """Load via the parent class and attach ``'content_patterns'`` to the metadata.

        The extra key is only added when the loaded content is non-empty.
        """
        content, metadata = super().load()
        if not content:
            return content, metadata

        metadata['content_patterns'] = self.analyze_content_patterns(content)
        return content, metadata

def test_metadata_loader():
    """Load the business memo and print its content-pattern statistics as JSON."""
    memo_loader = MetadataEnhancedLoader('rag_sample_data/business_memo.txt')
    _, memo_metadata = memo_loader.load()

    print("Enhanced Content Analysis:")
    pattern_stats = memo_metadata.get('content_patterns', {})
    print(json.dumps(pattern_stats, indent=2))

# Run the test
test_metadata_loader()

In [None]:
import re

class MultilingualLoader(EnhancedTextLoader):
    """
    Text loader that reports which character ranges occur in a document.
    """

    # One single-character pattern per reported name, in reporting order.
    _SCRIPT_PATTERNS = {
        'latin': re.compile(r'[a-zA-Z]'),
        'chinese': re.compile(r'[\u4e00-\u9fff]'),
        'japanese': re.compile(r'[\u3040-\u30ff]'),
        'korean': re.compile(r'[\uac00-\ud7af]'),
        'devanagari': re.compile(r'[\u0900-\u097f]'),
    }

    def detect_languages(self, content: str) -> dict:
        """Count the characters of ``content`` that fall in each known range.

        Returns a dict mapping each name with at least one matching character
        to its match count; names without matches are omitted.
        """
        match_counts = {
            name: len(regex.findall(content))
            for name, regex in self._SCRIPT_PATTERNS.items()
        }
        return {name: count for name, count in match_counts.items() if count > 0}

    def load(self) -> tuple[str, dict]:
        """Load via the parent class and attach ``'languages'`` to the metadata.

        The extra key is only added when the loaded content is non-empty.
        """
        content, metadata = super().load()
        if not content:
            return content, metadata

        metadata['languages'] = self.detect_languages(content)
        return content, metadata

# Test multilingual support
def test_multilingual_loader():
    """Load the multilingual sample and print the per-script character counts as JSON."""
    sample_loader = MultilingualLoader('rag_sample_data/multilingual.txt')
    _, sample_metadata = sample_loader.load()

    print("Language Detection Results:")
    script_counts = sample_metadata['languages']
    print(json.dumps(script_counts, indent=2))

# Run the test
test_multilingual_loader()

In [None]:
class ComprehensiveDocumentLoader(HierarchicalTextLoader, MetadataEnhancedLoader, MultilingualLoader):
    """
    A comprehensive document loader that combines hierarchical structure analysis,
    enhanced metadata extraction, and multilingual support.
    """
    def __init__(self, file_path: str, encoding: Optional[str] = None):
        """Initialize the loader and the holders for each analysis result.

        Args:
            file_path: Path of the text file to load.
            encoding: Optional encoding passed through to the base loader.
        """
        super().__init__(file_path, encoding)
        self.hierarchy = {}
        self.patterns = {}
        self.languages = {}

    def load(self) -> tuple[Optional[str], Optional[dict]]:
        """
        Load and analyze document with all available features.

        Returns:
            Tuple of (content, comprehensive_metadata). The metadata holds
            everything the parent loaders produced plus
            ``'document_structure'``, ``'content_analysis'`` and
            ``'language_analysis'``; the same dict is stored on
            ``self.metadata``. ``(None, None)`` is returned only when the
            underlying load failed.
        """
        # First, load content using parent class method
        content, base_metadata = super().load()

        # Only a failed load yields None; an empty document is still valid
        # and keeps its metadata.
        if content is None or base_metadata is None:
            return None, None

        # Extract hierarchical structure
        self.hierarchy = self.extract_hierarchy(content)

        # The parent load() overrides reached through super() may already
        # have analyzed this content; reuse those results rather than
        # computing them a second time.
        self.patterns = base_metadata.get('content_patterns')
        if self.patterns is None:
            self.patterns = self.analyze_content_patterns(content)

        self.languages = base_metadata.get('languages')
        if self.languages is None:
            self.languages = self.detect_languages(content)

        # Combine all metadata
        comprehensive_metadata = {
            **base_metadata,
            'document_structure': self.hierarchy,
            'content_analysis': self.patterns,
            'language_analysis': self.languages
        }
        self.metadata = comprehensive_metadata

        return content, comprehensive_metadata

def test_comprehensive_loader():
    """Run the comprehensive loader on a mixed-feature document and print each analysis."""
    doc_path = 'rag_sample_data/complex_doc.txt'

    # Document with headers, two scripts, lists and a fenced code sample
    complex_content = """
    # Technical Documentation

    ## Introduction
    This is a multilingual technical guide.
    这是一个多语言技术指南。

    ## System Architecture
    The system consists of three main components:
    • Frontend Interface
    • Backend API
    • Database Layer

    ## Implementation Details
    Here are the key implementation points:
    1. Use RESTful principles
    2. Follow security guidelines
    3. Implement proper error handling

    ### Code Examples
    Example implementation:
    ```python
    def process_data():
        return {"status": "success"}
    ```
    """

    with open(doc_path, 'w', encoding='utf-8') as out_file:
        out_file.write(complex_content)

    _, metadata = ComprehensiveDocumentLoader(doc_path).load()

    # Keys reported in their own numbered sections rather than as basic metadata
    numbered_sections = (
        ("\n2. Document Structure:", 'document_structure'),
        ("\n3. Content Analysis:", 'content_analysis'),
        ("\n4. Language Analysis:", 'language_analysis'),
    )
    analysis_keys = [key for _, key in numbered_sections]

    print("Comprehensive Document Analysis:")
    print("\n1. Basic Metadata:")
    basic_metadata = {
        key: value for key, value in metadata.items() if key not in analysis_keys
    }
    print(json.dumps(basic_metadata, indent=2))

    for heading, key in numbered_sections:
        print(heading)
        print(json.dumps(metadata.get(key, {}), indent=2))

# Run the comprehensive test
test_comprehensive_loader()