# Confluence Module Docker Runner

## Run Confluence on a LOCAL Machine

## Requirements
* Docker installed on a machine where you have sudo privileges, such that "docker --version" completes successfully
* a python environment

## Optional
* a dockerhub account (free, good for initial transition to HPC, sharing, versioning)


## Run Confluence 
### Alter 1. and 2. for your local setup
1. Fork all of the Git repos you want to run, and make sure you have sudo privileges on a (local) machine where "docker --version" works
2. Prep an empty_mnt directory to store confluence run (requires gdown package in environment) and clone modules of interest
3. Run the "build_and_push_images" function of this notebook locally to build images
4. Run the "Create scripts" section of this notebook (generate_local_run_scripts) to create a Python run script for each module
5. In the same section, generate_run_all_modules_script creates a .sh driver script that runs each of the modules one by one (the one-click run, Linux/Mac friendly)
6. Run a single module or the entirety of Confluence using the scripts from 4. or 5.
7. Need to run again, or on different reaches? Run all steps except gdown in part 2 and part 3 to create a new mnt, edit the reaches_of_interest file in mnt/input/ 


## Run Confluence Parallelized
* See the bottom section of this notebook; replace generate_local_run_scripts with generate_local_run_scripts_parallel

### FUNCTIONS

In [1]:
# FUNCTIONS IGNORE

def clone_repos(github_name, repo_dir, repo_names, name_map, branch='main'):
    """Clone GitHub repositories into ``repo_dir``, replacing existing copies.

    Each repository is cloned into ``repo_dir/<name>``. A directory already
    present at that location is deleted first so the clone starts clean.
    Problems (failed removal, failed clone) are reported on stdout and do not
    stop the remaining repositories from being processed.

    Parameters
    ----------
    github_name : str
        GitHub username or organization name
    repo_dir : str
        Directory to clone repos into (created if it does not exist)
    repo_names : list
        List of repository names to clone; each name is also used as the
        local directory name inside ``repo_dir``
    name_map : dict
        Maps a name from ``repo_names`` to the actual GitHub repository name
        when the two differ. Names missing from the map are used unchanged.
        ``None`` is treated as an empty map.
    branch : str or dict, optional
        Branch name to clone. Can be:
        - A string: same branch for all repos (default: 'main')
        - A dict: mapping repo name to specific branch ('main' if missing)
    """
    name_map = name_map or {}
    os.makedirs(repo_dir, exist_ok=True)

    for name in repo_names:
        path = os.path.join(repo_dir, name)
        repo_name = name_map.get(name, name)
        url = f'https://github.com/{github_name}/{repo_name}.git'

        # Determine which branch to use
        if isinstance(branch, dict):
            branch_name = branch.get(name, 'main')
        else:
            branch_name = branch

        if os.path.exists(path):
            print(f'[Remove] Deleting existing {name} to overwrite...')
            try:
                shutil.rmtree(path)  # rm -rf
            except OSError as e:
                print(f"Error: {path} : {e.strerror}")

        print(f'[Clone] Cloning {name} from branch {branch_name}...')
        # Clone into a directory called `name` (not `repo_name`) so the local
        # layout always matches the names used by the rest of the workflow.
        result = sp.run(['git', 'clone', '--branch', branch_name, url, name], cwd=repo_dir)
        if result.returncode != 0:
            # Report the failure instead of silently moving on; keep going so
            # one bad repo/branch does not block the others.
            print(
                f'[Error] git clone of {name} (branch {branch_name}) '
                f'failed with exit code {result.returncode}'
            )



def build_and_push_images(repo_directory:str, modules_to_run:list, docker_username:str, push:bool = True, custom_tag_name:str = 'latest'):
    """Build (and optionally push) one Docker image per module.

    For every name in ``modules_to_run`` the image
    ``<docker_username>/<name>:<custom_tag_name>`` is built from
    ``<repo_directory>/<name>/Dockerfile`` with ``<repo_directory>/<name>``
    as the build context.

    Parameters
    ----------
    repo_directory : str
        Directory containing one sub-directory per module
    modules_to_run : list
        Module (sub-directory / image) names to build
    docker_username : str
        Docker namespace used as the image name prefix
    push : bool, optional
        When True, ``docker push`` each image after it is built
    custom_tag_name : str, optional
        Image tag (default: 'latest')

    Raises
    ------
    RuntimeError
        If a build or push command cannot be started or exits with a
        non-zero status. Processing stops at the first failure.
    """
    for a_repo_name in modules_to_run:
        repo_path = os.path.join(repo_directory, a_repo_name)
        docker_path = f'{docker_username}/{a_repo_name}:{custom_tag_name}'
        build_cmd = ['docker', 'build','--quiet', '-f', os.path.join(repo_path, "Dockerfile"), '-t', docker_path, repo_path]
        try:
            # check=True turns a non-zero docker exit status into an
            # exception; without it a failed build would go unnoticed.
            sp.run(build_cmd, check=True)
        except Exception as e:
            raise RuntimeError(
                f"Docker build failed...\n"
                f"Build Command: {build_cmd}\n"
                f"Error: {e}"
            ) from e
        if push:
            push_cmd = ['docker', 'push','--quiet', docker_path]
            try:
                sp.run(push_cmd, check=True)
            except Exception as e:
                raise RuntimeError(
                    f"Docker push failed...\n"
                    f"Push Command: {push_cmd}\n"
                    f"Error: {e}"
                ) from e

