# Confluence Module Docker Runner

## Run Confluence on a LOCAL Machine

## Requirements
* docker installed somewhere where you have sudo priveledges to the point where "docker --version" completes successfully
* a python environment

## Optional
* a dockerhub account (free, good for initial transition to HPC, sharing, versioning)


## Run Confluence 
1. Git clone all of the repos you want to run to a machine where you have sudo priveledges and where "docker --version" works (locally)
2. Prep an empty_mnt directory to store confluence run (requires gdown package in environment)
3. Run the "build_and_push_images" function of this notebook locally
4. Run the "generate_run_scripts" section of this notebook to create python submission scripts for each module
5. Run the generate_all_modules_bash Confluence Driver Script Generator section of this notebook  to create a .sh submission script that runs each of the modules one by one (the one click run)

In [None]:
import os
import subprocess as sp

# FUNCTIONS IGNORE

def clone_repos(repo_dir, repo_names, name_map, branch='main'):
    """Clone repositories with specified branch.
    
    Parameters
    ----------
    repo_dir : str
        Directory to clone repos into
    repo_names : list
        List of repository names to clone
    branch : str or dict, optional
        Branch name to clone. Can be:
        - A string: same branch for all repos (default: 'main')
        - A dict: mapping repo name to specific branch
    """
    os.makedirs(repo_dir, exist_ok=True)
    
    for name in repo_names:
        path = os.path.join(repo_dir, name)
        repo_name = name_map.get(name, name)
        url = f'https://github.com/SWOT-Confluence/{repo_name}.git'
        
        # Determine which branch to use
        if isinstance(branch, dict):
            branch_name = branch.get(name, 'main')
        else:
            branch_name = branch
        
        if os.path.exists(path):
            print(f'[Remove] Deleting existing {name} to overwrite...')
            try:
                shutil.rmtree(path)  # rm -rf
            except OSError as e:
                print(f"Error: {path} : {e.strerror}")
        
        print(f'[Clone] Cloning {name} from branch {branch_name}...')
        sp.run(['git', 'clone', '--branch', branch_name, url, name], cwd=repo_dir)



def build_and_push_images(repo_directory:str, modules_to_run:list, docker_username:str, push:bool = True, custom_tag_name:str = 'latest'):
    for a_repo_name in modules_to_run:
        repo_path = os.path.join(repo_directory, a_repo_name)
        docker_path = f'{docker_username}/{a_repo_name}:{custom_tag_name}'
        build_cmd = ['docker', 'build','--quiet', '-f', os.path.join(repo_path, "Dockerfile"), '-t', docker_path, repo_path]
        try:
            sp.run(build_cmd)
        except Exception as e:
            raise RuntimeError(
                f"Docker build failed...\n"
                f"Build Command: {build_cmd}\n"
                f"Error: {e}"
            )
        if push:
            try:
                push_cmd = ['docker', 'push','--quiet', docker_path]
                sp.run(push_cmd)
            except Exception as e:
                raise RuntimeError(
                    f"Docker push failed...\n"
                    f"Push Command: {push_cmd}\n"
                    f"Error: {e}"
                )

import os
import json
import subprocess as sp

