In [1]:
import os
import re
import chromadb
import openai
import hashlib
import json
import time
from dotenv import load_dotenv
from openai import OpenAI
from chromadb.utils import embedding_functions

In [2]:
load_dotenv(override=True)

True

## Setup vector database

In [3]:
client = chromadb.Client()
# For persistence to disk
client = chromadb.PersistentClient(path="./chroma_db")

In [4]:
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
                api_key=os.getenv("OPENAI_API_KEY"),
                model_name="text-embedding-3-small")

In [5]:
openai_ef('tere')

[array([ 0.01191489,  0.00940918,  0.00219249, ...,  0.02120901,
         0.02228288, -0.04032141], shape=(1536,), dtype=float32)]

In [6]:
# Create a collection with the custom embedding function
# collection = client.create_collection(
#     name="obsidian_notes",
#     embedding_function=openai_ef
# )

In [7]:
#load if this has been setup
collection = client.get_collection(
    name="obsidian_notes",
    embedding_function=openai_ef  # Your embedding function
)

## Upload documents

In [8]:
def get_file_info(file_path):
    """Get file metadata needed for change detection"""
    stats = os.stat(file_path)
    return {
        "modified_time": stats.st_mtime,
        "size": stats.st_size,
        "hash": calculate_file_hash(file_path)
    }

def calculate_file_hash(file_path):
    """Calculate MD5 hash of file contents"""
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def save_file_metadata(file_path):
    """Save metadata for a single file"""
    file_info = get_file_info(file_path)
    
    # Create a unique filename for the metadata
    metadata_filename = hashlib.md5(file_path.encode()).hexdigest() + ".json"
    metadata_path = os.path.join(METADATA_DIR, metadata_filename)
    
    # Save the metadata
    with open(metadata_path, "w") as f:
        json.dump({
            "file_path": file_path,
            "info": file_info,
            "last_processed": time.time()
        }, f)

In [9]:
def extract_tags_from_note(note_content):
    """Extract tags from an Obsidian note (inline hashtags)"""    
    # This regex finds hashtags but ignores URLs and code blocks
    inline_tags = re.findall(r'(?<!`|\w)#([a-zA-Z0-9_/-]+)', note_content)

    return list(inline_tags)

In [10]:
def process_single_note(file_path, collection):
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    if len(content)>5:
        # Generate a unique ID based on file path
        note_id = hashlib.md5(file_path.encode()).hexdigest()

        # Extract title from filename
        title = os.path.splitext(os.path.basename(file_path))[0]
        folder= file_path.replace('\\', '/').split('/')[1]
        tags = extract_tags_from_note(content)

        # Add to collection
        collection.add(
            ids=[note_id],
            documents=[content],
            metadatas=[{"title": title,"folder":folder, "path": file_path,
                        "last_updated": time.time(),
                        "tags": ",".join(tags)}]
        )

        # Save individual file metadata
        save_file_metadata(file_path)
    else:
        print(f"file {file_path} has length of smaller than 5, skipping")

In [11]:
# Directory to store metadata about processed files
METADATA_DIR = "./note_metadata"
os.makedirs(METADATA_DIR, exist_ok=True)

def process_notes_initially(vault_path, collection):
    # Track all processed files
    processed_files = {}
    
    for root, _, files in os.walk(vault_path):
        for i, file in enumerate(files):
            if i%10==0:
                print(f"working on file {i}")
            if file.endswith('.md'):
                file_path = os.path.join(root, file)
                
                try:
                    # Process and add the note
                    process_single_note(file_path, collection)
                    
                    # Record this file as processed
                    file_info = get_file_info(file_path)
                    processed_files[file_path] = file_info
                    
                except Exception as e:
                    print(f"Error processing {file_path}: {e}")
    
    # Save metadata about all processed files
    with open(os.path.join(METADATA_DIR, "processed_files.json"), "w") as f:
        json.dump(processed_files, f)

