# Goals 目標
 1. 實作安全的檔案查找與讀取工具（白名單路徑保護）
 2. 支援多種格式：txt/md/json/csv/yaml 等文字檔案
 3. 建立檔案索引與快速查詢功能（避免目錄遍歷攻擊）
 4. 整合到 function calling 架構中
 5. 提供檔案內容搜尋與摘要功能

# Prerequisites 前置需求
 - 完成 nb20-nb23 (function calling format, calculator, search, extraction)
 - 理解 pydantic schema validation 
 - 基本檔案系統安全概念
 - 路徑正規化與白名單設計原則

In [1]:
# nb24_file_lookup_tool.ipynb
# Stage 3: 檔案索引/讀取工具（路徑白名單）

# Cell1:  Shared Cache Bootstrap
import os, pathlib, torch
import sys
from datetime import datetime

# Shared cache configuration (copy this cell into every notebook).
# AI_CACHE_ROOT may be overridden through the environment; the fallback is a
# relative path, so it is interpreted against the current working directory.
AI_CACHE_ROOT = os.getenv("AI_CACHE_ROOT", "../ai_warehouse/cache")

# Point each cache-related environment variable at a sub-directory of the
# shared root and create that directory up front.
for k, v in {
    "HF_HOME": f"{AI_CACHE_ROOT}/hf",
    "TRANSFORMERS_CACHE": f"{AI_CACHE_ROOT}/hf/transformers",
    "HF_DATASETS_CACHE": f"{AI_CACHE_ROOT}/hf/datasets",
    "HUGGINGFACE_HUB_CACHE": f"{AI_CACHE_ROOT}/hf/hub",
    "TORCH_HOME": f"{AI_CACHE_ROOT}/torch",
}.items():
    os.environ[k] = v
    pathlib.Path(v).mkdir(parents=True, exist_ok=True)
# Report the cache root in use and whether torch can see a CUDA device.
print("[Cache]", AI_CACHE_ROOT, "| GPU:", torch.cuda.is_available())

[Cache] ../ai_warehouse/cache | GPU: True


In [None]:
# ============================================================================
# Cell 2: Import Dependencies & Setup
# ============================================================================

import json
import yaml
import csv
import re
from pathlib import Path
from typing import List, Dict, Optional, Union
from pydantic import BaseModel, Field, validator
import hashlib
from datetime import datetime
import mimetypes

# For CSV reading
import pandas as pd


# File type detection
def detect_file_type(filepath: Path) -> str:
    """Classify a file by its extension.

    Only the suffix is inspected (case-insensitively); the file is never
    opened, so the path does not need to exist.

    Args:
        filepath: Path whose suffix should be classified.

    Returns:
        One of "text", "json", "yaml", "csv", "python" or "log", or
        "unknown" when the suffix is not recognised.
    """
    suffix_to_type = {
        ".txt": "text",
        ".md": "text",
        ".markdown": "text",
        ".json": "json",
        ".yaml": "yaml",
        ".yml": "yaml",
        ".csv": "csv",
        ".py": "python",
        ".log": "log",
    }
    # For multi-part names such as "a.tar.gz" only the last suffix counts.
    return suffix_to_type.get(filepath.suffix.lower(), "unknown")


# Reaching this line means the cell's imports and the helper definition above
# ran without raising.
print("✅ Dependencies imported")

In [None]:
# ============================================================================
# Cell 3: File Lookup Tool Schema & Validation
# ============================================================================


class FileLookupArgs(BaseModel):
    """Validated arguments for the file lookup tool.

    The validators here are a cheap first filter on the raw input: they
    restrict ``action`` to the supported verbs and reject paths that are
    empty, absolute, or contain ``..`` components. They do not consult any
    whitelist.
    """

    action: str = Field(..., description="Action: 'search', 'read', 'list', 'info'")
    path: Optional[str] = Field(None, description="Target file/directory path")
    pattern: Optional[str] = Field(
        None, description="Search pattern (filename or content)"
    )
    max_results: int = Field(10, description="Maximum results to return")
    content_preview: bool = Field(
        True, description="Include content preview in results"
    )

    @validator("action")
    def validate_action(cls, v):
        """Accept only the supported action names.

        Raises:
            ValueError: If ``v`` is not one of the allowed actions.
        """
        allowed = ["search", "read", "list", "info"]
        if v not in allowed:
            raise ValueError(f"Action must be one of: {allowed}")
        return v

    @validator("path")
    def validate_path(cls, v):
        """Reject empty, absolute, and parent-traversing paths.

        Returns:
            ``None`` unchanged, otherwise the relative path with redundant
            separators and ``.`` components collapsed.

        Raises:
            ValueError: If the path is blank, absolute, or contains ``..``.
        """
        if v is None:
            return v
        if not v.strip():
            raise ValueError("Invalid path: empty path")

        candidate = Path(v)
        # The checks must run on the path as supplied. Resolving first makes
        # every path absolute and collapses ".." components, so a check on the
        # resolved string either rejects everything (POSIX: always starts with
        # "/") or rejects nothing (Windows drive paths).
        if candidate.is_absolute() or ".." in candidate.parts:
            raise ValueError("Invalid path: directory traversal not allowed")
        return str(candidate)