def generate_run_scripts(
    run,
    modules_to_run,          # list[str]
    script_jobs,             # dict of module -> job count (str), e.g., "7" or "$default_jobs"
    base_dir,
    repo_directory,
    rebuild_docker,
    docker_username,
    push,
    custom_tag_name,
    default_jobs_json_path=None
):
    """
    Generates a separate Python run script for each module in modules_to_run.
    If a module's job count is "$default_jobs", dynamically determine the number
    of jobs from the length of the JSON list at default_jobs_json_path.

    Parameters:
    - run (str): Name of the confluence run
    - modules_to_run (list[str]): Modules to generate scripts for.
    - script_jobs (dict): Module -> job count (int as str or "$default_jobs").
    - base_dir (str): Path to repos.
    - rebuild_docker (bool): Whether to rebuild docker images.
    - default_jobs_json_path (str): JSON file path to determine default_jobs length.
    """
    # Has to exist with 'mnt' structure (Doit exister avec la structure 'mnt')
    mnt_dir = os.path.join(base_dir, f'confluence_{run}', f'{run}_mnt')
    
    # Create the sh_scripts directory (Cree le repertoire sh_scripts)
    sh_dir = os.path.join(base_dir, f'confluence_{run}', 'sh_scripts')
    if not os.path.exists(sh_dir):
        os.makedirs(sh_dir)
    
    # Create the sif directory (Cree la repertoire sif)
    sif_dir = os.path.join(base_dir, f'confluence_{run}', 'sif')
    if not os.path.exists(sif_dir):
        os.makedirs(sif_dir)
    
    # Create the report directory (Cree la repertoire report)
    report_dir = os.path.join(base_dir, f'confluence_{run}', 'report')
    if not os.path.exists(report_dir):
        os.makedirs(report_dir)

    # This is a dictionary of all of the Confluence module run commands translated to singularity run commands.
    # You should not have to change anything here.
    command_dict = {
        'expanded_setfinder': f'docker run -v {mnt_dir}/input:/data setfinder -r reaches_of_interest.json -c continent.json -e -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i index_to_run',
        'expanded_combine_data': f'docker run -v {mnt_dir}/input:/data combine_data -d /data -e -s 17',
        'input': f'docker run -v {mnt_dir}/input:/mnt/data input -v 17 -r /mnt/data/expanded_reaches_of_interest.json -c SWOT_L2_HR_RiverSP_D -i index_to_run',
        'non_expanded_setfinder': f'docker run -v {mnt_dir}/input:/data setfinder -c continent.json -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i index_to_run',
        'non_expanded_combine_data': f'docker run -v {mnt_dir}/input:/data combine_data -d /data -s 17',
        'prediagnostics': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/diagnostics/prediagnostics:/mnt/data/output prediagnostics -r reaches.json -i index_to_run',
        'unconstrained_priors': f'docker run -v {mnt_dir}/input:/mnt/data priors -r unconstrained -p usgs riggs -g -s local -i index_to_run', 
        'constrained_priors': f'docker run -v {mnt_dir}/input:/mnt/data priors -r constrained -p usgs riggs -g -s local -i index_to_run', 
        'metroman': f'docker run --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/output metroman -r metrosets.json -s local -v -i index_to_run',
        'metroman_consolidation': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/flpe metroman_consolidation -i index_to_run',
        'unconstrained_momma': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/momma:/mnt/data/output momma -r reaches.json -m 3 -i index_to_run',
        'neobam': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/geobam:/mnt/data/output neobam -r reaches.json -i index_to_run',
        'sad': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/sad:/mnt/data/output sad --reachfile reaches.json --index index_to_run',
        'moi': f'docker run --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/output moi -j basin.json -v -b unconstrained -i index_to_run', #-s local
        'consensus': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe --mntdir /mnt/data -r /mnt/data/input/reaches.json -i index_to_run',
        'unconstrained_offline': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/output offline unconstrained timeseries integrator reaches.json index_to_run',
        'validation': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/output validation reaches.json unconstrained index_to_run',
        # 'output': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/diagnostics:/mnt/data/diagnostics -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/validation -v {mnt_dir}/output:/mnt/data/output output -s local -j /app/metadata/metadata.json -m input prediagnostics momma hivdi metroman sic4dvar sad moi consensus offline validation swot priors -v 17 -i index_to_run'
        'output': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/diagnostics:/mnt/data/diagnostics -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/validation -v {mnt_dir}/output:/mnt/data/output output -s local -j /app/metadata/metadata.json -m input prediagnostics momma metroman sic4dvar sad consensus swot priors -v 17 -i index_to_run'

    }
    
    # Build docker images once if requested
    if rebuild_docker:
        build_and_push_images(
            repo_directory=repo_directory,
            modules_to_run=modules_to_run,
            docker_username=docker_username,
            push=push,
            custom_tag_name=custom_tag_name
        )

    # Load default_jobs from JSON if needed
    default_jobs = None
    if "$default_jobs" in script_jobs.values():
        if default_jobs_json_path is None:
            raise ValueError("default_jobs_json_path must be provided when using $default_jobs")
        with open(default_jobs_json_path, "r") as f:
            data = json.load(f)
            if isinstance(data, list):
                default_jobs = len(data)
            else:
                raise ValueError(f"Expected a JSON list at {default_jobs_json_path}")

    output_paths = []

    for module in modules_to_run:
        job_count = script_jobs.get(module, "1")
        if job_count == "$default_jobs":
            if default_jobs is None:
                raise ValueError(f"default_jobs JSON not loaded for module {module}")
            start_index = 0
            end_index = default_jobs - 1
        else:
            start_index = 0
            end_index = int(job_count) - 1

        script_content = f"""#!/usr/bin/env python3
import subprocess as sp

# Docker command for module: {module}
command = f'{command_dict[module]}'

for index in range({start_index}, {end_index} + 1):
    print(f"Running command for module '{module}' at index {{index}}")
    run_command = command.replace('index_to_run', str(index))
    sp.run(run_command, shell=True, check=True)
"""

        output_script_path = os.path.join(sh_dir, f"run_{module}.py")
        with open(output_script_path, 'w') as f:
            f.write(script_content)

        os.chmod(output_script_path, 0o755)
        output_paths.append(output_script_path)
        print(f"Python script created: {output_script_path}")

    return output_paths