In [12]:
process_notes_initially('Obsidian Vault/', collection)

working on file 0
working on file 10
working on file 20
working on file 30
working on file 40
working on file 0
working on file 0
working on file 0
working on file 0
working on file 10
working on file 20
working on file 0
working on file 0
working on file 0
working on file 0
working on file 0
working on file 10
working on file 20
working on file 0
working on file 0
working on file 10
working on file 0
working on file 0
working on file 0
working on file 10
working on file 20
working on file 0
working on file 0
working on file 0
working on file 0
working on file 10
working on file 20
working on file 0
working on file 0
working on file 0
working on file 0
working on file 10
working on file 20
working on file 0
working on file 0
working on file 10
working on file 0
working on file 0
working on file 10
working on file 20
working on file 30
working on file 40
working on file 50
working on file 0
working on file 0
working on file 10
working on file 20
working on file 0
working on file 10
work

### detect changes to change only this part in db

In [13]:
def update_note_in_collection(file_path, collection):
    """Update an existing note in the collection"""
    # Generate the consistent ID for this file
    note_id = hashlib.md5(file_path.encode()).hexdigest()
    
    # Read the updated content
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()
    
    # Extract title from filename
    title = os.path.splitext(os.path.basename(file_path))[0]
    
    # Update the note in collection
    # First, check if it exists
    results = collection.get(ids=[note_id])
    
    if len(results['ids']) > 0:
        # Update existing entry
        collection.update(
            ids=[note_id],
            documents=[content],
            metadatas=[{"title": title, "path": file_path, "last_updated": time.time()}]
        )
    else:
        # Add as new if not found (shouldn't happen normally)
        collection.add(
            ids=[note_id],
            documents=[content],
            metadatas=[{"title": title, "path": file_path, "last_updated": time.time()}]
        )
    
    # Update file metadata
    save_file_metadata(file_path)


def remove_note_from_collection(file_path, collection):
    """Remove a note from the collection"""
    # Generate the consistent ID for this file
    note_id = hashlib.md5(file_path.encode()).hexdigest()
    
    # Remove from collection
    collection.delete(ids=[note_id])
    
    # Remove metadata file
    metadata_filename = hashlib.md5(file_path.encode()).hexdigest() + ".json"
    metadata_path = os.path.join(metadata_dir, metadata_filename)
    
    if os.path.exists(metadata_path):
        os.remove(metadata_path)

In [14]:
def update_changed_notes(vault_path, collection):
    # Load previously processed files
    try:
        with open(os.path.join(metadata_dir, "processed_files.json"), "r") as f:
            processed_files = json.load(f)
    except FileNotFoundError:
        processed_files = {}
    
    # Track current files
    current_files = set()
    
    # Check all files in the vault
    for root, _, files in os.walk(vault_path):
        for file in files:
            if file.endswith('.md'):
                file_path = os.path.join(root, file)
                current_files.add(file_path)
                
                # Get current file info
                current_info = get_file_info(file_path)
                
                if file_path in processed_files:
                    # File exists in our records, check if modified
                    old_info = processed_files[file_path]
                    
                    if (current_info["modified_time"] != old_info["modified_time"] or
                        current_info["size"] != old_info["size"] or
                        current_info["hash"] != old_info["hash"]):
                        
                        print(f"File changed: {file_path}")
                        
                        # Update in collection
                        update_note_in_collection(file_path, collection)
                        
                        # Update metadata
                        processed_files[file_path] = current_info
                else:
                    # New file
                    print(f"New file: {file_path}")
                    process_single_note(file_path, collection)
                    processed_files[file_path] = current_info
    
    # Check for deleted files
    deleted_files = set(processed_files.keys()) - current_files
    for file_path in deleted_files:
        print(f"File deleted: {file_path}")
        remove_note_from_collection(file_path, collection)
        del processed_files[file_path]
    
    # Save updated metadata
    with open(os.path.join(metadata_dir, "processed_files.json"), "w") as f:
        json.dump(processed_files, f)

