## Artificial Intelligence (BSc) SA 2024-2025

- Prof. Luca Gambardella
- Fatima Ezzeddine ([`fatima.ezzeddine@usi.ch`](mailto:fatima.ezzeddine@usi.ch))<br>

---
# Lab 1: Introduction to TSP and JSP problem formulation and to Mealpy

Lab Objectives:

Traveling Salesman Problem:
- read TSP data
- define euclidean distance function
- define a TSPProblemInstance python class 
- plot raw data
- generate naive solution 
- check if the solution is valid
- evaluate solution


Job Shop Scehduling Problem:
- read JSP data
- define objective function
- use mealpy algorithm to get a solution
- plot gant chart for solution


NOTE: I've marked all the code that you will have to fill with a `# TODO` comment


This cell below is simply importing some useful stuff for later

In [None]:
# example of how to install a library in python
!pip install numpy
!pip install mealpy
!pip install matplotlib

In [None]:
!python3 --version

In [None]:
# The glob module finds all the pathnames matching a specified pattern
import glob

# Numpy library used for working with arrays, has functions for working in domain of linear algebra, and matrices
import numpy as np

# Matplotlib is a library for creating visualizations
from matplotlib import pyplot as plt

### Read TSP data
In this Cup you will have to deal with predefined set of problems. These problems are located in the `problems` folder.

First lets get list them out

In [None]:
# read all the files that has extention .tsp to the problems array object
problems = glob.glob('./problems/*.tsp')

# loop over the problems and print the problem name 
for prob in problems:
    print(prob[11:])

In [None]:
# Checking if all of the 10 problems are in the folder would be a waste of time so we can write a line of code just to check if they are all there
print(np.all([n[11:] in ['fl1577.tsp','pr439.tsp','ch130.tsp','rat783.tsp','d198.tsp', 'kroA100.tsp','u1060.tsp','lin318.tsp','eil76.tsp','pcb442.tsp'] for n in problems]))

#### TSP File format
All the problems are stored in a `.tsp` (this file is actually a renamed `.txt` file, so you could open them with your favorite text editor)

As we will see in a bit all the problems files are composed of different sections:
* `NAME`: the shortned name of the problem
* `COMMENT`: a comment area that can contain the full name of the problem
* `TYPE`: this defines the type of problem at hand, in our case is always TSP
* `DIMENSION`: this states the problem dimension
* `EDGE_WEIGHT_TYPE`: this section states the types of weights applied to edges, in our case it is always EUC_2D or the weights are giveng using the euclidean distance in 2 dimension
* `BEST_KNOWN`: this states the best known result obtained, note that as the Prof said, it is unlikely to get a better performance than this
* `NODE_COORD_SECTION`: finally we have the section that states the triplets that defines the problems points. These triplets are (point_number, x,y).

Now that we know all of that, lets print the content of a single problem

In [None]:
example_problem = "./problems/eil76.tsp"
# open problem file as read and print it's content lines
with open(example_problem,"r") as exprob:
    print(exprob.read().splitlines())

### Euclidean Distance
Since all of our problems are using the euclidean distance between points for the edges weights.
We will now define a function that computes the euclidean distance. This distance will also be used to build the distance matrix

In [None]:
#define function to compute euclidean distance between 2 points
def distance_euc(point_i, point_j):
    rounding = 0
    x_i, y_i = point_i[0], point_i[1]
    x_j, y_j = point_j[0], point_j[1]
    # use the numpy sqrt method
    distance = np.sqrt((x_i - x_j) ** 2 + (y_i - y_j) ** 2)
    return round(distance, rounding)

Let's test it

In [None]:
point_1 = (2, 2)
point_2 = (5, 5)
distance_euc(point_1, point_2)
# Expected output is 4.0 with rounding to 0

In [None]:
text = 'NAME: eil76'
text.split(' ')

### Reading and storing the data
We will now define a Class called `ProblemInstance`

in the Constructor of the class (`__init__()`method of a class in Python) you will have to implement the code for:
* reading the raw data
* store the metadata
* read all the point and store them
* code the method that creates the distance matrix between points
* \[optional\] check if the problem loaded has an optimal and in that case store the optimal solution
* \[optional\] code the plotting method