import os

def generate_run_all_modules_bash(
    modules_to_run: list[str],
    script_jobs: dict,
    base_dir: str,
    script_folder_name: str = "sh_scripts",
    bash_script_name: str = "run_all_modules.sh"
) -> str:
    """
    Generates a Bash script that runs all per-module scripts in series.
    Only includes modules for which a script was actually generated.

    Parameters:
    - modules_to_run: list of module names
    - script_jobs: dict of module -> job count (str), used to filter modules
    - base_dir: directory to save the bash script
    - script_folder_name: folder inside base_dir where run_<module>.py scripts reside
    - bash_script_name: name of the generated bash script

    Returns:
    - full path to the generated bash script
    """
    os.makedirs(base_dir, exist_ok=True)
    bash_script_path = os.path.join(base_dir, f'confluence_{run}', bash_script_name)

    # Only include modules that have non-zero job counts
    filtered_modules = []
    for module in modules_to_run:
        count = script_jobs.get(module, "0")
        if count != "0":
            filtered_modules.append(module)

    # Generate modules array string
    modules_array_str = "modules_to_run=(\n"
    for module in filtered_modules:
        modules_array_str += f"    \"{module}\"\n"
    modules_array_str += ")\n"

    script_content = f"""#!/bin/bash
# {bash_script_name}
# Runs all generated module scripts in series

SCRIPT_DIR="{script_folder_name}"

{modules_array_str}

echo "Starting module runs..."

for module in "${{modules_to_run[@]}}"; do
    script_path="${{SCRIPT_DIR}}/run_${{module}}.py"

    if [[ -f "$script_path" ]]; then
        echo "---------------------------------------------"
        echo "Running module: $module"
        echo "Script: $script_path"
        echo "---------------------------------------------"
        python3 "$script_path"
        if [[ $? -ne 0 ]]; then
            echo "Error occurred while running $module. Exiting."
            exit 1
        fi
        echo "Finished module: $module"
        echo
    else
        echo "Script not found for module: $module. Skipping."
    fi
done

echo "All modules completed successfully."
"""

    with open(bash_script_path, "w") as f:
        f.write(script_content)

    os.chmod(bash_script_path, 0o755)
    print(f"Generated bash script: {bash_script_path}")
    return bash_script_path


### 1. Prepare Run

In [None]:
import os
import shutil
import subprocess as sp
from pathlib import Path
import json
import glob
import numpy as np
import pandas as pd

BASE_DIR = r'C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence' #directory storing confluence runs
REPO_DIR = os.path.join(BASE_DIR, 'modules/D') #directory storing repos i.e ./modules/
RUN_NAME = 'Dtest' #Specific run name i.e. 'test'
os.chdir(BASE_DIR)


###############################
## INITIAL OR NEW MNT DOWNLOAD:
###############################
## Install empty /mnt directory with input data and eventual output data
# ! pip install gdown
# ! gdown 1ok73if0F0oBlYycUr-rr3N_dXoOcXr3u
# ! tar -xzvf *.tar.gz

####################
## SUBSEQUENT RUNS:
####################
# src_dir = os.path.join(BASE_DIR, 'confluence_empty')  # initial unzipped gdown 
run_dir = os.path.join(BASE_DIR, f'confluence_{RUN_NAME}') # new directory for run
# shutil.copytree(src_dir, run_dir) # copy the contents of empty to new (preserves initial data)

# p = Path(f"{new_dir}/empty_mnt") # rename internal mnt to run name
# p.rename(p.with_name(f"{RUN_NAME}_mnt"))