class FileInfo(BaseModel):
    """Metadata record describing a single file.

    The per-field notes describe how the producers visible in this notebook
    fill the record; the model itself only enforces the types.
    """

    path: str  # full path of the file, as a string
    name: str  # final path component (file name)
    size: int  # size in bytes (taken from stat().st_size)
    type: str  # category label returned by detect_file_type()
    modified: str  # modification time as an ISO-8601 string
    preview: Optional[str] = None  # leading part of the content, when provided
    hash: Optional[str] = None  # truncated hex digest of the content, when provided


# Test schema validation: build arguments for a search action (no path is
# given, so ``path`` stays None) and print the validated model as a dict.
test_args = FileLookupArgs(action="search", pattern="*.md")
print("✅ Schema validation working:", test_args.dict())

In [None]:
# ============================================================================
# Cell 4: Safe File Lookup Core Implementation
# ============================================================================


class SafeFileLookup:
    """File lookup helper restricted to a whitelist of directories and files.

    Each whitelist entry is resolved to an absolute path once, at construction
    time. Later checks resolve the target and compare it with those paths, so
    ``..`` components and symlinks are judged by where they actually point.

    Attributes:
        whitelist_dirs: The whitelist entries as supplied (or the defaults).
        safe_paths: Resolved ``Path`` objects for the whitelist entries.
    """

    def __init__(self, whitelist_dirs: Optional[List[str]] = None):
        """Resolve the whitelist and create missing whitelist directories.

        Args:
            whitelist_dirs: Allowed directories/files. ``None`` selects the
                built-in defaults; an empty list allows nothing.
        """
        # Fall back to the defaults only when nothing was passed. An explicit
        # empty list must not silently widen access to the default whitelist.
        if whitelist_dirs is None:
            whitelist_dirs = [
                "data/docs",
                "data/samples",
                "outs/reports",
                "configs",
                "README.md",
            ]
        self.whitelist_dirs = whitelist_dirs

        # Convert to resolved paths so later comparisons are unambiguous.
        self.safe_paths = []
        for dir_path in self.whitelist_dirs:
            try:
                resolved = Path(dir_path).resolve()
                self.safe_paths.append(resolved)
                # Heuristic: an entry without a suffix is treated as a
                # directory and created if missing.
                if resolved.suffix == "":
                    resolved.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                # Best effort: one bad entry must not prevent construction.
                print(f"⚠️ Warning: Cannot resolve {dir_path}: {e}")

    def _is_path_safe(self, target_path: Path) -> bool:
        """Return True when the resolved target is a whitelist entry or lies
        under a whitelisted directory. Any failure counts as unsafe."""
        try:
            resolved_target = target_path.resolve()

            for safe_path in self.safe_paths:
                try:
                    if resolved_target == safe_path:
                        return True
                    if safe_path.is_dir() and resolved_target.is_relative_to(safe_path):
                        return True
                except (ValueError, OSError):
                    continue
            return False
        except Exception:
            # Fail closed: if the path cannot be checked it is not allowed.
            return False

    def _read_file_content(self, filepath: Path, max_chars: int = 2000) -> str:
        """Read up to ``max_chars`` characters of a file as text.

        Args:
            filepath: File to read.
            max_chars: Maximum number of characters to return.

        Returns:
            The decoded text, with ``...[truncated]`` appended when the file
            holds more than ``max_chars`` characters. Files over 1MB, files
            that decode with none of the tried encodings, and read errors
            yield a bracketed placeholder string instead of raising.
        """
        try:
            if filepath.stat().st_size > 1024 * 1024:  # 1MB limit
                return "[File too large (>1MB)]"

            # "utf-8-sig" decodes plain UTF-8 as well and additionally strips a
            # leading BOM, so it replaces a separate "utf-8" attempt.
            for encoding in ("utf-8-sig", "gbk", "big5"):
                try:
                    with open(filepath, "r", encoding=encoding) as f:
                        # Read one extra character: only then do we know that
                        # content really continues past max_chars.
                        content = f.read(max_chars + 1)
                except UnicodeDecodeError:
                    continue
                if len(content) > max_chars:
                    return content[:max_chars] + "...[truncated]"
                return content

            # If all text encodings fail, describe the file as binary.
            with open(filepath, "rb") as f:
                raw = f.read(min(max_chars, 1000))
            return f"[Binary file, {len(raw)} bytes]: " + str(raw[:100])

        except Exception as e:
            return f"[Error reading file: {e}]"

    def search_files(
        self, pattern: str = "*", content_search: Optional[str] = None
    ) -> "List[FileInfo]":
        """Find whitelisted files by glob pattern and optional content match.

        Args:
            pattern: Glob pattern applied recursively under each whitelisted
                directory, or matched against a whitelisted file itself.
            content_search: Optional case-insensitive substring that must
                appear in the first 5000 characters of a file.

        Returns:
            ``FileInfo`` records (with a 200-character preview), newest
            modification time first. Errors inside one whitelist entry are
            printed and that entry is skipped.
        """
        results = []

        for safe_dir in self.safe_paths:
            if not safe_dir.exists():
                continue

            try:
                if safe_dir.is_file():
                    files = [safe_dir] if safe_dir.match(pattern) else []
                else:
                    files = safe_dir.rglob(pattern)

                for filepath in files:
                    if not filepath.is_file():
                        continue

                    # The pattern comes from the caller. A pattern containing
                    # ".." segments, or a symlink inside a whitelisted
                    # directory, can yield paths outside the whitelist, so
                    # every hit is re-checked against its resolved location.
                    if not self._is_path_safe(filepath):
                        continue

                    if content_search:
                        content = self._read_file_content(filepath, max_chars=5000)
                        if content_search.lower() not in content.lower():
                            continue

                    stat = filepath.stat()
                    file_info = FileInfo(
                        path=str(filepath),
                        name=filepath.name,
                        size=stat.st_size,
                        type=detect_file_type(filepath),
                        modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
                        preview=self._read_file_content(filepath, max_chars=200),
                    )
                    results.append(file_info)

            except Exception as e:
                print(f"⚠️ Search error in {safe_dir}: {e}")

        # ISO-8601 strings sort chronologically, so a string sort is enough.
        return sorted(results, key=lambda x: x.modified, reverse=True)