In [None]:
class ProblemInstance:

    def __init__(self, name_tsp):

        # boolean if the problem have an sxisting optimal solution
        self.exist_opt = False

        # the optimal tour that was found before
        self.optimal_tour = None

        # distance matrix
        # Example distance matrix representing distances between 4 points
        #    A    B    C    D
        # A  0.0  2.5  3.0  4.2
        # B  2.5  0.0  1.8  3.6
        # C  3.0  1.8  0.0  2.9
        # D  4.2  3.6  2.9  0.0
        self.dist_matrix = None
        
        # read raw data  
        with open(name_tsp) as f_o:
            data= f_o.read()
            self.lines = data.splitlines()

        # store metadata set information 

        # here we expect the name of the problem
        self.name = self.lines[0].split(' ')[1]
        

        # here we expect the number of points in the considered instance
        self.nPoints =  int(self.lines[3].split(' ')[1])
        
        # here the length of the best solution
        self.best_sol = float(self.lines[5].split(' ')[1])
        
        
        # read all data points and store them 
        self.points = np.zeros((self.nPoints, 3)) # this is the structure where we will store the pts data 
        for i in range(self.nPoints):
            line_i = self.lines[7 + i].split(' ')
            self.points[i, 0] = int(line_i[0])
            self.points[i, 1] = float(line_i[1])
            self.points[i, 2] = float(line_i[2])

        # create distance matrix bx calling create_dist_matrix
        self.create_dist_matrix()
        
        # TODO [optional]
        # if the problem is one with a optimal solution, that solution is loaded
        self.optimal_tour = np.zeros(self.nPoints, dtype=int)
        if name_tsp in ["./problems/eil76.tsp", "./problems/kroA100.tsp"]:
            # change the boolean to True since an optimal solution exist
            self.exist_opt = True
            # open the optimal solution file and read it
            file_object = open(name_tsp.replace(".tsp", ".opt.tour"))
            data = file_object.read()
            file_object.close()
            lines = data.splitlines()

            # read all data points and store them

            # initialize an array with 0s using np.zeros
            self.optimal_tour = np.zeros(self.nPoints, dtype=int)
            # read the points from the file and fill the array
            for i in range(self.nPoints):
                line_i = lines[5 + i].split(' ')
                self.optimal_tour[i] = int(line_i[0]) - 1

    # for display purposes
    def print_info(self):
        print("\n#############################\n")
        print('name: ' + self.name)
        print('nPoints: ' + str(self.nPoints))
        print('best_sol: ' + str(self.best_sol))
        print('exist optimal: ' + str(self.exist_opt))

    # for display purposes
    def plot_data(self,show_numbers=False):
        # define a figure with it's size
        plt.figure(figsize=(8, 8))

        # give it a title as the problem instance name
        plt.title(self.name)

        # scatter takes an x and y axis values that are in this case the points that we have
        plt.scatter(self.points[:, 1], self.points[:, 2])

        # if we want to add to the plot next to each point it's number or id
        if show_numbers:
            for i, txt in enumerate(np.arange(self.nPoints)): 
                plt.annotate(txt, (self.points[i, 1], self.points[i, 2]))

        # display the plot/figure        
        plt.show()

    def create_dist_matrix(self): # TODO
        # initialize the matrix with dimension n x n
        self.dist_matrix = np.zeros((self.nPoints, self.nPoints))

        # loop over all the pints and compute the euclidean distance between every pair of points with index i and j
        for i in range(self.nPoints):
            for j in range(i, self.nPoints):
                self.dist_matrix[i, j] = distance_euc(self.points[i][1:3], self.points[j][1:3])

        # transpose of the matrix and fill missing other 0s
        # this was made to reduce the complexity  
        self.dist_matrix += self.dist_matrix.T

------------------------
Now we can test our Class with an example problem

In [None]:
example_problem = "./problems/eil76.tsp"
p_inst = ProblemInstance(example_problem)

p_inst.print_info()
p_inst.plot_data()
#Expected output
"""
#############################

name: eil76
nPoints: 76
best_sol: 538.0
exist optimal: True
"""

In [None]:
p_inst.nPoints

