In [2]:
import json
import math

# Load the measured schedule and the experiment configuration.
# The context managers close both files as soon as they are parsed (the
# previous version left both handles open for the lifetime of the kernel).
with open('/home/amnich/Documents/magisterka/CloudFunctionOptimizer/time_tests/0.15/levels_schedule.json') as file:
    data = json.load(file)
with open('/home/amnich/Documents/magisterka/CloudFunctionOptimizer/time_tests/config.json') as cfile:
    config = json.load(cfile)

In [3]:
# Show the loaded experiment configuration (prices, parameters, algorithm).
print(config)

{'budgetParameter': 0.7, 'deadlineParameter': 0.7, 'functionTypes': ['lambda-6144', 'lambda-5120', 'lambda-4096', 'lambda-3072', 'lambda-2560', 'lambda-2048', 'lambda-1536', 'lambda-1024'], 'count': 1, 'provider': 'AWS', 'algorithm': 'sdbcs', 'workflow': 'montage', 'dag': './workflow-040.json', 'prices': {'AWS': {'lambda-128': 2.08e-07, 'lambda-256': 4.17e-07, 'lambda-512': 8.33e-07, 'lambda-1024': 1.667e-06, 'lambda-1536': 2.5e-06, 'lambda-2048': 3.333e-06, 'lambda-2560': 4.167e-06, 'lambda-3072': 5e-06, 'lambda-4096': 6.667e-06, 'lambda-5120': 8.334e-06, 'lambda-6144': 1e-05, 'fargate-512025': 1.1244e-05, 'fargate-1050': 4.4977e-05, 'fargate-21': 0.000179911, 'fargate-42': 0.000719644}, 'GCF': {'256': 4.63e-07, '512': 9.25e-07, '1024': 1.65e-06, '2048': 2.9e-06}, 'overheads': {'AWS': 0.043, 'GCF': 0.15, 'IBM': 0.13}}}


