In [1]:
%matplotlib inline

In [2]:
import os
import asyncio
import matplotlib.pyplot as plt
import numpy as np
import MDAnalysis as mda

In [3]:
import arcd
import arcd.distributed as arcdd

In [4]:
# Number of engines to set up; the same count is used for the working
# directories, the engine objects and the MDP objects created below.
n_engines = 6

In [5]:
# Root directory that holds the input files ("gmx_infiles/") and the
# per-engine working directories. It defaults to the original location but can
# be redirected through the ARCD_SCRATCH_DIR environment variable, so the
# notebook is not tied to one user's home directory.
scratch_dir = os.environ.get("ARCD_SCRATCH_DIR", "/home/think/scratch/arcd_distributed/")
# One working directory per engine: <scratch_dir>/engine_wdir0 ... engine_wdir{n_engines-1}
wdirs = [os.path.join(scratch_dir, f"engine_wdir{i}") for i in range(n_engines)]

In [6]:
# Create the working directories. exist_ok=True keeps this cell re-runnable:
# plain os.mkdir raises FileExistsError as soon as a directory already exists.
# makedirs also creates missing parent directories.
[os.makedirs(d, exist_ok=True) for d in wdirs]

[None, None, None, None, None, None]

In [7]:
# One GmxEngine per worker. Every engine is constructed from the same gro and
# top input files and receives the same extra mdrun arguments.
engines = [
    arcdd.GmxEngine(
        gro_file=os.path.join(scratch_dir, "gmx_infiles/conf.gro"),
        top_file=os.path.join(scratch_dir, "gmx_infiles/topol.top"),
        mdrun_extra_args="-nt 4",
    )
    for _engine_idx in range(n_engines)
]

In [8]:
# Load the same mdp file once per engine so that each engine gets its own
# MDP object.
mdps = [
    arcdd.MDP(os.path.join(scratch_dir, "gmx_infiles/md.mdp"))
    for _mdp_idx in range(n_engines)
]

In [9]:
# Display the first MDP object as loaded from the mdp file.
mdps[0]

{'title': ['test'], 'cpp': ['/lib/cpp'], 'include': ['-I../top'], 'define': [], 'integrator': ['md-vv'], 'dt': [0.002], 'nsteps': [100000], 'nstxout': [10], 'nstvout': [10], 'nstlog': [10], 'nstenergy': [10], 'nstxout-compressed': [10], 'compressed-x-grps': ['Protein'], 'energygrps': ['Protein', 'SOL'], 'nstlist': [10], 'ns-type': ['grid'], 'cutoff-scheme': ['Verlet'], 'rlist': [1.1], 'coulombtype': ['PME'], 'rcoulomb': [1.1], 'rvdw': [1.1], 'tcoupl': ['Berendsen'], 'tc-grps': ['Protein', 'SOL'], 'tau-t': [0.1, 0.1], 'ref-t': [300.0, 300.0], 'Pcoupl': ['Berendsen'], 'tau-p': [1.0], 'compressibility': [4.5e-05], 'ref-p': [1.0], 'gen-vel': ['no'], 'gen-temp': [300.0], 'gen-seed': [173529], 'constraints': ['all-bonds']}

In [10]:
# Set both the 'nstvout' and the 'nstxout' option to 10 in every MDP
# (assigned in that order).
for mdp in mdps:
    mdp["nstvout"] = mdp["nstxout"] = 10

In [11]:
# Display the first MDP object again to check the values set in the loop above.
mdps[0]

{'title': ['test'], 'cpp': ['/lib/cpp'], 'include': ['-I../top'], 'define': [], 'integrator': ['md-vv'], 'dt': [0.002], 'nsteps': [100000], 'nstxout': [10], 'nstvout': [10], 'nstlog': [10], 'nstenergy': [10], 'nstxout-compressed': [10], 'compressed-x-grps': ['Protein'], 'energygrps': ['Protein', 'SOL'], 'nstlist': [10], 'ns-type': ['grid'], 'cutoff-scheme': ['Verlet'], 'rlist': [1.1], 'coulombtype': ['PME'], 'rcoulomb': [1.1], 'rvdw': [1.1], 'tcoupl': ['Berendsen'], 'tc-grps': ['Protein', 'SOL'], 'tau-t': [0.1, 0.1], 'ref-t': [300.0, 300.0], 'Pcoupl': ['Berendsen'], 'tau-p': [1.0], 'compressibility': [4.5e-05], 'ref-p': [1.0], 'gen-vel': ['no'], 'gen-temp': [300.0], 'gen-seed': [173529], 'constraints': ['all-bonds']}