In [None]:
p_inst.plot_data(show_numbers=True)

-------------
### Random solver 
Now we will code the random solver and test it with a class called `SolverTSP` that takes the solvers and the problem instance and act as a framework to compute the solution and gives us some additional information.
We will also need to code the `evaluate_solution` method of the the `SolverTSP` class

In [None]:

# Generate a random permutation of numbers from 0 to instance_.nPoints - 1.
#  This is done using np.arange(instance_.nPoints), which creates an array of integers from 0 to instance_.nPoints - 1.

# np.random.choice is then used to randomly shuffle this array. 
# It essentially selects elements from the array in a random order without replacement, ensuring that each element is selected exactly once.

#The result is an array containing a random permutation of integers from 0 to instance_.nPoints - 1.

def random_method(instance_): # TODO
    return np.random.choice(np.arange(instance_.nPoints), size=instance_.nPoints,
                            replace=False)

print(random_method(p_inst))

In [None]:
available_methods = {"random": random_method}

In [None]:
# library to get time
from time import time as t

class SolverTSP:
    def __init__(self, algorithm_name, problem_instance):
        # duration taken to find the solution
        self.duration = np.inf

        # lenght of the tour found by the solver
        self.found_length = np.inf

        # name of the algorithm
        self.algorithm_name = algorithm_name

        # initialization
        self.name_method = "initialized with " + algorithm_name

        # boolean to check if solved problem
        self.solved = False

        # TSP problem intance to be solved
        self.problem_instance = problem_instance

        # the found solution
        self.solution = None

    # function that takes computes the solution by applying the function given as input
    def compute_solution(self, verbose=True, return_value=True):
        self.solved = False
        if verbose:
            print(f"###  solving with {self.algorithm_name}  ####")
        
        # starting time 
        start_time = t()

        # available methods that was defined earlier have a pointer to the function (algorithm name) and take as input the problem instance
        self.solution = available_methods[self.algorithm_name](self.problem_instance)

        # check if solution is valid implemented
        assert self.check_if_solution_is_valid(self.solution), "Error the solution is not valid"
        #end time of the solution
        end_time = t()
        self.duration = np.around(end_time - start_time, 3)
        if verbose:
            print(f"###  solved  ####")
        self.solved = True
        # compute the length of the solution tour
        self.evaluate_solution()

        # compute the gap
        self._gap()

        if return_value:
            return self.solution

    # simple plot of the soluton found
    def plot_solution(self):
        assert self.solved, "You can't plot the solution, you need to compute it first!"
        plt.figure(figsize=(8, 8))
        self._gap()
        plt.title(f"{self.problem_instance.name} solved with {self.name_method} solver, gap {self.gap}")
        ordered_points = self.problem_instance.points[self.solution]
        plt.plot(ordered_points[:, 1], ordered_points[:, 2], 'b-')
        plt.show()

    # check if the solution contains all the point and visited exactly one
    def check_if_solution_is_valid(self, solution):
        rights_values = np.sum([self.check_validation(i, solution) for i in np.arange(self.problem_instance.nPoints)])
        if  rights_values == self.problem_instance.nPoints:
            return True
        else:
            return False 
        
    # check if a point/node inside a solution 
    def check_validation(self, node , solution):
         if np.sum(solution == node) == 1:
            return 1
         else:
            return 0

    # compute the tour lengh by the help of distance matrix already computed
    def evaluate_solution(self, return_value=False):
        total_length = 0
        from_node_id = self.solution[0] # starting_node

        # loop over all the nodes and add successive distances
        for node_id in self.solution[1:]:
            edge_distance = self.problem_instance.dist_matrix[from_node_id, node_id]
            total_length += edge_distance
            from_node_id = node_id

        # have a complete tour and add the distance to go back to starting node    
        self.found_length = total_length + self.problem_instance.dist_matrix[self.solution[0], from_node_id]       
        if return_value:
            return total_length

    # compute (solution - best solution)/best solution in %
    def _gap(self):
        self.evaluate_solution(return_value=False)
        self.gap = np.round(
            ((self.found_length - self.problem_instance.best_sol) / self.problem_instance.best_sol) * 100, 2)


----------------------------
Now we will test our code