In [9]:
class Constraints:
    """Deadline and budget bounds derived from a measured workflow schedule.

    On construction every process in ``data["processes"]`` is annotated in
    place with a ``"level"`` key (1 for processes with no producer among the
    other processes, otherwise one more than the deepest producer).  From the
    per-function measurements (``startTime`` / ``finishTime``) the class then
    derives:

    * ``MIN_DEADLINE`` / ``MAX_DEADLINE`` -- sum over levels of the slowest
      process of the level, using each process' fastest / slowest function.
    * ``MIN_BUDGET`` / ``MAX_BUDGET`` -- cheapest / most expensive total cost
      obtained by summing, per function type, the cost of every measured
      execution of that function.

    Prices are always read from ``config['prices']['AWS']``.
    """

    # Execution times are divided by this value and rounded up before being
    # multiplied by the unit price (presumably 100 ms billing increments --
    # TODO confirm the time unit of the measurements).
    BILLING_GRANULARITY = 100

    def __init__(self, data, config):
        """Annotate ``data`` with levels and pre-compute all bounds.

        Args:
            data: dict with a ``"processes"`` list; each process needs
                ``ins``, ``outs``, ``config`` (``id``, ``deploymentType``),
                ``startTime`` and ``finishTime`` (both keyed by function name).
            config: dict providing ``config['prices']['AWS'][function]``.

        Raises:
            ValueError: if the processes form a dependency cycle.
        """
        self.data = data
        self.config = config
        self._decorate_with_level()
        self.level_to_proc = self._get_level_to_proc()
        self.id_to_process = self._get_id_to_process()
        self.id_to_executions = self._get_id_to_executions()
        self.MAX_DEADLINE = self._get_deadline(self._get_max_time)
        self.MIN_DEADLINE = self._get_deadline(self._get_min_time)
        self.MAX_BUDGET = self._get_budget(True)
        self.MIN_BUDGET = self._get_budget(False)

    def _decorate_with_level(self):
        """Store the dependency depth of every process under ``proc["level"]``.

        A process depends on every other process that lists one of its ``ins``
        signals among its ``outs``.  The result does not depend on the order
        of ``data["processes"]``: levels are relaxed until they stop changing.
        Signal identifiers must be hashable (they are ints in the schedule
        this notebook loads).
        """
        processes = self.data["processes"]

        # signal -> indices of the processes that produce it
        producers = {}
        for index, proc in enumerate(processes):
            for signal in proc["outs"]:
                producers.setdefault(signal, set()).add(index)

        # A process feeding itself is ignored, as it would never settle.
        parents = [
            {p for signal in proc["ins"] for p in producers.get(signal, ())} - {index}
            for index, proc in enumerate(processes)
        ]

        levels = [0] * len(processes)
        # In a DAG with n nodes the longest path has n nodes, so at most n
        # passes change something and pass n + 1 only confirms convergence.
        for _ in range(len(processes) + 1):
            changed = False
            for index, parent_indices in enumerate(parents):
                level = max((levels[p] for p in parent_indices), default=0) + 1
                if level != levels[index]:
                    levels[index] = level
                    changed = True
            if not changed:
                break
        else:
            raise ValueError("cannot assign levels: the processes contain a dependency cycle")

        for proc, level in zip(processes, levels):
            proc["level"] = level

    def _get_level_to_proc(self):
        """Return ``{level: [process id, ...]}`` in ``data["processes"]`` order."""
        level_to_proc = {}
        for proc in self.data["processes"]:
            level_to_proc.setdefault(proc["level"], []).append(proc["config"]["id"])
        return level_to_proc

    def _get_id_to_process(self):
        """Return ``{process id: process dict}`` (the dicts are not copied)."""
        return {proc["config"]["id"]: proc for proc in self.data["processes"]}

    def _execution_cost(self, function, time):
        """Cost of running ``function`` for ``time``, billed in whole units."""
        price = self.config['prices']['AWS'][function]
        return price * math.ceil(time / self.BILLING_GRANULARITY)

    def _get_id_to_executions(self):
        """Return ``{process id: [execution, ...]}`` sorted ascending by time.

        Each execution is a dict with ``cost``, ``time``, ``function`` and
        ``proc_id`` for one function type measured for that process.
        """
        id_to_executions = {}
        for proc in self.data["processes"]:
            proc_id = proc["config"]["id"]
            executions = []
            for function in proc["startTime"]:
                time = proc["finishTime"][function] - proc["startTime"][function]
                executions.append({
                    "cost": self._execution_cost(function, time),
                    "time": time,
                    "function": function,
                    "proc_id": proc_id
                })
            # sorted ascending by time
            executions.sort(key=lambda x: x["time"])
            id_to_executions[proc_id] = executions
        return id_to_executions

    def _get_min_time(self, proc_id):
        """Shortest measured execution time of the process."""
        return min(execution['time'] for execution in self.id_to_executions[proc_id])

    def _get_max_time(self, proc_id):
        """Longest measured execution time of the process."""
        return max(execution['time'] for execution in self.id_to_executions[proc_id])

    def _get_deadline(self, fun):
        """Sum over all levels of the largest ``fun(proc_id)`` within the level."""
        result = 0
        for level in self.level_to_proc:
            result += max(fun(proc_id) for proc_id in self.level_to_proc[level])
        return result

    def _get_budget(self, get_max):
        """Largest (``get_max`` true) or smallest per-function total cost.

        The total of a function type is the sum of the costs of all measured
        executions of that type over every process.  Returns 0 when there are
        no executions at all.
        """
        totals = {}
        # Same traversal order as the per-level bookkeeping so that the float
        # sums are accumulated in a stable order.
        for level in self.level_to_proc:
            for proc_id in self.level_to_proc[level]:
                for execution in self.id_to_executions[proc_id]:
                    name = execution['function']
                    totals[name] = totals.get(name, 0) + execution['cost']
        pick = max if get_max else min
        return pick(totals.values(), default=0)

    def get_user_deadline(self, deadline_factor):
        """Interpolate between ``MIN_DEADLINE`` (factor 0) and ``MAX_DEADLINE`` (factor 1)."""
        return self.MIN_DEADLINE + (self.MAX_DEADLINE - self.MIN_DEADLINE) * deadline_factor

    def get_user_budget(self, budget_factor):
        """Interpolate between ``MIN_BUDGET`` (factor 0) and ``MAX_BUDGET`` (factor 1)."""
        return self.MIN_BUDGET + (self.MAX_BUDGET - self.MIN_BUDGET) * budget_factor

    def get_level_to_decision(self):
        """Summarise the currently selected deployment of every level.

        Returns ``{level: {'time', 'cost', 'functions'}}`` where ``time`` is
        the longest execution in the level, ``cost`` the summed cost and
        ``functions`` maps process id to its current ``deploymentType``.  It
        is recomputed on every call because ``deploymentType`` is read from
        the (mutable) process dicts.
        """
        level_to_decision = {}
        for proc in self.data['processes']:
            function = proc['config']['deploymentType']
            decision = level_to_decision.setdefault(
                proc['level'], {'time': 0, 'cost': 0, 'functions': {}})
            decision['functions'][proc['config']['id']] = function
            time = proc["finishTime"][function] - proc["startTime"][function]
            decision['time'] = max(decision['time'], time)
            decision['cost'] += self._execution_cost(function, time)
        return level_to_decision

    def get_planned_budget(self):
        """Total cost of the currently selected deployments."""
        return sum(decision['cost'] for decision in self.get_level_to_decision().values())

    def get_planned_deadline(self):
        """Sum over levels of the longest currently selected execution."""
        return sum(decision['time'] for decision in self.get_level_to_decision().values())
    

