# 05 - S3 Directory Reader Provider

## Setup

If you haven't already, install the toolkit and dependencies using the [Setup](./00-Setup.ipynb) notebook.

In [None]:
%reload_ext dotenv
%dotenv

import os

from graphrag_toolkit.lexical_graph import LexicalGraphIndex, set_logging_config
from graphrag_toolkit.lexical_graph.storage import VectorStoreFactory
from graphrag_toolkit.lexical_graph.storage import GraphStoreFactory
from graphrag_toolkit.lexical_graph.storage.graph.neo4j_graph_store_factory import Neo4jGraphStoreFactory

GraphStoreFactory.register(Neo4jGraphStoreFactory)

graph_store = GraphStoreFactory.for_graph_store(os.environ['GRAPH_STORE'])
vector_store = VectorStoreFactory.for_vector_store(os.environ['VECTOR_STORE'])

graph_index = LexicalGraphIndex(
    graph_store, 
    vector_store
)

## S3 Directory Reader Provider

The S3 Directory reader provider reads documents from Amazon S3 buckets using LlamaIndex's `S3Reader`.

**Prerequisites:**
- AWS credentials configured (AWS CLI, environment variables, or IAM role)
- S3 bucket with readable documents
- `boto3` package installed

### AWS Credentials Check

In [None]:
import boto3
from botocore.exceptions import NoCredentialsError, ClientError

def check_aws_credentials():
    """Check if AWS credentials are configured."""
    try:
        # Try to create an S3 client
        s3_client = boto3.client('s3')
        
        # Try to list buckets (minimal operation)
        response = s3_client.list_buckets()
        
        print("AWS credentials are configured")
        print(f"Found {len(response['Buckets'])} accessible S3 buckets")
        
        # Show first few bucket names
        if response['Buckets']:
            print("\nAccessible buckets:")
            for bucket in response['Buckets'][:5]:  # Show first 5
                print(f"  - {bucket['Name']}")
            if len(response['Buckets']) > 5:
                print(f"  ... and {len(response['Buckets']) - 5} more")
        
        return True, response['Buckets']
        
    except NoCredentialsError:
        print("AWS credentials not found")
        print("Configure credentials using one of:")
        print("  - AWS CLI: aws configure")
        print("  - Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
        print("  - IAM role (if running on EC2)")
        return False, []
        
    except ClientError as e:
        print(f"AWS credentials error: {e}")
        return False, []
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        return False, []

# Check AWS setup
aws_configured, available_buckets = check_aws_credentials()
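
The check above relies on `list_buckets`, which needs the `s3:ListAllMyBuckets` permission and can therefore fail for valid credentials that are scoped to a single bucket. As a complementary sketch, `sts:GetCallerIdentity` requires no IAM permissions, so it isolates the question of whether credentials resolve at all. The helper name `describe_caller` and the optional `sts_client` argument are illustrative assumptions, not part of the toolkit.

```python
def describe_caller(sts_client=None):
    """Return the account ID and ARN behind the active AWS credentials."""
    if sts_client is None:
        # Imported lazily so the function can also be exercised with a stub client.
        import boto3
        sts_client = boto3.client('sts')

    identity = sts_client.get_caller_identity()
    return {'account': identity['Account'], 'arn': identity['Arn']}
```

Calling `describe_caller()` with no arguments uses the standard credential chain and raises `NoCredentialsError` when nothing is configured.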

### Create Test S3 Bucket and Files

**Note:** This will create actual AWS resources. Make sure you have appropriate permissions and are aware of potential costs.

In [None]:
import uuid
import tempfile
from pathlib import Path

# Only proceed if AWS is configured
if aws_configured:
    # Generate unique bucket name
    bucket_name = f"graphrag-test-{uuid.uuid4().hex[:8]}"
    s3_client = boto3.client('s3')
    
    try:
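        # Create bucket. Outside us-east-1, S3 requires an explicit LocationConstraint.
        print(f"Creating test bucket: {bucket_name}")
        region = s3_client.meta.region_name
        if region and region not in ('us-east-1', 'aws-global'):
            s3_client.create_bucket(
                Bucket=bucket_name,
                CreateBucketConfiguration={'LocationConstraint': region}
            )
        else:
            s3_client.create_bucket(Bucket=bucket_name)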
        
        # Create test documents
        test_documents = {
            'documents/ai-overview.txt': '''
Artificial Intelligence Overview

Artificial Intelligence (AI) is a branch of computer science that aims to create 
intelligent machines capable of performing tasks that typically require human intelligence.

Key areas include:
- Machine Learning
- Natural Language Processing
- Computer Vision
- Robotics
''',
            'documents/ml-basics.txt': '''
Machine Learning Basics

Machine Learning is a subset of AI that enables computers to learn and improve 
from experience without being explicitly programmed.

Types of Machine Learning:
1. Supervised Learning
2. Unsupervised Learning
3. Reinforcement Learning
''',
            'notes/project-notes.md': '''
# Project Notes

## Meeting Summary
- Discussed AI implementation strategy
- Reviewed technical requirements
- Planned development timeline

## Next Steps
- Finalize architecture design
- Begin prototype development
- Schedule follow-up meetings
''',
            'data/sample-data.json': '''
{
  "project": "GraphRAG Toolkit",
  "version": "1.0.0",
  "components": [
    "lexical-graph",
    "readers",
    "storage"
  ],
  "status": "active"
}
'''
        }
        
        # Upload test documents
        print("Uploading test documents...")
        for key, content in test_documents.items():
            s3_client.put_object(
                Bucket=bucket_name,
                Key=key,
                Body=content.strip(),
                ContentType='text/plain'
            )
            print(f"  Uploaded: s3://{bucket_name}/{key}")
        
        print(f"\nTest S3 bucket created: {bucket_name}")
        print(f"Total files: {len(test_documents)}")
        
    except Exception as e:
        print(f"Error creating test bucket: {e}")
        bucket_name = None
        
else:
    print("Skipping S3 bucket creation - AWS not configured")
    bucket_name = None
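
The upload loop above tags every object as `text/plain`, including the JSON and Markdown files. If accurate content types matter for your use case, one option is to derive them from the key with the standard library. This is a minimal sketch; the helper name `guess_content_type` and the fallback value are assumptions made for illustration.

```python
import mimetypes


def guess_content_type(key, default='application/octet-stream'):
    """Guess a MIME type from an S3 key's extension, with a generic fallback."""
    content_type, _encoding = mimetypes.guess_type(key)
    return content_type or default
```

In the upload loop this could be passed as `ContentType=guess_content_type(key)`. Results for less common extensions depend on the Python version and the platform's MIME tables, hence the fallback.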

### S3 Reading with a Key

Read a single object from S3 by passing its full key. Reading every object under a "directory" (key prefix) is covered in the next section.

In [None]:
from graphrag_toolkit.lexical_graph.indexing.load.readers import S3DirectoryReaderProvider, S3DirectoryReaderConfig

if bucket_name:
    # Configure S3 Reader to load a single file from S3
    s3_config = S3DirectoryReaderConfig(
        bucket=bucket_name,
        key="documents/ai-overview.txt",
        metadata_fn=lambda s3_path: {
            'source': 's3_file',
            's3_path': s3_path,
            'storage_type': 'cloud',
            'reader_type': 'basic'
        }
    )

    s3_reader = S3DirectoryReaderProvider(s3_config)

    try:
        # Read one document from S3
        s3_docs = s3_reader.read(None)

        print(f"Loaded {len(s3_docs)} document(s) from S3:")
        print("\nS3 document details:")
        for i, doc in enumerate(s3_docs):
            file_name = doc.metadata.get('file_name', 'unknown')
            file_path = doc.metadata.get('file_path', 'unknown')
            s3_path = doc.metadata.get('s3_path', 'unknown')
            print(f"  Document {i+1}: {file_name}")
            print(f"    S3 Path: {s3_path}")
            print(f"    Content preview: {doc.text[:80]}...\n")

    except Exception as e:
        print(f"Error reading from S3: {e}")
        s3_docs = []

else:
    print("Skipping S3 reading - no test bucket available")
    s3_docs = []


### Reading Multiple S3 Prefixes

Read from different "directories" in S3 with different configurations.
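
S3 has a flat key namespace, so a "directory" is only a shared key prefix and prefix filtering is a plain string match. The sketch below imitates that match locally to show why the trailing slash matters. It does not call S3 or the reader provider, and the `database/export.csv` key is a hypothetical addition that this notebook does not create.

```python
def keys_under_prefix(keys, prefix):
    """Return, in sorted order, the keys that a prefix filter would select."""
    return sorted(k for k in keys if k.startswith(prefix))


keys = [
    'documents/ai-overview.txt',
    'documents/ml-basics.txt',
    'notes/project-notes.md',
    'data/sample-data.json',
    'database/export.csv',  # hypothetical key, used only to show the pitfall
]

with_slash = keys_under_prefix(keys, 'data/')    # only the data/ "directory"
without_slash = keys_under_prefix(keys, 'data')  # also matches database/...
```

Ending a prefix with `/` keeps sibling prefixes that merely start with the same characters out of the result.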

In [None]:
from graphrag_toolkit.lexical_graph.indexing.load.readers import S3DirectoryReaderProvider, S3DirectoryReaderConfig

if bucket_name:
    # Define different S3 "prefixes" (directories) to read
    s3_configs = {
        'notes': S3DirectoryReaderConfig(
            bucket=bucket_name,
            prefix='notes/',
            metadata_fn=lambda s3_path: {
                'source': 's3_notes',
                's3_path': s3_path,
                'folder': 'notes',
                'content_type': 'meeting_notes'
            }
        ),
        'data': S3DirectoryReaderConfig(
            bucket=bucket_name,
            prefix='data/',
            metadata_fn=lambda s3_path: {
                'source': 's3_data',
                's3_path': s3_path,
                'folder': 'data',
                'content_type': 'structured_data'
            }
        )
    }
    
    all_prefix_docs = []

    for prefix_name, config in s3_configs.items():
        try:
            reader = S3DirectoryReaderProvider(config)
            docs = reader.read(None)
            all_prefix_docs.extend(docs)
            
            print(f"Loaded {len(docs)} documents from '{prefix_name}/' prefix")
            
        except Exception as e:
            print(f"Error reading '{prefix_name}/' prefix: {e}")
    
    print(f"\nTotal documents from all prefixes: {len(all_prefix_docs)}")
    
    # Group by content_type
    content_types = {}
    for doc in all_prefix_docs:
        content_type = doc.metadata.get('content_type', 'unknown')
        content_types[content_type] = content_types.get(content_type, 0) + 1

    print("\nContent types summary:")
    for content_type, count in content_types.items():
        print(f"  - {content_type}: {count} documents")

else:
    print("Skipping S3 prefix reading - no bucket name provided")
    all_prefix_docs = []


### Advanced S3 Metadata

Create sophisticated metadata functions for S3 documents.

In [None]:
import datetime
from urllib.parse import urlparse
from graphrag_toolkit.lexical_graph.indexing.load.readers import S3DirectoryReaderConfig, S3DirectoryReaderProvider

def advanced_s3_metadata(s3_path: str) -> dict:
    """Extract detailed metadata from an S3 object path."""
    if not s3_path:
        return {}

    # Parse S3 URI or raw path
    if s3_path.startswith("s3://"):
        parsed = urlparse(s3_path)
        bucket = parsed.netloc
        key = parsed.path.lstrip('/')
    else:
        parts = s3_path.split('/', 1)
        bucket = parts[0] if len(parts) > 0 else 'unknown'
        key = parts[1] if len(parts) > 1 else ''

    # Split key into components
    key_parts = key.split('/') if key else []
    file_name = key_parts[-1] if key_parts else 'unknown'
    folder_path = '/'.join(key_parts[:-1]) if len(key_parts) > 1 else ''
    file_ext = '.' + file_name.split('.')[-1] if '.' in file_name else ''

    # Base metadata
    metadata = {
        'source': 's3_advanced',
        's3_bucket': bucket,
        's3_key': key,
        's3_full_path': f's3://{bucket}/{key}',
        'file_name': file_name,
        'file_extension': file_ext,
        'folder_path': folder_path,
        'processing_timestamp': datetime.datetime.now().isoformat(),
        'storage_provider': 'aws_s3'
    }

    # Tag with content category
    folder_lower = folder_path.lower()
    if 'documents' in folder_lower:
        metadata['content_category'] = 'documentation'
    elif 'notes' in folder_lower:
        metadata['content_category'] = 'meeting_notes'
    elif 'data' in folder_lower:
        metadata['content_category'] = 'structured_data'
    else:
        metadata['content_category'] = 'general'

    return metadata

# === Execute if bucket name is defined ===
if bucket_name:
    # Configure S3 reader using advanced metadata function
    s3_config = S3DirectoryReaderConfig(
        bucket=bucket_name,
        prefix="documents/",  # Change as needed
        metadata_fn=advanced_s3_metadata
    )

    s3_reader = S3DirectoryReaderProvider(s3_config)

    try:
        docs = s3_reader.read(None)
        print(f"Loaded {len(docs)} documents from S3 with advanced metadata")

        if docs:
            doc = docs[0]
            print("Sample document metadata:")
            print(f"  File name     : {doc.metadata.get('file_name')}")
            print(f"  S3 path       : {doc.metadata.get('s3_full_path')}")
            print(f"  Category      : {doc.metadata.get('content_category')}")
            print(f"  All metadata  : {list(doc.metadata.keys())}")

    except Exception as e:
        print(f"Error loading documents with advanced metadata: {e}")
        docs = []

else:
    print("Skipping advanced metadata load — no bucket name provided")
    docs = []
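
The path handling inside `advanced_s3_metadata` can also be expressed with `pathlib.PurePosixPath`, which treats the key as a POSIX-style path. The following is an alternative sketch, not the toolkit's own implementation. The function name `split_s3_uri` is an assumption, and it accepts only full `s3://` URIs.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an s3:// URI into bucket, key, file name, extension, and folder."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")

    key = parsed.path.lstrip('/')
    path = PurePosixPath(key)
    folder = str(path.parent)

    return {
        's3_bucket': parsed.netloc,
        's3_key': key,
        'file_name': path.name,
        'file_extension': path.suffix,
        # PurePosixPath reports '.' for a key with no folder component.
        'folder_path': '' if folder == '.' else folder,
    }
```

Edge cases differ slightly from the manual string splitting above. For example, `PurePosixPath` reports no extension for a dotfile such as `.hidden`.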


### Index S3 Documents

Index the documents collected in `s3_docs` and `all_prefix_docs`. The `docs` list from the advanced metadata section is not included.

In [None]:
if bucket_name and any([s3_docs, all_prefix_docs]):
    # Combine all S3 document lists safely
    all_s3_docs = []
    for doc_list in [s3_docs, all_prefix_docs]:
        if doc_list:
            all_s3_docs.extend(doc_list)

    print(f"Total S3 documents to index: {len(all_s3_docs)}")

    # Show document sources
    sources = {}
    for doc in all_s3_docs:
        source = doc.metadata.get('source', 'unknown')
        sources[source] = sources.get(source, 0) + 1

    print("Document sources:")
    for source, count in sources.items():
        print(f"  {source}: {count} documents")

    # Index the documents
    print("Indexing S3 documents...")
    try:
        graph_index.extract_and_build(all_s3_docs, show_progress=True)
        print("S3 documents indexed successfully!")
    except Exception as e:
        print(f"Error indexing S3 documents: {e}")

else:
    print("No S3 documents to index.")
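
When several readers cover overlapping keys, the combined list can contain the same object more than once. A possible guard is to deduplicate on a metadata field before indexing. The sketch below assumes each document carries the `s3_path` entry that the `metadata_fn` lambdas in this notebook add. The helper name `dedupe_by_metadata` is hypothetical, and `SimpleNamespace` stands in for real document objects.

```python
from types import SimpleNamespace


def dedupe_by_metadata(docs, key='s3_path'):
    """Keep the first document seen for each distinct metadata[key] value."""
    seen = set()
    unique = []
    for doc in docs:
        value = doc.metadata.get(key)
        # Documents without the key cannot be compared, so they are always kept.
        if value is None or value not in seen:
            unique.append(doc)
        if value is not None:
            seen.add(value)
    return unique


# Stand-in documents for illustration only.
sample_docs = [
    SimpleNamespace(metadata={'s3_path': 'documents/ai-overview.txt'}),
    SimpleNamespace(metadata={'s3_path': 'notes/project-notes.md'}),
    SimpleNamespace(metadata={'s3_path': 'documents/ai-overview.txt'}),
]
unique_docs = dedupe_by_metadata(sample_docs)
```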


### Cleanup S3 Resources

**Important:** Clean up the test S3 bucket to avoid ongoing charges.

In [None]:
if bucket_name:
    try:
        print(f"Cleaning up S3 bucket: {bucket_name}")
        
        # Delete all objects in bucket
        response = s3_client.list_objects_v2(Bucket=bucket_name)
        if 'Contents' in response:
            objects_to_delete = [{'Key': obj['Key']} for obj in response['Contents']]
            s3_client.delete_objects(
                Bucket=bucket_name,
                Delete={'Objects': objects_to_delete}
            )
            print(f"  Deleted {len(objects_to_delete)} objects")
        
        # Delete the bucket
        s3_client.delete_bucket(Bucket=bucket_name)
        print(f"Successfully deleted S3 bucket: {bucket_name}")
        
    except Exception as e:
        print(f"Error cleaning up S3 bucket: {e}")
        print(f"Please manually delete bucket: {bucket_name}")
        
else:
    print("No S3 bucket to clean up")
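
The cleanup above issues a single `list_objects_v2` call, which returns at most 1,000 keys. That is enough for the four test files, but a larger bucket needs pagination. The sketch below uses a boto3 paginator and deletes one page at a time, which also stays within the 1,000-key limit of `delete_objects`. The function name `empty_bucket` is an assumption, and versioned buckets are not handled because their object versions must be deleted separately.

```python
def empty_bucket(s3_client, bucket):
    """Delete every object in a non-versioned bucket; return the number deleted."""
    deleted = 0
    paginator = s3_client.get_paginator('list_objects_v2')

    for page in paginator.paginate(Bucket=bucket):
        # Each page holds at most 1,000 keys, matching the delete_objects limit.
        objects = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
        if objects:
            s3_client.delete_objects(Bucket=bucket, Delete={'Objects': objects})
            deleted += len(objects)

    return deleted
```

After `empty_bucket(s3_client, bucket_name)` returns, `s3_client.delete_bucket(Bucket=bucket_name)` can remove the bucket itself.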

## Configuration Options

`S3DirectoryReaderConfig` supports several options:

### Basic Options
- `bucket`: S3 bucket name (required)
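- `key`: key of a single S3 object to read (optional)
- `prefix`: S3 key prefix, such as `notes/`, to read every object beneath it (optional; when neither `key` nor `prefix` is set, the entire bucket is read)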
- `aws_region`: AWS region (optional, uses default if not specified)

### AWS Authentication
- `aws_profile`: AWS profile name (optional)
- Uses standard AWS credential chain if not specified

### Metadata Enhancement
- `metadata_fn`: Function to add custom metadata to each document

### Example Configurations

```python
# Read entire bucket
full_bucket_config = S3DirectoryReaderConfig(
    bucket="my-documents-bucket",
    aws_region="us-west-2"
)

# Read specific folder
folder_config = S3DirectoryReaderConfig(
    bucket="my-documents-bucket",
    prefix="documents/reports/",
    aws_region="us-west-2"
)

# Use specific AWS profile
profile_config = S3DirectoryReaderConfig(
    bucket="my-documents-bucket",
    aws_profile="production",
    aws_region="us-west-2"
)
```
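
To avoid hard-coding the region and profile, the optional arguments can be assembled from the environment. The sketch below reads `AWS_REGION`, `AWS_DEFAULT_REGION`, and `AWS_PROFILE`, which the AWS SDKs and CLI commonly honor. The helper name `s3_config_kwargs` is hypothetical, and whether the toolkit already picks these variables up on its own is not something this sketch assumes.

```python
import os


def s3_config_kwargs(bucket, environ=None):
    """Build keyword arguments for S3DirectoryReaderConfig from the environment."""
    environ = os.environ if environ is None else environ
    kwargs = {'bucket': bucket}

    region = environ.get('AWS_REGION') or environ.get('AWS_DEFAULT_REGION')
    if region:
        kwargs['aws_region'] = region

    profile = environ.get('AWS_PROFILE')
    if profile:
        kwargs['aws_profile'] = profile

    return kwargs
```

The result can then be unpacked, for example `S3DirectoryReaderConfig(**s3_config_kwargs("my-documents-bucket"))`. Options that are absent from the environment are simply omitted, so the defaults described above still apply.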

## Troubleshooting

### Common Issues

1. **AWS Credentials Not Found**
   - Configure AWS CLI: `aws configure`
   - Set environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`
   - Use IAM roles if running on EC2

2. **Access Denied Errors**
   - Ensure your AWS credentials have S3 read permissions
   - Check bucket policies and ACLs
   - Verify the bucket exists and is in the correct region

3. **No Documents Found**
   - Verify the bucket name and key prefix
   - Check if files exist in the specified S3 location
   - Ensure files are readable (not encrypted with inaccessible keys)

4. **Region Mismatch**
   - Specify the correct AWS region in the configuration
   - Some operations require region-specific endpoints
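
When a boto3 call fails with `ClientError`, the S3 error code is available as `e.response['Error']['Code']` and can be mapped onto the issues listed above. The lookup below is a small sketch of that idea. The hint texts and the name `hint_for_error_code` are illustrative, and errors raised through the reader provider may be wrapped differently.

```python
# Common S3 error codes mapped to the troubleshooting items above.
ERROR_HINTS = {
    'InvalidAccessKeyId': 'Credentials: the access key ID is not recognized.',
    'SignatureDoesNotMatch': 'Credentials: check the secret access key.',
    'ExpiredToken': 'Credentials: the session token has expired.',
    'AccessDenied': 'Access denied: check IAM permissions and bucket policies.',
    'NoSuchBucket': 'No documents found: the bucket name does not exist.',
    'NoSuchKey': 'No documents found: the key does not exist.',
    'PermanentRedirect': 'Region mismatch: the bucket lives in another region.',
    'AuthorizationHeaderMalformed': 'Region mismatch: the request was signed for the wrong region.',
}


def hint_for_error_code(code):
    """Return a troubleshooting hint for an S3 error code."""
    return ERROR_HINTS.get(code, f'Unrecognized error code: {code}')
```

Inside an `except ClientError as e:` block, `print(hint_for_error_code(e.response['Error']['Code']))` would show the matching hint.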

### Best Practices

- Use IAM roles instead of access keys when possible
- Implement least-privilege access policies
- Monitor S3 costs, especially for large buckets
- Use key prefixes to organize and filter documents
- Test with small buckets before processing large datasets
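
As an illustration of least-privilege access, the sketch below builds a read-only IAM policy document limited to one bucket and, optionally, one key prefix. It assumes the reader needs only `s3:ListBucket` and `s3:GetObject` in the standard `aws` partition. Objects encrypted with customer-managed KMS keys would need additional permissions. The function name `read_only_policy` is hypothetical.

```python
import json


def read_only_policy(bucket, prefix=''):
    """Build a read-only IAM policy for a bucket, optionally limited to a prefix."""
    list_statement = {
        'Effect': 'Allow',
        'Action': 's3:ListBucket',
        'Resource': f'arn:aws:s3:::{bucket}',
    }
    if prefix:
        # Restrict listing to keys that start with the prefix.
        list_statement['Condition'] = {'StringLike': {'s3:prefix': f'{prefix}*'}}

    get_statement = {
        'Effect': 'Allow',
        'Action': 's3:GetObject',
        'Resource': f'arn:aws:s3:::{bucket}/{prefix}*',
    }

    return {'Version': '2012-10-17', 'Statement': [list_statement, get_statement]}


policy_json = json.dumps(read_only_policy('my-documents-bucket', 'documents/'), indent=2)
```

The resulting JSON can be attached to the role or user that runs the reader.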

## Complete

This notebook demonstrated the S3 Directory Reader Provider capabilities:

### Key Features:
1. **AWS Credentials Check**: Verify AWS configuration
2. **Test Bucket Creation**: Create S3 resources for testing
3. **Single-key Reading**: Read one object by its key
4. **Prefix-based Reading**: Read from specific S3 "directories"
5. **Multiple Prefix Reading**: Process different folders with different configs
6. **Advanced Metadata**: Extract rich metadata from S3 paths
7. **Resource Cleanup**: Properly clean up test resources

### Use Cases:
- **Document Archives**: Process existing document collections in S3
- **Data Lakes**: Extract insights from structured data repositories
- **Content Migration**: Import cloud-stored content into knowledge graphs
- **Multi-tenant Systems**: Process documents from different S3 prefixes

### Security Considerations:
- Always use least-privilege IAM policies
- Monitor S3 access logs
- Be aware of data transfer costs
- Clean up test resources to avoid charges

The S3 Directory Reader Provider enables seamless integration with cloud-stored documents, making it easy to build knowledge graphs from existing S3 data repositories.