In [None]:
# here I'm repeating this two lines just to remind you which problem we are using
example_problem = "./problems/eil76.tsp"
p_inst = ProblemInstance(example_problem)
available_methods = {"random": random_method}
solver_name="random"
# TODO
# 1. create an instance of SolverTSP
solver = SolverTSP(solver_name, p_inst)
# 2. compute a solution
solver.compute_solution()
# 3. print the information as for the output
print(f"the total length for the solution found is {solver.found_length}",
      f"while the optimal length is {solver.problem_instance.best_sol}",
      f"the gap is {solver.gap}%",
      f"the solution is found in {solver.duration} seconds", sep="\n")
# 4. plot the solution
solver.plot_solution()
# this is the output expected and after that the solution's plot
"""
###  solving with random  ####
###  solved  ####
the total length for the solution found is 2424.0
while the optimal length is 538.0
the gap is 350.56%
the solution is found in 0.0 seconds
"""

--------------------
Finally since our example problem has an optimal solution we can plot it

In [None]:
solver = SolverTSP("optimal", p_inst)
solver.solved = True
# the solution is the optimal tour
solver.solution = np.concatenate([p_inst.optimal_tour, [p_inst.optimal_tour[0]]])
solver.plot_solution()

### JOB SHOP SCHEDULING
JSPLIB
- "name" : "instance", // the name of the instance [required]
- "jobs" : n,          // the number of jobs [required]
- "machines" : m,      // the number of machines [required]
- "optimum" : c,       // the optimum makespan or null [required]
- "bounds" :          // required when the optimum is null
- "upper" : ub,       // the upper-bounds of makespan
- "lower" : lb,       // the lower-bounds of makespan
- "path" : "*****"     // the file path to the instance [required]


In [None]:
# coding: utf-8
import os
import sys
import json
import math
import numpy as np

Target = 'abz5'

file = open('./JSPLIB-master/instances.json', "r" )
data = json.load(file)

instance = [ inst for inst in data if inst['name'] == Target ]
if(len(instance) == 0):
    raise Exception("There is no instance named %s" % Target)

instance = instance[0]
path = os.path.abspath("../%s" % instance['path'])
optimum = instance['optimum']

if( optimum is None ):
    if(instance['bounds'] is None):
        optimum = "nan"
    else:
        optimum = instance['bounds']['lower']

sys.stdout.write('--instance %s --optimum %s' % (path,optimum))

In [None]:
optimum

In [None]:
# EXAMPLE OF HOW JOBS SHOULD BE PRESENTED
job_times = [[2, 1, 3], [4, 2, 1], [3, 3, 2]] # [0, 1, 2, 3, 4, 5, 6 ,7, 8] (how represented in the solution, flatten and re-enumerated)
# job 0: [2, 1, 3]
# Machine 1: 2, index 0 in the solution
# Machine 2: 1  index 1 in the solution
# Machine 3: 3  index 2 in the solution
# etc. same for the others
# job_times[i][j] represents the processing time of job i on machine j.
# The objective is typically to minimize the total time required to complete all jobs (the makespan).

path_file = './JSPLIB-master/instances/abz5'

# read raw data
with open(path_file) as f_o:
    data = f_o.read()
    lines = data.splitlines()
    print(lines)

jobs = np.zeros((10, 10))
lines = lines[5:]
lines, len(lines)

for i in range(len(lines)):
    line = lines[i].split(' ')
    line = [int(num) for num in line]
    for j in range(0, len(line) -1 , 2):
        jobs[i][line[j]] = int(line[j+1])
jobs

In [None]:
from mealpy import PermutationVar, Problem, WOA

job_times = jobs
n_jobs = len(job_times)
n_machines = 10


data = {
    "job_times": job_times,
    "n_jobs": n_jobs,
    "n_machines": n_machines
}

