In [1]:
import subprocess
import shlex
import os
import random
import re
import glob

from subprocess import PIPE
from pathlib import Path

In [2]:
# Zero-width position before every upper-case letter except one at the very
# start of the string, e.g. CC_PATTERN.sub('-', "IsolatedLandscape").lower()
# == "isolated-landscape" (used to derive output folder names below).
_CAMEL_CASE_BOUNDARY = r"(?<!^)(?=[A-Z])"
CC_PATTERN = re.compile(_CAMEL_CASE_BOUNDARY)

In [3]:
def submit_independent_spatially_explicit_parallel(
    parallelism_mode, delta_t, step_slice, dedup_cache, # sweetspot
    repeats=10, walltime=1, speciation=0.000001, sample=0.00025, partitions=4, memory=16,
    reporters=['Execution()', 'Biodiversity()'], output='./STDIN'
):
    """Submit MPI-parallel `Independent` simulations (+ replay) as one PBS array job.

    Existing `<output>.e*.*` error files are inspected first: empty ones count
    as successful repeats, non-empty ones are deleted together with their
    matching `.o` file so that the failed repeat gets resubmitted. Only the
    missing repeats are then submitted, one array index per repeat.

    Args:
        parallelism_mode: RON expression for the `parallelism_mode` field.
        delta_t, step_slice, dedup_cache: `Independent` algorithm parameters,
            interpolated verbatim into the RON config.
        repeats: total number of successful repeats wanted.
        walltime: walltime of the job in whole hours.
        speciation, sample: simulation parameters.
        partitions: number of MPI ranks (`mpiexec -n`).
        memory: memory request in GB.
        reporters: reporter expressions passed to the common plugin library.
        output: job name / output prefix; its parent directory is created and
            used as the submission directory.

    Returns:
        The stripped stdout of `qsubbuf` (presumably the job id), or None if
        enough successful repeats already exist.

    Raises:
        subprocess.CalledProcessError: if `qsubbuf` exits with a non-zero status.
    """
    output = Path(output).resolve(strict=False)
    output.parent.mkdir(parents=True, exist_ok=True)

    successful = []
    erroneous = []

    # One error file per finished array index: `<name>.e<jobid>.<index>`.
    for path in glob.iglob(f"{glob.escape(str(output))}.e*.*"):
        if Path(path).read_text() == '':
            successful.append(path)
        else:
            erroneous.append(path)

    for error_path in map(Path, erroneous):
        # `<name>.e<id>.<index>` -> `<name>.o<id>.<index>`: swap the last 'e' of the file name.
        stdout_path = error_path.with_name('o'.join(error_path.name.rsplit('e', 1)))

        for stale_path in (stdout_path, error_path):
            try:
                stale_path.unlink()
            except FileNotFoundError:
                # A missing companion file must not abort the cleanup of the others.
                pass

    repeats = repeats - len(successful)

    if repeats < 1:
        return None

    print(f"{repeats} x {output}")

    # NOTE(review): at least two array indices are always requested (so a single
    # missing repeat is submitted twice) - presumably because single-index PBS
    # arrays are rejected on this cluster; confirm.
    repeats = max(repeats, 2)

    walltime = "{:02d}".format(walltime)
    reporters = ','.join(reporters)
    # 9..31 ranks request 32 cores; otherwise one core per rank.
    ncpus = 32 if partitions > 8 and partitions < 32 else partitions

    config = fr"""
    #PBS -lselect={1}:ncpus={ncpus}:mem={memory}gb
    #PBS -J 0-{max(repeats - 1, 0)}
    #PBS -lwalltime={walltime}:00:00
    
    module load mpi
    module load anaconda3/personal

    mpiexec -n {partitions} $HOME/necsim-rust/target-mpi/release/rustcoalescence simulate '(
        speciation: {speciation},
        sample: {sample},
        seed: '$(python3 -c "import random; print(random.getrandbits(64))")',

        algorithm: Independent(
            delta_t: {delta_t},
            step_slice: {step_slice},
            dedup_cache: {dedup_cache},
            parallelism_mode: {parallelism_mode},
        ),

        scenario: SpatiallyExplicit(
            habitat: "'$HOME'/necsim-rust/maps/madingley/fg0size12/habitat.tif",
            dispersal: "'$HOME'/necsim-rust/maps/madingley/fg0size12/dispersal.tif",
        ),
        
        log: "'$TMPDIR'/event_log",

        reporters: [
            Plugin(
                library: "'$HOME'/necsim-rust/target-mpi/release/deps/libnecsim_plugins_common.so",
                reporters: [{reporters}],
            ),
        ],
    )'
    
    $HOME/necsim-rust/target-replay/release/rustcoalescence replay '(
        logs: [
            "'$(echo $TMPDIR | sed "s:\[\([0-9]\+\)\]:\[\[\]\1\[\]\]:g")'/event_log/*/*"
        ],

        reporters: [
            Plugin(
                library: "'$HOME'/necsim-rust/target-replay/release/deps/libnecsim_plugins_common.so",
                reporters: [{reporters}],
            ),
        ],
    )'
    """

    qsubbuf = f"{os.environ['HOME']}/qsubbuf/target/release/qsubbuf"

    # Submit the simulation batch from the output directory (presumably so that
    # PBS writes the .o/.e files next to `output`, where the scan above looks).
    # `cwd=` instead of os.chdir keeps the kernel's working directory intact even
    # when qsubbuf fails; an argv list avoids shlex mis-splitting a $HOME with spaces.
    return subprocess.run(
        [qsubbuf, "-N", output.name], cwd=str(output.parent),
        check=True, input=config, stdout=PIPE, universal_newlines=True,
    ).stdout.strip()