## MCP server

In [15]:
import os
import chromadb
import datetime
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from mcp.server.fastmcp import FastMCP, Context

In [16]:
# Create a dataclass to hold our dependencies
@dataclass
class AppContext:
    collection: chromadb.Collection

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Set up and tear down the Chroma DB connection"""
    # Initialize Chroma client
    client = chromadb.PersistentClient(path="./chroma_db")
    
    # Get the collection
    collection = client.get_collection("obsidian_notes", embedding_function=openai_ef)
    
    try:
        yield AppContext(collection=collection)
    finally:
        # Any cleanup if needed
        pass

In [17]:
# Create the MCP server with our lifespan
mcp = FastMCP("ObsidianNotes", lifespan=app_lifespan)

### retriever tool

In [18]:
@mcp.tool()
def get_sample_note() -> str:
    """Get a single sample note to examine its structure"""
    # Get the collection
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection("obsidian_notes", embedding_function=openai_ef)
    
    # Get just one document
    results = collection.get(limit=1)
    
    if results["ids"] and len(results["ids"]) > 0:
        sample_id = results["ids"][0]
        sample_doc = results["documents"][0]
        sample_metadata = results["metadatas"][0] if "metadatas" in results and results["metadatas"] else {}
        
        # Format the output to show structure
        output = "Sample Note Structure:\n\n"
        output += f"ID: {sample_id}\n\n"
        
        output += "Metadata Fields:\n"
        for key, value in sample_metadata.items():
            output += f"- {key}: {type(value).__name__} = {value}\n"
        
        output += "\nDocument Content (first 200 chars):\n"
        output += sample_doc
        
        # If embeddings exist, show their shape
        if "embeddings" in results and results["embeddings"]:
            embedding = results["embeddings"][0]
            output += f"\n\nEmbedding: Vector of length {len(embedding)}"
        
        return output
    else:
        return "No documents found in the collection."

In [19]:
def format_search_results(vector_results):
    formatted_results = []
    for i, doc in enumerate(vector_results["documents"][0]):
        doc_id = vector_results["ids"][0][i]
        folder = vector_results["metadatas"][0][i]['folder']
        title = vector_results["metadatas"][0][i]['title']
        # Convert distance to similarity score (closer to 1 is better)
        similarity = 1.0 / (1.0 + vector_results["distances"][0][i]) if "distances" in vector_results else "N/A"

        # Add formatted result
        formatted_results.append(
            f"Note ID: {doc_id}\n"
            f"Title: {title}\n"
            f"Folder: {folder}\n"
            f"Similarity: {similarity:.4f}\n\n"
            f"{doc}\n"
            f"---")
    return "\n".join(formatted_results)

In [20]:
#if need to remove tool
# if "vector_search" in mcp._tool_manager._tools:
#     del mcp._tool_manager._tools["vector_search"]

In [21]:
@mcp.tool()
def vector_search(query: str, n_results: int = 5, folder: str = None, ) -> str:
    """
    Search notes using both vector similarity
    
    Args:
        query: The search query
        n_results: Number of results to return
    """
    # Get the collection
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection("obsidian_notes", embedding_function=openai_ef)
        
    where_clause = {}
    if folder:
        where_clause["folder"] = folder
    
    # 1. Vector search
    vector_results = collection.query(
        query_embeddings = openai_ef(query),
        where=where_clause if where_clause else None,
        n_results=n_results * 2  # Get more results to rerank
    )
    
    #Format the results
    if vector_results["documents"] and len(vector_results["documents"][0]) > 0:
        formatted_results = format_search_results(vector_results)
        return formatted_results
    else:
        return f"No notes found for query: '{query}'"

In [22]:
# res=vector_search('Transparency', folder='Learning')
res=vector_search('Transparency')

In [23]:
print(res)

Note ID: 2266b46d92e2cad2bbd1cc30daf58540
Title: Radical candor framework
Folder: startup
Similarity: 0.4454

![[Pasted image 20231115205458.png]]

Radical candor is a quadrant where you can be honest (give negative feedback) but at the same time be respectful. Being aggressive or false positive (giving empathy too much or being silent) is not going to help.

"Radical Candor: Be a Kickass Boss Without Losing Your Humanity, Kim Malone Scott, page 38

[[Bill Campbell coaching style and ideas]]
[[Google excellent team factors]]
[[Intel operating style]]

#psychological_safety 
#management 
#leadership 
#google 

---
Note ID: 40329e717d71fe67393cefead7af1d4b
Title: Algorithm governance questions
Folder: AI
Similarity: 0.4430

I believe that the experience of kidney allocation may be able to shed some light:
1. Participation by stakeholders
2. Transparency measures
3. Forecasting of system impacts
4. Auditing of what actually happens once the system is turned on

"Voices in the Code: A Stor

In [24]:
print(get_sample_note())

Sample Note Structure:

ID: 40329e717d71fe67393cefead7af1d4b

Metadata Fields:
- folder: str = AI
- tags: str = AI,algorithms,governance,bias,fairness,openness
- path: str = Obsidian Vault/AI\Algorithm governance questions.md
- last_updated: float = 1747735517.90906
- title: str = Algorithm governance questions

Document Content (first 200 chars):
I believe that the experience of kidney allocation may be able to shed some light:
1. Participation by stakeholders
2. Transparency measures
3. Forecasting of system impacts
4. Auditing of what actually happens once the system is turned on

"Voices in the Code: A Story about People, Their Values, and the Algorithm They Made", David G. Robinson, page 37


[[Automated hiring software is mistakenly rejecting millions of viable job candidates]]

#AI 
#algorithms
#governance
#bias 
#fairness
#openness 


### Browse notes tool

In [25]:
@mcp.tool()
def browse_notes(folder: str = None, tag: str = None, limit: int = 10, offset: int = 0) -> str:
    """
    Browse notes by category or tag
    
    Args:
        category: Optional category to filter by
        tag: Optional tag to filter by (will match if tag string contains this value)
        limit: Maximum number of notes to return
        offset: Number of notes to skip (for pagination)
    """
    # Get the collection
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection("obsidian_notes", embedding_function=openai_ef)
    
    # Build where clause for filtering
    where_clause = {}
    if folder:
        where_clause["folder"] = folder
    
    # For tag filtering, we'll need to handle it manually since we want substring matching
    
    # Get all notes that match the category filter (or all notes if no category filter)
    results = collection.get(
        where=where_clause if where_clause else None,
        limit=1000  # Get a larger batch to filter manually
    )
    
    # Filter by tag if specified
    filtered_indices = []
    if tag and results["metadatas"]:
        for i, metadata in enumerate(results["metadatas"]):
            # Check if tags field exists and contains the tag substring
            if "tags" in metadata:
                tags_str = metadata["tags"]
                if isinstance(tags_str, str) and tag.lower() in tags_str.lower():
                    filtered_indices.append(i)
    else:
        # If no tag filter, use all results
        filtered_indices = list(range(len(results["documents"])))
    
    # Apply pagination
    start_idx = min(offset, len(filtered_indices))
    end_idx = min(start_idx + limit, len(filtered_indices))
    page_indices = filtered_indices[start_idx:end_idx]
    
    # Format the results
    if page_indices:
        formatted_results = []
        for idx in page_indices:
            doc = results["documents"][idx]
            doc_id = results["ids"][idx]
            metadata = results["metadatas"][idx]
            
            # Extract title and other metadata
            title = metadata.get("title", "Untitled")
            tags = metadata.get("tags", "")
            category = metadata.get("category", "Uncategorized")
            
            # Create a preview
            preview = doc[:100] + "..." if len(doc) > 100 else doc
            
            # Format the result
            formatted_results.append(
                f"## {title}\n"
                f"ID: {doc_id}\n"
                f"Category: {category}\n"
                f"Tags: {tags}\n\n"
                f"Preview: {preview}\n"
                f"---"
            )
        
        # Add pagination info
        pagination_info = f"Showing results {start_idx+1}-{end_idx} of {len(filtered_indices)}. "
        if end_idx < len(filtered_indices):
            pagination_info += f"Use offset={end_idx} to see more."
        
        return pagination_info + "\n\n" + "\n".join(formatted_results)
    else:
        return f"No notes found with the specified filters."

In [26]:
browse_notes('AI')

'Showing results 1-10 of 12. Use offset=10 to see more.\n\n## Algorithm governance questions\nID: 40329e717d71fe67393cefead7af1d4b\nCategory: Uncategorized\nTags: AI,algorithms,governance,bias,fairness,openness\n\nPreview: I believe that the experience of kidney allocation may be able to shed some light:\n1. Participation ...\n---\n## Automated hiring software is mistakenly rejecting millions of viable job candidates\nID: 2ea949c8aef6a2dc251c678b020c7851\nCategory: Uncategorized\nTags: AI_hype,AI,ai_hiring,bias,society\n\nPreview: "The study’s authors identify a number of factors blocking people from employment, but say automated...\n---\n## C. Clarke’s third law of prediction\nID: b4cbf197a6f5e47aee89afa711fffb74\nCategory: Uncategorized\nTags: prediction,technology\n\nPreview: Arthur\xa0C. Clarke’s third law of prediction is, famously, “Any sufficiently advanced technology is in...\n---\n## Can we create tools to augment our empathy\nID: 585c1a57d10e93f11ead58f1656405ee\nCategory: Un

### get a single note tool

In [27]:
@mcp.resource("note://{note_id}")
def get_note(note_id: str) -> str:
    """
    Get the full content of a specific note by ID
    
    Args:
        note_id: The ID of the note to retrieve
    """
    # Get the collection
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection("obsidian_notes", embedding_function=openai_ef)
    
    # Query for the specific note
    result = collection.get(
        ids=[note_id]
    )
    
    # Check if note was found
    if result["documents"] and len(result["documents"]) > 0:
        # Get the note content and metadata
        content = result["documents"][0]
        metadata = result["metadatas"][0] if result["metadatas"] else {}
        
        # Format with metadata if available
        title = metadata.get("title", "Untitled")
        tags = metadata.get("tags", "")
        
        return f"# {title}\n\nTags: {tags}\n\n{content}"
    else:
        return f"Note with ID '{note_id}' not found."

In [28]:
get_note('40329e717d71fe67393cefead7af1d4b')

'# Algorithm governance questions\n\nTags: AI,algorithms,governance,bias,fairness,openness\n\nI believe that the experience of kidney allocation may be able to shed some light:\n1. Participation by stakeholders\n2. Transparency measures\n3. Forecasting of system impacts\n4. Auditing of what actually happens once the system is turned on\n\n"Voices in the Code: A Story about People, Their Values, and the Algorithm They Made", David G. Robinson, page 37\n\n\n[[Automated hiring software is mistakenly rejecting millions of viable job candidates]]\n\n#AI \n#algorithms\n#governance\n#bias \n#fairness\n#openness '

## Prompt

In [29]:
# @mcp.prompt()
# def explore_topic(topic: str) -> str:
#     return f"""
#     System: You are a helpful assistant that helps users explore topics (named "folder" in database) in their notes. 
#     Use the vector_search tool to find relevant notes and browse_notes to explore categories.
#     When showing specific notes, use the note:// resource to get full content.
    
