<div style="text-align:center">
    <h1>
        Federated Learning for Workflow Scheduling: Advantage Actor-Critic (A2C)
    </h1>
</div>

<br><br>
The actor-critic algorithm uses two networks that work together. The actor (the policy) chooses an action at each time step; here, that action is the VM on which the next task is placed. The critic estimates the value $V(s)$ of the current state. The advantage measures how much better an action turned out than the critic expected. A2C estimates it with the one-step temporal-difference (TD) error, $r_t + \gamma V(s_{t+1}) - V(s_t)$, and uses it to scale the actor's policy-gradient update.
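A2C commonly estimates the advantage with the one-step TD error $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$. The bootstrap term $\gamma V(s_{t+1})$ is dropped at the end of an episode. The following sketch computes it for a single transition. The function name `td_advantage` and the numbers are illustrative only.

```python
def td_advantage(reward, value_s, value_next, gamma=0.99, done=False):
    """One-step TD error r + gamma * V(s') - V(s), used as the advantage estimate."""
    bootstrap = 0.0 if done else gamma * value_next  # no bootstrapping past a terminal state
    return reward + bootstrap - value_s


# Illustrative numbers: the action did better than the critic predicted.
print(td_advantage(reward=1.0, value_s=2.0, value_next=1.5))  # about 0.485 (> 0)
```

A positive advantage pushes the actor update toward the chosen action, and a negative one pushes it away.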

## Importing the necessary software libraries:

In [48]:
import os
import math
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from gym import spaces
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.optim import AdamW
import multiprocessing as mp
from torch.nn import Module
import warnings

In [49]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**WorkFlow Parser**


In [50]:
class Task():
    def __init__(self,taskName,taskId, taskLength):

        self.taskName=taskName
        self.taskId=taskId
        self.taskLength=taskLength
        self.allocated = False
        self.childList = []
        self.parentList = []
        self.fileList = []
        self.impact = 0.0
        self.taskFinishTime = -1.0
        self.type = None
        self.priority = 0
        self.depth = 0

    def setType(self, type):
        self.type = type

    def getType(self):
        return self.type

    def setPriority(self, priority):
        self.priority = priority

    def setDepth(self, depth):
        self.depth = depth

    def getPriority(self):
        return self.priority

    def getDepth(self):
        return self.depth

    def getChildList(self):
        return self.childList

    def setChildList(self, list):
        self.childList = list

    def setParentList(self, list):
        self.parentList = list

    def addChildList(self, list):
        self.childList.extend(list)

    def addParentList(self, list):
        self.parentList.extend(list)

    def getParentList(self):
        return self.parentList

    def addChild(self, task):
        self.childList.append(task)

    def addParent(self, task):
        self.parentList.append(task)

    def getFileList(self):
        return self.fileList

    def addFile(self, file):
        self.fileList.append(file)

    def setFileList(self, list):
        self.fileList = list

    def setImpact(self, impact):
        self.impact = impact

    def getImpact(self):
        return self.impact

    def setTaskFinishTime(self, time):
        self.taskFinishTime = time

    def getTaskFinishTime(self):
        return self.taskFinishTime

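    # Depends on CloudSim Cloudlet fields (cost per second, actual CPU time,
    # cost per bandwidth) that this class does not define, so they are passed in.
    def getProcessingCost(self, costPerSec, actualCPUTime, costPerBw):
        cost = costPerSec * actualCPUTime
        # file sizes in bytes -> MB (Consts.MILLION in CloudSim is 1,000,000)
        file_size = sum(file.getSize() / 1_000_000 for file in self.getFileList())
        cost += costPerBw * file_size
        return cost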

In [51]:
from enum import Enum
from typing import List

class FileType(Enum):
    NONE = 0
    INPUT = 1
    OUTPUT = 2


class FileItem:
    def __init__(self, name, size):
        self.name = name
        self.size = size
        self.type = None

    def setName(self, name):
        self.name = name

    def setSize(self, size):
        self.size = size

    def setType(self, file_type):
        self.type = file_type

    def getName(self):
        return self.name

    def getSize(self):
        return self.size

    def getType(self):
        return self.type

    def is_real_input_file(self, file_list):
        """True if this is an input file that no file in file_list produces as an output."""
        if self.type == FileType.INPUT:
            for another in file_list:
                if another.getName() == self.name and another.getType() == FileType.OUTPUT:
                    return False
            return True
        return False

In [52]:
class ReplicaCatalog:
    class FileSystem:
        SHARED = 'SHARED'
        LOCAL = 'LOCAL'

    def __init__(self, fs=None):
        # Per-instance storage, so each parse gets its own file catalog
        self.fileSystem = fs
        self.dataReplicaCatalog = {}
        self.fileName2File = {}


    def getFileSystem(self):
        return self.fileSystem


    def getFile(self, fileName):
        return self.fileName2File.get(fileName)


    def setFile(self, fileName, file):
        self.fileName2File[fileName] = file


    def containsFile(self, fileName):
        return fileName in self.fileName2File


    def getStorageList(self, file):
        return self.dataReplicaCatalog.get(file, [])


    def addFileToStorage(self, file, storage):
        if file not in self.dataReplicaCatalog:
            self.dataReplicaCatalog[file] = []
        if storage not in self.dataReplicaCatalog[file]:
            self.dataReplicaCatalog[file].append(storage)

In [53]:
import os
from enum import Enum
from xml.etree import ElementTree as ET


class WorkflowParser:
    def __init__(self, userId, daxPath):
        self.userId = userId
        self.mName2Task = {}
        self.daxPath = daxPath
        self.daxPaths = []
        self.jobIdStartsFrom = 1
        self.taskList = []
        self.transferCosts = {}

    def get_task_list(self):
        return self.taskList
    def get_transfer_cost(self):
        return self.transferCosts
    def set_task_list(self, task_list):
        self.taskList = task_list

    def parse(self):
        if self.daxPath:
            self.parse_xml_file(self.daxPath)
        elif self.daxPaths:
            for path in self.daxPaths:
                self.parse_xml_file(path)

    def set_depth(self, task, depth):
        if depth > task.getDepth():
            task.setDepth(depth)
        for cTask in task.getChildList():
            self.set_depth(cTask, task.getDepth() + 1)

    def parse_xml_file(self, path):
        r=ReplicaCatalog()
        try:
            tree = ET.parse(path)
            root = tree.getroot()
            for node in root:

              if node.tag == "{http://pegasus.isi.edu/schema/DAX}job":
                length = 0
                nodeName = node.get("id")
                nodeType = node.get("name")



                runtime = 0.1
                nodeTime = node.get("runtime")
                if nodeTime is not None:
                    runtime = 1000*float(nodeTime)
                    if runtime < 100:
                        runtime = 100
                    length = int(runtime)
                else:
                    print(f"Cannot find runtime for {nodeName}, set it to be 0")


                mFileList = []
                for file in node.findall("{http://pegasus.isi.edu/schema/DAX}uses"):
                    fileName = file.get("name")
                    if fileName is None:
                        fileName = file.get("file")
                    if fileName is None:
                        print("Error in parsing xml")

                    inout = file.get("link")
                    size = 0.0
                    fileSize = file.get("size")
                    if fileSize is not None:
                        size = float(fileSize)
                    else:
                        print(f"File Size not found for {fileName}")

                    if size == 0:
                        size = 1
                    if size < 0:
                        size = -size
                        print("Size is negative, assuming it is a parser error")

                    # `file_type` avoids shadowing the built-in `type`
                    file_type = FileType.NONE
                    if inout == "input":
                        file_type = FileType.INPUT
                    elif inout == "output":
                        file_type = FileType.OUTPUT
                    else:
                        print(f"Parsing error: unknown link type {inout!r} for {fileName}")

                    # Output files are always new objects; input files with the same
                    # name reuse one FileItem from the replica catalog.
                    if file_type == FileType.OUTPUT:
                        tFile = FileItem(fileName, size)
                    elif r.containsFile(fileName):
                        tFile = r.getFile(fileName)
                    else:
                        tFile = FileItem(fileName, size)
                        r.setFile(fileName, tFile)

                    tFile.setType(file_type)
                    mFileList.append(tFile)



                task = Task(nodeName,self.jobIdStartsFrom, length)
                self.jobIdStartsFrom += 1
                task.setType(nodeType)
                # task.setUserId(self.userId);
                task.setFileList(mFileList)
                self.get_task_list().append(task)
                self.mName2Task[nodeName] = task
                # for file in mFileList:
                #    task.addRequired

              elif node.tag == "{http://pegasus.isi.edu/schema/DAX}child":
                child_name = node.get("ref")
                if child_name in self.mName2Task:
                    child_task = self.mName2Task[child_name]

                    for parent in node.findall("{http://pegasus.isi.edu/schema/DAX}parent"):
                        parent_name = parent.get("ref")
                        if parent_name in self.mName2Task:
                            parent_task = self.mName2Task[parent_name]
                            child_task.addParent(parent_task)
                            parent_task.addChild(child_task)


            roots = []

            for task in self.mName2Task.values():
                task.setDepth(0)
                if not task.parentList:
                    roots.append(task)

            for root_task in roots:
                self.set_depth(root_task, 1)

            self.mName2Task.clear()

        except ET.ParseError:
            print("XML Parse Error; Please make sure your DAX file is valid")

        except IOError:
            print("IO Error; Please make sure daxPath points to an existing DAX file")

        except Exception as e:
            print("Parsing Exception:", str(e))

    def CalculateTransferCosts(self):
      for task1 in self.taskList:
        taskTransferCosts = {}

        for task2 in self.taskList:
            taskTransferCosts[task2] = 0.0
        self.transferCosts[task1] = taskTransferCosts
      for parent in self.taskList:
        for child in parent.getChildList():
            self.transferCosts[parent][child] = self.calculateTransferCost(parent, child)

    def calculateTransferCost(self, parent, child):
      parentFiles = parent.getFileList()
      childFiles = child.getFileList()
      acc = 0.0
      # print("Len: ",len(parentFiles))
      for parentFile in parentFiles:
          # print(parentFile.getType())
          if parentFile.getType() != FileType.OUTPUT:
              continue
          for childFile in childFiles:

              if childFile.getType() == FileType.INPUT and childFile.getName() == parentFile.getName():
                  acc += childFile.getSize()
                  break
      return acc
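The parser applies three rules. A job's runtime in seconds becomes a length in milliseconds, with a 100 ms floor. A job's depth is the longest path from a root. The transfer size between two jobs is the number of bytes a parent writes that a child reads. These rules can be checked on a toy input. The sketch below uses only `xml.etree.ElementTree` and plain dictionaries instead of the `Task`/`FileItem` classes, and it omits the zero/negative file-size handling. The two-job DAX fragment and all helper names (`parse_dax`, `depth`, `transfer_bytes`) are hypothetical.

```python
import xml.etree.ElementTree as ET

DAX = "{http://pegasus.isi.edu/schema/DAX}"

# Hypothetical two-job fragment: jobA writes f.dat, jobB reads it.
SAMPLE_DAX = """<adag xmlns="http://pegasus.isi.edu/schema/DAX">
  <job id="jobA" name="taskA" runtime="1.5">
    <uses file="f.dat" link="output" size="2000000"/>
  </job>
  <job id="jobB" name="taskB" runtime="0.05">
    <uses file="f.dat" link="input" size="2000000"/>
  </job>
  <child ref="jobB">
    <parent ref="jobA"/>
  </child>
</adag>"""


def parse_dax(xml_text):
    """Return ({job: length_ms}, {child: [parents]}, {job: [(file, link, size)]})."""
    lengths, parents, files = {}, {}, {}
    for node in ET.fromstring(xml_text):
        if node.tag == DAX + "job":
            job, runtime = node.get("id"), node.get("runtime")
            # seconds -> ms with a 100 ms floor; a missing runtime gives length 0
            lengths[job] = 0 if runtime is None else int(max(1000 * float(runtime), 100))
            files[job] = [(u.get("name") or u.get("file"), u.get("link"), float(u.get("size", 0)))
                          for u in node.findall(DAX + "uses")]
        elif node.tag == DAX + "child":
            parents[node.get("ref")] = [p.get("ref") for p in node.findall(DAX + "parent")]
    return lengths, parents, files


def depth(job, parents, memo):
    """Longest-path depth: roots are 1, every other job is 1 + its deepest parent."""
    if job not in memo:
        memo[job] = 1 + max((depth(p, parents, memo) for p in parents.get(job, [])), default=0)
    return memo[job]


def transfer_bytes(parent, child, files):
    """Bytes the parent writes that the child reads, as in calculateTransferCost."""
    outputs = {name for name, link, _ in files[parent] if link == "output"}
    return sum(size for name, link, size in files[child] if link == "input" and name in outputs)


lengths, parents, files = parse_dax(SAMPLE_DAX)
print(lengths)                                      # {'jobA': 1500, 'jobB': 100}
print({j: depth(j, parents, {}) for j in lengths})  # {'jobA': 1, 'jobB': 2}
print(transfer_bytes("jobA", "jobB", files))        # 2000000.0
```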


In [54]:
daxPath = "/content/drive/MyDrive/BTP/CyberShake_100.xml"
a=WorkflowParser(1, daxPath)
a.parse()
a.CalculateTransferCosts()
# x = a.get_transfer_cost()
# for task in a.get_task_list():
#   for t in a.get_task_list():
#     if x[task][t] != 0.0:
#       print("Parent: ", task.taskId)
#       print("Child: ", t.taskId)
#       print("transferCost : ", x[task][t])

## Creating the environment

In [55]:
#ENVIRONMENT

class Datacenter:
  def __init__(self, name, location, memory, cost_per_hour, cost_per_memory, bandwidth):
    # costs in Rs, memory in GB, cost_per_memory = storage cost, bandwidth = datacenter link bandwidth
    self.name = name
    self.location = location
    self.memory = memory
    self.cost_per_hour = cost_per_hour
    self.cost_per_memory = cost_per_memory
    self.bandwidth = bandwidth
    self.hosts = []  # filled by add_host()

  def add_host(self, host):
    self.hosts.append(host)

  def get_hosts(self):
    return self.hosts

  def show_hosts(self):
    print(self.hosts)
# Processing-element (PE) types: PE type ID -> MIPS rating
PE_Type_list = {
    '1':
    {
        'MIPS': 100
    },
    '2':
    {
        'MIPS': 50
    },
    '3':
    {
        'MIPS': 10
    }
}

class Host:
  # ram: RAM size, pe_type: key into PE_Type_list (MIPS rating), bandwidth: host link bandwidth
  def __init__(self, name, Datacenter, cpu_cores, memory, ram, pe_type, bandwidth):
    self.name = name
    self.Datacenter = Datacenter
    self.cpu_cores = cpu_cores
    self.memory = memory
    self.ram = ram
    self.pe_type = pe_type
    self.bandwidth = bandwidth
    self.vms = []

  def add_vm(self, vm):
    self.vms.append(vm)

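  def get_mips(self, pe_type):
    # PE_Type_list is keyed by string IDs ('1'-'3') and stores the rating under 'MIPS'.
    # Note: host4 is created with pe_type=4, which has no entry in the table.
    return PE_Type_list[str(pe_type)]['MIPS']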

  def get_vms(self):
    return self.vms

VM_type = {  # per-type Task_Memory, MIPS rating, cost, frequency and voltage ranges, lambda
    'Type_1': {
          'Task_Memory' : 256,
          'mips' : 1000,
          'cost' : 0.10,
          'maxFreq' : 1.0,
          'minFreq' : 0.50,
          'minVoltage' : 5.0,
          'maxVoltage' : 7.0,
          'lambda' : 0.000150
    },
    'Type_2': {
          'Task_Memory' : 512,
          'mips' : 1500,
          'cost' : 0.20,
          'maxFreq' : 1.5,
          'minFreq' : 0.75,
          'minVoltage' : 7.0,
          'maxVoltage' : 9.0,
          'lambda' : 0.000100
        },
    'Type_3':
        {
          'Task_Memory' : 1024,
          'mips' : 2000,
          'cost' : 0.32,
          'maxFreq' : 2.0,
          'minFreq' : 1.00,
          'minVoltage' : 9.0,
          'maxVoltage' : 11.0,
          'lambda' : 0.000080
        },
    'Type_4':
        {
          'Task_Memory' : 500,
          'mips' : 2500,
          'cost' : 0.46,
          'maxFreq' : 2.5,
          'minFreq' : 1.25,
          'minVoltage' : 11.0,
          'maxVoltage' : 13.0,
          'lambda' : 0.000045
        },
    'Type_5':
        {
          'Task_Memory' : 560,
          'mips' : 3000,
          'cost' : 0.58,
          'maxFreq' : 3.0,
          'minFreq' : 1.50,
          'minVoltage' : 13,
          'maxVoltage' : 15,
          'lambda' : 0.000040
        },
    'Type_6':
        {
          'Task_Memory' : 250,
          'mips' : 3500,
          'cost' : 0.73,
          'maxFreq' : 3.5,
          'minFreq' : 1.75,
          'minVoltage' : 15,
          'maxVoltage' : 17,
          'lambda' : 0.000025
        },
    'Type_7':
        {
          'Task_Memory' : 150,
          'mips' : 4000,
          'cost' : 0.9,
          'maxFreq' : 4.0,
          'minFreq' : 2.00,
          'minVoltage' : 17,
          'maxVoltage' : 19,
          'lambda' : 0.000010
        },
    'Type_8':
        {
          'Task_Memory' : 750,
          'mips' : 4500,
          'cost' : 1.05,
          'maxFreq' : 4.5,
          'minFreq' : 2.25,
          'minVoltage' : 19,
          'maxVoltage' : 21,
          'lambda' : 0.000005
        },
    'Type_9':
        {
          'Task_Memory' : 2048,
          'mips' : 500,
          'cost' : 1.40,
          'maxFreq' : 5.5,
          'minFreq' : 2.75,
          'minVoltage' : 23,
          'maxVoltage' : 25,
          'lambda' : 0.000001
        }

}

AWS_VM_type = {  # Task_Memory in GiB, PE = processing elements
    'Type_1': {
        'Task_Memory': 60,
        'cost': 1.591,
        'PE': 36,
        'Network+Performance': "10 Gigabit",
        'Storage': "EBS"
    },
    'Type_2': {
        'Task_Memory': 30,
        'cost': 0.786,
        'PE': 16,
        'Network+Performance': "High",
        'Storage': "EBS"
    },
    'Type_3': {
        'Task_Memory': 15,
        'cost': 0.398,
        'PE': 8,
        'Network+Performance': "High",
        'Storage': "EBS"
    },
    'Type_4': {
        'Task_Memory': 7.5,
        'cost': 0.198,
        'PE': 4,
        'Network+Performance': "High",
        'Storage': "EBS"
    },
    'Type_5': {
        'Task_Memory': 3.75,
        'cost': 0.10,
        'PE': 2,
        'Network+Performance': "Moderate",
        'Storage': "EBS"
    }
}

class VM:
  def __init__(self, name, Host, Type):
    self.name = name
    self.Host = Host
    self.Type = Type




def check_memory(datacenter, host_list):
    """Return True if the hosts' combined memory exceeds the datacenter's memory."""
    total = sum(host.memory for host in host_list)
    return total > datacenter.memory

def create_suitable_vms(task_memory, vm_memory):
    suitable_vms = []
    for t in task_memory:
        suitable_vms_for_task = []
        for v in range (len(vm_memory)):
            if t <= vm_memory[v]:
                suitable_vms_for_task.append(v)
        suitable_vms.append(suitable_vms_for_task)
    return suitable_vms
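For each task, `create_suitable_vms` returns the indices of the VMs with enough memory to hold it. The sketch restates the function as a list comprehension so it runs on its own. The memory figures are hypothetical.

```python
def create_suitable_vms(task_memory, vm_memory):
    """For each task, the indices of the VMs whose memory can hold it."""
    return [[v for v, capacity in enumerate(vm_memory) if t <= capacity]
            for t in task_memory]


vm_memory = [256, 512, 1024]    # hypothetical, e.g. Task_Memory of VM_type Type_1-Type_3
task_memory = [200, 600, 2048]  # hypothetical task requirements
print(create_suitable_vms(task_memory, vm_memory))  # [[0, 1, 2], [2], []]
```

An empty list marks a task that no VM can host. The commented-out allocator below reports this case as an unallocated task.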


class VMAllocationEnv:
    """Assigns workflow tasks to VMs. The state is a 0/1 matrix of shape
    (no_of_vms, no_of_tasks); state[vm][task] == 1 means the task runs on that VM."""
    def __init__(self, no_of_tasks, vm_list, task_list, task_lengths, datacenter, workflow):
        self.task_lengths = task_lengths
        self.task_list = task_list
        self.no_of_tasks = no_of_tasks
        self.no_of_vms = len(vm_list)
        # Use the instance attributes, not the notebook-level globals of the same name
        self.observation_space = spaces.Box(low=0, high=1, shape=(self.no_of_vms, self.no_of_tasks), dtype=np.float32)
        self.state = np.zeros((self.no_of_vms, self.no_of_tasks), dtype=int)
        self.datacenter = datacenter
        self.workflow = workflow

    def action_space(self, task_id):
        # Random action: place task `task_id` (1-based) on a uniformly chosen VM
        x, y = task_id - 1, random.randint(0, self.no_of_vms - 1)
        self.state[y][x] = 1
        return [x, y]

    def calculate_reward(self, state, action, task_lengths,task,tcostMap):
        next_state = self.get_next_state(state, action, task, tcostMap)
        makespan = self.calculate_makespan(next_state, task_lengths, task, tcostMap)
        return 1.0/float(makespan)

    def get_next_state(self, state, action, task, tcostMap):
        task_idx, vm_idx = action[0], action[1]
        # Work on a copy so that calculate_reward() can look ahead without mutating self.state
        next_state = state.copy()
        next_state[:, task_idx] = 0          # clear any previous placement of this task
        next_state[vm_idx][task_idx] = 1     # place it on the chosen VM

        # Incoming transfer cost: the largest file transfer from a parent that sits on
        # a different VM (parents on the same VM transfer for free).
        transfer_costs = self.workflow.get_transfer_cost()
        res = 0.0
        for parent in task.getParentList():
            i = parent.taskId - 1
            for j in range(len(state)):
                if state[j][i] == 1 and j != vm_idx:
                    res = max(res, transfer_costs[parent][task])

        # bytes -> MB, MB -> Mb, Mb / bandwidth -> seconds, seconds -> ms
        res = res / 1000000
        res = (res * 8) / self.datacenter.bandwidth
        res = res * 1000.0
        tcostMap[task.taskId - 1] = res
        return next_state

    def calculate_makespan(self, state, task_lengths, task,tcostMap):
        makespan = 0.0
        for i in range(len(state)):
            time_in_row = 0.0
            for j in range(len(state[0])):
                if (state[i][j] == 1):
                    time_in_row += (task_lengths[j] + tcostMap[j])
            makespan = max(makespan, time_in_row)
        # print("Makespan in env : ", makespan)
        return makespan

    def are_all_assigned(self, state):
        """True once the number of placements in the 0/1 state reaches no_of_tasks."""
        return int(np.sum(state)) >= self.no_of_tasks

    def step(self, action, task, tcostMap):
        # Old Gym-style API: returns (state, reward, done, info)
        reward = self.calculate_reward(self.state, action, self.task_lengths, task, tcostMap)
        self.state = self.get_next_state(self.state, action, task, tcostMap)
        done = self.are_all_assigned(self.state)
        info = {}
        return self.state, reward, done, info

    def reset(self):
        self.state = np.zeros((self.no_of_vms, self.no_of_tasks), dtype = int)
#       self.state = self.state.flatten()
        # print(self.state)
        return self.state
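`get_next_state` converts the largest incoming transfer into milliseconds. The transfer is measured in bytes and comes from a parent on a different VM. `calculate_makespan` then takes the busiest VM's total of task length plus transfer time. The sketch below shows both calculations. It assumes the datacenter bandwidth is in megabits per second, which the notebook does not state, and it uses hypothetical numbers and helper names.

```python
import numpy as np


def transfer_time_ms(size_bytes, bandwidth_mbps):
    """bytes -> MB (/1e6) -> Mb (*8), then Mb / (Mb/s) -> s, then s -> ms (*1000)."""
    megabits = size_bytes / 1_000_000 * 8
    return megabits * 1000.0 / bandwidth_mbps


def incoming_transfer_ms(parent_placements, task_vm, bandwidth_mbps):
    """parent_placements: (vm_index, bytes) per parent. Only parents on another VM
    cost anything, and only the largest such transfer is counted."""
    remote = [size for vm, size in parent_placements if vm != task_vm]
    return transfer_time_ms(max(remote, default=0.0), bandwidth_mbps)


def makespan_vectorized(state, task_lengths, tcost_map):
    """Busiest VM's load for a 0/1 (no_of_vms, no_of_tasks) placement matrix."""
    per_task = np.asarray(task_lengths, dtype=float)
    per_task = per_task + np.array([tcost_map.get(j, 0.0) for j in range(len(per_task))])
    return float((np.asarray(state, dtype=float) @ per_task).max())


# Task index 1 sits on VM 1 while its 2 MB parent ran on VM 0.
tcost = {1: incoming_transfer_ms([(0, 2_000_000)], task_vm=1, bandwidth_mbps=500)}
state = np.array([[1, 0, 1],
                  [0, 1, 0]])
print(tcost[1])                                            # 32.0
print(makespan_vectorized(state, [100, 250, 300], tcost))  # 400.0 (VM 0: 100 + 300)
```

Like the notebook's version, this is a load-based proxy. It ignores VM MIPS ratings and the time a task waits for its parents, so it is not the length of a simulated schedule.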

# task_list = [
#     Task("task1", 1024, 1000,1), #name, memory, length, PE
#     Task("task2", 512, 500,1),
#     Task("task3", 256, 250,1),
#     Task("task4", 128, 100,1),
#     Task("task5", 124, 190,1),
#     Task("task6", 500, 10,1),
#     Task("task7", 256, 20,1),
#     Task("task8", 128, 130,1),
#     Task("task9", 1024, 240,1),
#     Task("task10", 512, 550,1),
#     Task("task11", 256, 210,1),
#     Task("task12", 128, 690,1),
#     Task("task13", 156, 110,1),
#     Task("task14", 128, 620,1)
# ]

# task_memory = []
# vm_memory = []
# for _ in vm_list:
#   vm_memory.append(VM_type[_.Type]['Task_Memory'])
# for _ in task_list:
#   task_memory.append(_.memory)

# print("VM Memory :", vm_memory)
# print("Task Memory :", task_memory)

# suitable_vms = create_suitable_vms(task_memory, vm_memory)
# print("Suitable VMs:" , suitable_vms)


# vm_available = [0] * len(vm_list)

# def allocate_task(task, vms, task_memory, vm_memory):
#     # suitable_vms = create_suitable_vms(task_memory, vm_memory)
#     vm = None
#     # Randomly select a VM from the suitable VMs
#     index = random.randint(0, len(suitable_vms[task]) - 1)
#     vm_index = suitable_vms[task][index]
#     if(vm_available[vm_index]==0):
#       vm = vm_index
#       vm_available[vm_index]=1
#     # If the task cannot be allocated to any of the suitable VMs, print the message "Unallocated task"
#     if vm is None:
#         print("Unallocated task :", task)
#         return None,None
#     return vm,vm_index

# action_space = array = [[0 for i in range(len(vm_list))] for j in range(len(task_list))]
# print(action_space)

# def allocate_tasks(tasks, vms, task_memory, vm_memory):
#     allocated_tasks = []
#     for task in range(len(tasks)):
#         vm, vm_index = allocate_task(task, vms, task_memory, vm_memory)
#         print(vm_index)
#         if vm and vm_index is not None:
#             allocated_tasks.append((tasks[task].memory, vm))
#             print(task,vm_index,len(action_space))
#             print(len(action_space[task]))
#             action_space[task][vm_index]=1
#     return allocated_tasks

# res = allocate_tasks(task_list, vm_list, task_list, task_memory, vm_memory)
# print(res)
# print(action_space)
# create action_space


'''
  For every task, find all suitable VMs and allocate the task to one of them at random.
  Keep a list of occupied VMs and each VM's time score.
  While allocating tasks randomly to suitable VMs, check the VM's availability; if it is unavailable, allocate the task to another random VM.
  If all suitable VMs are unavailable, return an "unallocated" message. Repeat until the makespan is minimized,
  or keep allocating VMs one batch at a time.
'''

# env = VMAllocationEnv(vm_list, task_list)
# env = VMAllocationEnv(no_of_tasks, no_of_vms, a.taskList, task_lengths)

In [56]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Create the policy $\pi(s)$

In [57]:
class PolicyNet(nn.Module):

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(no_of_tasks, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc_mu = nn.Linear(64, 2)

    def forward(self, x):
        # Accept a NumPy array or a tensor without triggering a copy-construct warning
        x = torch.as_tensor(x, dtype=torch.float32, device=device)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        loc = self.fc_mu(x)
        loc = torch.tanh(loc*9)
        return loc

def Actor(states, task_id, actor):
    actor = actor.to(device)
    actor.requires_grad_(False)
    # The network output is computed but not used below: the VM index is drawn
    # uniformly at random, so this currently behaves as a random policy.
    actions = actor(states).cpu().numpy()[0]
    action = [task_id-1, random.randint(0, len(states)-1)]

    #action = [abs(round(actions[0]*9.45)), abs(round(actions[1]*60.325))]

    return action

# actor = PolicyNet()
# actor = actor.to(device)
# actor.requires_grad_(False)
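`Actor` above discards the network output and draws the VM uniformly at random. One way to let the network make the choice is sketched here. It is an assumption, not the notebook's method. The network outputs one logit per VM from the flattened state, and the VM is sampled from `torch.distributions.Categorical`. `VMPolicy` and `select_vm` are hypothetical names.

```python
import torch
import torch.nn as nn
from torch.distributions import Categorical


class VMPolicy(nn.Module):
    """Hypothetical policy: flattened (no_of_vms, no_of_tasks) state -> one logit per VM."""

    def __init__(self, no_of_vms, no_of_tasks, hidden=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(no_of_vms * no_of_tasks, hidden),
            nn.ReLU(),
            nn.Linear(hidden, no_of_vms),
        )

    def forward(self, state):
        x = torch.as_tensor(state, dtype=torch.float32).flatten()
        return self.net(x)


def select_vm(policy, state, task_id):
    """Sample a VM for a 1-based task_id; return the env action and its log-probability."""
    dist = Categorical(logits=policy(state))
    vm = dist.sample()
    return [task_id - 1, int(vm.item())], dist.log_prob(vm)


torch.manual_seed(0)
policy = VMPolicy(no_of_vms=9, no_of_tasks=100)
action, log_prob = select_vm(policy, torch.zeros(9, 100), task_id=1)
print(action, log_prob.item())
```

The returned `log_prob` keeps its autograd graph, which a policy-gradient loss needs. `Actor` returns only the action.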




## Create the value network $v(s)$

In [58]:
class ValueNet(nn.Module):

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(no_of_tasks, 64)
        self.fc_mu = nn.Linear(64, 2)

    def forward(self, x):
        # Accept a NumPy array or a tensor without triggering a copy-construct warning
        x = torch.as_tensor(x, dtype=torch.float32, device=device)
        x = F.relu(self.fc1(x))
        loc = self.fc_mu(x)
        loc = torch.tanh(loc*9)
        return loc

def Critic(states, task_id, critic):
    critic = critic.to(device)
    critic.requires_grad_(False)
    # As in Actor, the network output is not used: the VM index is drawn uniformly at random.
    actions = critic(states).cpu().numpy()[0]
    action = [task_id-1, random.randint(0, len(states)-1)]
    # action = [abs(round(actions[0]*9.45)), abs(round(actions[1]*60.325))]
    return action

# critic = ValueNet()
# critic = critic.to(device)
# critic.requires_grad_(False)
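In `evaluation` below, the losses are computed, but no `backward()` call is made. Both networks were also created with `requires_grad_(False)`, so `optimizer.step()` does not change any weights. For comparison, a one-step A2C update for a single transition might look like the sketch below. The critic regresses its estimate $V(s)$ toward the target $r + \gamma V(s')$, and the actor's log-probability is scaled by the resulting advantage. The sketch assumes a categorical actor over VMs and a critic with a single scalar output, unlike the two-output `PolicyNet`/`ValueNet`. It also assumes flattened states and uses illustrative sizes and learning rates. `a2c_actor`, `a2c_critic` and `a2c_update` are hypothetical names.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical

# Illustrative sizes; in the notebook these would be no_of_vms and no_of_tasks.
N_VMS, N_TASKS = 9, 100

# Assumed architectures: actor -> one logit per VM, critic -> one scalar V(s).
a2c_actor = nn.Sequential(nn.Linear(N_VMS * N_TASKS, 64), nn.ReLU(), nn.Linear(64, N_VMS))
a2c_critic = nn.Sequential(nn.Linear(N_VMS * N_TASKS, 64), nn.ReLU(), nn.Linear(64, 1))
actor_optim = torch.optim.AdamW(a2c_actor.parameters(), lr=1e-3)
critic_optim = torch.optim.AdamW(a2c_critic.parameters(), lr=1e-3)


def a2c_update(state, vm, reward, next_state, done, gamma=0.99, entropy_coef=0.01):
    """One-step A2C update for a single transition; returns (actor_loss, critic_loss)."""
    s = torch.as_tensor(state, dtype=torch.float32).flatten()
    s_next = torch.as_tensor(next_state, dtype=torch.float32).flatten()

    # Critic: regress V(s) toward the TD target, which is held constant.
    value = a2c_critic(s).squeeze(-1)
    with torch.no_grad():
        next_value = torch.zeros(()) if done else a2c_critic(s_next).squeeze(-1)
        target = reward + gamma * next_value
    advantage = (target - value).detach()  # one-step TD error

    critic_loss = F.mse_loss(value, target)
    critic_optim.zero_grad()
    critic_loss.backward()
    critic_optim.step()

    # Actor: policy gradient scaled by the advantage, plus an entropy bonus.
    dist = Categorical(logits=a2c_actor(s))
    log_prob = dist.log_prob(torch.as_tensor(vm))
    actor_loss = -log_prob * advantage - entropy_coef * dist.entropy()
    actor_optim.zero_grad()
    actor_loss.backward()
    actor_optim.step()
    return actor_loss.item(), critic_loss.item()


torch.manual_seed(0)
state = torch.zeros(N_VMS, N_TASKS)
next_state = state.clone()
next_state[3, 0] = 1.0  # task 1 placed on VM 3
print(a2c_update(state, vm=3, reward=1 / 1500.0, next_state=next_state, done=False))
```

The advantage is detached before the actor loss, so the actor update does not push gradients into the critic.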

In [59]:
num_envs = mp.cpu_count()
num_envs

2

In [60]:
# a=WorkflowParser(1)
# a.parse()

## Implement the algorithm
</br>

In [61]:
from queue import Queue
from tqdm import trange

makespan = []

def evaluation(workflow, actor, critic, actor_optim, critic_optim, env):

  alpha=1e-4
  gamma=0.99
  reward_records = []
  States = []
  Final_State = []
  actor_loss=[]
  critic_loss=[]
  final_makespan = []
  Task_Allocation = {}
  episodes=[]
  stats = {'Actor Loss': [], 'Critic Loss': [], 'Returns': []}
  advantage = 0
  warnings.filterwarnings('ignore')
  for i in range(50):
      states = torch.from_numpy(env.reset()).float()
      # done = False
      cum_reward = 0
      makespans =[]
      ep_return = torch.zeros((num_envs, 1))
      I = 1
      tcostMap = {}
      episodes.append(i)

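      # Reset the allocation flags; otherwise every task stays allocated after the
      # first episode and later episodes skip the parent-dependency check.
      for t in workflow.taskList:
        t.allocated = False
      q = Queue()
      for t in workflow.taskList:
        q.put(t)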
      #tasks are allocated only if parent tasks are allocated
      while q.qsize()>0:
          # Taking Action Based on Advantage
          x = q.get()
          flag=False
          for j in x.parentList:
            if j.allocated==False:
              q.put(x)
              flag=True
              break
          if flag == True:
            continue
          else:
            x.allocated=True
            # The sign of the previous advantage selects the helper; both currently
            # return a uniformly random VM for task x.
            if advantage<=0:
                action = Actor(states,x.taskId, actor)
            else:
                action = Critic(states,x.taskId, critic)
            # if action[0]>=10 or action[1]>=60:
            #     continue

            next_state, reward, done, _ = env.step(action, x, tcostMap)
            if done:
    #             print('done!')
                makespans.append(1/reward)
                States.append(states)
                cum_reward += reward
                break

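    #         Critic loss: TD target r + gamma * V(s') against the estimate V(s)
            value = critic(states)[0]
            target = reward + gamma * critic(torch.from_numpy(next_state).float())[0]
            critic_loss = F.mse_loss(value, target)
            # No backward() is called and the critic was created with requires_grad_(False),
            # so this optimizer step leaves the weights unchanged.
            critic.zero_grad()
            critic_optim.step()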

    #         Advantage Calculation
            value = sum(value/10)
            target = sum(target/10)
    #         value = math.sqrt((value[0]**2+value[1]**2)/2)
    #         target = math.sqrt((target[0]**2+target[1]**2)/2)
            advantage = (target - value)

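    #         Actor loss and returns
            # fc_mu ends in tanh, so raw outputs lie in [-1, 1]; softmax turns them into
            # a valid distribution so that the log is defined.
            probs = torch.softmax(actor(states)[1], dim=-1)
            log_probs = torch.log(probs + 1e-6)
            entropy = - torch.sum(probs * log_probs, dim=-1, keepdim=True)
            actor_loss = - I * log_probs * advantage - 0.01 * entropy
            actor_loss = actor_loss.mean()
            # As with the critic, no backward() is called, so this step is a no-op.
            actor.zero_grad()
            actor_optim.step()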
            ep_return += reward
            states = next_state
            I = I * gamma

      reward_records.append(cum_reward)
      final_makespan.append(min(makespans))
      Final_State.append(States[final_makespan.index(min(makespans))])
      stats['Actor Loss'].append(actor_loss.item())
      stats['Critic Loss'].append(critic_loss.item())
      stats['Returns'].append(ep_return.mean().item())
  # makespan.append(min(final_makespans))
  # print("makespan: ", min(final_makespan))
  # print(min(final_makespan))
  return min(final_makespan)
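The queue loop in `evaluation` re-enqueues a task until all of its parents are allocated. This can cycle through the queue many times, and it never terminates if the graph contains a cycle. Kahn's algorithm produces a parents-first order in a single O(V + E) pass. Here is a sketch over a plain `{task: [parents]}` dictionary, which is a hypothetical input format; `topological_order` is a hypothetical helper name.

```python
from collections import deque


def topological_order(parents):
    """Kahn's algorithm. `parents` maps every task to the list of its parent tasks;
    the result lists each task after all of its parents."""
    children = {t: [] for t in parents}
    indegree = {t: len(ps) for t, ps in parents.items()}
    for t, ps in parents.items():
        for p in ps:
            children[p].append(t)

    ready = deque(t for t, d in indegree.items() if d == 0)  # root tasks
    order = []
    while ready:
        t = ready.popleft()
        order.append(t)
        for c in children[t]:
            indegree[c] -= 1
            if indegree[c] == 0:  # all parents already placed
                ready.append(c)

    if len(order) != len(parents):
        raise ValueError("workflow graph contains a cycle")
    return order


dag = {"t1": [], "t2": ["t1"], "t3": ["t1"], "t4": ["t2", "t3"]}
print(topological_order(dag))  # ['t1', 't2', 't3', 't4']
```

With the notebook's objects, this could be called as `topological_order({t: t.getParentList() for t in workflow.taskList})`, since `Task` instances are hashable by identity.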




In [62]:
#Client 1:
daxPath = "/content/drive/MyDrive/BTP/CyberShake_100.xml"
a=WorkflowParser(1, daxPath)
a.parse()
a.CalculateTransferCosts()
task_lengths = []
for _ in a.taskList:
  task_lengths.append(_.taskLength)

# print("Task_List :", task_lengths)


#vm_list: append vm type uniformly
vm_list = [
    VM("VM1", "host1", VM_type["Type_1"]),
    VM("VM2", "host1", VM_type["Type_2"]),
    VM("VM3", "host2", VM_type["Type_3"]),
    VM("VM4", "host2", VM_type["Type_4"]),
    VM("VM5", "host3", VM_type["Type_1"]),
    VM("VM6", "host3", VM_type["Type_2"]),
    VM("VM7", "host4", VM_type["Type_3"]),
    VM("VM8", "host4", VM_type["Type_4"]),
    VM("VM9", "host1", VM_type["Type_1"]),
]

host_mem = {
    "host1":[0,0],#(no_of_vms,max_vm_mem)
    "host2":[0,0],
    "host3":[0,0],
    "host4":[0,0]
}

#updating host memory based on vms's allocated
def host_memory(host_mem, vm_list):
  for _ in vm_list:
    host_mem[_.Host][1]=max(host_mem[_.Host][1] ,_.Type['Task_Memory'])
    host_mem[_.Host][0]+=1

host_memory(host_mem, vm_list)

D2 = Datacenter("D2", "Bangalore", 0, 7.5, 7.5, 500) #name, location, memory, cost_per_hour, cost_per_memory, bandwidth

host_list = [
    Host("host1", D2, 2, host_mem["host1"][1]*host_mem["host1"][0],2,1,10), #name, Datacenter, cpu_cores, memory, ram, pe_type, bandwidth
    Host("host2", D2, 4, host_mem["host2"][1]*host_mem["host2"][0],4,2,25),
    Host("host3", D2, 8, host_mem["host3"][1]*host_mem["host3"][0],4,3,100),
    Host("host4", D2, 16, host_mem["host4"][1]*host_mem["host4"][0],8,4,75)
]

#updating datacenter memory as the total memory of its hosts
def datacenter_memory(datacenter, host_list):
  datacenter.memory = sum(host.memory for host in host_list)
datacenter_memory(D2, host_list)
print("D2 memory : ", D2.memory)

no_of_vms = len(vm_list)
no_of_tasks = len(task_lengths)

env1 = VMAllocationEnv(no_of_tasks, vm_list, a.taskList, task_lengths, D2, a)
actor1 = PolicyNet()
actor1 = actor1.to(device)
actor1.requires_grad_(False)

critic1 = ValueNet()
critic1 = critic1.to(device)
critic1.requires_grad_(False)

actor1_optim = AdamW(actor1.parameters(), lr=0.05)
critic1_optim = AdamW(critic1.parameters(), lr=0.05)

# print(evaluation(a, actor1, critic1, actor1_optim, critic1_optim, env1)/1000.0)

#Client2:
daxPath = "/content/drive/MyDrive/BTP/CyberShake_100.xml"
b=WorkflowParser(2, daxPath)
b.parse()
b.CalculateTransferCosts()
task_lengths = []
for _ in b.taskList:
  task_lengths.append(_.taskLength)

# print("Task_List :", task_lengths)


#vm_list: append vm type uniformly
vm_list_AWS = [
    VM("VM1", "host1", AWS_VM_type["Type_1"]),
    VM("VM2", "host1", AWS_VM_type["Type_2"]),
    VM("VM3", "host2", AWS_VM_type["Type_3"]),
    VM("VM4", "host2", AWS_VM_type["Type_4"]),
    VM("VM5", "host3", AWS_VM_type["Type_1"]),
    VM("VM6", "host3", AWS_VM_type["Type_2"]),
    VM("VM7", "host4", AWS_VM_type["Type_3"]),
    VM("VM8", "host4", AWS_VM_type["Type_4"]),
    VM("VM9", "host1", AWS_VM_type["Type_1"]),
]

host_mem_AWS = {
    "host1":[0,0],#(no_of_vms,max_vm_mem)
    "host2":[0,0],
    "host3":[0,0],
    "host4":[0,0]
}

# reuse host_memory() from client 1; it already takes the dict and VM list as arguments
host_memory(host_mem_AWS, vm_list_AWS)

AWS = Datacenter("AWS", "Bangalore", 0, 7.5, 7.5, 500) #name, location, memory, cost_per_hour, cost_per_memory, bandwidth

host_list_AWS = [
    Host("host1", AWS, 2, host_mem_AWS["host1"][1]*host_mem_AWS["host1"][0],2,1,10), #name, Datacenter, cpu_cores, memory, ram, pe_type, bandwidth
    Host("host2", AWS, 4, host_mem_AWS["host2"][1]*host_mem_AWS["host2"][0],4,2,25),
    Host("host3", AWS, 8, host_mem_AWS["host3"][1]*host_mem_AWS["host3"][0],4,3,100),
    Host("host4", AWS, 16, host_mem_AWS["host4"][1]*host_mem_AWS["host4"][0],8,4,75)
]

# Update the datacenter's memory as the sum of the memory of its hosts
def datacenter_memory(hosts, datacenter):
  datacenter.memory = sum(host.memory for host in hosts)
datacenter_memory(host_list_AWS, AWS)
print("AWS memory : ", AWS.memory)

no_of_vms = len(vm_list_AWS)
no_of_tasks = len(task_lengths)

env2 = VMAllocationEnv(no_of_tasks, vm_list_AWS, b.taskList, task_lengths,AWS,b)
actor2 = PolicyNet()
actor2 = actor2.to(device)
actor2.requires_grad_(False)

critic2 = ValueNet()
critic2 = critic2.to(device)
critic2.requires_grad_(False)

actor2_optim = AdamW(actor2.parameters(), lr=0.05)
critic2_optim = AdamW(critic2.parameters(), lr=0.05)

# evaluation(b, actor2, critic2, actor2_optim, critic2_optim,env2)

#Federated Model

actor_fed = PolicyNet()
actor_fed = actor_fed.to(device)
actor_fed.requires_grad_(False)


critic_fed = ValueNet()
critic_fed = critic_fed.to(device)
critic_fed.requires_grad_(False)

actor_fed_optim = AdamW(actor_fed.parameters(), lr=0.05)
critic_fed_optim = AdamW(critic_fed.parameters(), lr=0.05)



D2 memory :  6656
AWS memory :  360
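The setup cells above size each host as the number of VMs placed on it multiplied by the largest `Task_Memory` among those VMs, and set the datacenter's memory to the sum over its hosts. A minimal standalone sketch of that arithmetic follows; the VM type names, memory values, and placements are hypothetical placeholders, not the `AWS_VM_type` table or VM lists used in this notebook.

```python
from collections import defaultdict

# Hypothetical Task_Memory per VM type (placeholders, not the notebook's AWS_VM_type values)
VM_TYPES = {"Type_1": 10, "Type_2": 20, "Type_3": 40, "Type_4": 80}

# Hypothetical (vm_name, host_name, vm_type) placements
placements = [
    ("VM1", "host1", "Type_1"),
    ("VM2", "host1", "Type_2"),
    ("VM3", "host2", "Type_3"),
    ("VM4", "host2", "Type_4"),
    ("VM5", "host1", "Type_1"),
]

def host_memory(placements, vm_types):
    """Return {host: (no_of_vms, max_vm_mem)}, mirroring the host_mem dictionaries."""
    counts = defaultdict(int)
    max_mem = defaultdict(int)
    for _, host, vm_type in placements:
        counts[host] += 1
        max_mem[host] = max(max_mem[host], vm_types[vm_type])
    return {host: (counts[host], max_mem[host]) for host in counts}

def datacenter_memory(host_mem):
    """Datacenter memory = sum over hosts of no_of_vms * max_vm_mem."""
    return sum(n_vms * max_mem for n_vms, max_mem in host_mem.values())

hm = host_memory(placements, VM_TYPES)
print(hm)                     # {'host1': (3, 20), 'host2': (2, 80)}
print(datacenter_memory(hm))  # 3*20 + 2*80 = 220
```

Sizing every VM slot on a host by its largest VM over-provisions hosts that mix small and large VM types, which is one reason the two datacenters report such different totals.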


In [63]:

aggregator_epochs = 20

number_of_clients = 2

from tqdm import trange
# client_actor_weights = {client: None for client in range(number_of_clients)}
# client_critic_weights = {client: None for client in range(number_of_clients)}

# Makespan obtained by each client in every aggregator epoch
makespan1=[]
makespan2=[]

for epoch in trange(aggregator_epochs, desc="Aggregator Model"):

  # Train each client model locally and record its makespan for this epoch
  print("\nMakespan after epoch:", epoch)
  x = evaluation(a, actor1, critic1, actor1_optim, critic1_optim, env1)
  print("Client 1: ", x/1000)  # task lengths were multiplied by 1000 when the workflow was parsed
  makespan1.append(x/1000)
  y = evaluation(b, actor2, critic2, actor2_optim, critic2_optim, env2)
  print("Client 2: ", y/1000)
  makespan2.append(y/1000)

  # Actor: extract the weights of both client models
  actor1_state_dict = actor1.state_dict()
  actor2_state_dict = actor2.state_dict()

  # Average the weights of both client models (equal-weight FedAvg);
  # a key missing from client 2 keeps client 1's value
  actor_state_dict = {}
  for key in actor1_state_dict:
      if key in actor2_state_dict:
          actor_state_dict[key] = (actor1_state_dict[key] + actor2_state_dict[key]) / 2.0
      else:
          actor_state_dict[key] = actor1_state_dict[key]

  # Load the averaged weights into the federated actor
  actor_fed.load_state_dict(actor_state_dict)

  # Critic: same averaging procedure
  critic1_state_dict = critic1.state_dict()
  critic2_state_dict = critic2.state_dict()

  critic_state_dict = {}
  for key in critic1_state_dict:
      if key in critic2_state_dict:
          critic_state_dict[key] = (critic1_state_dict[key] + critic2_state_dict[key]) / 2.0
      else:
          critic_state_dict[key] = critic1_state_dict[key]

  critic_fed.load_state_dict(critic_state_dict)

  # Broadcast the federated weights back to both client models
  actor1.load_state_dict(actor_fed.state_dict())
  actor2.load_state_dict(actor_fed.state_dict())
  critic1.load_state_dict(critic_fed.state_dict())
  critic2.load_state_dict(critic_fed.state_dict())

# Average makespan over all aggregator epochs (mean of the per-epoch values, not the last epoch's value)
print("Makespan after federated learning of Client 1: ", sum(makespan1)/len(makespan1))
print("Makespan after federated learning of Client 2: ", sum(makespan2)/len(makespan2))


Aggregator Model:   0%|          | 0/20 [00:00<?, ?it/s]


Makespan after epoch: 0
Client 1:  505.977879584


Aggregator Model:   5%|▌         | 1/20 [00:20<06:29, 20.50s/it]

Client 2:  484.150335968

Makespan after epoch: 1
Client 1:  519.4807388160001


Aggregator Model:  10%|█         | 2/20 [00:41<06:18, 21.05s/it]

Client 2:  530.8046156799999

Makespan after epoch: 2
Client 1:  510.09420137600006


Aggregator Model:  15%|█▌        | 3/20 [01:03<06:05, 21.50s/it]

Client 2:  509.214768608

Makespan after epoch: 3
Client 1:  471.5329505600001


Aggregator Model:  20%|██        | 4/20 [01:24<05:36, 21.02s/it]

Client 2:  517.4957165440001

Makespan after epoch: 4
Client 1:  543.3453560319999


Aggregator Model:  25%|██▌       | 5/20 [01:44<05:12, 20.86s/it]

Client 2:  522.779033216

Makespan after epoch: 5
Client 1:  521.3945976320001


Aggregator Model:  30%|███       | 6/20 [02:07<05:00, 21.43s/it]

Client 2:  480.37741126400005

Makespan after epoch: 6
Client 1:  515.83371008


Aggregator Model:  35%|███▌      | 7/20 [02:27<04:35, 21.16s/it]

Client 2:  499.4039261120001

Makespan after epoch: 7
Client 1:  514.4919231360001


Aggregator Model:  40%|████      | 8/20 [02:48<04:10, 20.86s/it]

Client 2:  518.707851072

Makespan after epoch: 8
Client 1:  494.06023286400006


Aggregator Model:  45%|████▌     | 9/20 [03:09<03:49, 20.89s/it]

Client 2:  474.8052908480001

Makespan after epoch: 9
Client 1:  521.5666106560001


Aggregator Model:  50%|█████     | 10/20 [03:31<03:33, 21.33s/it]

Client 2:  490.59125177600004

Makespan after epoch: 10
Client 1:  522.3686468480001


Aggregator Model:  55%|█████▌    | 11/20 [03:51<03:09, 21.00s/it]

Client 2:  515.196302208

Makespan after epoch: 11
Client 1:  515.064150368


Aggregator Model:  60%|██████    | 12/20 [04:12<02:46, 20.80s/it]

Client 2:  471.41007984000015

Makespan after epoch: 12
Client 1:  525.1444143680001


Aggregator Model:  65%|██████▌   | 13/20 [04:34<02:29, 21.29s/it]

Client 2:  483.9905018560001

Makespan after epoch: 13
Client 1:  517.953873856


Aggregator Model:  70%|███████   | 14/20 [04:55<02:06, 21.12s/it]

Client 2:  511.33780905600014

Makespan after epoch: 14
Client 1:  501.310520704


Aggregator Model:  75%|███████▌  | 15/20 [05:15<01:44, 20.86s/it]

Client 2:  520.48523456

Makespan after epoch: 15
Client 1:  492.85015312


Aggregator Model:  80%|████████  | 16/20 [05:36<01:23, 20.93s/it]

Client 2:  504.36850550400004

Makespan after epoch: 16
Client 1:  455.80155440000004


Aggregator Model:  85%|████████▌ | 17/20 [05:58<01:03, 21.30s/it]

Client 2:  520.0054987200001

Makespan after epoch: 17
Client 1:  497.3106989440001


Aggregator Model:  90%|█████████ | 18/20 [06:18<00:41, 20.98s/it]

Client 2:  480.8191002240001

Makespan after epoch: 18
Client 1:  496.05651545600006


Aggregator Model:  95%|█████████▌| 19/20 [06:39<00:20, 20.79s/it]

Client 2:  542.0044991039999

Makespan after epoch: 19
Client 1:  480.364896096


Aggregator Model: 100%|██████████| 20/20 [07:01<00:00, 21.08s/it]

Client 2:  512.3107967679999
Makespan after federated learning of Client 1:  506.1001812448001
Makespan after federated learning of Client 2:  504.51292644640006
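The aggregation step in the training loop above averages the two clients' actor and critic weights with equal weight, which is unweighted federated averaging (FedAvg). A minimal generalization to any number of clients, with optional per-client weights, is sketched below. The function name `federated_average` and the weighting option are assumptions for illustration, not part of this notebook. The helper only needs values that support `*` and `+`, so it accepts PyTorch tensors from `state_dict()` as well as plain floats.

```python
def federated_average(state_dicts, weights=None):
    """Weighted element-wise mean of client state dicts (hypothetical helper).

    With weights=None every client counts equally, which matches the
    (w1 + w2) / 2 rule used for two clients.
    """
    if not state_dicts:
        raise ValueError("need at least one client state dict")
    if weights is None:
        weights = [1.0] * len(state_dicts)
    if len(weights) != len(state_dicts):
        raise ValueError("one weight per client is required")
    total = float(sum(weights))
    if total <= 0:
        raise ValueError("weights must sum to a positive value")
    # Normalise the client weights so they sum to 1
    coeffs = [w / total for w in weights]

    averaged = {}
    for key in state_dicts[0]:
        if all(key in sd for sd in state_dicts):
            # Weighted sum of this parameter across all clients
            acc = state_dicts[0][key] * coeffs[0]
            for sd, c in zip(state_dicts[1:], coeffs[1:]):
                acc = acc + sd[key] * c
            averaged[key] = acc
        else:
            # Key missing from some client: keep the first client's value
            averaged[key] = state_dicts[0][key]
    return averaged
```

In the loop above, `actor_fed.load_state_dict(federated_average([actor1.state_dict(), actor2.state_dict()]))` would reproduce the equal-weight two-client mean (up to floating-point rounding). Passing something like `weights=[len(a.taskList), len(b.taskList)]` would instead weight each client by its workflow size; that is a design choice this notebook does not make.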



