<a href="https://colab.research.google.com/github/Brenocsc/Task-Scheduling/blob/main/task-scheduling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
class Task:
  def __init__(self, id):
    self.id = id
    self.processorId = None
    self.rank = None
    self.compCost = []
    self.avgCompCost = None
    self.predEdges = []
    self.succEdges = []
    self.duration = {'start': None, 'end': None}
  
  def __lt__(self, other):
    return False

In [None]:
class Processor:
  def __init__(self, id):
    self.id = id
    self.taskList = []

In [None]:
class Edge:
  def __init__(self, predecessorTask, successorTask, data):
    self.predecessorTask = predecessorTask # id
    self.successorTask = successorTask # id
    self.data = data
    self.avgCommCost = None

In [None]:
class Environment:
  def __init__(self, numTasks, numProcessors):
    self.numTasks = numTasks
    self.numProcessors = numProcessors
    self.numEdges = 0
    self.tasks = []
    self.edges = []
    self.processors = []
    self.processorTransferRates = [[0, 1, 1], [1, 0, 1], [1, 1, 0]] # Barramento heterogêneo?
    self.commStartupCosts = [1, 1, 1] # Precisa??
    self.createProcessors()

  def createProcessors(self):
    for i in range(self.numProcessors):
      self.processors.append(Processor(i))

  def resetInstances(self):
    for task in self.tasks:
      self.processorId = None
      self.rank = None
      self.duration = {'start': None, 'end': None}
    for processor in self.processors:
      processor.taskList = []

  # def generateProcessorsSettings(self): # Gerar o processorTransferRates e o commStartupCosts random

In [None]:
import operator
import heapq
import numpy as np
from random import randrange

