# OSC Market Intelligence App - Component Testing

This notebook provides interactive testing of the main components of the Market Intelligence application, including comprehensive database testing.


## 1. Setup and Imports


In [1]:
# Import required modules
import sys
from pathlib import Path

# Ensure src is in the path
project_root = Path.cwd()
sys.path.insert(0, str(project_root))

from src.config import DatabricksConfig, DatabaseConfig, AppConfig, OSC_COLORS, OSC_FONTS
from src.databricks_client import (
    get_workspace_client,
    call_endpoint,
    get_user_info,
    format_response,
)
from src.database import (
    init_database,
    create_conversation,
    add_message,
    update_message,
    get_conversation_messages,
    get_message_by_query_id,
    get_user_conversations,
)

print("✅ All imports successful!")


✅ All imports successful!


## 2. Test Configuration


In [2]:
# Load configurations
app_config = AppConfig.from_config()
databricks_config = DatabricksConfig.from_config()
database_config = DatabaseConfig.from_config()

print("App Configuration:")
print(f"  Title: {app_config.title}")
print(f"  Layout: {app_config.layout}")
print(f"  Async Queries: {app_config.async_queries_enabled}")
print()
print("Databricks Configuration:")
print(f"  Host: {databricks_config.host}")
print(f"  Endpoint: {databricks_config.endpoint_name}")
print()
print("Database Configuration:")
print(f"  Instance Name: {getattr(database_config, 'instance_name', 'Not configured')}")
print(f"  Database: {getattr(database_config, 'database_name', getattr(database_config, 'name', 'Not configured'))}")
print()
print("Note: Databricks authentication uses SDK defaults (~/.databrickscfg or environment)")
print("Note: Database credentials come from environment variables (DB_USER, DB_PASSWORD)")


App Configuration:
  Title: OSC Market Intelligence
  Layout: wide
  Async Queries: True

Databricks Configuration:
  Host: https://e2-demo-field-eng.cloud.databricks.com
  Endpoint: mas-1ab024e9-endpoint

Database Configuration:
  Instance Name: osc-delete-nov1
  Database: databricks_postgres

Note: Databricks authentication uses SDK defaults (~/.databrickscfg or environment)
Note: Database credentials come from environment variables (DB_USER, DB_PASSWORD)


## 3. Initialize & Test Database

This section will:
- Test database connectivity
- Create required tables if they don't exist
- Verify the schema


In [3]:
from databricks.sdk import WorkspaceClient

# Initialize workspace client using DatabricksConfig credentials
client = WorkspaceClient(
    host=databricks_config.host,
    # Optionally, you could pass token/databricks_cfg_path, but defaults should work if env/config present
)

print(f"✅ Databricks WorkspaceClient initialized for host: {databricks_config.host}")



✅ Databricks WorkspaceClient initialized for host: https://e2-demo-field-eng.cloud.databricks.com


In [4]:
import uuid
import psycopg2

user = client.current_user.me()
_user_email = user.emails[0].value
instance_name = database_config.instance_name
instance = client.database.get_database_instance(name=instance_name)
_credential = client.database.generate_database_credential(
    request_id=str(uuid.uuid4()), instance_names=[instance_name]
)
_connection_params = {
    "host": instance.read_write_dns,
    "dbname": database_config.database_name,
    "user": _user_email,
    "password": _credential.token,
    "sslmode": "require",
}

conn = psycopg2.connect(**_connection_params)

In [5]:
from src.database import get_connection, init_database

In [6]:
conn = get_connection(database_config)

In [7]:
init_database(database_config)

In [8]:
# Test database operations - CRUD operations
print("=" * 60)
print("TESTING DATABASE OPERATIONS")
print("=" * 60)

# 1. Create a test conversation
print("\n1️⃣  Creating test conversation...")
test_user_id = "test_user@example.com"
conversation_id = create_conversation(database_config, test_user_id)
print(f"   ✅ Created conversation with ID: {conversation_id}")

# 2. Add a test message
print("\n2️⃣  Adding test message...")
message_id = add_message(
    database_config,
    conversation_id=conversation_id,
    user_id=test_user_id,
    question="What is the market outlook for 2024?",
    answer="The market shows positive indicators with steady growth expected...",
    status="complete"
)
print(f"   ✅ Added message with ID: {message_id}")

