-
Notifications
You must be signed in to change notification settings - Fork 504
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async job schedule #1030
Async job schedule #1030
Conversation
@@ -161,11 +161,11 @@ public void delete(@Context GraphManager manager, | |||
|
|||
TaskScheduler scheduler = graph(manager, graph).taskScheduler(); | |||
HugeTask<?> task = scheduler.task(IdGenerator.of(id)); | |||
if (!task.completed() && scheduler.cancel(task)) { | |||
if (!task.completed() && | |||
(scheduler.cancel(task) || task.cancelling())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move task.cancelling() before scheduler.cancel(task)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scheduler.cancel(task)
will change task status to CANCELLING
ServerInfoManager manager = scheduler.serverManager(); | ||
HugeServerInfo serverInfo = manager.serverInfo(); | ||
serverInfo.load(serverInfo.load() - this.load); | ||
manager.save(serverInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure no exception is thrown out of done(): add a method ServerInfoManager.decreaseLoad(int) and just log the error inside that method if one occurs.
} | ||
|
||
public void addScheduler(HugeGraphParams graph) { | ||
E.checkArgumentNotNull(graph, "The graph can't be null"); | ||
ExecutorService task = this.taskExecutor; | ||
ExecutorService db = this.dbExecutor; | ||
this.schedulers.put(graph, new StandardTaskScheduler(graph, task, db)); | ||
TaskScheduler taskScheduler = new StandardTaskScheduler(graph, task, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to scheduler to not wrap line
hugegraph-core/src/main/java/com/baidu/hugegraph/cluster/ServerInfoManager.java
Outdated
Show resolved
Hide resolved
this.graph.schemaTransaction().addVertexLabel(label); | ||
|
||
// Create index | ||
this.createIndexLabel(label, P.ROLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems no need to query by role
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
query by role in initServerInfo
of com/baidu/hugegraph/cluster/ServerInfoManager.java
|
||
public class HugeServerInfo { | ||
|
||
public static final int MAX_LOAD = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
report MAX_LOAD by itself and add maxLoad field
1, TASK_SCHEDULER); | ||
// Start after 10s waiting for HugeGraphServer startup | ||
this.taskScheduler.scheduleWithFixedDelay(this::periodicJob, | ||
10L, 3, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define 3 as const var SCHEDULE_PERIOD
public class HugeServerInfo { | ||
|
||
public static final int MAX_LOAD = 1000; | ||
public static final long EXPIRED_INTERVAL = 5000L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SCHEDULE_PERIOD * 3
f277c9f
to
e96baf2
Compare
"server.role", | ||
"The role of nodes in the cluster, available type are " + | ||
"[master, worker]", | ||
disallowEmpty(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use allowValues
@@ -47,16 +47,15 @@ | |||
private EventListener storeEventListener; | |||
private EventListener cacheEventListener; | |||
|
|||
private final Map<HugeType, Boolean> cachedTypes; | |||
private static final Map<HugeGraphParams, CachedTypes> CACHED_TYPES = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Map<String, CachedTypes>
// Update server heartbeat | ||
server.heartbeat(); | ||
|
||
// Master schedule tasks not found suitable server when created |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improve "not found suitable server when created"
29f2258
to
e68cc2a
Compare
@@ -267,6 +267,8 @@ public void truncateBackend() { | |||
|
|||
this.storeProvider.truncate(); | |||
this.storeProvider.initSystemInfo(this); | |||
this.serverStarted(this.serverManager.serverId().asString(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not change the parameter type
@@ -47,16 +47,15 @@ | |||
private EventListener storeEventListener; | |||
private EventListener cacheEventListener; | |||
|
|||
private final Map<HugeType, Boolean> cachedTypes; | |||
private static final Map<String, CachedTypes> CACHED_TYPES = | |||
new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer align with CACHED_TYPES
} | ||
return results; | ||
} | ||
} | ||
|
||
private static class CachedTypes | ||
extends ConcurrentHashMap<HugeType, Boolean> {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align with class
e, e.getMessage()); | ||
} finally { | ||
((StandardTaskScheduler) task.scheduler()).remove(id); | ||
// ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't ignore exception here
if (task.status.code() > TaskStatus.QUEUED.code()) { | ||
task.get(); | ||
} | ||
task = this.scheduler().task(this.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should sleep between the two reads of the task
@@ -322,12 +325,13 @@ private void unlistenChanges() { | |||
HugeServerInfo server; | |||
do { | |||
Iterator<HugeServerInfo> servers = this.serverManager() | |||
.serverInfos(100L, page); | |||
.serverInfos(10L, page); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add const var
@@ -388,9 +392,10 @@ protected ServerInfoManager serverManager() { | |||
return this.graph.serverManager(); | |||
} | |||
|
|||
protected void remove(Id id) { | |||
public void remove(Id id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep protected
task = scheduler.task(id); | ||
Assert.assertEquals(TaskStatus.FAILED, task.status()); | ||
Assert.assertContains("Not allowed to remove vertex label 'book' " + | ||
"because the edge label 'write' still link with" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrap line after "link "
@Override | ||
public void done() { | ||
StandardTaskScheduler scheduler = Whitebox.getInternalState( | ||
this.task(), "scheduler"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not call this.task().scheduler()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HugeTask doesn't have scheduler()
Already change to this.graph().taskScheduler()
E.checkArgumentNotNull(task, "Task can't be null"); | ||
if (!this.serverManager().master()) { | ||
return; | ||
} | ||
if (!task.completed()) { | ||
// The task scheduled to workers, waiting for worker cancel | ||
task = this.task(task.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why read again
} | ||
|
||
private <V> Future<?> submitTask(HugeTask<V> task) { | ||
public <V> Future<?> submitTask(HugeTask<V> task) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep private
public static final ConfigOption<String> SERVER_ROLE = | ||
new ConfigOption<>( | ||
"server.role", | ||
"The role of nodes in the cluster, available type are " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
types
protected Object[] asArray() { | ||
E.checkState(this.id != null, "Server id can't be null"); | ||
|
||
List<Object> list = new ArrayList<>(8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initialCapacity should be 10, actually DEFAULT_CAPACITY is 10
|
||
public static final int SCHEDULE_PERIOD = 3; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add comment: Unit second
for (String graphName : graphNames) { | ||
config.setProperty(CoreOptions.STORE.name(), graphName); | ||
if (backend.equals("rocksdb")) { | ||
String dataPath = data + "/" + graphName; | ||
config.setProperty(RocksDBOptions.DATA_PATH.name(), dataPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer setting DATA_PATH before openGraphs() in each test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
address it
.enableLabelIndex(true) | ||
.build(); | ||
this.params().schemaTransaction().addVertexLabel(label); | ||
|
||
// Create index | ||
this.createIndexLabel(label, P.STATUS); | ||
this.createIndexLabel(label, P.SERVER); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unneeded index
|
||
private synchronized <V> HugeServerInfo pickWorker(HugeTask<V> task) { | ||
HugeServerInfo master = null; | ||
HugeServerInfo minServer = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minServer -- the meaning is incomprehensible
hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java
Show resolved
Hide resolved
@@ -162,6 +170,7 @@ private void unlistenChanges() { | |||
public <V> void restoreTasks() { | |||
boolean supportsPaging = this.graph().backendStoreFeatures() | |||
.supportsQueryByPage(); | |||
Id server = this.serverManager().serverId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to selfServer
@@ -114,7 +114,7 @@ public static synchronized CoreOptions instance() { | |||
"Timeout in seconds for waiting for the task to complete," + | |||
"such as when truncating or clearing the backend.", | |||
rangeInt(0L, Long.MAX_VALUE), | |||
10L | |||
30L |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too long, the client may time out; 20s is ok
HugeServerInfo serverInfo = new HugeServerInfo(server, nodeRole); | ||
serverInfo.maxLoad(this.calcMaxLoad()); | ||
this.serverId = serverInfo.id(); | ||
this.serverRole = nodeRole; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move serverId and serverRole assignment to the begin of this method after refactor initServerInfo params
if (nodeRole.master()) { | ||
String page = PAGE_NONE; | ||
do { | ||
Iterator<HugeServerInfo> servers = this.serverInfos(10L, page); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add const var
for (String graphName : graphNames) { | ||
config.setProperty(CoreOptions.STORE.name(), graphName); | ||
if (backend.equals("rocksdb")) { | ||
String dataPath = data + "/" + graphName; | ||
config.setProperty(RocksDBOptions.DATA_PATH.name(), dataPath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
address it
} | ||
throw new HugeException("Async task failed with error: %s", | ||
cause, cause.getMessage()); | ||
this.scheduler().waitUntilTaskCompleted(this.id(), 20L, 10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10L
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems the syncWait() method can be deleted
} | ||
HugeServerInfo server = this.pickWorker(task); | ||
if (server == null) { | ||
LOG.debug("The master can not find suitable server to " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"can't"
@@ -325,6 +316,7 @@ public void testGremlinJobWithFailure() throws TimeoutException { | |||
.job(new GremlinJob()); | |||
HugeTask<Object> task = builder.schedule(); | |||
scheduler.waitUntilTaskCompleted(task.id(), 10); | |||
task = scheduler.task(task.id()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
task = scheduler.waitUntilTaskCompleted()
test errors:
|
if (this.updateTime.getTime() + 5000L < now || | ||
this.load() + task.load() > MAX_LOAD) { | ||
public <V> boolean suitableFor(HugeTask<V> task) { | ||
if (this.alive() || this.load() + task.load() > MAX_LOAD) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems contrary to common sense; it should accept a new task if it is alive
if (servers.hasNext()) { | ||
existed = servers.next(); | ||
E.checkArgument(!existed.alive(), | ||
"Already existed master '%s' in current " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception message may not be correct.
|
hbase
|
|
|
Codecov Report
@@ Coverage Diff @@
## master #1030 +/- ##
============================================
+ Coverage 59.92% 69.32% +9.39%
- Complexity 4571 5538 +967
============================================
Files 328 331 +3
Lines 26400 26852 +452
Branches 3773 3838 +65
============================================
+ Hits 15820 18614 +2794
+ Misses 8857 6422 -2435
- Partials 1723 1816 +93
Continue to review full report at Codecov.
|
Change-Id: I06478c6dcf64699aba336aa5c032337bd297783b
Change-Id: I39306140f3d4682b7de88bfa2b4277c622ca8560
Change-Id: I804eab3fe05807b8ca3482207268340bb1b09a71
Change-Id: I55ab99d497c13eb7803e76da3dcc7d9f8850c57b
Change-Id: Ie5dd63e0e977eda9dd0a795b34f0898252b45d2b
Change-Id: If48dac267863c6757de16eb4d6ea12f26cdc6087
Change-Id: Iccad390f86a6b5d5605f127fc49e50713cc264df
Change-Id: I016eebc92314e23b7cfd622dbca78df2f4ab63c2
Change-Id: Idf77f479c766aaa0a29b74fbd5ab92828ffebfba
Change-Id: Ia3b1c163f5f90c07a748b89c4be652ccc2ae6fd9
Change-Id: I0e4cae424a788fcb0b61e7ee8708e4b6e05b27bb
Change-Id: I7e94e3111a735f5d25de8711321a92fb3ef45e20
2. wait task completed in delete tests of api test Change-Id: I5fc7f828c704b49b6caada37f0648ed9e54258ae
Change-Id: I812a482100509d224bc3b26265121e2f007a837c
2. change cassandra ttl type from int to long Change-Id: I6780c7829cfeae5dedb60bfc50a9dbed5657f29d
} while (page != null); | ||
} | ||
|
||
protected <V> void executeTasksForWorker(Id server) { | ||
String page = PageInfo.PAGE_NONE; | ||
boolean supportsPaging = this.graph().backendStoreFeatures() | ||
.supportsQueryByPage(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define a field or method supportsPaging
"server.role", | ||
"The role of nodes in the cluster, available types are " + | ||
"[master, worker]", | ||
allowValues("master", "worker"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add "computer"
@@ -591,19 +636,22 @@ private void checkPropertySize(int propertyLength, String propertyName) { | |||
} | |||
|
|||
public void syncWait() { | |||
long timeout = ((HugeConfig) this.scheduler.graph().configuration()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not supported by auth graph
} | ||
throw new HugeException("Async task failed with error: %s", | ||
cause, cause.getMessage()); | ||
this.scheduler().waitUntilTaskCompleted(this.id(), timeout, 10L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can read timeout in scheduler.waitUntilTaskCompleted()
}; | ||
|
||
try { | ||
this.schedulerExecutor.submit(closeTx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ensure completed, call this.schedulerExecutor.submit(closeTx).get()
abf01c7
to
c2dab85
Compare
Change-Id: Iff72fc89737c8b05ff22ffcc86cc9c00bc555144
c2dab85
to
4d073d3
Compare
@@ -62,7 +62,7 @@ public static synchronized ServerOptions instance() { | |||
"server.role", | |||
"The role of nodes in the cluster, available types are " + | |||
"[master, worker]", | |||
allowValues("master", "worker"), | |||
allowValues("master", "worker", "computer"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also update line 64
improved by https://github.com/hugegraph/hugegraph/pull/1079: |
No description provided.