#     User: I want to learn more about {topic}
    
#     Assistant: I'd be happy to help you explore this topic. I'll search your notes to find relevant information.
#     """

In [30]:
from mcp.server.fastmcp.prompts import base

@mcp.prompt()
def explore_topic(topic: str) -> list[base.Message]:
    return [
        # Include system instructions in the first user message instead
        base.Message(
            role="user",
            content=f"I want to learn more about {topic}. Please act as a helpful assistant that helps me explore topics in my notes. "
                   f"Use the vector_search tool to find relevant notes and browse_notes to explore categories (which are named as folder in database). "
                   f"When showing specific notes, use the note:// resource to get full content."
        ),
        base.Message(
            role="assistant",
            content="I'd be happy to help you explore this topic. I'll search your notes to find relevant information."
        )
    ]

In [31]:
explore_topic('AI')

[Message(role='user', content=TextContent(type='text', text='I want to learn more about AI. Please act as a helpful assistant that helps me explore topics in my notes. Use the vector_search tool to find relevant notes and browse_notes to explore categories (which are named as folder in database). When showing specific notes, use the note:// resource to get full content.', annotations=None)),
 Message(role='assistant', content=TextContent(type='text', text="I'd be happy to help you explore this topic. I'll search your notes to find relevant information.", annotations=None))]