#FUNCTIONS
def generate_local_run_scripts(
    run: str,
    modules_to_run: list,
    target_modules: list,
    script_jobs: dict,
    base_dir: str,
    repo_directory: str,
    rebuild_docker: bool,
    docker_username: str,
    push: bool,
    custom_tag_name: str
    ):
    """
    Generate Python scripts to run Docker containers locally for each module.
    Handles dynamic JSON file detection similar to SLURM version.

    One script ``run_<module>.py`` is written to
    ``<base_dir>/confluence_<run>/sh_scripts`` for every module that has an
    entry in the command dictionary below. Each generated script runs its
    ``docker run`` command once per job index, in series, and accepts an
    optional ``--log`` flag that redirects each job's output to
    ``<run>_mnt/logs/<module>_<index>.log``.

    Parameters
    ----------
    run : str
        Run name; selects ``confluence_<run>/<run>_mnt`` under ``base_dir``
    modules_to_run : list
        Module names to generate scripts for; names without a command are
        skipped with a warning
    target_modules : list
        Module names passed to ``build_and_push_images`` when
        ``rebuild_docker`` is True
    script_jobs : dict
        Module -> job count (as a string). The value ``"$default_jobs"``
        makes the generated script derive the count at run time from the
        length of a JSON list in the input directory. Modules missing from
        the dict get ``"1"``.
    base_dir : str
        Base directory holding the ``confluence_<run>`` directories
    repo_directory : str
        Directory with the cloned module repos (only used when rebuilding)
    rebuild_docker : bool
        Rebuild the Docker images before generating scripts
    docker_username : str
        Docker namespace of the images
    push : bool
        Push rebuilt images (only used when rebuilding)
    custom_tag_name : str
        Image tag to run

    Returns
    -------
    list
        Paths of the generated scripts, in ``modules_to_run`` order
    """

    def to_docker_path(path):
        # Convert a native path into the form used for `docker run -v`:
        # forward slashes, and a Windows drive prefix 'C:' rewritten to '/c'.
        p = str(path).replace('\\', '/')
        if len(p) >= 2 and p[1] == ':':
            p = '/' + p[0].lower() + p[2:]
        return p

    # Directory structure
    mnt_dir_native = os.path.join(base_dir, f'confluence_{run}', f'{run}_mnt')
    mnt_dir = to_docker_path(mnt_dir_native)
    input_dir = os.path.join(mnt_dir_native, 'input')
    sh_dir = os.path.join(base_dir, f'confluence_{run}', 'sh_scripts')
    logs_dir = os.path.join(mnt_dir_native, 'logs')

    os.makedirs(sh_dir, exist_ok=True)
    os.makedirs(logs_dir, exist_ok=True)  # use native path

    # JSON file paths (similar to HPC version)
    json_files = {
        'reaches_of_interest': os.path.join(input_dir, 'reaches_of_interest.json'),
        'expanded': os.path.join(input_dir, 'expanded_reaches_of_interest.json'),
        'reaches': os.path.join(input_dir, 'reaches.json'),
        'basin': os.path.join(input_dir, 'basin.json'),
        'metrosets': os.path.join(input_dir, 'metrosets.json'),
    }

    # Build Docker images if requested
    if rebuild_docker:
        print("Building Docker images...")
        build_and_push_images(
            repo_directory=repo_directory,
            modules_to_run=target_modules,
            docker_username=docker_username,
            push=push,
            custom_tag_name=custom_tag_name
        )

    # Command dictionary
    # '{{index}}' renders as the literal placeholder '{index}', which the
    # generated script replaces with the job index. The placeholder must not
    # be prefixed with '$': '${index}' would become '$0' after replacement
    # and be expanded by the shell.
    command_dict = {
        'expanded_setfinder': f'docker run -v {mnt_dir}/input:/data {docker_username}/setfinder:{custom_tag_name} -r reaches_of_interest.json -c continent.json -e -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i {{index}}',
        'expanded_combine_data': f'docker run -v {mnt_dir}/input:/data {docker_username}/combine_data:{custom_tag_name} -d /data -e -s 17',
        'input': f'docker run -v {mnt_dir}/input:/mnt/data {docker_username}/input:{custom_tag_name} -v 17 -r /mnt/data/expanded_reaches_of_interest.json -c SWOT_L2_HR_RiverSP_D -i {{index}}',
        'non_expanded_setfinder': f'docker run -v {mnt_dir}/input:/data {docker_username}/setfinder:{custom_tag_name} -c continent.json -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i {{index}}',
        'non_expanded_combine_data': f'docker run -v {mnt_dir}/input:/data {docker_username}/combine_data:{custom_tag_name} -d /data -s 17',
        'prediagnostics': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/diagnostics/prediagnostics:/mnt/data/output {docker_username}/prediagnostics:{custom_tag_name} -r reaches.json -i {{index}}',
        # 'unconstrained_priors': f'docker run -v {mnt_dir}/input:/mnt/data {docker_username}/priors:{custom_tag_name} -r unconstrained -p usgs riggs -g -s local -i {{index}}',
        # 'constrained_priors': f'docker run -v {mnt_dir}/input:/mnt/data {docker_username}/priors:{custom_tag_name} -r constrained -p usgs riggs -g -s local -i {{index}}',
        'metroman': f'docker run --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/output {docker_username}/metroman:{custom_tag_name} -r metrosets.json -s local -v -i {{index}}',
        'metroman_consolidation': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/flpe {docker_username}/metroman_consolidation:{custom_tag_name} -i {{index}}',
        'unconstrained_momma': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/momma:/mnt/data/output {docker_username}/momma:{custom_tag_name} -r reaches.json -m 3 -i {{index}}',
        'constrained_momma': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/momma:/mnt/data/output {docker_username}/momma:{custom_tag_name} -r reaches.json -m 3 -c -i {{index}}',
        'sad': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/sad:/mnt/data/output {docker_username}/sad:{custom_tag_name} --reachfile reaches.json --index {{index}}',
        'hivdi': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/hivdi:/mnt/data/flpe/hivdi {docker_username}/hivdi:{custom_tag_name} /mnt/data/input/reaches.json --input-dir /mnt/data/input -i {{index}}',
        'sic4dvar': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/sic4dvar:/mnt/data/output -v {mnt_dir}/logs:/mnt/data/logs {docker_username}/sic4dvar:{custom_tag_name} -r reaches.json --index {{index}}',
        'moi': f'docker run --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/output {docker_username}/moi:{custom_tag_name} -j basin.json -v -b unconstrained -i {{index}}',
        'consensus': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe {docker_username}/consensus:{custom_tag_name} --mntdir /mnt/data -r /mnt/data/input/reaches.json -i {{index}}',
        'unconstrained_offline': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/output {docker_username}/offline:{custom_tag_name} unconstrained timeseries integrator reaches.json {{index}}',
        'validation': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/output {docker_username}/validation:{custom_tag_name} reaches.json unconstrained {{index}}',
        'output': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/diagnostics:/mnt/data/diagnostics -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/validation -v {mnt_dir}/output:/mnt/data/output {docker_username}/output:{custom_tag_name} -s local -j /app/metadata/metadata.json -m input momma metroman sic4dvar consensus swot -v 17 -i {{index}}'
    }

    output_paths = []

    for module in modules_to_run:
        if module not in command_dict:
            print(f"Warning: No command defined for module '{module}', skipping")
            continue

        job_count = script_jobs.get(module, "1")

        # Generate Python script with dynamic job count detection and logging support.
        # Inside this f-string, doubled braces are literal braces in the
        # generated script; single braces are filled in now.
        script_content = f'''#!/usr/bin/env python3
import subprocess as sp
import sys
import os
import json

# Module: {module}

# Check for --log flag
use_logging = '--log' in sys.argv
logs_dir = r'{logs_dir}'

# JSON file paths
json_files = {{
    'reaches_of_interest': r'{json_files['reaches_of_interest']}',
    'expanded': r'{json_files['expanded']}',
    'reaches': r'{json_files['reaches']}',
    'basin': r'{json_files['basin']}',
    'metrosets': r'{json_files['metrosets']}',
}}

def get_json_length(filepath):
    """Get length of JSON array file"""
    if not os.path.exists(filepath):
        return None
    try:
        with open(filepath, 'r') as f:
            data = json.load(f)
            if isinstance(data, list):
                return len(data)
    except Exception as e:
        print(f"Error reading {{filepath}}: {{e}}")
    return None

# Determine job count for this module
job_count = "{job_count}"

if job_count == "$default_jobs":
    # Dynamic job count based on module-specific logic
    num_jobs = None
    
    # Module-specific JSON file selection (matching HPC logic)
    if "{module}" == "input":
        # Use expanded_reaches_of_interest.json if it exists
        num_jobs = get_json_length(json_files['expanded'])
        if num_jobs is None:
            print("Error: expanded_reaches_of_interest.json not found for input module")
            print("Make sure expanded_combine_data has been run first")
            sys.exit(1)
    
    elif "{module}" == "metroman":
        # Use metrosets.json if it exists, otherwise reaches.json, otherwise reaches_of_interest.json
        num_jobs = get_json_length(json_files['metrosets'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])
    
    elif "{module}" == "moi":
        # Use basin.json if it exists, otherwise reaches.json, otherwise reaches_of_interest.json
        num_jobs = get_json_length(json_files['basin'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])
    
    else:
        # For most modules: use reaches.json if exists, otherwise reaches_of_interest.json
        num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])
    
    if num_jobs is None:
        print("Error: Could not determine job count for module '{module}'")
        sys.exit(1)
    
    print(f"Determined {{num_jobs}} job(s) dynamically for module '{module}'")
else:
    num_jobs = int(job_count)

# Docker command template
command_template = r"""{command_dict[module]}"""

print(f"\\nStarting module: {module}")
print(f"Running {{num_jobs}} job(s)")
if use_logging:
    print(f"Logs will be written to: {{logs_dir}}")
print()

for index in range(num_jobs):
    print(f"--- Running job {{index + 1}}/{{num_jobs}} for module '{module}' ---")
    
    # Replace {{index}} with actual index
    run_command = command_template.replace('{{index}}', str(index))
    
    if use_logging:
        # Write output to log file
        log_file = os.path.join(logs_dir, f"{module}_{{index}}.log")
        print(f"Logging to: {{log_file}}")
        
        try:
            with open(log_file, 'w') as f:
                result = sp.run(run_command, shell=True, stdout=f, stderr=sp.STDOUT)
            
            if result.returncode == 0:
                print(f"Job {{index}} completed successfully\\n")
            else:
                print(f"Job {{index}} failed with exit code {{result.returncode}}")
                print(f"Check log: {{log_file}}\\n")
        except Exception as e:
            print(f"Error running job {{index}}: {{e}}\\n")
    else:
        # Direct output to terminal
        print(f"Command: {{run_command}}")
        
        try:
            result = sp.run(run_command, shell=True, check=True)
            print(f"Job {{index}} completed successfully\\n")
        except sp.CalledProcessError as e:
            print(f"Job {{index}} failed with exit code {{e.returncode}}\\n")

print(f"All jobs completed for module '{module}'")
if use_logging:
    print(f"Logs saved in: {{logs_dir}}")
'''

        output_script_path = os.path.join(sh_dir, f"run_{module}.py")
        with open(output_script_path, 'w') as f:
            f.write(script_content)

        os.chmod(output_script_path, 0o755)
        output_paths.append(output_script_path)
        print(f"Created: {output_script_path}")

    return output_paths

