In [32]:
import copy
import math
import random
import os
import re
import numpy as np

from abc import ABC
from typing import List

from jmetal.algorithm.multiobjective.nsgaii import NSGAII
from jmetal.core.operator import Crossover, Mutation
from jmetal.core.problem import IntegerProblem
from jmetal.core.solution import IntegerSolution
from jmetal.core.solution import Solution
from jmetal.util.termination_criterion import StoppingByEvaluations
from jmetal.util.solution import print_function_values_to_file, print_variables_to_file

In [20]:
def dcg(g):
    """Return the running discounted cumulative gain of the gain list ``g``.

    Position 0 keeps its gain undiscounted; every later 0-based position ``i``
    adds ``g[i] / log2(i + 1)`` to the running total of the previous position.
    The input is not modified: the result is built on a deep copy of ``g``.
    """
    cumulative = copy.deepcopy(g)
    # Start at 1: the first entry is already its own cumulative value.
    for pos in range(1, len(g)):
        discounted_gain = g[pos] / math.log2(pos + 1)
        cumulative[pos] = cumulative[pos - 1] + discounted_gain
    return cumulative


def compute_avg_ndcg(relevance_list: List[int]) -> float:
    """Compute the average nDCG of a ranked relevance list (the fitness measure).

    The nDCG at each rank is the running DCG of ``relevance_list`` divided by
    the running DCG of the same grades sorted in descending order (the ideal
    ranking). Ranks whose ideal DCG is 0 contribute 0.

    Args:
        relevance_list: Relevance grades in ranked order.

    Returns:
        The mean of the per-rank nDCG values, or ``0.0`` for an empty list.
    """
    length = len(relevance_list)
    if length == 0:
        # Nothing to average; avoid dividing by zero below.
        return 0.0

    ideal_order = sorted(relevance_list, reverse=True)

    dcg_vals = dcg(relevance_list)
    idcg_vals = dcg(ideal_order)

    ndcg_vals = [
        actual / ideal if ideal != 0 else 0
        for actual, ideal in zip(dcg_vals, idcg_vals)
    ]
    return sum(ndcg_vals) / length