## Run server

In [32]:
import threading

def run_server():
    # Check the supported parameters for your version
    mcp.run(transport="streamable-http")
#     mcp.run(transport="http")

# Start the server in a separate thread
server_thread = threading.Thread(target=run_server)
server_thread.daemon = True  # Allow the thread to exit when the notebook closes
server_thread.start()

print("Server running on default address")

Server running on default address


INFO:     Started server process [14628]
INFO:     Waiting for application startup.


INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


## Make request to server

In [58]:
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def test_prompt(prompt_name, arguments):
    # Connect to your local server
    async with streamablehttp_client("http://localhost:8000/mcp") as (read_stream, write_stream, _):
        # Create a session
        async with ClientSession(read_stream, write_stream) as session:
            # Initialize the connection
            await session.initialize()
            
            # Get the prompt
            prompt_result = await session.get_prompt(prompt_name, arguments)
            
            # Print the prompt content
            print("Prompt content:")
            for message in prompt_result.messages:
                print(f"{message.role}: {message.content.text}")
            
            return prompt_result

# Run the test function
import asyncio
prompt_result = asyncio.run(test_prompt("explore_topic", {"topic": "machine learning"}))

RuntimeError: asyncio.run() cannot be called from a running event loop

In [59]:
import subprocess
import json

