diff --git a/computer-algorithm/pom.xml b/computer-algorithm/pom.xml index 9d65f3e95..4705d7038 100644 --- a/computer-algorithm/pom.xml +++ b/computer-algorithm/pom.xml @@ -5,7 +5,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/computer-api/pom.xml b/computer-api/pom.xml index d96b56da9..ae1c6ceec 100644 --- a/computer-api/pom.xml +++ b/computer-api/pom.xml @@ -5,7 +5,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/computer-core/pom.xml b/computer-core/pom.xml index 9901c0b47..7ef9c713e 100644 --- a/computer-core/pom.xml +++ b/computer-core/pom.xml @@ -5,7 +5,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/computer-dist/pom.xml b/computer-dist/pom.xml index 39e9e2a07..e38e65cd6 100644 --- a/computer-dist/pom.xml +++ b/computer-dist/pom.xml @@ -24,7 +24,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/computer-driver/pom.xml b/computer-driver/pom.xml index c1ca56278..4d2d1af0d 100644 --- a/computer-driver/pom.xml +++ b/computer-driver/pom.xml @@ -5,9 +5,10 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 + 0.1.1 computer-driver diff --git a/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java b/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java index 9c387e738..824ecd6ae 100644 --- a/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java +++ b/computer-driver/src/main/java/com/baidu/hugegraph/computer/driver/ComputerDriver.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; public interface ComputerDriver extends Closeable { @@ -47,8 +48,9 @@ public interface ComputerDriver extends Closeable { * the job. * @param params reserved for other parameters in addition to jobId used * to cancel job. + * @return Whether the job was successfully cancelled */ - void cancelJob(String jobId, Map params); + boolean cancelJob(String jobId, Map params); /** * Wait the job to finish, it will trace the execution of job and notify @@ -56,9 +58,11 @@ public interface ComputerDriver extends Closeable { * job is waiting by another thread. * @param params reserved for other parameters in addition to jobId used * to wait job. + * @return future for watch the job */ - void waitJob(String jobId, Map params, - JobObserver observer); + CompletableFuture waitJobAsync(String jobId, + Map params, + JobObserver observer); /** * Get the current job state. Throws ComputerException if can't found the diff --git a/computer-k8s-operator/pom.xml b/computer-k8s-operator/pom.xml index a7f6b3c42..86fbe05a0 100644 --- a/computer-k8s-operator/pom.xml +++ b/computer-k8s-operator/pom.xml @@ -5,7 +5,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/computer-k8s/pom.xml b/computer-k8s/pom.xml index 45f63a4a4..6522d10e5 100644 --- a/computer-k8s/pom.xml +++ b/computer-k8s/pom.xml @@ -5,7 +5,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java index 590eae918..e645fd5f5 100644 --- a/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java +++ b/computer-k8s/src/main/java/com/baidu/hugegraph/computer/k8s/driver/KubernetesDriver.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,7 +73,6 @@ import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; @@ -280,20 +280,19 @@ private void checkComputerConf(Map computerConf, } @Override - public void cancelJob(String jobId, Map params) { - Boolean delete = this.operation.withName(KubeUtil.crName(jobId)) - .delete(); - E.checkState(delete, "Failed to cancel Job, jobId: ", jobId); + public boolean cancelJob(String jobId, Map params) { + return this.operation.withName(KubeUtil.crName(jobId)).delete(); } @Override - public void waitJob(String jobId, Map params, - JobObserver observer) { + public CompletableFuture waitJobAsync(String jobId, + Map params, + JobObserver observer) { JobState jobState = this.jobState(jobId, params); if (jobState == null) { LOG.warn("Unable to fetch state of job '{}', it may have been " + "deleted", jobId); - return; + return null; } else { observer.onJobStateChanged(jobState); } @@ -309,14 +308,7 @@ public void waitJob(String jobId, Map params, } } - try { - if (future != null) { - future.get(); - } - } catch (Throwable e) { - this.cancelWait(jobId); - throw KubernetesClientException.launderThrowable(e); - } + return future; } private Watch initWatch() { @@ -426,6 +418,14 @@ public String log(String jobId, int containerId, long offset, long length, @Override public void close() { + Iterator, JobObserver>> iterator = + this.waits.values().iterator(); + while (iterator.hasNext()) { + CompletableFuture future = iterator.next().getLeft(); + future.cancel(true); + iterator.remove(); + } + if (this.watch != null) { this.watch.close(); this.watchActive.setFalse(); diff --git a/computer-test/pom.xml b/computer-test/pom.xml index 2e85488cb..3d68151ec 100644 --- a/computer-test/pom.xml +++ b/computer-test/pom.xml @@ -24,7 +24,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java index c770d93bb..4d1545739 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/KubernetesDriverTest.java @@ -210,16 +210,16 @@ public void testCancelJob() { } @Test - public void testWaitJobAndCancel() { + public void testWatchJobAndCancel() { Map params = new HashMap<>(); params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10"); String jobId = this.driver.submitJob("PageRank3", params); JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }); + CompletableFuture future = this.driver.waitJobAsync(jobId, + params, + jobObserver); Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1)) .onJobStateChanged(Mockito.any(DefaultJobState.class)); @@ -229,10 +229,15 @@ public void testWaitJobAndCancel() { MutableBoolean watchActive = Whitebox.getInternalState(this.driver, "watchActive"); watchActive.setFalse(); - this.driver.waitJob(jobId, params, jobObserver); + this.driver.waitJobAsync(jobId, params, jobObserver); this.driver.cancelJob(jobId, params); - this.driver.waitJob(jobId, params, jobObserver); + UnitTestBase.sleep(1000L); + + CompletableFuture future2 = this.driver.waitJobAsync(jobId, + params, + jobObserver); + Assert.assertNull(future2); } @Test diff --git a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java index eb04f9775..9361e8332 100644 --- a/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java +++ b/computer-test/src/main/java/com/baidu/hugegraph/computer/k8s/MiniKubeTest.java @@ -28,16 +28,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; @@ -56,7 +53,6 @@ import com.baidu.hugegraph.computer.suite.unit.UnitTestBase; import com.baidu.hugegraph.testutil.Assert; import com.baidu.hugegraph.testutil.Whitebox; -import com.baidu.hugegraph.util.ExecutorUtil; import com.baidu.hugegraph.util.Log; import com.google.common.collect.Lists; @@ -68,17 +64,6 @@ public class MiniKubeTest extends AbstractK8sTest { private static final Logger LOG = Log.logger(MiniKubeTest.class); private static final String ALGORITHM_NAME = "PageRank"; - private static ExecutorService POOL; - - @BeforeClass - public static void init() { - POOL = ExecutorUtil.newFixedThreadPool(1, "minikube-test-pool"); - } - - @AfterClass - public static void clear() { - POOL.shutdown(); - } @Before public void setup() throws IOException { @@ -125,9 +110,8 @@ public void testJobSucceed() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -139,7 +123,7 @@ public void testJobSucceed() { Mockito.verify(jobObserver, Mockito.timeout(15000L).atLeast(1)) .onJobStateChanged(Mockito.eq(jobState2)); - future.getNow(null); + future.cancel(true); } @Test @@ -157,9 +141,8 @@ public void testJobInternalSucceed() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -171,7 +154,7 @@ public void testJobInternalSucceed() { Mockito.verify(jobObserver, Mockito.timeout(15000L).atLeast(1)) .onJobStateChanged(Mockito.eq(jobState2)); - future.getNow(null); + future.cancel(true); } @Test @@ -191,9 +174,8 @@ public void testJobFailed() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -210,7 +192,7 @@ public void testJobFailed() { String diagnostics = this.driver.diagnostics(jobId, params); Assert.assertContains("No such file or directory", diagnostics); - future.getNow(null); + future.cancel(true); } @Test @@ -223,9 +205,8 @@ public void testUnSchedulable() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.FAILED); @@ -235,7 +216,7 @@ public void testUnSchedulable() { String diagnostics = this.driver.diagnostics(jobId, params); Assert.assertContains("Unschedulable", diagnostics); - future.getNow(null); + future.cancel(true); } @Test @@ -255,9 +236,8 @@ public void testJobCancelled() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -271,7 +251,7 @@ public void testJobCancelled() { Mockito.verify(jobObserver, Mockito.timeout(15000L).atLeast(1)) .onJobStateChanged(Mockito.eq(jobState2)); - future.getNow(null); + future.cancel(true); } @Test @@ -291,9 +271,8 @@ public void testTwiceCreate() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.RUNNING); @@ -312,7 +291,7 @@ public void testTwiceCreate() { UnitTestBase.sleep(1000L); - future.getNow(null); + future.cancel(true); } @Test @@ -324,9 +303,8 @@ public void testPullImageError() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.FAILED); @@ -336,7 +314,7 @@ public void testPullImageError() { String diagnostics = this.driver.diagnostics(jobId, params); Assert.assertContains("ImagePullBackOff", diagnostics); - future.getNow(null); + future.cancel(true); } @Test @@ -349,9 +327,8 @@ public void testGetResourceListWithLabels() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = CompletableFuture.runAsync(() -> { - this.driver.waitJob(jobId, params, jobObserver); - }, POOL); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, + jobObserver); DefaultJobState jobState = new DefaultJobState(); jobState.jobStatus(JobStatus.INITIALIZING); @@ -371,6 +348,6 @@ public void testGetResourceListWithLabels() { new HashMap()); Assert.assertNotEquals(0, pods.size()); - future.getNow(null); + future.cancel(true); } } diff --git a/computer-yarn/pom.xml b/computer-yarn/pom.xml index 874bc2563..974d57b02 100644 --- a/computer-yarn/pom.xml +++ b/computer-yarn/pom.xml @@ -5,7 +5,7 @@ hugegraph-computer com.baidu.hugegraph - 0.1.0 + 0.1.1 4.0.0 diff --git a/pom.xml b/pom.xml index 16afe8e02..7681dbc72 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.baidu.hugegraph hugegraph-computer - 0.1.0 + 0.1.1 pom 3.3.9