fix task scheduler bugs: dead lock and dup server-info #1079
1. fix deadlock when scheduler-worker holds the scheduleTasks lock and then queries tasks: tx() method synchronized(this) waits indefinitely.
2. delete server-info when graph closes
3. throw exception if task failed in syncWait
fix #1075
Change-Id: Icb8d5073f024f0598d439d738d6c0e2caa6fd0cd
Change-Id: I69efd7d48610619379975e7d284e5dab88e30776
* standardize the status of tasks
* remove server info only after storage is initialized
* cancel(task) doesn't remove task from memory if it's a single-node server
* notifyNewTask only notifies when the queue is empty
Change-Id: I874724519e516f600ee8eb1b4760a607421a844a
84875cc
to
795ef89
Compare
Codecov Report
@@ Coverage Diff @@
## master #1079 +/- ##
============================================
+ Coverage 68.94% 69.16% +0.22%
- Complexity 5543 5670 +127
============================================
Files 331 331
Lines 26880 27260 +380
Branches 3842 3986 +144
============================================
+ Hits 18533 18855 +322
- Misses 6532 6558 +26
- Partials 1815 1847 +32
@@ -242,17 +237,19 @@ public int pendingTasks() {
    }

    protected void notifyNewTask(HugeTask<?> task) {
Parameter task is not used.
does not matter, just preserved
        this.schedulerExecutor.submit(this::scheduleOrExecuteJob);
        Queue<Runnable> queue = ((ThreadPoolExecutor) this.schedulerExecutor)
                                .getQueue();
        if (queue.size() <= 1) {
It may schedule more tasks at the same time, and these tasks will not be processed.
It's OK not to notify again if there is more than one task in the queue (for example two: one timer task and one immediate task). We don't want too many immediate tasks inserted into the queue, and a single notify causes all the tasks to be processed.
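To make the throttling idea concrete, here is a minimal sketch of "notify only when the queue holds at most the timer task". It is not the PR's code: the class name NotifyThrottleSketch, the submitted counter and the one-hour timer delay are assumptions made for illustration, and a plain ScheduledThreadPoolExecutor stands in for schedulerExecutor.

```java
import java.util.Queue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the scheduler: one worker thread, one periodic timer task.
class NotifyThrottleSketch {

    private final ScheduledThreadPoolExecutor executor =
            new ScheduledThreadPoolExecutor(1);
    private final Runnable job;
    // Counts how many immediate jobs were actually submitted (for illustration only)
    final AtomicInteger submitted = new AtomicInteger();

    NotifyThrottleSketch(Runnable job) {
        this.job = job;
        // The periodic timer task waits in the queue between runs, so size >= 1
        this.executor.scheduleWithFixedDelay(job, 1, 1, TimeUnit.HOURS);
    }

    // Submit an immediate job only if nothing but the timer task is queued
    boolean notifyNewTask() {
        Queue<Runnable> queue = this.executor.getQueue();
        if (queue.size() <= 1) {
            this.executor.submit(this.job);
            this.submitted.incrementAndGet();
            return true;
        }
        // An immediate job is already pending; it will pick up the new task too
        return false;
    }

    void close() {
        this.executor.shutdownNow();
    }
}
```

The check and the submit are not atomic, so concurrent callers can occasionally enqueue one extra job. That is harmless as long as one run of the job processes every pending task, which is the assumption stated in the reply above.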
-    protected synchronized <V> void scheduleTasks() {
-        // Master schedule all queued tasks to suitable servers
+    protected synchronized void scheduleTasks() {
+        // Master server schedule all queued tasks to suitable worker servers
queued => scheduling
            server.load(server.load() + task.load());
            this.serverManager().save(server);
            LOG.info("Schedule task {} to server {}",
Must save server info here; otherwise all tasks will be scheduled to the same worker.
Implement increaseLoad(int load) in class HugeServerInfo.
Use batch read/write.
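The two suggestions above (an increaseLoad helper, plus reading and writing server infos in batch) can be sketched roughly as follows. Everything here is hypothetical and in-memory: ServerInfo is a simplified stand-in for HugeServerInfo, and pick() uses a plain least-loaded rule, which is an assumption and not necessarily how the project picks a worker.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchScheduleSketch {

    // Simplified, hypothetical stand-in for HugeServerInfo
    static final class ServerInfo {
        final String id;
        final int maxLoad;
        private int load;
        private boolean updated;

        ServerInfo(String id, int maxLoad) {
            this.id = id;
            this.maxLoad = maxLoad;
        }

        int load() {
            return this.load;
        }

        // Bump the load in memory and mark the server as needing a save
        void increaseLoad(int delta) {
            this.load += delta;
            this.updated = true;
        }
    }

    // Least-loaded server that still has room for the task, or null if none
    static ServerInfo pick(Collection<ServerInfo> servers, int taskLoad) {
        ServerInfo best = null;
        for (ServerInfo server : servers) {
            if (server.load + taskLoad > server.maxLoad) {
                continue;
            }
            if (best == null || server.load < best.load) {
                best = server;
            }
        }
        return best;
    }

    // Assign tasks using server infos that were read once, before the loop
    static Map<String, String> schedule(Collection<ServerInfo> servers,
                                        Map<String, Integer> taskLoads) {
        Map<String, String> assigned = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : taskLoads.entrySet()) {
            ServerInfo server = pick(servers, entry.getValue());
            if (server == null) {
                continue; // no capacity, the task stays queued
            }
            // Updating the load right away keeps later picks balanced
            server.increaseLoad(entry.getValue());
            assigned.put(entry.getKey(), server.id);
        }
        return assigned;
    }

    // Servers whose load changed; these would be written back in one batch
    static List<ServerInfo> updatedServers(Collection<ServerInfo> servers) {
        List<ServerInfo> dirty = new ArrayList<>();
        for (ServerInfo server : servers) {
            if (server.updated) {
                dirty.add(server);
            }
        }
        return dirty;
    }
}
```

Because each pick sees the load increased by the previous assignment, tasks spread across workers even though nothing is written to storage until the loop ends.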
@@ -377,15 +402,14 @@ private void unlistenChanges() {
              * initialized when canceled.
              */
             this.initTaskCallable(task);
-            @SuppressWarnings("unchecked")
-            HugeTask<V> memTask = (HugeTask<V>) this.tasks.get(task.id());
+            HugeTask<?> memTask = this.tasks.get(task.id());
             if (memTask != null) {
                 task = memTask;
             }
             Id taskServer = task.server();
             if (taskServer != null && taskServer.equals(server)) {
move to line 397 and change to:
Id taskServer = task.server();
assert taskServer != null;
if (!taskServer.equals(server)) {
continue;
}
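A rough sketch of the loop shape this suggestion leads to: skip tasks owned by other servers first, then prefer the in-memory copy. The Task class, the tasksForServer method and the origin field are hypothetical names used only to make the example self-contained, and Objects.requireNonNull stands in for the assert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class WorkerFilterSketch {

    // Hypothetical, simplified task record
    static final class Task {
        final String id;
        final String server;
        final String origin; // "db" or "mem", for illustration only

        Task(String id, String server, String origin) {
            this.id = id;
            this.server = server;
            this.origin = origin;
        }
    }

    static List<Task> tasksForServer(List<Task> stored,
                                     Map<String, Task> inMemory,
                                     String server) {
        List<Task> result = new ArrayList<>();
        for (Task task : stored) {
            // Scheduled tasks are assumed to always carry a server id
            String taskServer = Objects.requireNonNull(task.server,
                                                       "task server");
            if (!taskServer.equals(server)) {
                continue; // owned by another worker, skip early
            }
            // Prefer the in-memory instance if this node already tracks it
            Task memTask = inMemory.get(task.id);
            result.add(memTask != null ? memTask : task);
        }
        return result;
    }
}
```

Checking the owner before any other per-task work avoids initializing tasks that this server will never run.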
@@ -212,8 +273,8 @@ protected Id save(HugeServerInfo server) {
        });
Remove line 218, since the system tx is auto-commit.
         * this lock through scheduleTasks(), then query tasks and wait
         * for db-worker thread after call(), the tx may not be initialized
         * but can't catch this lock, then cause dead lock.
         * We just use this.eventListener ad a monitor here
as a monitor
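The deadlock described in that code comment can be illustrated with a small sketch. It is not the project's code: MonitorSketch, txMonitor and the dbWorker executor are hypothetical names, the transaction is a plain Object, and a dedicated lock object is used where the patch reuses this.eventListener as the monitor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class MonitorSketch {

    // Stand-in for the db-worker thread that runs queries
    private final ExecutorService dbWorker =
            Executors.newSingleThreadExecutor();
    // Dedicated monitor for lazy tx creation, deliberately NOT `this`
    private final Object txMonitor = new Object();
    private volatile Object tx; // stand-in for the real transaction object

    // Lazy init guarded by txMonitor. If this were synchronized (this),
    // the db-worker thread would block while scheduleTasks() holds `this`.
    private Object tx() {
        if (this.tx == null) {
            synchronized (this.txMonitor) {
                if (this.tx == null) {
                    this.tx = new Object();
                }
            }
        }
        return this.tx;
    }

    // Holds the lock on `this` while waiting for the db-worker thread
    synchronized boolean scheduleTasks() {
        Callable<Object> query = this::tx;
        Future<Object> future = this.dbWorker.submit(query);
        try {
            return future.get(5, TimeUnit.SECONDS) != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // Timeout or execution failure
            return false;
        }
    }

    void close() {
        this.dbWorker.shutdown();
    }
}
```

With synchronized (this) inside tx(), the worker would wait for a lock that the caller releases only after the worker finishes, so the wait would never end without a timeout. A separate monitor breaks that cycle.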
69ddc1f
to
cad38cf
Compare
Change-Id: I23c845ee93c05ebec1fbea77fdc36a5cdb2614c2
cad38cf
to
176c988
Compare
        E.checkArgumentNotNull(task, "Task can't be null");
        this.checkOnMasterNode("schedule");
move to line 234
             * is running but removed from memory.
             */
            this.remove(task);
        }
The else branch means the task is on the master; cancelling it directly seems more efficient.
We assume cancel is not a frequent operation, so it is not optimized
Change-Id: I74a0164fd755bae885575a1770995502def6e88b
1. fix deadlock when scheduler-worker holds the scheduleTasks lock and then queries tasks: tx() method synchronized(this) waits indefinitely.
2. delete server-info when graph closes (when store is initialized)
3. throw exception if task failed in syncWait (just for debugging tinkerpop tests)
4. submit task if there is only a single node
5. standardize the status of tasks (add scheduling/scheduled)
6. cancel-task doesn't remove task from memory if it's a single-node server
7. notifyNewTask only notifies when the queue is empty (size=1 means one timer task)
8. read/write server-infos in batch during one scheduling
fix #1075
Change-Id: Icb8d5073f024f0598d439d738d6c0e2caa6fd0cd