In [21]:
class RelevanceProfileProblem(IntegerProblem, ABC):
    """Single-objective integer problem that searches for a relevance profile.

    A solution is a ranked list of ``list_length`` integer relevance grades in
    ``[min_relevance, max_relevance]``. The objective is the absolute difference
    between ``target_fitness`` and the average nDCG of that list. ``R[k]`` is
    the maximum number of positions allowed to carry the grade
    ``min_relevance + k``; a list that exceeds any cap is given an infinite
    objective value.
    """

    def __init__(self, list_length: int, min_relevance: int, max_relevance: int, target_fitness: float, R: List[int]):
        """Store the problem dimensions, bounds, target value and per-grade caps.

        Args:
            list_length: Number of ranked positions (decision variables).
            min_relevance: Smallest relevance grade a position may take.
            max_relevance: Largest relevance grade a position may take.
            target_fitness: Average nDCG the search should approach.
            R: Per-grade caps; ``R[k]`` limits the count of grade ``min_relevance + k``.
        """
        super().__init__()
        self.length = list_length
        self.min_relevance = min_relevance
        self.max_relevance = max_relevance
        self.lower_bound = [min_relevance] * self.length
        self.upper_bound = [max_relevance] * self.length
        self.target_fitness = target_fitness
        self.R = R
        # Kept in private attributes on purpose: instance attributes named
        # number_of_variables / number_of_objectives / number_of_constraints
        # would shadow the methods of the same name, so calling e.g.
        # problem.number_of_objectives() would fail with
        # "'int' object is not callable".
        self._number_of_objectives = 1
        self._number_of_constraints = len(R)

    def name(self) -> str:
        return "Relevance Profile Problem, inherits Integer Problem"

    def number_of_variables(self) -> int:
        """Return the number of ranked positions in a solution."""
        return self.length

    def number_of_constraints(self) -> int:
        """Return the number of constraint slots (one per entry of ``R``)."""
        return self._number_of_constraints

    def number_of_objectives(self) -> int:
        """Return the number of objectives (always 1)."""
        return self._number_of_objectives

    def create_solution(self) -> IntegerSolution:
        """Create a random solution that respects the per-grade caps in ``R``.

        Each position draws ``geometric(0.7) - 1`` and clamps it into
        ``[min_relevance, max_relevance]``. When the drawn grade has already
        reached its cap, the position falls back to ``min_relevance``.
        """
        grade_counts = [0] * len(self.R)
        values = []
        for _ in range(self.length):
            # int(...) keeps the variables plain Python ints rather than numpy scalars.
            draw = int(np.random.geometric(0.7)) - 1
            rel = max(self.min_relevance, min(draw, self.max_relevance))

            # R is indexed by the offset from the lowest grade, not the raw grade.
            if grade_counts[rel - self.min_relevance] >= self.R[rel - self.min_relevance]:
                rel = self.min_relevance

            grade_counts[rel - self.min_relevance] += 1
            values.append(rel)

        solution = IntegerSolution(
            lower_bound=self.lower_bound,
            upper_bound=self.upper_bound,
            number_of_objectives=self.number_of_objectives(),
            number_of_constraints=self.number_of_constraints()
        )
        solution.variables = values
        return solution

    def _violates_caps(self, variables) -> bool:
        """Tell whether ``variables`` holds an out-of-range grade or exceeds a cap in ``R``."""
        grade_counts = [0] * len(self.R)
        for value in variables:
            level = value - self.min_relevance
            if not 0 <= level < len(grade_counts):
                # A grade with no entry in R cannot satisfy any cap.
                return True
            grade_counts[level] += 1
        return any(count > cap for count, cap in zip(grade_counts, self.R))

    def evaluate(self, solution: IntegerSolution) -> IntegerSolution:
        """ Evaluate the solution based on average nDCG with respect to target fitness.

        Sets ``solution.objectives[0]`` to ``inf`` when a per-grade cap is
        violated, otherwise to ``abs(target_fitness - average nDCG)``. The
        evaluated solution is returned as well as being updated in place.
        """
        if self._violates_caps(solution.variables):
            solution.objectives[0] = float('inf')
            return solution

        andcg = compute_avg_ndcg(solution.variables)
        solution.objectives[0] = abs(self.target_fitness - andcg)
        return solution


In [22]:
class CustomCrossover(Crossover, ABC):
    """Two-parent, two-child crossover that mixes genes arithmetically.

    For each pair of aligned parent genes, one child receives their sum and the
    other their product, both reduced modulo ``L + 1``; a coin flip per gene
    decides which child gets which.
    """

    def __init__(self, L: int):
        # Probability 1.0: the operator is applied on every call.
        super().__init__(probability=1.0)
        self.L = L

    def get_name(self) -> str:
        return "Custom Crossover for ranking profiles. "

    def get_number_of_parents(self) -> int:
        return 2

    def get_number_of_children(self) -> int:
        return 2

    def execute(self, parents: List[Solution]) -> List[Solution]:
        """Build two offspring from ``parents[0]`` and ``parents[1]``.

        The offspring are deep copies of the respective parents whose
        ``variables`` are replaced by the newly mixed gene lists.
        """
        modulus = self.L + 1
        first_genes = []
        second_genes = []
        for gene_a, gene_b in zip(parents[0].variables, parents[1].variables):
            sum_first = random.random() < 0.5
            summed = (gene_a + gene_b) % modulus
            multiplied = (gene_a * gene_b) % modulus
            # The coin flip decides which child gets the sum and which the product.
            first_genes.append(summed if sum_first else multiplied)
            second_genes.append(multiplied if sum_first else summed)

        children = [copy.deepcopy(parents[0]), copy.deepcopy(parents[1])]
        children[0].variables = first_genes
        children[1].variables = second_genes
        return children

