In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import numpy as np
from copy import deepcopy
from typing import List, Tuple

def read_job_shop_data(filename: str) -> Tuple[int, int, List[List[int]], List[List[int]]]:
    task_use = []
    task_duration = []
    num_jobs = 0
    num_machines = 0

    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()
            if line.startswith("!"):
                continue
            elif "NBJOBS:" in line:
                num_jobs = int(line.split(":")[1])
            elif "NBRES:" in line:
                num_machines = int(line.split(":")[1])
            elif "taskUse:" in line:
                for _ in range(num_jobs):
                    line = next(file).strip()
                    task_use.append(list(map(int, [c for c in line.split() if c != ']'])))
            elif "taskDuration:" in line:
                for _ in range(num_jobs):
                    line = next(file).strip()
                    task_duration.append(list(map(int, [c for c in line.split() if c != ']'])))
    return num_jobs, num_machines, task_use, task_duration

In [3]:
file_name = "/content/drive/MyDrive/Colab Notebooks/BIA/mt06.dat"
num_jobs, num_machines, task_use, task_duration = read_job_shop_data(file_name)

In [4]:
num_jobs, num_machines, task_use, task_duration

(6,
 6,
 [[2, 0, 1, 3, 5, 4],
  [1, 2, 4, 5, 0, 3],
  [2, 3, 5, 0, 1, 4],
  [1, 0, 2, 3, 4, 5],
  [2, 1, 4, 5, 0, 3],
  [1, 3, 5, 0, 4, 2]],
 [[1, 3, 6, 7, 3, 6],
  [8, 5, 10, 10, 10, 4],
  [5, 4, 8, 9, 1, 7],
  [5, 5, 5, 3, 8, 9],
  [9, 3, 5, 4, 3, 1],
  [3, 3, 9, 10, 4, 1]])

In [5]:
class ScheduleHandler:
    def __init__(self, num_machines):
        self.schedule = [[] for _ in range(num_machines)]

    def totalSlack(self):
        summ = 0
        for row in self.schedule:
            for cell in row:
                if cell is None:
                    summ += 1
        return summ

    def totalTime(self):
        summ = 0
        for row in self.schedule:
            for cell in row:
                if cell is not None:
                    summ += 1
        return summ
    def addJob(self, num_job, num_task):
        last_job = 0
        for row in self.schedule:
            for index, cell in enumerate(row):
                if cell == num_job and index > last_job:
                    last_job = index

        if len(self.schedule[task_use[num_job][num_task]]) <= last_job:
            for slot in range(len(self.schedule[task_use[num_job][num_task]]), last_job + 1):
                self.schedule[task_use[num_job][num_task]].append(None)
        else:
            for slot in range(task_duration[num_job][num_task]):
                self.schedule[task_use[num_job][num_task]].append(num_job)

    def printSchedule(self):
        for row in self.schedule:
            print(row)

In [6]:
algo = np.zeros((len(task_use))) # Pass a tuple for shape
algo

array([0., 0., 0., 0., 0., 0.])

In [7]:
import numpy as np
import copy
from typing import List, Tuple

class ACOBinaryKnapSack:

    def __init__(self,num_machines: int, task_use: List[list[int]], task_duration: List[list[int]], n_ants: int, alpha: float, beta: float, rho: float):
        self.task_use = np.array(task_use)
        self.task_duration = np.array(task_duration)
        self.num_machines = num_machines

        self.n_ants = n_ants
        self.alpha = alpha
        self.beta = beta
        self.rho = rho

        self.pheromone = None
        self.best_solution = None
        self.best_fitness = None

        self.pheromone_history = []
        self.trails_history = []
        self.best_fitness_history = []

    def _initialize(self):
        self.pheromone = np.ones((self.task_use.shape[0],self.task_use.shape[1]))
        self.best_solution = None
        self.best_fitness = float('+inf')

        self.pheromone_history = []
        self.trails_history = []
        self.best_fitness_history = []

    def _construct_solution(self) -> ScheduleHandler:
        solution = ScheduleHandler(self.num_machines)
        candidates_tracker = np.zeros((self.task_use.shape[0]), dtype=int) # Ensure integer type
        total_tasks = self.task_use.shape[0] * self.task_use.shape[1]

        while np.any(candidates_tracker < self.task_use.shape[1]):
            heuristic = self.calculate_probabilities(candidates_tracker,solution)
            heuristic = heuristic**self.beta

            pheromones = self.pheromone[np.arange(self.task_use.shape[1]), candidates_tracker] # Now candidates_tracker should be integers
            pheromones=pheromones**self.alpha

            total = np.sum(pheromones * heuristic)
            probabilities = (pheromones * heuristic) / total

            index = np.random.choice(candidates_tracker.size, p=probabilities) # Select from the size of candidates_tracker

            #Adds pheromone based on when it was chosen
            self.pheromone[index][candidates_tracker[index]]*=1+total_tasks/100
            solution.addJob(index,candidates_tracker[index])
            total_tasks-=1
            candidates_tracker[index] += 1

        return solution

    def _heuristic(self,time_used: int,total_slack:int) -> float:
        return time_used / (1+total_slack**2)

    def calculate_probabilities(self, candidates_tracker: np.array, solution: ScheduleHandler) -> np.array:
        candidates_probabilities = np.zeros(candidates_tracker.size, dtype=np.float64)

        for idx, candidate in enumerate(candidates_tracker):
            solution_aux = copy.deepcopy(solution)
            if candidates_tracker[idx] < self.task_use.shape[1]:
                solution_aux.addJob(idx, int(candidates_tracker[idx]))
                candidates_probabilities[idx] = self._heuristic(solution_aux.totalTime(), solution_aux.totalSlack())
            else:
                candidates_probabilities[idx] = 0

        return candidates_probabilities

    def optimize(self, max_evaluations: int = 1000):
        self._initialize()

        n_evaluations = 0
        iter_fitness = 1e-10
        while n_evaluations < max_evaluations:
            trails = []
            for _ in range(self.n_ants):
                solution = self._construct_solution()
                fitness = self._evaluate(solution)
                n_evaluations += 1
                trails.append((solution, fitness))

                if fitness < self.best_fitness:
                    self.best_solution = solution
                    self.best_fitness = fitness

            self._update_pheromone(trails, iter_fitness)
            iter_fitness = self.best_fitness

            self.trails_history.append(deepcopy(trails))
            self.best_fitness_history.append(self.best_fitness)

        return self.best_solution

    def _evaluate(self, solution: ScheduleHandler) -> float:
        return solution.totalSlack()/solution.totalTime()


    def _update_pheromone(self, trails: List[Tuple[ScheduleHandler, float]], best_fitness:float):
        self.pheromone_history.append(self.pheromone.copy())

        evaporation = 1 - self.rho
        self.pheromone *= evaporation
        for solution, fitness in trails:
            delta_fitness = 1.0/(1.0 + (best_fitness - fitness) / best_fitness)
            self.pheromone += delta_fitness

In [8]:
aco = ACOBinaryKnapSack(num_machines, task_use, task_duration, 10, 1, 5, 0.8)
best_solution = aco.optimize()

  probabilities = (pheromones * heuristic) / total


ValueError: probabilities contain NaN