In [4]:
def submit_independent_spatially_explicit_isolated(
    parallelism_mode, delta_t, step_slice, dedup_cache, # sweetspot
    repeats=10, walltime=1, speciation=0.000001, sample=0.00025, partitions=4, memory=16,
    reporters=['Execution()', 'Biodiversity()'], output='./STDIN'
):
    """Submit isolated-partition `Independent` simulations, each followed by a replay job.

    Every missing repeat is submitted as two PBS jobs from `output`'s parent directory:

    1. `<name>.isolated`: an array job with one single-core sub-job per
       partition (`-J 0-{partitions - 1}`); each sub-job writes its event log
       to `$EPHEMERAL/<array job id>/event_log/<array index>`.
    2. `<name>.<partition job id>`: a replay job (`depend=afterok` on the array
       job) that replays all partition logs and then deletes them.

    Before submitting, the error files in `output`'s parent directory are
    scanned. A repeat counts as successful when it has an empty replay error
    file and no non-empty replay or partition error file. Every other repeat
    seen in the scan has its `.o`/`.e` files deleted and is resubmitted.
    NOTE: a repeat whose partitions succeeded but whose replay has not written
    an error file yet (e.g. still queued) is also treated as failed, so only
    re-run this once previously submitted jobs have finished.

    Args:
        parallelism_mode: RON expression for `parallelism_mode`; its
            `{partition}` placeholder is replaced by
            `Partition(rank:<array index>,partitions:<partitions>)`.
        delta_t, step_slice, dedup_cache: `Independent` algorithm parameters,
            interpolated verbatim into the RON config.
        repeats: total number of successful repeats wanted.
        walltime: walltime of each job in whole hours.
        speciation, sample: simulation parameters.
        partitions: number of isolated partitions (array sub-jobs per repeat).
        memory: memory request per job in GB.
        reporters: reporter expressions passed to the common plugin library.
        output: job-name / output prefix; its parent directory is created.

    Returns:
        A list with the stripped `qsubbuf` stdout of every submitted replay job
        (presumably its job id), or None if enough repeats already succeeded.

    Raises:
        subprocess.CalledProcessError: if `qsubbuf` exits with a non-zero status.
    """
    output = Path(output).resolve(strict=False)
    output.parent.mkdir(parents=True, exist_ok=True)

    # Match files by the actual job name (as used when submitting and in the
    # cleanup globs below) instead of a hard-coded `pbs.{partitions}` prefix.
    name_pattern = re.escape(output.name)
    # Replay error file `<name>.<partition job id>[[].pbs].e<id>`; group 1 is
    # presumably the numeric id of the partition array job.
    replay_file_pattern = re.compile(fr"{name_pattern}\.(\d+)(?:\[\]\.pbs)?\.e\d+")
    # Partition error file `<name>.isolated.e<job id>.<array index>`; group 1 is the job id.
    partition_file_pattern = re.compile(fr"{name_pattern}\.isolated\.e(\d+)\.\d+")

    successful_replay, erroneous_replay = set(), set()
    successful_partition, erroneous_partition = set(), set()

    for path in output.parent.iterdir():
        replay_match = replay_file_pattern.match(path.name)
        partition_match = partition_file_pattern.match(path.name)

        if replay_match is not None:
            content = path.read_text()

            if content == '':
                successful_replay.add(replay_match.group(1))
            else:
                print(path, content)
                erroneous_replay.add(replay_match.group(1))
        elif partition_match is not None:
            content = path.read_text()

            if content == '':
                successful_partition.add(partition_match.group(1))
            else:
                print(path, content)
                erroneous_partition.add(partition_match.group(1))

    # Partitions that succeeded without a successful replay are retried as well.
    erroneous = erroneous_replay | erroneous_partition | (successful_partition - successful_replay)
    successful = successful_replay - erroneous

    escaped_output = glob.escape(str(output))

    for error in erroneous:
        for pathname in (
            f"{escaped_output}.{error}.[oe]*",            # replay job output
            f"{escaped_output}.{error}[[][]].pbs.[oe]*",  # replay job output, `[].pbs` naming
            f"{escaped_output}.isolated.[oe]{error}.*",   # partition sub-job output
        ):
            for error_path in glob.iglob(pathname):
                Path(error_path).unlink()

    repeats = repeats - len(successful)

    if repeats < 1:
        return None

    print(f"{repeats} x {output}")

    walltime = "{:02d}".format(walltime)
    reporters = ','.join(reporters)
    qsubbuf = f"{os.environ['HOME']}/qsubbuf/target/release/qsubbuf"

    results = []

    for _ in range(repeats):
        seed = random.getrandbits(64)

        # NOTE(review): the simulation binary comes from target-base but the
        # reporter plugin from target-mpi - confirm this mix is intended.
        # NOTE(review): partitions=1 yields `-J 0-0`; the parallel submitters
        # force at least two array indices, so this may be rejected - confirm.
        config = fr"""
        #PBS -lselect={1}:ncpus={1}:mem={memory}gb
        #PBS -J 0-{max(partitions - 1, 0)}
        #PBS -lwalltime={walltime}:00:00

        $HOME/necsim-rust/target-base/release/rustcoalescence simulate '(
            speciation: {speciation},
            sample: {sample},
            seed: {seed},

            algorithm: Independent(
                delta_t: {delta_t},
                step_slice: {step_slice},
                dedup_cache: {dedup_cache},
                parallelism_mode: {parallelism_mode.format(partition=f"Partition(rank:'$PBS_ARRAY_INDEX',partitions:{partitions})")},
            ),

            scenario: SpatiallyExplicit(
                habitat: "'$HOME'/necsim-rust/maps/madingley/fg0size12/habitat.tif",
                dispersal: "'$HOME'/necsim-rust/maps/madingley/fg0size12/dispersal.tif",
            ),

            log: "'$EPHEMERAL'/'$(echo $PBS_JOBID | sed "s:\[[0-9]\+\]:\[\]:g")'/event_log/'$PBS_ARRAY_INDEX'",

            reporters: [
                Plugin(
                    library: "'$HOME'/necsim-rust/target-mpi/release/deps/libnecsim_plugins_common.so",
                    reporters: [{reporters}],
                ),
            ],
        )'
        """

        # Submit the partition array job from the output directory; `cwd=` keeps
        # the kernel's working directory intact even when qsubbuf fails.
        pbs_jobid = subprocess.run(
            [qsubbuf, "-N", f"{output.name}.isolated"], cwd=str(output.parent),
            check=True, input=config, stdout=PIPE, universal_newlines=True,
        ).stdout.strip()

        # The log folder contains a literal `[]` (see the sed above); `[[][]]`
        # is its glob-escaped form for the replay job's log pattern.
        pbs_jobid_folder = pbs_jobid.replace("[]", "[[][]]")

        config = fr"""
        #PBS -lselect={1}:ncpus={1}:mem={memory}gb
        #PBS -lwalltime={walltime}:00:00
        #PBS -W depend=afterok:{pbs_jobid}

        $HOME/necsim-rust/target-replay/release/rustcoalescence replay '(
            logs: [
                "'$EPHEMERAL'/{pbs_jobid_folder}/event_log/*/*/*"
            ],

            reporters: [
                Plugin(
                    library: "'$HOME'/necsim-rust/target-replay/release/deps/libnecsim_plugins_common.so",
                    reporters: [{reporters}],
                ),
            ],
        )'
        
        rm -rf $EPHEMERAL/{pbs_jobid_folder}/event_log
        """

        # Submit the replay job, which waits (afterok) for the partition array job.
        results.append(subprocess.run(
            [qsubbuf, "-N", f"{output.name}.{pbs_jobid.replace('[]', '')}"], cwd=str(output.parent),
            check=True, input=config, stdout=PIPE, universal_newlines=True,
        ).stdout.strip())

    return results

