Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/src/main/java/org/apache/livy/JobHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

package org.apache.livy;

import java.util.List;
import java.util.concurrent.Future;

/**
* A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
*/
public interface JobHandle<T> extends Future<T> {

/**
* @return Return the current job id
*/
String getJobId();

/**
* Return the current state of the job.
*
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/livy/LivyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
*/
public interface LivyClient {

/**
* @return The current session id
*/
int getSessionId();

/**
* Submits a job for asynchronous execution.
*
Expand Down
5 changes: 5 additions & 0 deletions api/src/test/java/org/apache/livy/TestClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ private Client(Properties config) {
this.config = config;
}

@Override
public int getSessionId() {
throw new UnsupportedOperationException();
}

@Override
public <T> JobHandle<T> submit(Job<T> job) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public boolean cancel(boolean b) {
throw new UnsupportedOperationException();
}

@Override
public String getJobId() {
throw new UnsupportedOperationException();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public Thread newThread(Runnable r) {
this.serializer = new Serializer();
}

@Override
public int getSessionId() {
return sessionId;
}

@Override
public <T> JobHandle<T> submit(Job<T> job) {
return sendJob("submit-job", job);
Expand Down Expand Up @@ -183,9 +188,4 @@ private RuntimeException propagate(Exception cause) {
}
}

// For testing.
int getSessionId() {
return sessionId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ class JobHandleImpl<T> extends AbstractJobHandle<T> {
this.jobId = -1;
}

@Override
public String getJobId() {
return String.valueOf(jobId);
}

@Override
public T get() throws ExecutionException, InterruptedException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
}

withClient("should run and monitor asynchronous jobs") {
testJob(false)
testJob()
}

withClient("should propagate errors from jobs") {
val errorMessage = "This job throws an error."
val (jobId, handle) = runJob(false, { id => Seq(
val (jobId, handle) = runJob({ id => Seq(
new JobStatus(id, JobHandle.State.FAILED, null, errorMessage))
})

Expand All @@ -113,7 +113,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
}

withClient("should run and monitor synchronous jobs") {
testJob(false)
testJob()
}

withClient("should add files and jars") {
Expand All @@ -133,7 +133,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
}

withClient("should cancel jobs") {
val (jobId, handle) = runJob(false, { id => Seq(
val (jobId, handle) = runJob({ id => Seq(
new JobStatus(id, JobHandle.State.STARTED, null, null),
new JobStatus(id, JobHandle.State.CANCELLED, null, null))
})
Expand All @@ -147,7 +147,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
}

withClient("should notify listeners of job completion") {
val (jobId, handle) = runJob(false, { id => Seq(
val (jobId, handle) = runJob({ id => Seq(
new JobStatus(id, JobHandle.State.STARTED, null, null),
new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null))
})
Expand All @@ -163,7 +163,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
// JobHandleImpl does exponential backoff checking the result of a job. Given an initial
// wait of 100ms, 4 iterations should result in a wait of 800ms, so the handle should at that
// point timeout a wait of 100ms.
val (jobId, handle) = runJob(false, { id => Seq(
val (jobId, handle) = runJob({ id => Seq(
new JobStatus(id, JobHandle.State.STARTED, null, null),
new JobStatus(id, JobHandle.State.STARTED, null, null),
new JobStatus(id, JobHandle.State.STARTED, null, null),
Expand All @@ -178,7 +178,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
}

withClient("should handle null responses") {
testJob(false, response = Some(null))
testJob(response = Some(null))
}

withClient("should connect to existing sessions") {
Expand Down Expand Up @@ -215,7 +215,7 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
assert(expectedStr === new String(b))
}

private def runJob(sync: Boolean, genStatusFn: Long => Seq[JobStatus]): (Long, JFuture[Int]) = {
private def runJob(genStatusFn: Long => Seq[JobStatus]): (Long, JobHandle[Int]) = {
val jobId = java.lang.Long.valueOf(ID_GENERATOR.incrementAndGet())
when(session.submitJob(any(classOf[Array[Byte]]), anyString())).thenReturn(jobId)

Expand All @@ -225,16 +225,17 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
when(session.jobStatus(meq(jobId))).thenReturn(first, remaining: _*)

val job = new Echo(42)
val handle = if (sync) client.run(job) else client.submit(job)
val handle = client.submit(job)
(jobId, handle)
}

private def testJob(sync: Boolean, response: Option[Any] = None): Unit = {
val (jobId, handle) = runJob(sync, { id => Seq(
private def testJob(response: Option[Any] = None): Unit = {
val (jobId, handle) = runJob({ id => Seq(
new JobStatus(id, JobHandle.State.STARTED, null, null),
new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(response.getOrElse(id)), null))
})
assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === response.getOrElse(jobId))
assertResult(handle.getJobId)(jobId.toString)
verify(session, times(2)).jobStatus(meq(jobId))
}

Expand Down
3 changes: 3 additions & 0 deletions python-api/src/main/python/livy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def __init__(self, url, load_defaults=True, conf_dict=None):
self._stopped = False
self.lock = threading.Lock()

def session_id(self):
return self._session_id

def submit(self, job):
"""
Submits a job for execution to the spark cluster.
Expand Down
3 changes: 3 additions & 0 deletions python-api/src/main/python/livy/job_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def __init__(self, conn, session_id, executor):
self._running_callbacks = []
self._queued_callbacks = []

def job_id(self):
return self._job_id

def _start(self, command, serialized_job):
self._executor.submit(self._send_job_task, command, serialized_job)

Expand Down
5 changes: 5 additions & 0 deletions python-api/src/test/python/livy-tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def mock_and_validate_create_new_session(defaults):
load_defaults=defaults)
assert client_test._config.get(client_test._CONFIG_SECTION,
'spark.app.name') == app_name
assert client_test.session_id() == session_id
if defaults:
assert client_test._config.has_option(client_test._CONFIG_SECTION,
'spark.config')
Expand Down Expand Up @@ -153,6 +154,7 @@ def handle_job_running_callback(f):
submit_job_future.add_running_callback(handle_job_running_callback)
lock.wait(15)
assert invoked_running_callback
assert submit_job_future.job_id() == job_id


@responses.activate
Expand All @@ -168,6 +170,7 @@ def handle_job_queued_callback(f):
submit_job_future.add_queued_callback(handle_job_queued_callback)
lock.wait(15)
assert invoked_queued_callback
assert submit_job_future.job_id() == job_id


@responses.activate
Expand All @@ -177,6 +180,7 @@ def test_submit_job_verify_succeeded_state():
result='Z0FKVkZGc3hNREFzSURJd01Dd2dNekF3TENBME1EQmRjUUF1')
result = submit_job_future.result(15)
assert result == '[100, 200, 300, 400]'
assert submit_job_future.job_id() == job_id


@responses.activate
Expand All @@ -185,6 +189,7 @@ def test_submit_job_verify_failed_state():
error='Error job')
exception = submit_job_future.exception(15)
assert isinstance(exception, Exception)
assert submit_job_future.job_id() == job_id


@responses.activate
Expand Down
5 changes: 5 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/JobHandleImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class JobHandleImpl<T> extends AbstractJobHandle<T> {
this.promise = promise;
}

@Override
public String getJobId() {
return jobId;
}

/** Requests a running job to be cancelled. */
@Override
public boolean cancel(boolean mayInterrupt) {
Expand Down
5 changes: 5 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ public Future<URI> getServerUri() {
return serverUriPromise;
}

@Override
public int getSessionId() {
return -1;
}

@Override
public <T> JobHandle<T> submit(Job<T> job) {
return protocol.submit(job);
Expand Down
10 changes: 7 additions & 3 deletions rsc/src/test/java/org/apache/livy/rsc/TestJobHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@
@RunWith(MockitoJUnitRunner.class)
public class TestJobHandle {

private static final String jobId = "jobId";

@Mock private RSCClient client;
@Mock private Promise<Object> promise;
@Mock private JobHandle.Listener<Object> listener;
@Mock private JobHandle.Listener<Object> listener2;

@Test
public void testStateChanges() throws Exception {
JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, jobId);
handle.addListener(listener);

assertTrue(handle.changeState(JobHandle.State.QUEUED));
Expand All @@ -57,26 +59,28 @@ public void testStateChanges() throws Exception {

@Test
public void testFailedJob() throws Exception {
JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, jobId);
handle.addListener(listener);

Throwable cause = new Exception();
when(promise.cause()).thenReturn(cause);

assertTrue(handle.changeState(JobHandle.State.FAILED));
assertEquals(handle.getJobId(), jobId);
verify(promise).cause();
verify(listener).onJobFailed(handle, cause);
}

@Test
public void testSucceededJob() throws Exception {
JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, "job");
JobHandleImpl<Object> handle = new JobHandleImpl<Object>(client, promise, jobId);
handle.addListener(listener);

Object result = new Exception();
when(promise.getNow()).thenReturn(result);

assertTrue(handle.changeState(JobHandle.State.SUCCEEDED));
assertEquals(handle.getJobId(), jobId);
verify(promise).getNow();
verify(listener).onJobSucceeded(handle, result);
}
Expand Down
4 changes: 4 additions & 0 deletions rsc/src/test/java/org/apache/livy/rsc/TestSparkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void call(LivyClient client) throws Exception {
JobHandle<String> handle = client.submit(new Echo<>("hello"));
handle.addListener(listener);
assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
assertNotNull(handle.getJobId());

// Try an invalid state transition on the handle. This ensures that the actual state
// change we're interested in actually happened, since internally the handle serializes
Expand All @@ -111,6 +112,7 @@ public void testSimpleSparkJob() throws Exception {
public void call(LivyClient client) throws Exception {
JobHandle<Long> handle = client.submit(new SmallCount(5));
assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
assertNotNull(handle.getJobId());
}
});
}
Expand Down Expand Up @@ -185,6 +187,7 @@ public void call(LivyClient client) throws Exception {
assertTrue(ee.getCause().getMessage().contains(
Failure.JobFailureException.class.getName()));
}
assertNotNull(handle.getJobId());

// Try an invalid state transition on the handle. This ensures that the actual state
// change we're interested in actually happened, since internally the handle serializes
Expand Down Expand Up @@ -215,6 +218,7 @@ public void testRemoteClient() throws Exception {
public void call(LivyClient client) throws Exception {
JobHandle<Long> handle = client.submit(new SmallCount(5));
assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
assertNotNull(handle.getJobId());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class LivyScalaClient(livyJavaClient: LivyClient) {
}
})

/**
* Returns the current id of the session.
*/
def sessionId(): Int = {
livyJavaClient.getSessionId
}

/**
* Submits a job for asynchronous execution.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ class ScalaJobHandle[T] private[livy] (jobHandle: JobHandle[T]) extends Future[T
/**
* Return the current state of the job.
*/
def state: State = jobHandle.getState()
def state: State = jobHandle.getState

/**
* Return the current id of the job.
*/
def jobId: String = jobHandle.getJobId

/**
* When the job is completed, either through an exception, or a value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class ScalaClientTest extends FunSuite
}
}

test("return current session id") {
configureClient(true)
assertResult(-1)(client.sessionId())
}

test("test Job Submission") {
configureClient(true)
val jobHandle = client.submit(ScalaClientTestUtils.helloJob)
Expand Down
Loading