def generate_run_all_modules_script(
    run: str,
    modules_to_run: list,
    script_jobs: dict,
    base_dir: str,
    script_name: str = "run_all_modules.sh"
):
    """
    Generate a bash script that runs all module scripts in series.

    The script is written to ``<base_dir>/confluence_<run>/sh_scripts`` and
    calls ``run_<module>.py`` (expected in the same directory) for each
    module, forwarding its own command-line arguments (e.g. ``--log``).
    It stops with exit status 1 as soon as a module script exits non-zero.

    Parameters
    ----------
    run : str
        Run name
    modules_to_run : list
        List of modules
    script_jobs : dict
        Module -> job count mapping. Modules that are missing from the
        mapping, or whose job count is 0 / "0", are left out of the script.
    base_dir : str
        Base directory
    script_name : str
        Name of the generated script

    Returns
    -------
    str
        Path of the generated script
    """
    sh_dir = os.path.join(base_dir, f'confluence_{run}', 'sh_scripts')
    # Create the directory so this function also works when it is called
    # before (or without) generate_local_run_scripts.
    os.makedirs(sh_dir, exist_ok=True)
    script_path = os.path.join(sh_dir, script_name)

    # Filter modules with non-zero job counts (str() so an int 0 is treated like "0")
    filtered_modules = [m for m in modules_to_run if str(script_jobs.get(m, "0")) != "0"]

    # Generate modules array
    modules_array = (
        "modules_to_run=(\n"
        + "".join(f'    "{module}"\n' for module in filtered_modules)
        + ")\n"
    )

    script_content = f"""#!/bin/bash
# {script_name}
# Runs all module scripts in series for run: {run}

SCRIPT_DIR="{sh_dir}"

{modules_array}

echo "Starting Confluence Run: {run}"

for module in "${{modules_to_run[@]}}"; do
    script_path="${{SCRIPT_DIR}}/run_${{module}}.py"
    
    if [[ -f "$script_path" ]]; then
        echo "Running module: $module"
        echo "Script: $script_path"
        python3 "$script_path" "$@"
        
        if [[ $? -ne 0 ]]; then
            echo "Error occurred while running $module. Exiting."
            exit 1
        fi
        
        echo "Finished module: $module"
        echo ""
    else
        echo "Script not found for module: $module. Skipping."
    fi
done

echo "All modules finished!"
"""

    with open(script_path, "w") as f:
        f.write(script_content)

    os.chmod(script_path, 0o755)
    print(f"Created: {script_path}")
    return script_path


