diff --git a/api/src/main/java/org/apache/livy/JobHandle.java b/api/src/main/java/org/apache/livy/JobHandle.java index 75f7a59f0..ed683f34a 100644 --- a/api/src/main/java/org/apache/livy/JobHandle.java +++ b/api/src/main/java/org/apache/livy/JobHandle.java @@ -17,7 +17,6 @@ package org.apache.livy; -import java.util.List; import java.util.concurrent.Future; /** @@ -25,6 +24,11 @@ */ public interface JobHandle extends Future { + /** + * @return Return the current job id + */ + String getJobId(); + /** * Return the current state of the job. * diff --git a/api/src/main/java/org/apache/livy/LivyClient.java b/api/src/main/java/org/apache/livy/LivyClient.java index fc03a1f7a..f60dc654a 100644 --- a/api/src/main/java/org/apache/livy/LivyClient.java +++ b/api/src/main/java/org/apache/livy/LivyClient.java @@ -26,6 +26,11 @@ */ public interface LivyClient { + /** + * @return The current session id + */ + int getSessionId(); + /** * Submits a job for asynchronous execution. * diff --git a/api/src/test/java/org/apache/livy/TestClientFactory.java b/api/src/test/java/org/apache/livy/TestClientFactory.java index 89edeec5e..7c02cffff 100644 --- a/api/src/test/java/org/apache/livy/TestClientFactory.java +++ b/api/src/test/java/org/apache/livy/TestClientFactory.java @@ -46,6 +46,11 @@ private Client(Properties config) { this.config = config; } + @Override + public int getSessionId() { + throw new UnsupportedOperationException(); + } + @Override public JobHandle submit(Job job) { throw new UnsupportedOperationException(); diff --git a/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java b/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java index 6703147a6..adb68c0bd 100644 --- a/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java +++ b/client-common/src/test/java/org/apache/livy/client/common/TestAbstractJobHandle.java @@ -87,6 +87,10 @@ public boolean cancel(boolean b) { throw new UnsupportedOperationException(); } + @Override + public String getJobId() { + throw new UnsupportedOperationException(); + } } } diff --git a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java index f40148f94..c6f9f2a0d 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java +++ b/client-http/src/main/java/org/apache/livy/client/http/HttpClient.java @@ -96,6 +96,11 @@ public Thread newThread(Runnable r) { this.serializer = new Serializer(); } + @Override + public int getSessionId() { + return sessionId; + } + @Override public JobHandle submit(Job job) { return sendJob("submit-job", job); @@ -183,9 +188,4 @@ private RuntimeException propagate(Exception cause) { } } - // For testing. - int getSessionId() { - return sessionId; - } - } diff --git a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java index d39dfe994..d740fa3e8 100644 --- a/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java +++ b/client-http/src/main/java/org/apache/livy/client/http/JobHandleImpl.java @@ -82,6 +82,11 @@ class JobHandleImpl extends AbstractJobHandle { this.jobId = -1; } + @Override + public String getJobId() { + return String.valueOf(jobId); + } + @Override public T get() throws ExecutionException, InterruptedException { try { diff --git a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala index f53d9f5b4..99348c5e9 100644 --- a/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala @@ -95,12 +95,12 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } withClient("should run and monitor asynchronous jobs") { - testJob(false) + testJob() } withClient("should propagate errors from jobs") { val errorMessage = "This job throws an error." - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob({ id => Seq( new JobStatus(id, JobHandle.State.FAILED, null, errorMessage)) }) @@ -113,7 +113,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } withClient("should run and monitor synchronous jobs") { - testJob(false) + testJob() } withClient("should add files and jars") { @@ -133,7 +133,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } withClient("should cancel jobs") { - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob({ id => Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.CANCELLED, null, null)) }) @@ -147,7 +147,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } withClient("should notify listeners of job completion") { - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob({ id => Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null)) }) @@ -163,7 +163,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni // JobHandleImpl does exponential backoff checking the result of a job. Given an initial // wait of 100ms, 4 iterations should result in a wait of 800ms, so the handle should at that // point timeout a wait of 100ms. - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob({ id => Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.STARTED, null, null), @@ -178,7 +178,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } withClient("should handle null responses") { - testJob(false, response = Some(null)) + testJob(response = Some(null)) } withClient("should connect to existing sessions") { @@ -215,7 +215,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni assert(expectedStr === new String(b)) } - private def runJob(sync: Boolean, genStatusFn: Long => Seq[JobStatus]): (Long, JFuture[Int]) = { + private def runJob(genStatusFn: Long => Seq[JobStatus]): (Long, JobHandle[Int]) = { val jobId = java.lang.Long.valueOf(ID_GENERATOR.incrementAndGet()) when(session.submitJob(any(classOf[Array[Byte]]), anyString())).thenReturn(jobId) @@ -225,16 +225,17 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni when(session.jobStatus(meq(jobId))).thenReturn(first, remaining: _*) val job = new Echo(42) - val handle = if (sync) client.run(job) else client.submit(job) + val handle = client.submit(job) (jobId, handle) } - private def testJob(sync: Boolean, response: Option[Any] = None): Unit = { - val (jobId, handle) = runJob(sync, { id => Seq( + private def testJob(response: Option[Any] = None): Unit = { + val (jobId, handle) = runJob({ id => Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(response.getOrElse(id)), null)) }) assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === response.getOrElse(jobId)) + assertResult(handle.getJobId)(jobId.toString) verify(session, times(2)).jobStatus(meq(jobId)) } diff --git a/python-api/src/main/python/livy/client.py b/python-api/src/main/python/livy/client.py index d9830f321..6889d49b7 100644 --- a/python-api/src/main/python/livy/client.py +++ b/python-api/src/main/python/livy/client.py @@ -90,6 +90,9 @@ def __init__(self, url, load_defaults=True, conf_dict=None): self._stopped = False self.lock = threading.Lock() + def session_id(self): + return self._session_id + def submit(self, job): """ Submits a job for execution to the spark cluster. diff --git a/python-api/src/main/python/livy/job_handle.py b/python-api/src/main/python/livy/job_handle.py index 278834c23..0c79104f5 100644 --- a/python-api/src/main/python/livy/job_handle.py +++ b/python-api/src/main/python/livy/job_handle.py @@ -53,6 +53,9 @@ def __init__(self, conn, session_id, executor): self._running_callbacks = [] self._queued_callbacks = [] + def job_id(self): + return self._job_id + def _start(self, command, serialized_job): self._executor.submit(self._send_job_task, command, serialized_job) diff --git a/python-api/src/test/python/livy-tests/client_test.py b/python-api/src/test/python/livy-tests/client_test.py index b6426ae10..c22cd14b6 100644 --- a/python-api/src/test/python/livy-tests/client_test.py +++ b/python-api/src/test/python/livy-tests/client_test.py @@ -49,6 +49,7 @@ def mock_and_validate_create_new_session(defaults): load_defaults=defaults) assert client_test._config.get(client_test._CONFIG_SECTION, 'spark.app.name') == app_name + assert client_test.session_id() == session_id if defaults: assert client_test._config.has_option(client_test._CONFIG_SECTION, 'spark.config') @@ -153,6 +154,7 @@ def handle_job_running_callback(f): submit_job_future.add_running_callback(handle_job_running_callback) lock.wait(15) assert invoked_running_callback + assert submit_job_future.job_id() == job_id @responses.activate @@ -168,6 +170,7 @@ def handle_job_queued_callback(f): submit_job_future.add_queued_callback(handle_job_queued_callback) lock.wait(15) assert invoked_queued_callback + assert submit_job_future.job_id() == job_id @responses.activate @@ -177,6 +180,7 @@ def test_submit_job_verify_succeeded_state(): result='Z0FKVkZGc3hNREFzSURJd01Dd2dNekF3TENBME1EQmRjUUF1') result = submit_job_future.result(15) assert result == '[100, 200, 300, 400]' + assert submit_job_future.job_id() == job_id @responses.activate @@ -185,6 +189,7 @@ def test_submit_job_verify_failed_state(): error='Error job') exception = submit_job_future.exception(15) assert isinstance(exception, Exception) + assert submit_job_future.job_id() == job_id @responses.activate diff --git a/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java b/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java index 0fc4ba214..9f7a2f9f7 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java +++ b/rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java @@ -44,6 +44,11 @@ class JobHandleImpl extends AbstractJobHandle { this.promise = promise; } + @Override + public String getJobId() { + return jobId; + } + /** Requests a running job to be cancelled. */ @Override public boolean cancel(boolean mayInterrupt) { diff --git a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java index c1c953400..70b5ba9b5 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java +++ b/rsc/src/main/java/org/apache/livy/rsc/RSCClient.java @@ -199,6 +199,11 @@ public Future getServerUri() { return serverUriPromise; } + @Override + public int getSessionId() { + return -1; + } + @Override public JobHandle submit(Job job) { return protocol.submit(job); diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java index e6161ed81..38a2beb5d 100644 --- a/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java +++ b/rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java @@ -31,6 +31,8 @@ @RunWith(MockitoJUnitRunner.class) public class TestJobHandle { + private static final String jobId = "jobId"; + @Mock private RSCClient client; @Mock private Promise promise; @Mock private JobHandle.Listener listener; @@ -38,7 +40,7 @@ public class TestJobHandle { @Test public void testStateChanges() throws Exception { - JobHandleImpl handle = new JobHandleImpl(client, promise, "job"); + JobHandleImpl handle = new JobHandleImpl(client, promise, jobId); handle.addListener(listener); assertTrue(handle.changeState(JobHandle.State.QUEUED)); @@ -57,26 +59,28 @@ public void testStateChanges() throws Exception { @Test public void testFailedJob() throws Exception { - JobHandleImpl handle = new JobHandleImpl(client, promise, "job"); + JobHandleImpl handle = new JobHandleImpl(client, promise, jobId); handle.addListener(listener); Throwable cause = new Exception(); when(promise.cause()).thenReturn(cause); assertTrue(handle.changeState(JobHandle.State.FAILED)); + assertEquals(handle.getJobId(), jobId); verify(promise).cause(); verify(listener).onJobFailed(handle, cause); } @Test public void testSucceededJob() throws Exception { - JobHandleImpl handle = new JobHandleImpl(client, promise, "job"); + JobHandleImpl handle = new JobHandleImpl(client, promise, jobId); handle.addListener(listener); Object result = new Exception(); when(promise.getNow()).thenReturn(result); assertTrue(handle.changeState(JobHandle.State.SUCCEEDED)); + assertEquals(handle.getJobId(), jobId); verify(promise).getNow(); verify(listener).onJobSucceeded(handle, result); } diff --git a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java index 9fc50eceb..914f636ec 100644 --- a/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java +++ b/rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java @@ -92,6 +92,7 @@ public void call(LivyClient client) throws Exception { JobHandle handle = client.submit(new Echo<>("hello")); handle.addListener(listener); assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS)); + assertNotNull(handle.getJobId()); // Try an invalid state transition on the handle. This ensures that the actual state // change we're interested in actually happened, since internally the handle serializes @@ -111,6 +112,7 @@ public void testSimpleSparkJob() throws Exception { public void call(LivyClient client) throws Exception { JobHandle handle = client.submit(new SmallCount(5)); assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS)); + assertNotNull(handle.getJobId()); } }); } @@ -185,6 +187,7 @@ public void call(LivyClient client) throws Exception { assertTrue(ee.getCause().getMessage().contains( Failure.JobFailureException.class.getName())); } + assertNotNull(handle.getJobId()); // Try an invalid state transition on the handle. This ensures that the actual state // change we're interested in actually happened, since internally the handle serializes @@ -215,6 +218,7 @@ public void testRemoteClient() throws Exception { public void call(LivyClient client) throws Exception { JobHandle handle = client.submit(new SmallCount(5)); assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS)); + assertNotNull(handle.getJobId()); } }); } diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala index ca7199a99..6d7767d5a 100644 --- a/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/LivyScalaClient.scala @@ -41,6 +41,13 @@ class LivyScalaClient(livyJavaClient: LivyClient) { } }) + /** + * Returns the current id of the session. + */ + def sessionId(): Int = { + livyJavaClient.getSessionId + } + /** * Submits a job for asynchronous execution. * diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala index d1cf29de0..da07ba3ad 100644 --- a/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/ScalaJobHandle.scala @@ -50,7 +50,12 @@ class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T /** * Return the current state of the job. */ - def state: State = jobHandle.getState() + def state: State = jobHandle.getState + + /** + * Return the current id of the job. + */ + def jobId: String = jobHandle.getJobId /** * When the job is completed, either through an exception, or a value, diff --git a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala index a716f5856..34c5860da 100644 --- a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala +++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaClientTest.scala @@ -58,6 +58,11 @@ class ScalaClientTest extends FunSuite } } + test("return current session id") { + configureClient(true) + assertResult(-1)(client.sessionId()) + } + test("test Job Submission") { configureClient(true) val jobHandle = client.submit(ScalaClientTestUtils.helloJob) diff --git a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala index 3f5bdc64e..7c9798558 100644 --- a/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala +++ b/scala-api/src/test/scala/org/apache/livy/scalaapi/ScalaJobHandleTest.scala @@ -48,6 +48,12 @@ class ScalaJobHandleTest extends FunSuite scalaJobHandle = new ScalaJobHandle(mockJobHandle) } + test("return current job id") { + val jobId = "3"; + when(mockJobHandle.getJobId).thenReturn(jobId) + assertResult(jobId)(scalaJobHandle.jobId) + } + test("get result when job is already complete") { when(mockJobHandle.get(timeoutInMilliseconds, TimeUnit.MILLISECONDS)).thenReturn("hello") val result = Await.result(scalaJobHandle, 5 seconds) @@ -187,4 +193,6 @@ private abstract class AbstractJobHandleStub[T] private[livy] extends JobHandle[ override def cancel(mayInterruptIfRunning: Boolean): Boolean = false override def isDone: Boolean = true + + override def getJobId: String = null }