In [None]:
import numpy as np
import pandas as pd
import random
from typing import List
from itertools import permutations

In [None]:
class Task:
    """Plain record describing one task to be scheduled.

    Attributes (all stored exactly as passed in):
        id: identifier of the task.
        type: category of the task.
        length: duration of the task.
        deadline: point in time the task is due.
        oh: overhead value attached to the task.
        penalty: penalty weight attached to the task.
    """

    def __init__(
        self,
        id: int,
        type: int,
        length: int,
        deadline: int,
        oh: int,
        penalty: int,
    ) -> None:
        """Keep every argument as an attribute of the same name."""
        self.id, self.type = id, type
        self.length, self.deadline = length, deadline
        self.oh, self.penalty = oh, penalty


def split_reminder(x: np.ndarray, chunk_size: int, axis=0):
    """Cut ``x`` along ``axis`` into consecutive pieces of ``chunk_size`` entries.

    Every piece holds ``chunk_size`` entries except possibly the last one,
    which receives whatever is left over. The pieces are returned as the list
    produced by ``np.array_split``.
    """
    extent = x.shape[axis]
    # Cut before every multiple of chunk_size that lies strictly inside x.
    cut_points = np.arange(chunk_size, extent, chunk_size)
    return np.array_split(x, cut_points, axis=axis)

def move_elements(arr, consecutive_idcs: np.ndarray, new_place_id: int):
    """Relocate a contiguous run of elements inside ``arr``, in place.

    The run is re-inserted immediately before the element that sat at index
    ``new_place_id`` before the call. The elements lying between the old and
    the new position slide over to fill the gap, keeping their relative order.

    Args:
        arr: NumPy array, indexed along its first axis; it is modified in place.
        consecutive_idcs: Ascending, strictly consecutive indices of the run
            that should be moved.
        new_place_id: Index (in the layout before the call) in front of which
            the run is placed. ``len(arr)`` moves the run to the very end.

    Returns:
        The same ``arr`` object that was passed in.

    Raises:
        AssertionError: if ``consecutive_idcs`` is not a run of consecutive
            ascending indices.
    """
    consecutive_idcs = np.asarray(consecutive_idcs)
    if consecutive_idcs.size == 0:
        # Nothing to move.
        return arr
    assert np.all(np.diff(consecutive_idcs) == 1)

    first, last = consecutive_idcs[0], consecutive_idcs[-1]
    run_len = len(consecutive_idcs)

    # A target inside the run (or directly behind it) means the run already
    # sits where it should. Without this guard the right-move branch would
    # write the run over the elements to its left.
    if first <= new_place_id <= last + 1:
        return arr

    # Copy first: the slices below overwrite the source positions.
    run = np.array(arr[consecutive_idcs])
    if new_place_id < first:
        # Move left: the elements in [new_place_id, first) shift right.
        displaced = np.array(arr[new_place_id:first])
        arr[new_place_id:new_place_id + run_len] = run
        arr[new_place_id + run_len:last + 1] = displaced
    else:
        # Move right: the elements in (last, new_place_id) shift left.
        displaced = np.array(arr[last + 1:new_place_id])
        arr[new_place_id - run_len:new_place_id] = run
        arr[first:new_place_id - run_len] = displaced

    return arr

def move_and_permute_elements(arr, seq_src, seq_trg):
    """Move the run ``seq_src`` next to the run ``seq_trg`` and build a swapped variant.

    When the source run starts before the target run it is placed directly in
    front of the target; otherwise it is placed directly behind it. ``arr`` is
    modified in place by ``move_elements``.

    Returns:
        A two-element list: the array after the move, and a copy of it in
        which the first entry of the moved source run and the first entry of
        the target run have traded places.
    """
    trg_first = seq_trg[0]
    if seq_src[0] < trg_first:
        # Source ends up occupying [trg_first - len(src), trg_first).
        moved = move_elements(arr, seq_src, trg_first)
        src_first = trg_first - len(seq_src)
    else:
        # Source ends up starting right after the last target index.
        moved = move_elements(arr, seq_src, seq_trg[-1] + 1)
        src_first = trg_first + len(seq_trg)

    swapped = np.copy(moved)
    # Read from ``moved`` (untouched) so the two writes cannot interfere.
    swapped[src_first] = moved[trg_first]
    swapped[trg_first] = moved[src_first]
    return [moved, swapped]