class CPOP:
  def __init__(self, env):
    self.numTasks = env.numTasks # Contando a entry e a exit task
    self.numEdges = env.numEdges
    self.numProcessors = env.numProcessors
    self.tasks = env.tasks[:]
    self.edges = env.edges[:]
    self.processors = env.processors[:]
    self.processorTransferRates = env.processorTransferRates
    self.commStartupCosts = env.commStartupCosts # Precisa??
    # cpop atributos
    self.entryTask = self.tasks[0]
    self.exitTask = self.tasks[self.numTasks - 1]
    self.criticalPath = []

  def calculateAvgCompCost(self):
    for task in self.tasks:
      task.processorId = None # Se já tiver sido usado a task em outro algoritmo
      task.isCritical = False
      task.avgCompCost = np.average(task.compCost)
      print('avg', task.avgCompCost)

  def calculateAvgTransferRates(self):
    numProcessors = self.numProcessors
    numTransfers = (numProcessors * numProcessors) - numProcessors

    totalTransferRate = 0
    for i in range(numProcessors):
      for j in range(numProcessors):
        # if (i != j): Se manter o 0 na diagonal, não precisa desse IF
        totalTransferRate += self.processorTransferRates[i][j]

    return totalTransferRate / numTransfers

  def getEST(self, task, processor):
    est = 0
    lenTaskListProcessor = len(processor.taskList)

    for predEdge in task.predEdges:
      commCost = 0
      predTask = self.tasks[predEdge.predecessorTask] # error
      if predTask.processorId != processor.id:
        commCost = predEdge.data / self.processorTransferRates[predTask.processorId][processor.id] # Não esta sendo levado em consideração o communication statup cost
      est = max(est, predTask.duration['end'] + commCost)
    freeTimes = []
    if lenTaskListProcessor == 0:       # no task has yet been assigned to processor
      freeTimes.append([0, float('inf')])
    else:
      for i in range(lenTaskListProcessor):
        if i == 0:
          if processor.taskList[i].duration['start'] != 0:    # if processor is not busy from time 0
            freeTimes.append([0, processor.taskList[i].duration['start']])
        else:
          freeTimes.append([processor.taskList[i-1].duration['end'], processor.taskList[i].duration['start']])
      freeTimes.append([processor.taskList[-1].duration['end'], float('inf')])
    for slot in freeTimes:     # free_times is already sorted based on avaialbe start times
      if est < slot[0] and slot[0] + task.compCost[processor.id] <= slot[1]:
        return slot[0]
      if est >= slot[0] and est + task.compCost[processor.id] <= slot[1]:
        return est

  def getEFT(self, task, processor, est):
    return task.compCost[processor.id] + est #self.getEST(task, processor)

  def calculateAvgCommCost(self):
    avgProcTransfer = self.calculateAvgTransferRates()
    avgCommStartup = np.average(self.commStartupCosts)
    print('avgtransfer', avgProcTransfer, avgCommStartup)

    for edge in self.edges:
      # edge.avgCommCost = avgCommStartup + edge.data / avgProcTransfer
      edge.avgCommCost = edge.data / avgProcTransfer # Não esta sendo levado em consideração o communication statup cost
      print('avg', edge.avgCommCost)

  def recursiveRankUpward(self, task):
    successorsMaxList = []
    for edge in task.succEdges:
      succRankUpward = self.recursiveRankUpward(self.tasks[edge.successorTask])
      print('upward', task.id, edge.avgCommCost, succRankUpward)
      successorsMaxList.append(edge.avgCommCost + succRankUpward)

    currentRankUpward = (task.avgCompCost if len(successorsMaxList) == 0
    else task.avgCompCost + np.amax(successorsMaxList))

    task.rankU = currentRankUpward
    return currentRankUpward

  def recursiveRankDownward(self, task):
    predMaxList = []
    for edge in task.predEdges:
      predRankDownward = self.recursiveRankDownward(self.tasks[edge.predecessorTask])
      print('downward', task.id, task.avgCompCost, edge.avgCommCost, predRankDownward)
      predMaxList.append(task.avgCompCost + edge.avgCommCost + predRankDownward)

    currentRankDownward = 0 if len(predMaxList) == 0 else np.amax(predMaxList)

    task.rankD = currentRankDownward
    return currentRankDownward

  def calculateRankPriority(self):
    # assumindo que o primeiro vertice é a unica task de entrada
    self.recursiveRankUpward(self.entryTask)
    self.recursiveRankDownward(self.exitTask)
    for task in self.tasks:
      task.rank = task.rankU + task.rankD

    self.rankTasks = self.tasks[:]
    self.rankTasks.sort(key = lambda x: x.rank, reverse=True)
    
  def createCriticalPath(self):
    self.criticalPath.append(self.entryTask)
    task = self.entryTask
    task.isCritical = True
    while task.id != self.exitTask.id:
      highestPriority = -1

      for edge in task.succEdges:
        succTask = self.tasks[edge.successorTask]
        if succTask.rank > highestPriority:
          criticalTask = succTask
          highestPriority = succTask.rank

      self.criticalPath.append(criticalTask)

      task = criticalTask
      task.isCritical = True

  def chooseProcessorCP(self):
    lowestCost = float("inf")
    for proc in self.processors:
      compCostTotal = 0

      for task in self.criticalPath:
        compCostTotal += task.compCost[proc.id]
      print('Processor ', proc.id, "-> Cost total: ", compCostTotal)
      if compCostTotal < lowestCost:
        lowestCost = compCostTotal
        self.processorCP = proc

  def scheduleTasks(self):
    priorityQueue = []
    heapq.heappush(priorityQueue, (-1 * self.entryTask.rank, self.entryTask))

    while len(priorityQueue) != 0:
      task = heapq.heappop(priorityQueue)[1]
      bestProcessor = None

      if task.isCritical:
        bestProcessor = self.processorCP
        est = self.getEST(task, bestProcessor)
        eft = self.getEFT(task, bestProcessor, est)
        print("Task: ", task.id, ", ProcCP: ", bestProcessor.id, " -> EST: ", est, 'EFT', eft)
        task.duration['start'] = est
        task.duration['end'] = eft
      else:
        eftBest = float("inf")
        for processor in self.processors:
          est = self.getEST(task, processor)
          eft = self.getEFT(task, processor, est)
          print("Task: ", task.id, ", Proc: ", processor.id, " -> EST: ", est, 'EFT', eft)
          if (eft < eftBest):
            estBest = est
            eftBest = eft
            bestProcessor = processor
        task.duration['start'] = estBest
        task.duration['end'] = eftBest
      
      task.processorId = bestProcessor.id
      bestProcessor.taskList.append(task)
      bestProcessor.taskList.sort(key = lambda x: x.duration['start']) # precisa ordenar todo loop??

      # Tem jeito mais facil de verificar se está pronta a task??
      for succEdge in task.succEdges:
        isReady = True
        succTask = self.tasks[succEdge.successorTask]
        for predEdge in succTask.predEdges:
          if self.tasks[predEdge.predecessorTask].processorId == None:
            isReady = False

        if isReady:
          heapq.heappush(priorityQueue, (-1 * succTask.rank, succTask))

      # print(task.id, processor.id, self.getEFT(task, processor))