In [5]:
def submit_cuda_spatially_explicit_isolated(
    parallelism_mode, delta_t, block_size, grid_size, step_slice, dedup_cache, # sweetspot
    repeats=10, walltime=1, speciation=0.000001, sample=0.00025, memory=24, partitions=4,
    reporters=['Execution()', 'Biodiversity()'], output='./STDIN'
):
    """Submit isolated-partition CUDA simulations, each followed by a replay job.

    Every missing repeat is submitted as two PBS jobs from `output`'s parent directory:

    1. `<name>.isolated`: an array job with one single-GPU sub-job per
       partition (`-J 0-{partitions - 1}`); each sub-job writes its event log
       to `$EPHEMERAL/<array job id>/event_log/<array index>`.
    2. `<name>.<partition job id>`: a replay job (`depend=afterok` on the array
       job) that replays all partition logs and then deletes them.

    Before submitting, the error files in `output`'s parent directory are
    scanned. A repeat counts as successful when it has an empty replay error
    file and no non-empty replay or partition error file. Every other repeat
    seen in the scan has its `.o`/`.e` files deleted and is resubmitted.
    NOTE: a repeat whose partitions succeeded but whose replay has not written
    an error file yet (e.g. still queued) is also treated as failed, so only
    re-run this once previously submitted jobs have finished.

    Args:
        parallelism_mode: RON expression for `parallelism_mode`; its
            `{partition}` placeholder is replaced by
            `Partition(rank:<array index>,partitions:<partitions>)`.
        delta_t, block_size, grid_size, step_slice, dedup_cache: `CUDA`
            algorithm parameters, interpolated verbatim into the RON config.
        repeats: total number of successful repeats wanted.
        walltime: walltime of each job in whole hours.
        speciation, sample: simulation parameters.
        memory: memory request per job in GB.
        partitions: number of isolated partitions (array sub-jobs per repeat).
        reporters: reporter expressions passed to the common plugin library.
        output: job-name / output prefix; its parent directory is created.

    Returns:
        A list with the stripped `qsubbuf` stdout of every submitted replay job
        (presumably its job id), or None if enough repeats already succeeded.

    Raises:
        subprocess.CalledProcessError: if `qsubbuf` exits with a non-zero status.
    """
    output = Path(output).resolve(strict=False)
    output.parent.mkdir(parents=True, exist_ok=True)

    # Match files by the actual job name (as used when submitting and in the
    # cleanup globs below) instead of a hard-coded `pbs.{partitions}` prefix.
    name_pattern = re.escape(output.name)
    # Replay error file `<name>.<partition job id>[[].pbs].e<id>`; group 1 is
    # presumably the numeric id of the partition array job.
    replay_file_pattern = re.compile(fr"{name_pattern}\.(\d+)(?:\[\]\.pbs)?\.e\d+")
    # Partition error file `<name>.isolated.e<job id>.<array index>`; group 1 is the job id.
    partition_file_pattern = re.compile(fr"{name_pattern}\.isolated\.e(\d+)\.\d+")

    successful_replay, erroneous_replay = set(), set()
    successful_partition, erroneous_partition = set(), set()

    for path in output.parent.iterdir():
        replay_match = replay_file_pattern.match(path.name)
        partition_match = partition_file_pattern.match(path.name)

        if replay_match is not None:
            content = path.read_text()

            if content == '':
                successful_replay.add(replay_match.group(1))
            else:
                print(path, content)
                erroneous_replay.add(replay_match.group(1))
        elif partition_match is not None:
            content = path.read_text()

            if content == '':
                successful_partition.add(partition_match.group(1))
            else:
                print(path, content)
                erroneous_partition.add(partition_match.group(1))

    # Partitions that succeeded without a successful replay are retried as well.
    erroneous = erroneous_replay | erroneous_partition | (successful_partition - successful_replay)
    successful = successful_replay - erroneous

    escaped_output = glob.escape(str(output))

    for error in erroneous:
        for pathname in (
            f"{escaped_output}.{error}.[oe]*",            # replay job output
            f"{escaped_output}.{error}[[][]].pbs.[oe]*",  # replay job output, `[].pbs` naming
            f"{escaped_output}.isolated.[oe]{error}.*",   # partition sub-job output
        ):
            for error_path in glob.iglob(pathname):
                Path(error_path).unlink()

    repeats = repeats - len(successful)

    if repeats < 1:
        return None

    print(f"{repeats} x {output}")

    walltime = "{:02d}".format(walltime)
    reporters = ','.join(reporters)
    qsubbuf = f"{os.environ['HOME']}/qsubbuf/target/release/qsubbuf"

    results = []

    for _ in range(repeats):
        seed = random.getrandbits(64)

        # NOTE(review): the simulation binary comes from target-cuda but the
        # reporter plugin from target-base - confirm this mix is intended.
        config = fr"""
        #PBS -lselect={1}:ncpus={1}:mem={memory}gb:ngpus={1}:gpu_type=P1000
        #PBS -J 0-{max(partitions - 1, 0)}
        #PBS -lwalltime={walltime}:00:00

        $HOME/necsim-rust/target-cuda/release/rustcoalescence simulate '(
            speciation: {speciation},
            sample: {sample},
            seed: {seed},

            algorithm: CUDA(
                device: {0},
                ptx_jit: {str(True).lower()},
                delta_t: {delta_t},
                block_size: {block_size},
                grid_size: {grid_size},
                step_slice: {step_slice},
                dedup_cache: {dedup_cache},
                parallelism_mode: {parallelism_mode.format(partition=f"Partition(rank:'$PBS_ARRAY_INDEX',partitions:{partitions})")},
            ),

            scenario: SpatiallyExplicit(
                habitat: "'$HOME'/necsim-rust/maps/madingley/fg0size12/habitat.tif",
                dispersal: "'$HOME'/necsim-rust/maps/madingley/fg0size12/dispersal.tif",
            ),
            
            log: "'$EPHEMERAL'/'$(echo $PBS_JOBID | sed "s:\[[0-9]\+\]:\[\]:g")'/event_log/'$PBS_ARRAY_INDEX'",

            reporters: [
                Plugin(
                    library: "'$HOME'/necsim-rust/target-base/release/deps/libnecsim_plugins_common.so",
                    reporters: [{reporters}],
                ),
            ],
        )'
        """

        # Submit the partition array job from the output directory; `cwd=` keeps
        # the kernel's working directory intact even when qsubbuf fails.
        pbs_jobid = subprocess.run(
            [qsubbuf, "-N", f"{output.name}.isolated"], cwd=str(output.parent),
            check=True, input=config, stdout=PIPE, universal_newlines=True,
        ).stdout.strip()

        # The log folder contains a literal `[]` (see the sed above); `[[][]]`
        # is its glob-escaped form for the replay job's log pattern.
        pbs_jobid_folder = pbs_jobid.replace("[]", "[[][]]")

        config = fr"""
        #PBS -lselect={1}:ncpus={1}:mem={memory}gb
        #PBS -lwalltime={walltime}:00:00
        #PBS -W depend=afterok:{pbs_jobid}

        $HOME/necsim-rust/target-replay/release/rustcoalescence replay '(
            logs: [
                "'$EPHEMERAL'/{pbs_jobid_folder}/event_log/*/*/*"
            ],

            reporters: [
                Plugin(
                    library: "'$HOME'/necsim-rust/target-replay/release/deps/libnecsim_plugins_common.so",
                    reporters: [{reporters}],
                ),
            ],
        )'
        
        rm -rf $EPHEMERAL/{pbs_jobid_folder}/event_log
        """

        # Submit the replay job, which waits (afterok) for the partition array job.
        result = subprocess.run(
            [qsubbuf, "-N", f"{output.name}.{pbs_jobid.replace('[]', '')}"], cwd=str(output.parent),
            check=True, input=config, stdout=PIPE, stderr=PIPE, universal_newlines=True,
        )

        if result.stderr != '':
            print(result.stderr)

        results.append(result.stdout.strip())

    return results