def move_consecutive_types(arr, type):
    """Generate candidate arrays that merge neighbouring runs of one type.

    Rows of ``arr`` whose first column equals ``type`` are grouped into runs
    of consecutive row indices. For each pair of neighbouring runs, the
    earlier run is moved next to the later one and vice versa, each via
    ``move_and_permute_elements`` on a fresh copy of ``arr``.

    Returns:
        A flat list with all generated arrays; empty when the type forms at
        most one run.
    """
    positions = np.where(arr[:, 0] == type)[0]
    # A new run starts wherever two matching rows are not direct neighbours.
    run_starts = np.where(np.diff(positions) != 1)[0] + 1
    runs = np.split(positions, run_starts)

    candidates = []
    # With a single run the zip is empty and nothing is generated.
    for earlier, later in zip(runs, runs[1:]):
        candidates += move_and_permute_elements(np.copy(arr), earlier, later)
        candidates += move_and_permute_elements(np.copy(arr), later, earlier)
    return candidates

class TaskManager:
    """Registry of tasks plus a chunked search for a low-penalty ordering.

    Typical use: register tasks (``add_task``, ``add_random_task`` or
    ``load_tasks``), call ``complete()`` to build the task table and its
    heuristic orderings, then call ``solve()``.

    Rows handled by the solver follow the column order of the task table:
    ``[type, length, deadline, oh, penalty, id]``.
    """

    def __init__(self) -> None:
        """Create an empty manager; the task table is built by ``complete()``."""
        self.types = []
        self.lengths = []
        self.deadlines = []
        self.ohs = []
        self.penalties = []
        self.ids = []
        self.tasks: List[Task] = []
        self.task_board = None
        self.heuristics = []
        self.id_counter: int = 0

    def add_task(
        self,
        type: int,
        length: int,
        deadline: int,
        oh: int,
        penalty: int,
    ):
        """Register a new task; its id is the current value of ``id_counter``."""
        task = Task(
            id=self.id_counter,
            type=type,
            length=length,
            deadline=deadline,
            oh=oh,
            penalty=penalty,
        )
        self.tasks.append(task)
        self.id_counter += 1

    def add_random_task(self):
        """Register a task whose fields are drawn with ``random.randint``."""
        # Draw order is kept stable so seeded runs stay reproducible.
        task_type = random.randint(0, 5)
        deadline = random.randint(50, 365)
        length = random.randint(1, 5)
        oh = random.randint(1, 3)
        penalty = random.randint(1, 3)

        self.add_task(task_type, length, deadline, oh, penalty)

    def complete(self):
        """Build the column arrays, the task table and the heuristic orderings.

        Everything is rebuilt from ``self.tasks`` on every call, so the method
        can be called again after more tasks have been added.
        """
        tasks = self.tasks
        self.types = np.array([task.type for task in tasks])
        self.lengths = np.array([task.length for task in tasks])
        self.deadlines = np.array([task.deadline for task in tasks])
        self.ohs = np.array([task.oh for task in tasks])
        self.penalties = np.array([task.penalty for task in tasks])
        self.ids = np.array([task.id for task in tasks])

        # Column order matters: the solver addresses columns by position.
        self.task_board: pd.DataFrame = pd.DataFrame({
            'type': self.types,
            'length': self.lengths,
            'deadline': self.deadlines,
            'oh': self.ohs,
            'penalty': self.penalties,
            'id': self.ids,
        })

        # Ordering 1: earliest deadline first, ties broken by type.
        deadline_heuristic = self.task_board.sort_values(by=['deadline', 'type']).reset_index(drop=True)
        # Ordering 2: smallest (deadline - length) first, ties broken by type.
        # The helper column 'difference' is appended after 'id'.
        difference_heuristic = self.task_board.assign(
                difference=lambda df: df['deadline'] - df['length']
            ).sort_values(by=['difference', 'type']).reset_index(drop=True)

        self.heuristics = [
            deadline_heuristic,
            difference_heuristic,
        ]

    def save_tasks(self, filename: str):
        """Write the task table to ``filename`` as CSV.

        Raises:
            Exception: if ``complete()`` has not built the task table yet.
        """
        if self.task_board is None:
            raise Exception('No tasks are present. Maybe you missed to call the "complete()" method before?')
        self.task_board.to_csv(filename)

    def load_tasks(self, filename: str):
        """Register one task per row of the CSV file ``filename``.

        The first CSV column is read as the index. Of the remaining columns,
        all but the last are passed positionally to ``add_task``; the last
        one (the id column in files written by ``save_tasks``) is dropped and
        ids are assigned anew.
        """
        task_board = pd.read_csv(filename, index_col=0)
        for index, row in task_board.iterrows():
            self.add_task(*row[:-1])

    def solve(self, chunk_size: int = 5):
        """Search for the best ordering, starting from each heuristic ordering.

        Each heuristic ordering is cut into chunks of ``chunk_size`` rows. The
        chunks are processed in sequence: all permutations of a chunk are
        appended to the orderings kept so far and only the best ones survive
        (see ``evaluate``). The result is then refined with
        ``refine_task_types``.

        Returns:
            A tuple ``(solutions, penalties, lengths)`` for the heuristic whose
            first solution has the smallest total penalty (ties: smallest
            total length). ``penalties`` and ``lengths`` hold per-task values,
            one row per solution.
        """
        solutions = []
        best_penalty_scores = []
        best_length_scores = []
        for heuristic in self.heuristics:
            solution = None
            chunks = split_reminder(heuristic, chunk_size)
            for chunk in chunks:
                solution, best_penalty_score, best_length_score = self.solve_chunk(chunk, solution)
                # Keep a leading "solutions" axis even for a single ordering.
                if len(solution.shape) == 2:
                    solution = solution[np.newaxis, ...]

            solution, best_penalty_score, best_length_score = self.refine_task_types(solution)

            solutions.append(solution)
            best_penalty_scores.append(best_penalty_score)
            best_length_scores.append(best_length_score)

        # lexsort uses the LAST key as primary: penalty first, then length.
        best_sorted_solution_indices = np.lexsort(
            (
                [l.sum(axis=-1)[0] for l in best_length_scores],
                [p.sum(axis=-1)[0] for p in best_penalty_scores],
            )
        )[0]
        return solutions[best_sorted_solution_indices], best_penalty_scores[best_sorted_solution_indices], best_length_scores[best_sorted_solution_indices]

    def solve_chunk(self, chunk, solution):
        """Extend ``solution`` by every permutation of ``chunk`` and keep the best.

        Args:
            chunk: DataFrame with the rows of the current chunk.
            solution: Orderings kept so far (one per leading index), or
                ``None`` for the first chunk.

        Returns:
            The result of ``evaluate`` on all candidate orderings.
        """
        num_tasks = len(chunk)
        # One row of indices per permutation: factorial in the chunk length.
        perm_indices = np.stack(list(permutations(np.arange(num_tasks))), axis=0)
        options = chunk.values[perm_indices]
        if solution is not None:
            combinations = []
            for row in solution:
                # Prefix every permutation of the chunk with this ordering.
                combinations.append(
                    np.concatenate((np.repeat(row[np.newaxis, ...], len(options), axis=0), options), axis=-2)
                )
            options = np.concatenate(combinations, axis=0)
        return self.evaluate(options)

    def refine_task_types(self, best_options):
        """Try to improve orderings by pulling same-type runs together.

        For every ordering and every task type, the candidates produced by
        ``move_consecutive_types`` are evaluated together with the current
        orderings. The step is repeated recursively until the evaluation
        returns the orderings it was given.

        Returns:
            The ``evaluate`` result ``(options, penalties, lengths)`` of the
            final set of orderings.
        """
        u_types = np.unique(best_options[..., 0])

        combinations = []
        for row in best_options:
            for task_type in u_types:
                new_combination = move_consecutive_types(row, task_type)
                if len(new_combination) > 0:
                    combinations.append(np.stack(new_combination, axis=0))
        if len(combinations) > 0:
            # Current orderings go first so they win ties in evaluate().
            options = np.concatenate((best_options, np.concatenate(combinations, axis=0)), axis=0)
            new_best_options, best_penalty_score, best_length_score = self.evaluate(options)
            if len(new_best_options)==len(best_options) and np.all(new_best_options==best_options):
                return new_best_options, best_penalty_score, best_length_score
            return self.refine_task_types(new_best_options)
        return self.evaluate(best_options)

    def evaluate(self, options: np.ndarray):
        """Score candidate orderings and return the best ones.

        Args:
            options: Array indexed as ``[ordering, position, column]`` using
                the column order ``[type, length, deadline, oh, penalty, ...]``.
                It is not modified.

        Returns:
            A tuple ``(best_options, best_penalty_score, best_length_score)``:
            the orderings with minimal total penalty and, among those, minimal
            total length, reduced to one ordering per distinct type of the
            last task, together with their per-task penalties and per-task
            lengths (overhead included).
        """
        _options = np.copy(options)
        task_type = _options[..., 0]
        # Overhead applies to the first task and wherever the type changes.
        oh_mask = np.zeros_like(task_type, dtype=bool)
        oh_mask[:, 1:] = task_type[:, :-1] != task_type[:, 1:]
        oh_mask[:, 0] = True

        # View into the copy: adding overhead here leaves ``options`` intact.
        task_length = _options[:, :, 1]
        task_length[oh_mask] += _options[oh_mask][:, 3]

        # Finish time of each task, and the weighted lateness past its deadline.
        M = np.cumsum(task_length, axis=-1)
        M_zeros = np.zeros_like(M)
        deadline = _options[..., 2]
        task_penalty = _options[..., 4]
        penalty = np.maximum(M_zeros, M - deadline) * task_penalty
        penalty_score = np.sum(penalty, axis=-1)
        length_score = np.sum(task_length, axis=-1)
        # Sorted by total penalty, then by total length (last key is primary).
        sort_indices = np.lexsort((length_score, penalty_score))
        sorted_penalty_score = penalty_score[sort_indices]
        sorted_length_score = length_score[sort_indices]
        # The minimal-penalty entries form the leading run of the sorted order,
        # so positions inside that run are also positions into sort_indices.
        min_penalty_indices = np.where(sorted_penalty_score == np.min(penalty_score))[0]
        min_length_indices = np.where(sorted_length_score[min_penalty_indices] == np.min(sorted_length_score[min_penalty_indices]))[0]

        passed_indices = sort_indices[min_length_indices]
        # Keep the first ordering for each distinct type of the final task.
        _, first_occurrence_indices = np.unique(task_type[passed_indices][:, -1], return_index=True)

        best_options_indices = passed_indices[first_occurrence_indices]
        best_options = options[best_options_indices]
        best_penalty_score = penalty[best_options_indices]
        best_length_score = task_length[best_options_indices]

        return best_options, best_penalty_score, best_length_score

In [None]:
tm = TaskManager()
# Alternative input: uncomment the loop to register 100 random tasks
# instead of loading them from the CSV file below.
# for _ in range(100):
#     tm.add_random_task()
# To write the current tasks to disk, run the next line only AFTER
# tm.complete() has been called; save_tasks raises while the task board
# has not been built yet.
# tm.save_tasks('benchmark_tasks.csv')
# Active input: register the tasks stored in the CSV file, then build the
# task board and the heuristic orderings.
tm.load_tasks('benchmark_tasks.csv')
tm.complete()

In [None]:
# Run the search with chunks of 6 tasks (solve defaults to 5). Every chunk
# is expanded into all of its permutations, so cost grows factorially
# with the chunk size.
solution, penalty, length = tm.solve(chunk_size=6)
# The three results share their leading axis: one entry per returned ordering.
print('Number of equivalent solutions found:', len(solution))
# Per-task penalty values of each returned ordering.
print('Penalty per task', penalty)
# Running sum over the per-task lengths returned by solve.
print('Cumulative required time', np.cumsum(length, axis=-1))
# Column 5 of a solution row holds the task id.
print('Ordered task indices', solution[..., 5])
print('Total penalty:', penalty.sum(axis=1))
print('Total length:', length.sum(axis=1))