home directory: c-val
function availability : ./kubectl/functions.py

Now the job-runner.ipynb will perform the following tasks:
1. get_free_node_list()
    - Save the result to a list: get_free_node_list[]
2. get_db_latest_status()
    - Get the latest test-result timestamp from validation.db for every node in the db (by accessing the gcr-admin-pvc-access pod)
    - One latest timestamp per node per test
    - If a node has no test results, mark it with a very old timestamp so that it gets the highest priority
3. build_priority_queue()
    - Combine the free nodes list with the get_db_latest_status list in a priority queue function that takes
        1. free nodes list
        2. db latest status
        3. Z days threshold
    - Returns priority queue
        1. Filtered to free nodes only
        2. Skips nodes whose latest test results are not older than Z days
        3. Ordered by latest test result timestamp (oldest first = highest priority)
    - Format of returned "job_priority_queue_list": [ nodename, priority_order, job_submission_status ]
        [
            [node1, 1, True],
            [node2, 2, False],
            ...
        ]
4. batch job submission
    - takes
        1. batch size: N single-node jobs per batch
        2. job queue list from build_priority_queue()
        3. job template yaml file path (/home/hari/b200/validation/c-val/ymls/specific-node-job.yml)
    - for each batch of N nodes
        1. read job template yaml file
        2. fill in
            a. node name <node-name>
            b. job name hari-gcr-ceval-<node-name>-<timestamp>
        3. submit the job to the k8s cluster; repeat for each of the N nodes in the batch
    - each submitted job runs the cluster-doctor validation tests on its node
5. monitor job status
    - if a job is pending for more than X minutes, cancel the job and update job_submission_status to canceled in job_priority_queue_list
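
The queue rules in step 3 can be sketched as a small pure function. This is a minimal sketch, not the implementation in functions.py: the name build_priority_queue_sketch, the NEVER_TESTED_TS sentinel, and the assumption that the DB status is a plain dict of node name to epoch-seconds timestamp (one timestamp per node rather than per node per test) are all hypothetical.

```python
import time

# Assumed sentinel: a "very old" timestamp for nodes with no test results.
NEVER_TESTED_TS = 0


def build_priority_queue_sketch(free_nodes, db_latest, days_threshold, now=None):
    """Return [[nodename, priority_order, job_submission_status], ...].

    free_nodes:     list of node names that are currently free.
    db_latest:      dict of node name -> latest test timestamp in epoch seconds
                    (assumed shape; the real DB output may differ).
    days_threshold: Z; nodes tested within the last Z days are skipped.
    now:            reference time in epoch seconds (defaults to the current time).
    """
    now = time.time() if now is None else now
    cutoff = now - days_threshold * 86400

    candidates = []
    for node in free_nodes:
        # Nodes missing from the DB get the sentinel, so they sort first.
        last_ts = db_latest.get(node, NEVER_TESTED_TS)
        if last_ts >= cutoff:
            continue  # result is not older than Z days: skip
        candidates.append((last_ts, node))

    # Oldest timestamp first; ties are broken by node name.
    candidates.sort()
    return [[node, rank, False] for rank, (_, node) in enumerate(candidates, start=1)]
```

Passing `now` explicitly keeps the function deterministic, which makes the filtering and ordering easy to check without touching the cluster.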

6. Job run [inside the job pod]
    - git clone the c-val repo to /opt/c-val
    - Run cluster-doctor tests on the pod/node and collect logs (STDOUT/STDERR) using tee
    - Upon completion of tests
        - Collect the test results log (STDOUT/STDERR) and save it to /data/continuous_validation/<test-name>/<node-name>/<node-name>-<testname>-<timestamp>.log
    - Update validation.db at /data/continuous_validation/metadata/validation.db with the new test results and timestamp, using add_result_local() from /opt/c-val/kubectl/functions.py
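
The log handling in step 6 could look roughly like the following. It is a sketch under assumptions: build_log_path and run_and_tee are hypothetical helper names, the command to run is left to the caller (the cluster-doctor invocation is not shown here), and STDOUT and STDERR are merged into one stream to mimic piping through tee.

```python
import subprocess
import sys
from pathlib import Path


def build_log_path(base_dir, test_name, node_name, timestamp):
    """Build <base_dir>/<test-name>/<node-name>/<node-name>-<testname>-<timestamp>.log."""
    filename = f"{node_name}-{test_name}-{timestamp}.log"
    return Path(base_dir) / test_name / node_name / filename


def run_and_tee(cmd, log_path):
    """Run cmd, echoing combined STDOUT/STDERR to the console and to log_path.

    Returns the exit code of the command.
    """
    log_path = Path(log_path)
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with open(log_path, "w") as log_file:
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge STDERR into STDOUT
            text=True,
        ) as proc:
            for line in proc.stdout:
                sys.stdout.write(line)  # show live output
                log_file.write(line)    # and keep a copy on disk
            return proc.wait()
```

The returned exit code can then be stored alongside the timestamp when the result is written to validation.db.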

7. Generate a daily report
    - Summary of nodes tested
    - Summary of test results
    - List of nodes that were never tested
    - Save report to ./gitignored/reports/daily_report_<date>.txt
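
One way the daily report in step 7 might be assembled is sketched below. The function names, the record shape (dicts with 'node', 'test', and 'result' keys), and the separate db_nodes argument (nodes that have any record in validation.db) are assumptions made for illustration; the real schema may differ.

```python
import datetime
from collections import Counter
from pathlib import Path


def build_daily_report(all_nodes, todays_results, db_nodes, report_date):
    """Return the report text.

    all_nodes:      every node name in the cluster.
    todays_results: list of dicts with 'node', 'test', 'result' keys (assumed shape).
    db_nodes:       node names that have at least one record in the DB.
    report_date:    datetime.date the report covers.
    """
    tested = sorted({r["node"] for r in todays_results})
    never_tested = sorted(set(all_nodes) - set(db_nodes))
    outcome_counts = Counter(r["result"] for r in todays_results)

    lines = [
        f"Daily validation report: {report_date.isoformat()}",
        f"Nodes tested: {len(tested)}",
        "Test results:",
    ]
    lines += [f"  {outcome}: {count}" for outcome, count in sorted(outcome_counts.items())]
    lines.append(f"Nodes never tested: {len(never_tested)}")
    lines += [f"  {node}" for node in never_tested]
    return "\n".join(lines) + "\n"


def save_daily_report(text, report_date, report_dir="./gitignored/reports"):
    """Write the report to <report_dir>/daily_report_<date>.txt and return the path."""
    path = Path(report_dir) / f"daily_report_{report_date.isoformat()}.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text)
    return path
```

Keeping the text generation separate from the file write means the summary can also be printed in the notebook before it is saved.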

In [12]:
import sys
import os
import time
import datetime
import importlib

# Add the working directory to the path so that utils can be imported.
# __file__ is not defined inside a notebook, so use os.getcwd() instead.
current_dir = os.getcwd()
if current_dir not in sys.path:
    sys.path.append(current_dir)

# Import the utility functions
try:
    import utils.functions as functions
    importlib.reload(functions) # Force reload to get new functions
except ImportError:
    # Fallback if running from a different context
    sys.path.append("/home/hari/b200/validation/c-val")
    import utils.functions as functions
    importlib.reload(functions)

home_dir = "/home/hari/b200/validation/c-val/"
batch_size = 2
monitor_timeout_mins = 2
template_path = os.path.join(home_dir, "ymls/specific-node-job.yml")
days_threshold = 7

class Cluster:
    def __init__(self, ns="gcr-admin"):
        self.ns = ns
        # numerical timestamp
        self.timestamp = int(time.time())
        self.freenode_list = []
        self.db_status = {}
        self.job_queue = []
        self.template_path = template_path
        self.home_dir = home_dir
        self.batch_size = batch_size
        self.monitor_timeout_mins = monitor_timeout_mins
        self.days_threshold = days_threshold
        
    def refresh_state(self):
        """
        Step 1 & 2: Get free nodes and latest DB status.
        """
        print(f"[{datetime.datetime.now().time()}] Refreshing cluster state...")
        
        # 1. Get Free Node List
        self.freenode_list = functions.get_free_node_list()
        print(f"  Found {len(self.freenode_list)} free nodes (fully available).")
        
        # 2. Get DB Latest Status
        print("  Fetching DB status from cluster...")
        try:
            db_output = functions.get_db_latest_status(namespace=self.ns)
            self.db_status = functions.parse_db_status_output(db_output)
            print(f"  Retrieved status for {len(self.db_status)} nodes from DB.")
        except Exception as e:
            print(f"  Error fetching DB status: {e}")
            self.db_status = {}
            
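    def build_priority_queue(self, days_threshold=None, shuffle=False):
        """
        Step 3: Build a priority queue filtering free nodes by age of last test.
        Falls back to self.days_threshold when days_threshold is not given.
        """
        if days_threshold is None:
            days_threshold = self.days_threshold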
        if not self.freenode_list:
            print("No free nodes to queue.")
            self.job_queue = []
            return []

        print(f"[{datetime.datetime.now().time()}] Building priority queue (Threshold: {days_threshold} days, Shuffle: {shuffle})...")
        self.job_queue = functions.build_priority_queue(
            self.freenode_list, 
            self.db_status, 
            days_threshold=days_threshold,
            shuffle=shuffle
        )
        
        print(f"  Queue built: {len(self.job_queue)} jobs candidates.")
        return self.job_queue

    def run_batch(self, batch_size=batch_size, monitor_timeout_mins=monitor_timeout_mins, dry_run=False):
        """
        Step 4 & 5: Submit a batch of jobs AND monitor them.
        """
        if not self.job_queue:
            print("Job queue is empty.")
            return

        print(f"[{datetime.datetime.now().time()}] Processing batch (Size: {batch_size})...")
        
        pending_jobs = [j for j in self.job_queue if not j[2]]
        if not pending_jobs:
            print("  No pending jobs in queue.")
            return

        if not os.path.exists(self.template_path):
            print(f"  Error: Template not found at {self.template_path}")
            return
            
        with open(self.template_path, 'r') as f:
            template_content = f.read()

        active_batch_jobs = [] # format: {'job_name': str, 'node': str, 'start_time': float, 'item_ref': list}
        jobs_submitted_count = 0
        
        # --- SUBMISSION LOOP ---
        for job_info in pending_jobs:
            if jobs_submitted_count >= batch_size:
                break
                
            node_name = job_info[0]
            # Create Job Name
            ts = int(time.time())
            job_name = f"hari-gcr-ceval-{node_name}-{ts}"
            
            # YAML substitution
            # FIX: Correctly substitute placeholders found in templates/specific-node-job.yml
            # Previously used a hardcoded node name string which was incorrect for this template
            job_yaml = template_content.replace("nodename-placeholder", node_name)
            job_yaml = job_yaml.replace("time-placeholder", str(ts))
            
            # Replace job name placeholder
            job_yaml = job_yaml.replace("generateName: jobname-placeholder", f"name: {job_name}")
            
            print(f"  > Target: {node_name} | Job: {job_name}")
            
            if dry_run:
                print("    [Dry Run] Job would be submitted. (Marking as done in queue)")
                job_info[2] = True # Mark submitted mock
                jobs_submitted_count += 1
                continue
                
            # Create Temp File & Submit
            # Save to gitignored directory for debugging/inspection
            temp_dir = os.path.join(self.home_dir, "gitignored")
            os.makedirs(temp_dir, exist_ok=True)
            temp_path = os.path.join(temp_dir, f"{job_name}.yaml")
            
            try:
                with open(temp_path, 'w') as temp_f:
                    temp_f.write(job_yaml)
                out = functions.create_job(temp_path)
                print(f"    Submitted: {out.strip()}")
                
                # Update queue info status (submitted=True)
                job_info[2] = True
                
                active_batch_jobs.append({
                    'job_name': job_name,
                    'node': node_name,
                    'start_time': time.time(),
                    'item_ref': job_info # Reference to queue item to update status later if needed
                })
                jobs_submitted_count += 1
                
            except Exception as e:
                print(f"    Failed to submit: {e}")
            # The rendered YAML is intentionally kept in gitignored/ for debugging.

        if dry_run:
            print("Batch dry-run complete.")
            return

        # --- MONITORING LOOP ---
        print(f"  Scanning {len(active_batch_jobs)} jobs for status (Timeout: {monitor_timeout_mins}m)...")
        timeout_seconds = monitor_timeout_mins * 60
        
        while len(active_batch_jobs) > 0:
            print(f"  [{datetime.datetime.now().time()}] Checking specific job statuses...")
            
            # Iterate backwards to remove finished jobs safely
            for i in range(len(active_batch_jobs) - 1, -1, -1):
                job = active_batch_jobs[i]
                jname = job['job_name']
                elapsed = time.time() - job['start_time']
                
                # Get Status
                status = functions.get_job_status(jname, namespace=self.ns)
                
                print(f"    [{jname}] Status: {status} (Elapsed: {elapsed:.0f}s)")
                
                # Logic: Succeeded / Failed / Completed -> Done
                if status in ["Completed", "Succeeded", "Failed", "Aborted", "Terminated"]:
                    print(f"    Job {jname}: {status}. Finished.")
                    active_batch_jobs.pop(i)
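                elif status == "Pending":
                    # Cancel jobs that stay Pending past the timeout
                    if elapsed > timeout_seconds:
                        print(f"    Job {jname}: Timed out ({elapsed:.0f}s > {timeout_seconds}s). Cancelling...")
                        functions.delete_job(jname, namespace=self.ns)
                        # Record the cancellation in the queue entry (step 5 of the plan).
                        # The string is truthy, so the node is not resubmitted.
                        job['item_ref'][2] = "canceled"
                        active_batch_jobs.pop(i)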
                else:
                    # Running or Unknown
                    pass
            
            if not active_batch_jobs:
                break
                
            time.sleep(60) # Poll every minute
            
        print("Batch monitoring complete.")

    def process_full_queue(self, batch_size=batch_size, monitor_timeout_mins=monitor_timeout_mins, dry_run=False):
        """
        Runs multiple batches until the queue is empty.
        """
        print(f"[{datetime.datetime.now().time()}] Starting Full Queue Processing (Dry Run: {dry_run})...")
        
        while True:
            # Check if there are any pending jobs
            pending_jobs = [j for j in self.job_queue if not j[2]]
            if not pending_jobs:
                print("No more pending jobs in the queue. All done.")
                break
                
            print(f"\n--- Batch Start (Remaining: {len(pending_jobs)}) ---")
            self.run_batch(batch_size=batch_size, monitor_timeout_mins=monitor_timeout_mins, dry_run=dry_run)
            
            # Optional: Short pause between batches if not dry_run to allow cluster stabilization
            if not dry_run and len(pending_jobs) > batch_size:
                time.sleep(10)

    def latest_test_results(self):
        """Helper to return the cached DB status map (node -> latest test record)"""
        return self.db_status

    def freenodes(self):
        """Helper to return cached list"""
        return self.freenode_list
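
The per-job decision inside the monitoring loop of run_batch can also be written as a pure function, which makes the timeout rule testable without a cluster. This is a sketch only: next_action and TERMINAL_STATES are hypothetical names, and the status strings are the ones checked in run_batch above.

```python
# Status strings that run_batch treats as finished.
TERMINAL_STATES = {"Completed", "Succeeded", "Failed", "Aborted", "Terminated"}


def next_action(status, elapsed_seconds, timeout_seconds):
    """Decide what to do with one job on this polling pass.

    Returns:
        'done'   - the job reached a terminal state; stop tracking it.
        'cancel' - the job is still Pending past the timeout; delete it.
        'wait'   - the job is Running, Unknown, or Pending within the timeout.
    """
    if status in TERMINAL_STATES:
        return "done"
    if status == "Pending" and elapsed_seconds > timeout_seconds:
        return "cancel"
    return "wait"
```

Note that only Pending jobs are ever cancelled here; a job that stays Running is polled until it finishes, which matches the loop above.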

In [None]:
cluster = Cluster("gcr-admin")
cluster.refresh_state()
print(f"Free Nodes: {len(cluster.freenodes())}")
print(f"DB Records: {len(cluster.latest_test_results())}")

[15:58:26.977365] Refreshing cluster state...


In [None]:
# show latest test results
for node, record in list(cluster.latest_test_results().items())[:5]:
    print(f"  {node}: Last Test: {record['last_test_time']}, Status: {record['last_result']}")

In [14]:
# Build Queue (e.g. nodes not tested in last 2 days)
# Use shuffle=True to randomize the order of jobs
queue = cluster.build_priority_queue(days_threshold=2, shuffle=False)

print("\n Jobs in Queue:")
for item in queue:
    # NOTE: item[1] is the priority order (1 = highest priority), not a timestamp
    print(f"  {item[0]} (Last Tested: {item[1]})")

[15:26:55.585120] Building priority queue (Threshold: 2 days, Shuffle: False)...
  Queue built: 88 jobs candidates.

 Jobs in Queue:
  slc01-cl02-hgx-0006 (Last Tested: 1)
  slc01-cl02-hgx-0008 (Last Tested: 2)
  slc01-cl02-hgx-0009 (Last Tested: 3)
  slc01-cl02-hgx-0012 (Last Tested: 4)
  slc01-cl02-hgx-0023 (Last Tested: 5)
  slc01-cl02-hgx-0024 (Last Tested: 6)
  slc01-cl02-hgx-0027 (Last Tested: 7)
  slc01-cl02-hgx-0029 (Last Tested: 8)
  slc01-cl02-hgx-0031 (Last Tested: 9)
  slc01-cl02-hgx-0034 (Last Tested: 10)
  slc01-cl02-hgx-0054 (Last Tested: 11)
  slc01-cl02-hgx-0062 (Last Tested: 12)
  slc01-cl02-hgx-0073 (Last Tested: 13)
  slc01-cl02-hgx-0083 (Last Tested: 14)
  slc01-cl02-hgx-0085 (Last Tested: 15)
  slc01-cl02-hgx-0099 (Last Tested: 16)
  slc01-cl02-hgx-0107 (Last Tested: 17)
  slc01-cl02-hgx-0111 (Last Tested: 18)
  slc01-cl02-hgx-0115 (Last Tested: 19)
  slc01-cl02-hgx-0119 (Last Tested: 20)
  slc01-cl02-hgx-0124 (Last Tested: 21)
  slc01-cl02-hgx-0130 (Last Tested: 

In [16]:
# IMPORTANT: Rebuild queue before running if you want to start fresh or change shuffle mode.
# If you ran a dry_run previously, the queue might be marked as "processed".
queue = cluster.build_priority_queue(days_threshold=2, shuffle=False)

# Process the entire queue with batches. 
# It will run until all nodes in the priority queue have been tested.
# Set dry_run=False to actually submit jobs.
cluster.process_full_queue(batch_size=5, monitor_timeout_mins=3, dry_run=False) # Set dry_run=True for testing without submission

[15:54:35.678527] Building priority queue (Threshold: 2 days, Shuffle: False)...
  Queue built: 88 jobs candidates.
[15:54:35.678681] Starting Full Queue Processing (Dry Run: False)...

--- Batch Start (Remaining: 88) ---
[15:54:35.678699] Processing batch (Size: 5)...
  > Target: slc01-cl02-hgx-0006 | Job: hari-gcr-ceval-slc01-cl02-hgx-0006-1768262075
    Submitted: job.batch.volcano.sh/hari-gcr-ceval-slc01-cl02-hgx-0006-1768262075 created
  > Target: slc01-cl02-hgx-0008 | Job: hari-gcr-ceval-slc01-cl02-hgx-0008-1768262076
    Submitted: job.batch.volcano.sh/hari-gcr-ceval-slc01-cl02-hgx-0008-1768262076 created
  > Target: slc01-cl02-hgx-0009 | Job: hari-gcr-ceval-slc01-cl02-hgx-0009-1768262076
    Submitted: job.batch.volcano.sh/hari-gcr-ceval-slc01-cl02-hgx-0009-1768262076 created
  > Target: slc01-cl02-hgx-0012 | Job: hari-gcr-ceval-slc01-cl02-hgx-0012-1768262077
    Submitted: job.batch.volcano.sh/hari-gcr-ceval-slc01-cl02-hgx-0012-1768262077 created
  > Target: slc01-cl02-hgx-002

KeyboardInterrupt: 

In [None]:
# Debug: Check queue Status
print(f"Total jobs in queue: {len(cluster.job_queue)}")
submitted = [j for j in cluster.job_queue if j[2]]
print(f"Submitted jobs count: {len(submitted)}")
pending = [j for j in cluster.job_queue if not j[2]]
print(f"Pending jobs count: {len(pending)}")