# PVML SDK Comprehensive Example

This notebook demonstrates a complete workflow using the PVML SDK, showcasing:

1. **Environment Setup** - Loading configuration from .env.local
2. **Client Initialization** - Setting up the PVML client
3. **Workspace Management** - Working with workspaces and users
4. **Data Source Integration** - Connecting to databases
5. **AI Agent Creation** - Building and configuring AI agents
6. **MCP Integration** - Working with Model Context Protocol
7. **Interactive Queries** - Running queries through agents
8. **Session Management** - Managing conversation sessions
9. **Monitoring & Audit** - Tracking usage and performance

## Prerequisites

Before running this notebook, ensure you have:
- PVML SDK installed (`pip install -r requirements.txt`)
- A `.env.local` file defining `PVML_API_KEY`, `PVML_API_URL`, `GPT_KEY`, `PVML_WORKSPACE_ID`, and `PVML_DATASOURCE_ID`
- Access to a PVML workspace
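- A data source (for example, a database) registered in the workspace; its ID must be set in `PVML_DATASOURCE_ID`, because the setup cell raises an error when that variable is missing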


In [None]:
# Environment Setup and Configuration
import os
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv


# Load environment variables from .env.local
def load_env_variables(env_file: str = ".env.local") -> bool:
    """Load environment variables from a dotenv file using python-dotenv.

    Args:
        env_file: Path to the dotenv file. Defaults to ".env.local".

    Returns:
        True if ``env_file`` is an existing regular file and was handed to
        ``load_dotenv``; False otherwise. In the False case a hint describing
        the expected file contents is printed.
    """
    env_path = Path(env_file)

    # is_file() rather than exists(): a directory with this name cannot be
    # read as a dotenv file, so it is reported the same way as a missing file
    # instead of being announced as "loaded".
    if not env_path.is_file():
        print(f"Warning: {env_file} not found. Please create it with your PVML credentials.")
        print("\nRequired format:")
        print("PVML_API_KEY=your_api_key_here")
        print("PVML_API_URL=your_api_url_here")
        print("GPT_KEY=your_gpt_key_here")
        print("PVML_WORKSPACE_ID=your_workspace_id_here")
        print("PVML_DATASOURCE_ID=your_datasource_id_here")
        return False

    # Load environment variables from the dotenv file
    load_dotenv(env_path)
    print(f"Environment variables loaded from {env_file}")
    return True

# Pull configuration from .env.local into the process environment
env_loaded = load_env_variables()

# Every name listed here must be set to a non-empty value before continuing
required_vars = [
    "PVML_API_KEY",
    "PVML_API_URL",
    "GPT_KEY",
    "PVML_WORKSPACE_ID",
    "PVML_DATASOURCE_ID",
]
missing_vars = [name for name in required_vars if os.environ.get(name) in (None, "")]

if missing_vars:
    raise ValueError(f"Missing required environment variables: {missing_vars}")

# Echo the non-secret settings so the run is easy to identify
print("All required environment variables loaded successfully")
print(f"API URL: {os.environ.get('PVML_API_URL')}")
print(f"Workspace ID: {os.environ.get('PVML_WORKSPACE_ID')}")
print(f"Datasource ID: {os.environ.get('PVML_DATASOURCE_ID')}")

print(f"\nConfiguration loaded at: {datetime.now():%Y-%m-%d %H:%M:%S}")


In [None]:
# Import PVML SDK and Initialize Client
from pvml import Client, Workspace, Agent, Datasource, LLM

# Initialize PVML client
def initialize_pvml_client() -> Client:
    """Build a PVML client authenticated with the PVML_API_KEY environment variable.

    Returns:
        The newly constructed ``Client``.

    Raises:
        ValueError: If PVML_API_KEY is unset or empty.
    """
    key = os.environ.get("PVML_API_KEY")
    if key:
        pvml = Client(api_key=key)
        print("PVML client initialized successfully")
        return pvml

    raise ValueError("PVML_API_KEY not found in environment variables")

# Initialize client
# `pvml_client` is a module-level name that later cells read
# (workspace lookup and default-prompt retrieval).
pvml_client = initialize_pvml_client()
print("Client ready for use")


In [None]:
# Workspace Discovery and Connection
def get_workspace() -> Workspace:
    """Fetch the workspace identified by the PVML_WORKSPACE_ID environment variable.

    When the variable is unset or empty, the IDs and names of the workspaces
    returned by the client are printed before the error is raised.

    Returns:
        The workspace returned by ``pvml_client.get_workspace``.

    Raises:
        ValueError: If PVML_WORKSPACE_ID is unset or empty.
    """
    target_id = os.environ.get("PVML_WORKSPACE_ID")

    if target_id:
        ws = pvml_client.get_workspace(target_id)
        print(f"Connected to workspace: {ws.name}")
        print(f"Workspace ID: {ws.id}")
        print(f"Description: {ws.description}")
        return ws

    # No ID configured: show what is available, then fail.
    print("PVML_WORKSPACE_ID not found in environment variables")
    print("Available workspaces:")
    for candidate_id, candidate in pvml_client.get_workspaces().items():
        print(f"  ID: {candidate_id}, Name: {candidate.name}")
    raise ValueError("PVML_WORKSPACE_ID not found in environment variables")

# Connect to workspace
# `workspace` is a module-level name that later cells read.
workspace = get_workspace()

# Get current user info
# Prints the email, role and user_type attributes of the user object
# returned by the workspace.
current_user = workspace.get_current_user()
print(f"\nCurrent user: {current_user.email}")
print(f"User role: {current_user.role}")
print(f"User type: {current_user.user_type}")


In [None]:
# Data Source Management
def get_datasource() -> Datasource:
    """Fetch the datasource identified by PVML_DATASOURCE_ID and ping it.

    When the variable is unset or empty, the IDs, names and types of the
    workspace's datasources are printed before the error is raised.

    Returns:
        The datasource returned by ``workspace.get_datasource``.

    Raises:
        ValueError: If PVML_DATASOURCE_ID is unset or empty.
    """
    target_id = os.environ.get("PVML_DATASOURCE_ID")

    if target_id:
        ds = workspace.get_datasource(target_id)
        print(f"Connected to datasource: {ds.name}")
        print(f"Datasource ID: {ds.id}")
        print(f"Type: {ds.type}")
        print(f"Description: {ds.description}")

        # Call ping() and print whatever it returns
        print(f"Connection test: {ds.ping()}")
        return ds

    # No ID configured: show what is available, then fail.
    print("PVML_DATASOURCE_ID not found in environment variables")
    print("Available datasources:")
    for candidate_id, candidate in workspace.get_datasources().items():
        print(f"  ID: {candidate_id}, Name: {candidate.name}, Type: {candidate.type}")
    raise ValueError("PVML_DATASOURCE_ID not found in environment variables")

# Connect to datasource
# `datasource` is a module-level name that later cells read.
datasource = get_datasource()

# Get datasource schema information
print("\nDatasource Schema Information:")
schemas_tree = datasource.get_schemas_tree()
print(f"Available schemas: {len(schemas_tree)}")

# Display first few schemas
# NOTE(review): the loop assumes iterating schemas_tree yields dict-like
# entries with optional 'schemaName', 'tables', 'description' and 'type'
# keys - confirm against the return value of get_schemas_tree().
for i, schema_info in enumerate(schemas_tree):
    if i >= 3:  # Limit output
        print(f"... and {len(schemas_tree) - 3} more schemas")
        break
    # Fall back to a positional label when the entry has no 'schemaName'
    schema_name = schema_info.get('schemaName', f'Schema {i+1}')
    print(f"  Schema: {schema_name}")
    if 'tables' in schema_info:
        table_count = len(schema_info['tables'])
        print(f"    Tables: {table_count}")
    # Show additional schema info if available
    if 'description' in schema_info:
        print(f"    Description: {schema_info['description']}")
    if 'type' in schema_info:
        print(f"    Type: {schema_info['type']}")


In [None]:
# Full Access Permission Setup
from pvml.policy import PolicyType
import json

def setup_full_access_permission():
    """Create a data-access policy on the datasource using a single "*" rule.

    The rule uses "*" for schema, table and column. The current user's email
    is printed for reference; this function does not attach the policy to a
    user or a view.

    Returns:
        The policy object returned by ``datasource.create_policy``.
    """
    print("Setting up full access permissions...")

    # Look up the current user (only the email is printed)
    user = workspace.get_current_user()
    print(f"Current user: {user.email}")

    # One wildcard rule, serialized to JSON for the `data` argument
    wildcard_rules = json.dumps([{"schema": "*", "table": "*", "column": "*"}])

    created = datasource.create_policy(
        name="Full Access Policy",
        description="Complete access to all data in the datasource",
        policy_type=PolicyType.DATA_ACCESS,
        data=wildcard_rules,
    )

    print("\nCreated full access policy:")
    summary_fields = (
        ("Policy ID", "id"),
        ("Name", "name"),
        ("Type", "type"),
        ("Description", "description"),
        ("Data", "data"),
    )
    for label, attr in summary_fields:
        print(f"{label}: {getattr(created, attr)}")

    return created

# Setup full access permission
# `policy` is a module-level name that the view-creation cell reads.
policy = setup_full_access_permission()
print(f"\nFull access policy ready: {policy.name}")


In [None]:
# View Creation and Policy Assignment
def create_view_with_policy():
    """Create a view on the datasource, then attach the policy and the current user.

    Steps, in order: create the view in the workspace, add the module-level
    ``policy`` to it, add the current user as an entity, and finally print
    the policies and entities the view reports.

    Returns:
        The view object returned by ``workspace.create_view``.
    """
    print("Creating view with policy assignment...")

    # Create the view bound to the datasource
    new_view = workspace.create_view(
        name="Data Analysis View",
        description="Comprehensive view for data analysis with full access permissions",
        datasource_id=datasource.id,
    )

    print(f"Created view: {new_view.name}")
    print(f"View ID: {new_view.id}")
    print(f"Datasource ID: {new_view.datasource_id}")
    print(f"Description: {new_view.description}")

    # Attach the policy to the view
    print("\nAssigning policy to view...")
    new_view.update_policies(policy_ids_to_add=[policy.id])
    print(f"Policy '{policy.name}' assigned to view '{new_view.name}'")

    # Attach the current user to the view
    me = workspace.get_current_user()
    print("\nAssigning current user to view...")
    new_view.update_entities(entities_to_add=[me])
    print(f"User '{me.email}' assigned to view '{new_view.name}'")

    # Read the assignments back and print them
    print("\nView setup verification:")
    attached_policies = new_view.get_policies()
    print(f"View has {len(attached_policies)} policies assigned:")
    for attached_id, attached in attached_policies.items():
        print(f"  - {attached.name} (ID: {attached_id})")

    attached_entities = new_view.get_entities()
    print(f"View has {len(attached_entities)} entities assigned:")
    for member_id, member in attached_entities.items():
        print(f"  - {member.entity_type.value} (ID: {member_id})")

    return new_view

# Create view with policy
# `view` is a module-level name that later cells read (its name is used to
# look up the MCP, and its tree is inspected).
view = create_view_with_policy()
print(f"\nView ready for data analysis: {view.name}")


In [None]:
# MCP Retrieval
def get_view_mcp(_view_name: str):
    """Find the workspace MCP whose name equals the given view name.

    Each MCP is printed as it is examined; the search stops at the first
    match.

    Args:
        _view_name: Name to compare against each MCP's ``name``.

    Returns:
        The first MCP whose name matches.

    Raises:
        ValueError: If no MCP with that name is found.
    """
    print(f"Retrieving MCP associated with view '{_view_name}'...")

    # Fetch every MCP in the workspace
    workspace_mcps = workspace.get_mcps()
    print(f"Found {len(workspace_mcps)} MCPs in workspace")

    # Scan until the first name match
    matched = None
    for candidate_id, candidate in workspace_mcps.items():
        print(f"  MCP: {candidate.name} (ID: {candidate_id})")
        if candidate.name != _view_name:
            continue
        matched = candidate
        print("    -> Found matching MCP for view!")
        break

    if not matched:
        print(f"No MCP found with name '{_view_name}'")
        print("Available MCPs:")
        for candidate_id, candidate in workspace_mcps.items():
            print(f"  - {candidate.name} (ID: {candidate_id})")
        raise ValueError(f"MCP with name '{_view_name}' not found")

    # Print the matched MCP's attributes, one per line
    detail_fields = (
        ("\nRetrieved MCP", "name"),
        ("MCP ID", "id"),
        ("MCP Type", "type"),
        ("MCP URL", "url"),
        ("Description", "description"),
        ("Created by", "created_by"),
        ("Created at", "created_at"),
    )
    for label, attr in detail_fields:
        print(f"{label}: {getattr(matched, attr)}")

    return matched

# Retrieve the MCP using the view name
# `mcp` is a module-level name that the agent-creation cell reads.
mcp = get_view_mcp(view.name)
print(f"\nMCP ready: {mcp.name}")


In [None]:
# Collect the 'schemaName' of every entry under the view tree's 'schemas' key.
# As the cell's only expression, the resulting list is the cell output.
[s['schemaName'] for s in view.get_view_tree()['schemas']]

In [None]:
# LLM Configuration and Management
def setup_llm() -> LLM:
    """Register a new LLM in the workspace using the GPT_KEY environment variable.

    Returns:
        The LLM object returned by ``workspace.create_llm``.

    Raises:
        ValueError: If GPT_KEY is unset or empty.
    """
    # The vendor token is mandatory; fail before touching the workspace
    openai_token = os.environ.get("GPT_KEY")
    if not openai_token:
        raise ValueError("No GPT_KEY provided")

    llm_props = LLM.create_basic_props(temperature=0.3)

    print("Creating new GPT LLM...")
    llm_settings = {
        "name": "GPT-4 LLM",
        "description": "OpenAI GPT-4 model for AI agent interactions",
        "vendor_name": "openAi",
        "model_name": "gpt-4o-2024-11-20",
        "token": openai_token,
        "props": llm_props,
    }
    created = workspace.create_llm(**llm_settings)
    print(f"Created new LLM: {created.name}")
    return created

# Setup LLM
# `llm` is a module-level name that the agent-creation cell reads.
llm = setup_llm()

# Print the name, vendor, model and props of the created LLM
print(f"\nLLM ready: {llm.name}")
print(f"Vendor: {llm.vendor_name}")
print(f"Model: {llm.model_name}")
props = llm.props
print(f"Configuration: {props}")


In [None]:
# Agent Creation and Configuration
from pvml import MCP

def create_data_agent(_llm: LLM, _mcp: MCP) -> Agent:
    """Create the "Data Analysis Agent" in the workspace with one LLM and one MCP.

    Args:
        _llm: LLM whose ``id`` is passed as the agent's ``llm_id``.
        _mcp: MCP whose ``id`` is passed as the only entry of ``mcp_ids``.

    Returns:
        The agent returned by ``workspace.create_agent``.
    """
    # The agent prompt is the 'prompt' entry of the client's default prompts
    default_prompts = pvml_client.get_default_prompts()

    new_agent = workspace.create_agent(
        name="Data Analysis Agent",
        description="Comprehensive data analysis and reporting agent with database access via MCP",
        prompt=default_prompts['prompt'],
        llm_id=_llm.id,
        mcp_ids=[_mcp.id],
    )

    # Print the identifiers the created agent reports
    summary_fields = (
        ("Created agent", "name"),
        ("Agent ID", "id"),
        ("LLM ID", "llm_id"),
        ("MCP IDs", "mcp_ids"),
    )
    for label, attr in summary_fields:
        print(f"{label}: {getattr(new_agent, attr)}")

    return new_agent

# Create agent with LLM and MCP
# `agent` is a module-level name that the query and session cells read.
agent = create_data_agent(llm, mcp)

print(f"\nAgent ready: {agent.name}")
print(f"Description: {agent.description}")

# Get agent details
# Re-fetch the agent by ID and print its created_by and creation_time
agent_details = workspace.get_agent(agent.id)
print(f"Created by: {agent_details.created_by}")
print(f"Creation time: {agent_details.creation_time}")

# Verify MCP assignment and permissions (currently disabled; uncomment to run)
# permitted_mcps = workspace.get_permitted_mcps()
# print(f"Permitted MCPs: {permitted_mcps}")



In [None]:
# Interactive Query Examples
def run_sample_queries():
    """Send each sample question to the agent and print the question and response."""
    # Questions used to demonstrate the agent
    questions = (
        "how many accounts do we have, retrieve single column named count",
    )

    print("Running sample queries...\n")

    thin_rule = "-" * 50
    thick_rule = "=" * 60
    for number, question in enumerate(questions, start=1):
        print(f"Query {number}: {question}")
        print(thin_rule)

        # One-shot generation through the agent (no session)
        answer = agent.generate(question)
        print(f"Response: {answer}")
        print(f"\n{thick_rule}\n")

# Run sample queries
# Output is printed by the function; it returns nothing.
run_sample_queries()


In [None]:
# Session Management
def demonstrate_session_management():
    """Start an agent session, run the demo questions in it, and print a summary.

    Returns:
        The session object returned by ``agent.start_session``.
    """
    # Open a session titled "Data Analysis Session"
    chat = agent.start_session("Data Analysis Session")
    print(f"Started new session: {chat.title}")
    print(f"Session ID: {chat.id}")
    print(f"Agent ID: {chat.agent_id}")

    # Questions asked within the session
    questions = [
        "how many accounts do we have, retrieve single column named count",
        # "accounts look at column account_id"
    ]

    print("\nStarting conversation with context...")

    rule = "-" * 40
    for number, question in enumerate(questions, start=1):
        print(f"\nQuery {number}: {question}")
        print(rule)

        # Generate through the session rather than the agent directly
        answer = chat.generate(question)
        print(f"Response: {answer}")

    # Summary of the session
    print("\nSession Summary:")
    print(f"Title: {chat.title}")
    # The message count is shown only when the session exposes `messages`
    if hasattr(chat, 'messages'):
        message_count = len(chat.messages)
    else:
        message_count = 'N/A'
    print(f"Messages exchanged: {message_count}")
    print(f"Last modified: {chat.last_modified}")

    return chat

# Demonstrate session management
# Keeps the returned session object in the module-level name `session`.
session = demonstrate_session_management()


In [None]:
# Monitoring and Audit
def demonstrate_monitoring():
    """Fetch query-audit records from the workspace and print the first five.

    Requests page 1 with a page size of 10 from ``workspace.get_query_audit``.
    For each printed record, missing fields are shown as "N/A".

    Returns:
        The audit results exactly as returned by ``workspace.get_query_audit``.
    """
    print("Querying audit logs...")

    # Get query audit information
    audit_results = workspace.get_query_audit(
        page_size=10,
        page_number=1
    )

    print(f"Found {len(audit_results)} audit records")

    # Display audit information
    for i, audit_record in enumerate(audit_results[:5], 1):  # Show first 5 records
        # A missing or None duration cannot be divided; previously the 'N/A'
        # default was divided by 1000 and raised TypeError. Fall back to
        # "N/A" in that case instead.
        # NOTE(review): the value is divided by 1000 and labelled "s", so it
        # is presumably milliseconds - confirm against the audit API.
        try:
            duration_text = f"{audit_record.get('duration') / 1000:.2f} s"
        except TypeError:
            duration_text = "N/A"

        print(f"\nAudit Record {i}:")
        print(f"  Query: {audit_record.get('query', 'N/A')}")
        print(f"  User: {audit_record.get('userEmail', 'N/A')}")
        print(f"  User Question: {audit_record.get('userQuestion', 'N/A')}")
        print(f"  Duration: {duration_text}")
        print(f"  Status: {audit_record.get('status', 'N/A')}")
        print(f"  Error Type: {audit_record.get('errorType', 'N/A')}")

    if len(audit_results) > 5:
        print(f"\n... and {len(audit_results) - 5} more records")

    return audit_results

# Demonstrate monitoring
# Keeps the returned audit records in the module-level name `audit_data`.
audit_data = demonstrate_monitoring()