def test_prompt_cli(prompt_name, arguments):
    # Convert arguments to JSON string
    args_json = json.dumps(arguments)
    
    # Run the MCP CLI command
    result = subprocess.run(
        ["mcp", "prompt", "get", prompt_name, "--arguments", args_json],
        capture_output=True,
        text=True
    )
    
    return result.stdout

# Test the prompt
output = test_prompt_cli("explore_topic", {"topic": "AI"})
print(output)




In [60]:
import requests
import json

def test_prompt_direct(prompt_name, arguments):
    # Endpoint for getting a prompt
    url = "http://localhost:8000/mcp/prompt"
    
    # Prepare the request payload
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "prompt/get",
        "params": {
            "name": prompt_name,
            "arguments": arguments
        }
    }
    
    # Make the request
    response = requests.post(url, json=payload)
    
    # Return the response
    return response.json()

# Test the prompt
result = test_prompt_direct("explore_topic", {"topic": "AI"})
print(json.dumps(result, indent=2))

{
  "jsonrpc": "2.0",
  "id": "server-error",
  "error": {
    "code": -32600,
    "message": "Not Acceptable: Client must accept both application/json and text/event-stream"
  }
}


In [63]:
import requests
import json

# Test a tool with proper headers
def call_tool(tool_name, arguments):
    headers = {
        "Accept": "application/json, text/event-stream",
        "Content-Type": "application/json"
    }
    
    response = requests.post(
        "http://localhost:8000/mcp/tool",
        headers=headers,
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tool/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            }
        }
    )
    return response.json()

