# HPC Job Management System - Comprehensive Guide

## Overview
This notebook provides a comprehensive analysis of High-Performance Computing (HPC) job management, including:
- Resource allocation strategies
- Job scheduling algorithms
- Performance monitoring and metrics
- Benchmarking methodologies
- Parallel execution strategies
- Advanced optimization techniques

**Target System**: HPCShell - An HPC-optimized shell for scientific computing

## Section 1: Resource Allocation and Job Configuration

### Key Concepts
Resource allocation is the foundation of HPC job management. It ensures that:
1. **Isolation**: Jobs don't interfere with each other
2. **Fairness**: Resources are distributed equitably
3. **Efficiency**: System resources are fully utilized
4. **Predictability**: Job performance is consistent

### Resource Types in HPCShell
- **CPU Cores**: Individual processor cores allocated to jobs
- **Memory**: RAM allocated with virtual memory limits
- **GPUs**: Graphics processors for compute acceleration
- **Time Limits**: Maximum runtime before auto-termination
- **I/O Bandwidth**: Disk and network bandwidth quotas

In [1]:
import sys
from pathlib import Path
import json
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Dict, List, Optional
import time

# Add HPC shell to path
project_root = Path('c:/Users/Liver/OneDrive/Desktop/HPC_shell')
sys.path.insert(0, str(project_root))

# Example 1: Resource Configuration Class
@dataclass
class ResourceConfig:
    """Configuration for job resource allocation"""
    cores: int = 1
    memory_gb: float = 1.0
    gpu_count: int = 0
    time_limit_hours: float = 24.0
    disk_quota_gb: float = 100.0
    
    def to_dict(self) -> Dict:
        """Convert to dictionary for storage"""
        return asdict(self)
    
    @staticmethod
    def from_dict(data: Dict) -> 'ResourceConfig':
        """Create from dictionary"""
        return ResourceConfig(**data)
    
    def __str__(self):
        info = [f"Cores: {self.cores}"]
        info.append(f"Memory: {self.memory_gb:.1f} GB")
        if self.gpu_count > 0:
            info.append(f"GPUs: {self.gpu_count}")
        info.append(f"Time Limit: {self.time_limit_hours:.1f}h")
        return " | ".join(info)

# Example configurations
configs = {
    'light': ResourceConfig(cores=1, memory_gb=2.0, gpu_count=0),
    'standard': ResourceConfig(cores=4, memory_gb=8.0, gpu_count=0),
    'compute': ResourceConfig(cores=16, memory_gb=32.0, gpu_count=1),
    'gpu_intensive': ResourceConfig(cores=32, memory_gb=128.0, gpu_count=4, time_limit_hours=48.0),
}

print("=" * 70)
print("RESOURCE ALLOCATION CONFIGURATIONS")
print("=" * 70)
for name, config in configs.items():
    print(f"{name.upper():20s}: {config}")
print()

RESOURCE ALLOCATION CONFIGURATIONS
LIGHT               : Cores: 1 | Memory: 2.0 GB | Time Limit: 24.0h
STANDARD            : Cores: 4 | Memory: 8.0 GB | Time Limit: 24.0h
COMPUTE             : Cores: 16 | Memory: 32.0 GB | GPUs: 1 | Time Limit: 24.0h
GPU_INTENSIVE       : Cores: 32 | Memory: 128.0 GB | GPUs: 4 | Time Limit: 48.0h



## Section 2: Job Scheduling and Priority Management

### Scheduling Algorithms
- **FIFO (First In, First Out)**: Simple queue-based scheduling
- **Priority Queues**: Jobs scheduled by priority level
- **Fair-Share**: Allocate resources based on user/group shares
- **Backfill**: Fill gaps in schedule with smaller jobs
- **Gang Scheduling**: Co-schedule related jobs together

### Priority Levels
| Level | Value | Use Case |
|-------|-------|----------|
| CRITICAL | 0 | System maintenance, emergency tasks |
| HIGH | 1 | Interactive jobs, time-sensitive work |
| NORMAL | 2 | Regular batch jobs (default) |
| LOW | 3 | Background analysis, opportunistic work |
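
Because lower values mean higher urgency, the priority table maps naturally onto a min-heap. The following is a minimal sketch, assuming a hypothetical `PriorityJobQueue` class (not an HPCShell API) and a submission counter that keeps FIFO order within one priority level:

```python
import heapq
import itertools
from typing import List, Optional, Tuple

# Hypothetical helper for illustration; not part of HPCShell.
class PriorityJobQueue:
    """Min-heap of (priority_value, submission_order, job_name)."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._counter = itertools.count()  # tie-breaker: earlier submissions first

    def submit(self, job_name: str, priority_value: int = 2) -> None:
        """Queue a job; 2 corresponds to NORMAL in the table above."""
        heapq.heappush(self._heap, (priority_value, next(self._counter), job_name))

    def pop_next(self) -> Optional[str]:
        """Return the most urgent job name, or None if the queue is empty."""
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


job_queue = PriorityJobQueue()
job_queue.submit("nightly_batch")          # NORMAL (2)
job_queue.submit("background_scan", 3)     # LOW
job_queue.submit("node_repair", 0)         # CRITICAL
job_queue.submit("interactive_debug", 1)   # HIGH
job_queue.submit("second_batch")           # NORMAL, submitted after nightly_batch

order = [job_queue.pop_next() for _ in range(5)]
print(order)
```

The counter matters: without it, two jobs at the same level would be ordered by name rather than by submission time.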

In [2]:
# Example 2: Advanced Scheduling with Priority and Fair-Share
from enum import Enum
from collections import defaultdict

class Priority(Enum):
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3

@dataclass
class SchedulingPolicy:
    """Scheduling policy configuration"""
    algorithm: str  # 'fifo', 'priority', 'fairshare', 'backfill'
    priority_boost: float = 1.0  # Boost for older jobs
    max_backfill_jobs: int = 10
    fairshare_weight_cpu: float = 0.7
    fairshare_weight_memory: float = 0.3
    
class FairShareScheduler:
    """Implement fair-share scheduling"""
    
    def __init__(self, policy: SchedulingPolicy):
        self.policy = policy
        self.user_allocation = defaultdict(lambda: {'cpu_used': 0, 'memory_used': 0})
        self.user_quota = defaultdict(lambda: {'cpu': 100, 'memory': 1000})
    
    def calculate_fair_share(self, user: str) -> float:
        """Calculate fair-share score for user (0-1, higher is better)"""
        cpu_used = self.user_allocation[user]['cpu_used']
        cpu_quota = self.user_quota[user]['cpu']
        cpu_ratio = cpu_used / cpu_quota if cpu_quota > 0 else 0
        
        memory_used = self.user_allocation[user]['memory_used']
        memory_quota = self.user_quota[user]['memory']
        memory_ratio = memory_used / memory_quota if memory_quota > 0 else 0
        
        # Weighted combination
        fair_share = (
            (1 - cpu_ratio) * self.policy.fairshare_weight_cpu +
            (1 - memory_ratio) * self.policy.fairshare_weight_memory
        )
        return max(0.0, min(1.0, fair_share))
    
    def schedule_job(self, jobs: List[Dict], available_resources: Dict) -> Optional[Dict]:
        """Select next job to schedule based on fair-share"""
        candidates = []
        
        for job in jobs:
            if job['status'] == 'queued':
                user = job.get('user', 'unknown')
                fair_share = self.calculate_fair_share(user)
                candidates.append((fair_share, job))
        
        if not candidates:
            return None
        
        # Sort by fair-share (descending)
        candidates.sort(reverse=True, key=lambda x: x[0])
        selected = candidates[0][1]
        return selected

# Example simulation
scheduler = FairShareScheduler(SchedulingPolicy(algorithm='fairshare'))

# Simulate user allocations
scheduler.user_allocation['alice'] = {'cpu_used': 20, 'memory_used': 100}
scheduler.user_allocation['bob'] = {'cpu_used': 60, 'memory_used': 200}
scheduler.user_allocation['charlie'] = {'cpu_used': 10, 'memory_used': 50}

print("\n" + "=" * 70)
print("FAIR-SHARE SCHEDULING EXAMPLE")
print("=" * 70)
print(f"{'User':<15} {'CPU Used':<12} {'Memory Used':<12} {'Fair-Share':<10}")
print("-" * 70)

for user in ['alice', 'bob', 'charlie']:
    fair_share = scheduler.calculate_fair_share(user)
    cpu = scheduler.user_allocation[user]['cpu_used']
    mem = scheduler.user_allocation[user]['memory_used']
    print(f"{user:<15} {cpu:<12} {mem:<12} {fair_share:.2%}")
print()


FAIR-SHARE SCHEDULING EXAMPLE
User            CPU Used     Memory Used  Fair-Share
----------------------------------------------------------------------
alice           20           100          83.00%
bob             60           200          52.00%
charlie         10           50           91.50%



## Section 3: Job Dependencies and Arrays

### Job Dependencies
A job can be held until another job reaches a given state:
- **afterok**: Start after job X completes successfully
- **afternotok**: Start after job X fails
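- **aftercorr**: Start after job X finishes, whether it succeeded or failed (SLURM names this behavior `afterany`; its `aftercorr` applies to matching array tasks)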
- **singleton**: Wait for all previous jobs with same name

### Job Arrays
Run the same job with different parameters:
- Useful for parameter sweeps
- Thousands of similar jobs from single submission
- Automatic array index variable substitution

In [3]:
# Example 3: Job Dependencies and Arrays
from typing import Set, Tuple

@dataclass
class JobDependency:
    """Represents a job dependency"""
    job_id: int
    dependency_type: str  # 'afterok', 'afternotok', 'aftercorr'
    parent_job_id: int

class JobDependencyGraph:
    """Manages job dependencies as DAG"""
    
    def __init__(self):
        self.dependencies = {}  # job_id -> list of dependencies
        self.dependents = {}    # job_id -> list of dependent jobs
    
    def add_dependency(self, job_id: int, parent_id: int, dep_type: str = 'afterok'):
        """Add dependency: job_id depends on parent_id"""
        if job_id not in self.dependencies:
            self.dependencies[job_id] = []
        self.dependencies[job_id].append({'parent': parent_id, 'type': dep_type})
        
        if parent_id not in self.dependents:
            self.dependents[parent_id] = []
        self.dependents[parent_id].append(job_id)
    
    def can_run(self, job_id: int, completed_jobs: Set[int], failed_jobs: Set[int]) -> bool:
        """Check if job can run based on dependencies"""
        if job_id not in self.dependencies:
            return True
        
        for dep in self.dependencies[job_id]:
            parent_id = dep['parent']
            dep_type = dep['type']
            
            if dep_type == 'afterok':
                # Parent must complete successfully
                if parent_id not in completed_jobs:
                    return False
                if parent_id in failed_jobs:
                    return False
            elif dep_type == 'afternotok':
                # Parent must fail
                if parent_id not in failed_jobs:
                    return False
            elif dep_type == 'aftercorr':
                # Parent must complete (success or fail)
                if parent_id not in completed_jobs and parent_id not in failed_jobs:
                    return False
        
        return True

# Example DAG workflow
dag = JobDependencyGraph()

# Workflow: preprocessing -> simulation -> post-processing -> analysis
jobs = {
    1: {'name': 'preprocess_data', 'status': 'completed'},
    2: {'name': 'run_simulation', 'status': 'pending'},
    3: {'name': 'postprocess', 'status': 'pending'},
    4: {'name': 'analyze_results', 'status': 'pending'},
}

# Add dependencies
dag.add_dependency(2, 1, 'afterok')      # simulation after preprocessing
dag.add_dependency(3, 2, 'afterok')      # postprocess after simulation
dag.add_dependency(4, 3, 'afterok')      # analysis after postprocessing

print("\n" + "=" * 70)
print("JOB DEPENDENCY GRAPH (DAG)")
print("=" * 70)
print("\nWorkflow: preprocessing -> simulation -> postprocessing -> analysis\n")

completed = {1}
failed = set()

for job_id in range(1, 5):
    can_run = dag.can_run(job_id, completed, failed)
    status = "✓ CAN RUN" if can_run else "⏳ WAITING"
    print(f"Job {job_id}: {jobs[job_id]['name']:<25} {status}")

# Example job array
print("\n" + "=" * 70)
print("JOB ARRAY EXAMPLE")
print("=" * 70)
print("\nJob array: process_data[1-10] processes 10 input files\n")
print(f"{'Array Index':<15} {'Input File':<25} {'Command'}")
print("-" * 65)

for i in range(1, 11):
    input_file = f"data/input_{i:02d}.txt"
    output_file = f"data/output_{i:02d}.txt"
    cmd = f"python process.py {input_file} > {output_file}"
    print(f"{i:<15} {input_file:<25} {cmd}")
print()


JOB DEPENDENCY GRAPH (DAG)

Workflow: preprocessing -> simulation -> postprocessing -> analysis

Job 1: preprocess_data           ✓ CAN RUN
Job 2: run_simulation            ✓ CAN RUN
Job 3: postprocess               ⏳ WAITING
Job 4: analyze_results           ⏳ WAITING

JOB ARRAY EXAMPLE

Job array: process_data[1-10] processes 10 input files

Array Index     Input File                Command
-----------------------------------------------------------------
1               data/input_01.txt         python process.py data/input_01.txt > data/output_01.txt
2               data/input_02.txt         python process.py data/input_02.txt > data/output_02.txt
3               data/input_03.txt         python process.py data/input_03.txt > data/output_03.txt
4               data/input_04.txt         python process.py data/input_04.txt > data/output_04.txt
5               data/input_05.txt         python process.py data/input_05.txt > data/output_05.txt
6               data/input_06.txt         p

## Section 4: Performance Monitoring and Metrics

### Key Metrics
1. **CPU Metrics**
   - Utilization per core (%)
   - Context switches per second
   - Cache hit rates

2. **Memory Metrics**
   - Resident Set Size (RSS)
   - Virtual Memory Size (VMS)
   - Page fault rates

3. **I/O Metrics**
   - Read/write bandwidth (MB/s)
   - IOPS (operations per second)
   - Latency (ms)

4. **System Metrics**
   - Load average
   - Temperature
   - Power consumption

In [4]:
# Example 4: Performance Monitoring System
from collections import deque
import statistics

@dataclass
class PerformanceMetrics:
    """Container for performance metrics"""
    timestamp: float
    cpu_percent: float
    memory_mb: float
    io_read_mb: float
    io_write_mb: float
    network_mb: float = 0.0
    gpu_util: float = 0.0
    temp_c: float = 0.0

class PerformanceMonitor:
    """Monitor and analyze job performance"""
    
    def __init__(self, window_size: int = 60):
        self.window_size = window_size
        self.metrics = deque(maxlen=window_size)
        self.anomalies = []
    
    def record_metrics(self, metrics: PerformanceMetrics):
        """Record performance snapshot"""
        self.metrics.append(metrics)
    
    def get_statistics(self) -> Dict:
        """Calculate statistics over window"""
        if not self.metrics:
            return {}
        
        cpu_values = [m.cpu_percent for m in self.metrics]
        mem_values = [m.memory_mb for m in self.metrics]
        
        return {
            'cpu': {
                'mean': statistics.mean(cpu_values),
                'max': max(cpu_values),
                'min': min(cpu_values),
                'stdev': statistics.stdev(cpu_values) if len(cpu_values) > 1 else 0,
            },
            'memory': {
                'mean': statistics.mean(mem_values),
                'max': max(mem_values),
                'min': min(mem_values),
            },
            'sample_count': len(self.metrics)
        }
    
    def detect_anomalies(self) -> List[Dict]:
        """Detect performance anomalies"""
        stats = self.get_statistics()
        anomalies = []
        
        if not stats or len(self.metrics) < 5:
            return anomalies
        
        cpu_mean = stats['cpu']['mean']
        cpu_stdev = stats['cpu']['stdev']
        
        # Check for unusual CPU spikes (> 2 sigma)
        threshold = cpu_mean + (2 * cpu_stdev)
        
        for i, metric in enumerate(list(self.metrics)[-5:]):
            if metric.cpu_percent > threshold:
                anomalies.append({
                    'type': 'cpu_spike',
                    'value': metric.cpu_percent,
                    'threshold': threshold,
                    'time': metric.timestamp
                })
        
        return anomalies

# Simulate performance data
monitor = PerformanceMonitor(window_size=10)

import random
current_time = time.time()

# Normal operation followed by spike
for i in range(8):
    metrics = PerformanceMetrics(
        timestamp=current_time + i,
        cpu_percent=20 + random.gauss(0, 2),
        memory_mb=512 + random.gauss(0, 10),
        io_read_mb=5.0 + random.gauss(0, 0.5),
        io_write_mb=3.0 + random.gauss(0, 0.3),
    )
    monitor.record_metrics(metrics)

# Add anomaly
anomaly_metrics = PerformanceMetrics(
    timestamp=current_time + 8,
    cpu_percent=85.0,  # CPU spike
    memory_mb=520,
    io_read_mb=12.0,
    io_write_mb=15.0,
)
monitor.record_metrics(anomaly_metrics)

print("\n" + "=" * 70)
print("PERFORMANCE MONITORING ANALYSIS")
print("=" * 70)

stats = monitor.get_statistics()
print(f"\nCPU Usage Statistics (last {monitor.window_size} samples):")
print(f"  Mean:   {stats['cpu']['mean']:.1f}%")
print(f"  Max:    {stats['cpu']['max']:.1f}%")
print(f"  Min:    {stats['cpu']['min']:.1f}%")
print(f"  StdDev: {stats['cpu']['stdev']:.1f}%")

print(f"\nMemory Usage Statistics:")
print(f"  Mean:   {stats['memory']['mean']:.1f} MB")
print(f"  Max:    {stats['memory']['max']:.1f} MB")
print(f"  Min:    {stats['memory']['min']:.1f} MB")

anomalies = monitor.detect_anomalies()
print(f"\nAnomalies Detected: {len(anomalies)}")
for anomaly in anomalies:
    print(f"  - {anomaly['type']}: {anomaly['value']:.1f}% (threshold: {anomaly['threshold']:.1f}%)")
print()


PERFORMANCE MONITORING ANALYSIS

CPU Usage Statistics (last 10 samples):
  Mean:   26.8%
  Max:    85.0%
  Min:    17.1%
  StdDev: 21.9%

Memory Usage Statistics:
  Mean:   515.2 MB
  Max:    523.2 MB
  Min:    504.4 MB

Anomalies Detected: 1
  - cpu_spike: 85.0% (threshold: 70.6%)



## Section 5: Benchmarking CPU, Memory, and I/O

### CPU Benchmarking Metrics
- **FLOPS**: Floating-point operations per second
- **Integer Operations**: Fixed-point arithmetic throughput
- **Matrix Multiplication**: Cache efficiency indicator
- **Memory Bandwidth**: Memory subsystem throughput

### Memory Benchmarking
- **Sequential Access**: Linear memory bandwidth
- **Random Access**: Cache effects on latency
- **Strided Access**: Cache-line and prefetcher effects

### I/O Benchmarking
- **Sequential Read/Write**: Peak throughput
- **Random I/O**: IOPS performance
- **Mixed Workload**: Real-world patterns
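
Whatever is being measured, a single timing is noisy. The following is a minimal sketch of a repeat-and-summarize harness, assuming a hypothetical `time_repeated` helper and a toy workload. It uses `time.perf_counter`, a monotonic high-resolution clock meant for measuring short durations:

```python
import statistics
import time
from typing import Callable, Dict

def time_repeated(func: Callable[[], object], repeats: int = 5, warmup: int = 1) -> Dict[str, float]:
    """Run func several times and summarize wall-clock timings in seconds."""
    for _ in range(warmup):
        func()  # warm caches and lazy initialization; result discarded
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        samples.append(time.perf_counter() - start)
    return {
        "median": statistics.median(samples),
        "min": min(samples),
        "max": max(samples),
    }

def sequential_sum() -> int:
    """Toy workload: touch every byte of a 1 MB buffer."""
    return sum(bytes(1024 * 1024))

timing = time_repeated(sequential_sum, repeats=5)
print(f"median {timing['median'] * 1e3:.2f} ms "
      f"(min {timing['min'] * 1e3:.2f}, max {timing['max'] * 1e3:.2f})")
```

Reporting the median alongside the spread makes it easier to tell a real regression from background noise on a shared node.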

In [5]:
# Example 5: Comprehensive Benchmarking
import time
from dataclasses import dataclass, field  # field is needed for default_factory below

@dataclass
class BenchmarkResult:
    """Results from a single benchmark run"""
    name: str
    score: float
    unit: str
    timestamp: float = field(default_factory=time.time)

class BenchmarkSuite:
    """Suite of system benchmarks"""
    
    def __init__(self):
        self.results = []
    
    def cpu_flops_benchmark(self, duration: float = 1.0) -> BenchmarkResult:
        """Benchmark floating-point operations"""
        iterations = 0
        a, b, c = 1.5, 2.5, 3.5
        
        start = time.time()
        end_time = start + duration
        
        while time.time() < end_time:
            for _ in range(1000):
                a = a * b + c
                b = b * c + a
                c = c * a + b
            iterations += 1000
        
        elapsed = time.time() - start
        flops = (iterations * 6) / elapsed  # 6 FLOPs per iteration
        
        return BenchmarkResult(
            name='CPU FLOPS',
            score=flops / 1e9,  # Convert to GFLOPs
            unit='GFLOPS'
        )
    
    def memory_bandwidth_benchmark(self, size_mb: int = 100) -> BenchmarkResult:
        """Benchmark memory bandwidth"""
        array = bytearray(size_mb * 1024 * 1024)
        
        start = time.time()
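        # Touch one byte per 64-byte cache line; the bandwidth figure below
        # assumes each touch pulls the whole line through the memory system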
        sum_val = 0
        for i in range(0, len(array), 64):  # 64-byte stride (cache line)
            sum_val += array[i]
        elapsed = time.time() - start
        
        bandwidth = (size_mb * 1024 * 1024) / (elapsed * 1e9)
        
        return BenchmarkResult(
            name='Memory Bandwidth',
            score=bandwidth,
            unit='GB/s'
        )
    
    def io_sequential_benchmark(self, file_size_mb: int = 100) -> Dict:
        """Benchmark sequential I/O"""
        import tempfile
        
        # Write test
        with tempfile.NamedTemporaryFile(delete=False) as f:
            temp_file = f.name
            start = time.time()
            bytes_written = 0
            while bytes_written < file_size_mb * 1024 * 1024:
                chunk = b'x' * (1024 * 1024)  # 1MB chunks
                f.write(chunk)
                bytes_written += len(chunk)
            write_time = time.time() - start
        
        # Read test
        start = time.time()
        with open(temp_file, 'rb') as f:
            while f.read(1024 * 1024):
                pass
        read_time = time.time() - start
        
        # Cleanup
        import os
        os.unlink(temp_file)
        
        return {
            'read_bandwidth': (file_size_mb * 1024 * 1024) / (read_time * 1e9),
            'write_bandwidth': (file_size_mb * 1024 * 1024) / (write_time * 1e9),
            'unit': 'GB/s'
        }

# Run benchmarks
suite = BenchmarkSuite()

print("\n" + "=" * 70)
print("SYSTEM BENCHMARK SUITE")
print("=" * 70)

# CPU benchmark
cpu_result = suite.cpu_flops_benchmark(duration=0.5)
print(f"\nCPU Performance: {cpu_result.score:.2f} {cpu_result.unit}")

# Memory benchmark
mem_result = suite.memory_bandwidth_benchmark(size_mb=50)
print(f"Memory Bandwidth: {mem_result.score:.2f} {mem_result.unit}")

# I/O benchmark (skip for notebooks to save time)
print(f"I/O Sequential: (simulated) Read: 500.0 MB/s, Write: 450.0 MB/s")

# Overall score calculation
print("\n" + "-" * 70)
print("Benchmark Summary")
print("-" * 70)

scores = {
    'CPU (normalized)': cpu_result.score / 10.0,  # Normalize to 0-10
    'Memory (normalized)': mem_result.score / 20.0,  # Normalize
}

overall = sum(scores.values()) / len(scores)
print(f"Overall Score: {overall:.1f}/10")
print()


## Section 6: Parallel Execution Strategies

### Parallelization Models
1. **Data Parallelism**: Process different data chunks in parallel
2. **Task Parallelism**: Run different tasks simultaneously
3. **Pipeline Parallelism**: Chain operations across cores
4. **SIMD**: Single instruction, multiple data operations
5. **MPI (Message Passing Interface)**: Distributed memory parallelism

### GNU Parallel-like Syntax
```bash
# Process multiple files
parallel process_file {} ::: file1 file2 file3

# Use multiple cores
parallel --jobs 4 command {} ::: input1 input2 input3

# Chain operations
parallel 'preprocess {} | analyze | generate_report' ::: data_*.csv
```

### Load Balancing
- **Static**: Pre-assign work to cores
- **Dynamic**: Distribute work as cores become available
- **Guided**: Start with large chunks and shrink the chunk size as the remaining work decreases
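
The difference between the static and guided policies is easiest to see in the chunk sizes they hand out. The following is a minimal sketch, assuming the rule commonly used by OpenMP-style guided schedulers (each chunk is roughly the remaining work divided by the number of workers, with a floor). The function names are illustrative:

```python
import math
from typing import List

def static_chunks(total_items: int, num_workers: int) -> List[int]:
    """Pre-assign one near-equal chunk per worker."""
    base, extra = divmod(total_items, num_workers)
    return [base + (1 if i < extra else 0) for i in range(num_workers)]

def guided_chunks(total_items: int, num_workers: int, min_chunk: int = 1) -> List[int]:
    """Hand out shrinking chunks: remaining / num_workers, never below min_chunk."""
    chunks = []
    remaining = total_items
    while remaining > 0:
        size = max(min_chunk, math.ceil(remaining / num_workers))
        size = min(size, remaining)  # the last chunk may be smaller than min_chunk
        chunks.append(size)
        remaining -= size
    return chunks

print(static_chunks(100, 4))
print(guided_chunks(100, 4, min_chunk=5))
```

A dynamic policy would instead place fixed-size chunks in a shared queue and let each worker pull the next one when it becomes idle. Guided scheduling gives similar balance with fewer queue operations early on.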

In [None]:
# Example 6: Parallel Execution Engine
from multiprocessing import Pool, cpu_count
from typing import Callable, Iterable

class ParallelExecutor:
    """Execute tasks in parallel with load balancing"""
    
    def __init__(self, num_workers: int = None):
        self.num_workers = num_workers or cpu_count()
        self.completed = 0
        self.failed = 0
    
    def map_function(self, func: Callable, data: Iterable) -> List:
        """Map function over data using multiprocessing"""
        results = []
        
        try:
            with Pool(processes=self.num_workers) as pool:
                results = pool.map(func, data)
            self.completed = len(results)
        except Exception as e:
            self.failed += 1
            print(f"Error in parallel execution: {e}")
        
        return results
    
    def dynamic_load_balancing(self, tasks: List[Dict], num_cores: int) -> List[List[Dict]]:
        """
        Distribute tasks using dynamic load balancing.
        Returns list of task groups for each core.
        """
        work_queues = [[] for _ in range(num_cores)]
        task_times = {task['id']: task.get('estimated_time', 1.0) for task in tasks}
        
        # Greedy assignment: take the longest tasks first and give each one
        # to the core with the smallest total estimated time so far
        for task in sorted(tasks, key=lambda t: task_times[t['id']], reverse=True):
            min_idx = min(range(num_cores), key=lambda i: sum(task_times[t['id']] for t in work_queues[i]))
            work_queues[min_idx].append(task)
        
        return work_queues

# Example: Processing files in parallel
def process_file(filename):
    """Simulate file processing"""
    time.sleep(0.1)
    return f"Processed {filename}"

# Simulate parallel execution
executor = ParallelExecutor(num_workers=4)

test_files = [f"file_{i}.txt" for i in range(8)]

print("\n" + "=" * 70)
print("PARALLEL EXECUTION EXAMPLE")
print("=" * 70)
print(f"\nProcessing {len(test_files)} files with {executor.num_workers} workers")
print(f"Files: {test_files[:3]}... (showing first 3 of {len(test_files)})")

# Sequential execution
print("\nSequential execution:")
start = time.time()
seq_results = [process_file(f) for f in test_files[:3]]
seq_time = time.time() - start
print(f"  Time: {seq_time:.2f}s")

# Parallel execution
print(f"\nParallel execution ({executor.num_workers} workers):")
start = time.time()
par_results = executor.map_function(process_file, test_files)
par_time = time.time() - start
print(f"  Time: {par_time:.2f}s")
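# The sequential baseline covers only 3 files, so compare per-file times
seq_per_file = seq_time / len(seq_results)
par_per_file = par_time / len(test_files)
print(f"  Speedup: {seq_per_file / par_per_file:.1f}x (per file)")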

# Load balancing example
print("\n" + "-" * 70)
print("Dynamic Load Balancing")
print("-" * 70)

tasks = [
    {'id': 1, 'name': 'simulation_1', 'estimated_time': 10.0},
    {'id': 2, 'name': 'analysis_1', 'estimated_time': 5.0},
    {'id': 3, 'name': 'simulation_2', 'estimated_time': 15.0},
    {'id': 4, 'name': 'analysis_2', 'estimated_time': 3.0},
    {'id': 5, 'name': 'simulation_3', 'estimated_time': 8.0},
]

queues = executor.dynamic_load_balancing(tasks, num_cores=2)

for core_idx, queue in enumerate(queues):
    total_time = sum(t['estimated_time'] for t in queue)
    print(f"\nCore {core_idx}: {total_time:.1f}s")
    for task in queue:
        print(f"  - {task['name']}: {task['estimated_time']:.1f}s")
print()


PARALLEL EXECUTION EXAMPLE

Processing 8 files with 4 workers
Files: ['file_0.txt', 'file_1.txt', 'file_2.txt']... (showing first 3 of 8)

Sequential execution:
  Time: 0.30s

Parallel execution (4 workers):
  Time: 0.30s



## Section 7: Machine Learning-Based Optimization

### ML Applications in HPC
1. **Runtime Prediction**: Predict job execution time based on characteristics
2. **Resource Recommendation**: Suggest optimal core/memory allocation
3. **Anomaly Detection**: Identify hung or problematic jobs
4. **Workload Pattern Analysis**: Classify and predict workload patterns
5. **Smart Job Placement**: Optimize job placement on nodes

### Feature Engineering for ML
- Job characteristics: command type, data size, complexity
- Historical performance: past execution times, resource usage
- System state: current load, available resources
- User patterns: typical job size, frequency
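
As one concrete instance of runtime prediction from historical performance, the sketch below fits `T(c) = s + p / c` (a serial part plus a perfectly parallel part, the form Amdahl's law assumes) by ordinary least squares. The sample history and the names `fit_amdahl` and `predict_from_fit` are illustrative assumptions, not measurements or APIs from a real system:

```python
from typing import List, Tuple

def fit_amdahl(samples: List[Tuple[int, float]]) -> Tuple[float, float]:
    """Least-squares fit of runtime = serial + parallel / cores.

    samples: (cores, runtime_sec) pairs. Returns (serial, parallel).
    """
    xs = [1.0 / cores for cores, _ in samples]
    ys = [runtime for _, runtime in samples]
    n = len(samples)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    if sxx == 0:
        raise ValueError("need samples at two or more distinct core counts")
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    parallel = sxy / sxx
    serial = mean_y - parallel * mean_x
    return serial, parallel

def predict_from_fit(serial: float, parallel: float, cores: int) -> float:
    """Evaluate the fitted model at a given core count."""
    return serial + parallel / cores

# Illustrative history: (cores, runtime in seconds)
history = [(1, 120.0), (1, 130.0), (4, 40.0), (4, 38.0), (8, 20.0)]
serial, parallel = fit_amdahl(history)
for cores in (2, 8, 16):
    print(f"{cores:>2} cores -> {predict_from_fit(serial, parallel, cores):.1f}s")
```

Unlike a lookup keyed on exact configurations, a fitted model can estimate core counts that never appear in the history, and the serial term shows where adding cores stops helping.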

In [1]:
# Example 7: ML-Based Job Runtime Prediction
import statistics
from collections import defaultdict
from typing import Dict

class RuntimePredictor:
    """Predict job runtime using historical data"""
    
    def __init__(self):
        self.history = defaultdict(list)  # job_type -> list of runtimes
    
    def record_job(self, job_type: str, cores: int, memory_gb: float, runtime_sec: float):
        """Record completed job for training"""
        key = f"{job_type}_{cores}c_{memory_gb}gb"
        self.history[key].append(runtime_sec)
    
    def predict_runtime(self, job_type: str, cores: int, memory_gb: float) -> Dict:
        """Predict runtime for job configuration"""
        key = f"{job_type}_{cores}c_{memory_gb}gb"
        
        if key in self.history and len(self.history[key]) > 0:
            times = self.history[key]
            mean = statistics.mean(times)
            stdev = statistics.stdev(times) if len(times) > 1 else 0
            
            return {
                'predicted_time': mean,
                'confidence': 0.9 if len(times) > 5 else 0.6,
                'samples': len(times),
                'std_dev': stdev,
            }
        else:
            # No history, use defaults
            return {
                'predicted_time': 300.0,  # 5 minutes default
                'confidence': 0.1,
                'samples': 0,
                'std_dev': None,
            }
    
    def recommend_cores(self, job_type: str, target_time_sec: float = 300) -> int:
        """Recommend number of cores for job"""
        # Collect all measurements for this job type
        measurements = []
        for key, times in self.history.items():
            if key.startswith(job_type):
                measurements.extend(times)
        
        if not measurements:
            return 4  # Default recommendation
        
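        # Rough estimate assuming ideal linear scaling from a 4-core baseline
        # (doubling cores halves runtime). Amdahl's law predicts smaller gains
        # once the serial fraction of the job dominates.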
        avg_time = statistics.mean(measurements)
        cores_needed = max(1, int(4 * (avg_time / target_time_sec)))
        
        return cores_needed

# Simulate training data
predictor = RuntimePredictor()

# Record historical jobs
training_data = [
    ('simulation', 1, 2.0, 120),
    ('simulation', 1, 2.0, 130),
    ('simulation', 4, 4.0, 40),
    ('simulation', 4, 4.0, 38),
    ('simulation', 8, 8.0, 20),
    ('analysis', 1, 2.0, 60),
    ('analysis', 1, 2.0, 65),
    ('analysis', 4, 4.0, 18),
]

for job_type, cores, memory, runtime in training_data:
    predictor.record_job(job_type, cores, memory, runtime)

print("\n" + "=" * 70)
print("ML-BASED JOB RUNTIME PREDICTION")
print("=" * 70)

# Test predictions
test_cases = [
    ('simulation', 2, 4.0),
    ('simulation', 8, 8.0),
    ('analysis', 4, 4.0),
]

print(f"\n{'Job Type':<15} {'Cores':<8} {'Memory':<10} {'Predicted':<12} {'Confidence':<12}")
print("-" * 70)

for job_type, cores, memory in test_cases:
    pred = predictor.predict_runtime(job_type, cores, memory)
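    # Format each column separately so values line up under the header
    mem_str = f"{memory:.1f} GB"
    time_str = f"{pred['predicted_time']:.0f}s"
    print(f"{job_type:<15} {cores:<8} {mem_str:<10} {time_str:<12} {pred['confidence']:.0%}")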

# Recommendation
print("\n" + "-" * 70)
print("Resource Recommendations")
print("-" * 70)

for job_type in ['simulation', 'analysis']:
    recommended_cores = predictor.recommend_cores(job_type, target_time_sec=60)
    print(f"{job_type}: Recommend {recommended_cores} cores for ~60 second runtime")
print()


## Section 8: Advanced Scheduling Algorithms

### Gang Scheduling (Co-scheduling)
Schedule related jobs together to:
- Minimize synchronization overhead
- Improve cache locality
- Reduce context switching
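
The all-or-nothing nature of gang scheduling can be sketched as follows. This is a simplified model, assuming a gang is just a list of per-task core counts and a node is described only by its free cores. `place_gang` is a hypothetical name:

```python
from typing import Dict, List, Optional

def place_gang(task_cores: List[int], free_cores: Dict[str, int]) -> Optional[Dict[int, str]]:
    """Place every task of a gang at once, or none of them.

    Returns {task_index: node_name} on success, None if any task cannot be placed.
    """
    remaining = dict(free_cores)  # work on a copy so failure leaves no partial allocation
    placement: Dict[int, str] = {}
    # Largest tasks first (first-fit decreasing) to reduce fragmentation
    for idx in sorted(range(len(task_cores)), key=lambda i: task_cores[i], reverse=True):
        need = task_cores[idx]
        node = next((name for name, free in remaining.items() if free >= need), None)
        if node is None:
            return None  # one task does not fit, so the whole gang waits
        remaining[node] -= need
        placement[idx] = node
    return placement

nodes = {"node-a": 8, "node-b": 4}
print(place_gang([4, 4, 4], nodes))   # all three tasks fit
print(place_gang([8, 8], nodes))      # second task cannot fit, so nothing is placed
```

Working on a copy of the free-core map is the key design choice: a failed attempt never leaves some tasks of the gang holding resources while the others wait.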

### Backfill Scheduling
- Run smaller jobs in gaps while waiting for large job to start
- Improves system utilization
- Preserves fairness as long as backfilled jobs do not delay the reserved start of the waiting job

### Preemption
- Temporarily suspend low-priority jobs
- Resume when resources available
- Requires checkpointing support
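
The following is a minimal sketch of the victim-selection step. It assumes each running job is a dict with hypothetical `priority`, `cores`, and `checkpointable` fields, and that larger priority values mean less urgent work, as in the priority table in Section 2:

```python
from typing import Dict, List, Optional

def select_victims(running: List[Dict], incoming_priority: int,
                   cores_needed: int, free_cores: int) -> Optional[List[Dict]]:
    """Pick jobs to suspend so an incoming job can start.

    Only checkpointable jobs with strictly lower urgency (larger priority value)
    are eligible. Returns None if preemption cannot free enough cores.
    """
    if free_cores >= cores_needed:
        return []  # nothing to preempt
    eligible = [j for j in running
                if j["checkpointable"] and j["priority"] > incoming_priority]
    # Suspend the least urgent jobs first; among equals, prefer larger jobs
    eligible.sort(key=lambda j: (-j["priority"], -j["cores"]))
    victims, freed = [], free_cores
    for job in eligible:
        victims.append(job)
        freed += job["cores"]
        if freed >= cores_needed:
            return victims
    return None

running_jobs = [
    {"id": 10, "priority": 3, "cores": 4, "checkpointable": True},
    {"id": 11, "priority": 2, "cores": 8, "checkpointable": True},
    {"id": 12, "priority": 3, "cores": 2, "checkpointable": False},
]
victims = select_victims(running_jobs, incoming_priority=1, cores_needed=10, free_cores=2)
print([j["id"] for j in victims] if victims is not None else "cannot preempt")
```

Job 12 is never chosen because it cannot be checkpointed, which is why the text lists checkpointing support as a requirement.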

In [2]:
# Example 8: Advanced Scheduling Algorithms
from dataclasses import dataclass
from typing import Optional, Tuple
@dataclass
class JobSpec:
    """Job specification for scheduling"""
    job_id: int
    cores_needed: int
    duration_sec: float
    priority: int
    submit_time: float

class BackfillScheduler:
    """SLURM-style backfill scheduling"""
    
    def __init__(self, num_cores: int):
        self.num_cores = num_cores
        self.schedule = []
        self.current_time = 0.0
    
    def can_fit(self, job: JobSpec, time_slot: Tuple[float, float]) -> bool:
        """Check if job fits in a time slot (both duration and cores)"""
        start, end = time_slot
        
        # The job must finish before the slot closes
        if job.duration_sec > end - start:
            return False
        
        # Check core availability while the job would be running
        available_cores = self._get_available_cores(start, start + job.duration_sec)
        
        return available_cores >= job.cores_needed
    
    def _get_available_cores(self, start_time: float, end_time: float) -> int:
        """Get available cores in time window"""
        available = self.num_cores
        
        for scheduled_job, job_start, job_end in self.schedule:
            if job_start < end_time and job_end > start_time:
                # Job overlaps with time window
                overlap_start = max(job_start, start_time)
                overlap_end = min(job_end, end_time)
                if overlap_end > overlap_start:
                    available -= scheduled_job.cores_needed
        
        return max(0, available)
    
    def schedule_job(self, job: JobSpec) -> bool:
        """Try to schedule job using backfill"""
        # A job wider than the whole machine can never be placed
        if job.cores_needed > self.num_cores:
            return False
        
        # Earliest feasible start. It may fall before jobs scheduled earlier,
        # which lets short jobs backfill idle cores without delaying those jobs
        earliest = self._find_earliest_start(job)
        
        if earliest is not None:
            self.schedule.append((job, earliest, earliest + job.duration_sec))
            return True
        
        # Fallback: scan gaps between consecutive jobs, ordered by start time
        ordered = sorted(self.schedule, key=lambda entry: entry[1])
        for i, (_, _, gap_start) in enumerate(ordered):
            gap_end = ordered[i + 1][1] if i + 1 < len(ordered) else float('inf')
            
            if self.can_fit(job, (gap_start, gap_end)):
                self.schedule.append((job, gap_start, gap_start + job.duration_sec))
                return True
        
        return False
    
    def _find_earliest_start(self, job: JobSpec) -> Optional[float]:
        """Find earliest time job can start"""
        candidates = [0.0] + [end for _, _, end in self.schedule]
        
        for start_time in sorted(candidates):
            available = self._get_available_cores(start_time, start_time + job.duration_sec)
            if available >= job.cores_needed:
                return start_time
        
        return None

# Simulate backfill scheduling
scheduler = BackfillScheduler(num_cores=8)

jobs = [
    JobSpec(job_id=1, cores_needed=4, duration_sec=10, priority=2, submit_time=0),
    JobSpec(job_id=2, cores_needed=8, duration_sec=20, priority=1, submit_time=1),
    JobSpec(job_id=3, cores_needed=2, duration_sec=5, priority=2, submit_time=2),
]

print("\n" + "=" * 70)
print("BACKFILL SCHEDULING EXAMPLE")
print("=" * 70)
print(f"\nSystem: {scheduler.num_cores} cores available\n")

for job in jobs:
    scheduled = scheduler.schedule_job(job)
    status = "✓ Scheduled" if scheduled else "✗ Could not schedule"
    print(f"Job {job.job_id}: {job.cores_needed} cores, {job.duration_sec}s duration -> {status}")

print("\nSchedule Timeline:")
print("-" * 70)
for job, start, end in sorted(scheduler.schedule, key=lambda x: x[1]):
    timeline = "█" * int(job.cores_needed) + "░" * (scheduler.num_cores - job.cores_needed)
    print(f"Job {job.job_id}: [{timeline}] t={start:.0f}-{end:.0f}s")
print()

## Section 9: Workflow Management with DAGs

### DAG Workflow Features
- **Directed Acyclic Graphs**: Define complex dependencies
- **Workflow Templates**: Reusable workflow patterns
- **Conditional Execution**: Branch on job results
- **Retry Logic**: Automatic failure recovery
- **Checkpointing**: Save/restore workflow state
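A workflow can only be executed if its dependency graph really is acyclic, so it can help to validate the graph before running anything. The following is a minimal sketch of Kahn's algorithm for that purpose. The `topological_order` helper and its edge-list input are illustrative assumptions, not part of HPCShell, and the sketch assumes every edge refers to a task that was listed.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, List, Tuple


def topological_order(tasks: Iterable[str],
                      edges: Iterable[Tuple[str, str]]) -> List[str]:
    """Return tasks in dependency order; raise ValueError if a cycle exists."""
    children: Dict[str, List[str]] = defaultdict(list)
    in_degree: Dict[str, int] = {task: 0 for task in tasks}

    for parent, child in edges:
        children[parent].append(child)
        in_degree[child] += 1

    # Start from tasks that have no unmet dependencies
    ready = deque(task for task, degree in in_degree.items() if degree == 0)
    order: List[str] = []

    while ready:
        task = ready.popleft()
        order.append(task)
        for child in children[task]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    # Any task never reached sits on a cycle or downstream of one
    if len(order) != len(in_degree):
        raise ValueError("workflow contains a cycle")
    return order
```

Calling it with the tasks and `(parent, child)` pairs of a pipeline yields one valid execution order, and a circular dependency is rejected before any job is submitted.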

### Use Cases
- **Scientific Pipelines**: Multi-stage data processing
- **Parameter Sweeps**: Run variants of experiments
- **Machine Learning**: Pipeline of preprocessing, training, evaluation
- **Data Analysis**: Extract, transform, load workflows

In [3]:
# Example 9: Workflow DAG Management
from collections import defaultdict
from typing import List

class WorkflowDAG:
    """Manage Directed Acyclic Graph workflows"""
    
    def __init__(self):
        self.nodes = {}
        self.edges = defaultdict(list)
        self.in_degree = defaultdict(int)
    
    def add_task(self, task_id: str, command: str, retry_count: int = 3):
        """Add task to workflow"""
        self.nodes[task_id] = {
            'command': command,
            'retry_count': retry_count,
            'status': 'pending',
            'output': None,
        }
    
    def add_dependency(self, parent_id: str, child_id: str):
        """Add edge: parent -> child"""
        self.edges[parent_id].append(child_id)
        self.in_degree[child_id] += 1
    
    def get_executable_tasks(self) -> List[str]:
        """Get tasks ready to run (no pending dependencies)"""
        executable = []
        for task_id, task_data in self.nodes.items():
            if task_data['status'] == 'pending' and self.in_degree[task_id] == 0:
                executable.append(task_id)
        return executable
    
    def mark_completed(self, task_id: str, output: str = None):
        """Mark task as complete and update dependencies"""
        self.nodes[task_id]['status'] = 'completed'
        self.nodes[task_id]['output'] = output
        
        # Update in-degree for dependent tasks
        for child_id in self.edges[task_id]:
            self.in_degree[child_id] -= 1
    
    def visualize(self) -> str:
        """ASCII visualization of workflow"""
        lines = []
        lines.append("Workflow DAG:")
        lines.append("-" * 50)
        
        for task_id, task_data in self.nodes.items():
            status_icon = "✓" if task_data['status'] == 'completed' else "○"
            lines.append(f"{status_icon} {task_id}: {task_data['command']}")
            
            for child_id in self.edges[task_id]:
                lines.append(f"    └─> {child_id}")
        
        return "\n".join(lines)

# Create example ML pipeline workflow
workflow = WorkflowDAG()

# Add tasks
workflow.add_task('load_data', 'python load_dataset.py')
workflow.add_task('preprocess', 'python preprocess.py')
workflow.add_task('train_model', 'python train.py')
workflow.add_task('evaluate', 'python evaluate.py')
workflow.add_task('generate_report', 'python report.py')

# Add dependencies
workflow.add_dependency('load_data', 'preprocess')
workflow.add_dependency('preprocess', 'train_model')
workflow.add_dependency('train_model', 'evaluate')
workflow.add_dependency('evaluate', 'generate_report')

print("\n" + "=" * 70)
print("WORKFLOW DAG EXAMPLE: ML TRAINING PIPELINE")
print("=" * 70)
print()

# Simulate workflow execution
executable_tasks = workflow.get_executable_tasks()
print(f"Initial executable tasks: {executable_tasks}")

# Execute workflow step by step
step = 1
while True:
    executable = workflow.get_executable_tasks()
    if not executable:
        break
    
    task = executable[0]
    print(f"\nStep {step}: Execute '{task}'")
    print(f"  Command: {workflow.nodes[task]['command']}")
    workflow.mark_completed(task, f"output_of_{task}")
    step += 1

print("\n" + workflow.visualize())
print()

## Section 10: Security, Isolation, and Containers

### User Isolation
- **Namespace Isolation**: Users can't see other users' jobs
- **Resource Quotas**: Per-user resource limits
- **File Permissions**: Access control on job files
- **Audit Logging**: Track all user actions
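Per-user limits are usually enforced against what a user is already running, not only against the size of a single request. The sketch below illustrates that aggregate check under stated assumptions: `UsageSnapshot` and `within_aggregate_quota` are hypothetical names introduced here, and how current usage is collected is left out.

```python
from dataclasses import dataclass


@dataclass
class UsageSnapshot:
    """Hypothetical record of what a user is consuming right now."""
    running_jobs: int = 0
    cores_in_use: int = 0


def within_aggregate_quota(usage: UsageSnapshot, requested_cores: int,
                           max_cores: int, max_jobs: int) -> bool:
    """True if admitting one more job keeps the user under both limits."""
    if usage.running_jobs + 1 > max_jobs:
        return False
    return usage.cores_in_use + requested_cores <= max_cores
```

With 12 of 16 cores already in use, a 4-core request passes and a 5-core request is rejected, even though either would fit the quota on its own.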

### Container Integration
- **Docker Support**: Package jobs with dependencies
- **Singularity**: HPC-friendly container runtime
- **Container Orchestration**: Manage container lifecycle
- **Reproducibility**: Identical environment everywhere
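To make the packaging step concrete, here is a small sketch that turns a container description (image, mounts, environment) into a `docker run` argument list. The `build_docker_command` helper is an assumption made for illustration and only builds the command; it does not run it, and a Singularity equivalent would need its own flags.

```python
from typing import Dict, List


def build_docker_command(image: str, command: List[str],
                         mounts: List[str], env: Dict[str, str]) -> List[str]:
    """Assemble a `docker run` argument list without executing it."""
    argv = ["docker", "run", "--rm"]
    for path in mounts:
        # Bind-mount each host path at the same path inside the container
        argv += ["-v", f"{path}:{path}"]
    for key, value in sorted(env.items()):
        argv += ["-e", f"{key}={value}"]
    return argv + [image] + command
```

Returning a list rather than a single string means the result can be passed to `subprocess.run` directly, which avoids shell quoting problems with paths or environment values.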

In [4]:
# Example 10: Security and Resource Isolation
import time
from dataclasses import dataclass
from enum import Enum
from typing import Tuple

class UserRole(Enum):
    ADMIN = 'admin'
    USER = 'user'
    GUEST = 'guest'

@dataclass
class UserQuota:
    """User resource quota"""
    user_id: str
    max_cores: int = 16
    max_memory_gb: float = 64.0
    max_jobs: int = 100
    max_runtime_hours: float = 72.0
    role: UserRole = UserRole.USER

class SecurityManager:
    """Manage security and isolation"""
    
    def __init__(self):
        self.user_quotas = {}
        self.audit_log = []
    
    def register_user(self, user_id: str, quota: UserQuota):
        """Register user with quota"""
        self.user_quotas[user_id] = quota
        self._log_action('register', user_id, "User registered with quota")
    
    def can_submit_job(self, user_id: str, cores: int, memory_gb: float, hours: float) -> Tuple[bool, str]:
        """Check if user can submit job"""
        if user_id not in self.user_quotas:
            return False, "User not registered"
        
        quota = self.user_quotas[user_id]
        
        if cores > quota.max_cores:
            return False, f"Exceeds core limit: {cores} > {quota.max_cores}"
        
        if memory_gb > quota.max_memory_gb:
            return False, f"Exceeds memory limit: {memory_gb} > {quota.max_memory_gb}"
        
        if hours > quota.max_runtime_hours:
            return False, f"Exceeds time limit: {hours} > {quota.max_runtime_hours}"
        
        return True, "OK"
    
    def _log_action(self, action: str, user_id: str, details: str):
        """Log security action"""
        self.audit_log.append({
            'timestamp': time.time(),
            'action': action,
            'user': user_id,
            'details': details
        })

# Example: Security setup
sec_manager = SecurityManager()

# Register users with quotas
sec_manager.register_user('alice', UserQuota(
    user_id='alice', max_cores=16, max_memory_gb=64, role=UserRole.USER
))
sec_manager.register_user('bob', UserQuota(
    user_id='bob', max_cores=4, max_memory_gb=8, role=UserRole.GUEST
))

print("\n" + "=" * 70)
print("SECURITY AND RESOURCE ISOLATION")
print("=" * 70)

# Test job submissions
test_submissions = [
    ('alice', 8, 32, 24, 'Large simulation'),
    ('alice', 32, 128, 48, 'Very large job'),
    ('bob', 2, 4, 4, 'Small job'),
    ('bob', 8, 16, 24, 'Too large for quota'),
]

print(f"\n{'User':<10} {'Cores':<8} {'Memory':<10} {'Hours':<8} {'Job':<20} {'Result':<20}")
print("-" * 80)

for user_id, cores, memory, hours, job_name in test_submissions:
    can_submit, reason = sec_manager.can_submit_job(user_id, cores, memory, hours)
    result = "✓ APPROVED" if can_submit else f"✗ {reason}"
    memory_str = f"{memory:.0f} GB"  # format first so the column pads correctly
    print(f"{user_id:<10} {cores:<8} {memory_str:<10} {hours:<8} {job_name:<20} {result:<20}")

# Example: Container configuration
print("\n" + "-" * 70)
print("Container Support (Docker/Singularity)")
print("-" * 70)

container_configs = {
    'gpu_ml': {
        'image': 'nvidia/cuda:11.2-runtime-ubuntu20.04',
        'mounts': ['/data', '/scratch'],
        'env': {'CUDA_VISIBLE_DEVICES': '0'},
    },
    'analysis': {
        'image': 'python:3.9-slim',
        'mounts': ['/data', '/results'],
        'env': {'PYTHONUNBUFFERED': '1'},
    },
}

print("\nAvailable container templates:")
for name, config in container_configs.items():
    print(f"  {name}: {config['image']}")
print()

## Section 11: System Integration and Resource Control

### cgroups (Control Groups)
- **CPU Limiting**: Restrict CPU usage to a specific percentage
- **Memory Limits**: Enforce a hard memory ceiling
- **I/O Limits**: Control disk bandwidth usage
- **Process Limits**: Restrict number of processes/threads
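As a rough illustration of how such limits are expressed, the sketch below maps a job's limits to the contents of the cgroup v2 control files `cpu.max`, `memory.max`, and `pids.max`. The `cgroup_v2_settings` helper is hypothetical and only computes the values; it writes nothing to the cgroup filesystem. I/O limits are left out because `io.max` entries are keyed by a specific block device.

```python
from typing import Dict

CPU_PERIOD_US = 100_000  # 100 ms scheduling period, the usual default


def cgroup_v2_settings(cpu_percent: float, memory_gb: float,
                       max_pids: int) -> Dict[str, str]:
    """Map job limits to cgroup v2 control-file contents (nothing is written)."""
    quota_us = int(CPU_PERIOD_US * cpu_percent / 100)
    return {
        "cpu.max": f"{quota_us} {CPU_PERIOD_US}",       # "<quota> <period>" in microseconds
        "memory.max": str(int(memory_gb * 1024 ** 3)),  # hard limit in bytes
        "pids.max": str(max_pids),                      # cap on processes and threads
    }
```

A value of `cpu_percent=200` would correspond to two full cores, since the quota is expressed relative to one period of a single CPU.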

### NUMA (Non-Uniform Memory Access)
- **Topology Awareness**: Understand multi-socket systems
- **CPU Affinity**: Pin processes to specific cores
- **Memory Affinity**: Allocate memory close to CPUs
- **Optimal Placement**: Minimize inter-socket traffic
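One simple way to act on topology information is a best-fit choice of socket: place the job on the socket that can hold it with the fewest cores left over, so larger sockets stay free for larger jobs. The sketch below assumes a hypothetical `free_cores` mapping from socket ID to the list of idle core IDs; how that mapping is discovered is outside its scope.

```python
from typing import Dict, List, Optional


def pick_socket(free_cores: Dict[int, List[int]],
                cores_needed: int) -> Optional[int]:
    """Return the socket that fits the job with the least leftover, or None."""
    best_socket = None
    best_leftover = None
    for socket_id, cores in sorted(free_cores.items()):
        leftover = len(cores) - cores_needed
        if leftover < 0:
            continue  # this socket alone cannot host the job
        if best_leftover is None or leftover < best_leftover:
            best_socket, best_leftover = socket_id, leftover
    return best_socket
```

A return value of `None` signals that no single socket can host the job, at which point a scheduler would have to span sockets and accept some inter-socket traffic.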

### systemd Integration
- **Service Management**: Control job lifecycle via systemd
- **Resource Units**: Define systemd unit files for jobs
- **Dependency Management**: Handle service dependencies
- **Auto-restart**: Implement fault tolerance

In [None]:
# Example 11: System Integration and NUMA Awareness
from dataclasses import dataclass
from typing import Dict

@dataclass
class NUMATopology:
    """NUMA system topology"""
    num_sockets: int
    cores_per_socket: int
    memory_per_socket_gb: float

class NUMAAwareScheduler:
    """Schedule jobs with NUMA awareness"""
    
    def __init__(self, topology: NUMATopology):
        self.topology = topology
        self.job_placement = {}
    
    def get_total_cores(self) -> int:
        """Get total cores in system"""
        return self.topology.num_sockets * self.topology.cores_per_socket
    
    def get_total_memory_gb(self) -> float:
        """Get total memory"""
        return self.topology.num_sockets * self.topology.memory_per_socket_gb
    
    def calculate_affinity(self, job_id: int, cores: int) -> Dict:
        """Calculate optimal CPU affinity for job"""
        if cores > self.get_total_cores():
            raise ValueError(
                f"Job {job_id} needs {cores} cores, system has {self.get_total_cores()}"
            )
        
        cores_per_socket = self.topology.cores_per_socket
        # Memory is placed on the first socket the job uses. This simple model
        # always starts at socket 0 and ignores previously placed jobs.
        socket_id = 0
        
        if cores <= cores_per_socket:
            # Fit in single socket
            cores_assigned = list(range(cores))
        else:
            # Span multiple sockets, filling each socket before moving on
            cores_assigned = []
            remaining = cores
            
            for socket in range(self.topology.num_sockets):
                if remaining == 0:
                    break
                assign = min(remaining, cores_per_socket)
                socket_base = socket * cores_per_socket
                cores_assigned.extend(range(socket_base, socket_base + assign))
                remaining -= assign
        
        return {
            'job_id': job_id,
            'cores': cores,
            'cpu_affinity': cores_assigned,
            'memory_socket': socket_id,
        }

# Example NUMA topology
topology = NUMATopology(
    num_sockets=2,
    cores_per_socket=16,
    memory_per_socket_gb=64.0
)
scheduler = NUMAAwareScheduler(topology)

print("\n" + "=" * 70)
print("NUMA TOPOLOGY AND SCHEDULING")
print("=" * 70)

print("\nSystem Topology:")
print(f"  Sockets: {topology.num_sockets}")
print(f"  Cores per socket: {topology.cores_per_socket}")
print(f"  Total cores: {scheduler.get_total_cores()}")
print(f"  Total memory: {scheduler.get_total_memory_gb():.0f} GB")
print(f"  Memory per socket: {topology.memory_per_socket_gb:.0f} GB")

# Test job placements
test_jobs = [
    (1, 8),   # Small job
    (2, 16),  # Socket-sized job
    (3, 32),  # Multi-socket job
]

print(f"\n{'Job':<6} {'Cores':<8} {'Affinity':<35} {'Socket':<10}")
print("-" * 70)

for job_id, cores in test_jobs:
    affinity = scheduler.calculate_affinity(job_id, cores)
    cpus = affinity['cpu_affinity']
    core_str = f"{cpus[0]}-{cpus[-1]}"  # assigned cores are contiguous in this model
    print(f"{job_id:<6} {cores:<8} {core_str:<35} {affinity['memory_socket']}")

# systemd integration example
print("\n" + "-" * 70)
print("systemd Unit File Example (for HPC job)")
print("-" * 70)

unit_template = """[Unit]
Description=HPC Job %i
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=hpc_user
WorkingDirectory=/home/hpc_user/jobs
ExecStart=/usr/bin/python3 /home/hpc_user/jobs/job_%i.py
Restart=on-failure
RestartSec=10
CPUQuota=50%
MemoryMax=4G
TimeoutStopSec=30s

[Install]
WantedBy=multi-user.target
"""

print(unit_template)
print()

## Summary: Complete HPC Job Management System

### Key Components Implemented
1. **Resource Allocation**: Flexible CPU, memory, GPU allocation
2. **Scheduling Algorithms**: FIFO, priority queues, fair-share, backfill
3. **Job Dependencies**: DAG-based workflow management
4. **Performance Monitoring**: Real-time metrics and anomaly detection
5. **Benchmarking**: CPU, memory, I/O performance assessment
6. **Parallel Execution**: Multi-core job execution with load balancing
7. **ML Optimization**: Runtime prediction and resource recommendations
8. **Advanced Scheduling**: Gang scheduling, preemption, backfill
9. **Workflow Management**: Complex DAG workflows with retry logic
10. **Security & Isolation**: User quotas, containers, audit logging
11. **System Integration**: NUMA awareness, cgroups, systemd support

### Best Practices
- Always monitor job performance
- Use resource predictions to optimize allocation
- Implement checkpointing for long-running jobs
- Balance fairness with utilization
- Provide clear feedback to users
- Maintain comprehensive audit logs
- Support multiple job types and patterns
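The checkpointing practice above can be sketched in a few lines. This is a minimal illustration, assuming the job state fits in a JSON-serializable dictionary; `save_checkpoint` and `load_checkpoint` are hypothetical helpers, not HPCShell functions. The state is written to a temporary file first and then renamed over the target, so an interrupted write does not corrupt the previous checkpoint.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional


def save_checkpoint(path: str, state: Dict[str, Any]) -> None:
    """Write state via a temp file so a crash never leaves a partial checkpoint."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump(state, handle)
    os.replace(tmp_path, path)  # replaces the target in one step (atomic on POSIX)


def load_checkpoint(path: str) -> Optional[Dict[str, Any]]:
    """Return the saved state, or None if no checkpoint exists yet."""
    if not os.path.exists(path):
        return None
    with open(path) as handle:
        return json.load(handle)
```

A long-running job would call `load_checkpoint` at startup to decide where to resume and `save_checkpoint` at regular intervals, for example after each completed iteration.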

### Future Enhancements
- Machine learning-based scheduling optimization
- Multi-cluster federation
- Advanced visualization dashboards
- Energy-aware scheduling
- Automatic workload characterization
- Real-time job migration