In [6]:
def submit_monolithic_spatially_explicit_parallel(
    algorithm, parallelism_mode,
    repeats=10, walltime=1, speciation=0.000001, sample=0.00025, partitions=4, memory=16,
    reporters=['Execution()', 'Biodiversity()'], output='./STDIN'
):
    """Submit MPI-parallel monolithic simulations (+ replay) as one PBS array job.

    Existing `<output>.e*.*` error files are inspected first: empty ones count
    as successful repeats, non-empty ones are deleted together with their
    matching `.o` file so that the failed repeat gets resubmitted. Only the
    missing repeats are then submitted, one array index per repeat.

    Args:
        algorithm: algorithm name, rendered as `{algorithm}(parallelism_mode:...)`
            in the RON config (e.g. "Classical").
        parallelism_mode: RON expression for the `parallelism_mode` field.
        repeats: total number of successful repeats wanted.
        walltime: walltime of the job in whole hours.
        speciation, sample: simulation parameters.
        partitions: number of MPI ranks (`mpiexec -n`).
        memory: memory request in GB.
        reporters: reporter expressions passed to the common plugin library.
        output: job name / output prefix; its parent directory is created and
            used as the submission directory.

    Returns:
        The stripped stdout of `qsubbuf` (presumably the job id), or None if
        enough successful repeats already exist.

    Raises:
        subprocess.CalledProcessError: if `qsubbuf` exits with a non-zero status.
    """
    output = Path(output).resolve(strict=False)
    output.parent.mkdir(parents=True, exist_ok=True)

    successful = []
    erroneous = []

    # One error file per finished array index: `<name>.e<jobid>.<index>`.
    for path in glob.iglob(f"{glob.escape(str(output))}.e*.*"):
        if Path(path).read_text() == '':
            successful.append(path)
        else:
            erroneous.append(path)

    for error_path in map(Path, erroneous):
        # `<name>.e<id>.<index>` -> `<name>.o<id>.<index>`: swap the last 'e' of the file name.
        stdout_path = error_path.with_name('o'.join(error_path.name.rsplit('e', 1)))

        for stale_path in (stdout_path, error_path):
            try:
                stale_path.unlink()
            except FileNotFoundError:
                # A missing companion file must not abort the cleanup of the others.
                pass

    repeats = repeats - len(successful)

    if repeats < 1:
        return None

    print(f"{repeats} x {output}")

    # NOTE(review): at least two array indices are always requested (so a single
    # missing repeat is submitted twice) - presumably because single-index PBS
    # arrays are rejected on this cluster; confirm.
    repeats = max(repeats, 2)

    walltime = "{:02d}".format(walltime)
    reporters = ','.join(reporters)
    # 9..31 ranks request 32 cores; otherwise one core per rank.
    ncpus = 32 if partitions > 8 and partitions < 32 else partitions

    config = fr"""
    #PBS -lselect={1}:ncpus={ncpus}:mem={memory}gb
    #PBS -J 0-{max(repeats - 1, 0)}
    #PBS -lwalltime={walltime}:00:00
    
    module load mpi
    module load anaconda3/personal

    mpiexec -n {partitions} $HOME/necsim-rust/target-mpi/release/rustcoalescence simulate '(
        speciation: {speciation},
        sample: {sample},
        seed: '$(python3 -c "import random; print(random.getrandbits(64))")',

        algorithm: {algorithm}(parallelism_mode:{parallelism_mode}),

        scenario: SpatiallyExplicit(
            habitat: "'$HOME'/necsim-rust/maps/madingley/fg0size12/habitat.tif",
            dispersal: "'$HOME'/necsim-rust/maps/madingley/fg0size12/dispersal.tif",
        ),
        
        log: "'$TMPDIR'/event_log",

        reporters: [
            Plugin(
                library: "'$HOME'/necsim-rust/target-mpi/release/deps/libnecsim_plugins_common.so",
                reporters: [{reporters}],
            ),
        ],
    )'
    
    $HOME/necsim-rust/target-replay/release/rustcoalescence replay '(
        logs: [
            "'$(echo $TMPDIR | sed "s:\[\([0-9]\+\)\]:\[\[\]\1\[\]\]:g")'/event_log/*/*"
        ],

        reporters: [
            Plugin(
                library: "'$HOME'/necsim-rust/target-replay/release/deps/libnecsim_plugins_common.so",
                reporters: [{reporters}],
            ),
        ],
    )'
    """

    qsubbuf = f"{os.environ['HOME']}/qsubbuf/target/release/qsubbuf"

    # Submit the simulation batch from the output directory (presumably so that
    # PBS writes the .o/.e files next to `output`, where the scan above looks).
    # `cwd=` instead of os.chdir keeps the kernel's working directory intact even
    # when qsubbuf fails; an argv list avoids shlex mis-splitting a $HOME with spaces.
    return subprocess.run(
        [qsubbuf, "-N", output.name], cwd=str(output.parent),
        check=True, input=config, stdout=PIPE, universal_newlines=True,
    ).stdout.strip()

