# JSSP comparação entre euristicas Classicas: codigo fonte

## importação de libs

In [1]:
# %pip install mealpy
# %pip install numpy
%pip install -r ../requirements.txt


Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import importlib.util
from classes.jssp import jssp
import numpy as np
from classes.jssp import jssp
from mealpy import SA
from mealpy.utils.space import FloatVar
import matplotlib.pyplot as plt
import time
import csv

## funções adicionais

In [3]:
def import_tests_cases(nome_dict : str) -> dict:

    caminho_absoluto = os.path.abspath("../tests/test1.py")

    spec = importlib.util.spec_from_file_location("modulo_temp", caminho_absoluto)
    
    if spec is None or spec.loader is None:
        raise ImportError(f"Não foi possível carregar o módulo do arquivo: {caminho_absoluto}")
    
    modulo = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(modulo)

    return getattr(modulo, nome_dict)

def decode_solution(solution_vec, operations):
    import numpy as np
    order = np.argsort(solution_vec)

    machine_available = {}
    job_operation_count = {}  # Conta quantas operações de cada job já foram executadas
    job_last_end_time = {}    # Último tempo de fim de operação para cada job

    print("Ordem com máquina usada:")

    for i, idx in enumerate(order):
        op = operations[idx]
        job = op["job"]
        
        # Inicializa contadores se necessário
        if job not in job_operation_count:
            job_operation_count[job] = 0
            job_last_end_time[job] = 0

        # Escolher a máquina que fica disponível primeiro
        machine = min(op["machines"], key=lambda m: machine_available.get(m, 0))

        # Considera tanto a disponibilidade da máquina quanto a precedência do job
        machine_ready_time = machine_available.get(machine, 0)
        job_ready_time = job_last_end_time[job]
        start_time = max(machine_ready_time, job_ready_time)
        end_time = start_time + op["duration"]

        # Atualiza os tempos
        machine_available[machine] = end_time
        job_last_end_time[job] = end_time
        job_operation_count[job] += 1

        print(f"{i+1}: {op['job']} Op{job_operation_count[job]}, Máquina {machine}, Duração {op['duration']}, Início {start_time}, Fim {end_time}, Equipamento {op.get('equipments', 'N/A')}")

def make_fitness_function(instance: jssp):
   
    operations = instance.get_flattened_operations()

    def fitness(solution):
        PENALTY_FACTOR = 1000

        priority_order = np.argsort(solution)

        machine_available = {}    # ex: {1: 5} → máquina 1 estará livre no tempo 5
        job_operation_count = {}  # ex: {"job_1": 2} → job_1 já executou 2 operações
        job_last_end_time = {}    # ex: {"job_1": 4} → última operação do job_1 terminou no tempo 4
        job_next_expected_op = {} # ex: {"job_1": 3} → próxima operação esperada do job_1 é a Op 3
        end_times = []            # armazenará o tempo de término de cada operação
        equipment_available = {}     # ex: {"equip_1": 8} → equipamento 1 estará livre no tempo 8
        
        precedence_violations = 0
        

        for idx in priority_order:
            op = operations[idx]  # Recupera a operação pelo índice
            job = op["job"]               # Nome do job (ex: "job_1")
            machines = op["machines"]     # Lista de máquinas disponíveis
            duration = op["duration"]     # Duração da operação
            equipments = op.get("equipments", [])  # Lista de equipamentos necessários
            operation_id = op.get("operation_id", 1)  # ID da operação dentro do job

            if job not in job_operation_count:
                job_operation_count[job] = 0
                job_last_end_time[job] = 0
                job_next_expected_op[job] = 1

            # VERIFICAÇÃO DE PRECEDÊNCIA: A operação deve ser executada na ordem correta
            expected_op_id = job_next_expected_op[job]
            
            if operation_id != expected_op_id:
                precedence_violations += 1
                
            
            # Atualizar a próxima operação esperada
            if operation_id == expected_op_id:
                job_next_expected_op[job] = operation_id + 1

            # Seleciona a máquina disponível mais cedo
            machine = min(machines, key=lambda m: machine_available.get(m, 0))

            machine_ready_time = machine_available.get(machine, 0)
            job_ready_time = job_last_end_time[job]
            
            # Verifica disponibilidade dos equipamentos
            equipment_ready_times = [equipment_available.get(eq, 0) for eq in equipments]
            latest_equipment_ready_time = max(equipment_ready_times) if equipment_ready_times else 0

            # O tempo de início é o máximo entre todos os tempos de disponibilidade
            start_time = max(machine_ready_time, job_ready_time, latest_equipment_ready_time)
            end_time = start_time + duration

            # Atualiza as disponibilidades
            machine_available[machine] = end_time
            for eq in equipments:
                equipment_available[eq] = end_time
            job_last_end_time[job] = end_time
            job_operation_count[job] += 1

            end_times.append(end_time)  # Salva o tempo final da operação

        makespan = max(end_times) if end_times else 0
        
        penalty = precedence_violations * PENALTY_FACTOR
        
        final_fitness = makespan + penalty
        
       

        return final_fitness,  # returns as a touple (becase mealpy needs :P)

    return fitness

