In [1]:
import os
import subprocess
import re
from os.path import join
from io import StringIO
from textwrap import dedent
from collections import defaultdict

import pandas as pd

import qnet
from qnet.algebra import *

import QDYN

from src.notebook_plots_v1 import plot_bs_decay, display_hamiltonian, display_eq, show_summary_dicke
from src.single_sided_network_v1 import network_slh
from src.dicke_half_model_v2 import write_dicke_half_model, err_dicke_half

from doit.tools import register_doit_as_IPython_magic
import clusterjob

In [2]:
# Enable IPython's autoreload extension in mode 2, so that edits to imported
# modules (e.g. the local `src` package) are picked up without a kernel restart
%load_ext autoreload
%autoreload 2

In [3]:
# Set up qnet's printing system for this notebook session
qnet.init_printing()

In [4]:
# Register the `%doit` magic that the run cells at the end of the notebook use
register_doit_as_IPython_magic()

In [5]:
# Class-level clusterjob configuration, shared by every JobScript created in
# the action wrappers below: job-script defaults come from the cluster's ini
# file, and clusterjob's cache is kept inside this notebook's data folder
clusterjob.JobScript.read_defaults('./config/mlhpc_cluster.ini')
clusterjob.JobScript.cache_folder = './data/doit_clusterjobs/cache/'

