```
Copyright 2024 Control-M Jupyter Notebooks Project Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```

**Author:** Control-M Engineering Team  
**E-Mail:** orchestrator@bmc.com  
**Version:** 1.0.0  
**License:** Apache 2.0  

**Warranty Disclaimer:** This software is provided "AS IS" without warranty of any kind. Use at your own risk.

# Control-M Monitoring & Testing

Interactive monitoring notebook for testing agents and connection profiles.

**Instructions:** Run all cells in order, then use the interactive test buttons at the bottom.

In [None]:
import requests
import json
import os
import pandas as pd
from datetime import datetime
import ipywidgets as widgets
from IPython.display import display, clear_output
from typing import Dict, List, Any, Union, Optional, Tuple

# Widget variable declarations for type checking
agent_dropdown: widgets.Dropdown
cp_dropdown: widgets.Dropdown
output_area: widgets.Output

In [None]:
# Environment configuration
def load_env_file(filepath: str) -> Dict[str, str]:
    """Load environment variables from file"""
    env_vars: Dict[str, str] = {}
    try:
        with open(filepath, 'r') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and '=' in line:
                    key, value = line.split('=', 1)
                    env_vars[key.strip()] = value.strip().strip('"').strip("'")
        return env_vars
    except Exception:
        return {}

# Load environment - relative paths for same folder
env_files = ['.env', '../.env']
env_vars: Dict[str, str] = {}
for env_file in env_files:
    env_vars = load_env_file(env_file)
    if env_vars:
        print(f"📁 Loaded: {env_file}")
        break

CTM_HOST: Optional[str] = os.getenv("WERKSTATT_CTM_HOST") or env_vars.get("WERKSTATT_CTM_HOST")
CTM_PORT: Optional[str] = os.getenv("WERKSTATT_CTM_PORT") or env_vars.get("WERKSTATT_CTM_PORT")
AUTH_TOKEN: Optional[str] = os.getenv("WERKSTATT_CTM_AAPI") or env_vars.get("WERKSTATT_CTM_AAPI")

BASE_URL: Optional[str] = f"https://{CTM_HOST}:{CTM_PORT}/automation-api" if CTM_HOST and CTM_PORT else None
headers: Dict[str, str] = {"x-api-key": AUTH_TOKEN or "", "Content-Type": "application/json", "Accept": "application/json"}

print(f"✅ CTM server: {BASE_URL}")
print(f"✅ Token: {'SET' if AUTH_TOKEN else 'NOT SET'}")

In [None]:
# Load inventory data - relative path for same folder
inventory_path = 'ctm-inventory.json'
try:
    with open(inventory_path, 'r') as f:
        inventory = json.load(f)
    
    print(f"📊 Inventory loaded from: {inventory_path}")
    print(f"  📅 Date: {inventory.get('date', 'Unknown')}")
    print(f"  🆔 UUID: {inventory.get('uuid', 'Unknown')}")
    print(f"  📊 Servers: {len(inventory.get('Servers', {}))}")
    print(f"  🤖 Agents: {len(inventory.get('Agents', {}))}")
    print(f"  🔗 Connection Profiles: {len(inventory.get('CCP', {}))}")
    
    # Extract data for dropdowns
    agents: Dict[str, Any] = inventory.get('Agents', {})
    connection_profiles: Dict[str, Any] = inventory.get('CCP', {})
    servers: Dict[str, Any] = inventory.get('Servers', {})
    
    print("\n✅ Inventory data ready for monitoring!")
    
except FileNotFoundError:
    print(f"❌ Inventory file not found: {inventory_path}")
    print("Please run the ctm.ipynb notebook first to generate the inventory.")
    agents = {}
    connection_profiles = {}
    servers = {}
except Exception as e:
    print(f"❌ Error loading inventory: {e}")
    agents = {}
    connection_profiles = {}
    servers = {}

In [None]:
# Display available agents and connection profiles summary
if agents and connection_profiles:
    print("📊 Available for Testing:")
    print(f"\n🤖 Agents ({len(agents)}):")
    for name, data in list(agents.items())[:10]:  # Show first 10
        status = data.get('status', 'Unknown')
        server = data.get('server', 'Unknown')
        print(f"  • {name} ({server}) - {status}")
    if len(agents) > 10:
        print(f"  ... and {len(agents) - 10} more agents")
    
    print(f"\n🔗 Connection Profiles ({len(connection_profiles)}):")
    for name, data in list(connection_profiles.items())[:10]:  # Show first 10
        cp_type = data.get('Type', 'Unknown')
        hostname = data.get('HostName', 'N/A')
        print(f"  • {name} ({cp_type}) - {hostname}")
    if len(connection_profiles) > 10:
        print(f"  ... and {len(connection_profiles) - 10} more profiles")
        
    print(f"\n⬇️ Run all cells above first, then use the interactive test controls below!")