### 1. Prepare Run

In [10]:
# CHANGE ALL DIRECTORIES AND NAMES TO MATCH YOUR LOCAL SETUP

import os
import shutil
import subprocess as sp
from pathlib import Path
import json
import glob
import numpy as np
import pandas as pd
import tarfile
import gdown
import sys

# Make initial folders
# BASE_DIR is the directory storing all confluence runs. Examples:
#   Windows: Path(r'C:\yourPath\confluence')
#   Mac:     Path('/yourPath/confluence')
BASE_DIR = Path('/Users/elisafriedmann/Documents/confluence')
REPO_DIR = BASE_DIR / 'modules'  # directory storing the cloned module repos

# Build directories, then work from BASE_DIR so relative paths resolve there
for d in [BASE_DIR, REPO_DIR]:
    d.mkdir(parents=True, exist_ok=True)
os.chdir(BASE_DIR)

RUN_NAME = 'runTest' # Specific run name, e.g. 'test'
run_dir = BASE_DIR / f'confluence_{RUN_NAME}' # new directory for this run
src_dir = BASE_DIR / 'confluence_empty' # the extracted empty mount that gets renamed to run_dir

#------------------------------------------------

# SETUP, GitHub, DOCKER (DOCKER MUST BE OPEN)
github_name = 'efried130' # GitHub username or organization name where repos are located
build = False # Passed as push= to build_and_push_images: set True only to store images on dockerhub (one way to move to HPC)
docker_username = 'efrie130' # Docker namespace used as the image name prefix
custom_tag_name = 'vD' # image tag for version control (build_and_push_images defaults to 'latest')

In [11]:
# Choose modules of interest to run

# Run steps, in the order they are executed. Each name must match a key of
# command_dict in generate_local_run_scripts; names without a command are
# skipped there with a warning.
# The expanded_* and non_expanded_* steps each run the single 'setfinder' and
# 'combine_data' images with different arguments.
INCLUDED_MODULES = [
    'expanded_setfinder',
    'expanded_combine_data',
    'input',
    'non_expanded_setfinder',
    'non_expanded_combine_data',
    'prediagnostics',
    # 'priors',
    'metroman',
    'metroman_consolidation',
    'unconstrained_momma',
    # 'hivdi',
    # 'sad',
    'sic4dvar',
    'consensus',
    # 'moi',
    # 'unconstrained_offline',
    # 'validation',
    'output'
]

# Repositories to clone from GitHub and build into Docker images
# (passed to clone_repos and build_and_push_images)
TARGET_MODULES = [
    'setfinder',
    'combine_data',
    'input',
    'prediagnostics',
    # 'priors',
    'metroman',
    'metroman_consolidation',
    'momma',
    # 'hivdi',
    # 'sad',
    'sic4dvar',
    'consensus',
    # 'moi',
    # 'offline',
    # 'validation',
    'output'
]

# Branch to clone for each repo; clone_repos falls back to 'main' for any
# repo that is missing from this map
branch_map = {
    'setfinder': 'main',
    'combine_data': 'main',
    'input': 'input_D_products',
    'prediagnostics': 'main',
    # 'priors': 'main',
    'metroman': 'main',
    'metroman_consolidation': 'main',
    'momma': 'main',
    # 'hivdi': 'main',
    # 'sad': 'main',
    'sic4dvar': 'main',
    'consensus': 'main',
    # 'moi': 'main',
    # 'offline': 'main',
    # 'validation': 'main',
    'output': 'add-sword-version'
}


### 2. Download Confluence Directory and Modules

In [12]:
###############################
## INITIAL OR NEW MNT DOWNLOAD:

## Only run once (large files)
# Install empty /mnt directory with input data and eventual output data
# Can also run in command line
###############################

