# An exploration of the optimal backoff policy for parallel task processing under contention

The key details of the problem:

* There are $N$ processors operating in parallel, each capable of processing any task.
* There is an unbounded queue from which every processor takes its tasks.
* A processor can only take one task off the queue at a time.
* A processor tries to start a task, but if that task would interfere with the processing of another task, the processor returns the task to the queue. Once the task has been returned, the processor tries to get another task to perform.
* A task can be delayed so that it cannot be taken from the queue until a predefined period has elapsed since it was placed on the queue.
* Tasks have different execution times. Most tasks are very quick and a few are very slow.
* Most tasks cause little or no contention, whereas a few cause a lot of contention.

## Probabilistic modelling of tasks

### Simulation element

In [1]:
class SimulationElement:
    """Base class for anything that advances one step per simulation clock tick.

    Subclasses are expected to override :meth:`tick`.
    """

    def tick(self):
        """Advance this element by one time step; must be overridden."""
        raise NotImplementedError

### Task

In [2]:
class Task:
    """A unit of work that lives on the queue until a processor completes it.

    Parameters
    ----------
    execution_cost : int
        Number of processing ticks needed to complete the task (must be > 0).
    earliest_time_to_run : int
        Earliest queue time at which the task may be taken off the queue
        (must be >= 0).
    contention_fn : callable
        Called with a count of active processors; must return a ``bool`` that
        is ``True`` when starting the task now would cause contention.
    """

    def __init__(self, execution_cost, earliest_time_to_run, contention_fn):
        assert type(execution_cost) == int and execution_cost > 0
        assert type(earliest_time_to_run) == int and earliest_time_to_run >= 0

        # Times at which a processor had to hand this task back due to contention.
        self.failure_times = []
        self.execution_cost = execution_cost
        self.earliest_time_to_run = earliest_time_to_run
        self.contention_fn = contention_fn

        # Filled in by the processor that finishes the task.
        self.completion_time = None

    # Backward-compatible alias for the original, misspelled attribute name,
    # which existing code still reads.
    @property
    def excution_cost(self):
        return self.execution_cost

    @excution_cost.setter
    def excution_cost(self, value):
        self.execution_cost = value

### Queue

In [3]:
class Queue(SimulationElement):
    """Unbounded list of Tasks whose clock gates when each task may be taken."""

    def __init__(self):
        self.elements = []
        self.current_time = 0

    def tick(self):
        """Advance the queue's clock by one step."""
        self.current_time = self.current_time + 1

    def get(self):
        """Get a task from the queue (returns None if none are available).

        The first task, in insertion order, whose ``earliest_time_to_run`` is
        not later than the queue's current time is removed and returned.
        """
        ready_index = next(
            (
                position
                for position, candidate in enumerate(self.elements)
                if candidate.earliest_time_to_run <= self.current_time
            ),
            None,
        )
        if ready_index is None:
            return None
        return self.elements.pop(ready_index)

    def is_empty(self):
        """Is the queue empty?"""
        return not self.elements

    def put(self, task):
        """Append ``task`` to the back of the queue."""
        assert type(task) == Task
        self.elements.append(task)

In [4]:
# Queue tests

# A fresh queue starts at time zero, is empty, and its clock advances on tick()
q = Queue()
assert q.current_time == 0
q.tick()
assert q.current_time == 1
assert q.is_empty()

# A single immediately-runnable task is handed out exactly once
q = Queue()
t1 = Task(1, 0, None)
q.put(t1)
assert len(q.elements) == 1
assert q.get() is t1
assert q.get() is None

# A task that may not start before t=1 is withheld until the queue has ticked
q = Queue()
t1 = Task(1, 1, None)
q.put(t1)
assert q.get() is None
q.tick()
assert q.get() is t1

# A later-queued task that is ready now overtakes an earlier one that is not
q = Queue()
t1 = Task(1, 1, None)
t2 = Task(1, 0, None)
q.put(t1)
q.put(t2)
assert q.get() is t2
q.tick()
assert q.get() is t1

### Contention functions

In [5]:
def no_contention_function(num_active_processors):
    """Contention model for a task that never conflicts with other work.

    The active-processor count is accepted for interface compatibility but
    ignored; the result is always ``False``.
    """
    contention = False
    return contention

