In [36]:
#| default_exp data_processing.hpc_tools

# Create HPC related functions
> Create HPC(High Performance Cluster) related functions for parallel processing with LSF (bsub)

This module provides tools for:
1. Creating batch jobs for LSF clusters
2. Submitting parallel jobs
3. Monitoring job status
4. Managing large-scale data processing

In [11]:
#| hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [12]:
#| export
from cv_tools.core import *
from typing import List, Callable, Any, Dict, Generator, Optional
import subprocess
from pathlib import Path
import tempfile
import os
from tqdm.auto import tqdm
import time
import logging
import pickle
from concurrent.futures import ThreadPoolExecutor

In [24]:
#| export
from unittest.mock import patch, MagicMock

In [14]:
#| export
def create_job_script(
    function_name: str,
    input_path: str,
    output_path: str,
    memory: int = 16000,
    queue: str = "normal",
    python_path: str = "/usr/bin/python",
    additional_args: Dict[str, Any] = None
) -> str:
    """Create a job script for LSF submission.
    
    Args:
        function_name: Name of the Python function to execute
        input_path: Path to input data
        output_path: Path for output data
        memory: Required memory in MB
        queue: LSF queue name
        python_path: Path to Python interpreter
        additional_args: Additional arguments for the function
    
    Returns:
        str: Content of the job script
    """
    script = f"""#!/bin/bash
#BSUB -M {memory}
#BSUB -q {queue}
#BSUB -o %J.out
#BSUB -e %J.err

source ~/.bashrc

{python_path} -c 'import pickle; from {function_name} import process_batch; \
with open("{input_path}", "rb") as f: batch = pickle.load(f); \
process_batch(batch, "{output_path}")
"""
    return script

In [22]:
#| test
def test_create_job_script():
    #from cv_tools.preprocessing.hpc_tools import create_job_script
    
    with tempfile.TemporaryDirectory() as tmpdir:
        job_script = create_job_script(
            function_name="process_images",
            input_path="/path/to/input",
            output_path="/path/to/output",
            memory=16000,
            queue="normal",
            python_path="/usr/bin/python"
        )
        
        assert "#!/bin/bash" in job_script
        assert "#BSUB -M 16000" in job_script
        assert "#BSUB -q normal" in job_script
        assert "python -c" in job_script

In [17]:
print(create_job_script(
    function_name="process_images",
    input_path="/path/to/input",
    output_path="/path/to/output",
    memory=16000,
    queue="normal",
    python_path="/usr/bin/python"
))

#!/bin/bash
#BSUB -M 16000
#BSUB -q normal
#BSUB -o %J.out
#BSUB -e %J.err

source ~/.bashrc

/usr/bin/python -c 'import pickle; from process_images import process_batch; with open("/path/to/input", "rb") as f: batch = pickle.load(f); process_batch(batch, "/path/to/output")



In [18]:
#| export
def create_batches(files: List[str], batch_size: int) -> Generator[List[str], None, None]:
    """Create batches of files for parallel processing.
    
    Args:
        files: List of file paths
        batch_size: Number of files per batch
    
    Yields:
        List[str]: Batch of file paths
    """
    for i in range(0, len(files), batch_size):
        yield files[i:i + batch_size]

In [21]:
#| test
def test_batch_generator():
    from cv_tools.preprocessing.hpc_tools import create_batches
    
    files = [f"file_{i}.jpg" for i in range(100)]
    batch_size = 10
    batches = list(create_batches(files, batch_size))
    
    assert len(batches) == 10
    assert len(batches[0]) == batch_size
    assert batches[0][0] == "file_0.jpg"

In [20]:
test_batch_generator()

In [23]:
#| export
def submit_job(job_script_path: str) -> bool:
    """Submit a job to LSF.
    
    Args:
        job_script_path: Path to the job script
    
    Returns:
        bool: True if submission was successful
    """
    try:
        result = subprocess.run(["bsub", "<", job_script_path], 
                              capture_output=True, 
                              text=True,
                              shell=True)
        return result.returncode == 0
    except Exception as e:
        logging.error(f"Job submission failed: {e}")
        return False

In [25]:
#| test
def test_submit_job():
    from cv_tools.preprocessing.hpc_tools import submit_job
    
    with patch('subprocess.run') as mock_run:
        mock_run.return_value = MagicMock(returncode=0)
        
        success = submit_job("test_job.sh")
        assert success
        mock_run.assert_called_once()

In [27]:
#| export
# Mock function for testing
def mock_process_func(input_path: str, output_path: str) -> bool:
    return True

In [28]:
#| test
def test_parallel_job_submission():
    from cv_tools.preprocessing.hpc_tools import submit_parallel_jobs
    
    with tempfile.TemporaryDirectory() as tmpdir:
        input_files = [f"{tmpdir}/input_{i}.jpg" for i in range(10)]
        output_dir = f"{tmpdir}/output"
        
        # Create dummy input files
        for f in input_files:
            Path(f).touch()
        
        with patch('subprocess.run') as mock_run:
            mock_run.return_value = MagicMock(returncode=0)
            
            results = submit_parallel_jobs(
                function=mock_process_func,
                input_files=input_files,
                output_dir=output_dir,
                batch_size=2,
                memory=16000,
                queue="normal"
            )
            
            assert len(results) == 5  # 10 files / 2 batch_size = 5 jobs
            assert all(results)  # All jobs should be submitted successfully

