From e6177e6c3955f8cd682ae6ce7a896c9e02b9b7c9 Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 18 Nov 2021 11:22:00 +0800 Subject: [PATCH] improve comment --- .../computer/driver/ComputerDriver.java | 7 ++-- .../computer/k8s/driver/KubernetesDriver.java | 16 ++++++---- .../computer/k8s/KubernetesDriverTest.java | 23 +++++++------ .../hugegraph/computer/k8s/MiniKubeTest.java | 32 +++++++++---------- 4 files changed, 41 insertions(+), 37 deletions(-) diff --git a/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java b/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java index 2ff4c8435b..bb95366ba1 100644 --- a/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java +++ b/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java @@ -52,15 +52,16 @@ public interface ComputerDriver extends Closeable { void cancelJob(String jobId, Map params); /** - * Watch the job state, it will trace the execution of job and notify + * Wait the job to finish, it will trace the execution of job and notify * the observer at every superstep. Throws ComputerException if the * job is waiting by another thread. * @param params reserved for other parameters in addition to jobId used * to wait job. * @return future for watch the job */ - CompletableFuture watchJob(String jobId, Map params, - JobObserver observer); + CompletableFuture waitJobAsync(String jobId, + Map params, + JobObserver observer); /** * Get the current job state. Throws ComputerException if can't found the diff --git a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java index 058e3aed63..e3293ea176 100644 --- a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java +++ b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -286,9 +287,9 @@ public void cancelJob(String jobId, Map params) { } @Override - public CompletableFuture watchJob(String jobId, - Map params, - JobObserver observer) { + public CompletableFuture waitJobAsync(String jobId, + Map params, + JobObserver observer) { JobState jobState = this.jobState(jobId, params); if (jobState == null) { LOG.warn("Unable to fetch state of job '{}', it may have been " + @@ -419,10 +420,13 @@ public String log(String jobId, int containerId, long offset, long length, @Override public void close() { - for (String jobId : this.waits.keySet()) { - this.waits.get(jobId).getKey().cancel(true); + Iterator, JobObserver>> iterator = + this.waits.values().iterator(); + while (iterator.hasNext()) { + CompletableFuture future = iterator.next().getLeft(); + future.cancel(true); + iterator.remove(); } - this.waits.clear(); if (this.watch != null) { this.watch.close(); diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java index bf326fee8f..4d1545739d 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.mutable.MutableBoolean; @@ -211,17 +210,16 @@ public void testCancelJob() { } @Test - public void testWatchJobAndCancel() throws ExecutionException, - InterruptedException { + public void testWatchJobAndCancel() { Map params = new HashMap<>(); params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10"); String jobId = this.driver.submitJob("PageRank3", params); JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, - params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, + params, + jobObserver); Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1)) .onJobStateChanged(Mockito.any(DefaultJobState.class)); @@ -231,14 +229,15 @@ public void testWatchJobAndCancel() throws ExecutionException, MutableBoolean watchActive = Whitebox.getInternalState(this.driver, "watchActive"); watchActive.setFalse(); - this.driver.watchJob(jobId, params, jobObserver); + this.driver.waitJobAsync(jobId, params, jobObserver); this.driver.cancelJob(jobId, params); - CompletableFuture watchJob = this.driver.watchJob(jobId, params, - jobObserver); - if (watchJob != null) { - watchJob.get(); - } + UnitTestBase.sleep(1000L); + + CompletableFuture future2 = this.driver.waitJobAsync(jobId, + params, + jobObserver); + Assert.assertNull(future2); } @Test diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java index 895da51ebd..9361e8332c 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java @@ -110,8 +110,8 @@ public void testJobSucceed() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -141,8 +141,8 @@ public void testJobInternalSucceed() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -174,8 +174,8 @@ public void testJobFailed() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -205,8 +205,8 @@ public void testUnSchedulable() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.FAILED); @@ -236,8 +236,8 @@ public void testJobCancelled() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -271,8 +271,8 @@ public void testTwiceCreate() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.RUNNING); @@ -303,8 +303,8 @@ public void testPullImageError() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.FAILED); @@ -327,8 +327,8 @@ public void testGetResourceListWithLabels() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.watchJob(jobId, params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING);