Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async watch the k8s job #148

Merged
merged 4 commits into from Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion computer-algorithm/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion computer-api/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion computer-core/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion computer-dist/pom.xml
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
3 changes: 2 additions & 1 deletion computer-driver/pom.xml
Expand Up @@ -5,9 +5,10 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>0.1.1</version>

<artifactId>computer-driver</artifactId>

Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public interface ComputerDriver extends Closeable {

Expand All @@ -47,18 +48,21 @@ public interface ComputerDriver extends Closeable {
* the job.
* @param params reserved for other parameters in addition to jobId used
* to cancel job.
* @return Whether the job was successfully cancelled
*/
void cancelJob(String jobId, Map<String, String> params);
boolean cancelJob(String jobId, Map<String, String> params);

/**
* Wait for the job to finish. It traces the execution of the job and
* notifies the observer at every superstep. Throws ComputerException if
* the job is already being waited on by another thread.
* @param params reserved for other parameters in addition to jobId used
* to wait job.
* @return a future for watching the job
Copy link
Member

@imbajin imbajin Nov 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On line 52:

Would it be better to change `void cancelJob()` to `boolean cancelJob()` (and likewise in the implementation)?

*/
void waitJob(String jobId, Map<String, String> params,
JobObserver observer);
CompletableFuture<Void> waitJobAsync(String jobId,
Map<String, String> params,
JobObserver observer);

/**
* Get the current job state. Throws ComputerException if it can't find the
Expand Down
2 changes: 1 addition & 1 deletion computer-k8s-operator/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion computer-k8s/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -72,7 +73,6 @@
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
Expand Down Expand Up @@ -280,20 +280,19 @@ private void checkComputerConf(Map<String, String> computerConf,
}

@Override
public void cancelJob(String jobId, Map<String, String> params) {
Boolean delete = this.operation.withName(KubeUtil.crName(jobId))
.delete();
E.checkState(delete, "Failed to cancel Job, jobId: ", jobId);
public boolean cancelJob(String jobId, Map<String, String> params) {
return this.operation.withName(KubeUtil.crName(jobId)).delete();
}

@Override
public void waitJob(String jobId, Map<String, String> params,
JobObserver observer) {
public CompletableFuture<Void> waitJobAsync(String jobId,
Map<String, String> params,
JobObserver observer) {
JobState jobState = this.jobState(jobId, params);
if (jobState == null) {
LOG.warn("Unable to fetch state of job '{}', it may have been " +
"deleted", jobId);
return;
return null;
} else {
observer.onJobStateChanged(jobState);
}
Expand All @@ -309,14 +308,7 @@ public void waitJob(String jobId, Map<String, String> params,
}
}

try {
if (future != null) {
future.get();
}
} catch (Throwable e) {
this.cancelWait(jobId);
throw KubernetesClientException.launderThrowable(e);
}
return future;
}

private Watch initWatch() {
Expand Down Expand Up @@ -426,6 +418,14 @@ public String log(String jobId, int containerId, long offset, long length,

@Override
public void close() {
Iterator<Pair<CompletableFuture<Void>, JobObserver>> iterator =
this.waits.values().iterator();
while (iterator.hasNext()) {
CompletableFuture<Void> future = iterator.next().getLeft();
future.cancel(true);
iterator.remove();
}

if (this.watch != null) {
this.watch.close();
this.watchActive.setFalse();
Expand Down
2 changes: 1 addition & 1 deletion computer-test/pom.xml
Expand Up @@ -24,7 +24,7 @@
<parent>
<artifactId>hugegraph-computer</artifactId>
<groupId>com.baidu.hugegraph</groupId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Expand Up @@ -210,16 +210,16 @@ public void testCancelJob() {
}

@Test
public void testWaitJobAndCancel() {
public void testWatchJobAndCancel() {
Map<String, String> params = new HashMap<>();
params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10");
String jobId = this.driver.submitJob("PageRank3", params);

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
this.driver.waitJob(jobId, params, jobObserver);
});
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId,
params,
jobObserver);

Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1))
.onJobStateChanged(Mockito.any(DefaultJobState.class));
Expand All @@ -229,10 +229,15 @@ public void testWaitJobAndCancel() {
MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
"watchActive");
watchActive.setFalse();
this.driver.waitJob(jobId, params, jobObserver);
this.driver.waitJobAsync(jobId, params, jobObserver);

this.driver.cancelJob(jobId, params);
this.driver.waitJob(jobId, params, jobObserver);
UnitTestBase.sleep(1000L);

CompletableFuture<Void> future2 = this.driver.waitJobAsync(jobId,
params,
jobObserver);
Assert.assertNull(future2);
}

@Test
Expand Down