In [7]:
# Monolithic Classical algorithm with Averaging parallelism.
classical_averaging_resources = [
    # (partitions, memory [GB], walltime [h])
    (2, 16, 14), (4, 16, 12), (8, 32, 8), (12, 32, 4),
    (16, 32, 2), (24, 62, 2), (32, 62, 2), (48, 124, 2),
]

for partitions, memory, walltime in classical_averaging_resources:
    submit_monolithic_spatially_explicit_parallel(
        "Classical", f"Averaging(delta_sync:{1.0})",
        partitions=partitions, memory=memory, walltime=walltime,
        repeats=10, speciation=0.000001, sample=0.025,
        output=f"classical-averaging/pbs.{partitions}",
    )

In [8]:
# Monolithic Gillespie algorithm with Averaging parallelism.
gillespie_averaging_resources = [
    # (partitions, memory [GB], walltime [h])
    (2, 16, 16), (4, 16, 10), (8, 32, 8), (12, 32, 4),
    (16, 62, 2), (24, 62, 2), (32, 62, 2), (48, 124, 2),
]

for partitions, memory, walltime in gillespie_averaging_resources:
    submit_monolithic_spatially_explicit_parallel(
        "Gillespie", f"Averaging(delta_sync:{1.0})",
        partitions=partitions, memory=memory, walltime=walltime,
        repeats=10, speciation=0.000001, sample=0.025,
        output=f"gillespie-averaging/pbs.{partitions}",
    )

