In [1]:
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

from geecs_data_utils import ScanTag, ScanPaths, GeecsPathsConfig
from scan_analysis.analyzers.common.array2D_scan_analysis import Array2DScanAnalyzer
from scan_analysis.analyzers.common.array1d_scan_analysis import Array1DScanAnalyzer
from scan_analysis.analyzers.Undulator.HIMG_with_average_saving import HIMGWithAveraging
from scan_analysis.analyzers.Undulator.ict_plot_analysis import ICTPlotAnalysis
from image_analysis.offline_analyzers.beam_analyzer import BeamAnalyzer
from image_analysis.config_loader import set_config_base_dir

# Keep the analysis packages quiet by default. Swap logging.WARNING for
# logging.INFO or logging.DEBUG to see more detail about the actual execution.
for _quiet_logger in ("image_analysis", "scan_analysis", "geecs_data_utils"):
    logging.getLogger(_quiet_logger).setLevel(logging.WARNING)

# Hand the image-analysis config loader the configs path held by the
# ScanPaths paths config.
set_config_base_dir(ScanPaths.paths_config.image_analysis_configs_path)

# Optional override (presumably re-points the data base path — confirm before use):
# ScanPaths.reload_paths_config(set_base_path="C:/temp/data/")

In [8]:
# Devices whose analyzers should save rendered figures (critical diagnostics).
# NOTE(review): "UC_Amp4_IR_inputasdfas" matches no device configured below, so
# no analyzer currently saves images. Confirm whether this is a deliberate way
# of switching rendering off or a typo for "UC_Amp4_IR_input".
RENDER_FIGURES = {
    "UC_Amp4_IR_inputasdfas"
}

# Laser diagnostics analyzed in this run.
# An earlier, overwritten definition of this list also contained
# "UC_Amp4_IR_input" and "UC_ExpanderIn1"; add them back here if needed
# instead of keeping a second assignment that is silently discarded.
laser_diags = [
    "UC_GaiaMode",
    "UC_ExpanderIn1_Pulsed",
    "UC_Amp3_IR_input",
    "UC_Amp2_IR_input",
]

# One 2D scan analyzer per laser diagnostic. The device name doubles as the
# camera config name, and images are saved only for devices in RENDER_FIGURES.
all_analyzers = [
    Array2DScanAnalyzer(
        device_name=device,
        image_analyzer=BeamAnalyzer(camera_config_name=device),
        flag_save_images=(device in RENDER_FIGURES),
        skip_plt_show=True,
    )
    for device in laser_diags
]

print(f"Configured {len(all_analyzers)} analyzers")
print(f"Rendering figures for: {RENDER_FIGURES}")

# Surface RENDER_FIGURES entries that cannot take effect, so a misspelled
# device name does not silently disable figure saving.
unmatched_render_names = RENDER_FIGURES - set(laser_diags)
if unmatched_render_names:
    print(
        "Warning: RENDER_FIGURES entries matching no configured device: "
        f"{sorted(unmatched_render_names)}"
    )

Configured 4 analyzers
Rendering figures for: {'UC_Amp4_IR_inputasdfas'}


In [9]:
def run_analyzer(analyzer, scan_tag):
    """Run a single analyzer and return timing/status.

    Parameters
    ----------
    analyzer
        Object exposing a ``device_name`` attribute and a
        ``run_analysis(scan_tag=...)`` method.
    scan_tag
        Passed through unchanged to ``analyzer.run_analysis``.

    Returns
    -------
    dict
        Always contains ``'device'``, ``'status'`` and ``'time'`` (elapsed
        seconds measured with ``time.monotonic``).

        * On success ``'status'`` is ``'success'`` and ``'result'`` holds the
          value returned by ``run_analysis``.
        * On failure ``'status'`` is ``'error'``, ``'error'`` holds
          ``str(exc)``, ``'error_type'`` the exception class name and
          ``'traceback'`` the formatted traceback.

    Notes
    -----
    Any ``Exception`` raised by ``run_analysis`` is captured in the returned
    dict instead of propagating, so one failing analyzer does not abort a
    batch. ``str(exc)`` alone is often uninformative (``KeyError('x')``
    stringifies to ``'x'``), hence the extra type and traceback fields.
    """
    import traceback  # local import keeps this cell self-contained

    start = time.monotonic()
    device = analyzer.device_name

    try:
        result = analyzer.run_analysis(scan_tag=scan_tag)
    except Exception as exc:
        return {
            'device': device,
            'status': 'error',
            'time': time.monotonic() - start,
            'error': str(exc),
            'error_type': type(exc).__name__,
            # Must be captured here: the traceback is gone once we leave the handler.
            'traceback': traceback.format_exc(),
        }

    return {
        'device': device,
        'status': 'success',
        'time': time.monotonic() - start,
        'result': result,
    }