# Point to necessary directories 
SIF_DIR = os.path.join(run_dir, 'sif') # Store built Docker images
sh_dir = os.path.join(run_dir, 'sh_scripts') # Store the sh scripts to run each module
report_dir = os.path.join(run_dir, 'report') # Job logs
mnt_dir = os.path.join(run_dir, f'{RUN_NAME}_mnt') #the mnt storing all confluence run data

os.environ['RUN_NAME'] = RUN_NAME
os.environ['BASE_DIR'] = BASE_DIR
os.environ['REPO_DIR'] = REPO_DIR
os.environ['SIF_DIR'] = SIF_DIR

# Fail safe for directory build
for d in [SIF_DIR, sh_dir, report_dir, REPO_DIR]:
    os.makedirs(d, exist_ok=True)

print(f'RUN_NAME: {RUN_NAME}')
print(f'REPO_DIR: {REPO_DIR}')
print(f'SIF_DIR: {SIF_DIR}')

RUN_NAME: Dtest
REPO_DIR: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\modules/D
SIF_DIR: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sif


In [10]:

#Name of confluence offline module
#expanded and non_expanded modules work from 'setfinder' and 'combine_data'
INCLUDED_MODULES = [
    'expanded_setfinder',
    'expanded_combine_data',
    'input',
    'non_expanded_setfinder',
    'non_expanded_combine_data',
    'prediagnostics',
    # 'priors',
    'metroman',
    'metroman_consolidation',
    'unconstrained_momma',
    # 'hivdi',
    # 'sad',
    'sic4dvar',
    'consensus',
    # 'moi',
    # 'unconstrained_offline',
    # 'validation',
    'output'
]

# Git modules to pull
TARGET_MODULES = [
    'setfinder',
    'combine_data',
    'input',
    'prediagnostics',
    # 'priors',
    'metroman',
    'metroman_consolidation',
    'momma',
    # 'hivdi',
    # 'sad',
    'sic4dvar',
    'consensus',
    # 'moi',
    # 'offline',
    # 'validation',
    'output'
]

# Pull working branches for certain Git repos
branch_map = {
    'setfinder': 'main',
    'combine_data': 'main',
    'input': 'input_D_products',
    'prediagnostics': 'main',
    # 'priors': 'main',
    'metroman': 'main',
    'metroman_consolidation': 'main',
    'momma': 'main',
    # 'hivdi': 'main',
    # 'sad': 'main',
    'sic4dvar': 'main',
    'consensus': 'main',
    # 'moi': 'main',
    # 'offline': 'main',
    # 'validation': 'main',
    'output': 'add-sword-version'
}


In [None]:
name_map = {
        'offline': 'offline-discharge-data-product-creation',
        'moi': 'MOI',
        'validation': 'Validation'
    } # docker images must be lower case

clone_repos(repo_dir=REPO_DIR, repo_names=TARGET_MODULES, name_map=name_map, branch=branch_map)

[Clone] Cloning setfinder from branch main...
[Clone] Cloning combine_data from branch main...
[Clone] Cloning input from branch input_D_products...
[Clone] Cloning prediagnostics from branch main...
[Clone] Cloning metroman from branch main...
[Clone] Cloning metroman_consolidation from branch main...
[Clone] Cloning momma from branch main...
[Clone] Cloning sic4dvar from branch main...
[Clone] Cloning consensus from branch main...
[Clone] Cloning output from branch add-sword-version...


## 3. Build/Push modules to Docker

In [14]:


#------------------------------------------------

# SETUP, DOCKER MUST BE OPEN ON COMPUTER

push = True # Only select True if want to store images on dockerhub to move to HPC (you probably do)
docker_username = 'efrie130'
custom_tag_name = 'vD' # good to name same as the run, will default to 'latest'

# --------------------------------------------------------------------------------------


# Generate Docker images from cloned modules

build_and_push_images(\
                      repo_directory = REPO_DIR, \
                      modules_to_run = TARGET_MODULES, \
                      docker_username = docker_username, \
                      push = push, \
                      custom_tag_name = custom_tag_name \
                     )
                      
# The output should look something like 
# sha256:6900c3d99325a4a7c8b282d4a7a62f2a0f3fc673f03f5ca3333c2746bf20d06a
# docker.io/travissimmons/setfinder:latest


#command line version of build_and_push_images function above - verbose, remove push if not using DockerHub, change tag name as needed
# ! docker build -t efrie130/validation:latest ./Validation/ && docker push efrie130/validation:latest & 