In [9]:
# Monolithic SkippingGillespie algorithm with Averaging parallelism.
skipping_gillespie_averaging_resources = [
    # (partitions, memory [GB], walltime [h])
    (2, 16, 1), (4, 16, 1), (8, 32, 1), (12, 32, 1),
    (16, 62, 1), (24, 62, 1), (32, 62, 1), (48, 124, 2),
]

for partitions, memory, walltime in skipping_gillespie_averaging_resources:
    submit_monolithic_spatially_explicit_parallel(
        "SkippingGillespie", f"Averaging(delta_sync:{1.0})",
        partitions=partitions, memory=memory, walltime=walltime,
        repeats=10, speciation=0.000001, sample=0.025,
        output=f"skipping-gillespie-averaging/pbs.{partitions}",
    )

In [10]:
# Independent algorithm with MPI-parallel (non-isolated) parallelism modes.
independent_parallel_resources = [
    # (partitions, memory [GB], walltime [h])
    (2, 16, 16), (4, 16, 16), (8, 32, 16), (12, 62, 16),
    (16, 62, 12), (24, 62, 12), (32, 124, 12), (48, 124, 12),
]
independent_parallel_modes = ["Individuals", "Landscape", f"Probabilistic(communication:{0.25})"]

for partitions, memory, walltime in independent_parallel_resources:
    for parallelism_mode in independent_parallel_modes:
        # Mode name before any '(' (e.g. "Probabilistic"), kebab-cased for the folder.
        parallelism_name = parallelism_mode.split('(', 1)[0]

        submit_independent_spatially_explicit_parallel(
            parallelism_mode, 2.0, 10, f"Relative(factor: {1.0})", # delta_t, step_slice, dedup_cache
            partitions=partitions, memory=memory, walltime=walltime,
            repeats=10, speciation=0.000001, sample=0.025,
            output=f"independent-{CC_PATTERN.sub('-', parallelism_name).lower()}/pbs.{partitions}",
        )