In [10]:
# Build the bounds for the loaded schedule (this also writes a "level" key
# into every process of `data`).
constraints = Constraints(data, config)
# User limits: interpolated between the minimum and maximum bound using the
# factors from the configuration file. Note that these module-level names are
# the *user* limits, not Constraints.MAX_BUDGET / Constraints.MAX_DEADLINE.
MAX_BUDGET = constraints.get_user_budget(config['budgetParameter'])
MAX_DEADLINE = constraints.get_user_deadline(config['deadlineParameter'])
# Cost and makespan of the deployment types currently stored in the schedule.
CURRENT_BUDGET = constraints.get_planned_budget()
CURRENT_DEADLINE = constraints.get_planned_deadline()

In [21]:
# User budget, user deadline, planned budget and planned deadline, in that order.
print(MAX_BUDGET)
print(MAX_DEADLINE)
print(CURRENT_BUDGET)
print(CURRENT_DEADLINE)

0.057669249200000224
31824.199999999997
0.02322949600000001
22149


In [11]:
def get_level_to_proc():
    """Group the process ids of the global ``data`` by their ``"level"``.

    Returns:
        dict mapping each level to the list of ``config.id`` values of the
        processes on that level, in the order they appear in
        ``data["processes"]``.
    """
    grouped = {}
    for process in data["processes"]:
        grouped.setdefault(process["level"], []).append(process["config"]["id"])
    return grouped

# Module-level lookup: level -> ids of the processes on that level.
level_to_proc = get_level_to_proc()

In [12]:
# Inspect the level grouping.
print(level_to_proc)

