# Minimal notebook to run in parallel

In [31]:
from pytao import evaluate_tao
from distgen import Generator
from pmd_beamphysics import ParticleGroup, particle_paths

from h5py import File
import numpy as np
import sys
import os
import tempfile 
import json


In [32]:
# All paths are anchored to the directory the notebook/script is launched from.
ROOT = os.path.abspath(os.getcwd())
APATH = os.path.join(ROOT, 'archive')
INIT = os.path.join(ROOT, 'template/tao.init')
# Fail early, and say which path is missing, before any work is dispatched.
assert os.path.exists(INIT), f'Tao init file not found: {INIT}'
assert os.path.exists(APATH), f'Archive path not found: {APATH}'

# A progress line is printed every N_CHECKPOINT completed inputs.
N_CHECKPOINT = 1

# JSON file that receives the collected results. Anchored to ROOT like the
# other paths so the output location does not depend on the working
# directory at the moment the file is written.
OUTFILE = os.path.join(ROOT, 'summary.json')

In [33]:
def get_bunch(afile, ix=3):
    """
    Load a single particle bunch from an HDF5 archive.

    Parameters
    ----------
    afile : str
        Path of the archive file; it is opened read-only.
    ix : int, optional
        Index into the list returned by `particle_paths` for this file.
        Defaults to 3.

    Returns
    -------
    ParticleGroup
        The bunch built from the selected group of the archive.
    """
    with File(afile, 'r') as archive:
        # Pick the requested particle group among those found in the file.
        group_path = particle_paths(archive)[ix]
        bunch = ParticleGroup(archive[group_path])
    return bunch

In [34]:
def calc1(afile):
    """
    Run one Tao evaluation starting from a bunch stored in `afile`.

    The bunch at index 3 of the archive is written in Bmad format to a
    scratch file, which is passed to `evaluate_tao` as
    `beam_init:position_file`. The scratch directory is removed as soon as
    the evaluation finishes, whether it succeeds or raises.

    Parameters
    ----------
    afile : str
        Path of the HDF5 archive to read the starting bunch from.

    Returns
    -------
    res
        The object returned by `evaluate_tao`, with the additional key
        'original_archive' set to `afile`.
    """
    # Use the context manager so the scratch directory is cleaned up
    # deterministically instead of relying on the object's finalizer.
    with tempfile.TemporaryDirectory() as tdir:
        pfile = os.path.join(tdir, 'BC2BEG.beam0')

        P0 = get_bunch(afile, ix=3)  # presumably the BC2BEG bunch -- TODO confirm index
        P0.write_bmad(pfile, t_ref=P0['mean_t'], p0c=5e9)

        # Tao
        res = evaluate_tao(settings={
                        'bmad_com:csr_and_space_charge_on': True,
                        'csr_param:write_csr_wake':True,
                        'csr_param:ds_track_step': 0.001,
                        'csr_param:n_bin': 200,
                        'beam:beam_saved_at': 'BEG_BX24,BX24',
                        'beam_init:position_file': pfile},
                 run_commands=[
                     'set ele * space_charge_method = off',
                     'set ele BX24:DM23B CSR_METHOD  = 1_dim',
                     'set global track_type=beam'],
                 expressions=['beam::norm_emit.x[ENDBC2]', 'beam::norm_emit.y[ENDBC2]', 'beam::sigma.z[ENDBC2]'],
                 beam_archive_path='archive',
                 archive_csr_wake=True,
                 input_file=INIT, ploton=False)

    # Record which input produced this result so entries in the summary
    # can be traced back to their source file.
    res['original_archive'] = afile

    return res

In [35]:
# Input archive files; the parallel run maps `calc1` over this list.
AFILES = [
    'bmad_beam_e29b428eeafe83372a5dbf0f437a0de0.h5',
    'bmad_beam_3ef270fdbea59d7237bf619276db0bbe.h5',
]

In [36]:
# Debug: change the condition to True to process only the first input
# serially (no MPI pool) and write its result to OUTFILE.
if False:
    RES = calc1(AFILES[0])
    # Close the file explicitly so the JSON is flushed to disk.
    with open(OUTFILE, 'w') as f:
        json.dump(RES, f)

In [37]:
from mpi4py.futures import MPIPoolExecutor
from mpi4py import MPI
comm = MPI.COMM_WORLD
mpi_rank = comm.Get_rank()
mpi_size = comm.Get_size()

# Driver: map `calc1` over every input through an MPI pool, report progress,
# then write all results to OUTFILE as one JSON list.
if __name__ == "__main__":
    print('_________________________________')
    print('Parallel execution with', mpi_size, 'workers')

    with MPIPoolExecutor() as executor:
        inputs = AFILES

        NMAX = len(inputs)
        print(NMAX, 'inputs to process')

        results = []
        # Iterating the map yields results in input order as they become available.
        for ii, res in enumerate(executor.map(calc1, inputs), start=1):
            results.append(res)

            # Flush so progress is visible immediately when stdout is
            # redirected to a batch-job log.
            if ii % N_CHECKPOINT == 0:
                print(f'{ii} finished out of {NMAX}', flush=True)

        # Write output; the context manager closes the file so the JSON is
        # fully flushed to disk before completion is reported.
        with open(OUTFILE, 'w') as f:
            json.dump(results, f)
        print('Done. Results written to:', OUTFILE)

here


AttributeError: Can't get attribute 'calc1' on <module '__worker__'>

In [23]:
## Convert this notebook to a script with:
# jupyter nbconvert --to script run.ipynb

In [24]:
## run with:
# mpirun -n 2 python -m mpi4py.futures run.py