In [11]:
# Independent algorithm with isolated partitions (one PBS sub-job per partition).
independent_isolated_resources = [
    # (partitions, memory [GB], walltime [h])
    (2, 16, 12), (4, 16, 12), (8, 16, 12), (12, 16, 12),
    (16, 16, 12), (24, 16, 12), (32, 16, 8), (48, 16, 8),
]

for partitions, memory, walltime in independent_isolated_resources:
    for isolation in ("IsolatedIndividuals", "IsolatedLandscape"):
        # `{partition}` stays a literal placeholder, filled in by the submitter per array index.
        parallelism_mode = f"{isolation}(event_slice:{100000000}, partition: {{partition}})"
        parallelism_name = isolation

        submit_independent_spatially_explicit_isolated(
            parallelism_mode, 2.0, 10, f"Relative(factor: {1.0})", # delta_t, step_slice, dedup_cache
            partitions=partitions, memory=memory, walltime=walltime,
            repeats=10, speciation=0.000001, sample=0.025,
            output=f"independent-{CC_PATTERN.sub('-', parallelism_name).lower()}/pbs.{partitions}",
        )

In [12]:
# CUDA algorithm with isolated partitions (one GPU sub-job per partition).
cuda_isolated_resources = [
    # (partitions, memory [GB], walltime [h])
    (2, 16, 1), (4, 16, 1), (8, 16, 1), (12, 16, 1),
    (16, 16, 1), (24, 16, 1), (32, 16, 1), (48, 16, 1),
]

for partitions, memory, walltime in cuda_isolated_resources:
    for isolation in ("IsolatedIndividuals", "IsolatedLandscape"):
        # `{partition}` stays a literal placeholder, filled in by the submitter per array index.
        parallelism_mode = f"{isolation}(event_slice:{2000000000}, partition: {{partition}})"
        parallelism_name = isolation

        submit_cuda_spatially_explicit_isolated(
            parallelism_mode, 3.0, 64, 64, 150, f"Relative(factor: {0.1})", # delta_t, block_size, grid_size, step_slice, dedup_cache,
            partitions=partitions, memory=memory, walltime=walltime,
            repeats=10, speciation=0.000001, sample=0.025,
            output=f"cuda-{CC_PATTERN.sub('-', parallelism_name).lower()}/pbs.{partitions}",
        )