In [None]:
import operator
import copy
import numpy as np
from random import randrange

class HEFT:
  def __init__(self, env):
    self.numTasks = env.numTasks # Contando a entry e a exit task
    self.numEdges = env.numEdges
    self.numProcessors = env.numProcessors
    self.tasks = env.tasks[:]
    self.edges = env.edges[:]
    self.processors = env.processors[:]
    self.processorTransferRates = env.processorTransferRates # Barramento heterogêneo?
    self.commStartupCosts = env.commStartupCosts # Precisa??
    # self.compCosts = [[2, 3], [1, 1], [1, 2]] # Precisa??
    # self.rankU = [0] * self.numTasks
    # self.taskLinks = [[-1, 0, 0], [-1, -1, -1], [-1, -1, -1]] # Precida??

  def calculateAvgCompCost(self):
    for task in self.tasks:
      task.avgCompCost = np.average(task.compCost)
      print('avg', task.avgCompCost)

  def calculateAvgTransferRates(self):
    numProcessors = self.numProcessors
    numTransfers = (numProcessors * numProcessors) - numProcessors

    totalTransferRate = 0
    for i in range(numProcessors):
      for j in range(numProcessors):
        # if (i != j): Se manter o 0 na diagonal, não precisa desse IF
        totalTransferRate += self.processorTransferRates[i][j]

    return totalTransferRate / numTransfers

  def getEST(self, task, processor):
    est = 0
    lenTaskListProcessor = len(processor.taskList)

    for predEdge in task.predEdges:
      commCost = 0
      predTask = self.tasks[predEdge.predecessorTask] # error
      if predTask.processorId != processor.id:
        commCost = predEdge.data / self.processorTransferRates[predTask.processorId][processor.id] # Não esta sendo levado em consideração o communication statup cost
      est = max(est, predTask.duration['end'] + commCost)
    freeTimes = []
    if lenTaskListProcessor == 0:       # no task has yet been assigned to processor
      freeTimes.append([0, float('inf')])
    else:
      for i in range(lenTaskListProcessor):
        if i == 0:
          if processor.taskList[i].duration['start'] != 0:    # if processor is not busy from time 0
            freeTimes.append([0, processor.taskList[i].duration['start']])
        else:
          freeTimes.append([processor.taskList[i-1].duration['end'], processor.taskList[i].duration['start']])
      freeTimes.append([processor.taskList[-1].duration['end'], float('inf')])
    for slot in freeTimes:     # free_times is already sorted based on avaialbe start times
      if est < slot[0] and slot[0] + task.compCost[processor.id] <= slot[1]:
        return slot[0]
      if est >= slot[0] and est + task.compCost[processor.id] <= slot[1]:
        return est

  def getEFT(self, task, processor, est):
    return task.compCost[processor.id] + est #self.getEST(task, processor)

  def calculateAvgCommCost(self):
    avgProcTransfer = self.calculateAvgTransferRates()
    avgCommStartup = np.average(self.commStartupCosts)
    print('avgtransfer', avgProcTransfer, avgCommStartup)

    for edge in self.edges:
      # edge.avgCommCost = avgCommStartup + edge.data / avgProcTransfer
      edge.avgCommCost = edge.data / avgProcTransfer # Não esta sendo levado em consideração o communication statup cost
      print('avg', edge.avgCommCost)

  def recursiveRankUpward(self, task):
    successorsMaxList = []
    for edge in task.succEdges:
      succRankUpward = self.recursiveRankUpward(self.tasks[edge.successorTask])
      print(task.id, edge.avgCommCost, succRankUpward)
      successorsMaxList.append(edge.avgCommCost + succRankUpward)

    currentRankUpward = (task.avgCompCost if len(successorsMaxList) == 0
    else task.avgCompCost + np.amax(successorsMaxList))

    task.rank = currentRankUpward
    return currentRankUpward

  def calculateRankUpward(self):
    # assumindo que o primeiro vertice é a unica task de entrada
    self.recursiveRankUpward(self.tasks[0])
    self.rankUTasks = self.tasks[:]
    self.rankUTasks.sort(key = lambda x: x.rank, reverse=True)

  def scheduleTasks(self):
    firstTask = self.rankUTasks[0]
    processorId, compCost = min(enumerate(firstTask.compCost), key=operator.itemgetter(1))
    print("Task: ", firstTask.id, ", Proc: ", processorId, " -> EST: ", 0, 'EFT', compCost)
    firstTask.duration['start'] = 0
    firstTask.duration['end'] = compCost
    firstTask.processorId = processorId
    self.processors[processorId].taskList.append(firstTask)
  
    for task in self.rankUTasks[1:]:
      estBest = 0
      eftBest = float("inf")
      # task.duration['end'] = float("inf")
      for processor in self.processors:
        est = self.getEST(task, processor)
        eft = self.getEFT(task, processor, est)
        print("Task: ", task.id, ", Proc: ", processor.id, " -> EST: ", est, 'EFT', eft)
        if (eft < eftBest):
          estBest = est
          eftBest = eft
          bestProcessorId = processor.id # TODO parar de usar o ID
      task.duration['start'] = estBest
      task.duration['end'] = eftBest
      task.processorId = bestProcessorId
      self.processors[bestProcessorId].taskList.append(task)
      self.processors[bestProcessorId].taskList.sort(key = lambda x: x.duration['start'])  # precisa ordenar todo loop??

      # print(task.id, processor.id, self.getEFT(task, processor))

