# Project Obsidian Core - Integration Test Notebook

This notebook verifies the end-to-end data flow: from the source databases (MySQL and PostgreSQL), through the OpenTelemetry QAN processors, into Druid, and finally into analysis in Jupyter.

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import json
from datetime import datetime, timedelta
import time

# Configure matplotlib: the IPython magic renders figures inline in the cell
# output, then the 'ggplot' style sheet and a default figure size of (14, 8)
# are applied to every plot created later in the notebook.
%matplotlib inline
plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = (14, 8)

## Step 1: Verify Druid Connection

In [None]:
# Druid endpoint settings: host and port are kept separate so either can be
# overridden on its own; DRUID_URL is the base every request is built from.
DRUID_HOST, DRUID_PORT = 'druid-router', 8888
DRUID_URL = 'http://{}:{}'.format(DRUID_HOST, DRUID_PORT)

# Helper function to execute Druid SQL queries
def query_druid(sql, timeout=30):
    """Run a SQL statement against the Druid SQL HTTP endpoint.

    Args:
        sql: Druid SQL text to execute.
        timeout: Seconds to wait for the connection and for the response
            before giving up, so an unreachable endpoint cannot hang the
            notebook indefinitely.

    Returns:
        The decoded JSON body when Druid answers with HTTP 200; otherwise
        None, after printing a diagnostic message.
    """
    # Local import keeps this cell self-contained.
    import uuid

    # A second-resolution timestamp repeats when several queries are issued
    # within the same second; a random UUID keeps every query ID distinct.
    query_id = f'test-{uuid.uuid4().hex}'

    try:
        response = requests.post(
            f'{DRUID_URL}/druid/v2/sql',
            headers={'Content-Type': 'application/json'},
            json={'query': sql, 'context': {'sqlQueryId': query_id}},
            timeout=timeout,
        )
    except requests.exceptions.RequestException as e:
        # Connection errors, timeouts, and other transport-level failures.
        print(f"Exception: {e}")
        return None

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    try:
        return response.json()
    except ValueError as e:
        # A 200 response whose body is not valid JSON.
        print(f"Exception: {e}")
        return None

# Test connection and list available tables.
# Druid SQL exposes table metadata through INFORMATION_SCHEMA rather than a
# SHOW TABLES statement, so the datasource list is read from there.
tables = query_druid(
    "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'druid'"
)
# query_druid returns None only on failure; an empty list still means the
# connection worked (there are simply no datasources yet).
if tables is not None:
    print("Successfully connected to Druid")
    print(f"Available tables: {tables}")
else:
    print("Failed to connect to Druid")

## Step 2: Check QAN Data Availability

In [None]:
# Define time range for analysis (last hour by default).
# The window is computed in UTC: Druid SQL reads TIMESTAMP literals in the
# connection time zone, which defaults to UTC, so a naive local time would
# shift the window on any host whose clock is not set to UTC.
from datetime import timezone

end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(hours=1)

# Format for Druid SQL
start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_time_str = end_time.strftime("%Y-%m-%d %H:%M:%S")

print(f"Analyzing data from {start_time_str} to {end_time_str}")

# Query for QAN data counts by database system.
# The column name contains a dot and the reserved word SYSTEM, so it is
# double-quoted; unquoted, `db.system` is parsed as a qualified reference
# instead of a single column.
# NOTE(review): assumes "db.system" is a flat column of qan_db - confirm
# against the datasource schema.
count_query = f"""
SELECT
  "db.system" AS database_system,
  COUNT(*) AS record_count
FROM qan_db
WHERE "__time" BETWEEN TIMESTAMP '{start_time_str}' AND TIMESTAMP '{end_time_str}'
GROUP BY "db.system"
"""

count_results = query_druid(count_query)
# None means the query itself failed; an empty list is a successful query
# that matched no rows and must be reported as "no data", not as a failure.
if count_results is not None:
    df_counts = pd.DataFrame(count_results)
    print("QAN Data Records:")
    display(df_counts)

    # Check if we have data for both systems. An empty frame has no
    # 'database_system' column, so guard the lookup.
    systems = set() if df_counts.empty else set(df_counts['database_system'])
    if 'mysql' in systems and 'postgresql' in systems:
        print("✅ Successfully found QAN data for both MySQL and PostgreSQL")
    elif 'mysql' in systems:
        print("⚠️ Found QAN data for MySQL but not PostgreSQL")
    elif 'postgresql' in systems:
        print("⚠️ Found QAN data for PostgreSQL but not MySQL")
    else:
        print("❌ No QAN data found for either database system")
else:
    print("Failed to query QAN data counts")

## Step 3: Check MySQL Test Data

In [None]:
# Query for MySQL test queries.
# Column names contain dots (and reserved words such as SYSTEM and SCHEMA),
# so each one is double-quoted; unquoted, Druid SQL parses `db.system` as a
# qualified reference instead of a single column.
# NOTE(review): assumes these are flat columns of qan_db - confirm against
# the datasource schema.
# NOTE(review): the divisor treats total_timer_wait as nanoseconds - confirm
# the unit the QAN processor emits.
mysql_query = f"""
SELECT
  "db.statement.sample" AS query,
  SUM("db.query.calls.delta") AS execution_count,
  SUM("db.query.total_timer_wait.delta") / 1000000000 AS total_time_sec,
  "db.schema" AS schema_name
FROM qan_db
WHERE "__time" BETWEEN TIMESTAMP '{start_time_str}' AND TIMESTAMP '{end_time_str}'
  AND "db.system" = 'mysql'
  AND "db.statement.sample" LIKE '%test_e2e%'
GROUP BY "db.statement.sample", "db.schema"
ORDER BY total_time_sec DESC
LIMIT 10
"""

mysql_results = query_druid(mysql_query)
# None means the query failed; an empty list is a successful query with no
# matching rows, which the inner branch reports as missing test data.
if mysql_results is not None:
    df_mysql = pd.DataFrame(mysql_results)
    print("MySQL Test Queries:")
    display(df_mysql)

    if not df_mysql.empty:
        print("✅ Successfully found integration test data in MySQL QAN records")
    else:
        print("❌ No integration test data found in MySQL QAN records")
else:
    print("Failed to query MySQL test data")

## Step 4: Check PostgreSQL Test Data

In [None]:
# Query PostgreSQL test queries.
# Column names contain dots (and the reserved word SYSTEM), so each one is
# double-quoted; unquoted, Druid SQL parses `db.system` as a qualified
# reference instead of a single column.
# NOTE(review): assumes these are flat columns of qan_db - confirm against
# the datasource schema.
# NOTE(review): the divisor treats total_exec_time as milliseconds - confirm
# the unit the QAN processor emits.
pg_query = f"""
SELECT
  "db.statement.sample" AS query,
  SUM("db.query.calls.delta") AS execution_count,
  SUM("db.query.total_exec_time.delta") / 1000 AS total_time_sec
FROM qan_db
WHERE "__time" BETWEEN TIMESTAMP '{start_time_str}' AND TIMESTAMP '{end_time_str}'
  AND "db.system" = 'postgresql'
  AND "db.statement.sample" LIKE '%products%'
GROUP BY "db.statement.sample"
ORDER BY total_time_sec DESC
LIMIT 10
"""

pg_results = query_druid(pg_query)
# None means the query failed; an empty list is a successful query with no
# matching rows, which the inner branch reports as missing test data.
if pg_results is not None:
    df_pg = pd.DataFrame(pg_results)
    print("PostgreSQL Test Queries:")
    display(df_pg)

    if not df_pg.empty:
        print("✅ Successfully found integration test data in PostgreSQL QAN records")
    else:
        print("❌ No integration test data found in PostgreSQL QAN records")
