# Monitor Active Jobs

This notebook monitors all active batches by polling Moody's API for job status updates.

**Purpose:**
- Find all ACTIVE batches across all cycles
- Poll Moody's API for job status updates
- Update job statuses in database
- Reconcile batch statuses
- Display monitoring summary

**Execution:**
- Run manually for immediate monitoring
- Schedule to run every 10 minutes for automated monitoring
- See README.md for scheduling instructions

In [1]:
%load_ext autoreload
%autoreload 2

import sys
from pathlib import Path
from datetime import datetime, timedelta

# Add workspace to path
workspace = Path.cwd().parent.parent
if str(workspace) not in sys.path:
    sys.path.insert(0, str(workspace))

from helpers import ux
from helpers.database import execute_query
from helpers.batch import recon_batch
from helpers.job import track_job_status
from helpers.constants import JobStatus, BatchStatus
from helpers.irp_integration import IRPClient

# Record when this run began; later cells subtract it from the end time to report duration
monitoring_start = datetime.now()
started_label = monitoring_start.strftime('%Y-%m-%d %H:%M:%S')

# Banner for the run
ux.header("Job Status Monitoring")
ux.info(f"Started: {started_label}")
ux.info("")

## 1) Find Active Batches

In [2]:
# Look up every batch whose status is ACTIVE, joined to its configuration and cycle
ux.subheader("Finding Active Batches")

query = """
    SELECT 
        b.id as batch_id,
        b.batch_type,
        b.configuration_id,
        b.submitted_ts,
        c.cycle_name,
        c.id as cycle_id
    FROM irp_batch b
    INNER JOIN irp_configuration cfg ON b.configuration_id = cfg.id
    INNER JOIN irp_cycle c ON cfg.cycle_id = c.id
    WHERE b.status = 'ACTIVE'
    ORDER BY b.submitted_ts DESC
"""

# Downstream cells branch on active_batches.empty, so keep this name stable
active_batches = execute_query(query)

if not active_batches.empty:
    # At least one batch to monitor: list them, most recently submitted first
    ux.success(f"Found {len(active_batches)} active batch(es)")
    ux.info("")

    batch_display = [
        [
            batch['batch_id'],
            batch['cycle_name'],
            batch['batch_type'],
            batch['submitted_ts'].strftime('%Y-%m-%d %H:%M:%S'),
        ]
        for _, batch in active_batches.iterrows()
    ]

    ux.table(batch_display, headers=["Batch ID", "Cycle", "Type", "Submitted"])
    ux.info("")
else:
    # Nothing to monitor: say so and report the elapsed time right away
    ux.info("No active batches found")
    ux.info("All jobs are either completed or not yet submitted")
    ux.info("")
    monitoring_end = datetime.now()
    duration = (monitoring_end - monitoring_start).total_seconds()
    ux.info(f"Completed in {duration:.1f} seconds")

Batch ID,Cycle,Type,Submitted
6,Quarterly-202511-Analysis,EDM Creation,2025-11-13 16:12:18


## 2) Poll Jobs for Each Batch

In [3]:
# Poll job statuses for every active batch and record what changed.
#
# The accumulators are initialised before the empty-check so they always exist
# after this cell runs, whichever branch is taken.
total_jobs_tracked = 0
total_status_changes = 0
polling_errors = []
status_transitions = []

if active_batches.empty:
    ux.info("No active batches to poll")
else:
    ux.subheader("Polling Job Statuses")

    # One client is shared by all jobs in this run instead of building a new one
    # per job. It is created lazily inside the per-job try block so that a
    # construction failure is still recorded as a polling error (and retried on
    # the next job) rather than aborting the whole cell.
    # NOTE(review): assumes an IRPClient instance can be reused across calls - confirm.
    irp_client = None

    for _, batch_row in active_batches.iterrows():
        batch_id = int(batch_row['batch_id'])
        batch_type = batch_row['batch_type']

        ux.info(f"Batch {batch_id} ({batch_type}):")

        # Select this batch's non-skipped jobs in the statuses listed below.
        # ERROR is included, so errored jobs are polled again on each run.
        jobs_query = """
            SELECT id, status, moodys_workflow_id
            FROM irp_job
            WHERE batch_id = %s
              AND status IN ('SUBMITTED', 'QUEUED', 'PENDING', 'RUNNING', 'ERROR')
              AND skipped = FALSE
            ORDER BY id
        """

        jobs = execute_query(jobs_query, (batch_id,))

        if jobs.empty:
            ux.info("  No active jobs to track")
            continue

        ux.info(f"  Tracking {len(jobs)} job(s)...")

        for _, job in jobs.iterrows():
            job_id = int(job['id'])
            old_status = job['status']

            try:
                if irp_client is None:
                    irp_client = IRPClient()

                # track_job_status returns the job's current status
                new_status = track_job_status(job_id, irp_client)

                total_jobs_tracked += 1

                # Record a transition only when the status actually moved
                if new_status != old_status:
                    total_status_changes += 1
                    status_transitions.append({
                        'job_id': job_id,
                        'batch_id': batch_id,
                        'old_status': old_status,
                        'new_status': new_status
                    })
                    ux.info(f"    Job {job_id}: {old_status} → {new_status}")

            except Exception as e:
                # Best effort: log the failure and keep polling the remaining jobs
                error_msg = f"Job {job_id}: {str(e)}"
                polling_errors.append(error_msg)
                ux.warning(f"    {error_msg}")

        ux.info("")

    ux.success(f"Tracked {total_jobs_tracked} job(s)")
    ux.info(f"Status changes: {total_status_changes}")

    if polling_errors:
        ux.warning(f"Polling errors: {len(polling_errors)}")

    ux.info("")

