Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize task update based on their status #479

Merged
merged 17 commits into from
Dec 23, 2021

Conversation

mcornaton
Copy link
Contributor

No description provided.

@mcornaton mcornaton requested a review from a team December 16, 2021 10:28
@mcornaton
Copy link
Contributor Author

If we ever need it, this could be easily updated to add deadlines ordering with thenComparing in createQueue method:

final Comparator<Task> comparator = Comparator.comparing(task -> task.getCurrentStatus().ordinal(), Comparator.reverseOrder());


PriorityBlockingQueue<Task> createQueue() {
// Tasks whose status are the more advanced should be computed before others
final Comparator<Task> comparator = Comparator.comparing(task -> task.getCurrentStatus().ordinal(), Comparator.reverseOrder());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why sorting them by status instead of contribution deadline?

Copy link
Contributor Author

@mcornaton mcornaton Dec 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we work with bag of tasks, every task has the same contributionDeadline... But some of them may be more advanced in their statuses, so they have priority to be updated.
However, I agree with you on the fact that sorting tasks by contribution deadline may be great in addition to status ordering and that's something that should not be too hard to implement. See #479 (comment).

@jeremyjams
Copy link
Member

Why not something like:

    @Scheduled(fixedDelay = 1000)
    public void consumeAndNotify() {
        if (consumer == null){
            log.warn("Waiting for consumer before consuming [queueSize:{}]", queue.size());
            return;
        }

        while (!Thread.currentThread().isInterrupted()){
            waitForTaskUpdateRequest();
        }
    }

    private void waitForTaskUpdateRequest() {
        log.info("Waiting requests from publisher [queueSize:{}]", queue.size());
        try {
            if (taskUpdateExecutor.getActiveCount() < TASK_UPDATE_THREADS_POOL_SIZE){
                Task task = queue.take();
                CompletableFuture.runAsync(() -> {
                    synchronized (locks.computeIfAbsent(task.getChainTaskId(), key -> new Object())){ // require one update on a same task at a time
                        consumer.onTaskUpdateRequest(task.getChainTaskId()); // synchronously update task
                    }
                }, taskUpdateExecutor);
            }
        } catch (InterruptedException e) {
            log.error("The unexpected happened", e);
            Thread.currentThread().interrupt();
        }
    }

?

@mcornaton
Copy link
Contributor Author

@jeremyjams: If you already have TASK_UPDATE_THREADS_POOL_SIZE updating threads, you'll trigger a lot of calls to waitForTaskUpdateRequest and load the Scheduler... Which is what I'm trying to avoid there.

@jbern0rd
Copy link
Contributor

Implementation to discuss:

  • publishRequest changes to accept a chainTaskId as a parameter. This id is written in a Queue/List, ...
  • We use a method to poll on the list (wait on empty list, work on requests)
  • We add in taskRepository a method List<Task> findByChainTaskId(List<String ids, Sort sort)
  • The new method is used to directly fetch the tasks with the correct order
  • The requests update are handled by following the order of tasks returned by the new method
  • chainTaskIds from handled tasks are removed from the list

This may allow us to remove some bottlenecks and scheduling problems

*
* @return A {@link Stream} as described above.
*/
public Stream<TaskUpdate> streamAsTaskUpdate() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamOfTaskUpdate()
?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like of. The idea here is to tell that the elements in the queue are streamed as TaskUpdates. I don't think streamOfTaskUpdate carries the same meaning: it tells this method returns a stream of TaskUpdates but does not say anything about where these objects come from.

Copy link
Member

@jeremyjams jeremyjams left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well-done :)

Copy link
Contributor

@jbern0rd jbern0rd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this 👍

@mcornaton mcornaton merged commit 1dcdb93 into develop Dec 23, 2021
@mcornaton mcornaton deleted the feature/prioritize-task-update branch December 23, 2021 14:45
@mcornaton mcornaton mentioned this pull request Dec 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants