# OpenTrustEval Pipeline Monitoring & Analytics Dashboard

This notebook provides interactive monitoring, analytics, and SQL querying for the OpenTrustEval pipeline using the `pipeline_logs.db` database.

In [None]:
# Import Required Libraries
import sqlite3
from contextlib import closing
from pathlib import Path

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from IPython.display import display, Markdown

In [None]:
# Plugin Analytics & Real-Time Monitoring
This section provides advanced analytics for each plugin and enables real-time monitoring of pipeline runs. Use the visualizations to compare plugin performance, detect anomalies, and monitor the system live. To enable live updates, uncomment the `auto_refresh_logs(...)` call at the end of the notebook (it stops after 60 seconds).
# Export Logs to CSV
Export the pipeline logs to a CSV file for external analysis or reporting.
# FIXME(review): load_logs() is defined in a later cell of this notebook; on a fresh
# kernel, run that cell first or this line raises NameError.
logs_df = load_logs()
# Write the full pipeline_logs table as CSV, without the pandas index column.
logs_df.to_csv(path_or_buf='pipeline_logs_export.csv', index=False)
display(Markdown('Logs exported to <code>pipeline_logs_export.csv</code> in the workspace.'))

In [None]:
# Export Logs to Excel and JSON
Export the pipeline logs to Excel and JSON formats for further analysis or sharing.
# FIXME(review): load_logs() is defined in a later cell of this notebook; on a fresh
# kernel, run that cell first or this line raises NameError.
logs_df = load_logs()
# NOTE(review): to_excel needs an Excel writer engine (e.g. openpyxl) installed — confirm.
logs_df.to_excel(excel_writer='pipeline_logs_export.xlsx', index=False)
# JSON Lines: one JSON object per log row.
logs_df.to_json(path_or_buf='pipeline_logs_export.json', orient='records', lines=True)
display(Markdown('Logs exported to <code>pipeline_logs_export.xlsx</code> and <code>pipeline_logs_export.json</code> in the workspace.'))
# Email Alert for Recent Errors
Send an email alert if any of the `n` most recent pipeline runs (default 5) logged an error. Requires SMTP credentials and a server that accepts implicit SSL connections (e.g. port 465).
import smtplib
from email.mime.text import MIMEText

def send_error_alert(email_to, email_from, smtp_server, smtp_port, smtp_user, smtp_pass, n=5, logs_df=None):
    """Email an alert if any of the ``n`` most recent pipeline runs logged an error.

    Parameters
    ----------
    email_to, email_from : str
        Recipient and sender addresses.
    smtp_server, smtp_port :
        SMTP host and port. The connection uses implicit SSL (``smtplib.SMTP_SSL``),
        so the port is typically 465.
    smtp_user, smtp_pass : str
        SMTP login credentials. Prefer reading these from environment variables or
        ``getpass.getpass()`` rather than hardcoding them in the notebook.
    n : int
        Number of most recent runs (ordered by ``timestamp``) to inspect.
    logs_df : pandas.DataFrame, optional
        Logs to inspect. Defaults to ``load_logs()``; pass a frame to reuse
        already-loaded logs or to check the logic without a database.

    Returns
    -------
    int
        Number of errored runs included in the alert (0 when nothing was sent).
    """
    logs = load_logs() if logs_df is None else logs_df

    # Mirror the guards used by the summary cells: without these columns there is
    # nothing to rank or check, and indexing them would raise KeyError.
    if 'error' not in logs.columns or 'timestamp' not in logs.columns:
        print("Logs have no 'error' or 'timestamp' column; nothing to check.")
        return 0

    recent = logs.sort_values('timestamp', ascending=False).head(n)
    errors = recent[recent['error'].notnull()]
    if errors.empty:
        print("No recent errors to alert.")
        return 0

    msg = MIMEText(errors.to_string())
    msg['Subject'] = f'OpenTrustEval Pipeline Error Alert ({len(errors)} recent errors)'
    msg['From'] = email_from
    msg['To'] = email_to
    with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
        server.login(smtp_user, smtp_pass)
        server.sendmail(email_from, [email_to], msg.as_string())
    print(f"Alert sent to {email_to}")
    return len(errors)

# Example usage (fill in credentials):
# send_error_alert('to@example.com', 'from@example.com', 'smtp.example.com', 465, 'user', 'pass')
# Integration with External Tools
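You can integrate pipeline logs with Slack, cloud dashboards, or monitoring systems. For example, use the Slack API to send alerts, or upload logs to a cloud storage bucket for BI dashboards. The cells below provide helpers for AWS S3, generic webhooks, Google Cloud Storage, and Azure Blob Storage. See each service's official documentation for authentication details and best practices.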

In [None]:
# Upload Logs to AWS S3
Upload the exported logs to an S3 bucket for cloud storage or integration with BI dashboards. Requires `boto3` and AWS credentials.
import boto3
from botocore.exceptions import NoCredentialsError

def upload_to_s3(file_path, bucket, object_name=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None):
    """Upload a local file to S3 and report whether it succeeded.

    Parameters
    ----------
    file_path : str
        Local file to upload.
    bucket : str
        Destination bucket name.
    object_name : str, optional
        Destination object key; defaults to ``file_path``.
    aws_access_key_id, aws_secret_access_key, region_name : str, optional
        Explicit credentials/region. Leave as None to let boto3 use its default
        credential chain (environment variables, shared config) instead of
        hardcoding secrets in the notebook.

    Returns
    -------
    bool
        True if the upload succeeded, False otherwise (the reason is printed).
    """
    # Resolve the key once so the upload and the success message always agree.
    key = object_name or file_path
    try:
        # Client creation is inside the try so configuration errors are reported the
        # same way as upload errors instead of escaping as tracebacks.
        s3 = boto3.client('s3',
                          aws_access_key_id=aws_access_key_id,
                          aws_secret_access_key=aws_secret_access_key,
                          region_name=region_name)
        s3.upload_file(file_path, bucket, key)
    except NoCredentialsError:
        print("AWS credentials not found.")
        return False
    except Exception as e:
        print(f"Upload failed: {e}")
        return False
    print(f"Uploaded {file_path} to s3://{bucket}/{key}")
    return True

# Example usage (fill in credentials and bucket):
# upload_to_s3('pipeline_logs_export.csv', 'your-bucket', 'logs/pipeline_logs_export.csv', 'AKIA...', 'SECRET...', 'us-east-1')
# Send Logs to Webhook Endpoint
Send the exported logs to a webhook (e.g., for integration with monitoring or automation tools).
import requests

def send_logs_to_webhook(file_path, webhook_url, timeout=30):
    """POST a file to a webhook as multipart form data under the field ``file``.

    Parameters
    ----------
    file_path : str
        Local file to send.
    webhook_url : str
        Endpoint that receives the upload.
    timeout : float, default 30
        Seconds to wait for the server. Without a timeout, ``requests`` can block
        indefinitely on an unresponsive endpoint.

    Returns
    -------
    bool
        True if the server answered with any 2xx status, False otherwise.
        Connection errors from ``requests`` propagate to the caller.
    """
    with open(file_path, 'rb') as f:
        files = {'file': (file_path, f)}
        response = requests.post(webhook_url, files=files, timeout=timeout)
    # Webhooks often acknowledge with 201/202/204 rather than 200; all are success.
    if 200 <= response.status_code < 300:
        print(f"Logs sent to webhook: {webhook_url}")
        return True
    print(f"Webhook upload failed: {response.status_code} {response.text}")
    return False

# Example usage (fill in your webhook URL):
# send_logs_to_webhook('pipeline_logs_export.csv', 'https://your-webhook-endpoint')

In [None]:
# Upload Logs to Google Cloud Storage
Upload the exported logs to a Google Cloud Storage bucket. Requires `google-cloud-storage` and GCP credentials.
from google.cloud import storage

def upload_to_gcs(file_path, bucket_name, destination_blob_name, credentials_path=None):
    """Upload ``file_path`` to ``gs://<bucket_name>/<destination_blob_name>``.

    When ``credentials_path`` is given, the client is built from that service-account
    JSON file; otherwise ``storage.Client()`` is used with its default credentials.
    Errors raised by the client library propagate to the caller.
    """
    client = (
        storage.Client.from_service_account_json(credentials_path)
        if credentials_path
        else storage.Client()
    )
    target_blob = client.bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(file_path)
    print(f"Uploaded {file_path} to gs://{bucket_name}/{destination_blob_name}")

# Example usage (fill in credentials and bucket):
# upload_to_gcs('pipeline_logs_export.csv', 'your-bucket', 'logs/pipeline_logs_export.csv', 'path/to/credentials.json')
# Upload Logs to Azure Blob Storage
Upload the exported logs to Azure Blob Storage. Requires `azure-storage-blob` and Azure credentials.
from azure.storage.blob import BlobServiceClient

def upload_to_azure_blob(file_path, connection_string, container_name, blob_name):
    """Upload ``file_path`` as ``blob_name`` into the Azure container ``container_name``.

    The client is built from ``connection_string``; an existing blob with the same
    name is overwritten. Errors from the Azure SDK propagate to the caller.
    """
    target = (
        BlobServiceClient
        .from_connection_string(connection_string)
        .get_blob_client(container=container_name, blob=blob_name)
    )
    with open(file_path, 'rb') as payload:
        target.upload_blob(payload, overwrite=True)
    print(f"Uploaded {file_path} to Azure container {container_name} as {blob_name}")

# Example usage (fill in connection string and container):
# upload_to_azure_blob('pipeline_logs_export.csv', 'your-connection-string', 'your-container', 'logs/pipeline_logs_export.csv')
# Advanced Automation: Scheduled Exports
Automate log exports and uploads with cron jobs, Airflow, or cloud-native schedulers. For example, move the export/upload code into a standalone script and schedule it to run hourly or daily. See your scheduler's documentation for setup instructions and best practices.

In [None]:
# Helpers to load the pipeline logs and run SQL queries against pipeline_logs.db.
def _connect(db_path, read_only=True):
    """Open a SQLite connection to ``db_path``, read-only by default.

    Plain ``sqlite3.connect`` silently creates an empty database when the path does
    not exist, which turns a typo into a confusing "no such table" error and leaves
    a stray file behind. Read-only URI mode fails fast instead, and it also rejects
    writes such as a ``DROP TABLE`` typed into the interactive SQL box.
    Pass ``read_only=False`` to get a normal read-write connection.
    """
    if not read_only:
        return sqlite3.connect(db_path)
    # URI filenames are needed for mode=ro; as_uri() requires an absolute path and
    # percent-encodes characters (spaces, '?', '#') that would break a hand-built URI.
    uri = Path(db_path).resolve().as_uri() + '?mode=ro'
    return sqlite3.connect(uri, uri=True)


def run_sql_query(query, db_path='pipeline_logs.db', read_only=True):
    """Run ``query`` against the SQLite database at ``db_path`` and return a DataFrame.

    The connection is opened read-only unless ``read_only=False``.
    """
    # closing() guarantees the connection is released even if the query raises;
    # sqlite3's own connection context manager only commits/rolls back, it does not close.
    with closing(_connect(db_path, read_only)) as conn:
        return pd.read_sql_query(query, conn)


def load_logs(db_path='pipeline_logs.db', read_only=True):
    """Return the entire ``pipeline_logs`` table from ``db_path`` as a DataFrame."""
    return run_sql_query('SELECT * FROM pipeline_logs', db_path, read_only)

In [None]:
# Pipeline Logs Summary
Display a summary of all logged pipeline runs: the total number of runs, the number of runs with an error, and the average total execution time.
import numpy as np
logs_df = load_logs()
if logs_df.empty:
    display(Markdown("No logs found in the database."))
else:
    total_runs = len(logs_df)
    # Missing columns degrade gracefully: zero errors, NaN average time.
    error_count = 0
    if 'error' in logs_df.columns:
        error_count = logs_df['error'].notnull().sum()
    avg_time = np.nan
    if 'total_time' in logs_df.columns:
        avg_time = logs_df['total_time'].mean()
    display(Markdown(
        f"**Total Runs:** {total_runs}  |  **Errors:** {error_count}  |  **Avg. Time (s):** {avg_time:.2f}"
    ))
# Visualize Pipeline Timings
Visualize the distribution of total pipeline execution times.
# Histogram of end-to-end run durations; skipped when there is nothing to plot.
if 'total_time' in logs_df.columns and not logs_df.empty:
    fig = px.histogram(
        logs_df,
        x='total_time',
        nbins=30,
        title='Pipeline Execution Time Distribution (seconds)',
    )
    fig.show()
# Visualize Resource Usage
Plot CPU vs. memory usage per run (colored by total execution time) when the `cpu_usage` and `memory_usage` columns are available.
# CPU vs memory per run. The guard only requires the two axis columns, so color by
# run duration only when 'total_time' also exists — px.scatter raises on unknown columns.
if not logs_df.empty and 'cpu_usage' in logs_df.columns and 'memory_usage' in logs_df.columns:
    color_col = 'total_time' if 'total_time' in logs_df.columns else None
    fig = px.scatter(logs_df, x='cpu_usage', y='memory_usage', color=color_col,
                     title='CPU vs Memory Usage per Run', labels={'cpu_usage':'CPU (%)', 'memory_usage':'Memory (MB)'})
    fig.show()
# Error Rate Over Time
Plot the daily error rate (the fraction of runs on each day with a non-null `error`) when both the `timestamp` and `error` columns exist.
if not logs_df.empty and 'timestamp' in logs_df.columns and 'error' in logs_df.columns:
    # Derive run dates as a separate Series instead of adding a 'date' column to
    # logs_df, so re-running this cell leaves the shared frame exactly as loaded.
    run_dates = pd.to_datetime(logs_df['timestamp']).dt.date.rename('date')
    # Vectorized: the mean of a boolean "has error" flag per day is that day's error rate.
    error_trend = logs_df['error'].notnull().groupby(run_dates).mean().rename('error_rate')
    fig = px.line(error_trend, title='Error Rate Over Time')
    fig.update_layout(xaxis_title='Date', yaxis_title='Error Rate')
    fig.show()
# Interactive SQL Query
Run a custom SQL query on the pipeline logs database.
import ipywidgets as widgets
# Interactive SQL console: type a query, press the button, see the result below.
query_box = widgets.Textarea(
    value='SELECT * FROM pipeline_logs LIMIT 10',
    description='SQL Query:',
    layout=widgets.Layout(width='100%', height='80px'),
)
run_button = widgets.Button(description='Run Query')
output = widgets.Output()


def on_run_query_clicked(b):
    """Button callback: run the SQL in ``query_box`` and render the result in ``output``.

    Any exception (bad SQL, missing table, ...) is printed into the output area
    instead of being raised from the widget callback.
    """
    with output:
        output.clear_output()
        try:
            display(run_sql_query(query_box.value))
        except Exception as e:
            print(f"Error: {e}")


run_button.on_click(on_run_query_clicked)
display(query_box, run_button, output)

In [None]:
# Plugin-Specific Results Breakdown
Plot the results for each `plugin_*` column in the logs: a histogram for numeric columns and a value-count bar chart for everything else.
plugin_cols = [col for col in logs_df.columns if col.startswith('plugin_')]
if plugin_cols:
    for col in plugin_cols:
        display(Markdown(f"### Results for {col}"))
        series = logs_df[col]
        # Comparing dtype to 'float'/'int' only matches float64 and the platform default
        # int, missing int32/float32/nullable dtypes. pandas treats bool as numeric, but
        # True/False reads better as counts than as a histogram.
        if pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series):
            fig = px.histogram(logs_df, x=col, title=f'Distribution for {col}')
        else:
            value_counts = series.value_counts()
            fig = px.bar(x=value_counts.index, y=value_counts.values, labels={'x': col, 'y': 'Count'}, title=f'Value Counts for {col}')
        fig.show()
else:
    display(Markdown("No plugin-specific columns found in logs."))
# Real-Time Log Monitoring (Experimental)
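Display the most recent pipeline runs and, optionally, auto-refresh them every 10 seconds for 60 seconds. The refresh loop blocks the kernel while it runs, so the call at the end of this cell is commented out by default.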
import time
from IPython.display import clear_output
import threading

def show_latest_logs(n=5):
    """Display the ``n`` newest pipeline runs, ordered by descending ``timestamp``."""
    newest_first = load_logs().sort_values('timestamp', ascending=False)
    display(newest_first.head(n))

def auto_refresh_logs(interval=10, n=5, stop_after=60):
    """Redisplay the latest ``n`` runs every ``interval`` seconds for ``stop_after`` seconds.

    This blocks the kernel while it runs. Interrupting the kernel stops the refresh
    early and still prints the closing message.
    """
    # monotonic() is unaffected by wall-clock adjustments, unlike time.time().
    deadline = time.monotonic() + stop_after
    try:
        while time.monotonic() < deadline:
            clear_output(wait=True)
            print(f"Showing latest {n} runs (auto-refreshing every {interval}s, stop after {stop_after}s)...")
            show_latest_logs(n)
            # Never sleep past the deadline (interval may not divide stop_after), and
            # clamp at 0 so a non-positive interval cannot make time.sleep raise.
            time.sleep(max(0.0, min(interval, deadline - time.monotonic())))
    except KeyboardInterrupt:
        # Interrupting the kernel is the expected way to stop early; exit cleanly.
        pass
    print("Auto-refresh stopped.")

# Uncomment to enable auto-refresh (will run for 60 seconds):
# auto_refresh_logs(interval=10, n=5, stop_after=60)