# ! pip install gdown
! gdown 10gJwg0wsl51K_mcoXGq1uQVW34oQwrJc


Downloading...
From (original): https://drive.google.com/uc?id=10gJwg0wsl51K_mcoXGq1uQVW34oQwrJc
From (redirected): https://drive.google.com/uc?id=10gJwg0wsl51K_mcoXGq1uQVW34oQwrJc&confirm=t&uuid=e55abaad-18f4-4511-a4ec-fab11f134d33
To: /Users/elisafriedmann/Documents/confluence/confluence_empty.tar.gz
100%|██████████████████████████████████████| 2.47G/2.47G [01:34<00:00, 26.2MB/s]


In [13]:
####################
## SUBSEQUENT RUNS:
# Copy the empty mount you just downloaded 
# Use this to preserve the empty mount as the base structure
# Do this every time you need to run a new set of reaches
####################

## Extract from tar.gz
tar_path = src_dir.with_suffix('.tar.gz')
with tarfile.open(tar_path, 'r:gz') as tar:
    # Use the 'data' extraction filter where the running Python supports it:
    # it refuses members that would be written outside the destination and
    # avoids the warning newer Python versions emit for an unfiltered
    # extractall(). Older interpreters without filter support keep the
    # previous behaviour.
    if hasattr(tarfile, 'data_filter'):
        tar.extractall(path=src_dir.parent, filter='data')
    else:
        tar.extractall(path=src_dir.parent)

## Rename to your run
src_dir.rename(run_dir)  # Rename to your run directory
p = run_dir / "empty_mnt" # rename internal mnt to run name
p.rename(p.with_name(f"{RUN_NAME}_mnt"))


  tar.extractall(path=src_dir.parent)


PosixPath('/Users/elisafriedmann/Documents/confluence/confluence_runTest/runTest_mnt')

In [14]:
# Point to necessary directories (all inside this run's directory)
SIF_DIR = run_dir / 'sif' # Store built Docker images
sh_dir = run_dir / 'sh_scripts' # Store the generated scripts that run each module
report_dir = run_dir / 'report' # Job logs
mnt_dir = run_dir / f'{RUN_NAME}_mnt' # the mnt storing all confluence run data

# Expose the main settings as environment variables of the notebook process
os.environ['RUN_NAME'] = RUN_NAME
os.environ['BASE_DIR'] = str(BASE_DIR)
os.environ['REPO_DIR'] = str(REPO_DIR)
os.environ['SIF_DIR'] = str(SIF_DIR)

# Echo the settings so the cell output records what this run uses
print(f'RUN_NAME: {RUN_NAME}')
print(f'REPO_DIR: {REPO_DIR}')
print(f'SIF_DIR: {SIF_DIR}')

RUN_NAME: runTest
REPO_DIR: /Users/elisafriedmann/Documents/confluence/modules
SIF_DIR: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sif


In [15]:
########################
## Clone repos (modules) from GitHub
# Only need to run ONCE unless you want to pull new branches or new modules.
# NOTE: clone_repos deletes any existing local copy of a repo before cloning.
########################

# Local module name -> GitHub repository name, for repos whose GitHub name
# differs from the (lower-case) name used for directories and docker images.
name_map = {
        'offline': 'offline-discharge-data-product-creation',
        'moi': 'MOI',
        'validation': 'Validation',
        'hivdi': 'h2ivdi'
    } # docker images must be lower case, should not change unless new names or new modules added

clone_repos(github_name=github_name, repo_dir=REPO_DIR, repo_names=TARGET_MODULES, name_map=name_map, branch=branch_map)

[Clone] Cloning setfinder from branch main...


Cloning into 'setfinder'...


[Clone] Cloning combine_data from branch main...


Cloning into 'combine_data'...


[Clone] Cloning input from branch input_D_products...


Cloning into 'input'...


[Clone] Cloning prediagnostics from branch main...


Cloning into 'prediagnostics'...


[Clone] Cloning metroman from branch main...


Cloning into 'metroman'...


[Clone] Cloning metroman_consolidation from branch main...


Cloning into 'metroman_consolidation'...


[Clone] Cloning momma from branch main...


Cloning into 'momma'...


[Clone] Cloning sic4dvar from branch main...


Cloning into 'sic4dvar'...


[Clone] Cloning consensus from branch main...


Cloning into 'consensus'...


[Clone] Cloning output from branch add-sword-version...


Cloning into 'output'...


## 3. Build/Push modules to Docker

In [None]:
##################################################################################
# Generate Docker images from cloned modules 
# Only need to run ONCE unless you changed a module and are testing through docker
##################################################################################


# Build one image per cloned module; images are pushed only when `build` is True.
build_and_push_images(
    repo_directory=REPO_DIR,
    modules_to_run=TARGET_MODULES,
    docker_username=docker_username,
    push=build,
    custom_tag_name=custom_tag_name,
)
                      
# The output should look something like 
# sha256:6900c3d99325a4a7c8b282d4a7a62f2a0f3fc673f03f5ca3333c2746bf20d06a
# docker.io/travissimmons/setfinder:latest


#command line version of build_and_push_images function above - verbose, remove push if not using DockerHub, change tag name latest as needed
# ! docker build -t efrie130/validation:latest ./Validation/ && docker push efrie130/validation:latest & 

## 4. Create scripts

In [6]:
# Define hardcoded job counts (module -> number of jobs, as a string)
HARDCODED_JOBS = {
    "expanded_setfinder": "6",
    "expanded_combine_data": "1",
    "non_expanded_setfinder": "6",
    "non_expanded_combine_data": "1",
    "unconstrained_priors": "6",
    "constrained_priors": "6",
    "metroman_consolidation": "6",
    "output": "6",
}

# Define dynamic modules: their job count is determined when the generated
# script runs, from the length of a JSON list in the run's input directory
DYNAMIC_MODULES = [
    "input",
    "prediagnostics",
    "metroman",
    "sic4dvar",
    "unconstrained_momma",
    "constrained_momma",
    "sad",
    "moi",
    "consensus",
    "unconstrained_offline",
    "validation",
]

# Build script_jobs dict: a fixed count for hardcoded modules, the
# "$default_jobs" marker for dynamic ones. A module in neither table gets no
# entry: generate_local_run_scripts then uses 1 job for it and
# generate_run_all_modules_script leaves it out of the driver script.
# NOTE(review): 'hivdi' has a docker command but is in neither table —
# confirm whether it belongs in DYNAMIC_MODULES.
script_jobs = {}
for module in INCLUDED_MODULES:
    if module in HARDCODED_JOBS:
        script_jobs[module] = HARDCODED_JOBS[module]
    elif module in DYNAMIC_MODULES:
        script_jobs[module] = "$default_jobs"

# Generate scripts (one run_<module>.py per included module)
# Can rebuild docker here in one step if testing changes
generate_local_run_scripts(
    run=RUN_NAME,
    modules_to_run=INCLUDED_MODULES,
    target_modules=TARGET_MODULES,
    script_jobs=script_jobs,
    base_dir=BASE_DIR,
    repo_directory=REPO_DIR,
    rebuild_docker=False, # Set to True if you want to rebuild Docker images (e.g. if you made changes to the code)
    docker_username=docker_username,
    push=False,
    custom_tag_name=custom_tag_name
)

# Generate master run script (bash driver that runs the module scripts in series)
generate_run_all_modules_script(
    run=RUN_NAME,
    modules_to_run=INCLUDED_MODULES,
    script_jobs=script_jobs,
    base_dir=BASE_DIR,
    script_name="run_all_modules.sh"
)

print("\nAll scripts generated!")


Created: D:\confluence\confluence_runTest\sh_scripts\run_expanded_setfinder.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_expanded_combine_data.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_input.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_non_expanded_setfinder.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_non_expanded_combine_data.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_prediagnostics.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_metroman.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_metroman_consolidation.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_unconstrained_momma.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_sic4dvar.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_consensus.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_output.py
Created: D:\confluence\confluence_runTest\sh_scripts\run_all_modules.sh

All scripts genera

## 5. Run Confluence

In [8]:
# Run modules - can use this cell or terminal
# Note: scripts will usually run to completion, the logs directory will record actual errors from the run

os.chdir(BASE_DIR)

# Option 1 - Run ONE MODULE (i.e. first module)
# !python "{sh_dir}\run_expanded_setfinder.py" --log

# Option 2 - Run multiple modules
# Can be parallelized (but best results when modules run in serial)!

#Linux/mac option
# !"{sh_dir}/run_all_modules.sh" --log

# Windows option
# Runs every module script in order with the current Python interpreter.
import sys
for mod in INCLUDED_MODULES:
    result = sp.run(
        [sys.executable, os.path.join(sh_dir, f'run_{mod}.py'), '--log']
    )
    # A non-zero exit means the script itself could not run (e.g. missing
    # script or undetermined job count); report it rather than ignore it.
    # Failures of individual docker jobs are recorded in the logs directory.
    if result.returncode != 0:
        print(f"Module '{mod}' script exited with code {result.returncode}")

### Changing Modules 
1. Create and run Confluence through the minimum number of modules (i.e. input or prediag) as a baseline
2. Change the module script, pointing its inputs to the output of 1. and its outputs to a new directory (a symlink can be helpful!)
3. Run the analysis by modifying the Python run scripts above, or run the full module by modifying its docker command in command_dict (both are within the generate_local_run_scripts function)
4. Change the modules included in 'output' using the command_dict (generate_local_run_scripts function)

## Parallelizing Modules

* If you need to run many reaches, adding a parallel component to each .py script may be necessary
* Change the max_workers argument of generate_local_run_scripts_parallel to fit your system

In [16]:
# Define hardcoded job counts (mostly per continent); module -> number of jobs, as a string
HARDCODED_JOBS = {
    "expanded_setfinder": "6",
    "expanded_combine_data": "1",
    "non_expanded_setfinder": "6",
    "non_expanded_combine_data": "1",
    "unconstrained_priors": "6",
    "constrained_priors": "6",
    "metroman_consolidation": "6",
    "output": "6",
}

# Define dynamic modules (usually per reach/basin): their job count is
# determined when the generated script runs, from the length of a JSON list
# in the run's input directory
DYNAMIC_MODULES = [
    "input",
    "prediagnostics",
    "metroman",
    "sic4dvar",
    "unconstrained_momma",
    "constrained_momma",
    "sad",
    "moi",
    "consensus",
    "unconstrained_offline",
    "validation",
]

# Build script_jobs dict: a fixed count for hardcoded modules, the
# "$default_jobs" marker for dynamic ones; modules in neither table get no entry
script_jobs = {}
for module in INCLUDED_MODULES:
    if module in HARDCODED_JOBS:
        script_jobs[module] = HARDCODED_JOBS[module]
    elif module in DYNAMIC_MODULES:
        script_jobs[module] = "$default_jobs"
        