In [12]:
#for engine, wdir, mdp in zip(engines, wdirs, mdps):
#    engine.prepare(starting_configuration=None, workdir=wdir, deffnm="blub", run_config=mdp)

In [13]:
import time

In [14]:
import mdtraj as mdt
import numpy as np

def psi(traj):
    """Load a trajectory with mdtraj and return ``mdt.compute_psi`` of it.

    Parameters
    ----------
    traj
        Object exposing a ``trajectory_file`` attribute; that file is loaded
        with ``mdt.load``.

    Returns
    -------
    The unmodified return value of ``mdt.compute_psi``.
    NOTE(review): according to the mdtraj documentation this is an
    ``(indices, angles)`` tuple rather than a bare angle array -- confirm
    that the consumers of the wrapped function expect that.
    """
    # mdtraj can not read tpr files, so the initial gro serves as topology for now
    topology = os.path.join(scratch_dir, "gmx_infiles/conf.gro")
    loaded = mdt.load(traj.trajectory_file, top=topology)
    return mdt.compute_psi(loaded)

def alpha_R(traj):
    """Return a boolean mask telling where the trajectory is in the alpha_R state.

    An entry is True when ``phi <= 0`` and ``-50 deg <= psi <= 30 deg``, where
    phi and psi are the dihedrals over the hard-coded atom index quadruples
    below. The degree bounds are divided by ``180/pi`` before the comparison.

    Parameters
    ----------
    traj
        Object exposing a ``trajectory_file`` attribute; that file is loaded
        with ``mdt.load``.

    Returns
    -------
    numpy.ndarray of bool
        Same shape as the first column of the computed dihedral array
        (presumably one entry per frame -- TODO confirm).
    """
    # mdtraj can not read tpr files, so the initial gro serves as topology for now
    topology = os.path.join(scratch_dir, "gmx_infiles/conf.gro")
    frames = mdt.load(traj.trajectory_file, top=topology)

    # Only one dihedral is requested per call, hence column 0.
    psi_angles = mdt.compute_dihedrals(frames, indices=[[6,8,14,16]])[:, 0]
    phi_angles = mdt.compute_dihedrals(frames, indices=[[4,6,8,14]])[:, 0]

    deg = 180/np.pi  # conversion factor: degree value / deg
    psi_lower = -50/deg
    psi_upper = 30/deg

    # phi: -pi -> 0, psi: between -50 and 30 degree (bounds included)
    in_state = (phi_angles <= 0) & (psi_lower <= psi_angles) & (psi_angles <= psi_upper)
    return in_state

# Wrap both plain functions in a TrajectoryFunctionWrapper (alpha_R first,
# then psi); the wrapped versions are the ones awaited further down.
wrapped_alphaR, wrapped_psi = map(arcdd.trajectory.TrajectoryFunctionWrapper, (alpha_R, psi))

In [15]:
import arcd.distributed.logic

In [16]:
# One propagator per engine. Each gets its own single-element state list
# containing the wrapped alpha_R function.
propagators = [
    arcd.distributed.logic.PropagatorUntilAnyState(
        states=[wrapped_alphaR],
        engine=engine,
        walltime_per_part=0.005,
    )
    for engine in engines
]

In [17]:
#initial_tp_mdt = mdt.load(os.path.join(scratch_dir, "gmx_infiles/ala_400K_TP_low_barrier.h5"))

In [18]:
#initial_tp_mdt[10].save_trr(os.path.join(scratch_dir, "gmx_infiles/tp_frame.trr"))