## 3) Reconcile Batch Statuses

In [4]:
# Reconcile every active batch and report the status each one ends up in.
#
# The result lists are initialised before the empty-check so they always exist
# after this cell runs, whichever branch is taken.
recon_results = []
recon_errors = []

if active_batches.empty:
    ux.info("No active batches to reconcile")
else:
    ux.subheader("Reconciling Batch Statuses")

    for _, batch_row in active_batches.iterrows():
        batch_id = int(batch_row['batch_id'])
        batch_type = batch_row['batch_type']

        try:
            # recon_batch returns the batch status after reconciliation
            final_status = recon_batch(batch_id)

            recon_results.append({
                'batch_id': batch_id,
                'batch_type': batch_type,
                'status': final_status
            })

            # Only a COMPLETED batch gets the success styling; a FAILED batch is
            # surfaced as a warning instead of being shown with a check mark.
            if final_status == 'COMPLETED':
                ux.success(f"Batch {batch_id}: {final_status} ✓")
            elif final_status == 'FAILED':
                ux.warning(f"Batch {batch_id}: {final_status}")
            else:
                ux.info(f"Batch {batch_id}: {final_status}")

        except Exception as e:
            # Best effort: log the failure and continue with the remaining batches
            error_msg = f"Batch {batch_id}: {str(e)}"
            recon_errors.append(error_msg)
            ux.warning(error_msg)

    ux.info("")

    # Summarise how many batches landed in each reported status
    if recon_results:
        completed_batches = [r for r in recon_results if r['status'] == 'COMPLETED']
        failed_batches = [r for r in recon_results if r['status'] == 'FAILED']
        active_batches_remaining = [r for r in recon_results if r['status'] == 'ACTIVE']

        if completed_batches:
            ux.success(f"Completed batches: {len(completed_batches)}")
        if failed_batches:
            ux.warning(f"Failed batches: {len(failed_batches)}")
        if active_batches_remaining:
            ux.info(f"Still active: {len(active_batches_remaining)}")

    if recon_errors:
        ux.warning(f"Reconciliation errors: {len(recon_errors)}")

    ux.info("")

## 4) Monitoring Summary

In [5]:
# Display the overall summary for this monitoring run: timings, counters,
# status transitions, any errors, and when the next run is recommended.
if active_batches.empty:
    ux.info("No active batches to summarize")
else:
    ux.header("Monitoring Summary")

    monitoring_end = datetime.now()
    duration = (monitoring_end - monitoring_start).total_seconds()

    # Counters and error lists come from the polling and reconciliation cells
    summary_data = [
        ["Started", monitoring_start.strftime('%Y-%m-%d %H:%M:%S')],
        ["Completed", monitoring_end.strftime('%Y-%m-%d %H:%M:%S')],
        ["Duration", f"{duration:.1f} seconds"],
        ["", ""],
        ["Batches Found", len(active_batches)],
        ["Jobs Tracked", total_jobs_tracked],
        ["Status Changes", total_status_changes],
        ["", ""],
        ["Polling Errors", len(polling_errors)],
        ["Recon Errors", len(recon_errors)]
    ]

    ux.table(summary_data, headers=["Metric", "Value"])

    # Per-job transitions observed during this run, if any
    if status_transitions:
        ux.subheader("\nStatus Transitions")
        transition_rows = [
            [t['job_id'], t['batch_id'], t['old_status'], t['new_status']]
            for t in status_transitions
        ]
        ux.table(transition_rows, headers=["Job ID", "Batch ID", "From", "To"])

    # Errors collected while polling and reconciling, if any
    if polling_errors:
        ux.subheader("\nPolling Errors")
        for error in polling_errors:
            ux.warning(f"  • {error}")

    if recon_errors:
        ux.subheader("\nReconciliation Errors")
        for error in recon_errors:
            ux.warning(f"  • {error}")

    # Next run recommendation: the notebook is intended to run every 10 minutes.
    # Previously this value was computed but never shown.
    next_run = monitoring_end + timedelta(minutes=10)
    ux.info("")
    ux.info(f"Next recommended run: {next_run.strftime('%Y-%m-%d %H:%M:%S')}")

Metric,Value
Started,2025-11-13 16:23:29
Completed,2025-11-13 16:23:33
Duration,3.7 seconds
,
Batches Found,1
Jobs Tracked,3
Status Changes,3
,
Polling Errors,0
Recon Errors,0


Job ID,Batch ID,From,To
101,6,RUNNING,FINISHED
103,6,RUNNING,FINISHED
104,6,RUNNING,FINISHED
