# E2B Unified Mount Directory Test

**✅ E2B Integration with Unified Mount Structure**

## 🎯 Purpose
Test the **unified mount directory structure** across all environments:
- **Local**: `$HOME/data/sentient/`
- **Docker**: `/data/sentient/`  
- **E2B**: `/data/sentient/`
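
The mapping above can be resolved once in code so the rest of a test never hard-codes a path. This is a minimal sketch, not part of the notebook: the function name `resolve_mount_dir` and the `SENTIENT_ENV` environment variable are hypothetical and only illustrate the idea.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_mount_dir(environment: Optional[str] = None) -> Path:
    """Return the unified mount root for 'local', 'docker', or 'e2b'."""
    # SENTIENT_ENV is a hypothetical variable name used only in this sketch.
    env = (environment or os.getenv("SENTIENT_ENV", "local")).lower()
    if env == "local":
        # Local runs keep the data under the user's home directory.
        return Path.home() / "data" / "sentient"
    if env in ("docker", "e2b"):
        # Containers and sandboxes share the same absolute path.
        return Path("/data/sentient")
    raise ValueError(f"Unknown environment: {env!r}")


print(resolve_mount_dir("e2b"))
```

Callers can then build project paths from the returned root, for example `resolve_mount_dir("docker") / project_id / "results"`.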

## 📁 Expected Structure
```
/data/sentient/
├── {project_id}/
│   ├── binance_toolkit/     # Binance API data
│   ├── coingecko_toolkit/   # CoinGecko API data  
│   ├── defillama_toolkit/   # DefiLlama API data
│   ├── arkham_toolkit/      # Arkham API data
│   └── results/             # AI analysis outputs
└── other_projects/
```
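
The layout above could be created with a small helper like the one below. It is a sketch under the assumption that every project gets all five subdirectories; `create_project_layout` and `TOOLKIT_DIRS` are illustrative names, not functions from the toolkit.

```python
from pathlib import Path

# Subdirectories shown in the expected structure above.
TOOLKIT_DIRS = (
    "binance_toolkit",
    "coingecko_toolkit",
    "defillama_toolkit",
    "arkham_toolkit",
    "results",
)


def create_project_layout(mount_root, project_id):
    """Create <mount_root>/<project_id>/<toolkit dirs> and return the project path."""
    project_dir = Path(mount_root) / project_id
    for name in TOOLKIT_DIRS:
        # exist_ok=True makes the call safe to repeat from any environment.
        (project_dir / name).mkdir(parents=True, exist_ok=True)
    return project_dir
```

Because the call is idempotent, the local agent and the sandbox can both run it against the same mount without coordinating.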

## 🔄 Test Flow
1. 🏠 **Local Data Fetching**: Agent with data toolkit fetches crypto data to mount directory
2. ☁️ **S3 Sync**: Data automatically syncs to S3 bucket via mount
3. 🚀 **E2B Execution**: Code runs in sandbox with same mount path `/data/sentient`
4. 🧮 **Analysis**: Sandbox analyzes data and saves results to mount
5. 📊 **Results**: Results visible across all environments
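
Step 2 depends on how quickly the mount propagates writes, so a fixed sleep can be either too short or needlessly long. A possible alternative is to poll for the file with a timeout. The helper below is a sketch; `wait_for_file` is a hypothetical name and the default timeout is arbitrary.

```python
import time
from pathlib import Path


def wait_for_file(path, timeout=30.0, interval=0.5):
    """Poll until `path` exists and is non-empty; return False on timeout."""
    deadline = time.monotonic() + timeout
    target = Path(path)
    while True:
        if target.is_file() and target.stat().st_size > 0:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Inside an `async` test, the same loop would use `await asyncio.sleep(interval)` instead of `time.sleep` so the event loop is not blocked.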

## ✅ What's Working
- Unified `/data/sentient` mount path across environments
- S3 bucket mounted with goofys ✅ **WORKING!**
- Cross-environment data persistence ✅

## 📋 Environment Setup and Cleanup

In [1]:
# Clean imports and memory
import sys
import importlib
import gc

# Remove any cached modules to ensure fresh imports
modules_to_remove = [m for m in sys.modules.keys() if 'sentient' in m.lower() or 'agno' in m.lower()]
for module in modules_to_remove:
    if module in sys.modules:
        del sys.modules[module]

# Force garbage collection
gc.collect()
print("🔄 Ready for fresh imports")

🔄 Ready for fresh imports


## 🔧 Dependencies and Configuration Check

In [2]:
import os
import asyncio
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Check required environment variables
required_env_vars = {
    'E2B_API_KEY': 'E2B API key for sandbox access',
    'AWS_ACCESS_KEY_ID': 'AWS access key for S3 integration',
    'AWS_SECRET_ACCESS_KEY': 'AWS secret key for S3 integration',
    'S3_BUCKET_NAME': 'S3 bucket name for data storage',
}

optional_env_vars = {
    'E2B_TEMPLATE_ID': 'Custom E2B template (defaults to sentient-e2b-s3)',
    'E2B_TIMEOUT': 'E2B sandbox timeout (defaults to 300s)',
    'AWS_REGION': 'AWS region (defaults to us-east-1)',
}

print("🔍 Environment Configuration Check:")
print("=" * 50)

# Check required variables
missing_required = []
for var, description in required_env_vars.items():
    value = os.getenv(var)
    if value:
        masked_value = f"{value[:8]}...{value[-4:]}" if len(value) > 12 else "***"
        print(f"✅ {var}: {masked_value}")
    else:
        print(f"❌ {var}: MISSING - {description}")
        missing_required.append(var)

print("\n📋 Optional Configuration:")
for var, description in optional_env_vars.items():
    value = os.getenv(var)
    if value:
        print(f"✅ {var}: {value}")
    else:
        print(f"ℹ️  {var}: Using default - {description}")

if missing_required:
    print(f"\n⚠️  Missing required environment variables: {', '.join(missing_required)}")
    print("   Please update your .env file with the missing values.")
else:
    print("\n🎉 All required environment variables are configured!")

🔍 Environment Configuration Check:
✅ E2B_API_KEY: e2b_d5d5...bd00
✅ AWS_ACCESS_KEY_ID: AKIA6GBM...QUL3
✅ AWS_SECRET_ACCESS_KEY: K/Ux894V...X8Xp
✅ S3_BUCKET_NAME: ***

📋 Optional Configuration:
ℹ️  E2B_TEMPLATE_ID: Using default - Custom E2B template (defaults to sentient-e2b-s3)
ℹ️  E2B_TIMEOUT: Using default - E2B sandbox timeout (defaults to 300s)
✅ AWS_REGION: us-west-2

🎉 All required environment variables are configured!
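
The check above prints the first eight and last four characters of each secret, which is a sizeable fraction of a short key. A stricter variant might reveal only a short suffix. This is a sketch; `mask_secret` is a hypothetical helper, not part of the notebook.

```python
def mask_secret(value: str, visible: int = 4) -> str:
    """Hide a secret, keeping at most the last `visible` characters."""
    # Very short values are hidden entirely so nothing useful leaks.
    if len(value) <= visible * 2:
        return "***"
    return f"***{value[-visible:]}"


print(mask_secret("example-secret-value-1234"))
```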


## 📦 Import Dependencies

In [3]:
# Import agno and E2B tools
try:
    from agno.agent import Agent as AgnoAgent
    from agno.models.litellm import LiteLLM
    from agno.tools.e2b import E2BTools
    from agno.tools.reasoning import ReasoningTools
    print("✅ Agno dependencies imported successfully")
except ImportError as e:
    print(f"❌ Failed to import agno dependencies: {e}")
    print("   Please install agno: pip install agno")

# Import SentientResearchAgent toolkits for comparison
try:
    from sentientresearchagent.hierarchical_agent_framework.toolkits.data.binance_toolkit import BinanceToolkit
    from sentientresearchagent.hierarchical_agent_framework.toolkits.data.coingecko_toolkit import CoinGeckoToolkit
    print("✅ SentientResearchAgent toolkits imported successfully")
except ImportError as e:
    print(f"❌ Failed to import SentientResearchAgent toolkits: {e}")

# Standard libraries
import json
import time
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

✅ Agno dependencies imported successfully
✅ SentientResearchAgent toolkits imported successfully


In [4]:
import json
import re
from typing import Any, Union

def decode_e2b_output(raw_result: Any) -> str:
    """
    Decode AgnoAgent E2BTools output properly.
    
    The AgnoAgent E2BTools.run_python_code() method returns JSON-encoded output 
    in the format: ["Logs:\nLogs(stdout: [...], stderr: [...])"]
    This function properly decodes the Unicode escapes and extracts the stdout content.
    
    Args:
        raw_result: Raw output from E2BTools.run_python_code()
        
    Returns:
        Properly decoded and formatted output string
    """
    try:
        # Handle different input types
        if isinstance(raw_result, list) and len(raw_result) > 0:
            content = raw_result[0]
        elif isinstance(raw_result, str):
            content = raw_result
        else:
            return str(raw_result)
        
        # Handle JSON-encoded list format: ["Logs:\nLogs(stdout: [...], stderr: [...])"]
        if isinstance(content, str) and content.startswith('["') and content.endswith('"]'):
            try:
                parsed_list = json.loads(content)
                if isinstance(parsed_list, list) and len(parsed_list) > 0:
                    content = parsed_list[0]
            except json.JSONDecodeError:
                pass
        
        # Extract stdout content from Logs format: "Logs:\nLogs(stdout: [...], stderr: [...])"
        if isinstance(content, str) and "Logs(stdout:" in content:
            # Use regex to extract stdout content
            stdout_pattern = r"stdout:\s*\[(.*?)\],?\s*stderr:"
            match = re.search(stdout_pattern, content, re.DOTALL)
            
            if match:
                stdout_raw = match.group(1)
                
                # Parse the stdout array content - handle quoted strings with proper escaping
                stdout_lines = []
                
                # Split by ', ' but handle embedded quotes and escapes
                current_part = ""
                in_quotes = False
                escaped = False
                
                i = 0
                while i < len(stdout_raw):
                    char = stdout_raw[i]
                    
                    if escaped:
                        current_part += char
                        escaped = False
                    elif char == '\\':
                        current_part += char
                        escaped = True
                    elif char == "'" and not escaped:
                        in_quotes = not in_quotes
                        current_part += char
                    elif char == ',' and not in_quotes and i + 1 < len(stdout_raw) and stdout_raw[i + 1] == ' ':
                        # Found separator
                        if current_part.strip():
                            stdout_lines.append(current_part.strip())
                        current_part = ""
                        i += 1  # Skip the space after comma
                    else:
                        current_part += char
                    
                    i += 1
                
                # Add the last part
                if current_part.strip():
                    stdout_lines.append(current_part.strip())
                
                # Clean and decode each part
                decoded_lines = []
                for part in stdout_lines:
                    # Remove outer quotes
                    if (part.startswith("'") and part.endswith("'")) or (part.startswith('"') and part.endswith('"')):
                        part = part[1:-1]
                    
                    # Decode JSON escapes properly
                    try:
                        # Handle JSON string encoding with proper Unicode support
                        decoded_part = json.loads(f'"{part}"')
                        decoded_lines.append(decoded_part)
                    except json.JSONDecodeError:
                        # Fallback: manual decode common escapes
                        decoded_part = part
                        decoded_part = decoded_part.replace('\\n', '\n')
                        decoded_part = decoded_part.replace('\\t', '\t')
                        decoded_part = decoded_part.replace('\\"', '"')
                        decoded_part = decoded_part.replace("\\'", "'")
                        # Handle Unicode escapes manually
                        decoded_part = decode_unicode_escapes(decoded_part)
                        decoded_lines.append(decoded_part)
                
                # Join all stdout lines
                full_output = ''.join(decoded_lines)
                return full_output
        
        # If not in Logs format, try direct JSON decode
        if isinstance(content, str):
            try:
                decoded = json.loads(f'"{content}"')
                return decode_unicode_escapes(decoded)
            except json.JSONDecodeError:
                return decode_unicode_escapes(content)
        
        return str(content)
        
    except Exception as e:
        return f"[Error decoding E2B output: {e}]\n\nRaw output:\n{str(raw_result)}"


def decode_unicode_escapes(text: str) -> str:
    """
    Decode Unicode escape sequences in text.
    
    Args:
        text: Text that may contain Unicode escapes like \\ud83c\\udfd7\\ufe0f
        
    Returns:
        Text with Unicode escapes properly decoded
    """
    if not isinstance(text, str):
        return str(text)
    
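    try:
        # Encode with backslashreplace so characters that are already decoded
        # (emoji, accented letters) survive instead of being mangled.
        decoded = text.encode('latin-1', 'backslashreplace').decode('unicode_escape')
        # Merge surrogate pairs such as \ud83c\udfd7 into single code points
        return decoded.encode('utf-16', 'surrogatepass').decode('utf-16')
    except (UnicodeDecodeError, UnicodeEncodeError):
        # Fallback: return the text unchanged
        return text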

print("🔧 E2B output decoder functions loaded")

🔧 E2B output decoder functions loaded
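
When the `stdout` list in the `Logs(...)` text is a valid Python list literal, the hand-written quote scanner can be replaced by `ast.literal_eval`. The sketch below assumes the format described in the docstring of `decode_e2b_output`; `extract_stdout` is a hypothetical name, and it falls back to the raw text whenever the list cannot be parsed.

```python
import ast
import re


def extract_stdout(log_text: str) -> str:
    """Return the joined stdout entries from a 'Logs(stdout: [...], stderr: [...])' string."""
    match = re.search(r"stdout:\s*(\[.*?\]),?\s*stderr:", log_text, re.DOTALL)
    if match is None:
        return log_text
    try:
        # literal_eval only parses literals, so it is safe on untrusted text.
        parts = ast.literal_eval(match.group(1))
    except (ValueError, SyntaxError):
        return log_text
    return "".join(str(part) for part in parts)


sample = "Logs:\nLogs(stdout: ['hello\\n', 'world\\n'], stderr: [])"
print(extract_stdout(sample))
```

The longer decoder above remains useful when the list is not a clean literal, for example when the output was JSON-encoded a second time.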


In [5]:
async def test_local_data_fetch_and_sync():
    """Test local data fetching with toolkits and S3 sync verification."""
    print("🏠 LOCAL DATA FETCHING & S3 SYNC TEST")
    print("=" * 45)
    
    try:
        # Step 1: Fetch data locally using our toolkits
        print("📊 Step 1: Fetching crypto data locally...")
        
        # Initialize local data toolkit (similar to what agents would use)
        local_data_dir = "./notebooks/data/e2b_test"
        os.makedirs(local_data_dir, exist_ok=True)
        
        binance_toolkit = BinanceToolkit(
            symbols=['BTCUSDT', 'ETHUSD', 'SOLUSDT'],
            default_market_type="spot",
            data_dir=local_data_dir,
            parquet_threshold=500
        )
        print(f"✅ BinanceToolkit initialized with local dir: {local_data_dir}")
        
        # Get current prices and save locally (fix: handle API response format)
        print("  Fetching current prices...")
        prices_data = {}
        symbols = ['BTCUSDT', 'ETHUSD', 'SOLUSDT']
        
        for symbol in symbols:
            try:
                price_response = await binance_toolkit.get_current_price(symbol)
                # Handle different response formats
                if isinstance(price_response, dict):
                    if 'price' in price_response:
                        price_value = price_response['price']
                    elif 'data' in price_response and isinstance(price_response['data'], dict):
                        price_value = price_response['data'].get('price', 'N/A')
                    elif 'data' in price_response and isinstance(price_response['data'], list) and len(price_response['data']) > 0:
                        price_value = price_response['data'][0].get('price', 'N/A')
                    else:
                        # Check for any field that might contain price
                        price_value = str(price_response)[:50] + "..." if len(str(price_response)) > 50 else str(price_response)
                else:
                    price_value = str(price_response)
                
                prices_data[symbol] = {'price': price_value, 'raw_response': str(price_response)[:200]}
                print(f"  ✅ {symbol}: ${price_value}")
            except Exception as e:
                print(f"  ❌ {symbol}: Error - {e}")
                prices_data[symbol] = {"error": str(e)}
        
        # Get some historical data (fix: handle DataFrame serialization)
        print("  Fetching historical data...")
        try:
            btc_klines_response = await binance_toolkit.get_klines('BTCUSDT', interval='1h', limit=24)
            
            # Convert DataFrame to serializable format
            if isinstance(btc_klines_response, dict):
                btc_klines = {}
                for key, value in btc_klines_response.items():
                    if hasattr(value, 'to_dict'):  # DataFrame
                        btc_klines[key] = value.to_dict('records')
                    elif hasattr(value, 'tolist'):  # NumPy array
                        btc_klines[key] = value.tolist()
                    else:
                        btc_klines[key] = value
            else:
                btc_klines = {"raw_response": str(btc_klines_response)[:500]}
                
            data_count = len(btc_klines.get('data', [])) if 'data' in btc_klines else 'unknown'
            print(f"  ✅ BTC 24h data: {data_count} klines")
        except Exception as e:
            print(f"  ❌ BTC historical data: Error - {e}")
            btc_klines = {"error": str(e)}
        
        # Create comprehensive dataset (ensure all data is JSON serializable)
        dataset = {
            'timestamp': datetime.now().isoformat(),
            'data_source': 'binance_api',
            'fetch_location': 'local_notebook',
            'current_prices': prices_data,
            'btc_24h_klines': btc_klines,
            'symbols_count': len(symbols),
            'local_storage_path': local_data_dir
        }
        
        # Step 2: Save data locally (this should sync to S3 via mounted filesystem)
        print(f"\n💾 Step 2: Saving data locally for S3 sync...")
        local_file = os.path.join(local_data_dir, 'crypto_dataset.json')
        
        with open(local_file, 'w') as f:
            json.dump(dataset, f, indent=2)
        
        local_size = os.path.getsize(local_file)
        print(f"✅ Data saved locally: {local_file} ({local_size} bytes)")
        
        # Also save as CSV for easier analysis  
        if prices_data:
            import pandas as pd
            prices_df = pd.DataFrame([
                {
                    'symbol': symbol,
                    'price': str(data.get('price', 'N/A')),
                    'timestamp': dataset['timestamp'],
                    'error': data.get('error', None),
                    'has_data': 'price' in data and data.get('error') is None
                }
                for symbol, data in prices_data.items()
            ])
            
            csv_file = os.path.join(local_data_dir, 'current_prices.csv')
            prices_df.to_csv(csv_file, index=False)
            csv_size = os.path.getsize(csv_file)
            print(f"✅ Prices CSV saved: {csv_file} ({csv_size} bytes)")
        
        # Step 3: Wait for S3 sync and verify in sandbox
        print(f"\n☁️ Step 3: Verifying S3 sync via E2B sandbox...")
        
        # Give some time for S3 sync (if using mounted filesystem)
        print("   Waiting 5 seconds for potential S3 sync...")
        await asyncio.sleep(5)
        
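        # `direct_success` is presumably set by an earlier direct E2B test cell.
        # Look it up defensively: referencing the bare name when that cell has not
        # run raises the NameError shown in the recorded output below.
        if not globals().get("direct_success", True):
            print("⚠️ Skipping S3 verification - direct E2B test failed")
            return False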
        
        # Check if data is accessible from E2B sandbox using human-readable template name
        template_name = "sentient-e2b-s3"
        e2b_toolkit = E2BTools(sandbox_options={"template": template_name})
        
        # This code will run in the E2B sandbox to check S3 data availability
        s3_verification_code = f'''
import os
import json
import glob

print("S3 DATA SYNC VERIFICATION")
print("=" * 30)

# Check potential S3 mount paths where our data might appear
s3_paths = [
    "/data/sentient",  # unified mount path this notebook is meant to test
    "/home/user/s3-bucket",
    "/workspace/s3-bucket",
    "/workspace/data",
    "/workspace/results"
]

found_data = False
dataset_file = None

# Look for our dataset in mounted S3 paths
for s3_path in s3_paths:
    if os.path.exists(s3_path):
        print(f"Searching in: {{s3_path}}")
        
        # Search for our crypto dataset
        search_patterns = [
            os.path.join(s3_path, "**", "crypto_dataset.json"),
            os.path.join(s3_path, "**", "current_prices.csv"),
            os.path.join(s3_path, "crypto_dataset.json"),
            os.path.join(s3_path, "current_prices.csv")
        ]
        
        for pattern in search_patterns:
            matches = glob.glob(pattern, recursive=True)
            if matches:
                for match in matches:
                    print(f"FOUND data file: {{match}}")
                    size = os.path.getsize(match)
                    print(f"   Size: {{size}} bytes")
                    
                    if match.endswith('crypto_dataset.json'):
                        dataset_file = match
                        found_data = True
                        
                        # Try to read and verify the data
                        try:
                            with open(match, 'r') as f:
                                data = json.load(f)
                            print(f"   Data readable: {{len(data)}} fields")
                            print(f"   Timestamp: {{data.get('timestamp', 'unknown')}}")
                            print(f"   Symbols: {{data.get('symbols_count', 'unknown')}}")
                            print(f"   Source: {{data.get('data_source', 'unknown')}}")
                        except Exception as e:
                            print(f"   Read error: {{e}}")
        
        if not found_data:
            # List what's actually in the S3 mount
            try:
                contents = os.listdir(s3_path)
                print(f"Directory {{s3_path}}: {{len(contents)}} items")
                if contents:
                    # Show all items (no truncation)
                    for item in contents:
                        item_path = os.path.join(s3_path, item)
                        is_dir = os.path.isdir(item_path)
                        size = "dir" if is_dir else f"{{os.path.getsize(item_path)}}b"
                        print(f"   - {{item}} ({{size}})")
                else:
                    print(f"   (empty)")
            except Exception as e:
                print(f"Cannot list {{s3_path}}: {{e}}")
    else:
        print(f"Path {{s3_path}}: does not exist")

if found_data:
    print()
    print("SUCCESS: Local data is accessible in E2B sandbox!")
    print(f"Dataset location in sandbox: {{dataset_file}}")
    print("File path translation working!")
else:
    print()
    print("Local data not yet synced to S3 or not accessible from sandbox")
    print("This could be normal if:")
    print("- S3 sync takes time")
    print("- Files are in different S3 paths") 
    print("- Local data needs to be stored in S3-mounted directory")

print()
print("S3 sync verification completed")
'''
        
        result = e2b_toolkit.run_python_code(s3_verification_code)
        decoded_output = decode_e2b_output(result)
        print("✅ S3 verification completed")
        print("📊 Sandbox output:")
        print("-" * 60)
        print(decoded_output)
        print("-" * 60)
        
        print(f"\n🎉 Local data fetching and sync test completed!")
        print(f"📁 Local files created: {local_data_dir}")
        
        return True
        
    except Exception as e:
        print(f"❌ Local data fetching test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

# Run the test
local_sync_success = await test_local_data_fetch_and_sync()

2025-08-21 19:50:44.151 | DEBUG    | sentientresearchagent.hierarchical_agent_framework.toolkits.base.base_api:_init_cache_system:270 - Initialized generic cache system with TTL: 3600s
2025-08-21 19:50:44.151 | DEBUG    | sentientresearchagent.hierarchical_agent_framework.toolkits.utils.http_client:__init__:96 - Initialized DataHTTPClient with 30.0s timeout
2025-08-21 19:50:44.152 | DEBUG    | sentientresearchagent.hierarchical_agent_framework.toolkits.base.base_api:_init_standard_configuration:559 - Initialized standard configuration: timeout=30.0s, retries=3, cache_ttl=3600s
2025-08-21 19:50:44.153 | INFO     | sentientresearchagent.hierarchical_agent_framework.toolkits.base.base_data:_detect_e2b_context:131 - S3 integration detected with bucket: roma-sh

🏠 LOCAL DATA FETCHING & S3 SYNC TEST
📊 Step 1: Fetching crypto data locally...
✅ BinanceToolkit initialized with local dir: ./notebooks/data/e2b_test
  Fetching current prices...


2025-08-21 19:50:44.456 | DEBUG    | sentientresearchagent.hierarchical_agent_framework.toolkits.base.base_api:_cache_data:311 - Cached set data (246 items) for key 'symbols_spot'
2025-08-21 19:50:44.456 | INFO     | sentientresearchagent.hierarchical_agent_framework.toolkits.data.binance_toolkit:reload_symbols:509 - Loaded 246 symbols for Binance Spot Trading
2025-08-21 19:50:44.457 | DEBUG    | sentientresearchagent.hierarchical_agent_framework.toolkits.utils.http_client:_make_request:317 - Making GET request to spot/api/v3/ticker/price (attempt 1)
2025-08-21 19:50:44.581 | DEBUG    | sentientresearchagent.hierarchical_agent_framework.toolkits.utils.http_client:_make_request:317 - Making GET request to spot/api/v3/ticker/price (attempt 1)
2025-0

  ✅ BTCUSDT: $112409.04
  ✅ ETHUSD: $4234.26


2025-08-21 19:50:44.850 | DEBUG    | sentientresearchagent.hierarchical_agent_framework.toolkits.utils.http_client:_make_request:317 - Making GET request to spot/api/v3/klines (attempt 1)


  ✅ SOLUSDT: $181.54
  Fetching historical data...
  ✅ BTC 24h data: 24 klines

💾 Step 2: Saving data locally for S3 sync...
✅ Data saved locally: ./notebooks/data/e2b_test/crypto_dataset.json (15306 bytes)
✅ Prices CSV saved: ./notebooks/data/e2b_test/current_prices.csv (185 bytes)

☁️ Step 3: Verifying S3 sync via E2B sandbox...
   Waiting 5 seconds for potential S3 sync...
❌ Local data fetching test failed: name 'direct_success' is not defined


Traceback (most recent call last):
  File "/var/folders/dk/m1n_82n10g9fnf63ct8vl2540000gn/T/ipykernel_22457/3609255658.py", line 121, in test_local_data_fetch_and_sync
    if not direct_success:
           ^^^^^^^^^^^^^^
NameError: name 'direct_success' is not defined


async def test_cross_environment_data_sync():
    """Test data sync between local and E2B environments using unified mount."""
    print("🔄 CROSS-ENVIRONMENT DATA SYNC TEST")
    print("=" * 45)

    try:
        # Test project setup
        test_project_id = f"sync_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        mount_dir = "/data/sentient"  # Unified mount directory

        print(f"🧪 Test Project ID: {test_project_id}")
        print(f"📁 Mount Directory: {mount_dir}")

        # Step 1: Create test data locally (simulating toolkit data)
        print("\n📊 Step 1: Creating test data locally...")

        local_data_dir = f"./notebooks/data/{test_project_id}"
        os.makedirs(local_data_dir, exist_ok=True)

        # Create sample toolkit data
        test_data = {
            'project_id': test_project_id,
            'timestamp': datetime.now().isoformat(),
            'data_source': 'local_simulation',
            'crypto_prices': {
                'BTC': 45000.50,
                'ETH': 3200.75,
                'SOL': 105.25
            },
            'analysis_ready': True
        }

        # Save locally
        local_file = os.path.join(local_data_dir, 'test_data.json')
        with open(local_file, 'w') as f:
            json.dump(test_data, f, indent=2)

        local_size = os.path.getsize(local_file)
        print(f"✅ Test data created: {local_file} ({local_size} bytes)")

        # Step 2: Test E2B sandbox access to mount directory
        print("\n☁️ Step 2: Testing E2B sandbox mount access...")

        # Initialize E2B
        template_name = "sentient-e2b-s3"
        e2b_toolkit = E2BTools(sandbox_options={"template": template_name})

        # Test code to run in E2B sandbox.
        # Newline escapes meant for the sandbox are written as \\n: this is a
        # non-raw f-string, so a single \n would become a real line break inside
        # the sandbox's string literals and cause a SyntaxError there.
        sync_test_code = f"""
import os
import json
from datetime import datetime

MOUNT_DIR = "{mount_dir}"
PROJECT_ID = "{test_project_id}"

print("🔄 CROSS-ENVIRONMENT SYNC VERIFICATION")
print("=" * 45)
print(f"Project: {{PROJECT_ID}}")
print(f"Mount: {{MOUNT_DIR}}")

# Check if mount directory exists
if os.path.exists(MOUNT_DIR):
    print(f"✅ Mount directory exists: {{MOUNT_DIR}}")

    # Create test project structure in E2B
    project_dir = os.path.join(MOUNT_DIR, PROJECT_ID)
    os.makedirs(project_dir, exist_ok=True)

    # Create toolkit directories
    toolkit_dirs = ['binance_toolkit', 'coingecko_toolkit', 'results']
    created_dirs = []

    for toolkit in toolkit_dirs:
        toolkit_dir = os.path.join(project_dir, toolkit)
        os.makedirs(toolkit_dir, exist_ok=True)
        created_dirs.append(toolkit)

        # Create a test file in each directory
        test_file = os.path.join(toolkit_dir, 'e2b_test.json')
        e2b_data = {{
            'created_in': 'e2b_sandbox',
            'toolkit': toolkit,
            'project_id': PROJECT_ID,
            'timestamp': datetime.now().isoformat(),
            'mount_path': MOUNT_DIR
        }}

        with open(test_file, 'w') as f:
            json.dump(e2b_data, f, indent=2)

        print(f"✅ Created: {{toolkit_dir}}/e2b_test.json")

    # Summary
    print(f"\\n📊 Created {{len(created_dirs)}} toolkit directories:")
    for d in created_dirs:
        print(f"   📁 {{d}}")

    # Test write/read operations
    results_file = os.path.join(project_dir, 'results', 'sync_verification.json')
    verification_data = {{
        'test': 'cross_environment_sync',
        'created_in': 'e2b_sandbox',
        'project_id': PROJECT_ID,
        'timestamp': datetime.now().isoformat(),
        'status': 'success',
        'message': 'E2B can create and write files to mount directory'
    }}

    with open(results_file, 'w') as f:
        json.dump(verification_data, f, indent=2)

    print(f"\\n💾 Verification file created: {{results_file}}")

    # List project contents
    if os.path.exists(project_dir):
        contents = os.listdir(project_dir)
        print(f"\\n📋 Project structure created: {{len(contents)}} directories")
        for item in contents:
            item_path = os.path.join(project_dir, item)
            if os.path.isdir(item_path):
                files = os.listdir(item_path)
                print(f"   📁 {{item}}: {{len(files)}} files")

    print("\\n✅ Cross-environment sync test completed successfully!")
    print("🎉 Data created in E2B will be visible in other environments")

else:
    print(f"❌ Mount directory not found: {{MOUNT_DIR}}")
    print("   Check E2B template configuration")
    print("   Expected mount: /data/sentient")
"""

        result = e2b_toolkit.run_python_code(sync_test_code)
        print("✅ E2B sync test completed")
        print("📊 E2B Output:")
        print("-" * 50)
        print(result)
        print("-" * 50)

        print(f"\n🎉 Cross-environment data sync test completed!")
        print(f"📁 Local test data: {local_data_dir}")
        print(f"☁️ E2B created structure in: {mount_dir}/{test_project_id}")

        return True

    except Exception as e:
        print(f"❌ Cross-environment sync test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

# Run the test
sync_success = await test_cross_environment_data_sync()
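
The sandbox code in this test writes `results/sync_verification.json` under the project directory, but nothing reads it back from outside the sandbox. A local follow-up check might look like the sketch below. `summarize_project` is a hypothetical helper, and the mount root passed in is assumed to be the local mount (`$HOME/data/sentient`).

```python
import json
from pathlib import Path


def summarize_project(mount_root, project_id):
    """List each project subdirectory and load the sandbox's verification file if present."""
    project_dir = Path(mount_root) / project_id
    summary = {"directories": {}, "verification": None}
    if not project_dir.is_dir():
        return summary
    for child in sorted(project_dir.iterdir()):
        if child.is_dir():
            summary["directories"][child.name] = sorted(f.name for f in child.iterdir())
    verification = project_dir / "results" / "sync_verification.json"
    if verification.is_file():
        summary["verification"] = json.loads(verification.read_text())
    return summary
```

An empty `directories` mapping after the sandbox run would suggest that the two environments are not looking at the same bucket or prefix.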

In [None]:
async def test_data_analysis_with_mount():
    """Test data analysis using the unified mount directory."""
    print("💰 DATA ANALYSIS WITH UNIFIED MOUNT")
    print("=" * 45)

    try:
        # Test project setup
        test_project_id = f"analysis_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        print(f"🧪 Test Project ID: {test_project_id}")

        # Initialize E2B
        template_name = "sentient-e2b-s3"
        e2b_toolkit = E2BTools(sandbox_options={"template": template_name})

        print("\n📊 Step 1: Create sample data and analyze in E2B")

        # Newline escapes meant for the sandbox are written as \\n: this is a
        # non-raw f-string, so a single \n would become a real line break inside
        # the sandbox's string literals and cause a SyntaxError there.
        analysis_code = f"""
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime, timedelta

MOUNT_DIR = "/data/sentient"
PROJECT_ID = "{test_project_id}"

print("💰 CRYPTO DATA ANALYSIS")
print("=" * 30)
print(f"Project: {{PROJECT_ID}}")
print(f"Mount: {{MOUNT_DIR}}")

# Create sample crypto data (simulating toolkit output)
symbols = ['BTC', 'ETH', 'SOL', 'ADA', 'DOT']
np.random.seed(42)  # Reproducible results

# Generate realistic crypto price data
base_prices = {{'BTC': 45000, 'ETH': 3000, 'SOL': 100, 'ADA': 0.5, 'DOT': 25}}

# Generate 30 days of hourly data
hours = 24 * 30  # 30 days
timestamps = [datetime.now() - timedelta(hours=i) for i in range(hours, 0, -1)]

crypto_data = []
for symbol in symbols:
    base_price = base_prices[symbol]

    for i, timestamp in enumerate(timestamps):
        # Simulate price movement with trend and volatility
        trend = 0.001 * np.sin(i / 24)  # Daily cycle
        volatility = np.random.normal(0, 0.02)  # 2% volatility
        price = base_price * (1 + trend + volatility) ** (i / hours)

        crypto_data.append({{
            'timestamp': timestamp.isoformat(),
            'symbol': symbol,
            'price': round(price, 6),
            'volume': np.random.uniform(1000000, 10000000)
        }})

# Convert to DataFrame
df = pd.DataFrame(crypto_data)
print(f"📊 Generated data: {{len(df)}} records for {{len(symbols)}} symbols")

# Perform analysis
print("\\n🔍 Analysis Results:")

# Current prices
current_prices = df.groupby('symbol')['price'].last()
print("\\n💰 Current Prices:")
for symbol, price in current_prices.items():
    print(f"  {{symbol}}: ${{price:,.2f}}")

# 24h changes
df['date'] = pd.to_datetime(df['timestamp']).dt.date
daily_prices = df.groupby(['symbol', 'date'])['price'].last().reset_index()
latest_date = daily_prices['date'].max()
previous_date = daily_prices['date'].max() - timedelta(days=1)

changes_24h = {{}}
for symbol in symbols:
    latest = daily_prices[(daily_prices['symbol'] == symbol) & (daily_prices['date'] == latest_date)]['price'].iloc[0]
    previous = daily_prices[(daily_prices['symbol'] == symbol) & (daily_prices['date'] == previous_date)]['price'].iloc[0]
    change_pct = ((latest - previous) / previous) * 100
    changes_24h[symbol] = change_pct

print("\\n📈 24h Changes:")
for symbol, change in changes_24h.items():
    emoji = "🟢" if change > 0 else "🔴" if change < 0 else "⚪"
    print(f"  {{emoji}} {{symbol}}: {{change:+.2f}}%")

# Save results to mount directory
if os.path.exists(MOUNT_DIR):
    results_dir = os.path.join(MOUNT_DIR, PROJECT_ID, "results")
    os.makedirs(results_dir, exist_ok=True)

    # Save analysis results
    analysis_results = {{
        'timestamp': datetime.now().isoformat(),
        'project_id': PROJECT_ID,
        'analysis_type': 'crypto_market_analysis',
        'current_prices': dict(current_prices),
        'changes_24h': changes_24h,
        'data_points': len(df),
        'symbols': symbols
    }}

    results_file = os.path.join(results_dir, 'analysis_results.json')
    with open(results_file, 'w') as f:
        json.dump(analysis_results, f, indent=2)

    print(f"\\n💾 Results saved: {{results_file}}")

    # Save raw data
    data_file = os.path.join(results_dir, 'crypto_data.csv')
    df.to_csv(data_file, index=False)

    print(f"💾 Data saved: {{data_file}} ({{len(df)}} records)")

    # Create data summary for toolkits
    for symbol in symbols:
        symbol_dir = os.path.join(MOUNT_DIR, PROJECT_ID, f"{{symbol.lower()}}_toolkit")
        os.makedirs(symbol_dir, exist_ok=True)

        symbol_data = df[df['symbol'] == symbol].copy()
        symbol_file = os.path.join(symbol_dir, f"{{symbol.lower()}}_prices.json")

        symbol_summary = {{
            'symbol': symbol,
            'current_price': float(current_prices[symbol]),
            'change_24h': changes_24h[symbol],
            'data_points': len(symbol_data),
            'price_range': {{
                'min': float(symbol_data['price'].min()),
                'max': float(symbol_data['price'].max()),
                'avg': float(symbol_data['price'].mean())
            }}
        }}

        with open(symbol_file, 'w') as f:
            json.dump(symbol_summary, f, indent=2)

    print(f"\\n📁 Project structure created in {{MOUNT_DIR}}/{{PROJECT_ID}}")
    print("✅ Analysis completed successfully!")

    # Verify structure
    project_path = os.path.join(MOUNT_DIR, PROJECT_ID)
    if os.path.exists(project_path):
        contents = os.listdir(project_path)
        print(f"\\n📋 Project contents: {{len(contents)}} directories")
        for item in contents:
            item_path = os.path.join(project_path, item)
            if os.path.isdir(item_path):
                files = os.listdir(item_path)
                print(f"   📁 {{item}}: {{len(files)}} files")
else:
    print("❌ Mount directory not available")

print("\\n🎉 Data analysis with mount completed!")
"""

        result = e2b_toolkit.run_python_code(analysis_code)
        print(f"📊 Analysis result:\n{result}")

        print("\n🎉 
Data analysis with unified mount completed!\")\n        return True\n        \n    except Exception as e:\n        print(f\"❌ Data analysis test failed: {e}\")\n        import traceback\n        traceback.print_exc()\n        return False\n\n# Run the test\ndata_analysis_success = await test_data_analysis_with_mount()
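
The 24h change loop in the sandbox code indexes `.iloc[0]` for both the latest and the previous day, so it raises `IndexError` whenever a symbol has no row for the previous date. Below is a minimal sketch of a guarded version of the same percent-change formula, written in plain Python so it can be checked outside the sandbox. The helper `pct_change_24h` and the `daily_close` mapping are hypothetical names, not part of the notebook.

```python
from datetime import date, timedelta
from typing import Dict, Optional


def pct_change_24h(daily_close: Dict[date, float], latest: date) -> Optional[float]:
    """Return the percent change between `latest` and the day before it.

    Returns None when either day is missing or the previous close is zero,
    instead of raising.
    """
    previous = latest - timedelta(days=1)
    if latest not in daily_close or previous not in daily_close:
        return None
    prev_price = daily_close[previous]
    if prev_price == 0:
        return None
    # Same formula as the notebook: ((latest - previous) / previous) * 100
    return (daily_close[latest] - prev_price) / prev_price * 100


closes = {date(2024, 1, 1): 100.0, date(2024, 1, 2): 105.0}
print(pct_change_24h(closes, date(2024, 1, 2)))  # roughly a 5% gain
print(pct_change_24h(closes, date(2024, 1, 1)))  # None: no close for the previous day
```

Returning `None` lets the caller decide whether to skip the symbol or report it as unavailable, rather than aborting the whole analysis.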

## 🤖 Test 3: Agent-Based Code Execution

In [None]:
async def test_agent_code_execution():
    """Test agent-based code execution with E2B tools and data analysis."""
    print("🤖 AGENT-BASED CODE EXECUTION TEST")
    print("=" * 40)
    
    try:
        # Initialize model
        model = LiteLLM(id="openrouter/google/gemini-2.5-flash:nitro")
        
        # Initialize E2B toolkit with human-readable template name
        template_name = "sentient-e2b-s3"
        e2b_tools = E2BTools(sandbox_options={"template": template_name})
        reasoning_tools = ReasoningTools()
        
        # Create data analysis agent
        data_agent = AgnoAgent(
            model=model,
            tools=[e2b_tools, reasoning_tools],
            name="DataAnalysisAgent",
            system_message="""
You are an expert data analyst with access to secure E2B sandbox environments for code execution.

Key capabilities:
1. E2B sandbox for secure Python code execution (template: sentient-e2b-s3)
2. S3 storage mounted at /workspace/data and /workspace/results (goofys)
3. Data analysis libraries: pandas, numpy, matplotlib, seaborn
4. Structured reasoning tools

File Path Translation Rules:
- Local files sync to S3 bucket automatically
- In E2B sandbox, access S3 data via: /home/user/s3-bucket/ or /workspace/
- Save analysis results to /workspace/results/ for persistence

When analyzing data:
1. Use E2B sandbox for all computations
2. Access data files from mounted S3 paths
3. Save results to /workspace/results/ for sync back to local
4. Provide clear explanations of findings
5. Include proper error handling

Always be thorough and explain your analysis steps.
"""
        )
        
        print("✅ Data analysis agent initialized with E2B tools")
        
        # Test agent with crypto data analysis task
        print("\n📊 Testing agent with crypto data analysis...")
        
        analysis_task = f"""
Please perform a comprehensive cryptocurrency data analysis using the E2B sandbox.

Tasks:
1. Check what data is available in the mounted S3 storage paths:
   - /home/user/s3-bucket/
   - /workspace/data/
   - /workspace/results/
   
2. Generate sample crypto market data for BTC, ETH, SOL if no real data found

3. Perform technical analysis:
   - Calculate price changes and volatility
   - Compute moving averages
   - Identify trends and patterns
   
4. Create visualizations (save as PNG files)

5. Save comprehensive analysis results to:
   - /workspace/results/crypto_analysis_report.json
   - /workspace/results/analysis_summary.csv

6. Provide actionable insights and risk assessment

Use only the E2B sandbox for all computations. Template: {template_name}
Current timestamp: {datetime.now().isoformat()}
"""
        
        print(f"🔄 Agent task: {analysis_task[:200]}...")
        print("\n🤖 Agent is working (this may take a few minutes)...")
        
        # Execute the task with timeout
        try:
            response = await asyncio.wait_for(
                data_agent.arun(analysis_task), 
                timeout=300  # 5 minute timeout
            )
            
            print(f"\n🤖 Agent Response:\n{response}")
            
            # Verify results were created in E2B sandbox
            print("\n🔍 Verifying agent results in E2B sandbox...")
            verification_code = '''
import os
import json
import glob

print("🔍 AGENT RESULTS VERIFICATION")
print("=" * 35)

# Check for expected output files
expected_files = [
    "/workspace/results/crypto_analysis_report.json",
    "/workspace/results/analysis_summary.csv"
]

results_found = 0
for file_path in expected_files:
    if os.path.exists(file_path):
        size = os.path.getsize(file_path)
        print(f"✅ Found: {file_path} ({size} bytes)")
        results_found += 1
        
        # Try to read JSON files
        if file_path.endswith('.json'):
            try:
                with open(file_path, 'r') as f:
                    data = json.load(f)
                print(f"   📊 JSON structure: {list(data.keys()) if isinstance(data, dict) else type(data).__name__}")
            except Exception as e:
                print(f"   ❌ JSON read error: {e}")
    else:
        print(f"❌ Missing: {file_path}")

# Check for any PNG files (visualizations)
png_files = glob.glob("/workspace/results/*.png")
if png_files:
    print(f"\\n📈 Visualizations found: {len(png_files)} PNG files")
    for png in png_files:
        size = os.path.getsize(png)
        print(f"   - {os.path.basename(png)}: {size} bytes")
else:
    print("\\n📈 No PNG visualizations found")

# List all files in results directory
results_dir = "/workspace/results"
if os.path.exists(results_dir):
    all_files = os.listdir(results_dir)
    print(f"\\n📁 All files in {results_dir}: {len(all_files)} items")
    for f in all_files[:10]:  # Show first 10
        path = os.path.join(results_dir, f)
        size = os.path.getsize(path) if os.path.isfile(path) else "dir"
        print(f"   - {f}: {size}")
    if len(all_files) > 10:
        print(f"   ... and {len(all_files)-10} more")

print(f"\\n📊 Agent Results Summary: {results_found}/{len(expected_files)} expected files found")
print("✅ Verification completed")
'''
            
            verification_result = e2b_tools.run_python_code(verification_code)
            # The rest of this notebook treats run_python_code output as raw text,
            # so decode it with decode_e2b_output instead of calling .get() on it
            if verification_result:
                print("✅ Results verification completed")
                print("📊 Verification output:")
                print(decode_e2b_output(verification_result))
            else:
                print("❌ Results verification returned no output")
            
            print("\n🎉 Agent-based code execution test completed!")
            return True
            
        except asyncio.TimeoutError:
            print("❌ Agent task timed out after 5 minutes")
            return False
            
    except Exception as e:
        print(f"❌ Agent-based code execution test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

# Run the test
agent_success = await test_agent_code_execution()
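
The path translation rules in the agent prompt amount to swapping the local mount root for the sandbox one. Below is a minimal sketch of that mapping, assuming the unified layout described at the top of the notebook (`$HOME/data/sentient` locally, `/data/sentient` in Docker and E2B). The helper `to_sandbox_path` and the example paths are hypothetical.

```python
from pathlib import PurePosixPath

# Mount path used inside Docker and E2B, per the notebook introduction
SANDBOX_ROOT = PurePosixPath("/data/sentient")


def to_sandbox_path(local_path: str, local_root: str) -> str:
    """Map a file under the local mount root to the same file under /data/sentient.

    Raises ValueError if `local_path` is not inside `local_root`.
    """
    relative = PurePosixPath(local_path).relative_to(PurePosixPath(local_root))
    return str(SANDBOX_ROOT / relative)


print(to_sandbox_path(
    "/home/alice/data/sentient/demo/results/report.json",
    "/home/alice/data/sentient",
))
# /data/sentient/demo/results/report.json
```

Because the project layout below the root is identical in every environment, only the root prefix changes.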

## 🏠 Test 5: Local Dependencies Setup Check

Because this notebook runs locally, first check that the E2B CLI, Docker, and the AWS CLI are installed, along with goofys or s3fs if you want to test the S3 mount locally.

In [None]:
import subprocess
import shutil

def check_local_dependencies():
    """Check if local dependencies are available for E2B template building."""
    print("🏠 LOCAL DEPENDENCIES CHECK")
    print("=" * 40)
    
    dependencies = {
        'e2b': 'E2B CLI for template management',
        'docker': 'Docker for building E2B templates',
        'aws': 'AWS CLI for S3 operations',
        'goofys': 'FUSE filesystem for S3 mounting (optional for local testing)',
        's3fs': 'Alternative S3 filesystem (optional for local testing)'
    }
    
    results = {}
    
    for tool, description in dependencies.items():
        path = shutil.which(tool)
        if path:
            try:
                # Get version info: e2b, docker and aws are asked for --version,
                # goofys and s3fs are probed with --help
                flag = '--version' if tool in ('e2b', 'docker', 'aws') else '--help'
                result = subprocess.run([tool, flag], capture_output=True, text=True, timeout=5)
                
                version = result.stdout.split('\n')[0] if result.stdout else result.stderr.split('\n')[0]
                results[tool] = {'available': True, 'path': path, 'version': version}
                print(f"✅ {tool}: {path}")
                if version.strip():
                    print(f"   Version: {version.strip()}")
            except Exception as e:
                results[tool] = {'available': True, 'path': path, 'error': str(e)}
                print(f"⚠️  {tool}: {path} (version check failed: {e})")
        else:
            results[tool] = {'available': False}
            print(f"❌ {tool}: not found - {description}")
    
    # Installation suggestions
    print("\n📋 Installation suggestions:")
    install_commands = {
        'e2b': 'npm install -g @e2b/cli',
        'docker': 'Install Docker Desktop from https://docker.com',
        'aws': 'pip install awscli',
        'goofys': 'Download from https://github.com/kahing/goofys/releases',
        's3fs': 'brew install s3fs (macOS) or apt-get install s3fs (Ubuntu)'
    }
    
    for tool, available_info in results.items():
        if not available_info['available']:
            print(f"  {tool}: {install_commands.get(tool, 'See documentation')}")
    
    # Check E2B authentication if available
    if results.get('e2b', {}).get('available'):
        print("\n🔐 E2B Authentication Check:")
        try:
            result = subprocess.run(['e2b', 'auth', 'info'], capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                print(f"✅ E2B authenticated: {result.stdout.strip()}")
            else:
                print(f"❌ E2B not authenticated: {result.stderr.strip()}")
                print(f"   Run: e2b auth login")
        except Exception as e:
            print(f"⚠️  E2B auth check failed: {e}")
    
    return results

local_deps = check_local_dependencies()
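
Having the goofys binary on the `PATH` does not mean the bucket is actually mounted. Below is a minimal sketch of a follow-up check, assuming the local mount lives at `$HOME/data/sentient` as described at the top of the notebook. The helper `mount_status` is hypothetical and uses only the standard library.

```python
import os
from typing import Dict


def mount_status(path: str) -> Dict[str, bool]:
    """Report whether `path` exists, is a mount point, and is writable."""
    expanded = os.path.expanduser(os.path.expandvars(path))
    return {
        "exists": os.path.isdir(expanded),
        "is_mount": os.path.ismount(expanded),   # False for a plain directory
        "writable": os.access(expanded, os.W_OK),
    }


print(mount_status("$HOME/data/sentient"))
```

If `exists` is true but `is_mount` is false, the directory is most likely a plain local folder, so files written there would not reach S3.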

## 🔧 Test 6: E2B Template Building (Optional)

In [None]:
def test_e2b_template_building():
    """Test E2B template building if dependencies are available."""
    print("🔧 E2B TEMPLATE BUILDING TEST")
    print("=" * 40)
    
    if not local_deps.get('e2b', {}).get('available'):
        print("⚠️  E2B CLI not available - skipping template building test")
        return False
        
    if not local_deps.get('docker', {}).get('available'):
        print("⚠️  Docker not available - skipping template building test")
        return False
    
    try:
        # Check if our custom template exists
        template_id = os.getenv("E2B_TEMPLATE_ID", "sentient-e2b-s3")
        print(f"\n🔍 Checking if template '{template_id}' exists...")
        
        result = subprocess.run(
            ['e2b', 'template', 'list'], 
            capture_output=True, text=True, timeout=30
        )
        
        if result.returncode != 0:
            print(f"❌ Failed to list templates: {result.stderr}")
            return False

        templates = result.stdout
        if template_id not in templates:
            print(f"❌ Template '{template_id}' not found")
            print("\n🏗️  To build the template, run:")
            print("  cd docker/e2b-sandbox")
            print(f"  e2b template build -n {template_id}")
            return False

        print(f"✅ Template '{template_id}' exists")
        print("\n📋 Available templates:")
        for line in templates.split('\n'):
            if line.strip():
                print(f"  {line}")
        return True

    except Exception as e:
        print(f"❌ Template check failed: {e}")
        return False

# Only run if we have the basic tools
if local_deps.get('e2b', {}).get('available'):
    template_success = test_e2b_template_building()
else:
    print("⚠️  Skipping template building test - E2B CLI not available")
    template_success = None

## 📊 Test Summary

In [None]:
def print_test_summary():
    """Print a summary of all tests."""
    print("📊 E2B UNIFIED MOUNT TEST SUMMARY")
    print("=" * 50)

    # Read the flags from globals(): inside a function, locals() never
    # contains the notebook-level result variables
    tests = [
        ("Unified Mount Directory", globals().get('mount_success', False)),
        ("Cross-Environment Data Sync", globals().get('sync_success', False)),
        ("Data Analysis with Mount", globals().get('data_analysis_success', False)),
    ]

    passed = 0
    total = 0

    for test_name, result in tests:
        if result is None:
            status = "⏭️  SKIPPED"
        elif result:
            status = "✅ PASSED"
            passed += 1
            total += 1
        else:
            status = "❌ FAILED"
            total += 1

        print(f"{status} {test_name}")

    print(f"\n📈 Results: {passed}/{total} tests passed")

    if passed == total and total > 0:
        print("🎉 All tests passed! Unified mount directory is working correctly.")
        print("✅ Cross-environment data persistence is functional")
        print("✅ Project structure creation is working")
    elif passed > 0:
        print("⚠️  Some tests failed. Check the output above for details.")
    else:
        print("❌ Tests failed. Check your E2B template and mount configuration.")

    # Recommendations
    print("\n💡 Key Points:")
    print("  📁 Mount directory: /data/sentient (unified across all environments)")
    print("  🗂️  Project structure: /data/sentient/{project_id}/{toolkit_name|results}/")
    print("  🔄 Data flows: Local → S3 → E2B sandbox (same paths)")
    print("  📊 Results: Saved to /data/sentient/{project_id}/results/")

    print("\n🔗 Next Steps:")
    print("  1. Verify mount is active locally: ls $HOME/data/sentient")
    print("  2. Check S3 sync: aws s3 ls s3://your-bucket")
    print("  3. Use BaseDataToolkit with CURRENT_PROJECT_ID env var")
    print("  4. Results will appear in all environments at same paths")

print_test_summary()

In [None]:
# Debug E2B output truncation - let's examine what E2BTools actually returns
print("🔍 DEBUGGING E2B OUTPUT TRUNCATION")
print("=" * 50)

async def debug_e2b_output():
    try:
        # Initialize E2B with our template
        from agno.tools.e2b import E2BTools
        template_name = "sentient-e2b-s3"
        e2b_toolkit = E2BTools(sandbox_options={"template": template_name})
        
        print(f"🎯 Using template: {template_name}")
        
        # Simple test code that should produce known output
        test_code = '''
print("🔍 DEBUG OUTPUT INVESTIGATION")
print("=" * 40)
print("Line 1: This is a test line")
print("Line 2: Testing emoji display 🏗️ ✅ 📁")
print("Line 3: Unicode characters: 🚀 💰 🎉")
print("Line 4: Long line with lots of content to see if truncation happens here with more text")
print("Line 5: More content to fill output")
print("Line 6: Even more content")
print("Line 7: Keep adding lines")
print("Line 8: To test truncation")
print("Line 9: Nearly done")
print("Line 10: Final line - END OF OUTPUT")
'''
        
        print("📝 Executing test code in E2B sandbox...")
        result = e2b_toolkit.run_python_code(test_code)
        
        print("\n📊 RAW RESULT ANALYSIS:")
        print("-" * 40)
        print(f"Type: {type(result)}")
        print(f"Length: {len(str(result))} characters")
        print(f"First 200 chars: {str(result)[:200]}...")
        print(f"Last 200 chars: ...{str(result)[-200:]}")
        
        print("\n📋 DETAILED STRUCTURE:")
        if isinstance(result, list):
            print(f"List length: {len(result)}")
            for i, item in enumerate(result):
                print(f"  Item {i}: {type(item)} - {len(str(item))} chars")
                if isinstance(item, str) and "stdout:" in item:
                    # Try to extract just the stdout part
                    import re
                    stdout_match = re.search(r"stdout:\s*\[(.*?)\]", item, re.DOTALL)
                    if stdout_match:
                        stdout_content = stdout_match.group(1)
                        print(f"    Stdout length: {len(stdout_content)} chars")
                        print(f"    Stdout preview: {stdout_content[:100]}...")
        
        print("\n🔧 DECODED OUTPUT:")
        print("-" * 40)
        decoded = decode_e2b_output(result)
        print(f"Decoded length: {len(decoded)} characters")
        print("Decoded content:")
        print(decoded)
        print("-" * 40)
        
        # Check if the output contains all expected lines
        expected_lines = [f"Line {i}:" for i in range(1, 11)]
        found_lines = []
        for line_prefix in expected_lines:
            if line_prefix in decoded:
                found_lines.append(line_prefix)
        
        print(f"\n📈 TRUNCATION ANALYSIS:")
        print(f"Expected lines: {len(expected_lines)}")
        print(f"Found lines: {len(found_lines)}")
        print(f"Missing lines: {set(expected_lines) - set(found_lines)}")
        
        if len(found_lines) < len(expected_lines):
            print("❌ OUTPUT IS TRUNCATED!")
            print("🔍 Truncation occurs before all lines are included")
        else:
            print("✅ All expected lines found - no truncation detected")
            
        return result
        
    except Exception as e:
        print(f"❌ Debug failed: {e}")
        import traceback
        traceback.print_exc()
        return None

# Run the debug
debug_result = await debug_e2b_output()
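
The truncation check above relies on numbered marker lines. Below is a minimal sketch of the same idea as a standalone function that reports exactly which markers are missing. The helper `missing_markers` is hypothetical and assumes the `Line N:` format used by the test code.

```python
import re
from typing import Set


def missing_markers(output: str, expected: int) -> Set[int]:
    """Return the marker numbers in 1..expected that never appear as 'Line N:'."""
    seen = {int(n) for n in re.findall(r"Line (\d+):", output)}
    return set(range(1, expected + 1)) - seen


full = "\n".join(f"Line {i}: payload" for i in range(1, 11))
cut = full[: full.index("Line 8:")]  # simulate output cut off before line 8

print(missing_markers(full, 10))         # set()
print(sorted(missing_markers(cut, 10)))  # [8, 9, 10]
```

Matching on the number rather than on a line split also works when the output arrives as a single string with escaped newlines.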

In [None]:
# Access the underlying E2B client directly to bypass the agno E2BTools wrapper
print("🔗 DIRECT E2B CLIENT ACCESS TEST")
print("=" * 45)

async def test_direct_e2b_client():
    try:
        # Try to import e2b directly
        from e2b_code_interpreter import Sandbox
        
        # Create direct E2B client
        template_id = "sentient-e2b-s3"
        print(f"🎯 Connecting directly to template: {template_id}")
        
        # Create sandbox directly
        sandbox = Sandbox(template=template_id)
        
        print("✅ Direct E2B sandbox created")
        
        # Test the same code that showed truncation
        test_code = '''
print("🔍 DIRECT E2B CLIENT TEST")
print("=" * 30)
print("Line 1: This is a test line")
print("Line 2: Testing emoji display 🏗️ ✅ 📁")
print("Line 3: Unicode characters: 🚀 💰 🎉")
print("Line 4: Long line with lots of content to see if truncation happens here")
print("Line 5: More content to fill output")
print("Line 6: Even more content")
print("Line 7: Keep adding lines")
print("Line 8: To test truncation")
print("Line 9: Nearly done")
print("Line 10: Final line - END OF OUTPUT")
'''
        
        print("📝 Executing code via direct E2B client...")
        result = sandbox.run_code(test_code)
        
        print("\n📊 DIRECT E2B RESULT:")
        print("-" * 30)
        print(f"Type: {type(result)}")
        print(f"Dir: {[attr for attr in dir(result) if not attr.startswith('_')][:10]}")
        
        # Check if result has stdout/stderr attributes
        if hasattr(result, 'stdout'):
            print(f"Stdout: {result.stdout}")
        if hasattr(result, 'stderr'):
            print(f"Stderr: {result.stderr}")
        if hasattr(result, 'output'):
            print(f"Output: {result.output}")
        if hasattr(result, 'logs'):
            print(f"Logs: {result.logs}")
        
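        # Check the actual content. The code interpreter result keeps stdout as a
        # list of chunks under result.logs, so try that before the other fallbacks;
        # str(result) is a single-line repr and would look truncated below.
        output_content = None
        logs = getattr(result, 'logs', None)
        if logs is not None and getattr(logs, 'stdout', None):
            output_content = ''.join(logs.stdout)
        elif hasattr(result, 'stdout') and result.stdout:
            output_content = result.stdout
        elif hasattr(result, 'output') and result.output:
            output_content = result.output
        else:
            output_content = str(result)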
        
        print(f"\n📋 OUTPUT CONTENT:")
        print(f"Length: {len(output_content)} characters")
        print("Content:")
        print(output_content)
        
        # Count lines to check for truncation
        lines = output_content.split('\n')
        expected_lines = [f"Line {i}:" for i in range(1, 11)]
        found_lines = [line for line in lines if any(exp in line for exp in expected_lines)]
        
        print(f"\n📈 TRUNCATION ANALYSIS (Direct Client):")
        print(f"Total output lines: {len(lines)}")
        print(f"Expected test lines: {len(expected_lines)}")
        print(f"Found test lines: {len(found_lines)}")
        
        if len(found_lines) < len(expected_lines):
            print("❌ TRUNCATION CONFIRMED IN DIRECT E2B CLIENT!")
        else:
            print("✅ No truncation in direct E2B client")
        
        # Clean up
        sandbox.kill()
        print("🧹 Sandbox cleaned up")
        
        return result
        
    except ImportError:
        print("❌ Cannot import e2b directly - not available")
        return None
    except Exception as e:
        print(f"❌ Direct E2B test failed: {e}")
        import traceback
        traceback.print_exc()
        return None

# Run direct E2B test
direct_e2b_result = await test_direct_e2b_client()

In [None]:
# Enhanced E2B output decoder with better display handling
from typing import Any, Optional

def enhanced_decode_e2b_output(raw_result: Any, max_display_length: Optional[int] = None) -> str:
    """
    Enhanced decoder for E2B output with optional display length limiting.
    
    Args:
        raw_result: Raw output from E2BTools.run_python_code()
        max_display_length: Optional max characters to display (None = no limit)
        
    Returns:
        Properly decoded and formatted output string
    """
    try:
        # Use the original decoder
        decoded = decode_e2b_output(raw_result)
        
        # Apply display length limit if specified
        if max_display_length and len(decoded) > max_display_length:
            truncated = decoded[:max_display_length]
            lines_count = decoded.count('\n')
            truncated_lines_count = truncated.count('\n')
            
            return f"{truncated}\n\n[...OUTPUT TRUNCATED FOR DISPLAY...]\n" \
                   f"[Full output: {len(decoded)} characters, {lines_count} lines]\n" \
                   f"[Displayed: {len(truncated)} characters, {truncated_lines_count} lines]"
        
        return decoded
        
    except Exception as e:
        return f"[Error in enhanced decoder: {e}]\n\nFalling back to raw result:\n{str(raw_result)}"


def analyze_e2b_output(raw_result: Any) -> dict:
    """
    Analyze E2B output structure and content for debugging.
    
    Returns:
        Dictionary with analysis results
    """
    analysis = {
        'raw_type': type(raw_result).__name__,
        'raw_length': len(str(raw_result)),
        'has_content': False,
        'decoded_length': 0,
        'line_count': 0,
        'contains_unicode': False,
        'truncation_detected': False,
        'content_preview': '',
        'full_content': ''
    }
    
    try:
        decoded = decode_e2b_output(raw_result)
        analysis['has_content'] = bool(decoded.strip())
        analysis['decoded_length'] = len(decoded)
        analysis['line_count'] = decoded.count('\n')
        analysis['contains_unicode'] = any(ord(char) > 127 for char in decoded)
        analysis['content_preview'] = decoded[:200] + "..." if len(decoded) > 200 else decoded
        analysis['full_content'] = decoded
        
        # Check for common truncation indicators
        truncation_indicators = ['...', '[truncated]', 'OUTPUT_LIMIT', 'MAX_LENGTH']
        analysis['truncation_detected'] = any(indicator.lower() in decoded.lower() 
                                            for indicator in truncation_indicators)
        
    except Exception as e:
        analysis['error'] = str(e)
    
    return analysis

# Test the enhanced decoder
print("🔧 ENHANCED E2B OUTPUT DECODER TEST")
print("=" * 45)

# Test with a sample from our debug results
if 'debug_result' in locals() and debug_result:
    print("✅ Using debug_result for enhanced decoder test")
    
    # Analyze the output
    analysis = analyze_e2b_output(debug_result)
    
    print("📊 OUTPUT ANALYSIS:")
    for key, value in analysis.items():
        if key != 'full_content':  # Don't print the full content here
            print(f"  {key}: {value}")
    
    print(f"\n🎯 ENHANCED DECODED OUTPUT (limited to 500 chars):")
    print("-" * 50)
    enhanced_output = enhanced_decode_e2b_output(debug_result, max_display_length=500)
    print(enhanced_output)
    print("-" * 50)
    
    print(f"\n🔬 FULL OUTPUT AVAILABLE:")
    print(f"  Full content length: {len(analysis['full_content'])} characters")
    print(f"  Lines in full content: {analysis['line_count']}")
    print(f"  No actual truncation: {not analysis['truncation_detected']}")
else:
    print("⚠️  No debug_result available. Run the debug cell first.")

print(f"\n✅ Enhanced decoder ready for use!")
print("🎉 This should resolve any display-related 'truncation' issues.")