# Dask - Using Maxwell via dask_jobqueue 

### Utility function

In [1]:
def print_squeue(username='cpassow'):
    """Print how many SLURM jobs a user has in total, running and pending.

    Runs ``squeue`` once for ``username`` and prints three summary lines
    ('Total Jobs:', 'Running Jobs:', 'Pending Jobs:').

    Parameters
    ----------
    username : str
        SLURM user whose jobs are counted.

    Raises
    ------
    FileNotFoundError
        If the ``squeue`` executable cannot be found.
    """
    import subprocess

    # The arguments are passed as a list (no shell), so the username can never
    # be interpreted as shell syntax. --noheader drops the column titles and
    # --format=%t prints only the compact job state (R, PD, ...), one job per
    # line, so job names or partitions cannot distort the counts.
    result = subprocess.run(
        ["squeue", "--user", username, "--noheader", "--format=%t"],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    states = result.stdout.split()
    print('Total Jobs:', len(states))
    print('Running Jobs:', states.count('R'))
    print('Pending Jobs:', states.count('PD'))
    
# Snapshot of the queue before any workers have been requested.
print_squeue()

Total Jobs: 0
Running Jobs: 0
Pending Jobs: 0


## Cluster setup

In [2]:
from dask_jobqueue import SLURMCluster
from distributed import Client

In [3]:
# Resources requested for every SLURM job: one worker process with 30 cores
# and 200 GB of memory on the 'ps' queue, limited to 30 minutes of walltime.
cluster = SLURMCluster(
    queue='ps',
    cores=30,
    processes=1,
    memory='200GB',
    walltime='00:30:00',
    local_directory='./slurm_out',
    log_directory='./slurm_logs',
)

In [4]:
# Ask the cluster for 5 workers.
cluster.scale(5)

In [5]:
# Check that the requested worker jobs show up in the SLURM queue.
print_squeue()

Total Jobs: 5
Running Jobs: 5
Pending Jobs: 0


In [6]:
# Connect a client to the scheduler of the SLURM cluster.
client = Client(cluster)

In [7]:
# Show the client summary (scheduler address, dashboard link, workers, cores, memory).
client

0,1
Client  Scheduler: tcp://131.169.193.101:42698  Dashboard: http://131.169.193.101:8787/status,Cluster  Workers: 5  Cores: 150  Memory: 1000.00 GB


## Problem: timing jumps in FL1 pp-laser

During a recent beamtime we observed jumps in the timing of the pp-laser and decided to record this behavior with a FLASH GHz ADC.

![title](jddd_adc.png)

### Imports 

In [8]:
import numpy as np
import glob

from beamtimedaqaccess import BeamtimeDaqAccess 

### HDF files of one FEL block

In [9]:
# Directory with the raw online HDF files that are analysed below.
root_dir = '/asap3/flash/gpfs/bl1/2019/data/11006508/raw/hdf/online-1/'
# DAQ address of the recorded ADC trace; passed to daq.allValuesOfRun().
trace_addr = '/FL1/Experiment/BL1/ADQ412 GHz ADC/CH01/TD'

# DAQ reader created for `root_dir`; get_rising_edges() uses it as a global.
daq = BeamtimeDaqAccess.create(root_dir)

### Analysis function

In [10]:
def get_rising_edges(run_number, threshold=100):
    """Histogram the first threshold crossing of every ADC trace in a run.

    Reads the channel ``trace_addr`` for ``run_number`` through the global
    ``daq`` reader and treats the first element of the returned data as the
    collection of traces (presumably one trace per pulse -- TODO confirm).

    Parameters
    ----------
    run_number : int
        DAQ run to analyse.
    threshold : number, optional
        Level a sample has to exceed to count as the rising edge.
        Defaults to 100, the value that used to be hard-coded.

    Returns
    -------
    dict
        Maps the sample index of the first value above ``threshold`` to the
        number of traces that share this index.

    Notes
    -----
    Traces whose maximum sits at sample 0 are skipped. A trace that never
    exceeds ``threshold`` is still counted, under index 0, because
    ``np.argmax`` of an all-False mask returns 0.
    """
    traces = np.asarray(daq.allValuesOfRun(trace_addr, run_number))[0]
    rising_edges = [
        np.argmax(samples > threshold)
        for samples in traces
        if np.argmax(samples) > 0
    ]
    unique, counts = np.unique(rising_edges, return_counts=True)
    return dict(zip(unique, counts))

def counting(edges, split_index=2100, min_index=10):
    """Split an edge histogram into late and early counts and their ratio.

    Parameters
    ----------
    edges : dict
        Maps an edge position (sample index) to the number of traces with
        that position, as returned by ``get_rising_edges``.
    split_index : int, optional
        Positions ``>= split_index`` count as late. Defaults to 2100.
    min_index : int, optional
        Positions ``<= min_index`` are ignored; positions strictly between
        ``min_index`` and ``split_index`` count as early. Defaults to 10.

    Returns
    -------
    list
        ``[late, early, ratio]``. ``late`` and ``early`` are plain ints.
        ``ratio`` is ``early / (early + late)`` rounded to two decimals, or
        NaN unless both counts are positive.
    """
    # Built-in sum over ints keeps the counts integer even when nothing
    # matches; np.sum of an empty list would return the float 0.0.
    late = sum(int(n) for position, n in edges.items() if position >= split_index)
    early = sum(
        int(n) for position, n in edges.items() if min_index < position < split_index
    )
    if early > 0 and late > 0:
        ratio = float("{0:.2f}".format(early / (early + late)))
    else:
        ratio = np.nan
    return [late, early, ratio]

### Check the number of DAQ runs in the FEL block

In [20]:
# The run number occupies characters 25-29 of each file name. Slicing the
# file name instead of the full path keeps this working when `root_dir`
# changes length.
RUN_NUMBER_SLICE = slice(25, 30)

runs = sorted(
    int(path.rsplit('/', 1)[-1][RUN_NUMBER_SLICE])
    for path in glob.glob(root_dir + '/fl1user1/*_file1_*')
)
print('total # of DAQ runs:',len(runs))

total # of DAQ runs: 85


In [21]:
# Limit the analysis to the first 10 runs.
runs = runs[0:10]

In [22]:
import dask

# One lazy task per run; nothing is read or computed until dask.compute()
# is called on these objects.
number_of_edges_per_run = [dask.delayed(get_rising_edges)(run) for run in runs]

In [23]:
%%time

# Despite its name this is a tuple: one edge-histogram dict per entry of
# `number_of_edges_per_run`, i.e. one per run.
dict_early_late = dask.compute(*number_of_edges_per_run)

CPU times: user 664 ms, sys: 39.5 ms, total: 703 ms
Wall time: 12 s


## Statistics for the eLog

In [24]:
# [late, early, ratio] for every run, in the same order as `runs`.
results = [counting(edges) for edges in dict_early_late]

for run, edges, (late, early, _ratio) in zip(runs, dict_early_late, results):
    # A run has laser signal when at least one edge was counted in the early
    # or late window. Deciding on the number of distinct bins would label a
    # run whose edges all fall into one bin as 'no Laser'.
    if late + early > 0:
        print('Run:',run,'Time Bins',edges)
    else:
        print('Run:',run,'no Laser')

print('\nRUN | late | early | ratio')
for run, result in zip(runs, results):
    print(run, result)

Run: 30067 Time Bins {2093: 119, 2094: 3766, 2095: 157, 2112: 192, 2113: 162}
Run: 30068 Time Bins {2093: 169, 2094: 4651, 2095: 207, 2112: 215, 2113: 214}
Run: 30069 Time Bins {2093: 86, 2094: 2840, 2095: 154, 2112: 155, 2113: 103}
Run: 30070 Time Bins {2093: 55, 2094: 2716, 2095: 177, 2112: 115, 2113: 123}
Run: 30071 Time Bins {2093: 65, 2094: 3665, 2095: 267}
Run: 30073 Time Bins {2093: 40, 2094: 1653, 2095: 135}
Run: 30074 no Laser
Run: 30075 Time Bins {2093: 48, 2094: 2935, 2095: 243}
Run: 30076 Time Bins {2093: 41, 2094: 2315, 2095: 170}
Run: 30077 Time Bins {2093: 18, 2094: 1037, 2095: 71}

RUN | late | early | ratio
30067 [354, 4042, 0.92]
30068 [429, 5027, 0.92]
30069 [258, 3080, 0.92]
30070 [238, 2948, 0.93]
30071 [0.0, 3997, nan]
30073 [0.0, 1828, nan]
30074 [0.0, 0.0, nan]
30075 [0.0, 3226, nan]
30076 [0.0, 2526, nan]
30077 [0.0, 1126, nan]


## Cluster termination

In [25]:
# Disconnect the client first: closing the cluster also stops the scheduler,
# and the client should not still be attached to it at that point.
client.close()
cluster.close()

In [26]:
# Confirm that no jobs are left in the SLURM queue after the shutdown.
print_squeue()

Total Jobs: 0
Running Jobs: 0
Pending Jobs: 0
