In [3]:
import json
import os
import random
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor

import boto3
# ---- run-wide shared state ---------------------------------------------------
# Flag read by the sweep loop to decide whether to keep launching tasks.
RUNNING = True

# Caps in-flight tasks at 6: the sweep loop acquires a slot before starting a
# thread and run_and_save_results releases it when the task is done.
threadLimiter = threading.BoundedSemaphore(6)
# Serialises the read-modify-write of the shared results JSON in save_result.
resultLock = threading.Lock()

# Shared boto3 S3 resource used by the bucket helpers.
s3 = boto3.resource('s3')

# RAM settings to sweep: 128, 192, ..., 3008 (the stop value 3072 is exclusive).
# NOTE(review): presumably megabytes, forwarded as FUNCTION_TYPE — confirm.
RAM_LIST = [size for size in range(128, 3072, 64)]

In [4]:
def get_task_type_list(workflow_file_path):
    """Map every process (task type) name to the names of its input signals.

    Reads the workflow JSON at ``workflow_file_path``. For each entry of
    ``data['processes']`` it looks up every index in the process's ``ins``
    list in ``data['signals']`` and collects that signal's ``name``. Names are
    de-duplicated per task type, keeping first-seen order. Task types also
    appear in first-seen order.

    Returns:
        dict: task type name -> list of unique input signal names.
    """
    with open(workflow_file_path) as workflow_fh:
        workflow = json.load(workflow_fh)

    inputs_by_type = {}
    for proc in workflow['processes']:
        known_inputs = inputs_by_type.setdefault(proc['name'], [])
        for signal_idx in proc['ins']:
            signal_name = workflow['signals'][signal_idx]['name']
            if signal_name in known_inputs:
                continue
            known_inputs.append(signal_name)

    return inputs_by_type

In [5]:
def copy_file_from_to_bucket(copy_source, copy_destination, bucket_name='cegielskir'):
    """Copy an S3 object into ``bucket_name`` under the key ``copy_destination``.

    Args:
        copy_source: boto3 CopySource dict, e.g. ``{'Bucket': ..., 'Key': ...}``.
            ``'Key'`` must be the object's key string.
        copy_destination: full destination key inside ``bucket_name``. S3 keys
            are flat, so a key ending in ``'/'`` is not a folder. The object is
            stored under exactly that key.
        bucket_name: destination bucket. Defaults to the bucket used throughout
            this notebook.
    """
    s3.Bucket(bucket_name).copy(copy_source, copy_destination)

In [6]:
def add_execution_result_to_json(task_id, result_dict, executionResultLock,
                                 path='./execution_data.json',
                                 workflow_key='montage_0.35'):
    """Append one execution record to the list stored under ``workflow_key`` in a JSON file.

    The record is ``{'task_id': task_id}`` updated with ``result_dict``. If
    ``result_dict`` also contains ``'task_id'``, its value wins, as before.
    The whole read-modify-write runs while holding ``executionResultLock``, so
    callers that share the lock do not overwrite each other's records.

    Args:
        task_id: identifier stored in the record.
        result_dict: extra fields copied into the record.
        executionResultLock: lock guarding ``path``. It is always released,
            even when reading, parsing or writing fails.
        path: JSON file to update. A missing file is treated as an empty object.
        workflow_key: top-level key whose list receives the record.

    Raises:
        ValueError (json.JSONDecodeError): if ``path`` exists but is not valid JSON.
        OSError: if the file cannot be read or written.
    """
    # `with` releases the lock on every exit path. The old acquire()/release()
    # pair left it held after any exception, deadlocking all later callers.
    with executionResultLock:
        try:
            with open(path) as result_json:
                data = json.load(result_json)
        except FileNotFoundError:
            data = {}

        record = {'task_id': task_id}
        record.update(result_dict)
        data.setdefault(workflow_key, []).append(record)

        with open(path, 'w') as result_json:
            json.dump(data, result_json, indent=4)

In [7]:
def execute_workflow_task(config, ins, outs, ram, s3_path):
    """Run one workflow task via ``node awsLambdaCommand.js`` and capture its output.

    Args:
        config: task config. JSON-encoded and passed as the 3rd script argument.
        ins: input signals. JSON-encoded and passed as the 1st script argument.
        outs: output signals. JSON-encoded and passed as the 2nd script argument.
        ram: exported to the child process only, as ``FUNCTION_TYPE=str(ram)``.
        s3_path: key prefix inside bucket ``cegielskir``. It goes into the
            options object passed as the 4th script argument.

    Returns:
        str: the child's stdout and stderr (merged, as the previous IPython
        ``!`` capture did), with lines joined by single spaces.
    """
    options = {
        "bucket": "cegielskir",
        "prefix": s3_path
    }
    # Give FUNCTION_TYPE to this child only. Writing os.environ (as before) is
    # process-wide, so with several worker threads a task could start with
    # another thread's RAM value.
    child_env = dict(os.environ, FUNCTION_TYPE=str(ram))
    # An argv list (no shell) hands each JSON document to node verbatim. The old
    # shell command wrapped them in '...' and broke on any value containing '.
    argv = [
        'node', 'awsLambdaCommand.js',
        json.dumps(ins),
        json.dumps(outs),
        json.dumps(config),
        json.dumps(options),
    ]
    completed = subprocess.run(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=child_env,
        universal_newlines=True,
    )
    return ' '.join(completed.stdout.splitlines())