In [None]:
def generate_local_run_scripts(
    run: str,
    modules_to_run: list,
    script_jobs: dict,
    base_dir: str,
    repo_directory: str,
    rebuild_docker: bool,
    docker_username: str,
    push: bool,
    custom_tag_name: str
):
    """
    Generate Python scripts to run Docker containers locally for each module.
    Handles dynamic JSON file detection similar to SLURM version.
    """
    # Directory structure
    mnt_dir = os.path.join(base_dir, f'confluence_{run}', f'{run}_mnt')
    input_dir = os.path.join(mnt_dir, 'input')
    sh_dir = os.path.join(base_dir, f'confluence_{run}', 'sh_scripts')
    os.makedirs(sh_dir, exist_ok=True)
    
    # JSON file paths (similar to HPC version)
    json_files = {
        'reaches_of_interest': os.path.join(input_dir, 'reaches_of_interest.json'),
        'expanded': os.path.join(input_dir, 'expanded_reaches_of_interest.json'),
        'reaches': os.path.join(input_dir, 'reaches.json'),
        'basin': os.path.join(input_dir, 'basin.json'),
        'metrosets': os.path.join(input_dir, 'metrosets.json'),
    }
    
    # Build Docker images if requested
    if rebuild_docker:
        print("Building Docker images...")
        build_and_push_images(
            repo_directory=repo_directory,
            target_modules=modules_to_run,
            docker_username=docker_username,
            push=push,
            custom_tag_name=custom_tag_name
        )
    
    # Command dictionary - DO NOT CHANGE
    command_dict = {
        'expanded_setfinder': f'docker run --rm -v {mnt_dir}/input:/data {docker_username}/setfinder:{custom_tag_name} -r reaches_of_interest.json -c continent.json -e -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i {{index}}',
        'expanded_combine_data': f'docker run --rm -v {mnt_dir}/input:/data {docker_username}/combine_data:{custom_tag_name} -d /data -e -s 17',
        'input': f'docker run --rm -v {mnt_dir}/input:/mnt/data {docker_username}/input:{custom_tag_name} -v 17 -r /mnt/data/expanded_reaches_of_interest.json -c SWOT_L2_HR_RiverSP_D -i {{index}}',
        'non_expanded_setfinder': f'docker run --rm -v {mnt_dir}/input:/data {docker_username}/setfinder:{custom_tag_name} -c continent.json -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i {{index}}',
        'non_expanded_combine_data': f'docker run --rm -v {mnt_dir}/input:/data {docker_username}/combine_data:{custom_tag_name} -d /data -s 17',
        'prediagnostics': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/diagnostics/prediagnostics:/mnt/data/output {docker_username}/prediagnostics:{custom_tag_name} -r reaches.json -i {{index}}',
        'unconstrained_priors': f'docker run --rm -v {mnt_dir}/input:/mnt/data {docker_username}/priors:{custom_tag_name} -r unconstrained -p usgs riggs -g -s local -i {{index}}',
        'constrained_priors': f'docker run --rm -v {mnt_dir}/input:/mnt/data {docker_username}/priors:{custom_tag_name} -r constrained -p usgs riggs -g -s local -i {{index}}',
        'metroman': f'docker run --rm --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/output {docker_username}/metroman:{custom_tag_name} -r metrosets.json -s local -v -i {{index}}',
        'metroman_consolidation': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/flpe {docker_username}/metroman_consolidation:{custom_tag_name} -i {{index}}',
        'unconstrained_momma': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/momma:/mnt/data/output {docker_username}/momma:{custom_tag_name} -r reaches.json -m 3 -i {{index}}',
        'constrained_momma': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/momma:/mnt/data/output {docker_username}/momma:{custom_tag_name} -r reaches.json -m 3 -c -i {{index}}',
        'neobam': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/geobam:/mnt/data/output {docker_username}/neobam:{custom_tag_name} -r reaches.json -i {{index}}',
        'sad': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/sad:/mnt/data/output {docker_username}/sad:{custom_tag_name} --reachfile reaches.json --index {{index}}',
        'sic4dvar': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/sic4dvar:/mnt/data/output -v {mnt_dir}/logs:/mnt/data/logs {docker_username}/sic4dvar:{custom_tag_name} -r reaches.json --index {{index}}',
        'moi': f'docker run --rm --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/output {docker_username}/moi:{custom_tag_name} -j basin.json -v -b unconstrained -i {{index}}',
        'consensus': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe {docker_username}/consensus:{custom_tag_name} --mntdir /mnt/data -r /mnt/data/input/reaches.json -i {{index}}',
        'unconstrained_offline': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/output {docker_username}/offline:{custom_tag_name} unconstrained timeseries integrator reaches.json {{index}}',
        'validation': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/output {docker_username}/validation:{custom_tag_name} reaches.json unconstrained {{index}}',
        'output': f'docker run --rm -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/diagnostics:/mnt/data/diagnostics -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/validation -v {mnt_dir}/output:/mnt/data/output {docker_username}/output:{custom_tag_name} -s local -j /app/metadata/metadata.json -m input prediagnostics momma metroman sic4dvar consensus swot priors -v 17 -i {{index}}'
    }
    
    output_paths = []
    
    for module in modules_to_run:
        if module not in command_dict:
            print(f"Warning: No command defined for module '{module}', skipping")
            continue
        
        job_count = script_jobs.get(module, "1")
        
        # Generate Python script with dynamic job count detection
        # Generate Python script with dynamic job count detection
        script_content = f'''#!/usr/bin/env python3
import subprocess as sp
import sys
import os
import json

# Module: {module}

# JSON file paths
json_files = {{
    'reaches_of_interest': r'{json_files['reaches_of_interest']}',
    'expanded': r'{json_files['expanded']}',
    'reaches': r'{json_files['reaches']}',
    'basin': r'{json_files['basin']}',
    'metrosets': r'{json_files['metrosets']}',
}}

def get_json_length(filepath):
    """Get length of JSON array file"""
    if not os.path.exists(filepath):
        return None
    try:
        with open(filepath, 'r') as f:
            data = json.load(f)
            if isinstance(data, list):
                return len(data)
    except Exception as e:
        print(f"Error reading {{{{filepath}}}}: {{{{e}}}}")
    return None

# Determine job count for this module
job_count = "{job_count}"

if job_count == "$default_jobs":
    # Dynamic job count based on module-specific logic
    num_jobs = None
    
    # Module-specific JSON file selection (matching HPC logic)
    if "{module}" == "input":
        # Use expanded_reaches_of_interest.json if it exists
        num_jobs = get_json_length(json_files['expanded'])
        if num_jobs is None:
            print("Error: expanded_reaches_of_interest.json not found for input module")
            print("Make sure expanded_combine_data has been run first")
            sys.exit(1)
    
    elif "{module}" in ["metroman", "metroman_consolidation"]:
        # Use metrosets.json if it exists, otherwise reaches.json, otherwise reaches_of_interest.json
        num_jobs = get_json_length(json_files['metrosets'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])
    
    elif "{module}" == "moi":
        # Use basin.json if it exists, otherwise reaches.json, otherwise reaches_of_interest.json
        num_jobs = get_json_length(json_files['basin'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])
    
    else:
        # For most modules: use reaches.json if exists, otherwise reaches_of_interest.json
        num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])
    
    if num_jobs is None:
        print("Error: Could not determine job count for module '{module}'")
        sys.exit(1)
    
    print(f"Determined {{{{num_jobs}}}} job(s) dynamically for module '{module}'")
else:
    num_jobs = int(job_count)
    print(f"Using hardcoded job count: {{{{num_jobs}}}} for module '{module}'")

# Docker command template
command_template = """{command_dict[module]}"""

print(f"\\nStarting module: {module}")
print(f"Running {{{{num_jobs}}}} job(s)\\n")

for index in range(num_jobs):
    print(f"--- Running job {{{{index + 1}}}}/{{{{num_jobs}}}} for module '{module}' ---")
    
    # Replace {{{{{{{{index}}}}}}}} with actual index
    run_command = command_template.replace('{{{{index}}}}', str(index))
    print(f"Command: {{{{run_command}}}}")
    
    try:
        result = sp.run(run_command, shell=True, check=True)
        print(f"✓ Job {{{{index}}}} completed successfully\\n")
    except sp.CalledProcessError as e:
        print(f"✗ Job {{{{index}}}} failed with exit code {{{{e.returncode}}}}")
        sys.exit(1)

print(f"✓ All jobs completed for module '{module}'")
'''

        output_script_path = os.path.join(sh_dir, f"run_{module}.py")
        with open(output_script_path, 'w') as f:
            f.write(script_content)
        
        os.chmod(output_script_path, 0o755)
        output_paths.append(output_script_path)
        print(f"Created: {output_script_path}")
    
    return output_paths