else:
    print("Failed to query PostgreSQL test data")

## Step 5: Query Execution Over Time

In [None]:
# Query execution trends over time, bucketed per minute and per database system.
# Column names contain dots (and the reserved word SYSTEM), so each one is
# double-quoted; unquoted, Druid SQL parses `db.system` as a qualified
# reference instead of a single column.
# NOTE(review): assumes these are flat columns of qan_db - confirm against
# the datasource schema.
time_series_query = f"""
SELECT
  TIME_FLOOR("__time", 'PT1M') AS time_bucket,
  "db.system" AS database_system,
  SUM("db.query.calls.delta") AS query_count
FROM qan_db
WHERE "__time" BETWEEN TIMESTAMP '{start_time_str}' AND TIMESTAMP '{end_time_str}'
GROUP BY TIME_FLOOR("__time", 'PT1M'), "db.system"
ORDER BY time_bucket ASC
"""

time_series_results = query_druid(time_series_query)
# None means the query failed; an empty list is a successful query with no
# rows, which the inner branch reports as missing time series data.
if time_series_results is not None:
    df_time = pd.DataFrame(time_series_results)

    if not df_time.empty:
        # Convert timestamp to datetime for better plotting
        df_time['time_bucket'] = pd.to_datetime(df_time['time_bucket'])

        # Plot time series
        plt.figure(figsize=(14, 8))

        # One line per database system
        for db_system in df_time['database_system'].unique():
            db_data = df_time[df_time['database_system'] == db_system]
            plt.plot(db_data['time_bucket'], db_data['query_count'], marker='o', linestyle='-', label=db_system)

        plt.xlabel('Time')
        plt.ylabel('Query Execution Count')
        plt.title('Query Execution Counts Over Time (1-minute intervals)')
        plt.legend()
        plt.grid(True)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

        print("✅ Successfully generated time series visualization of query execution data")
    else:
        print("❌ No time series data found")
else:
    print("Failed to query time series data")

## Step 6: Integration Test Summary

In [None]:
# Roll the outcome of every earlier cell into a single pass/fail report.
def _mark(ok):
    """Return the check or cross symbol for a boolean outcome."""
    return '✅' if ok else '❌'


def _has_rows(result):
    """Return True when a query produced a payload with at least one row."""
    return result is not None and not pd.DataFrame(result).empty


def _report(label, ok):
    """Print one status line for a check and hand its outcome back."""
    print(f"{_mark(ok)} {label}")
    return ok


print("=== INTEGRATION TEST SUMMARY ===")
print()

# Each outcome is evaluated and printed in turn, in the order the checks ran.
druid_ok = _report("Druid Connection", tables is not None)
qan_data_ok = _report("QAN Data Available", _has_rows(count_results))
mysql_data_ok = _report("MySQL Test Data", _has_rows(mysql_results))
pg_data_ok = _report("PostgreSQL Test Data", _has_rows(pg_results))
time_series_ok = _report("Time Series Data", _has_rows(time_series_results))

print()
overall_success = (
    druid_ok and qan_data_ok and mysql_data_ok and pg_data_ok and time_series_ok
)
verdict = 'PASSED' if overall_success else 'FAILED'
print(f"{_mark(overall_success)} OVERALL TEST STATUS: {verdict}")

# Closing message: a recap of the verified pipeline, or a pointer to the failures.
print()
if not overall_success:
    closing_lines = [
        "Some tests failed. Please check the individual test results above for details.",
    ]
else:
    closing_lines = [
        "Congratulations! The integration test has verified the full data flow:",
        "1. Test data was generated in MySQL and PostgreSQL",
        "2. Data was collected by OpenTelemetry QAN processors",
        "3. Data was successfully ingested into Druid",
        "4. JupyterLab successfully queried and visualized the data",
    ]
for closing_line in closing_lines:
    print(closing_line)