Skip to content

Commit

Permalink
improve comment
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Nov 18, 2021
1 parent 5844955 commit e6177e6
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@ public interface ComputerDriver extends Closeable {
void cancelJob(String jobId, Map<String, String> params);

/**
* Watch the job state, it will trace the execution of job and notify
* Wait the job to finish, it will trace the execution of job and notify
* the observer at every superstep. Throws ComputerException if the
* job is waiting by another thread.
* @param params reserved for other parameters in addition to jobId used
* to wait job.
* @return future for watch the job
*/
CompletableFuture<Void> watchJob(String jobId, Map<String, String> params,
JobObserver observer);
CompletableFuture<Void> waitJobAsync(String jobId,
Map<String, String> params,
JobObserver observer);

/**
* Get the current job state. Throws ComputerException if can't found the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -286,9 +287,9 @@ public void cancelJob(String jobId, Map<String, String> params) {
}

@Override
public CompletableFuture<Void> watchJob(String jobId,
Map<String, String> params,
JobObserver observer) {
public CompletableFuture<Void> waitJobAsync(String jobId,
Map<String, String> params,
JobObserver observer) {
JobState jobState = this.jobState(jobId, params);
if (jobState == null) {
LOG.warn("Unable to fetch state of job '{}', it may have been " +
Expand Down Expand Up @@ -419,10 +420,13 @@ public String log(String jobId, int containerId, long offset, long length,

@Override
public void close() {
for (String jobId : this.waits.keySet()) {
this.waits.get(jobId).getKey().cancel(true);
Iterator<Pair<CompletableFuture<Void>, JobObserver>> iterator =
this.waits.values().iterator();
while (iterator.hasNext()) {
CompletableFuture<Void> future = iterator.next().getLeft();
future.cancel(true);
iterator.remove();
}
this.waits.clear();

if (this.watch != null) {
this.watch.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
Expand Down Expand Up @@ -211,17 +210,16 @@ public void testCancelJob() {
}

@Test
public void testWatchJobAndCancel() throws ExecutionException,
InterruptedException {
public void testWatchJobAndCancel() {
Map<String, String> params = new HashMap<>();
params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10");
String jobId = this.driver.submitJob("PageRank3", params);

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId,
params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId,
params,
jobObserver);

Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1))
.onJobStateChanged(Mockito.any(DefaultJobState.class));
Expand All @@ -231,14 +229,15 @@ public void testWatchJobAndCancel() throws ExecutionException,
MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
"watchActive");
watchActive.setFalse();
this.driver.watchJob(jobId, params, jobObserver);
this.driver.waitJobAsync(jobId, params, jobObserver);

this.driver.cancelJob(jobId, params);
CompletableFuture<Void> watchJob = this.driver.watchJob(jobId, params,
jobObserver);
if (watchJob != null) {
watchJob.get();
}
UnitTestBase.sleep(1000L);

CompletableFuture<Void> future2 = this.driver.waitJobAsync(jobId,
params,
jobObserver);
Assert.assertNull(future2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ public void testJobSucceed() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.INITIALIZING);
Expand Down Expand Up @@ -141,8 +141,8 @@ public void testJobInternalSucceed() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.INITIALIZING);
Expand Down Expand Up @@ -174,8 +174,8 @@ public void testJobFailed() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.INITIALIZING);
Expand Down Expand Up @@ -205,8 +205,8 @@ public void testUnSchedulable() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.FAILED);
Expand Down Expand Up @@ -236,8 +236,8 @@ public void testJobCancelled() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.INITIALIZING);
Expand Down Expand Up @@ -271,8 +271,8 @@ public void testTwiceCreate() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.RUNNING);
Expand Down Expand Up @@ -303,8 +303,8 @@ public void testPullImageError() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.FAILED);
Expand All @@ -327,8 +327,8 @@ public void testGetResourceListWithLabels() {

JobObserver jobObserver = Mockito.mock(JobObserver.class);

CompletableFuture<Void> future = this.driver.watchJob(jobId, params,
jobObserver);
CompletableFuture<Void> future = this.driver.waitJobAsync(jobId, params,
jobObserver);

DefaultJobState jobState = new DefaultJobState();
jobState.jobStatus(JobStatus.INITIALIZING);
Expand Down

0 comments on commit e6177e6

Please sign in to comment.