# #Sample function
# # Change to match local needs
def generate_local_run_scripts_parallel(
    run: str,
    modules_to_run: list,
    target_modules: list,
    script_jobs: dict,
    base_dir: str,
    repo_directory: str,
    rebuild_docker: bool,
    docker_username: str,
    push: bool,
    custom_tag_name: str,
    max_workers: int = 4,
):
    """
    Generate Python scripts to run Docker containers locally for each module.
    Handles dynamic JSON file detection similar to SLURM version.
    Uses multiprocessing for parallel job execution.

    Parameters
    ----------
    max_workers : int
        Number of parallel workers for all modules (default: 4)
    """

    
    def to_docker_path(path):
        p = str(path).replace('\\', '/')
        if len(p) >= 2 and p[1] == ':':
            p = '/' + p[0].lower() + p[2:]
        return p
    
    # Directory structure
    mnt_dir_native = os.path.join(base_dir, f'confluence_{run}', f'{run}_mnt')
    mnt_dir = to_docker_path(mnt_dir_native)
    input_dir = os.path.join(mnt_dir_native, 'input')
    sh_dir = os.path.join(base_dir, f'confluence_{run}', 'sh_scripts')
    logs_dir = os.path.join(mnt_dir_native, 'logs')

    os.makedirs(sh_dir, exist_ok=True)
    os.makedirs(os.path.join(mnt_dir_native, 'logs'), exist_ok=True)  # use native path

    # JSON file paths (similar to HPC version)
    json_files = {
        'reaches_of_interest': os.path.join(input_dir, 'reaches_of_interest.json'),
        'expanded': os.path.join(input_dir, 'expanded_reaches_of_interest.json'),
        'reaches': os.path.join(input_dir, 'reaches.json'),
        'basin': os.path.join(input_dir, 'basin.json'),
        'metrosets': os.path.join(input_dir, 'metrosets.json'),
    }
    
    # Build Docker images if requested
    if rebuild_docker:
        print("Building Docker images...")
        build_and_push_images(
            repo_directory=repo_directory,
            modules_to_run=target_modules,
            docker_username=docker_username,
            push=push,
            custom_tag_name=custom_tag_name
        )
    
    # Command dictionary
    command_dict = {
        'expanded_setfinder': f'docker run -v {mnt_dir}/input:/data {docker_username}/setfinder:{custom_tag_name} -r reaches_of_interest.json -c continent.json -e -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i {{index}}',
        'expanded_combine_data': f'docker run -v {mnt_dir}/input:/data {docker_username}/combine_data:{custom_tag_name} -d /data -e -s 17',
        'input': f'docker run -v {mnt_dir}/input:/mnt/data {docker_username}/input:{custom_tag_name} -v 17 -r /mnt/data/expanded_reaches_of_interest.json -c SWOT_L2_HR_RiverSP_D -i {{index}}',
        'non_expanded_setfinder': f'docker run -v {mnt_dir}/input:/data {docker_username}/setfinder:{custom_tag_name} -c continent.json -s 17 -o /data -n /data -a MetroMan HiVDI SIC -i {{index}}',
        'non_expanded_combine_data': f'docker run -v {mnt_dir}/input:/data {docker_username}/combine_data:{custom_tag_name} -d /data -s 17',
        'prediagnostics': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/diagnostics/prediagnostics:/mnt/data/output {docker_username}/prediagnostics:{custom_tag_name} -r reaches.json -i {{index}}',
        # 'unconstrained_priors': f'docker run -v {mnt_dir}/input:/mnt/data {docker_username}/priors:{custom_tag_name} -r unconstrained -p usgs riggs -g -s local -i {{index}}',
        # 'constrained_priors': f'docker run -v {mnt_dir}/input:/mnt/data {docker_username}/priors:{custom_tag_name} -r constrained -p usgs riggs -g -s local -i {{index}}',
        'metroman': f'docker run --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/output {docker_username}/metroman:{custom_tag_name} -r metrosets.json -s local -v -i {{index}}',
        'metroman_consolidation': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/metroman:/mnt/data/flpe {docker_username}/metroman_consolidation:{custom_tag_name} -i {{index}}',
        'unconstrained_momma': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/momma:/mnt/data/output {docker_username}/momma:{custom_tag_name} -r reaches.json -m 3 -i {{index}}',
        'constrained_momma': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/momma:/mnt/data/output {docker_username}/momma:{custom_tag_name} -r reaches.json -m 3 -c -i {{index}}',
        'sad': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/sad:/mnt/data/output {docker_username}/sad:{custom_tag_name} --reachfile reaches.json --index {{index}}',
        'hivdi': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/hivdi:/mnt/data/flpe/hivdi {docker_username}/hivdi:{custom_tag_name} /mnt/data/input/reaches.json --input-dir /mnt/data/input -i ${{index}}',
        'sic4dvar': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe/sic4dvar:/mnt/data/output -v {mnt_dir}/logs:/mnt/data/logs {docker_username}/sic4dvar:{custom_tag_name} -r reaches.json --index {{index}}',
        'moi': f'docker run --env AWS_BATCH_JOB_ID="foo" -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/output {docker_username}/moi:{custom_tag_name} -j basin.json -v -b unconstrained -i {{index}}',
        'consensus': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe {docker_username}/consensus:{custom_tag_name} --mntdir /mnt/data -r /mnt/data/input/reaches.json -i {{index}}',
        'unconstrained_offline': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/output {docker_username}/offline:{custom_tag_name} unconstrained timeseries integrator reaches.json {{index}}',
        'validation': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/output {docker_username}/validation:{custom_tag_name} reaches.json unconstrained {{index}}',
        'output': f'docker run -v {mnt_dir}/input:/mnt/data/input -v {mnt_dir}/flpe:/mnt/data/flpe -v {mnt_dir}/moi:/mnt/data/moi -v {mnt_dir}/diagnostics:/mnt/data/diagnostics -v {mnt_dir}/offline:/mnt/data/offline -v {mnt_dir}/validation:/mnt/data/validation -v {mnt_dir}/output:/mnt/data/output {docker_username}/output:{custom_tag_name} -s local -j /app/metadata/metadata.json -m input momma metroman sic4dvar consensus swot -v 17 -i {{index}}'
    }

    output_paths = []

    for module in modules_to_run:
        if module not in command_dict:
            print(f"Warning: No command defined for module '{module}', skipping")
            continue

        job_count = script_jobs.get(module, "1")

        script_content = f'''#!/usr/bin/env python3
import subprocess as sp
import sys
import os
import json
from multiprocessing import Pool

# Module: {module}

# Check for --log flag
use_logging = '--log' in sys.argv
logs_dir = r'{logs_dir}'

# JSON file paths
json_files = {{
    'reaches_of_interest': r'{json_files['reaches_of_interest']}',
    'expanded': r'{json_files['expanded']}',
    'reaches': r'{json_files['reaches']}',
    'basin': r'{json_files['basin']}',
    'metrosets': r'{json_files['metrosets']}',
}}

def get_json_length(filepath):
    """Get length of JSON array file"""
    if not os.path.exists(filepath):
        return None
    try:
        with open(filepath, 'r') as f:
            data = json.load(f)
            if isinstance(data, list):
                return len(data)
    except Exception as e:
        print(f"Error reading {{filepath}}: {{e}}")
    return None

# Determine job count for this module
job_count = "{job_count}"

if job_count == "$default_jobs":
    num_jobs = None

    if "{module}" == "input":
        num_jobs = get_json_length(json_files['expanded'])
        if num_jobs is None:
            print("Error: expanded_reaches_of_interest.json not found for input module")
            print("Make sure expanded_combine_data has been run first")
            sys.exit(1)

    elif "{module}" == "metroman":
        num_jobs = get_json_length(json_files['metrosets'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])

    elif "{module}" == "moi":
        num_jobs = get_json_length(json_files['basin'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])

    else:
        num_jobs = get_json_length(json_files['reaches'])
        if num_jobs is None:
            num_jobs = get_json_length(json_files['reaches_of_interest'])

    if num_jobs is None:
        print("Error: Could not determine job count for module '{module}'")
        sys.exit(1)

    print(f"Determined {{num_jobs}} job(s) dynamically for module '{module}'")
else:
    num_jobs = int(job_count)

# Docker command template
command_template = r"""{command_dict[module]}"""

MAX_WORKERS = {max_workers}

def run_job(index):
    """Run a single Docker job. Designed to be called by multiprocessing.Pool."""
    run_command = command_template.replace('{{index}}', str(index))

    if use_logging:
        log_file = os.path.join(logs_dir, f"{module}_{{index}}.log")
        try:
            with open(log_file, 'w') as f:
                result = sp.run(run_command, shell=True, stdout=f, stderr=sp.STDOUT)
            return index, result.returncode, log_file
        except Exception as e:
            return index, -1, str(e)
    else:
        try:
            result = sp.run(run_command, shell=True, capture_output=True, text=True)
            if result.stdout:
                print(result.stdout, flush=True)
            if result.stderr:
                print(result.stderr, flush=True)
            return index, result.returncode, None
        except Exception as e:
            return index, -1, str(e)


def main():
    print(f"\\nStarting module: {module}")
    print(f"Running {{num_jobs}} job(s) with {{MAX_WORKERS}} parallel workers")
    if use_logging:
        print(f"Logs will be written to: {{logs_dir}}")
    print()

    indices = list(range(num_jobs))
    failed_jobs = []

    with Pool(processes=MAX_WORKERS) as pool:
        for index, returncode, info in pool.imap_unordered(run_job, indices):
            if returncode == 0:
                print(f"[OK] Job {{index}} completed successfully", flush=True)
            else:
                failed_jobs.append(index)
                if use_logging:
                    print(f"[FAIL] Job {{index}} failed (exit {{returncode}}) — log: {{info}}", flush=True)
                else:
                    print(f"[FAIL] Job {{index}} failed (exit {{returncode}}): {{info}}", flush=True)

    print(f"\\nAll jobs finished for module '{module}'")
    print(f"  Succeeded: {{num_jobs - len(failed_jobs)}}/{{num_jobs}}")
    if failed_jobs:
        print(f"  Failed jobs: {{failed_jobs}}")
    if use_logging:
        print(f"  Logs saved in: {{logs_dir}}")


# Required for multiprocessing on Windows
if __name__ == '__main__':
    main()
'''

        output_script_path = os.path.join(sh_dir, f"run_{module}.py")
        with open(output_script_path, 'w') as f:
            f.write(script_content)

        os.chmod(output_script_path, 0o755)
        output_paths.append(output_script_path)
        print(f"Created: {output_script_path}")

    return output_paths