# Initialize lookup tool with the default whitelist. Construction resolves
# each whitelist entry and creates missing suffix-less entries as directories,
# so running this cell can create directories under the working directory.
file_lookup = SafeFileLookup()
print("✅ SafeFileLookup initialized")
print("📁 Whitelist paths:", [str(p) for p in file_lookup.safe_paths])

In [None]:
# ============================================================================
# Cell 5: Function Calling Interface
# ============================================================================


def _locate_target(raw_path: Optional[str], action: str):
    """Turn a user-supplied path into a whitelisted, existing regular file.

    Returns a ``(path, error)`` pair in which exactly one element is ``None``;
    ``error`` is a ready-to-return error response dict.
    """
    if not raw_path:
        return None, {"status": "error", "message": f"Path required for {action}"}

    target_path = Path(raw_path)
    if not file_lookup._is_path_safe(target_path):
        return None, {"status": "error", "message": "Path not in whitelist"}

    if not target_path.exists():
        return None, {"status": "error", "message": "File not found"}

    # Directories pass the whitelist check but cannot be read or hashed.
    if not target_path.is_file():
        return None, {"status": "error", "message": "Path is not a regular file"}

    return target_path, None


def _md5_prefix(path: Path, length: int = 16, chunk_size: int = 65536) -> str:
    """Return the first ``length`` hex digits of the file's MD5, read in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()[:length]


def _file_dicts(files, limit: int, include_preview: bool) -> list:
    """Serialise at most ``limit`` records, blanking previews when not wanted."""
    records = [f.dict() for f in files[:limit]]
    if not include_preview:
        for record in records:
            record["preview"] = None
    return records


def file_lookup_tool(args_dict: dict) -> dict:
    """Entry point of the file lookup tool for agent function calling.

    Args:
        args_dict: Raw arguments; validated against ``FileLookupArgs``.

    Returns:
        A dict whose ``status`` is "success" or "error". Successful responses
        echo the ``action`` and carry ``files``/``count`` (list, search),
        ``content`` (read) or ``file`` (info). Failures carry ``message``.
        The function never raises: validation and runtime errors are reported
        as an error response.
    """

    try:
        # Validate arguments
        args = FileLookupArgs(**args_dict)

        # A negative limit would slice from the end of the list; treat it as 0.
        limit = max(args.max_results, 0)

        if args.action == "list":
            files = file_lookup.search_files("*")
            return {
                "status": "success",
                "action": "list",
                "count": len(files),  # total matches, not just those returned
                "files": _file_dicts(files, limit, args.content_preview),
            }

        elif args.action == "search":
            if not args.pattern:
                return {"status": "error", "message": "Pattern required for search"}

            files = file_lookup.search_files(pattern=args.pattern)
            return {
                "status": "success",
                "action": "search",
                "pattern": args.pattern,
                "count": len(files),
                "files": _file_dicts(files, limit, args.content_preview),
            }

        elif args.action == "read":
            target_path, error = _locate_target(args.path, "read")
            if error is not None:
                return error

            content = file_lookup._read_file_content(target_path)
            stat = target_path.stat()

            return {
                "status": "success",
                "action": "read",
                "path": str(target_path),
                "size": stat.st_size,
                "type": detect_file_type(target_path),
                "content": content,
            }

        elif args.action == "info":
            target_path, error = _locate_target(args.path, "info")
            if error is not None:
                return error

            stat = target_path.stat()
            file_info = FileInfo(
                path=str(target_path),
                name=target_path.name,
                size=stat.st_size,
                type=detect_file_type(target_path),
                modified=datetime.fromtimestamp(stat.st_mtime).isoformat(),
                hash=_md5_prefix(target_path),
            )

            return {"status": "success", "action": "info", "file": file_info.dict()}

        else:
            # Defensive: the schema validator should already reject this.
            return {"status": "error", "message": f"Unknown action: {args.action}"}

    except Exception as e:
        # Tool boundary: the agent must always get a structured response.
        return {"status": "error", "message": f"Tool error: {str(e)}"}


# Register tool for agent framework.
# Each entry maps a tool name to its callable ("function"), the pydantic model
# describing its arguments ("schema"), and a human-readable "description".
TOOL_REGISTRY = {
    "file_lookup": {
        "function": file_lookup_tool,
        "schema": FileLookupArgs,
        "description": "Search, read, and get info about files in whitelisted directories",
    }
}

print("✅ File lookup tool registered")

In [None]:
# ============================================================================
# Cell 6: Create Sample Data for Testing
# ============================================================================

# Build a small set of fixture files under data/docs for the lookup tool.
sample_dir = Path("data/docs")
sample_dir.mkdir(parents=True, exist_ok=True)

# Fixture 1: a Markdown README
readme_content = """# 專案文檔