In [29]:
#| export
def monitor_jobs(job_ids: List[str]) -> Dict[str, str]:
    """Monitor the status of submitted jobs.
    
    Args:
        job_ids: List of job IDs to monitor
    
    Returns:
        Dict[str, str]: Job IDs mapped to their current status
    """
    try:
        result = subprocess.run(["bjobs"], capture_output=True, text=True)
        lines = result.stdout.strip().split('\n')[1:]  # Skip header
        
        status_dict = {}
        for line in lines:
            parts = line.split()
            if parts[0] in job_ids:
                status_dict[parts[0]] = parts[2]  # STAT column
        
        return status_dict
    except Exception as e:
        logging.error(f"Job monitoring failed: {e}")
        return {}

In [30]:
#| test
def test_job_monitoring():
    from cv_tools.preprocessing.hpc_tools import monitor_jobs
    
    with patch('subprocess.run') as mock_run:
        mock_run.return_value = MagicMock(
            returncode=0,
            stdout="JOBID   USER    STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME\n123     user    RUN   normal     host1       host2       job1"
        )
        
        job_ids = ["123"]
        status = monitor_jobs(job_ids)
        assert status["123"] == "RUN" 

In [39]:
#| export
def get_last_job_id() -> Optional[str]:
    """Get the ID of the last submitted job.
    
    Returns:
        Optional[str]: Job ID if available, None otherwise
    """
    try:
        result = subprocess.run(["bjobs", "-noheader"], capture_output=True, text=True)
        lines = result.stdout.strip().split('\n')
        if lines and lines[0]:
            return lines[0].split()[0]
        return None
    except Exception:
        return None

In [7]:
#| export
def submit_parallel_jobs(
    function: Callable,
    input_files: List[str],
    output_dir: str,
    batch_size: int = 100,
    memory: int = 16000,
    queue: str = "normal",
    max_concurrent_jobs: int = 1000,
    monitor_interval: int = 60
) -> List[bool]:
    """Submit and monitor parallel jobs for processing files.
    
    Args:
        function: Processing function to apply to each batch
        input_files: List of input file paths
        output_dir: Directory for output files
        batch_size: Number of files per batch
        memory: Memory requirement per job (MB)
        queue: LSF queue name
        max_concurrent_jobs: Maximum number of concurrent jobs
        monitor_interval: Interval for checking job status (seconds)
    
    Returns:
        List[bool]: Success status for each batch submission
    """
    os.makedirs(output_dir, exist_ok=True)
    batches = list(create_batches(input_files, batch_size))
    results = []
    active_jobs = set()
    
    with tqdm(total=len(batches), desc="Submitting jobs") as pbar:
        for i, batch in enumerate(batches):
            # Wait if we have too many active jobs
            while len(active_jobs) >= max_concurrent_jobs:
                time.sleep(monitor_interval)
                status = monitor_jobs(list(active_jobs))
                active_jobs = {jid for jid, stat in status.items() 
                             if stat in ["PEND", "RUN"]}
            
            # Create temporary files for batch data
            with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f:
                pickle.dump(batch, f)
                batch_file = f.name
            
            # Create job script
            job_script = create_job_script(
                function.__module__,
                batch_file,
                os.path.join(output_dir, f"batch_{i}"),
                memory=memory,
                queue=queue
            )
            
            # Write job script
            script_path = f"job_{i}.sh"
            with open(script_path, "w") as f:
                f.write(job_script)
            
            # Submit job
            success = submit_job(script_path)
            results.append(success)
            
            if success:
                job_id = get_last_job_id()
                active_jobs.add(job_id)
            
            pbar.update(1)
            
            # Cleanup
            os.unlink(script_path)
    
    return results

In [42]:
#| export
def noop(x):return x

In [None]:
def process_image(
        input_path: str, 
        output_path: str,
        process_func=noop) -> bool:
    try:
        # Your image processing logic here
        img = cv2.imread(input_path)
        processed = process_func(img)
        cv2.imwrite(output_path, processed)
        return True
    except Exception as e:
        logging.error(f"Error processing {input_path}: {e}")
        return False
	

# Get list of files
input_files = Path("/path/to/input").ls(file_exts=['.jpg'])
output_dir = "/path/to/output"

# Submit jobs
results = submit_parallel_jobs(
    function=process_image,
    input_files=input_files,
    output_dir=output_dir,
    batch_size=100,  # Process 100 images per job
    memory=16000,    # 16GB memory per job
    queue="normal",  # LSF queue name
    max_concurrent_jobs=1000,  # Maximum concurrent jobs
    monitor_interval=60  # Check job status every 60 seconds
)

# Mock LSF commands for local testing
def mock_bsub(script_path):
    with open(script_path) as f:
        script = f.read()
    # Execute the Python command directly
    cmd = script.split("python -c")[1].strip("'")
    subprocess.run(["python", "-c", cmd], check=True)

# Test with a small batch
test_files = input_files[:10]
with patch('subprocess.run', side_effect=mock_bsub):
    results = submit_parallel_jobs(
        function=process_image,
        input_files=test_files,
        output_dir="test_output",
        batch_size=2
    )

# Check job status
job_ids = ["123", "124", "125"]
status = monitor_jobs(job_ids)
for job_id, state in status.items():
    print(f"Job {job_id}: {state}")

In [40]:
#| hide
import nbdev; nbdev.nbdev_export('10_data_processing.hpc_tools.ipynb')