def save_results_to_csv(times, solutions, instance_data, filename="results.csv"):
    
    time_dict = {id_sol: tempo for tempo, id_sol in times}
    
    timespan = instance_data.get("timespan", "N/A")
    
    csv_data = []
    
    for solution in solutions:
        solution_id = solution.id
        execution_time = time_dict.get(solution_id, "N/A")
        fitness = solution.target.fitness
        
        solution_vector = str(solution.solution.tolist()) if hasattr(solution.solution, 'tolist') else str(solution.solution)
        
        csv_data.append({
            'id': solution_id,
            'execution_time': execution_time,
            'fitness': fitness,
            'timespan': timespan,
            'solution_vector': solution_vector
        })
    
    # Salvar no CSV
    fieldnames = ['id', 'execution_time', 'fitness', 'timespan', 'solution_vector']
    
    with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(csv_data)
    
    print(f"Resultados salvos em {filename}")
    print(f"Total de {len(csv_data)} soluções salvas")
    print(f"Timespan da instância: {timespan}")
    
    return filename

## Configurações gerais

In [4]:
# dados para os testes iniciais
data = import_tests_cases("best")
instance = jssp(data)
fitness_func = make_fitness_function(instance)
num_ops = len(instance.get_flattened_operations())

print(f"Número de operações: {num_ops}")
print("Operações carregadas:")
operations = instance.get_flattened_operations()
for i, op in enumerate(operations):
    print(f"  {i}: Job {op['job']}, Op {op['operation_id']}, Máquinas {op['machines']}, Equipamento {op['equipments']} , Duração {op['duration']}")

# Definindo o problema
problem = {
    "obj_func": fitness_func,
    "bounds": [FloatVar(lb=0.0, ub=1.0) for _ in range(num_ops)],
    "minmax": "min",
    "log_to": None,
}

[([1], [1], 1), ([1], [2], 1)]
[([2], [1], 1), ([2], [2], 1)]
Número de operações: 4
Operações carregadas:
  0: Job job_1, Op 1, Máquinas [1], Equipamento [1] , Duração 1
  1: Job job_1, Op 2, Máquinas [1], Equipamento [2] , Duração 1
  2: Job job_2, Op 1, Máquinas [2], Equipamento [1] , Duração 1
  3: Job job_2, Op 2, Máquinas [2], Equipamento [2] , Duração 1


## Simulated annealing

In [5]:
times = []
solutions = []
model = SA.OriginalSA(epoch=1000)
for _ in range(30):
    start_time = time.time()
    g_best = model.solve(problem)
    end_time = time.time()
    times.append([end_time - start_time, g_best.id])
    solutions.append(g_best)




In [6]:

csv_filename = save_results_to_csv(times, solutions, data, "simulated_annealing_results.csv")


Resultados salvos em simulated_annealing_results.csv
Total de 30 soluções salvas
Timespan da instância: 3