In [None]:
class DAG:
  def __init__(self):
    self.colSize = 11

  def writeFormatCol(self, data):
    colContent = str(data).rjust(self.colSize)
    self.wf.write(colContent)

  def readFormatCol(self, line, indexCol):
    dataCol = line[ self.colSize*indexCol : self.colSize*(indexCol+1) ]
    print(dataCol, indexCol)
    return int(dataCol.strip())

  def isEntryOrExitTask(self, taskId, numTask):
    entryTaskId = 0 
    exitTaskId = numTask + 1
    return taskId == entryTaskId or taskId == exitTaskId
    

  # Basic STG Columns 
  # Task Id | Computation Cost | Num Predecessors | [ Predecessors Id list ]

  # Communication cost and heterogeneous processors (CCHP)
  # CCHP STG Columns 
  # Task Id | [ Computation Cost list ](length = numProc) | Num Predecessors | [ (Predecessors Id | Communication data) list ]
  def createCCHP(self, readDirfile, writeDirfile, numProcessors):
    self.rf = open(readDirfile, 'r')
    self.wf = open(writeDirfile, 'w')
    colSize = 11
  
    numTasks = int(self.rf.readline().strip())
    self.writeFormatCol(numTasks)
    self.writeFormatCol(numProcessors)
    self.wf.write("\n")

    for _ in range(numTasks+2): # +2 por causa da entry task e a exit task
      line = self.rf.readline()

      taskId = self.readFormatCol(line, 0)
      print(taskId)
      compCost = self.readFormatCol(line, 1)
      print(compCost)
      numPred = self.readFormatCol(line, 2)
      print(numPred,'\n')

      self.writeFormatCol(taskId)
      for _ in range(numProcessors):
        compCostProcessor = (randrange(compCost, compCost + 3) 
        if not self.isEntryOrExitTask(taskId, numTasks)
        else 0)
        
        self.writeFormatCol(compCostProcessor)
      
      self.writeFormatCol(numPred)
      for i in range(numPred):
        predTaskId = self.readFormatCol(line, i+3)
        
        commData = (randrange(1, 6) # random range 1 to 5
        if not (self.isEntryOrExitTask(taskId, numTasks) or self.isEntryOrExitTask(predTaskId, numTasks))
        else 0)

        self.writeFormatCol(predTaskId)
        self.writeFormatCol(commData)
      
      self.wf.write("\n")
      
    self.rf.close()
    self.wf.close()
  
  def readCCHP(self, readDirfile):
    self.rf = open(readDirfile, 'r')

    line = self.rf.readline()
    numTasks = self.readFormatCol(line, 0)
    numProcessors = self.readFormatCol(line, 1)

    env = Environment(numTasks + 2, numProcessors) # Contando a entry e a exit task

    for _ in range(numTasks+2): # +2 por causa da entry task e a exit task
      colIndex = 0
      line = self.rf.readline()
      taskId = self.readFormatCol(line, colIndex)
      colIndex += 1

      task = Task(taskId)
  
      for _ in range(numProcessors):
        cost = self.readFormatCol(line, colIndex)
        colIndex += 1
        task.compCost.append(cost)

      env.tasks.append(task)

      numPred = self.readFormatCol(line, colIndex)
      colIndex += 1
      
      for _ in range(numPred):
        predTaskId = self.readFormatCol(line, colIndex)
        colIndex += 1
        commData = self.readFormatCol(line, colIndex)
        colIndex += 1

        edge = Edge(predTaskId, taskId, commData)

        env.tasks[predTaskId].succEdges.append(edge)
        task.predEdges.append(edge)
        env.edges.append(edge)
        env.numEdges += 1

    for task in env.tasks:
      print("Task ", task.id, "-> CompCost: ", task.compCost)
      for edge in task.predEdges:
        print("Edge ", edge.predecessorTask, "->", edge.successorTask, " Data:", edge.data)
    # for edge in env.edges:
    #   print("Edge ", edge.predecessorTask, "->", edge.successorTask, " Data:", edge.data)
    print(env.numTasks, env.numEdges)

    return env