else:
    print("❌ No inventory data available for testing.")

In [None]:
# Define test functions
def ping_agent(agent_name: str) -> Dict[str, Any]:
    """Ping the selected agent"""
    if not agent_name or agent_name not in agents:
        return {"error": "Invalid agent selected"}
    
    agent_data = agents[agent_name]
    server_name = agent_data.get('server', '')
    
    if not server_name:
        return {"error": "No server found for agent"}
    
    if not BASE_URL:
        return {"error": "No BASE_URL configured"}
    
    try:
        ping_url = f"{BASE_URL}/config/server/{server_name}/agent/{agent_name}/ping"
        ping_body = {"discover": False, "timeout": 60, "agentType": "Agent"}
        
        print(f"🏓 Pinging agent: {agent_name} on server: {server_name}")
        print(f"📡 URL: {ping_url}")
        
        response = requests.post(ping_url, headers=headers, json=ping_body, verify=True)
        
        result: Dict[str, Any] = {
            "agent": agent_name,
            "server": server_name,
            "status_code": response.status_code,
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        
        if response.status_code == 200:
            ping_data = response.json()
            result["success"] = True
            result["response"] = ping_data
            
            # Extract message
            if isinstance(ping_data, dict):
                message = ping_data.get('message', 'No message')
            elif isinstance(ping_data, list) and len(ping_data) > 0:
                if isinstance(ping_data[0], dict):
                    message = ping_data[0].get('message', 'No message')
                else:
                    message = str(ping_data[0])
            else:
                message = str(ping_data)
            
            result["message"] = message
            result["available"] = 'available' in message.lower() and 'unavailable' not in message.lower()
        else:
            result["success"] = False
            result["error"] = f"HTTP {response.status_code}: {response.text}"
            
        return result
        
    except Exception as e:
        return {
            "agent": agent_name,
            "server": server_name,
            "success": False,
            "error": str(e),
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

def test_connection_profile(cp_name: str, agent_name: str) -> Dict[str, Any]:
    """Test the selected connection profile using the specified agent"""
    if not cp_name or cp_name not in connection_profiles:
        return {"error": "Invalid connection profile selected"}
    
    if not agent_name or agent_name not in agents:
        return {"error": "Invalid agent selected"}
    
    if not BASE_URL:
        return {"error": "No BASE_URL configured"}
    
    cp_data = connection_profiles[cp_name]
    agent_data = agents[agent_name]
    server_name = agent_data.get('server', '')
    cp_type_full = cp_data.get('Type', '')
    
    if not server_name:
        return {"error": "No server found for agent"}
    
    if not cp_type_full:
        return {"error": "No type found for connection profile"}
    
    # Extract only the first level after "ConnectionProfile:"
    # From "ConnectionProfile:FileTransfer:S3:Amazon" -> "FileTransfer"
    if cp_type_full.startswith('ConnectionProfile:'):
        type_parts = cp_type_full.split(':')
        if len(type_parts) >= 2:
            cp_type = type_parts[1]  # Get "FileTransfer" from "ConnectionProfile:FileTransfer:S3:Amazon"
        else:
            cp_type = cp_type_full
    else:
        cp_type = cp_type_full
    
    try:
        # URL pattern: /deploy/connectionprofile/centralized/test/:type/:name/:server/:agent
        test_url = f"{BASE_URL}/deploy/connectionprofile/centralized/test/{cp_type}/{cp_name}/{server_name}/{agent_name}"
        
        print(f"🔗 Testing connection profile: {cp_name} ({cp_type_full})")
        print(f"🔧 Extracted API type: {cp_type}")
        print(f"🤖 Using agent: {agent_name} on server: {server_name}")
        print(f"📡 URL: {test_url}")
        
        response = requests.post(test_url, headers=headers, verify=True)
        
        result: Dict[str, Any] = {
            "connection_profile": cp_name,
            "type_full": cp_type_full,
            "type_api": cp_type,
            "agent": agent_name,
            "server": server_name,
            "status_code": response.status_code,
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        
        if response.status_code == 200:
            test_data = response.json()
            result["success"] = True
            result["response"] = test_data
        else:
            result["success"] = False
            result["error"] = f"HTTP {response.status_code}: {response.text}"
            
        return result
        
    except Exception as e:
        return {
            "connection_profile": cp_name,
            "type_full": cp_type_full,
            "type_api": cp_type,
            "agent": agent_name,
            "server": server_name,
            "success": False,
            "error": str(e),
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

print("🔧 Test functions defined and ready!")

In [None]:
# Define button click handlers
def on_ping_button_clicked(b: widgets.Button) -> None:
    """Handle ping button click"""
    with output_area:
        clear_output()
        selected_agent = agent_dropdown.value
        print(f"🏓 Starting agent ping test...\n")
        
        result = ping_agent(selected_agent)
        
        if result.get('success'):
            status_icon = "✅" if result.get('available') else "❌"
            print(f"{status_icon} Agent Ping Result:")
            print(f"  Agent: {result['agent']}")
            print(f"  Server: {result['server']}")
            print(f"  Status: {'Available' if result.get('available') else 'Unavailable'}")
            print(f"  Message: {result.get('message', 'No message')}")
            print(f"  Timestamp: {result['timestamp']}")
            print(f"\n📋 Full Response:")
            print(json.dumps(result['response'], indent=2))
        else:
            print(f"❌ Agent Ping Failed:")
            print(f"  Agent: {result.get('agent', 'Unknown')}")
            print(f"  Server: {result.get('server', 'Unknown')}")
            print(f"  Error: {result.get('error', 'Unknown error')}")
            print(f"  Timestamp: {result.get('timestamp', 'Unknown')}")

def on_test_cp_button_clicked(b: widgets.Button) -> None:
    """Handle connection profile test button click"""
    with output_area:
        clear_output()
        selected_agent = agent_dropdown.value
        selected_cp = cp_dropdown.value
        print(f"🔗 Starting connection profile test...\n")
        
        result = test_connection_profile(selected_cp, selected_agent)
        
        if result.get('success'):
            print(f"✅ Connection Profile Test Result:")
            print(f"  Connection Profile: {result['connection_profile']}")
            print(f"  Full Type: {result['type_full']}")
            print(f"  API Type: {result['type_api']}")
            print(f"  Agent: {result['agent']}")
            print(f"  Server: {result['server']}")
            print(f"  Status: Success")
            print(f"  Timestamp: {result['timestamp']}")
            print(f"\n📋 Full Response:")
            print(json.dumps(result['response'], indent=2))
        else:
            print(f"❌ Connection Profile Test Failed:")
            print(f"  Connection Profile: {result.get('connection_profile', 'Unknown')}")
            print(f"  Full Type: {result.get('type_full', 'Unknown')}")
            print(f"  API Type: {result.get('type_api', 'Unknown')}")
            print(f"  Agent: {result.get('agent', 'Unknown')}")
            print(f"  Server: {result.get('server', 'Unknown')}")
            print(f"  Error: {result.get('error', 'Unknown error')}")
            print(f"  Timestamp: {result.get('timestamp', 'Unknown')}")

print("🎯 Button handlers defined!")

---
# 🎛️ Interactive Testing Controls

**Make sure all cells above have been executed successfully before using these controls!**

In [None]:
# Create interactive widgets for testing - REQUIRES ALL CELLS ABOVE TO BE RUN FIRST
if agents and connection_profiles:
    # Agent selection dropdown
    agent_options = [(f"{name} ({data.get('server', '')}) - {data.get('status', 'Unknown')}", name) 
                     for name, data in agents.items()]
    agent_dropdown = widgets.Dropdown(
        options=agent_options,
        description='Agent:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='600px')
    )
    
    # Connection profile selection dropdown
    cp_options = [(f"{name} ({data.get('Type', 'Unknown')}) - {data.get('HostName', 'N/A')}", name) 
                  for name, data in connection_profiles.items()]
    cp_dropdown = widgets.Dropdown(
        options=cp_options,
        description='Connection Profile:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='600px')
    )
    
    # Test buttons
    ping_button = widgets.Button(
        description='🏓 Ping Agent',
        button_style='info',
        layout=widgets.Layout(width='150px')
    )
    
    test_cp_button = widgets.Button(
        description='🔗 Test Connection Profile',
        button_style='success',
        layout=widgets.Layout(width='200px')
    )
    
    # Output area
    output_area = widgets.Output()
    
    # Connect button handlers
    ping_button.on_click(on_ping_button_clicked)
    test_cp_button.on_click(on_test_cp_button_clicked)
    
    print("🎛️ Interactive Controls Ready:")
    display(widgets.VBox([
        widgets.HTML("<h3>Select Agent and Connection Profile for Testing</h3>"),
        agent_dropdown,
        cp_dropdown,
        widgets.HBox([ping_button, test_cp_button]),
        output_area
    ]))
else:
    print("⚠️ No agents or connection profiles found. Please run the inventory collection first.")