In [19]:
# Configuration all propagations start from, given as a trr/gro file pair
# inside the input directory.
starting_conf = arcdd.Trajectory(
    trajectory_file=os.path.join(scratch_dir, "gmx_infiles/tp_frame.trr"),
    topology_file=os.path.join(scratch_dir, "gmx_infiles/tp_frame.gro"),
)

In [20]:
# Evaluate the wrapped state function on the starting configuration; the
# recorded output below is array([False]).
await wrapped_alphaR(starting_conf)

array([False])

In [21]:
tasks = [asyncio.create_task(p.propagate_and_concatenate(starting_configuration=starting_conf,
                                                           workdir=wdir,
                                                           deffnm="blub",
                                                           run_config=mdp,
                                                           tra_out=os.path.join(wdir, "blub_until_state.trr"),
                                                          ))
         for p, wdir, mdp in zip(propagators, wdirs, mdps)]

done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

In [32]:
# `done` is a set, so unpack it into a list to pick one finished task for display.
[*done][0]

(Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir3/blub_until_state.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir3/blub.tpr),
 0)

In [36]:
# Display all task objects in creation order.
tasks

[<Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir0/blub.tpr), 0)>,
 <Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir1/blub.tpr), 0)>,
 <Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir2/blub.tpr), 0)>,
 <Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir3/blub.tpr), 0)>,
 <Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Traject

In [40]:
done, pending = await asyncio.wait(tasks,)# return_when=asyncio.FIRST_COMPLETED)

In [44]:
# Print every finished task followed, on the next line, by its position in `tasks`.
for i in done:
    print(i, tasks.index(i), sep="\n")

<Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir2/blub.tpr), 0)>
2
<Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir5/blub.tpr), 0)>
5
<Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir0/blub.tpr), 0)>
0
<Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajectory(tr...dir3/blub.tpr), 0)>
3
<Task finished coro=<PropagatorUntilAnyState.propagate_and_concatenate() done, defined at /home/think/Documents/sources/OPS/arcd/arcd/distributed/logic.py:667> result=(Trajecto

In [42]:
# Display the tasks that were still pending after the last wait; the recorded
# output is an empty set.
pending

set()

In [None]:
start = time.time()
await asyncio.gather(*(e.prepare(starting_configuration=None, workdir=wdir, deffnm="blub", run_config=mdp)
                               for e, wdir, mdp in zip(engines, wdirs, mdps)
                               )
                            )
end = time.time()
print(f"time elapsed: {end-start} s")

In [16]:
walltime = 0.02 # 0.01 h = 36 s
start = time.time()
trajs = await asyncio.gather(*(e.run_walltime(walltime) for e in engines))
end = time.time()
print(f"time elapsed: {end-start} s")

time elapsed: 72.21506142616272 s


In [17]:
# Display the trajectory objects returned by the run above (one per engine).
trajs

[Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir0/blub.part0001.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir0/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir1/blub.part0001.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir1/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir2/blub.part0001.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir2/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir3/blub.part0001.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir3/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir4/blub.part0001.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir4/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir5/blub.part0001.trr, topology_file=/home/think/scratch/arcd_dist

In [21]:
start = time.time()
result1 = await wrapped_func(trajs[-1])
end = time.time()
print(f"time elapsed: {end-start} s")

time elapsed: 0.48328495025634766 s


In [22]:
start = time.time()
results = await asyncio.gather(*(wrapped_func(t) for t in trajs))
end = time.time()
print(f"time elapsed: {end-start} s")

time elapsed: 0.5739212036132812 s


In [23]:
# Display the trajectory objects again.
# NOTE(review): the recorded output lists *.part0002.trr files while the
# earlier display listed *.part0001.trr -- presumably the run cell was
# executed once more in between; the cell order does not reproduce this.
trajs

[Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir0/blub.part0002.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir0/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir1/blub.part0002.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir1/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir2/blub.part0002.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir2/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir3/blub.part0002.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir3/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir4/blub.part0002.trr, topology_file=/home/think/scratch/arcd_distributed/engine_wdir4/blub.tpr),
 Trajectory(trajectory_file=/home/think/scratch/arcd_distributed/engine_wdir5/blub.part0002.trr, topology_file=/home/think/scratch/arcd_dist