## 簡介
這是一個 LLM × RAG × Agent 的測試專案。

## 主要功能
- 文檔處理與索引
- 檢索增強生成
- 多代理協作

## 使用方法
1. 設定環境變數
2. 載入模型
3. 建立索引
4. 開始對話

## 注意事項
- 確保 GPU 記憶體充足
- 模型檔案存放在 cache 目錄
"""

(sample_dir / "README.md").write_text(readme_content, encoding="utf-8")

# Fixture 2: a YAML configuration file
config_content = """
model:
  name: "Qwen2.5-7B-Instruct"
  max_tokens: 2048
  temperature: 0.7

rag:
  chunk_size: 800
  overlap: 80
  top_k: 5

agent:
  roles: ["researcher", "planner", "writer", "reviewer"]
  max_iterations: 5
"""

(sample_dir / "config.yaml").write_text(config_content, encoding="utf-8")

# Fixture 3: a CSV table (header row followed by three data rows)
import csv

csv_path = sample_dir / "sample_data.csv"
with open(csv_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerows(
        [
            ["name", "type", "score", "description"],
            ["RAG", "技術", "95", "檢索增強生成"],
            ["Agent", "架構", "90", "多代理系統"],
            ["LLM", "模型", "88", "大型語言模型"],
        ]
    )

# Show every entry now present in the fixture directory with its size.
print("✅ Sample files created:")
for file in sample_dir.iterdir():
    print(f"  📄 {file.name} ({file.stat().st_size} bytes)")

In [None]:
# ============================================================================
# Cell 7: Smoke Test - Basic Tool Operations
# ============================================================================

# Exercise each tool action once and print a short summary. The security case
# is enforced with an assert, so a traversal that slips through stops the cell
# instead of being reported as a completed smoke test.
print("🧪 Smoke Test: File Lookup Tool")

# Test 1: List all files
print("\n=== Test 1: List Files ===")
result = file_lookup_tool({"action": "list", "max_results": 5})
print(f"Status: {result['status']}")
print(f"Found {result.get('count', 0)} files")
if result["status"] == "success":
    for file in result["files"][:2]:
        print(f"  📄 {file['name']} ({file['size']} bytes, {file['type']})")

# Test 2: Search for markdown files
print("\n=== Test 2: Search Markdown ===")
result = file_lookup_tool({"action": "search", "pattern": "*.md"})
print(f"Status: {result['status']}")
if result["status"] == "success":
    print(f"Found {result['count']} .md files")
    for file in result["files"]:
        # preview is optional in the result schema; fall back to "" if absent.
        print(f"  📄 {file['name']}: {(file['preview'] or '')[:50]}...")

# Test 3: Read specific file
print("\n=== Test 3: Read Config ===")
result = file_lookup_tool({"action": "read", "path": "data/docs/config.yaml"})
print(f"Status: {result['status']}")
if result["status"] == "success":
    print(f"File: {result['path']}")
    print(f"Type: {result['type']}")
    print(f"Content preview:\n{result['content'][:200]}...")

# Test 4: Security test (must be rejected)
print("\n=== Test 4: Security Check ===")
result = file_lookup_tool({"action": "read", "path": "../../../etc/passwd"})
print(f"Status: {result['status']} (should be 'error')")
print(f"Message: {result.get('message', 'N/A')}")
assert result["status"] == "error", "Security check failed: traversal path was not rejected"

print("\n✅ All smoke tests completed!")

In [None]:
# ============================================================================
# Cell 8: Integration with ReAct Agent Pattern
# ============================================================================


def _extract_tool_args(llm_output: str) -> Optional[dict]:
    """Return the JSON object following the first ``Args:`` marker.

    Returns ``None`` when there is no marker or no opening brace after it.
    Malformed JSON raises ``json.JSONDecodeError``.
    """
    marker_pos = llm_output.find("Args:")
    if marker_pos == -1:
        return None
    brace_pos = llm_output.find("{", marker_pos)
    if brace_pos == -1:
        return None
    # raw_decode parses exactly one JSON value starting at brace_pos, so nested
    # objects are handled and trailing text after the object is ignored.
    parsed, _ = json.JSONDecoder().raw_decode(llm_output, brace_pos)
    return parsed


def simple_react_with_file_lookup(query: str, max_iterations: int = 3) -> str:
    """Run a minimal ReAct loop in which a mock LLM may call the file lookup tool.

    Args:
        query: The user's question.
        max_iterations: Upper bound on thought/action rounds.

    Returns:
        The full transcript: the query, each iteration's LLM output, any tool
        observation, and a final answer line once files have been found.
    """

    # Mock LLM response generator (replace with real LLM)
    def mock_llm_response(prompt: str) -> str:
        """Pick a canned response based on keywords found in ``prompt``."""
        lowered = prompt.lower()
        if "檔案" in prompt or "文件" in prompt or "file" in lowered:
            return """Thought: User is asking about files. I should search for relevant files first.