def generate_run_all_modules_script(
    run: str,
    modules_to_run: list,
    script_jobs: dict,
    base_dir: str,
    script_name: str = "run_all_modules.sh"
):
    """
    Generate a bash script that runs all module scripts in series.
    
    Parameters
    ----------
    run : str
        Run name
    modules_to_run : list
        List of modules
    script_jobs : dict
        Module -> job count mapping
    base_dir : str
        Base directory
    script_name : str
        Name of the generated script
    """
    sh_dir = os.path.join(base_dir, f'confluence_{run}', 'sh_scripts')
    script_path = os.path.join(base_dir, f'confluence_{run}', script_name)
    
    # Filter modules with non-zero job counts
    filtered_modules = [m for m in modules_to_run if script_jobs.get(m, "0") != "0"]
    
    # Generate modules array
    modules_array = "modules_to_run=(\n"
    for module in filtered_modules:
        modules_array += f'    "{module}"\n'
    modules_array += ")\n"
    
    script_content = f"""#!/bin/bash
# {script_name}
# Runs all module scripts in series for run: {run}

SCRIPT_DIR="{sh_dir}"

{modules_array}

echo "=========================================="
echo "Starting Confluence Run: {run}"
echo "=========================================="
echo ""

for module in "${{modules_to_run[@]}}"; do
    script_path="${{SCRIPT_DIR}}/run_${{module}}.py"
    
    if [[ -f "$script_path" ]]; then
        echo "=========================================="
        echo "Running module: $module"
        echo "Script: $script_path"
        echo "=========================================="
        python3 "$script_path"
        
        if [[ $? -ne 0 ]]; then
            echo "Error occurred while running $module. Exiting."
            exit 1
        fi
        
        echo "Finished module: $module"
        echo ""
    else
        echo "Script not found for module: $module. Skipping."
    fi
done

echo "=========================================="
echo "All modules completed successfully!"
echo "=========================================="
"""
    
    with open(script_path, "w") as f:
        f.write(script_content)
    
    os.chmod(script_path, 0o755)
    print(f"Created: {script_path}")
    return script_path