class CustomMutation(Mutation, ABC):
    """Per-gene mutation that either swaps two positions or shifts a value.

    Each gene is mutated with the operator's probability. A mutated gene is,
    with equal chance, swapped with a randomly chosen position or increased by
    a random amount in ``[0, L]`` modulo ``L + 1``.
    """

    def __init__(self, probability, L: int):
        super().__init__(probability)
        self.L = L

    def get_name(self) -> str:
        return "Custom Mutation for ranking profiles. "

    def execute(self, solution: Solution) -> Solution:
        """Mutate ``solution.variables`` in place and return the same solution."""
        genes = solution.variables
        gene_count = len(genes)
        for pos in range(gene_count):
            if random.random() >= self.probability:
                # This gene is left untouched.
                continue
            if random.random() < 0.5:
                # Exchange this gene with one at a random position (possibly itself).
                other = random.randint(0, gene_count - 1)
                genes[pos], genes[other] = genes[other], genes[pos]
            else:
                # Shift the gene by a random amount, wrapping around at L + 1.
                genes[pos] = (genes[pos] + random.randint(0, self.L)) % (self.L + 1)
        return solution


In [36]:
# Collect the reported nDCG of every run found in the evaluation files.
# Each relevant line is read as "<key> <anything> <value>"; a 'runid' line
# names the run and a following 'ndcg' line in the same file gives its score.
target_ndcgs = {}

data_dir = './data'
# sorted(): os.listdir returns entries in arbitrary order.
for filename in sorted(os.listdir(data_dir)):
    path = os.path.join(data_dir, filename)
    if not os.path.isfile(path):
        # Skip sub-directories (e.g. notebook checkpoint folders).
        continue

    current_runid = None
    with open(path, 'r') as f:
        for line in f:
            parts = line.split()
            if len(parts) < 3:
                # parts[2] is read below, so at least three fields are needed.
                continue
            key, value = parts[0], parts[2]
            if key == 'runid':
                current_runid = value
                target_ndcgs[current_runid] = 0
            elif key == 'ndcg' and current_runid is not None:
                # Attach the score to the run named in this file, not to
                # whichever key happens to be last in the dict.
                # The value is kept as the raw text read from the file.
                target_ndcgs[current_runid] = value

print(target_ndcgs)

{'agg-cocondenser': '0.3519', 'bm25_splades': '0.3848', 'cip_run_1': '0.4834', 'cip_run_2': '0.4834', 'cip_run_3': '0.4688'}


In [23]:
# Seed both random number generators used in this notebook (the `random`
# module in the operators, `np.random` in create_solution) so that a
# Restart & Run All reproduces the same FUN / VAR files.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

min_relevance = 0
max_relevance = 3
# R[k] caps how many list positions may carry grade min_relevance + k.
R = [float('inf'), 50, 50, 50]
assert len(R) == max_relevance + 1 - min_relevance
problem = RelevanceProfileProblem(
    list_length=100,
    min_relevance=min_relevance,
    max_relevance=max_relevance,
    target_fitness=0.8,
    R=R,
)
# Width of the grade range; the operators reduce values modulo L + 1.
L = max_relevance - min_relevance

algorithm = NSGAII(
    problem=problem,
    population_size=10,
    offspring_population_size=10,
    mutation=CustomMutation(probability=1.0, L=L),
    crossover=CustomCrossover(L=L),
    termination_criterion=StoppingByEvaluations(max_evaluations=1000)
)

algorithm.run()
result = algorithm.result()

# Write the objective values and decision variables of the result to disk.
print_function_values_to_file(result, 'FUN')
print_variables_to_file(result, 'VAR')


[2025-05-08 12:26:52,904] [jmetal.core.algorithm] [DEBUG] Creating initial set of solutions...
[2025-05-08 12:26:52,904] [jmetal.core.algorithm] [DEBUG] Evaluating solutions...
[2025-05-08 12:26:52,911] [jmetal.core.algorithm] [DEBUG] Initializing progress...
[2025-05-08 12:26:52,911] [jmetal.core.algorithm] [DEBUG] Running main loop until termination criteria is met
[2025-05-08 12:26:53,203] [jmetal.core.algorithm] [DEBUG] Finished!
[2025-05-08 12:26:53,203] [jmetal.util.solution] [INFO] Output file (function values): FUN
[2025-05-08 12:26:53,203] [jmetal.util.solution] [INFO] Output file (variables): VAR