Action: file_lookup
Args: {"action": "search", "pattern": "*", "max_results": 5}"""

        elif "讀取" in prompt or "read" in lowered:
            return """Thought: User wants to read a specific file. I should list available files first.

Action: file_lookup
Args: {"action": "list", "max_results": 10}"""

        else:
            return "Answer: I can help you with file operations. Available actions: search, read, list, info."

    conversation = f"User Query: {query}\n\n"

    for i in range(max_iterations):
        llm_output = mock_llm_response(query)
        conversation += f"Iteration {i+1}:\n{llm_output}\n\n"

        # Without a tool call the LLM output is the final answer.
        if "Action: file_lookup" not in llm_output:
            break

        try:
            args = _extract_tool_args(llm_output)
            if args is None:
                continue

            result = file_lookup_tool(args)
            conversation += f"Observation: {json.dumps(result, ensure_ascii=False, indent=2)}\n\n"

            # Simple follow-up logic: summarise the first few hits and stop.
            if result["status"] == "success" and result.get("files"):
                files_summary = [
                    f"- {file['name']} ({file['type']})" for file in result["files"][:3]
                ]

                answer = "找到以下檔案：\n" + "\n".join(files_summary)
                if len(result["files"]) > 3:
                    answer += f"\n... 還有 {len(result['files']) - 3} 個檔案"

                conversation += f"Answer: {answer}\n"
                break

        except Exception as e:
            # Covers malformed args as well as failures inside the tool call.
            conversation += f"Observation: Error parsing args: {e}\n\n"

    return conversation


# Test ReAct integration: run the demo loop on a Chinese query that contains
# "文件", one of the keywords the mock LLM treats as a file-related request,
# and print the resulting transcript.
print("🤖 Testing ReAct + File Lookup Integration")

test_query = "幫我找一下專案中有哪些文件"
result = simple_react_with_file_lookup(test_query)
print("Query:", test_query)
print("\nReAct Process:")
print(result)

In [None]:
# ============================================================================
# Cell 9: Key Parameters & Low-VRAM Considerations
# ============================================================================

print("⚙️ Key Parameters & Low-VRAM Options")

# Reference settings for the file lookup tool, grouped by concern.
# NOTE(review): nothing in this notebook reads this dict; SafeFileLookup uses
# its own built-in limits — confirm before relying on these values.
FILE_LOOKUP_CONFIG = {
    # --- security ---
    "whitelist_dirs": ["data/docs", "data/samples", "outs/reports", "configs"],
    # --- performance ---
    "max_file_size_mb": 1,  # size ceiling for readable files
    "max_content_chars": 2000,  # per-file character budget
    "max_search_results": 10,  # cap on files returned by a search
    # --- memory ---
    "use_streaming_read": True,  # intended for large files
    "cache_file_info": False,  # keep RAM usage down by not caching
    "lazy_content_loading": True,  # defer reading content until requested
}

print("Configuration:", json.dumps(FILE_LOOKUP_CONFIG, indent=2))

# Low-VRAM considerations, emitted as a heading followed by one bullet per line.
low_vram_notes = [
    "- File content is read in chunks, not loaded entirely into memory",
    "- Large files (>1MB) are automatically truncated or skipped",
    "- No persistent caching to minimize RAM usage",
    "- Content preview is limited to first 200 chars",
    "- Use streaming reads for CSV/JSON parsing when possible",
]
print("\n💾 Low-VRAM Considerations:")
print("\n".join(low_vram_notes))

In [None]:
# ============================================================================
# Cell 10: When to Use This & Next Steps
# ============================================================================

# Usage guidance, printed in one call; empty strings produce the blank lines
# that separate the sections.
usage_notes = [
    "📋 When to Use File Lookup Tool:",
    "",
    "✅ Suitable scenarios:",
    "- Reading configuration files (YAML, JSON)",
    "- Searching project documentation",
    "- Loading sample data for processing",
    "- Accessing structured data files (CSV, logs)",
    "- Building knowledge bases from local files",
    "",
    "⚠️ Limitations:",
    "- Only works with whitelisted directories (security)",
    "- Large files (>1MB) are truncated",
    "- No real-time file watching",
    "- Limited to text-based formats",
    "",
    "🔄 Integration points:",
    "- RAG systems: Load documents for indexing",
    "- Agent workflows: Access configuration and data",
    "- Research tasks: Find and read relevant documents",
    "- File-based knowledge retrieval",
]
print("\n".join(usage_notes))

# Roadmap for the notebooks that follow this one.
next_steps = [
    "\n🚀 Next Steps (nb25 onwards):",
    "- nb25: ReAct Pattern Implementation",
    "- nb26: Tool Router & Security Guards",
    "- nb27: Schema Validation & Auto-Retry",
    "- nb28: Multi-step Plan-Execute with File Access",
]
print("\n".join(next_steps))

# Machine-readable recap of what this notebook delivered.
summary = {
    "notebook": "nb24_file_lookup_tool",
    "completed_features": [
        "Safe file lookup with whitelist protection",
        "Multiple file formats support (txt/md/json/yaml/csv)",
        "Function calling integration",
        "Security validation against directory traversal",
        "ReAct pattern integration demo",
    ],
    "files_created": [
        "data/docs/README.md",
        "data/docs/config.yaml",
        "data/docs/sample_data.csv",
    ],
    "next_notebook": "nb25_react_pattern_minimal",
}

print(f"\n📝 Notebook Summary: {json.dumps(summary, ensure_ascii=False, indent=2)}")