class JobShopProblem(Problem):
    def __init__(self, bounds=None, minmax="min", data=None, **kwargs):
        self.data = data
        super().__init__(bounds, minmax, **kwargs)

        # Objective function to minimize the makespan
    def obj_func(self, x):
        x_decoded = self.decode_solution(x)
        x = x_decoded["per_var"]

        # Initialize the makespan matrix with zeroes for all jobs and machines
        makespan = np.zeros((self.data["n_jobs"], self.data["n_machines"]))
        
        # MakeSpan:  [[0. 0. 0.]
        #             [0. 0. 0.]
        #             [0. 0. 0.]]
        # makespan[i][j]: each row i represent a job idx, each column j repesent a machine idx
        
        # Arrays to track the completion time of each job and each machine
        job_completion = np.zeros(self.data["n_jobs"])
        machine_completion = np.zeros(self.data["n_machines"])

        # Iterates over the jobs in the solution order and calculates the completion time of each job on each machine.
        # Uses a makespan matrix to keep track of the completion times of jobs on each machine.
        # The final makespan is the maximum value in the makespan matrix, which represents the completion time of the last job.
        for gene in x:
            # Decode gene into job and machine indices
            job_idx = gene // self.data["n_machines"]
            machine_idx = gene % self.data["n_machines"]
            
            # Completion time for this job on this machine depends on two things:
            # 1. The time when the machine becomes available (from previous job)
            # 2. The time when the job is ready to be processed on this machine (from previous machine)
            
            start_time = max(job_completion[job_idx], machine_completion[machine_idx])
            
            # Update the makespan matrix with start time + processing time
            makespan[job_idx][machine_idx] = start_time + self.data["job_times"][job_idx][machine_idx]
            
            # Update the completion time of this job and this machine
            job_completion[job_idx] = makespan[job_idx][machine_idx]
            machine_completion[machine_idx] = makespan[job_idx][machine_idx]
        
        # The makespan (objective value) is the maximum value in the makespan matrix
        return np.max(makespan)
        

bounds = PermutationVar(valid_set=list(range(0, n_jobs*n_machines)), name="per_var")
problem = JobShopProblem(bounds=bounds, minmax="min", data=data)


model = WOA.OriginalWOA(epoch=100, pop_size=20)
model.solve(problem)

print(f"Best agent: {model.g_best}")                    # Encoded solution
print(f"Best solution: {model.g_best.solution}")        # Encoded solution
print(f"Best fitness: {model.g_best.target.fitness}")
print(f"Best real scheduling: {model.problem.decode_solution(model.g_best.solution)}")      # Decoded (Real) solution

In [None]:
def plot_gantt_chart(solution, job_times):
    n_jobs = len(job_times)
    n_machines = len(job_times[0])
    
    # Decode the solution
    decoded = problem.decode_solution(solution)["per_var"]
    
    # Initialize data structures
    job_completion = np.zeros(n_jobs)
    machine_completion = np.zeros(n_machines)
    tasks = []
    
    # Generate a color for each job
    colors = plt.cm.get_cmap('hsv', n_jobs)

    for gene in decoded:
        job_idx = gene // n_machines
        machine_idx = gene % n_machines
        start_time = max(job_completion[job_idx], machine_completion[machine_idx])
        end_time = start_time + job_times[job_idx][machine_idx]
        
        # Record the task for Gantt chart
        tasks.append((machine_idx, start_time, end_time, job_idx))
        
        # Update completion times
        job_completion[job_idx] = end_time
        machine_completion[machine_idx] = end_time
    
    # Plotting
    fig, gnt = plt.subplots(figsize=(16, 6))  # Set a larger figure size
    gnt.set_xlabel('Time')
    gnt.set_ylabel('Machines')
    gnt.set_title('Gantt Chart for Job Shop Scheduling')

    # Set the y-ticks to represent machines
    gnt.set_yticks(range(n_machines))
    gnt.set_yticklabels([f'Machine {i+1}' for i in range(n_machines)])

    # Add tasks to the Gantt chart
    for machine_idx, start_time, end_time, job_idx in tasks:
        gnt.broken_barh([(start_time, end_time - start_time)], (machine_idx - 0.4, 0.8), 
                         facecolors=colors(job_idx / n_jobs))  # Assign color based on job index
        gnt.text(start_time + (end_time - start_time) / 2, machine_idx, 
                 f'J {job_idx + 1}', ha='center', va='center', color='black')

    plt.show()

# After solving the problem, visualize the best solution
plot_gantt_chart(model.g_best.solution, job_times)

In [None]:
job_times