# Example: Call your vector_search tool
result = call_tool("vector_search", {"query": "AI", "n_results": 3})
print(json.dumps(result, indent=2))

{
  "jsonrpc": "2.0",
  "id": "server-error",
  "error": {
    "code": -32600,
    "message": "Bad Request: Missing session ID"
  }
}


In [28]:
import requests
import json
import uuid

# Create a session ID
session_id = str(uuid.uuid4())

def call_mcp_api(method, params):
    headers = {
        "Accept": "application/json, text/event-stream",
        "Content-Type": "application/json",
        "X-MCP-Session-ID": session_id  # Include session ID in header
    }
    
    # First initialize the session
    if method != "initialize":
        init_response = requests.post(
            "http://localhost:8000/mcp",
            headers=headers,
            json={
                "jsonrpc": "2.0",
                "id": 0,
                "method": "initialize",
                "params": {
                    "client_name": "python-test",
                    "client_version": "1.0.0"
                }
            }
        )
        print("Initialization response:", init_response.status_code)
    
    # Then make the actual request
    response = requests.post(
        "http://localhost:8000/mcp",
        headers=headers,
        json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params
        }
    )
    print(response)
    return response.json()

# Example: Call your vector_search tool
result = call_mcp_api("tool/call", {
    "name": "vector_search", 
    "arguments": {"query": "machine learning", "n_results": 3}
})
print(json.dumps(result, indent=2))

Initialization response: 500
<Response [500]>


JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [27]:
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
import asyncio
import nest_asyncio


async def test_mcp():
    try:
        # Connect to your server
        async with streamablehttp_client("http://localhost:8000/mcp") as (read_stream, write_stream, _):
            # Create a session
            async with ClientSession(read_stream, write_stream) as session:
                # Initialize
                await session.initialize()
                
                # List tools
                tools = await session.list_tools()
                print("Available tools:", [t.name for t in tools])
                
                # Call a tool
                result = await session.call_tool("vector_search", {"query": "machine learning", "n_results": 3})
                print("Result:", result)
    except Exception as e:
        print(f"Error: {e}")

# Run the async function properly
if __name__ == "__main__":
    nest_asyncio.apply()
    await test_mcp()

Error: unhandled errors in a TaskGroup (1 sub-exception)


In [26]:
import subprocess
import time
import threading

# Function to run the MCP server in a subprocess
def run_mcp_server(server_file):
    process = subprocess.Popen(
        ["mcp", "dev", server_file],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    
    # Print output in real-time
    def print_output():
        while True:
            line = process.stdout.readline()
            if not line and process.poll() is not None:
                break
            if line:
                print(line.strip())
    
    # Start output thread
    output_thread = threading.Thread(target=print_output)
    output_thread.daemon = True
    output_thread.start()
    
    return process

# Save your server code to a temporary file
import tempfile
with tempfile.NamedTemporaryFile(suffix='.py', delete=False) as f:
    server_file = f.name
    # Write your server code to the file
    f.write("""
# Your MCP server code here
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("ObsidianNotes")

# Add your tools, resources, and prompts here
@mcp.tool()
def test_tool(message: str) -> str:
    return f"Test response: {message}"

if __name__ == "__main__":
    mcp.run()
""".encode())

# Run the MCP server
server_process = run_mcp_server(server_file)

print(f"MCP server started. Open http://localhost:8000 in your browser.")
print("Press Ctrl+C to stop the server when done.")

# Keep the server running until interrupted
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    server_process.terminate()
    print("Server stopped.")

MCP server started. Open http://localhost:8000 in your browser.
Press Ctrl+C to stop the server when done.
Starting MCP inspector...
Server stopped.
