# postgres-mcp Quickstart & Cookbook

This notebook demonstrates how to programmatically interact with the `postgres-mcp` server using the Python MCP SDK. 

It covers:
1.  **Connecting** to the server.
2.  **Listing Tools** available (195 tools!).
3.  **Reading Data** via SQL.
4.  **Writing Data** (DDL/DML).
5.  **Reading Resources** (Observability).
6.  **Error Handling**.
7.  **Transactions** (Atomic operations).
8.  **AI Prompts** (Built-in workflows).
9.  **Performance Analysis** (EXPLAIN ANALYZE).
10. **JSONB Support** (PostgreSQL's superior JSON).
11. **Vector Search (pgvector)** (AI/ML Similarity Search).
12. **Full-Text Search** (tsvector/tsquery).
13. **PostGIS Spatial Data** (GIS/Location Search).
14. **Code Mode** (Multi-step sandboxed operations).

## Prerequisites

- Node.js 18+ (for running the server)
- Python 3.10+
- A running PostgreSQL instance (versions 12 through 18)
- `postgres-mcp` built locally (`npm run build`)

## 1. Install Dependencies

Install `mcp` for the protocol and `python-dotenv` for managing credentials.

In [None]:
%pip install mcp python-dotenv

## 2. Configuration & Setup

We define the server connection parameters here. We use `python-dotenv` to load credentials from the project's `.env` file automatically.

In [None]:
import asyncio
import os
import json
from contextlib import asynccontextmanager
from urllib.parse import quote
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# 1. Load Environment
project_root = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
env_path = os.path.join(project_root, ".env")
load_dotenv(env_path)

# 2. Build Connection String
user = os.getenv("POSTGRES_USER", "postgres")
password = os.getenv("POSTGRES_PASSWORD", "password")
host = os.getenv("POSTGRES_HOST", "localhost")
port = os.getenv("POSTGRES_PORT", "5432")
database = os.getenv("POSTGRES_DATABASE", "postgres")
# Percent-encode the credentials so characters such as '@', ':' or '/'
# in a password do not corrupt the URL.
connection_string = (
    f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/{database}"
)

# 3. Server Parameters
SERVER_SCRIPT = os.path.join(project_root, "dist", "cli.js")
server_params = StdioServerParameters(
    command="node",
    args=[SERVER_SCRIPT, "--transport", "stdio", "--postgres", connection_string],
    env=os.environ.copy()
)

print(f"Configured to connect to: postgres://{user}:***@{host}:{port}/{database}")

# 4. Helper Context Manager
@asynccontextmanager
async def mcp_client():
    """Helper to manage the MCP client session lifecycle."""
    # Redirect stderr to devnull to fix Windows/Jupyter 'fileno' issue
    with open(os.devnull, "w") as devnull:
        async with stdio_client(server_params, errlog=devnull) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                yield session

## 3. List Available Tools

Let's verify the connection by listing the tools the server provides. `postgres-mcp` exposes **195 specialized tools**.

In [None]:
async def list_tools_example():
    async with mcp_client() as session:
        result = await session.list_tools()
        print(f"Connected! Found {len(result.tools)} tools.\n")
        
        # Print first few tools
        for tool in result.tools[:5]:
            # description is optional in the MCP schema, so guard against None
            print(f"[Tool] {tool.name}: {(tool.description or '')[:60]}...")

# Run execution (handles Jupyter async loop)
if __name__ == "__main__" and 'get_ipython' in globals():
    await list_tools_example()
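
With this many tools, a flat listing is hard to scan. The sketch below groups tool names by the segment that follows the `pg_` prefix. This is only a naming heuristic, not something the server defines, and `sample_names` is a hand-picked list of names that appear in this notebook rather than real server output.

```python
from collections import Counter


def tool_group(name: str) -> str:
    """Return the segment after 'pg_' (heuristic grouping key)."""
    parts = name.split("_")
    if len(parts) > 1 and parts[0] == "pg":
        return parts[1]
    return "other"


def count_tool_groups(names) -> Counter:
    """Count how many tool names fall into each heuristic group."""
    return Counter(tool_group(n) for n in names)


# Hand-picked names from this notebook, used as stand-in data.
sample_names = [
    "pg_read_query",
    "pg_write_query",
    "pg_jsonb_insert",
    "pg_jsonb_query",
    "pg_vector_search",
    "pg_postgis_nearby",
]
groups = count_tool_groups(sample_names)
for group, count in sorted(groups.items()):
    print(f"{group}: {count}")
```

Against a live session you would pass `[t.name for t in result.tools]` instead of `sample_names`.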

## 4. Reading Data (SELECT)

Use `pg_read_query` to run SELECT statements safely with prepared statements.

In [None]:
async def read_data_example():
    async with mcp_client() as session:
        print("--- Executing SELECT query ---")
        # First, list tables to find a valid table name
        tables_res = await session.call_tool("pg_list_tables", {})
        tables_data = json.loads(tables_res.content[0].text)
        
        if not tables_data.get("tables"):
            print("No tables found. Querying pg_catalog instead.")
            query_res = await session.call_tool("pg_read_query", {
                "query": "SELECT tablename FROM pg_catalog.pg_tables LIMIT 5"
            })
        else:
            first_table = tables_data["tables"][0]["name"]
            print(f"Querying table: {first_table}")
            query_res = await session.call_tool("pg_read_query", {
                "query": f"SELECT * FROM {first_table} LIMIT 3"
            })
        
        print("Result:")
        print(query_res.content[0].text)

if __name__ == "__main__" and 'get_ipython' in globals():
    await read_data_example()
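
The cell above interpolates a table name straight into the SQL text. Identifiers cannot be sent as bind parameters, so one common precaution is to double-quote them and double any embedded quote character, which is how PostgreSQL escapes quoted identifiers. The helpers `quote_ident` and `qualified` below are hypothetical names for this sketch, not part of `postgres-mcp`.

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier by doubling embedded double quotes."""
    if not name or "\x00" in name:
        raise ValueError("identifier must be non-empty and contain no NUL byte")
    return '"' + name.replace('"', '""') + '"'


def qualified(schema: str, table: str) -> str:
    """Build a schema-qualified, quoted table reference."""
    return f"{quote_ident(schema)}.{quote_ident(table)}"


print(qualified("public", "Order Items"))
print(quote_ident('we"ird'))
```

Note that quoting makes an identifier case-sensitive, so `"Users"` and `users` refer to different tables.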

## 5. Writing Data (CREATE / INSERT)

Use `pg_write_query` for DDL and DML operations. Let's create a short-lived demo table, insert data, and drop the table again at the end.

In [None]:
async def write_data_example():
    async with mcp_client() as session:
        table_name = "notebook_demo_table"
        
        print(f"--- Creating table '{table_name}' ---")
        await session.call_tool("pg_write_query", {
            "query": f"CREATE TABLE IF NOT EXISTS {table_name} (id SERIAL PRIMARY KEY, message VARCHAR(255))"
        })
        print("Table created.")
        
        print("--- Inserting data ---")
        await session.call_tool("pg_write_query", {
            "query": f"INSERT INTO {table_name} (message) VALUES ('Hello from Jupyter'), ('MCP is cool')"
        })
        print("Data inserted.")
        
        print("--- Verifying data ---")
        result = await session.call_tool("pg_read_query", {
            "query": f"SELECT * FROM {table_name}"
        })
        print(result.content[0].text)
        
        # Cleanup
        await session.call_tool("pg_drop_table", {"table": table_name})
        print("Cleanup complete.")

if __name__ == "__main__" and 'get_ipython' in globals():
    await write_data_example()
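
The insert above embeds its values as SQL literals. The full-text search section of this notebook passes values separately through a `params` list with `$1`, `$2` placeholders, and the sketch below builds the same kind of argument dictionary for a multi-row insert. It assumes `pg_write_query` accepts `params` here as it does there; `build_insert` is a hypothetical helper.

```python
def build_insert(table: str, column: str, values: list[str]) -> dict:
    """Build pg_write_query arguments for a single-column, multi-row INSERT."""
    if not values:
        raise ValueError("at least one value is required")
    # One numbered placeholder per row: ($1), ($2), ...
    placeholders = ", ".join(f"(${i})" for i in range(1, len(values) + 1))
    return {
        "query": f"INSERT INTO {table} ({column}) VALUES {placeholders}",
        "params": list(values),
    }


args = build_insert("notebook_demo_table", "message",
                    ["Hello from Jupyter", "MCP is cool"])
print(args["query"])
```

The result can be passed as the second argument to `session.call_tool("pg_write_query", args)`.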

## 6. Reading Resources

MCP Resources provide direct access to data like system status or schema info. 
Key PostgreSQL resources: `postgres://health`, `postgres://extensions`, `postgres://stats`.

In [None]:
async def read_resources_example():
    async with mcp_client() as session:
        print("--- Reading 'postgres://health' Resource ---")
        
        # Read the health resource
        resource = await session.read_resource("postgres://health")
        
        # Parse and display
        data = json.loads(resource.contents[0].text)
        print(f"Server Version: {data.get('version')}")
        print(f"Overall Status: {data.get('status')}")
        print(f"Connection Pool: {data.get('connectionPool', {}).get('status')}")
        
        print("\n--- Reading 'postgres://extensions' Resource ---")
        ext_resource = await session.read_resource("postgres://extensions")
        ext_data = json.loads(ext_resource.contents[0].text)
        print(f"Available Extensions: {len(ext_data.get('available', []))}")
        print(f"Installed Extensions: {[e['name'] for e in ext_data.get('installed', [])[:5]]}")
        
if __name__ == "__main__" and 'get_ipython' in globals():
    await read_resources_example()

## 7. Error Handling

The server returns descriptive PostgreSQL-style errors for invalid queries.

In [None]:
async def error_handling_example():
    async with mcp_client() as session:
        print("--- Triggering Syntax Error ---")
        try:
            result = await session.call_tool("pg_read_query", {
                "query": "SELECT * FROM non_existent_table THIS_IS_INVALID_SQL"
            })
            # MCP returns errors in the result content when isError is true
            if result.isError:
                print(f"Error returned: {result.content[0].text}")
            else:
                print(result.content[0].text)
        except Exception as e:
            print(f"Caught Expected Error:\n{e}")

if __name__ == "__main__" and 'get_ipython' in globals():
    await error_handling_example()
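
Because tool failures come back as a normal result with `isError` set, it is easy to forget the check. One option is a small helper that turns such results into exceptions. `ToolCallError` and `tool_text` are hypothetical names, and the `SimpleNamespace` objects merely stand in for real call results so the sketch runs without a server.

```python
from types import SimpleNamespace


class ToolCallError(RuntimeError):
    """Raised when a tool result has isError set."""


def tool_text(result) -> str:
    """Return the joined text content of a result, raising on isError."""
    parts = [item.text for item in result.content if getattr(item, "text", None)]
    text = "\n".join(parts)
    if result.isError:
        raise ToolCallError(text or "tool call failed without a message")
    return text


# Stand-ins shaped like the results used in this notebook.
ok = SimpleNamespace(isError=False, content=[SimpleNamespace(text='{"rows": []}')])
failed = SimpleNamespace(isError=True, content=[SimpleNamespace(text="syntax error at or near ...")])

print(tool_text(ok))
try:
    tool_text(failed)
except ToolCallError as exc:
    print(f"Tool failed: {exc}")
```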

## 8. Transactions (Atomic Operations)

PostgreSQL supports full ACID transactions with savepoints. Use `pg_transaction_begin`, `pg_transaction_commit`, and `pg_transaction_rollback`.

In [None]:
async def transaction_example():
    async with mcp_client() as session:
        print("--- Executing Atomic Transaction ---")
        table = "txn_demo"
        
        # 1. Setup table (outside txn)
        await session.call_tool("pg_write_query", {"query": f"CREATE TABLE IF NOT EXISTS {table} (val INT)"})

        # 2. Begin Transaction
        await session.call_tool("pg_transaction_begin", {})
        print("Transaction started.")
        
        # 3. Execute multiple statements
        await session.call_tool("pg_write_query", {"query": f"INSERT INTO {table} (val) VALUES (100)"})
        await session.call_tool("pg_write_query", {"query": f"INSERT INTO {table} (val) VALUES (200)"})
        await session.call_tool("pg_write_query", {"query": f"UPDATE {table} SET val = val * 2"})
        
        # 4. Commit
        await session.call_tool("pg_transaction_commit", {})
        print("Transaction committed.")
        
        # 5. Verify
        check = await session.call_tool("pg_read_query", {"query": f"SELECT * FROM {table}"})
        print("\nTable State (Expect 200, 400):")
        print(check.content[0].text)
        
        # Cleanup
        await session.call_tool("pg_drop_table", {"table": table})

if __name__ == "__main__" and 'get_ipython' in globals():
    await transaction_example()
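
The example above never rolls back if a statement fails partway through. A minimal sketch of a wrapper that commits on success and rolls back on an exception is shown below. It assumes `pg_transaction_rollback` accepts an empty argument dictionary like the begin and commit calls above, and `RecordingSession` is a fake session that only records tool names so the control flow can be checked without a database.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def transaction(session):
    """Begin a transaction, commit on success, roll back on an exception."""
    await session.call_tool("pg_transaction_begin", {})
    try:
        yield session
    except Exception:
        await session.call_tool("pg_transaction_rollback", {})
        raise
    else:
        await session.call_tool("pg_transaction_commit", {})


class RecordingSession:
    """Fake session that records which tools were called."""

    def __init__(self):
        self.calls = []

    async def call_tool(self, name, arguments):
        self.calls.append(name)


async def demo(fail: bool) -> list[str]:
    session = RecordingSession()
    try:
        async with transaction(session) as s:
            await s.call_tool("pg_write_query",
                              {"query": "INSERT INTO txn_demo (val) VALUES (100)"})
            if fail:
                raise RuntimeError("simulated failure")
    except RuntimeError:
        pass  # the rollback has already been issued
    return session.calls
```

In a notebook, run `await demo(True)`; in a script, use `asyncio.run(demo(True))`. Since failed tool calls return `isError` instead of raising, a real version would also need to check each result inside the block.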

## 9. AI Prompts

`postgres-mcp` exposes **19 built-in prompts** that help AI assistants generate better SQL, design schemas, and set up extensions.

In [None]:
async def prompt_example():
    async with mcp_client() as session:
        print("--- Listing Prompts ---")
        prompts = await session.list_prompts()
        print(f"Found {len(prompts.prompts)} prompts.\n")
        
        for p in prompts.prompts[:5]:
            # description is optional in the MCP schema, so guard against None
            print(f"- {p.name}: {(p.description or '')[:60]}...")
            
        # Example: Extension setup prompts
        print("\n--- Extension Setup Prompts ---")
        setup_prompts = [p for p in prompts.prompts if 'setup' in p.name]
        for p in setup_prompts:
            print(f"- {p.name}")

if __name__ == "__main__" and 'get_ipython' in globals():
    await prompt_example()

## 10. Performance Analysis

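Use `pg_explain` to inspect query execution plans. With `analyze` enabled, PostgreSQL runs `EXPLAIN ANALYZE`, which actually executes the query to collect real timings, so use it with care on statements that modify data.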

In [None]:
async def explain_example():
    async with mcp_client() as session:
        print("--- Executing EXPLAIN ANALYZE ---")
        # Explain a simple query on pg_catalog (guaranteed to exist)
        result = await session.call_tool("pg_explain", {
            "query": "SELECT * FROM pg_catalog.pg_tables WHERE schemaname = 'public'",
            "analyze": True,
            "format": "JSON"
        })
        
        # Show the execution plan
        plan = json.loads(result.content[0].text)
        print(json.dumps(plan, indent=2)[:800] + "... [truncated]")

if __name__ == "__main__" and 'get_ipython' in globals():
    await explain_example()
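
A JSON plan is a tree, and the truncated dump above hides most of it. The sketch below walks a plan and prints one line per node. It assumes the raw `EXPLAIN (FORMAT JSON)` layout, a list whose first element has a `Plan` key with nested `Plans` children; the tool may wrap that output differently, and `sample` is a made-up plan for illustration, not output from a real run.

```python
import json


def walk_plan(node: dict, depth: int = 0):
    """Yield (depth, node type, actual total time) for every plan node."""
    yield depth, node.get("Node Type"), node.get("Actual Total Time")
    for child in node.get("Plans", []):
        yield from walk_plan(child, depth + 1)


# Made-up plan in the shape PostgreSQL emits for FORMAT JSON.
sample = json.loads("""
[{"Plan": {"Node Type": "Hash Join", "Actual Total Time": 0.42,
           "Plans": [
             {"Node Type": "Seq Scan", "Actual Total Time": 0.10},
             {"Node Type": "Hash", "Actual Total Time": 0.05,
              "Plans": [{"Node Type": "Seq Scan", "Actual Total Time": 0.04}]}
           ]},
  "Execution Time": 0.5}]
""")

rows = list(walk_plan(sample[0]["Plan"]))
for depth, node_type, total_ms in rows:
    print(f"{'  ' * depth}{node_type} ({total_ms} ms)")
```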

## 11. Working with JSONB (PostgreSQL's Superior JSON)

PostgreSQL's JSONB type is binary, indexable, and supports rich path operators. `postgres-mcp` exposes dedicated `pg_jsonb_*` tools for document-style operations.

In [None]:
async def jsonb_example():
    async with mcp_client() as session:
        table = "jsonb_demo"
        
        try:
            print("--- Creating Table with JSONB Column ---")
            await session.call_tool("pg_write_query", {
                "query": f"CREATE TABLE IF NOT EXISTS {table} (id SERIAL PRIMARY KEY, metadata JSONB)"
            })

            print("--- Inserting JSONB Document ---")
            doc = {
                "user_id": 42,
                "preferences": {"theme": "dark", "notifications": True},
                "tags": ["developer", "mcp"]
            }
            await session.call_tool("pg_jsonb_insert", {
                "table": table,
                "column": "metadata",
                "data": doc
            })

            print("--- Extracting Field with Path Operator ---")
            # Use PostgreSQL's -> and ->> operators
            extract_res = await session.call_tool("pg_read_query", {
                "query": f"SELECT metadata->'preferences'->>'theme' as theme FROM {table}"
            })
            print(f"Extracted Theme: {extract_res.content[0].text}")

            print("--- Querying with JSONB Containment ---")
            # Use @> containment operator
            query_res = await session.call_tool("pg_jsonb_query", {
                "table": table,
                "column": "metadata",
                "containment": {"tags": ["developer"]}
            })
            print(f"Found documents: {query_res.content[0].text}")
            
        finally:
            await session.call_tool("pg_drop_table", {"table": table})

if __name__ == "__main__" and 'get_ipython' in globals():
    await jsonb_example()
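
To build intuition for what the `@>` containment operator matches, here is a rough Python model of its rules: an object contains a pattern if every pattern key is present with a contained value, and an array contains a pattern array if every pattern element is contained in some element. It is an approximation for illustration only and omits PostgreSQL's special case where an array may contain a bare primitive value.

```python
def jsonb_contains(doc, pattern) -> bool:
    """Approximate the semantics of PostgreSQL's `doc @> pattern`."""
    if isinstance(pattern, dict):
        return isinstance(doc, dict) and all(
            key in doc and jsonb_contains(doc[key], value)
            for key, value in pattern.items()
        )
    if isinstance(pattern, list):
        return isinstance(doc, list) and all(
            any(jsonb_contains(item, wanted) for item in doc)
            for wanted in pattern
        )
    # Keep JSON true/false distinct from the numbers 1/0.
    if isinstance(doc, bool) or isinstance(pattern, bool):
        return doc is pattern
    return doc == pattern


doc = {
    "user_id": 42,
    "preferences": {"theme": "dark", "notifications": True},
    "tags": ["developer", "mcp"],
}
print(jsonb_contains(doc, {"tags": ["developer"]}))
print(jsonb_contains(doc, {"tags": ["ops"]}))
```

The first call models the containment query from the cell above and matches; the second does not, because `"ops"` is not among the document's tags.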

## 12. Vector Search with pgvector (AI/ML)

**pgvector** enables semantic similarity search for AI/ML applications such as retrieval-augmented generation (RAG).

> ⚠️ **Prerequisite:** Requires pgvector extension installed (`CREATE EXTENSION vector`)

In [None]:
async def vector_example():
    async with mcp_client() as session:
        table = "vector_docs"
        
        try:
            # Check if pgvector is available
            ext_check = await session.call_tool("pg_read_query", {
                "query": "SELECT 1 FROM pg_extension WHERE extname = 'vector'"
            })
            if 'rows' not in ext_check.content[0].text or '[]' in ext_check.content[0].text:
                print("pgvector not installed. Run: CREATE EXTENSION vector;")
                return
            
            print("--- Creating Vector Table (384 dimensions) ---")
            await session.call_tool("pg_vector_create_table", {
                "table": table,
                "dimensions": 384,
                "additionalColumns": ["title TEXT", "content TEXT"]
            })
            
            print("--- Creating HNSW Index ---")
            await session.call_tool("pg_vector_create_index", {
                "table": table,
                "column": "embedding",
                "method": "hnsw",
                "metric": "cosine"
            })

            print("--- Inserting Sample Embeddings ---")
            # In real usage, these would come from an embedding model
            sample_embedding = [0.1] * 384  # Simplified example
            await session.call_tool("pg_vector_insert", {
                "table": table,
                "embedding": sample_embedding,
                "metadata": {"title": "Machine Learning Intro", "content": "Neural networks..."}
            })

            print("--- Similarity Search ---")
            query_embedding = [0.1] * 384
            results = await session.call_tool("pg_vector_search", {
                "table": table,
                "column": "embedding",
                "queryVector": query_embedding,
                "limit": 5,
                "metric": "cosine"
            })
            print(results.content[0].text)
            
        finally:
            await session.call_tool("pg_drop_table", {"table": table})

if __name__ == "__main__" and 'get_ipython' in globals():
    await vector_example()
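
The `cosine` metric ranks rows by cosine distance, which pgvector defines as one minus the cosine similarity of the two vectors. A plain-Python version of that formula, written here only to make the metric concrete, looks like this:

```python
import math


def cosine_distance(a, b) -> float:
    """Return 1 - cosine similarity for two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same number of dimensions")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for zero vectors")
    return 1.0 - dot / (norm_a * norm_b)


same = cosine_distance([0.1] * 384, [0.1] * 384)
orthogonal = cosine_distance([1.0, 0.0], [0.0, 1.0])
opposite = cosine_distance([1.0, 0.0], [-1.0, 0.0])
print(same, orthogonal, opposite)
```

Identical directions give a distance of about 0, orthogonal vectors give 1, and opposite vectors give 2. This is why the sample query above, which reuses the stored embedding, should return that row as the closest match.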

## 13. Full-Text Search (tsvector/tsquery)

PostgreSQL's built-in full-text search uses `tsvector` and `tsquery` for powerful text searching with relevance ranking.

In [None]:
async def fulltext_example():
    async with mcp_client() as session:
        table = "fts_docs"
        
        try:
            print("--- Creating Table with tsvector Column ---")
            await session.call_tool("pg_write_query", {
                "query": f"""
                    CREATE TABLE IF NOT EXISTS {table} (
                        id SERIAL PRIMARY KEY,
                        title TEXT,
                        content TEXT,
                        search_vector tsvector GENERATED ALWAYS AS (
                            setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
                            setweight(to_tsvector('english', coalesce(content, '')), 'B')
                        ) STORED
                    )
                """
            })
            
            print("--- Creating GIN Index ---")
            await session.call_tool("pg_fulltext_create_index", {
                "table": table,
                "column": "search_vector"
            })

            print("--- Inserting Sample Documents ---")
            docs = [
                ("Machine Learning", "Introduction to supervised learning and neural networks."),
                ("Database Systems", "ACID properties ensure reliable database transactions."),
                ("Prompt Engineering", "Techniques for getting better output from LLMs.")
            ]
            for title, content in docs:
                await session.call_tool("pg_write_query", {
                    "query": f"INSERT INTO {table} (title, content) VALUES ($1, $2)",
                    "params": [title, content]
                })

            print("--- Fulltext Search (Query: 'learning') ---")
            search_res = await session.call_tool("pg_fulltext_search", {
                "table": table,
                "column": "search_vector",
                "query": "learning",
                "selectColumns": ["title", "content"]
            })
            print(search_res.content[0].text)
                
        finally:
            await session.call_tool("pg_drop_table", {"table": table})

if __name__ == "__main__" and 'get_ipython' in globals():
    await fulltext_example()
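
For readers who want to see the underlying SQL, the sketch below builds a ranked search using the standard `plainto_tsquery`, `@@`, and `ts_rank` functions. This is not necessarily what `pg_fulltext_search` runs internally; `build_fts_query` is a hypothetical helper, and passing `params` to `pg_read_query` is an assumption based on the `pg_write_query` usage above.

```python
def build_fts_query(table: str, column: str, select_columns: list[str],
                    search: str, language: str = "english") -> dict:
    """Build arguments for a ranked full-text search over a tsvector column."""
    if not language.isalpha():
        raise ValueError("language must be a plain configuration name")
    cols = ", ".join(select_columns)
    query = (
        f"SELECT {cols}, ts_rank({column}, q) AS rank "
        f"FROM {table}, plainto_tsquery('{language}', $1) AS q "
        f"WHERE {column} @@ q "
        f"ORDER BY rank DESC"
    )
    # The search text travels as a bind parameter, not as SQL text.
    return {"query": query, "params": [search]}


fts_args = build_fts_query("fts_docs", "search_vector",
                           ["title", "content"], "learning")
print(fts_args["query"])
```

Because the generated column weights titles as `A` and content as `B`, a match in the title ranks higher than the same match in the body.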

## 14. PostGIS Spatial Data (GIS / Location Search)

**PostGIS** is the gold standard for geospatial databases. `postgres-mcp` provides 12 dedicated tools for spatial operations.

> ⚠️ **Prerequisite:** Requires PostGIS extension installed (`CREATE EXTENSION postgis`)

In [None]:
async def spatial_example():
    async with mcp_client() as session:
        table = "spatial_places"
        
        try:
            # Check if PostGIS is available
            ext_check = await session.call_tool("pg_read_query", {
                "query": "SELECT 1 FROM pg_extension WHERE extname = 'postgis'"
            })
            if 'rows' not in ext_check.content[0].text or '[]' in ext_check.content[0].text:
                print("PostGIS not installed. Run: CREATE EXTENSION postgis;")
                return

            print("--- Creating Spatial Table ---")
            await session.call_tool("pg_postgis_create_table", {
                "table": table,
                "geometryColumn": "location",
                "geometryType": "POINT",
                "srid": 4326,
                "additionalColumns": ["name TEXT"]
            })

            print("--- Creating Spatial Index ---")
            await session.call_tool("pg_postgis_create_index", {
                "table": table,
                "column": "location"
            })

            print("--- Inserting Locations ---")
            # New York Landmarks (Longitude, Latitude)
            places = [
                ("Empire State Building", -73.9857, 40.7484),
                ("Central Park", -73.9665, 40.7829),
                ("Statue of Liberty", -74.0445, 40.6892)
            ]
            
            for name, lon, lat in places:
                await session.call_tool("pg_postgis_insert_point", {
                    "table": table,
                    "column": "location",
                    "longitude": lon,
                    "latitude": lat,
                    "additionalData": {"name": name}
                })

            print("--- Finding Nearby Places (within 5km of Times Square) ---")
            nearby = await session.call_tool("pg_postgis_nearby", {
                "table": table,
                "column": "location",
                "longitude": -73.9855,
                "latitude": 40.7580,
                "radiusMeters": 5000,
                "limit": 5
            })

            print("Places within 5km of Times Square:")
            print(nearby.content[0].text)

        finally:
            await session.call_tool("pg_drop_table", {"table": table})

if __name__ == "__main__" and 'get_ipython' in globals():
    await spatial_example()
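
To sanity-check the 5 km result without a database, the haversine formula gives great-circle distances on a spherical Earth. PostGIS geography calculations use a spheroid, so real results differ slightly, but the approximation is close enough to predict which of the sample landmarks fall inside the radius.

```python
import math

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius in meters


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in meters between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


times_square = (-73.9855, 40.7580)
places = [
    ("Empire State Building", -73.9857, 40.7484),
    ("Central Park", -73.9665, 40.7829),
    ("Statue of Liberty", -74.0445, 40.6892),
]

nearby = [
    name for name, lon, lat in places
    if haversine_m(*times_square, lon, lat) <= 5000
]
print(nearby)
```

Under this approximation the Empire State Building and Central Park are within 5 km of Times Square, while the Statue of Liberty is roughly 9 km away and falls outside the radius.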

## 15. Code Mode (Multi-Step Sandboxed Operations)

**Code Mode** is a unique feature that lets you write JavaScript to orchestrate multiple database operations in a single call. This can reduce token usage by 70-90% for complex operations!

> ⚠️ **Prerequisite:** Requires `admin` OAuth scope when using OAuth authentication.

In [None]:
async def codemode_example():
    async with mcp_client() as session:
        print("--- Executing Code Mode: Get Table Row Counts ---")
        
        # This single call replaces multiple individual tool calls
        code = """
        // Get row counts for all tables in a single operation
        const tables = await pg.core.listTables();
        
        // Limit to first 5 tables for demo
        const subset = tables.slice(0, 5);
        
        return Promise.all(subset.map(async t => {
            const result = await pg.core.readQuery({ 
                query: `SELECT COUNT(*) as count FROM ${t.schema}.${t.name}` 
            });
            return {
                table: `${t.schema}.${t.name}`,
                rowCount: result[0]?.count || 0
            };
        }));
        """
        
        result = await session.call_tool("pg_execute_code", {
            "code": code
        })
        
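        # Tool failures are reported via isError rather than raised
        if result.isError:
            print(f"Code Mode error: {result.content[0].text}")
            return

        print("Table Row Counts:")
        data = json.loads(result.content[0].text)
        for item in data:
            print(f"  {item['table']}: {item['rowCount']} rows")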

if __name__ == "__main__" and 'get_ipython' in globals():
    await codemode_example()

## Next Steps

Now that you've seen the basics, explore more:

1. **Extension-Specific Tools**: pg_cron, pgcrypto, pg_partman, citext, ltree
2. **Monitoring Resources**: `postgres://activity`, `postgres://locks`, `postgres://vacuum`
3. **AI Prompts**: Use `pg_query_builder`, `pg_schema_design` for guided workflows
4. **Performance Tuning**: `pg_index_recommendations`, `pg_stat_statements`

See the [README](../../README.md) for full documentation of all 195 tools!