In [13]:
def always_contention_function(num_active_processors):
    """Contention model for a task that conflicts when the system is busy.

    Despite the name, contention is reported only when the given number of
    active processors is greater than 1; otherwise the result is ``False``.
    """
    busy_threshold = 1
    return busy_threshold < num_active_processors

### Backoff policy functions

In [6]:
def quickest_retry_policy(task, current_time):
    """Backoff policy that makes a failed task eligible to run again immediately.

    Resets ``task.earliest_time_to_run`` to 0 in place and also returns that
    value. Callers that store the policy's return value as the task's new
    ``earliest_time_to_run`` therefore keep a valid integer rather than ``None``.

    Parameters
    ----------
    task : Task
        The task that just failed due to contention.
    current_time : int
        Time of the failure (unused by this policy).

    Returns
    -------
    int
        The new earliest time to run (always 0).
    """
    assert type(task) == Task
    assert type(current_time) == int

    task.earliest_time_to_run = 0
    return task.earliest_time_to_run

In [7]:
# The quickest-retry policy makes a delayed task runnable straight away
t1 = Task(1, 100, None)
assert t1.earliest_time_to_run == 100
quickest_retry_policy(t1, 0)
assert t1.earliest_time_to_run == 0

### Processor

In [8]:
class Processor(SimulationElement):
    """A worker that takes tasks off a shared queue and executes them tick by tick.

    Parameters
    ----------
    backoff_policy_fn : callable or None
        ``backoff_policy_fn(task, current_time)`` is invoked when a task hits
        contention. It may update ``task.earliest_time_to_run`` in place and/or
        return the new earliest time; a non-``None`` return value is stored on
        the task. Only needed if some task can hit contention.
    queue : Queue
        Shared queue that tasks are taken from and returned to.
    completed_task_list : list
        Shared list that finished tasks are appended to.
    num_active_processors_fn : callable
        Zero-argument function returning how many processors are currently
        processing. Only called once a task has been taken from the queue.
    """

    # States used in the state machine
    STATE_READY = "Ready"
    STATE_PROCESSING = "Processing"

    def __init__(self, backoff_policy_fn, queue, completed_task_list, num_active_processors_fn):
        # Validate the argument that was actually passed in.
        assert type(completed_task_list) == list

        self.state = Processor.STATE_READY
        self.backoff_policy = backoff_policy_fn
        self.queue = queue
        self.completed_task_list = completed_task_list
        self.num_active_processors_fn = num_active_processors_fn

        # Ticks spent so far on the current task.
        self.computation_performed = 0
        self.task = None
        # This processor's own clock, advanced once per tick().
        self.current_time = 0

    def tick(self):
        """Advance the clock by one step and run one step of the state machine."""
        self.current_time += 1

        # Run the state machine
        if self.state == Processor.STATE_READY:
            self.state_ready()
        elif self.state == Processor.STATE_PROCESSING:
            self.state_processing()
        else:
            raise ValueError(f"Unknown state: {self.state}")

    def state_ready(self):
        """Try to take a task from the queue and start it if there is no contention."""

        # Try to get a Task from the queue
        self.task = self.queue.get()
        if self.task is None:
            return

        # This processor is still in STATE_READY here, so (assuming the counter
        # counts processors in STATE_PROCESSING, as Runner's does) it is not
        # part of the count. The value is already the number of *other* active
        # processors and must not be reduced by one.
        contention = self.task.contention_fn(self.num_active_processors_fn())
        assert type(contention) == bool

        if contention:
            # Record the failure, apply the backoff policy and hand the task back.
            self.task_failure(self.task)
            self.queue.put(self.task)
            self.task = None
        else:
            self.state = Processor.STATE_PROCESSING
            self.computation_performed = 0

    def state_processing(self):
        """Spend one tick on the current task and finish it once its cost is paid."""
        self.computation_performed += 1

        # ``excution_cost`` is the task's (historically misspelled) cost attribute.
        if self.task.excution_cost == self.computation_performed:
            self.task.completion_time = self.current_time
            self.completed_task_list.append(self.task)
            self.task = None
            self.state = Processor.STATE_READY

    def task_failure(self, task):
        """Record a contention failure on ``task`` and apply the backoff policy."""
        assert type(task) == Task

        task.failure_times.append(self.current_time)
        new_earliest_time = self.backoff_policy(task, self.current_time)
        # Policies may mutate the task in place and return None; only overwrite
        # the earliest start time when one was returned, so the queue never
        # ends up comparing against None.
        if new_earliest_time is not None:
            task.earliest_time_to_run = new_earliest_time