$
\newcommand{ket}[1]{\vert #1 \rangle}
\newcommand{bra}[1]{\langle #1 \vert}
\newcommand{Op}[1]{\hat{#1}}
$

# Using the doit package for generating run data

## action wrappers

In [6]:
def update_config(rf, lambda_a, iter_stop):
    """Rewrite the QDYN config file in the runfolder `rf` with new OCT settings

    The file ``<rf>/config`` is read, `iter_stop` is stored in its ``oct``
    section, `lambda_a` is stored as ``oct_lambda_a`` in every entry of its
    ``pulse`` section, and the result is written back to the same path.
    """
    config_file = join(rf, 'config')
    settings = QDYN.config.read_config_file(config_file)
    settings['oct']['iter_stop'] = iter_stop
    for pulse_section in settings['pulse']:
        pulse_section['oct_lambda_a'] = lambda_a
    QDYN.config.write_config(settings, config_file)

In [7]:
def submit_optimization(rf, n_trajs, task):
    """Submit an optimization job for the runfolder `rf`, without waiting

    The job runs ``qdyn_optimize`` under ``mpirun`` with `n_trajs` processes
    inside `rf`. The job script is written to ``<rf>/oct.slr``, its stdout is
    directed to ``<rf>/oct.log``, and the handle returned by the submission is
    dumped to ``<rf>/oct.job.dump`` (the file `wait_for_clusterjob` loads).

    `task` is the doit task running this action; its name (with ":" replaced
    by "_") determines the job name and the clusterjob cache id.
    """
    n_procs = int(n_trajs)
    safe_name = task.name.replace(":", '_')
    taskname = "oct_%s" % safe_name
    # the {placeholders} are not filled in here; the values are attached to
    # the JobScript (rf and n_trajs below)
    # NOTE(review): {module_load} is not set in this function; presumably it
    # comes from the defaults read via JobScript.read_defaults -- confirm
    body = dedent(r'''
    {module_load}

    cd {rf}
    OMP_NUM_THREADS=1 mpirun -n {n_trajs} qdyn_optimize --n-trajs={n_trajs} \
         --J_T=J_T_sm .
    ''')
    jobscript = clusterjob.JobScript(
        body=body,
        filename=join(rf, 'oct.slr'),
        jobname=taskname,
        nodes=1,
        ppn=n_procs,
        threads=1,
        stdout=join(rf, 'oct.log'))
    jobscript.rf = rf
    jobscript.n_trajs = str(n_procs)
    submitted = jobscript.submit(cache_id=taskname)
    submitted.dump(join(rf, 'oct.job.dump'))

In [8]:
def submit_propagation(rf, n_trajs):
    """Submit a propagation job for the runfolder `rf`, without waiting

    The job runs ``qdyn_prop_traj`` under ``mpirun`` with `n_trajs` processes
    inside `rf`, using the optimized pulses and writing the final state to
    ``state_final.dat``. The job script is written to ``<rf>/prop.slr``, its
    stdout is directed to ``<rf>/prop.log``, and the handle returned by the
    submission is dumped to ``<rf>/prop.job.dump``.

    The job name and cache id are "prop_" plus the last path component of
    `rf`. The job is submitted with ``force=True``.
    """
    n_procs = int(n_trajs)
    taskname = "prop_" + os.path.basename(rf)
    # the {placeholders} are not filled in here; the values are attached to
    # the JobScript (rf and n_trajs below)
    # NOTE(review): {module_load} is not set in this function; presumably it
    # comes from the defaults read via JobScript.read_defaults -- confirm
    body = dedent(r'''
    {module_load}

    cd {rf}
    OMP_NUM_THREADS=1 mpirun -n {n_trajs} qdyn_prop_traj --n-trajs={n_trajs} \
        --use-oct-pulses --write-final-state=state_final.dat .
    ''')
    jobscript = clusterjob.JobScript(
        body=body,
        filename=join(rf, 'prop.slr'),
        jobname=taskname,
        nodes=1,
        ppn=n_procs,
        threads=1,
        stdout=join(rf, 'prop.log'))
    jobscript.rf = rf
    jobscript.n_trajs = str(n_procs)
    submitted = jobscript.submit(cache_id=taskname, force=True)
    submitted.dump(join(rf, 'prop.job.dump'))


In [9]:
def wait_for_clusterjob(dumpfile):
    """Block until the job whose clusterjob.AsyncResult was dumped to
    `dumpfile` has ended, then delete `dumpfile`

    Returns the value of the result's ``successful()`` method. If any step
    raises an OSError, the error is swallowed and None is returned instead.
    """
    try:
        job = clusterjob.AsyncResult.load(dumpfile)
        job.wait()
        os.unlink(dumpfile)
        succeeded = job.successful()
    except OSError:
        # deliberately best-effort
        # NOTE(review): presumably this covers a missing dumpfile, i.e. no job
        # was submitted for this runfolder -- confirm
        return None
    return succeeded

## custom uptodate routines

In [10]:
from src.qdyn_model_v1 import pulses_uptodate

## task definitions

In [11]:
def runfolder(row):
    """Return the path of the runfolder belonging to the parameter `row`

    The folder depends only on ``row['T']``, so rows with the same T share a
    runfolder. The value is formatted with ``%d``, which also accepts a float
    T (10.0 gives "rf10").
    """
    template = './data/doit_clusterjobs/rf%d'
    return template % row['T']

In [12]:
def task_create_runfolder():
    """Create all necessary runfolders for the runs defined in params_df

    One sub-task is generated per distinct runfolder. If several rows map to
    the same runfolder, the parameters of the first such row are used.
    """
    jobs = {}
    for _, row in params_df.iterrows():
        rf = runfolder(row)
        if rf not in jobs:
            model_kwargs = dict(
                rf=rf, T=row['T'], theta=0, nt=500,
                kappa=1.0, E0_cycles=2, mcwf=True, non_herm=True,
                lambda_a=row['lambda_a'],
                iter_stop=int(row['iter_stop']))
            jobs[rf] = {
                'name': str(rf),
                'actions': [(write_dicke_half_model, [slh, ], model_kwargs)],
                'targets': [join(rf, 'config')],
                # up to date if target exists
                'uptodate': [True, ],
            }
    yield from jobs.values()

In [13]:
def task_update_runfolder():
    """For every row in params_df, update the config file in the appropriate
    runfolder with the value in that row

    Sub-tasks are named after the row index. A row's sub-task depends on
    ``wait_for_optimization`` of every earlier row that uses the same
    runfolder.
    """
    earlier_rows = defaultdict(list)  # runfolder -> indices of rows seen so far
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        seen = earlier_rows[rf]
        # we only update the config after any earlier optimization has finished
        wait_for = ['wait_for_optimization:%s' % prev for prev in seen]
        seen.append(ind)
        config_values = dict(
            rf=rf, lambda_a=row['lambda_a'],
            iter_stop=int(row['iter_stop']))
        yield {
            'name': str(ind),
            'actions': [(update_config, [], config_values)],
            'file_dep': [join(rf, 'config')],
            # always run task
            'uptodate': [False, ],
            'task_dep': wait_for,
        }


In [14]:
def task_submit_optimization():
    """Run optimization for every row in params_df, in the row's runfolder

    Sub-tasks are named after the row index. A row's sub-task depends on its
    own ``update_runfolder`` sub-task and on ``wait_for_optimization`` of
    every earlier row that uses the same runfolder. The sub-task is considered
    up to date according to `pulses_uptodate` for the runfolder.
    """
    rf_jobs = defaultdict(list)  # runfolder -> indices of rows seen so far
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        # never start an optimization while an earlier one in the same
        # runfolder may still be running
        task_dep = ['wait_for_optimization:%s' % ind2 for ind2 in rf_jobs[rf]]
        # record this row, so that later rows sharing the runfolder wait for
        # it (same bookkeeping as in task_update_runfolder)
        rf_jobs[rf].append(ind)
        task_dep.append('update_runfolder:%s' % ind)
        yield {
            'name': str(ind),
            'actions': [
                (submit_optimization, [rf, ], dict(n_trajs=row['n_trajs']))],
                # 'task' keyword arg is added automatically
            'task_dep': task_dep,
            'uptodate': [(pulses_uptodate, [], {'rf': rf}), ],
        }

In [15]:
def task_wait_for_optimization():
    """For every row in params_df, wait until the optimization job submitted
    for that row has ended

    Sub-tasks are named after the row index. Each one depends on the
    ``submit_optimization`` sub-task of the same row and waits on the job
    handle dumped to ``oct.job.dump`` in the row's runfolder.
    """
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        yield {
            'name': str(ind),
            # '%s' matches how the sub-task names are built (str(ind)) and
            # works for any index label, not only integers
            'task_dep': ['submit_optimization:%s' % ind],
            'actions': [
                (wait_for_clusterjob, [join(rf, 'oct.job.dump')], {}),]}

In [16]:
def task_submit_propagation():
    """Run propagation for every runfolder from params_df

    One sub-task is generated per distinct runfolder, named after the
    runfolder path. Each one depends on the file ``pulse1.oct.dat`` in that
    runfolder.
    """
    jobs = {}
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        # rows sharing a runfolder overwrite each other here, so n_trajs is
        # taken from the last row that uses the runfolder
        jobs[rf] = {
            'name': str(rf),
            'actions': [
                (submit_propagation, [rf, ], dict(n_trajs=row['n_trajs']))],
            'file_dep': [join(rf, 'pulse1.oct.dat'),],}
    for job in jobs.values():
        yield job

In [17]:
def task_wait_for_propagation():
    """Wait for the propagation job of every runfolder from params_df to end

    One sub-task is generated per distinct runfolder, named after the
    runfolder path. Each one depends on the ``submit_propagation`` sub-task
    for the same runfolder and waits on the job handle dumped to
    ``prop.job.dump`` in that runfolder.
    """
    jobs = {}
    for ind, row in params_df.iterrows():
        rf = runfolder(row)
        jobs[rf] = {
            'name': str(rf),
            'task_dep': ['submit_propagation:%s' % rf],
            'actions': [
                (wait_for_clusterjob, [join(rf, 'prop.job.dump')], {}),]}
    for job in jobs.values():
        yield job

## Bringing it all together

In [18]:
# Table of runs: each row is one optimization; rows with the same T share a
# runfolder (see `runfolder`) and are processed one after the other.
# NOTE(review): how `header=1` counts lines when combined with `comment='#'`
# may depend on the pandas version -- confirm that all six data rows end up
# in the frame, as in the displayed table.
params_data_str = r'''
#    T  lambda_a  n_trajs   iter_stop
    10     0.001       10          10
    10    0.0005       10          15
    10    0.0001       10          30
    20     0.001       10          20
    50     0.001       10          30
    70     0.001       10          30
'''
params_df = pd.read_fwf(
    StringIO(params_data_str),
    names=['T', 'lambda_a', 'n_trajs', 'iter_stop'],
    header=1,
    comment='#')

In [19]:
# Show the parsed parameter table that the task generators iterate over
params_df

Unnamed: 0,T,lambda_a,n_trajs,iter_stop
0,10,0.001,10,10
1,10,0.0005,10,15
2,10,0.0001,10,30
3,20,0.001,10,20
4,50,0.001,10,30
5,70,0.001,10,30


In [20]:
# Network model; task_create_runfolder passes it to write_dicke_half_model
slh = network_slh(n_cavity=2, n_nodes=4, topology='driven_bs_fb')

In [21]:
import logging

# Send all log messages (DEBUG and above) to a file. basicConfig does nothing
# while the root logger still has handlers, so detach any existing ones first.
root = logging.getLogger()
for handler in list(root.handlers):
    root.removeHandler(handler)
logging.basicConfig(
    filename='./data/doit_clusterjobs/debug_clusterjob.log',
    level=logging.DEBUG)

In [22]:
# Run the optimizations: requesting `wait_for_optimization` also runs the
# create/update/submit tasks it depends on; `-n 4` lets doit use 4 processes
%doit -n 4 wait_for_optimization

-- create_runfolder:./data/doit_clusterjobs/rf10
-- create_runfolder:./data/doit_clusterjobs/rf20
-- create_runfolder:./data/doit_clusterjobs/rf50
-- create_runfolder:./data/doit_clusterjobs/rf70
.  update_runfolder:0
.  update_runfolder:3
.  update_runfolder:4
-- submit_optimization:0
.  update_runfolder:5
-- submit_optimization:3
-- submit_optimization:5
-- submit_optimization:4
.  wait_for_optimization:0
.  wait_for_optimization:3
.  wait_for_optimization:5
.  wait_for_optimization:4
.  update_runfolder:1
-- submit_optimization:1
.  wait_for_optimization:1
.  update_runfolder:2
-- submit_optimization:2
.  wait_for_optimization:2


In [23]:
# Propagate with the optimized pulses: submit one propagation job per
# runfolder and wait for each of them to end
%doit -n 4 wait_for_propagation

-- submit_propagation:./data/doit_clusterjobs/rf10
-- submit_propagation:./data/doit_clusterjobs/rf20
-- submit_propagation:./data/doit_clusterjobs/rf50
-- submit_propagation:./data/doit_clusterjobs/rf70
.  wait_for_propagation:./data/doit_clusterjobs/rf10
.  wait_for_propagation:./data/doit_clusterjobs/rf20
.  wait_for_propagation:./data/doit_clusterjobs/rf50
.  wait_for_propagation:./data/doit_clusterjobs/rf70