In [None]:
dag = DAG()
# dag.createCCHP('/content/drive/MyDrive/TCC/rnc50/50/rand0000 (edited).stg', '/content/drive/MyDrive/TCC/rnc50/50/rand0000 (new).stg', 2)
env = dag.readCCHP('/content/drive/MyDrive/TCC/Grafos/topcuoglu-10.stg')

# dag.createCCHP('/content/drive/MyDrive/TCC/Grafos/rnc50/50/rand0001.stg', '/content/drive/MyDrive/TCC/Grafos/rnc50/50/rand0001-cchp.stg', 3)
# env = dag.readCCHP('/content/drive/MyDrive/TCC/Grafos/rnc50/50/rand0001-cchp.stg')

env.resetInstances()
cpop = CPOP(env)
cpop.calculateAvgCompCost()
cpop.calculateAvgCommCost()
cpop.calculateRankPriority()
for task in cpop.tasks:
  print("Task ", task.id, "-> Rank: ", task.rank)
cpop.createCriticalPath()
cpop.chooseProcessorCP()
for task in cpop.tasks:
  print("Task ", task.id, "-> critical: ", task.isCritical)
for task in cpop.criticalPath:
  print("Critical Task ", task.id)
print("Processor CP", cpop.processorCP.id)
cpop.scheduleTasks()
print('Schedule length = ', cpop.exitTask.duration['end'])

env.resetInstances()
heft = HEFT(env)
heft.calculateAvgCompCost()
heft.calculateAvgCommCost()
heft.calculateRankUpward()
for task in heft.tasks:
    print("Task ", task.id, "-> Rank: ", task.rank)
for task in heft.rankUTasks:
    print("RankUTasks ", task.id, "-> Rank: ", task.rank)
heft.scheduleTasks()
print('Schedule length = ', heft.rankUTasks[-1].duration['end'])
for p in heft.processors:  
  for t in p.taskList:
    print(p.id, t.id)


          9 0
          3 1
          0 0
          0 1
          0 2
          0 3
          0 4
          1 0
         14 1
         16 2
          9 3
          1 4
          0 5
          0 6
          2 0
         13 1
         19 2
         18 3
          1 4
          1 5
         18 6
          3 0
         11 1
         13 2
         19 3
          1 4
          1 5
         12 6
          4 0
         13 1
          8 2
         17 3
          1 4
          1 5
          9 6
          5 0
         12 1
         13 2
         10 3
          1 4
          1 5
         11 6
          6 0
         13 1
         16 2
          9 3
          1 4
          1 5
         14 6
          7 0
          7 1
         15 2
         11 3
          1 4
          3 5
         23 6
          8 0
          5 1
         11 2
         14 3
          3 4
          2 5
         19 6
          4 7
         27 8
          6 9
         15 10
          9 0
         18 1
         12 2
         20 3
     

In [None]:
import heapq
listForTree = []
# heapq._heapify_max(listForTree)
# print(heapq._heappop_max(listForTree))
# print(heapq._heappop_max(listForTree))
# heapq._heappush_max(listForTree, 16)
# print(heapq._heappop_max(listForTree))
# heapq.heappush(listForTree, (-1*1, 'a'))
# heapq.heappush(listForTree, (-1*2, 'b'))
# heapq.heappush(listForTree, (-1*3, 'c'))
# print(heapq.heappop(listForTree)[1])
# print(heapq.heappop(listForTree)[1])
# print(heapq.heappop(listForTree)[1])
# print(listForTree)
# print(heapq.heappop(listForTree)[1])
import copy
t = Task(1)
a1 = []
a1.append(t)
a2 = copy.deepcopy(a1)
a2[0].id = 2
print(a1[0].id)
print(a2[0].id)



1
2