In [9]:
# Processor with an empty queue: it stays ready, and neither the (absent)
# backoff policy nor the (absent) active-processor counter is consulted
q = Queue()
completed = []
p = Processor(None, q, completed, None)
assert p.state == Processor.STATE_READY
p.tick()
assert p.state == Processor.STATE_READY

# Processor with a queue holding one cost-1 task and no contention
num_active_processors_fn = lambda: 0
q = Queue()
completed = []
t1 = Task(1, 0, no_contention_function)
q.put(t1)
p = Processor(quickest_retry_policy, q, completed, num_active_processors_fn)

# Expected (clock, state, current task, completed list) after each tick:
# tick 1 picks the task up, tick 2 finishes it, tick 3 finds nothing to do
expected_after_each_tick = [
    (1, Processor.STATE_PROCESSING, t1, []),
    (2, Processor.STATE_READY, None, [t1]),
    (3, Processor.STATE_READY, None, [t1]),
]
for expected_time, expected_state, expected_task, expected_completed in expected_after_each_tick:
    p.tick()
    assert p.current_time == expected_time
    assert p.state == expected_state
    assert p.task is expected_task
    assert completed == expected_completed

### Runner

In [10]:
class Runner:
    """Drive a simulation: one shared queue, N processors and a common clock.

    Parameters
    ----------
    number_processors : int
        Number of processors to create (must be > 0).
    backoff_policy_fn : callable
        Backoff policy handed to every processor.
    tasks : iterable of Task
        Tasks placed on the queue, in order, before the simulation starts.
    max_timesteps : int or None
        Maximum number of time steps to simulate. ``None`` means run until the
        queue is empty and no processor is busy. That may never happen if a
        task keeps hitting contention.
    """

    def __init__(self, number_processors, backoff_policy_fn, tasks, max_timesteps):
        assert type(number_processors) == int and number_processors > 0
        # Materialise once so a generator is not exhausted by the validation
        # below before the tasks reach the queue.
        tasks = list(tasks)
        assert all(type(t) == Task for t in tasks)
        assert max_timesteps is None or (type(max_timesteps) == int and max_timesteps >= 0)

        self.number_processors = number_processors
        self.backoff_policy_fn = backoff_policy_fn
        self.max_timesteps = max_timesteps

        # Place the tasks on the queue
        self.make_and_populate_queue(tasks)

        self.completed_task_list = []
        self.current_time = 0

        # Make the processors
        self.make_processors()

    def make_and_populate_queue(self, tasks):
        """Make and populate the queue."""

        self.queue = Queue()
        for t in tasks:
            self.queue.put(t)

    def make_processors(self):
        """Create the processors, all sharing this runner's queue and completed list."""
        self.processors = []

        for _ in range(self.number_processors):

            # Make the processor
            proc = Processor(self.backoff_policy_fn,
                             self.queue,
                             self.completed_task_list,
                             self.number_active_processors)

            self.processors.append(proc)

    def number_active_processors(self):
        """Return how many processors are currently in the processing state."""
        return sum(1 for p in self.processors if p.state == Processor.STATE_PROCESSING)

    def run(self):
        """Simulate until all work is done or ``max_timesteps`` steps have elapsed.

        Each step advances the runner clock and the queue clock together, then
        ticks every processor once, in order. The queue clock must advance so
        that delayed tasks become available as simulated time passes.
        """
        while not self.queue.is_empty() or self.number_active_processors() > 0:

            if self.max_timesteps is not None and self.current_time >= self.max_timesteps:
                return

            self.current_time += 1
            self.queue.tick()

            for p in self.processors:
                p.tick()

In [11]:
# One processor, one cost-1 task, no contention: a full run drains the queue
tasks = [Task(1, 0, no_contention_function)]
r = Runner(1, quickest_retry_policy, tasks, None)
assert r.queue.elements == tasks
assert len(r.processors) == 1

r.run()
assert not r.queue.elements
assert r.completed_task_list == tasks

In [12]:
# Completion time of the only task; the cell output below shows 2 (picked up on tick 1, finished on tick 2)
r.completed_task_list[0].completion_time

2