In [8]:
def run_single_task(filename, foldername, ram=None):
    """Run one workflow-part file once at a single RAM setting, in the calling thread.

    Args:
        filename: single-process workflow JSON inside ``foldername``. The text
            before the first ``'_'`` is used as the task core.
        foldername: directory containing the workflow parts.
        ram: RAM setting for this run. If omitted, falls back to a global
            ``ram`` (legacy: the original silently read that global, e.g. the
            one left over from the sweep loop).

    Raises:
        ValueError: if no ``ram`` is given and no global ``ram`` exists.
    """
    if ram is None:
        ram = globals().get('ram')
        if ram is None:
            raise ValueError("run_single_task needs a `ram` value; pass ram=...")

    with open(os.path.join(foldername, filename)) as single_workflow:
        data = json.load(single_workflow)
    process = data['processes'][0]
    config = process['config']
    ins = [data['signals'][i] for i in process['ins']]
    outs = [data['signals'][i] for i in process['outs']]

    # run_and_save_results releases one threadLimiter slot when it finishes.
    # Take that slot here first. Otherwise the BoundedSemaphore raises
    # ValueError ("released too many times") when no other task holds a slot.
    threadLimiter.acquire()
    run_and_save_results(config, ins, outs, ram, filename.split('_')[0], filename)


In [9]:
def run_and_save_results(config, ins, outs, ram, task_core, task_name,
                         s3_prefix='data-collection/montage-0_25/',
                         results_file='./montage-0.25-raw-results.json'):
    """Execute one task and append its raw output to ``results_file``.

    Used as a worker-thread target. The caller acquires one ``threadLimiter``
    slot before starting the thread, and this function always gives it back,
    even when the task or the save raises.

    Args:
        config, ins, outs: forwarded to execute_workflow_task.
        ram: RAM setting. Also the results key, as ``str(ram)``.
        task_core: appended to ``s3_prefix`` to form the task's S3 prefix.
        task_name: identifier stored with the result.
        s3_prefix: S3 key prefix for task data.
        results_file: JSON file that collects raw results.
            NOTE(review): both defaults point at montage-0_25 while the config
            cell targets montage-0_15. Confirm which data set these runs belong to.
    """
    try:
        print("Starting " + str(task_name) + " with ram " + str(ram) + "   core - " + str(task_core))
        result = execute_workflow_task(config, ins, outs, ram, s3_prefix + task_core)
        save_result(result, results_file, str(ram), task_name)
    finally:
        # Release in `finally`. A failing task used to leak its slot, and after
        # a few failures the sweep loop blocked forever in acquire().
        threadLimiter.release()


In [10]:
def save_result(result, filename, ram, task_id, lock=None):
    """Append ``{'task_id': task_id, 'result': str(result)}`` to the list under ``ram`` in a JSON file.

    Args:
        result: raw task output. Stored as ``str(result)``.
        filename: JSON results file. A missing file is treated as ``{}``.
        ram: grouping key. Coerced to ``str`` because JSON object keys are
            strings. Otherwise an int key would sit next to its reloaded string
            twin, and the duplicate key written to disk would drop records on
            the next load.
        task_id: identifier stored in the record.
        lock: lock guarding ``filename``. Defaults to the module-level
            ``resultLock``, and is always released, even on error.

    Raises:
        ValueError (json.JSONDecodeError): if ``filename`` exists but is not valid JSON.
        OSError: if the file cannot be read or written.
    """
    if lock is None:
        lock = resultLock
    key = str(ram)
    with lock:
        try:
            with open(filename) as result_json:
                data = json.load(result_json)
        except FileNotFoundError:
            data = {}

        data.setdefault(key, []).append({'task_id': task_id, 'result': str(result)})

        with open(filename, 'w') as result_json:
            json.dump(data, result_json, indent=4)