In [21]:



# Define hardcoded job counts
HARDCODED_JOBS = {
    "expanded_setfinder": "7",
    "expanded_combine_data": "1",
    "non_expanded_setfinder": "7",
    "non_expanded_combine_data": "1",
    "unconstrained_priors": "7",
    "constrained_priors": "7",
    "output": "7",
}

# Define dynamic modules
DYNAMIC_MODULES = [
    "input",
    "prediagnostics",
    "metroman",
    "metroman_consolidation",
    "sic4dvar",
    "unconstrained_momma",
    "constrained_momma",
    "sad",
    "moi",
    "consensus",
    "unconstrained_offline",
    "validation",
]

# Build script_jobs dict
script_jobs = {}
for module in INCLUDED_MODULES:
    if module in HARDCODED_JOBS:
        script_jobs[module] = HARDCODED_JOBS[module]
    elif module in DYNAMIC_MODULES:
        script_jobs[module] = "$default_jobs"

# Generate scripts
generate_local_run_scripts(
    run=RUN_NAME,
    modules_to_run=INCLUDED_MODULES,
    script_jobs=script_jobs,
    base_dir=BASE_DIR,
    repo_directory=REPO_DIR,
    rebuild_docker=False,
    docker_username=docker_username,
    push=False,
    custom_tag_name=custom_tag_name
)

# Generate master run script
generate_run_all_modules_script(
    run=RUN_NAME,
    modules_to_run=INCLUDED_MODULES,
    script_jobs=script_jobs,
    base_dir=BASE_DIR,
    script_name="run_all_modules.sh"
)

print("\nAll scripts generated!")
print(f"\nTo run all modules:")
print(f"  cd {os.path.join(BASE_DIR, f'confluence_{RUN_NAME}')}")
print(f"  ./run_all_modules.sh")

Created: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts\run_expanded_setfinder.py
Created: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts\run_expanded_combine_data.py
Created: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts\run_input.py
Created: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts\run_non_expanded_setfinder.py
Created: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts\run_non_expanded_combine_data.py
Created: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts\run_prediagnostics.py
Created: C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts\run_metroman.py
Created: C:\