In [None]:
# Keyword arguments that both generator calls below receive unchanged.
_shared_kwargs = {
    "run": RUN_NAME,
    "modules_to_run": INCLUDED_MODULES,
    "script_jobs": script_jobs,
    "base_dir": BASE_DIR,
}

# Generate the per-module parallel run scripts. The rebuild_docker and push
# flags are both switched off for this call.
generate_local_run_scripts_parallel(
    target_modules=TARGET_MODULES,
    repo_directory=REPO_DIR,
    docker_username=docker_username,
    custom_tag_name=custom_tag_name,
    rebuild_docker=False,
    push=False,
    max_workers=4,  # Number of parallel workers; set this based on the machine
    **_shared_kwargs,
)

# Generate the master run script (run_all_modules.sh).
generate_run_all_modules_script(
    script_name="run_all_modules.sh",
    **_shared_kwargs,
)

print("\nAll scripts generated!")



# Linux/macOS option
!"{sh_dir}/run_all_modules.sh" --log

# Windows option (uncomment the loop below)
# Log files will be saved in the logs directory, but job output will likely not be shown in the cell

# for mod in INCLUDED_MODULES:
#     result = sp.run(
#         [sys.executable, os.path.join(sh_dir, f'run_{mod}.py'), '--log'])

Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_expanded_setfinder.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_expanded_combine_data.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_input.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_non_expanded_setfinder.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_non_expanded_combine_data.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_prediagnostics.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_metroman.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_metroman_consolidation.py
Created: /Users/elisafriedmann/Documents/confluence/confluence_runTest/sh_scripts/run_unconstrained_momma.py
Created: /Users/elisafriedmann/Docum