Skip to content
Permalink
Browse files
async watch the k8s job (#148)
* async watch k8s job
* let cancelJob return bool
* update version to 0.1.1
  • Loading branch information
coderzc committed Nov 23, 2021
1 parent d404e5a commit af18dfca9f063e2cd33dee67ab3cbe71d879e7b1
Showing 14 changed files with 69 additions and 82 deletions.
@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -24,7 +24,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -5,9 +5,10 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>0.1.1</version>

<artifactId>computer-driver</artifactId>

@@ -23,6 +23,7 @@
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public interface ComputerDriver extends Closeable {

@@ -47,18 +48,21 @@ public interface ComputerDriver extends Closeable {
* the job.
* @param params reserved for other parameters in addition to jobId used
* to cancel the job.
* @return Whether the job was successfully cancelled
*/
void cancelJob(String jobId, Map<String, String> params);
boolean cancelJob(String jobId, Map<String, String> params);

/**
* Wait for the job to finish; it will trace the execution of the job and
* notify the observer at every superstep. Throws ComputerException if the
* job is already being waited on by another thread.
* @param params reserved for other parameters in addition to jobId used
* to wait for the job.
* @return a future for watching the job
*/
void waitJob(String jobId, Map<String, String> params,
JobObserver observer);
CompletableFuture<Void> waitJobAsync(String jobId,
Map<String, String> params,
JobObserver observer);

/**
* Get the current job state. Throws ComputerException if it can't find the
@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -72,7 +73,6 @@
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
@@ -280,20 +280,19 @@ private void checkComputerConf(Map<String, String> computerConf,
}

@Override
public void cancelJob(String jobId, Map<String, String> params) {
Boolean delete = this.operation.withName(KubeUtil.crName(jobId))
.delete();
E.checkState(delete, "Failed to cancel Job, jobId: ", jobId);
public boolean cancelJob(String jobId, Map<String, String> params) {
return this.operation.withName(KubeUtil.crName(jobId)).delete();
}

@Override
public void waitJob(String jobId, Map<String, String> params,
JobObserver observer) {
public CompletableFuture<Void> waitJobAsync(String jobId,
Map<String, String> params,
JobObserver observer) {
JobState jobState = this.jobState(jobId, params);
if (jobState == null) {
LOG.warn("Unable to fetch state of job '{}', it may have been " +
"deleted", jobId);
return;
return null;
} else {
observer.onJobStateChanged(jobState);
}
@@ -309,14 +308,7 @@ public void waitJob(String jobId, Map<String, String> params,
}
}

try {
if (future != null) {
future.get();
}
} catch (Throwable e) {
this.cancelWait(jobId);
throw KubernetesClientException.launderThrowable(e);
}
return future;
}

private Watch initWatch() {
@@ -426,6 +418,14 @@ public String log(String jobId, int containerId, long offset, long length,

@Override
public void close() {
Iterator<Pair<CompletableFuture<Void>, JobObserver>> iterator =
this.waits.values().iterator();
while (iterator.hasNext()) {
CompletableFuture<Void> future = iterator.next().getLeft();
future.cancel(true);
iterator.remove();
}

if (this.watch != null) {
this.watch.close();
this.watchActive.setFalse();
@@ -24,7 +24,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -210,16 +210,16 @@ public void testCancelJob() {
}

@Test
public void testWaitJobAndCancel() {
public void testWatchJobAndCancel() {
Map<String, String> params = new HashMap<>();
params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10");
String jobId = this.driver.submitJob("PageRank3", params);

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
this.driver.waitJob(jobId, params, jobObserver);
});
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId,
params,
jobObserver);

Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1))
.onJobStateChanged(Mockito.any(DefaultJobState.class));
@@ -229,10 +229,15 @@ public void testWaitJobAndCancel() {
MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
"watchActive");
watchActive.setFalse();
this.driver.waitJob(jobId, params, jobObserver);
this.driver.waitJobAsync(jobId, params, jobObserver);

this.driver.cancelJob(jobId, params);
this.driver.waitJob(jobId, params, jobObserver);
UnitTestBase.sleep(1000L);

CompletableFuture<Void> future2 = this.driver.waitJobAsync(jobId,
params,
jobObserver);
Assert.assertNull(future2);
}

@Test

0 comments on commit af18dfc

Please sign in to comment.