In [34]:
import os
os.getcwd()
!dir


 Volume in drive C is OS
 Volume Serial Number is 1E96-2F26

 Directory of C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence

02/04/2026  12:50 PM    <DIR>          .
02/04/2026  12:49 PM    <DIR>          ..
02/04/2026  01:48 PM    <DIR>          confluence_Dtest
02/04/2026  12:50 PM    <DIR>          modules
               0 File(s)              0 bytes
               4 Dir(s)  13,928,333,312 bytes free


In [35]:
base = os.path.abspath("confluence_Dtest/sh_scripts")
print(base)
!dir "{base}"


C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts
 Volume in drive C is OS
 Volume Serial Number is 1E96-2F26

 Directory of C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts

02/04/2026  01:48 PM    <DIR>          .
02/04/2026  01:48 PM    <DIR>          ..
02/04/2026  01:48 PM             5,554 run_consensus.py
02/04/2026  01:48 PM             5,481 run_expanded_combine_data.py
02/04/2026  01:48 PM             5,536 run_expanded_setfinder.py
02/04/2026  01:48 PM             5,400 run_input.py
02/04/2026  01:48 PM             5,562 run_metroman.py
02/04/2026  01:48 PM             5,655 run_metroman_consolidation.py
02/04/2026  01:48 PM             5,518 run_non_expanded_combine_data.py
02/04/2026  01:48 PM             5,545 run_non_expanded_setfinder.py
02/04/2026  01:48 PM             6,258 run_output.py
02/04/2026  01:48 PM             5,598 run_prediagnostics.py
02/04

In [36]:
for mod in [
    'expanded_setfinder','expanded_combine_data','input','prediagnostics',
    'metroman','metroman_consolidation','unconstrained_momma',
    'sic4dvar','consensus','output'
]:
    !python "{base}/run_{mod}.py"


  File "C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts/run_expanded_setfinder.py", line 2
    import subprocess as sp
IndentationError: unexpected indent
  File "C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts/run_expanded_combine_data.py", line 2
    import subprocess as sp
IndentationError: unexpected indent
  File "C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts/run_input.py", line 2
    import subprocess as sp
IndentationError: unexpected indent
  File "C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts/run_prediagnostics.py", line 2
    import subprocess as sp
IndentationError: unexpected indent
  File "C:\Users\efriedmann\OneDrive - University of Massachusetts\Documents\confluence\confluence_Dtest\sh_scripts/run_metroman.py", line 2
    import su

In [None]:




script_jobs = {
        "expanded_setfinder": "7",
        "expanded_combine_data": "1",
        "input": "$default_jobs",
        "non_expanded_setfinder": "7",
        "non_expanded_combine_data": "1",
        "prediagnostics": "$default_jobs",
        # "unconstrained_priors": "7", 
        "sad": "$default_jobs",
        "metroman": "$default_jobs",
        "metroman_consolidation": "$default_jobs",
        "sic4dvar": "$default_jobs",
        "unconstrained_momma": "$default_jobs",
        "neobam": "$default_jobs",
        "moi": "$default_jobs",
        "unconstrained_offline": "$default_jobs",
        "validation": "$default_jobs",
        "output": "7",
    }

# Then, generate the Python script that runs the command
# Change Rebuild docker to TRUE if you made module and tag changes for a one-click run 

generate_run_scripts(
    run=run,
    modules_to_run=modules_to_run,
    script_jobs=script_jobs,
    base_dir=base_dir,
    repo_directory=repo_directory,
    rebuild_docker=False,
    docker_username=docker_username,
    push=False,
    custom_tag_name=run,
    default_jobs_json_path=os.path.join(base_dir, f"confluence_{run}", f"{run}_mnt", "input", "expanded_reaches_of_interest.json") #note that expanded_reaches_of_interest is only generated after expanded_combine_data is run
)

generate_run_all_modules_bash(
    modules_to_run=modules_to_run,
    script_jobs=script_jobs,
    base_dir=base_dir,
    script_folder_name = "sh_scripts",
    bash_script_name = "run_all_modules.sh"
)


In [None]:
# After running this notebook, there will be individual python scripts and run_all_module.sh file generated in the same directory.
# You can either run the python scripts individually or run the .sh script to run them all on the command line