# 3. Add a pending message (simulating async query)
print("\n3️⃣  Adding pending message (async simulation)...")
pending_message_id = add_message(
    database_config,
    conversation_id=conversation_id,
    user_id=test_user_id,
    question="Analyze trade execution patterns?",
    status="pending",
    query_id="test-query-123"
)
print(f"   ✅ Added pending message with ID: {pending_message_id}")

# 4. Update the pending message
print("\n4️⃣  Updating pending message...")
update_message(
    database_config,
    message_id=pending_message_id,
    answer="Trade execution analysis complete: patterns show...",
    status="complete"
)
print(f"   ✅ Updated message {pending_message_id} to complete")

# 5. Retrieve all messages in conversation
print("\n5️⃣  Retrieving conversation messages...")
messages = get_conversation_messages(database_config, conversation_id)
print(f"   ✅ Retrieved {len(messages)} message(s)")

print("\n   📋 Conversation History:")
for i, msg in enumerate(messages, 1):
    print(f"\n   Message {i} (ID: {msg['id']})")
    print(f"   ├─ Question: {msg['question'][:60]}...")
    print(f"   ├─ Answer: {msg['answer'][:60] if msg['answer'] else 'N/A'}...")
    print(f"   ├─ Status: {msg['status']}")
    print(f"   ├─ Query ID: {msg['query_id'] or 'N/A'}")
    print(f"   └─ Created: {msg['created_at']}")

# 6. Test query_id lookup
print("\n6️⃣  Testing query_id lookup...")
message_by_query = get_message_by_query_id(database_config, "test-query-123")
if message_by_query:
    print(f"   ✅ Found message by query_id: {message_by_query['id']}")
else:
    print("   ⚠️  No message found for query_id")

# 7. Get user conversations
print("\n7️⃣  Retrieving user conversations...")
conversations = get_user_conversations(database_config, test_user_id)
print(f"   ✅ User has {len(conversations)} conversation(s)")

for conv in conversations:
    print(f"\n   Conversation {conv['id']}:")
    print(f"   ├─ Messages: {conv['message_count']}")
    print(f"   └─ Created: {conv['created_at']}")

print("\n" + "=" * 60)
print("✅ ALL DATABASE TESTS PASSED!")
print("=" * 60)


TESTING DATABASE OPERATIONS

1️⃣  Creating test conversation...
   ✅ Created conversation with ID: 5

2️⃣  Adding test message...
   ✅ Added message with ID: 4

3️⃣  Adding pending message (async simulation)...
   ✅ Added pending message with ID: 5

4️⃣  Updating pending message...
   ✅ Updated message 5 to complete

5️⃣  Retrieving conversation messages...
   ✅ Retrieved 2 message(s)

   📋 Conversation History:

   Message 1 (ID: 4)
   ├─ Question: What is the market outlook for 2024?...
   ├─ Answer: The market shows positive indicators with steady growth expe...
   ├─ Status: complete
   ├─ Query ID: N/A
   └─ Created: 2025-10-24 05:53:53.058736

   Message 2 (ID: 5)
   ├─ Question: Analyze trade execution patterns?...
   ├─ Answer: Trade execution analysis complete: patterns show......
   ├─ Status: complete
   ├─ Query ID: test-query-123
   └─ Created: 2025-10-24 05:54:09.358340

6️⃣  Testing query_id lookup...
   ✅ Found message by query_id: 2

7️⃣  Retrieving user conversations.

# OSC Market Intelligence App - Component Testing

This notebook provides interactive testing of the main components of the Market Intelligence application.


In [9]:
## 1. Setup and Imports
import sys
from pathlib import Path

# Ensure src is in the path
project_root = Path.cwd()
sys.path.insert(0, str(project_root))

from src.config import DatabricksConfig, DatabaseConfig, AppConfig, OSC_COLORS, OSC_FONTS
from src.databricks_client import (
    get_workspace_client,
    call_endpoint,
    get_user_info,
    format_response,
)
from src.database import (
    init_database,
    create_conversation,
    add_message,
    update_message,
    get_conversation_messages,
    get_user_conversations,
)

print("✅ All imports successful!")


✅ All imports successful!


In [None]:
## 4. Test Endpoint Call
test_question = "Summarize the Trade execution patterns and suspicious activity rates in the past 24 hours"

try:
    print(f"Sending question: {test_question}")
    print("\nCalling endpoint...")
    
    # The call_endpoint function now handles the host configuration internally
    response = call_endpoint(
        databricks_config,
        test_question
    )
    
    print("\n✅ Response received!")
    print("\nRaw response:")
    print(response)
    
    # Format the response
    formatted_answer = format_response(response)
    print("\nFormatted answer:")
    print(formatted_answer)
    
except Exception as e:
    print(f"\n❌ Error calling endpoint: {e}")
    print("\nThis could be due to:")
    print("  - Endpoint not running")
    print("  - Incorrect endpoint name in config.yaml")
    print("  - Network connectivity issues")
    print("  - Authentication problems")


Sending question: Summarize the Trade execution patterns and suspicious activity rates in the past 24 hours

Calling endpoint...


In [None]:
# Test conversation creation and message storage

# Create a test conversation
test_user_id = "test_user@example.com"
conversation_id = create_conversation(database_config, test_user_id)
print(f"✅ Created conversation with ID: {conversation_id}")

# Add a test message
message_id = add_message(
    database_config,
    conversation_id=conversation_id,
    user_id=test_user_id,
    question="What is the market outlook for 2024?",
    answer="The market shows positive indicators...",
    status="complete"
)
print(f"✅ Added message with ID: {message_id}")

# Retrieve messages
messages = get_conversation_messages(database_config, conversation_id)
print(f"\n✅ Retrieved {len(messages)} message(s)")

for msg in messages:
    print(f"\nMessage ID: {msg['id']}")
    print(f"  Question: {msg['question']}")
    print(f"  Answer: {msg['answer']}")
    print(f"  Status: {msg['status']}")
    print(f"  Created: {msg['created_at']}")

# Get user conversations
conversations = get_user_conversations(database_config, test_user_id)
print(f"\n✅ User has {len(conversations)} conversation(s)")


✅ Created conversation with ID: 2
✅ Added message with ID: 3

✅ Retrieved 1 message(s)

Message ID: 3
  Question: What is the market outlook for 2024?
  Answer: The market shows positive indicators...
  Status: complete
  Created: 2025-10-24 05:28:05.637313

✅ User has 2 conversation(s)


In [None]:
# Complete workflow test

print("Starting end-to-end test...\n")

# 1. Create a new conversation
user_id = 'test_id'
conv_id = create_conversation(database_config, user_id)
print(f"1. ✅ Created conversation: {conv_id}")

# 2. Ask a question
question = "What are the main regulatory priorities for 2024?"
print(f"2. 📝 Question: {question}")

# 3. Call the endpoint
print("3. 🔄 Calling Databricks endpoint...")
response = call_endpoint(databricks_config, question)
answer = format_response(response)
print(f"   ✅ Got response")

# 4. Save to database
msg_id = add_message(
    database_config,
    conversation_id=conv_id,
    user_id=user_id,
    question=question,
    answer=answer,
    status="complete"
)
print(f"4. ✅ Saved message: {msg_id}")

# 5. Retrieve and display
messages = get_conversation_messages(database_config, conv_id)
print(f"5. ✅ Retrieved conversation history")

print("\n" + "=" * 60)
print("CONVERSATION SUMMARY")
print("=" * 60)
for msg in messages:
    print(f"\n🤔 Question: {msg['question']}")
    print(f"\n💡 Answer: {msg['answer']}")
    print(f"\n📊 Status: {msg['status']}")
    print(f"⏰ Time: {msg['created_at']}")

print("\n" + "=" * 60)
print("✅ END-TO-END TEST SUCCESSFUL!")
print("=" * 60)

Starting end-to-end test...

1. ✅ Created conversation: 4
2. 📝 Question: What are the main regulatory priorities for 2024?
3. 🔄 Calling Databricks endpoint...
   ✅ Got response


ProgrammingError: can't adapt type 'dict'