def run_parallel(analyzers, scan_tag, max_workers=8):
    """Run all analyzers in parallel with progress reporting.

    Each analyzer is submitted to a ``ThreadPoolExecutor`` via
    ``run_analyzer``; one progress line is printed per analyzer as it
    finishes, followed by a summary.

    Parameters
    ----------
    analyzers
        Iterable of analyzer objects accepted by ``run_analyzer``. It is
        materialised into a list, so generators are accepted too.
    scan_tag
        Forwarded to every ``run_analyzer`` call.
    max_workers : int, optional
        Thread-pool size handed to ``ThreadPoolExecutor`` (default 8).

    Returns
    -------
    list of dict
        The ``run_analyzer`` result dicts in completion order, which is not
        necessarily the order of ``analyzers``.

    Notes
    -----
    ``future.result()`` re-raises anything the worker raised, so an exception
    that ``run_analyzer`` does not capture itself propagates out of this
    function.
    """
    analyzers = list(analyzers)  # len() is needed below; accept any iterable
    n_total = len(analyzers)
    print(f"Running {n_total} analyzers ({max_workers} parallel)...\n")

    results = []
    start_time = time.monotonic()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(run_analyzer, analyzer, scan_tag)
            for analyzer in analyzers
        ]

        for i, future in enumerate(as_completed(futures), 1):
            result = future.result()
            results.append(result)

            succeeded = result['status'] == 'success'
            status = "✓" if succeeded else "✗"
            print(f"{status} [{i:2d}/{n_total}] {result['device']:20s} "
                  f"{result['time']:6.1f}s")

            if not succeeded:
                print(f"   └─ Error: {result['error']}")

    total_time = time.monotonic() - start_time
    success = sum(1 for r in results if r['status'] == 'success')
    # Sum of per-analyzer wall times: an estimate of a back-to-back run.
    total_compute = sum(r['time'] for r in results)

    print(f"\n{'='*65}")
    print(f"Complete: {success}/{n_total} succeeded in {total_time:.1f}s")
    print(f"Sequential would have taken: {total_compute:.1f}s")
    # With nothing to run, a coarse monotonic clock can report exactly 0.0
    # elapsed seconds; avoid a ZeroDivisionError in that case.
    if total_time > 0:
        print(f"Speedup: {total_compute/total_time:.1f}x")
    else:
        print("Speedup: n/a")
    print(f"{'='*65}")

    return results

In [10]:
# Scan to analyze
scan_tag = ScanTag(year=2025, month=12, day=4, number=12, experiment="Undulator")

print(f"Scan: {scan_tag}\n")

# Run every configured analyzer against the scan.
# Adjust max_workers based on CPU cores.
results = run_parallel(all_analyzers, scan_tag, max_workers=8)




Scan: year=2025 month=12 day=4 number=12 experiment='Undulator'

Running 4 analyzers (8 parallel)...



TIMING BREAKDOWN: UC_Amp2_IR_input
  File Mapping:   2.51s (  0.7%)
  Image I/O:    141.39s ( 39.2%)
  Batch Prep:     0.00s (  0.0%)
  Analysis:      68.26s ( 18.9%)
  Rendering:      4.24s (  1.2%)
  ──────────────────────────────────────────────────────────
  TOTAL:        360.30s


✓ [ 1/4] UC_Amp2_IR_input      218.0s


TIMING BREAKDOWN: UC_GaiaMode
  File Mapping:   3.06s (  0.8%)
  Image I/O:    107.94s ( 28.9%)
  Batch Prep:     0.01s (  0.0%)
  Analysis:     148.85s ( 39.8%)
  Rendering:      2.72s (  0.7%)
  ──────────────────────────────────────────────────────────
  TOTAL:        373.56s


✓ [ 2/4] UC_GaiaMode           263.6s


TIMING BREAKDOWN: UC_Amp3_IR_input
  File Mapping:   2.51s (  0.5%)
  Image I/O:    226.06s ( 45.4%)
  Batch Prep:     0.00s (  0.0%)
  Analysis:      38.12s (  7.6%)
  Rendering:      3.20s (  0.6%)
  ──────────────────────────────────────────────────────────
  TOTAL:        498.47s


✓ [ 3/4] UC_Amp3_IR_input      270.7s


TIMING BREAKDOWN: UC_ExpanderIn1_Pulsed
  File Mapping:   3.01s (  0.7%)
  Image I/O:    155.63s ( 36.2%)
  Batch Prep:     0.00s (  0.0%)
  Analysis:     109.02s ( 25.4%)
  Rendering:      3.31s (  0.8%)
  ──────────────────────────────────────────────────────────
  TOTAL:        429.62s


✓ [ 4/4] UC_ExpanderIn1_Pulsed  271.7s

Complete: 4/4 succeeded in 271.7s
Sequential would have taken: 1023.9s
Speedup: 3.8x
