In [None]:
#|default_exp rag

# RAG Support for ShellSage

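This notebook implements retrieval-augmented generation (RAG) support for ShellSage: it indexes the system's local man pages into a LanceDB vector database and searches them by embedding distance.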

In [None]:
#|export
# Public names exposed when this module is star-imported.
__all__ = ['get_man_pages', 'read_man_page', 'chunk_text', 'get_embeddings', 'init_db', 'create_chunks_table', 'index_cmd', 'search_cmd']

import lancedb
from chonkie import RecursiveChunker
from model2vec import StaticModel
import subprocess
import re
import logging
import os
import shutil
import pyarrow as pa

# NOTE(review): basicConfig runs at import time and configures the root logger
# (at INFO) for whatever program imports this module — confirm that is wanted
# for library use rather than only for the CLI entry points.
logging.basicConfig(level=logging.INFO)

In [None]:
#|export
def get_man_pages():
    """Discover the man pages installed on this system.

    Runs ``apropos -l .`` to list the known pages, parses a page name and
    section out of every output line, and resolves the page file with
    ``man -w <section> <name>``.

    Returns:
        list[dict]: one ``{'title': ..., 'section': ..., 'path': ...}`` dict
        per page whose resolved path exists on disk. Lines that cannot be
        parsed or resolved are skipped and logged at DEBUG level.
    """
    result = subprocess.run(['apropos', '-l', '.'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        # Make a failing apropos visible instead of silently returning nothing.
        logging.warning(f"apropos exited with status {result.returncode}: {result.stderr.strip()}")

    pages = []

    for line in result.stdout.splitlines():
        if not line.strip(): continue
        try:
            names, rest = line.split('(', 1)
            section = rest.split(')', 1)[0].strip()
            # Some apropos implementations list aliases as "a, b(1)"; look up
            # the first name only, since "a, b" is not a valid page name.
            name = names.split(',', 1)[0].strip()
            if not name or not section:
                raise ValueError("missing page name or section")

            # Pass the section explicitly: a name that exists in several
            # sections (printf(1) vs printf(3)) would otherwise always resolve
            # to whichever section man searches first.
            located = subprocess.check_output(
                ['man', '-w', section, name],
                stderr=subprocess.DEVNULL,
                text=True,
            ).strip()
            # Keep only the first location if man prints more than one line.
            path = located.splitlines()[0].strip() if located else ""

            if path and os.path.exists(path):
                pages.append({
                    'title': name,
                    'section': section,
                    'path': path
                })
        except Exception as e:
            logging.debug(f"Skipping line {line}: {str(e)}")
            continue

    return pages

In [None]:
#|export
# Matches one "character + backspace" pair as produced by nroff-style
# overstriking (bold is "c\bc", underline is "_\bc").
_OVERSTRIKE = re.compile(r'.\x08')


def read_man_page(path):
    """Render a man page to plain text.

    Args:
        path: the single argument handed to ``man`` (index_cmd passes the
            'path' value produced by get_man_pages).

    Returns:
        str: the rendered page with overstrike sequences removed, or ""
        when ``man`` exits with a non-zero status or cannot be run at all.
    """
    try:
        result = subprocess.run(['man', path], capture_output=True, text=True)
    except Exception as e:
        logging.warning(f"Failed to read man page {path}: {str(e)}")
        return ""

    if result.returncode != 0:
        return ""

    # Output captured from a pipe may still carry overstrike formatting;
    # dropping each "char + backspace" pair leaves only the visible character,
    # so the text that gets chunked and embedded is clean.
    return _OVERSTRIKE.sub("", result.stdout)

In [None]:
#|export
# Lazily-built RecursiveChunker shared by every chunk_text call.
_chunker = None


def _get_chunker():
    """Build the RecursiveChunker on first use and reuse it afterwards."""
    global _chunker
    if _chunker is None:
        _chunker = RecursiveChunker(
            tokenizer="gpt2",
            chunk_size=512,
            min_characters_per_chunk=12
        )
    return _chunker


def chunk_text(text):
    """Split text into chunks with Chonkie's RecursiveChunker.

    The chunker (gpt2 tokenizer, chunk_size=512, min_characters_per_chunk=12)
    is constructed once and reused, so indexing many pages does not rebuild
    it for each page.

    Args:
        text: the string to split.

    Returns:
        list[str]: the ``.text`` of every chunk produced, or ``[]`` when
        ``text`` is empty or contains only whitespace.
    """
    # Nothing useful to chunk; also avoids building the chunker needlessly.
    if not text or not text.strip():
        return []
    return [chunk.text for chunk in _get_chunker().chunk(text)]

In [None]:
#|export
# Lazily-loaded Model2Vec model shared by every get_embeddings call.
_embedding_model = None


def _get_embedding_model():
    """Load the Model2Vec static model on first use and reuse it afterwards."""
    global _embedding_model
    if _embedding_model is None:
        _embedding_model = StaticModel.from_pretrained("minishlab/M2V_base_output")
    return _embedding_model


def get_embeddings(texts):
    """Embed a batch of texts with Model2Vec.

    The model is loaded once and cached, so repeated calls (index_cmd makes
    one per man page) do not reload it each time.

    Args:
        texts: the strings to embed.

    Returns:
        list[list]: one vector per input, each converted with ``.tolist()``;
        ``[]`` when ``texts`` is empty.
    """
    # Skip loading/encoding entirely when there is nothing to embed.
    if not texts:
        return []
    vectors = _get_embedding_model().encode(texts)
    return [vector.tolist() for vector in vectors]

In [None]:
#|export
def init_db(db_path="man_index.lance"):
    """Connect to the LanceDB database located at ``db_path``.

    Args:
        db_path: location handed to ``lancedb.connect``.

    Returns:
        Whatever connection object ``lancedb.connect`` returns.
    """
    connection = lancedb.connect(db_path)
    return connection

def create_chunks_table(db, vector_dim=256):
    """Create (or replace) the "man_chunks" table that stores page chunks.

    The table is created with ``mode="overwrite"``, so an existing
    "man_chunks" table in ``db`` is replaced.

    Args:
        db: database connection exposing ``create_table`` (as returned by
            init_db).
        vector_dim: fixed length of the float32 ``vector`` column. Defaults
            to 256; it must equal the length of the vectors that will be
            added (test_embeddings expects 256 from get_embeddings).

    Returns:
        The table object returned by ``db.create_table``.
    """
    schema = pa.schema([
        ('title', pa.string()),
        ('section', pa.string()),
        ('chunk', pa.string()),
        ('vector', pa.list_(pa.float32(), vector_dim))
    ])
    return db.create_table("man_chunks", schema=schema, mode="overwrite")

In [None]:
#|export
def index_cmd(db_path="man_index.lance"):
    """Index all man pages into the vector database.

    Discovers the pages with get_man_pages(), then for every page renders it,
    chunks the text, embeds the chunks and appends one row per chunk
    (``title``, ``section``, ``chunk``, ``vector``) to the "man_chunks" table.

    Args:
        db_path: database location passed to init_db().

    Returns:
        None. A page that yields no text or no chunks is skipped with a
        warning; a page that raises is logged as an error and skipped.
    """
    # Discover pages BEFORE touching the database: create_chunks_table
    # overwrites the existing table, so a failure during discovery must not
    # destroy an index that is already there.
    pages = get_man_pages()
    logging.info(f"Found {len(pages)} man pages to index")

    db = init_db(db_path)
    table = create_chunks_table(db)

    for page in pages:
        label = f"{page['title']}({page['section']})"
        try:
            logging.info(f"Processing {label}")
            text = read_man_page(page['path'])
            if not text:
                logging.warning(f"Empty content for {label}")
                continue

            chunks = chunk_text(text)
            if not chunks:
                logging.warning(f"No chunks created for {label}")
                continue

            vectors = get_embeddings(chunks)

            data = [{
                "title": page['title'],
                "section": page['section'],
                "chunk": chunk,
                "vector": vector
            } for chunk, vector in zip(chunks, vectors)]

            if data:
                table.add(data)
                logging.info(f"Added {len(data)} chunks for {label}")
        except Exception as e:
            # Best effort: one bad page must not abort the whole indexing run.
            logging.error(f"Failed to process {label}: {str(e)}")
            continue

In [None]:
#|export
def search_cmd(query, top_k=5, db_path="man_index.lance"):
    """Search the indexed man pages for chunks relevant to ``query``.

    Embeds the query with get_embeddings() (the same embedding path used when
    indexing), runs a vector search on the "man_chunks" table and prints each
    hit.

    Args:
        query: free-text search string.
        top_k: maximum number of results requested from the table search.
        db_path: database location passed to init_db().

    Returns:
        list: the rows returned by ``to_list()``; each printed row is read
        through its 'title', 'section', 'chunk' and '_distance' keys.
    """
    db = init_db(db_path)
    table = db.open_table("man_chunks")

    # Reuse the shared embedding helper rather than loading the model again
    # here, so query and index vectors always come from the same code path.
    query_vector = get_embeddings([query])[0]

    results = table.search(query_vector).limit(top_k).to_list()

    for result in results:
        print(f"=== {result['title']}({result['section']}) ===")
        print(result['chunk'])
        # '_distance' is a distance, not a similarity: label it accordingly so
        # readers do not interpret larger values as better matches.
        print(f"Distance (lower is closer): {result['_distance']}")

    return results

## Tests

In [None]:
#|test
def test_get_man_pages():
    pages = get_man_pages()
    assert len(pages) > 0, "No man pages found"
    assert all(isinstance(p, dict) for p in pages), "Invalid page format"
    assert all('title' in p and 'section' in p and 'path' in p for p in pages), "Missing required fields"
    print("✓ get_man_pages test passed")

In [None]:
#|test
def test_read_man_page():
    pages = get_man_pages()
    if not pages: return
    content = read_man_page(pages[0]['path'])
    assert content, "Failed to read man page content"
    assert isinstance(content, str), "Content is not string"
    print("✓ read_man_page test passed")

In [None]:
#|test
def test_chunking():
    text = """This is a test document.
    It has multiple lines.
    We will use it to test chunking."""
    chunks = chunk_text(text)
    assert chunks, "No chunks created"
    assert all(isinstance(c, str) for c in chunks), "Invalid chunk format"
    print("✓ chunk_text test passed")

In [None]:
#|test
def test_embeddings():
    texts = ["This is a test sentence."]
    vectors = get_embeddings(texts)
    assert len(vectors) == 1, "Wrong number of vectors"
    assert len(vectors[0]) == 256, "Wrong vector dimension"
    print("✓ get_embeddings test passed")

In [None]:
#|test
def test_index_and_search():
    test_db = "test_man_index.lance"
    try:
        # Test indexing
        index_cmd(test_db)
        assert os.path.exists(test_db), "Database not created"
        
        # Test searching
        results = search_cmd("list files", db_path=test_db)
        assert len(results) > 0, "No search results found"
        assert all('title' in r and 'section' in r and 'chunk' in r for r in results), "Invalid result format"
        print("✓ index_cmd and search_cmd test passed")
    finally:
        if os.path.exists(test_db):
            shutil.rmtree(test_db)