In [26]:
def copy_executable_files(exec_folder, destination_prefix=None, bucket_name='cegielskir'):
    """Print the key of every object under ``exec_folder`` and optionally copy it.

    Without ``destination_prefix`` this only lists keys, which is all the
    original did. It built a copy-source dict, with an ObjectSummary instead of
    a key string, and never used it. With ``destination_prefix`` each object is
    also copied within ``bucket_name`` to ``destination_prefix`` plus the last
    segment of its key, e.g. ``montageV2_6-compiles/mAdd`` -> ``<prefix>mAdd``.

    Args:
        exec_folder: S3 key prefix to list.
        destination_prefix: optional key prefix to copy objects into. It should
            normally end in ``'/'``.
        bucket_name: bucket to list from and copy within.
    """
    bucket = s3.Bucket(bucket_name)
    for obj in bucket.objects.filter(Prefix=exec_folder):
        print(obj.key)
        if destination_prefix is None:
            continue
        base_name = obj.key.rsplit('/', 1)[-1]
        if not base_name:
            continue  # a bare "folder" marker key has nothing to copy
        copy_source = {
            'Bucket': bucket_name,
            'Key': obj.key,  # CopySource needs the key string
        }
        bucket.copy(copy_source, destination_prefix + base_name)

In [27]:
# List the compiled executables stored under this S3 prefix.
path = 'montageV2_6-compiles/'
copy_executable_files(exec_folder=path)

montageV2_6-compiles/mAdd
montageV2_6-compiles/mBackground
montageV2_6-compiles/mBgModel
montageV2_6-compiles/mConcatFit
montageV2_6-compiles/mDiff
montageV2_6-compiles/mDiffFit
montageV2_6-compiles/mFitplane
montageV2_6-compiles/mImgtbl
montageV2_6-compiles/mProject
montageV2_6-compiles/mViewer


In [11]:
####################### TO BE CHANGED ###########################


source_path = 'dc-start-montage-0_15/'
folder_with_workflow_parts = 'montage-0_15/'
workflow_file = './dc-workflow-0.15.json'
# S3 prefix holding the compiled executables. This cell used to read an
# undefined `exec_folder`, which only existed as a parameter of
# copy_executable_files.
exec_folder = 'montageV2_6-compiles/'

###################### END TO BE CHANGED ####################### 

task_in_file_map = get_task_type_list(workflow_file)

# The executable listing does not depend on the task type, so fetch it once.
executable_keys = [
    obj.key for obj in s3.Bucket('cegielskir').objects.filter(Prefix=exec_folder)
]

for task_type in task_in_file_map:
    task_prefix = 'data-collection/' + folder_with_workflow_parts + task_type + '/'
    # As before, every executable is copied into every task type's folder.
    for exe_key in executable_keys:
        exe_name = exe_key.rsplit('/', 1)[-1]
        if not exe_name:
            continue  # skip a bare "folder" marker object
        copy_exec_source = {
            'Bucket': 'cegielskir',
            'Key': exe_key,  # CopySource needs the key string, not the ObjectSummary
        }
        # The destination must be a full key. Copying to '.../<task_type>/'
        # made every executable overwrite the same object.
        copy_file_from_to_bucket(copy_exec_source, task_prefix + exe_name)

    # Disabled: also copy this task type's input files from `source_path`.
#     for in_file in task_in_file_map[task_type]:
#         copy_source = {
#             'Bucket': 'cegielskir',
#             'Key': source_path + in_file
#         }
        
#         copy_file_from_to_bucket(copy_source, 'data-collection/' + folder_with_workflow_parts +task_type + '/' + in_file)
    

In [30]:
# Task types found in the workflow (the recorded output below is this call's result).
task_in_file_map.keys()

dict_keys(['mProject', 'mDiffFit', 'mConcatFit', 'mBgModel', 'mBackground', 'mImgtbl', 'mAdd', 'mViewer'])

In [None]:
# Sweep every workflow part at every RAM size, repeating the whole sweep while
# RUNNING is True. Each (file, ram) pair runs in its own thread. threadLimiter
# caps concurrency, and run_and_save_results releases the slot when done.
workflow_dir = os.path.join('.', folder_with_workflow_parts)
# Sorted for a reproducible order (os.listdir order is arbitrary). Only regular
# files are kept: a sub-directory (e.g. .ipynb_checkpoints) cannot be opened as JSON.
files = sorted(
    name for name in os.listdir(workflow_dir)
    if os.path.isfile(os.path.join(workflow_dir, name))
)
#files.reverse()
it = 1
# `while RUNNING` replaces `while True: if RUNNING:`. With RUNNING False the old
# form spun forever printing banners without doing any work.
while RUNNING:
    print("================================== NEW LOOP ====================================")
    for file in files:
        task_core = file.split('_')[0]
        # The workflow part does not depend on the RAM size, so parse it once per file.
        with open(os.path.join(workflow_dir, file)) as single_workflow:
            data = json.load(single_workflow)
        process = data['processes'][0]
        config = process['config']
        ins = [data['signals'][i] for i in process['ins']]
        outs = [data['signals'][i] for i in process['outs']]
        for ram in RAM_LIST:
            threadLimiter.acquire()
            print("Iteration: " + str(it))
            threading.Thread(
                target=run_and_save_results,
                args=(config, ins, outs, ram, task_core, file),
            ).start()
            it += 1