{1: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222], 2: [17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 2

In [13]:
def get_id_to_process():
    """Index the processes of the global ``data`` by their ``config.id``.

    Returns:
        dict mapping each process id to the process dict itself (not a copy);
        if an id occurs more than once, the last process wins.
    """
    return {process["config"]["id"]: process for process in data["processes"]}

# Module-level lookup: process id -> process dict (shared with `data`).
id_to_process = get_id_to_process()

In [14]:
# Inspect the id lookup (prints every process in full).
print(id_to_process)

{1: {'config': {'executor': {'args': ['-X', '2mass-atlas-001021s-j0560033.fits', 'p2mass-atlas-001021s-j0560033.fits', 'region-oversized.hdr'], 'executable': 'mProject'}, 'id': 1, 'deploymentType': 'lambda-1536', 'scheduledStartTime': 0, 'scheduledFinishTime': 14627}, 'firingLimit': 1, 'function': 'awsCommand', 'ins': [0, 1], 'name': 'mProject', 'outs': [2, 3], 'type': 'dataflow', 'startTime': {'lambda-6144': 333, 'lambda-5120': 10, 'lambda-4096': 6, 'lambda-3072': 68, 'lambda-2048': 306, 'lambda-1536': 43, 'lambda-1024': 141}, 'finishTime': {'lambda-6144': 13438, 'lambda-5120': 12688, 'lambda-4096': 12530, 'lambda-3072': 12742, 'lambda-2048': 13194, 'lambda-1536': 14670, 'lambda-1024': 22266}, 'level': 1}, 2: {'config': {'executor': {'args': ['-X', '2mass-atlas-980914s-j0820033.fits', 'p2mass-atlas-980914s-j0820033.fits', 'region-oversized.hdr'], 'executable': 'mProject'}, 'id': 2, 'deploymentType': 'lambda-1536', 'scheduledStartTime': 0, 'scheduledFinishTime': 14936}, 'firingLimit': 

In [15]:
def _tasks_by_finish_time(tasks):
    """Return ``tasks`` ordered by ascending scheduled finish time (ties keep input order)."""
    return sorted(tasks, key=lambda task: id_to_process[task]["config"]["scheduledFinishTime"])


def get_last_task(tasks):
    """Return the id of the task with the latest ``scheduledFinishTime``.

    Args:
        tasks: non-empty list of process ids present in ``id_to_process``.
    """
    return _tasks_by_finish_time(tasks)[-1]


def get_2_last_task(tasks):
    """Return the id of the task with the second latest ``scheduledFinishTime``.

    Returns ``None`` when ``tasks`` holds fewer than two ids (previously this
    raised ``IndexError`` for a level consisting of a single task).
    """
    ordered = _tasks_by_finish_time(tasks)
    return ordered[-2] if len(ordered) > 1 else None


def get_best_option_for_task(task, task2):
    """Pick the deployment change for ``task`` that buys time most cheaply.

    Every function type measured for ``task`` (except the ``'sdbcs'`` entry)
    is evaluated as a replacement for its current deployment.  The finish time
    of the level after the change is the later of the new finish time of
    ``task`` and the finish time of ``task2``; only changes that finish
    earlier than the current ``scheduledFinishTime`` of ``task`` qualify.

    Args:
        task: id of the task finishing last in its level.
        task2: id of the task finishing second to last in the same level, or
            ``None`` if the level has no other task.

    Returns:
        The qualifying option dict with the smallest ``"loss"`` (extra cost
        per unit of time gained; negative when the change is also cheaper),
        or ``None`` if no change shortens the level.  A change that gains time
        at exactly the same cost has ``loss == 0`` and is a valid result.
    """
    proc = id_to_process[task]
    proc_config = proc["config"]
    start = proc_config["scheduledStartTime"]
    finish = proc_config["scheduledFinishTime"]
    prices = config['prices']['AWS']

    execution_time = finish - start
    cost = prices[proc_config["deploymentType"]] * math.ceil(execution_time / 100)

    if task2 is None:
        # Nothing else bounds the level, so only this task's new finish counts.
        runner_up_finish = start
    else:
        runner_up_finish = id_to_process[task2]["config"]["scheduledFinishTime"]

    improvements = []
    for key in proc["startTime"]:
        if key == 'sdbcs':
            continue
        new_time = proc["finishTime"][key] - proc["startTime"][key]
        new_cost = math.ceil(new_time / 100) * prices[key]
        # Negative value == the level finishes earlier than it does now.
        time_loss = max(runner_up_finish, start + new_time) - finish
        if time_loss >= 0:
            continue
        improvements.append({
            "old_time": execution_time,
            "old_cost": cost,
            "new_time": new_time,
            "time_loss": time_loss,
            "new_cost": new_cost,
            "deploymentType": key,
            "id": task,
            "level": proc["level"],
            "loss": (new_cost - cost) / -time_loss,
        })

    if not improvements:
        return None
    return min(improvements, key=lambda option: option["loss"])

In [16]:
# Accumulated change of the finish time of every level, filled in by
# apply_option(); starts at 0 for each known level.
time_updates_to_level = {}
for key in level_to_proc:
    time_updates_to_level[key] = 0


def apply_option(option):
    """Apply a deployment change if the resulting cost stays within MAX_BUDGET.

    On success the global ``CURRENT_BUDGET`` is updated, the process'
    ``deploymentType`` and ``scheduledFinishTime`` are rewritten in place and
    the option's ``time_loss`` is added to ``time_updates_to_level`` for the
    option's level.

    Args:
        option: dict with ``id``, ``level``, ``old_cost``, ``new_cost``,
            ``new_time``, ``time_loss`` and ``deploymentType``.

    Returns:
        True if the change was applied, False if it would exceed the budget
        (in which case nothing is modified).
    """
    global CURRENT_BUDGET
    new_budget = CURRENT_BUDGET - option["old_cost"] + option["new_cost"]
    if new_budget > MAX_BUDGET:
        return False

    CURRENT_BUDGET = new_budget
    c = id_to_process[option["id"]]["config"]
    c["deploymentType"] = option["deploymentType"]
    c["scheduledFinishTime"] = c["scheduledStartTime"] + option["new_time"]
    time_updates_to_level[option["level"]] += option["time_loss"]
    return True

In [17]:
# Seed the candidate list: for every level, evaluate the task that finishes
# last against the task that finishes second to last, and keep the best
# deployment change if one exists.
options = []
for level in level_to_proc:
    tasks = level_to_proc[level]
    task = get_last_task(tasks)
    task2 = get_2_last_task(tasks)
    option = get_best_option_for_task(task, task2)
    if option:
        options.append(option)
    

In [18]:
# Greedy loop: always try the candidate with the smallest "loss" first.
# `counter` counts the changes that were actually applied.
counter = 0
while(len(options) > 0):
    options = sorted(options,key=lambda x: x["loss"])
    applied = apply_option(options[0])
    if applied:
        counter += 1
        # The change may have altered which task finishes last in this level,
        # so look for a follow-up candidate for the same level.
        new_task = get_last_task(level_to_proc[options[0]["level"]])
        new_2_task = get_2_last_task(level_to_proc[options[0]["level"]])
        new_option = get_best_option_for_task(new_task, new_2_task)
        if new_option:
            options.append(new_option)
    # Drop the candidate just examined, whether or not it was applied.
    options = options[1:]
print(f'Applied {counter} updates')

Applied 34 updates


In [19]:
# Convert the per-level changes into a running total: after this cell,
# time_updates_to_level[i] holds the summed change of levels 1..i.
# The loop indexes the dict with 1..len(dict), so it requires the levels to be
# exactly those consecutive integers.
# NOTE(review): the dict is rewritten in place, so running this cell a second
# time accumulates the already accumulated values again.
acc = 0
for i in range(1,len(time_updates_to_level)+1):
    acc += time_updates_to_level[i]
    time_updates_to_level[i] = acc
    
    
print(time_updates_to_level)

{1: -467, 2: -467, 3: -547, 4: -949, 5: -949, 6: -1193, 7: -1193, 8: -1193}


In [20]:
# Shift every process above level 1 by the accumulated change of all previous
# levels (the running total stored for level - 1); level-1 processes keep
# their start time.
# NOTE(review): this mutates `data` in place, so running the cell twice shifts
# the schedule twice.
for proc in data["processes"]:
    if(proc["level"] > 1):
        proc["config"]["scheduledStartTime"] += time_updates_to_level[proc["level"]-1]
        proc["config"]["scheduledFinishTime"] += time_updates_to_level[proc["level"]-1]
        

In [86]:
# NOTE(review): get_current_budget() and get_current_time() are not defined in
# any of the cells shown here, so this cell fails on a fresh kernel unless they
# are defined elsewhere -- confirm where they come from.
print(get_current_budget())
print(get_current_time())

0.04004034326999997
37207


In [87]:
# Write the modified schedule (levels, deployment types and shifted times are
# all stored inside `data`) as indented JSON, replacing any existing file.
output_path = '/home/amnich/Documents/magisterka/CloudFunctionOptimizer/time_tests/tmp.json'
with  open(output_path,'w+') as f:
    json.dump(data, f, indent=2)