In [1]:
# Shared setup notebook; expected to define the dask `client` used below (TODO confirm).
%run init.ipynb

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43593 instead
  f"Port {expected} is already in use.\n"


In [2]:
import json
import numpy as np
from json import JSONEncoder

class NpEncoder(JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to built-in Python types.

    The standard ``json`` module cannot serialize NumPy integer, floating,
    boolean or ndarray values. This encoder maps them to ``int``, ``float``,
    ``bool`` and nested lists respectively. Any other object is delegated to
    ``JSONEncoder.default``, which raises ``TypeError``.
    """

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        # np.bool_ is not a subclass of Python bool, so json rejects it as well.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)

In [3]:
# Show the Client summary (scheduler address, dashboard link, worker/thread/memory totals).
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.LSFCluster
Dashboard: http://192.168.66.200:43593/status,

0,1
Dashboard: http://192.168.66.200:43593/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://192.168.66.200:44799,Workers: 0
Dashboard: http://192.168.66.200:43593/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [4]:
# Root directory of the parquet logs (genome_pegasus run, nodes-32 per the path).
# NOTE(review): absolute cluster path; consider making it configurable.
log_dir = "/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet"

In [5]:
import dask
import dask.dataframe as dd

# Parquet dataset re-indexed by file_id.
indexed_dir = f"{log_dir}/indexed/file_id"
# calculate_divisions=True derives the index divisions from parquet min/max
# statistics, so .loc lookups on file_id can target only the relevant partitions.
indexed_ddf = dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id'])
indexed_ddf

Unnamed: 0_level_0,index,proc,rank,thread_id,cat,io_cat,tstart,tend,func_id,level,hostname,app,filename,size,bandwidth,duration,tmid,proc_id
npartitions=2286,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
-9223371986035283781,int64,int64,int32,int32,int32,int32,float32,float32,object,int32,object,object,object,int64,float32,float32,int64,int64
-8872388567018363149,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8752386220556340459,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9223369538921024184,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [6]:
%%time

# Count distinct file ids; later cells hard-code the result (21260259) as n_el.
indexed_ddf.index.nunique().compute()

CPU times: user 5.6 s, sys: 223 ms, total: 5.83 s
Wall time: 26.4 s


21260259

In [7]:
%%time

# Despite the name, these are the integer file ids (the index of indexed_ddf).
# sorted() materialises the distinct ids as an ascending list in one step.
filenames = sorted(indexed_ddf.index.unique().compute())
with open(f"{log_dir}/filenames.json", "w") as file:
    # NpEncoder turns the NumPy scalar elements into plain JSON numbers.
    json.dump(filenames, file, cls=NpEncoder)

CPU times: user 42.1 s, sys: 1.18 s, total: 43.3 s
Wall time: 1min 4s


In [6]:
import dask.array as da
import dask.bag as db
import dask.dataframe as dd
import math
import numpy as np
import time
from dask.distributed import as_completed
from dask.diagnostics import ProgressBar
from time import sleep

def clear_logs(dask_scheduler): # As suggested in #3898 
    """Empty the scheduler's ``log``, ``transition_log`` and ``events`` containers.

    Meant to be run via ``client.run_on_scheduler(clear_logs)``, which passes
    the Scheduler object as ``dask_scheduler``. The containers are cleared in
    that order; presumably this keeps them from growing across repeated runs.
    """
    for attr_name in ("log", "transition_log", "events"):
        getattr(dask_scheduler, attr_name).clear()
    dask_scheduler.events.clear()

def read_global_json(log_dir):
    """Load the JSON list stored in ``{log_dir}/filenames.json`` as a NumPy array.

    The array dtype is whatever ``np.array`` infers from the decoded list.
    """
    json_path = f"{log_dir}/filenames.json"
    with open(json_path) as handle:
        loaded = json.load(handle)
    return np.array(loaded)

def proc_metrics(index, filenames, ddf):
    """Aggregate metrics for one bucket of file ids and write them to parquet.

    Parameters
    ----------
    index : int
        Bucket number; only used to name the output file.
    filenames : sequence
        Labels selected from ``ddf`` with ``.loc`` (file ids in this notebook).
    ddf : dask DataFrame
        Presumably the file_id-indexed dataset; it must have ``index``,
        ``io_cat``, ``duration``, ``size`` and ``filename`` columns.

    Returns
    -------
    str
        Path of the parquet file written,
        ``{log_dir}/metrics/file_id/{index}.parquet``. ``log_dir`` is read from
        the notebook's global scope. Columns are flattened ``<column>_<agg>``
        names: index_, io_cat_, duration_sum, size_sum, index_count, filename_min.

    Must run on a dask worker: it logs events via ``get_worker()``.
    """
    import os

    worker = dask.distributed.get_worker()
    worker.log_event("filenames", len(filenames))
    worker.log_event("filenames_shape", np.array(filenames).shape)

    out_path = f"{log_dir}/metrics/file_id/{index}.parquet"
    # pandas.to_parquet refuses to write into a missing directory.
    os.makedirs(os.path.dirname(out_path), exist_ok=True)

    target_df = ddf.loc[filenames].reset_index().compute()
    # List-valued specs make pandas return MultiIndex columns such as
    # ('duration', 'sum'). With scalar specs the columns would be flat strings.
    # reset_index() would then clash with the aggregated 'index' column
    # (ValueError), and '_'.join would split every name into characters.
    aggregate = target_df.groupby(['index', 'io_cat']).agg({
        'duration': ['sum'],
        'size': ['sum'],
        'index': ['count'],
        'filename': ['min'],
    })
    # The group keys become ('index', '') and ('io_cat', '') -> "index_", "io_cat_".
    aggregate = aggregate.reset_index()
    aggregate.columns = ['_'.join(col) for col in aggregate.columns.values]
    aggregate.to_parquet(out_path)

    return out_path

In [9]:
%%time

# Time the fan-out alone: split the file-id list into ~sqrt(n_el) contiguous
# slices and only take len() of each slice (no dataframe work).
# n_el is the distinct file-id count from the nunique() cell; it is hard-coded
# and must be updated if the dataset changes.
n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))  # used as both slice length and slice count

# NOTE(review): cm1_indexed_d / cm1_persisted_d are built but not used by the tasks below.
cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
# File ids from filenames.json, wrapped as a single-chunk 1-D dask array.
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)

# One task per slice. The final slice is shorter because n_el is not a
# perfect square (slicing past the end is clipped).
delayed_result = [
    dask.delayed(lambda i, filenames: len(filenames))(i, cm1_files_a[i*n_buckets:(i+1)*n_buckets])
    for i in range(0, n_buckets)
]

result = dask.delayed(list)(delayed_result)
# result.visualize("mapped_direct.png")

CPU times: user 3.24 s, sys: 182 ms, total: 3.42 s
Wall time: 3.24 s


In [10]:
%%time

client.run_on_scheduler(clear_logs)
metrics = result.compute()

print(len(metrics))
print(metrics[0])
print(metrics[-1])

4611
4611
3549
CPU times: user 26.8 s, sys: 724 ms, total: 27.6 s
Wall time: 1min 8s


In [11]:
%%time

# Same len()-per-chunk timing as the slicing cell, but the chunks come from
# rechunk(n_buckets) and are iterated via .blocks instead of manual slices.
n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))

# NOTE(review): cm1_indexed_d / cm1_persisted_d are built but not used by the tasks below.
cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)
# Split the single chunk into chunks of n_buckets ids each.
cm1_files_a = cm1_files_a.rechunk(n_buckets)

# delayed_result = [
#     dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, filenames, cm1_persisted_d)
#     for i, filenames in enumerate(cm1_files_a.blocks.ravel()[:10])
# ]
delayed_result = [
    dask.delayed(lambda i, filenames: len(filenames))(i, filenames)
    for i, filenames in enumerate(cm1_files_a.blocks.ravel())
]

result2 = dask.delayed(list)(delayed_result)
# result2.visualize("mapped_rechunk.png")

CPU times: user 4.87 s, sys: 69 ms, total: 4.94 s
Wall time: 4.9 s


In [12]:
%%time

client.run_on_scheduler(clear_logs)  # reset scheduler logs/events before timing
metrics2 = result2.compute()

# One length per rechunked block.
print(len(metrics2))
print(metrics2[0])
print(metrics2[-1])

4611
4611
3549
CPU times: user 34 s, sys: 663 ms, total: 34.7 s
Wall time: 1min 15s


In [13]:
%%time

# Per slice: select the matching rows of the persisted dataframe with .loc
# and count them. The inner .compute() is issued from inside a dask task.
n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))

cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
# Presumably yields a persisted dask DataFrame when the task runs — verify.
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)

# index.count() counts selected rows, not distinct ids, so a slice can report
# more than n_buckets (the first slice gave 4612).
delayed_result = [
    dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, cm1_files_a[i*n_buckets:(i+1)*n_buckets], cm1_persisted_d)
    for i in range(0, n_buckets)
]

result3 = dask.delayed(list)(delayed_result)

CPU times: user 3.75 s, sys: 223 ms, total: 3.98 s
Wall time: 3.8 s


In [14]:
%%time

client.run_on_scheduler(clear_logs)  # reset scheduler logs/events before timing
metrics3 = result3.compute()

# Row counts per slice; these can exceed the slice length.
print(len(metrics3))
print(metrics3[0])
print(metrics3[-1])

4611
4612
3549
CPU times: user 2min 2s, sys: 2.73 s, total: 2min 5s
Wall time: 2min 59s


In [15]:
%%time

# Same .loc row count per chunk as the slicing variant, with chunks from
# rechunk(n_buckets) iterated via .blocks.
n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))

cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)
cm1_files_a = cm1_files_a.rechunk(n_buckets)

# delayed_result = [
#     dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, filenames, cm1_persisted_d)
#     for i, filenames in enumerate(cm1_files_a.blocks.ravel()[:10])
# ]
delayed_result = [
    dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, filenames, cm1_persisted_d)
    for i, filenames in enumerate(cm1_files_a.blocks.ravel())
]

result4 = dask.delayed(list)(delayed_result)

CPU times: user 5.35 s, sys: 79.2 ms, total: 5.43 s
Wall time: 5.38 s


In [16]:
%%time

client.run_on_scheduler(clear_logs)  # reset scheduler logs/events before timing
metrics4 = result4.compute()

# Row counts per rechunked block.
print(len(metrics4))
print(metrics4[0])
print(metrics4[-1])

4611
4612
3549
CPU times: user 2min 40s, sys: 3.5 s, total: 2min 44s
Wall time: 3min 28s


In [17]:
%%time

# Attempt to express the per-chunk .loc row count with Array.map_blocks
# instead of a list of delayed calls.
n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))

cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)
cm1_files_a = cm1_files_a.rechunk(n_buckets)

# delayed_result = [
#     dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, filenames, cm1_persisted_d)
#     for i, filenames in enumerate(cm1_files_a.blocks.ravel()[:10])
# ]
# delayed_result = [
#     dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, filenames, cm1_persisted_d)
#     for i, filenames in enumerate(cm1_files_a.blocks.ravel())
# ]

# NOTE(review): the mapped function returns a scalar count rather than an
# array block, and the persisted dataframe is passed as a delayed extra
# argument. Confirm map_blocks supports both before computing result5.
result5 = cm1_files_a.map_blocks(lambda filenames, ddf: ddf.loc[filenames].index.count().compute(), cm1_persisted_d)

CPU times: user 3.25 s, sys: 186 ms, total: 3.44 s
Wall time: 7.61 s


In [18]:
%%time

client.run_on_scheduler(clear_logs)
# Computing result5 is disabled; this cell currently only clears the scheduler logs.
# metrics5 = result5.compute()

# print(len(metrics5))
# print(metrics5[0])
# print(metrics5[-1])

CPU times: user 63 ms, sys: 48 µs, total: 63.1 ms
Wall time: 61.7 ms


In [8]:
%%time

def proc_metrics(ddf, filenames):
    """Compute per-(index, io_cat) aggregates for the rows of ``ddf`` at ``filenames``.

    NOTE: this redefines the earlier ``proc_metrics(index, filenames, ddf)``
    with a different signature and no parquet output. Cells run after this
    one see this version until it is redefined again.

    ``ddf`` is expected to be a dask DataFrame (the result is ``.compute()``-d
    here). Returns a pandas DataFrame indexed by (index, io_cat) with columns
    duration (sum), size (sum), index (row count) and filename (min).
    """
    selected = ddf.loc[filenames].reset_index()
    # String aggregation names give the same result as the builtins sum/min,
    # without pandas>=2.1's FutureWarning about builtin callables.
    return selected.groupby(['index', 'io_cat']).agg({
        'duration': 'sum',
        'size': 'sum',
        'index': 'count',
        'filename': 'min',
    }).compute()

# One proc_metrics task per rechunked block of file ids. Each task selects from
# the persisted dataframe and computes its aggregate inside the task.
n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))

cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)
cm1_files_a = cm1_files_a.rechunk(n_buckets)

# delayed_result = [
#     dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, filenames, cm1_persisted_d)
#     for i, filenames in enumerate(cm1_files_a.blocks.ravel()[:10])
# ]
# Uses the two-argument proc_metrics(ddf, filenames) defined at the top of
# this cell; the enumerate index `i` is unused.
delayed_result = [
    dask.delayed(proc_metrics)(cm1_persisted_d, filenames)
    for i, filenames in enumerate(cm1_files_a.blocks.ravel())
]

result6 = dask.delayed(list)(delayed_result)

CPU times: user 3.97 s, sys: 155 ms, total: 4.12 s
Wall time: 4.09 s


In [None]:
%%time

client.run_on_scheduler(clear_logs)  # reset scheduler logs/events before timing
metrics6 = result6.compute()

# One aggregated frame per block.
print(len(metrics6))
print(metrics6[0])
print(metrics6[-1])

In [None]:
%%time

def proc_metrics(ddf):
    """Aggregate an already-selected frame by (index, io_cat).

    NOTE: this redefines ``proc_metrics`` with a one-argument signature.
    Cells run after this one see this version until it is redefined again.

    ``reset_index()`` turns the frame's index (file_id in this notebook) into a
    column; the frame's own ``index`` column is the first grouping key.
    Returns a frame indexed by (index, io_cat) with columns duration (sum),
    size (sum), index (row count) and filename (min). Nothing is computed
    here, so the result stays lazy if ``ddf`` is a dask DataFrame.
    """
    # String aggregation names give the same result as the builtins sum/min,
    # without pandas>=2.1's FutureWarning about builtin callables.
    return ddf.reset_index().groupby(['index', 'io_cat']).agg({
        'duration': 'sum',
        'size': 'sum',
        'index': 'count',
        'filename': 'min',
    })

n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))
# n_buckets = 32

cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)
cm1_files_a = cm1_files_a.rechunk(n_buckets)

# delayed_result = [
#     dask.delayed(lambda i, filenames, ddf: ddf.loc[filenames].index.count().compute())(i, filenames, cm1_persisted_d)
#     for i, filenames in enumerate(cm1_files_a.blocks.ravel()[:10])
# ]

# Phase 1 (timed): run every .loc selection now. sync=True blocks until all
# selections finish and returns their values instead of futures.
# NOTE(review): if cm1_persisted_d yields a dask DataFrame, each returned
# value is presumably still a lazy dask DataFrame, not pandas — verify.
t0 = time.time()
task_ddfs_d = [
    dask.delayed(lambda ddf, filenames: ddf.loc[filenames])(cm1_persisted_d, filenames)
    for i, filenames in enumerate(cm1_files_a.blocks.ravel())
]
task_ddfs = client.compute(task_ddfs_d, sync=True)
print("Task ddf", time.time()-t0)

# Phase 2 (lazy): one single-argument proc_metrics task per selection.
delayed_result = [
    dask.delayed(proc_metrics)(task_ddfs[i])
    for i, filenames in enumerate(cm1_files_a.blocks.ravel())
]

result7 = dask.delayed(list)(delayed_result)

In [None]:
%%time

client.run_on_scheduler(clear_logs)  # reset scheduler logs/events before timing
metrics7 = result7.compute()

# One aggregate per selection.
print(len(metrics7))
print(metrics7[0])
print(metrics7[-1])

In [None]:
%%time

def proc_metrics(ddf):
    """Aggregate an already-selected frame by (index, io_cat).

    NOTE: this is a verbatim redefinition of the one-argument ``proc_metrics``
    from an earlier cell. Consider defining it once.

    ``reset_index()`` turns the frame's index (file_id in this notebook) into a
    column; the frame's own ``index`` column is the first grouping key.
    Returns a frame indexed by (index, io_cat) with columns duration (sum),
    size (sum), index (row count) and filename (min). Nothing is computed
    here, so the result stays lazy if ``ddf`` is a dask DataFrame.
    """
    # String aggregation names give the same result as the builtins sum/min,
    # without pandas>=2.1's FutureWarning about builtin callables.
    return ddf.reset_index().groupby(['index', 'io_cat']).agg({
        'duration': 'sum',
        'size': 'sum',
        'index': 'count',
        'filename': 'min',
    })

# One task per file id (instead of per slice), submitted bucket by bucket so
# only one bucket of selections is in flight at a time. Results are stored in
# `tasks`, keyed by the dask key "metric-<position>".
n_el = 21260259
n_buckets = math.ceil(math.sqrt(n_el))

cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)

tasks = {}
for bucket in range(0, n_buckets):
    start = bucket*n_buckets
    # The last bucket is shorter. n_el is not a multiple of n_buckets, and
    # indexing cm1_files_a at a position >= n_el raises IndexError.
    stop = min((bucket+1)*n_buckets, n_el)
    positions = range(start, stop)

    # pos is the global position of the file id in cm1_files_a; it keeps the
    # dask key names unique across buckets.
    target_ddfs = [
        dask.delayed(lambda ddf, filename: ddf.loc[[filename]])(cm1_persisted_d, cm1_files_a[pos], dask_key_name=f"target-ddf-{pos}")
        for pos in positions
    ]
    print('target_ddfs', len(target_ddfs))
    # target_ddfs is indexed 0..len-1, so pair it with the global positions via
    # zip. Indexing it with the global position overflowed from bucket 1 on.
    delayed_result = [
        dask.delayed(proc_metrics)(target_ddf, dask_key_name=f"metric-{pos}")
        for pos, target_ddf in zip(positions, target_ddfs)
    ]
    print('delayed_result', len(delayed_result))
    t0 = time.time()
    bucket_futures = client.compute(delayed_result, sync=False)
    print('bucket_futures', len(bucket_futures))
    for future in as_completed(bucket_futures):
        tasks[future.key] = future.result()
        now = time.time()
        # end='\r' rewrites the progress line instead of printing one line per file.
        print('Completed', len(tasks), now-t0, end='\r')


In [None]:
%%time

client.run_on_scheduler(clear_logs)
# `result8` was never defined (NameError). The per-file cell above already
# collected its outputs in `tasks`, keyed "metric-<position>"; order them by
# that position so first/last below refer to the first/last file id.
metrics8 = [tasks[key] for key in sorted(tasks, key=lambda k: int(k.rsplit("-", 1)[1]))]

print(len(metrics8))
print(metrics8[0])
print(metrics8[-1])

In [7]:
%%time

n_el = 21260259
# NOTE(review): this evaluates left to right as (n_el / 64) * 4 == n_el / 16.
# That gives buckets of ~1.33M ids, 16 in total (matches "Target ddf 16").
# If n_el / (64 * 4) was intended, add parentheses. Here n_buckets is the bucket
# *size* (the step of file_range below), not the number of buckets.
n_buckets = math.ceil(n_el*1.0/ 64*4) 
# n_buckets = math.ceil(math.sqrt(n_el))

cm1_indexed_d = dask.delayed(lambda indexed_dir: dd.read_parquet(f"{indexed_dir}/*.parquet", calculate_divisions=True, index=['file_id']))(indexed_dir)
cm1_persisted_d = dask.delayed(lambda ddf: ddf.persist())(cm1_indexed_d)
cm1_files_d = dask.delayed(lambda log_dir: read_global_json(log_dir))(log_dir)
cm1_files_a = da.from_delayed(cm1_files_d, shape=(n_el,), dtype=int)

def cal_metrics_file(ddf, index, file_lists, log_dir):
    """Aggregate per-(index, io_cat) I/O metrics for one bucket and write them to parquet.

    Parameters
    ----------
    ddf : pandas.DataFrame
        Rows for one bucket. It must have columns ``index``, ``io_cat``,
        ``duration``, ``size``, ``bandwidth``, ``proc_id`` and ``filename``.
        Its index is turned into a column by ``reset_index()`` and not aggregated.
    index : int
        Bucket number; only used to name the output file.
    file_lists : object
        Unused; kept so existing call sites keep working.
    log_dir : str
        Root directory. Output goes to ``{log_dir}/metrics/file_id/{index}.parquet``.

    Returns
    -------
    str
        Path of the parquet file written. Its columns are the flattened
        ``<column>_<agg>`` names: index_, io_cat_, duration_sum, size_sum,
        bandwidth_sum, index_count, proc_id_min, proc_id_max, filename_min.
    """
    import os

    filename = f"{log_dir}/metrics/file_id/{index}.parquet"
    # pandas.to_parquet refuses to write into a missing directory; create it so
    # the cell also works against a fresh log_dir.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # String aggregation names give the same column labels as the builtins
    # sum/min/max, without pandas>=2.1's FutureWarning about builtin callables.
    # The list for proc_id makes every result column a (column, agg) tuple.
    aggregate = ddf.reset_index().groupby(['index', 'io_cat']).agg({
        'duration': 'sum',
        'size': 'sum',
        'bandwidth': 'sum',
        'index': 'count',
        'proc_id': ['min', 'max'],
        'filename': 'min',
    })
    # The group keys become ('index', '') and ('io_cat', '') -> "index_", "io_cat_".
    aggregate = aggregate.reset_index()
    aggregate.columns = ['_'.join(col) for col in aggregate.columns.values]
    aggregate.to_parquet(filename)
    return filename

filter_group_index = 'file_id'  # suffix for the dask key names below

futures = []
target_ddfs = []
# Start offset of each bucket of n_buckets file ids.
file_range = range(0, n_el, n_buckets)

# Build one lazy .loc selection per bucket. The lambda ignores its third
# argument (cm1_persisted_d); passing it only makes the persist task a
# dependency. The selection itself reads from cm1_indexed_d.
for index, file_index in enumerate(file_range):
    selected_files = cm1_files_a[file_index:file_index+n_buckets]
    target_ddf = dask.delayed(lambda ddf, files, _: ddf.loc[files])(cm1_indexed_d, selected_files, cm1_persisted_d, dask_key_name=f"target_ddf_{index}_{filter_group_index}")
    target_ddfs.append(target_ddf)

# Run all selections and wait for them (sync=True), timing the step.
t0 = time.time()
target_ddfs_ = client.compute(target_ddfs, sync=True)
print("Target ddf", len(target_ddfs_), time.time()-t0)

# Submit one cal_metrics_file task per bucket without waiting (sync=False).
# The futures are gathered in a later cell.
# NOTE(review): the "consider scattering large objects" warning printed by this
# cell presumably comes from embedding target_ddfs_[index] in each task graph — verify.
for index, file_index in enumerate(file_range):
    print(f"processing {index} of {len(file_range)}", end='\r')
    selected_files = [] # unused by cal_metrics_file; previously cm1_files_a[file_index:file_index+n_buckets]
    # Scattering selected_files with client.scatter did not help: the list of futures is larger than the list of file ids.
    #print(len(seleceted_files), index)
#     target_ddf = dask.delayed(lambda ddf, files, _: ddf.loc[files].compute().reset_index())(cm1_indexed_d, selected_files, cm1_persisted_d, dask_key_name=f"target_ddf_{index}_{filter_group_index}")
    cal_metrics = dask.delayed(cal_metrics_file)(target_ddfs_[index], index, selected_files, log_dir, dask_key_name=f"cal_metrics_{index}_{filter_group_index}")
    futures.append(client.compute(cal_metrics, sync=False))
    #cal_metrics.append(delayed_func(cal_metrics,  , []))
    #cal_metrics.append(delayed_func(cal_metrics,  , []))

Target ddf 16 42.540334701538086
processing 0 of 16

  ("('read-parquet-7fc378b9b14dcc5f1c4fa663b16a2bc8' ... 6802749], None)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


CPU times: user 1min 32s, sys: 1.58 s, total: 1min 34s
Wall time: 1min 54s


In [None]:
# Inspect the submitted cal_metrics_file futures (one per bucket).
futures

In [8]:
%%time
# Redundant re-imports (both names were already imported in an earlier cell); harmless.
import time
from dask.distributed import as_completed
# Collect results in completion order. The progress line shows elapsed time in
# minutes and is rewritten in place (end='\r').
start_time = time.time()
metrics = []
for future in as_completed(futures):
    end_time = time.time() - start_time
    #filename = future.result()
    metrics.append(future.result())  # each result is the parquet path returned by cal_metrics_file
    print(f"Completed {len(metrics)} of {len(futures)} in {end_time/60}", end='\r')

Completed 7 of 16 in 3.8645090897878014

KeyboardInterrupt: 

In [None]:
# Cancel every submitted future, e.g. after interrupting the gather loop.
for pending_future in futures:
    pending_future.cancel()

In [None]:
# Re-inspect the futures (e.g. their status after cancelling).
futures

In [16]:
# Number of futures whose status is 'finished'.
finished = sum(1 for fut in futures if fut.status == 'finished')
finished

4609

In [11]:
# Parquet paths gathered so far, in completion order.
metrics

['/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/0.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/1.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/7.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/3.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/15.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/4.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/17.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/10.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/13.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegasus/nodes-32/_parquet/metrics/file_id/5.parquet',
 '/p/gpfs1/iopp/recorder_app_logs/genome_pegas

In [12]:
# Spot-check the metrics written for bucket 0. The path is derived from log_dir
# instead of repeating the absolute cluster path.
file_ddf_0 = dd.read_parquet(f"{log_dir}/metrics/file_id/0.parquet")

file_ddf_0

Unnamed: 0_level_0,index_,io_cat_,duration_sum,size_sum,bandwidth_sum,index_count,proc_id_min,proc_id_max,filename_min
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,int64,int64,float32,int64,float32,int64,int64,int64,object
,...,...,...,...,...,...,...,...,...


In [13]:
# First rows of bucket 0's aggregated metrics.
file_ddf_0.head()

Unnamed: 0,index_,io_cat_,duration_sum,size_sum,bandwidth_sum,index_count,proc_id_min,proc_id_max,filename_min
0,12707,3,9.4e-05,0,0.0,1,3612142438820350027,3612142438820350027,tmpz6yqykx9/chr5.HG00123
1,12818,3,9.9e-05,0,0.0,1,3612142438820350027,3612142438820350027,tmpz6yqykx9/chr5.HG00142
2,13160,3,9.1e-05,0,0.0,1,3612142438820350027,3612142438820350027,tmpz6yqykx9/chr5.HG00253
3,13377,3,0.000119,0,0.0,1,3612142438820350027,3612142438820350027,tmpz6yqykx9/chr5.HG00290
4,13712,3,9.3e-05,0,0.0,1,3612142438820350027,3612142438820350027,tmpz6yqykx9/chr5.HG00366
