[REEF-2025] A new module containing the new Java bridge #1466
@motus ping |
There are quite a few tests failing currently on my PC and on CI servers:
Failed tests:
FailBridgeDriverTest.testDriverCompleted:139 expected:<COMPLETED> but was:<FAILED(org.apache.reef.tests.library.exceptions.DriverSideFailure: Event out of sequence: org.apache.reef.runtime.common.driver.task.TaskMessageImpl)>
FailBridgeDriverTest.testFailDriverActiveContext:95->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverAlarm:125->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverAllocatedEvaluator:90->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverCompletedEvaluator:120->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverCompletedTask:115->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverConstructor:80->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverRunningTask:100->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverStart:85->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverStop:130->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeDriverTest.testFailDriverSuspendedTask:110->failOn:69 expected:<FAILED> but was:<FORCE_CLOSED>
FailBridgeDriverTest.testFailDriverTaskMessage:105->failOn:69 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeTaskTest.testFailTask:63->failOn:54 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeTaskTest.testFailTaskCall:68->failOn:54 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeTaskTest.testFailTaskClose:93->failOn:54 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeTaskTest.testFailTaskMsg:73->failOn:54 expected:<FAILED> but was:<FORCE_CLOSED>
FailBridgeTaskTest.testFailTaskStart:83->failOn:54 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailBridgeTaskTest.testFailTaskStop:88->failOn:54 expected:<FAILED> but was:<FORCE_CLOSED>
FailBridgeTaskTest.testFailTaskSuspend:78->failOn:54 Unexpected error: java.lang.IllegalStateException: Unable to start driver client
FailDriverTest.testDriverCompleted:131 expected:<COMPLETED> but was:<FORCE_CLOSED>
FailDriverTest.testFailDriverCompletedEvaluator:112->failOn:64 Unexpected error: org.apache.reef.tests.library.exceptions.DriverSideFailure: Event out of sequence: org.apache.reef.runtime.common.driver.task.TaskMessageImpl
FailDriverTest.testFailDriverConstructor:72->failOn:64 expected:<FAILED> but was:<FORCE_CLOSED>
FailDriverTest.testFailDriverSuspendedTask:102->failOn:64 expected:<FAILED> but was:<FORCE_CLOSED>
FailTaskTest.testFailTaskMsg:71->failOn:52 expected:<FAILED> but was:<FORCE_CLOSED>
FailTaskTest.testFailTaskStop:86->failOn:52 expected:<FAILED> but was:<FORCE_CLOSED>
I will keep reviewing the code, but please feel free to push the fixes to this PR.
There is a file being generated during the build: |
This Jira introduces a new Java bridge for Drivers implemented in alternative languages. It provides the following artifacts (note: client driver refers to the application driver implemented in an alternative programming language):
1. A generic framework for passing information between the Java driver and the client driver.
2. A gRPC-based implementation of the bridge that passes information via protocol buffers over gRPC.
3. Protocol buffer definitions for all information that flows between the Java driver and the client driver.
4. A Java implementation of the driver client that can be used for developing unit tests and can serve as a template for implementing a driver client (say, in C#).
5. Test cases for fail-based unit tests that cover the Java bridge and client.
Pull Request: Closes apache#1466
Left a few nit picks, will continue tomorrow
ContextInfo context = 5; | ||
|
||
// Possible exception encountered in task execution. | ||
ExceptionInfo exception = 10; |
We already have ExceptionInfo as part of the ContextInfo - do we need them both? Is there a way to avoid such duplication?
I believe both are warranted due to the context of the exception being specific to a context or a task. More specifically, when I create a FailedTask, I'd rather not initialize its exception from the exception in the ContextInfo.
|
||
message FailureInfo { | ||
string message = 1; | ||
repeated string failedContexts = 2; |
We need to be consistent in naming - i.e. stick to either camel case or underscores/snake_case for all IDs. I guess underscores are standard for gRPC?
Also, this particular field should probably be failed_contexts_ids
agreed.
|
||
bool relax_locality = 7; | ||
|
||
string runtime_name = 8; |
DriverClientConfiguration.runtime is a oneof field and has a predefined set of runtimes; that is, we already have a hardcoded set of runtimes at the client. Should we use a fixed enum for runtimes here as well?
makes sense.
As stated, this makes sense to me. However, it complicates mapping the protocol buffer enum to the appropriate string in Java and C#. Ideally, this change would also include changing the internal Java and C# types for runtime name to enums as well, so that the translation can be coded more robustly, i.e., a compiler error is generated if the enum name changes on either end, or a compiler warning if a new runtime name is introduced.
Proposal: How about we create a Jira that changes the runtime name in all scenarios: protocol buffers, C# and Java types? I suggest this because changing the runtime name type from strings to enums in Java and C# is not relevant to this PR, which is dealing with the current codebase and all its design flaws :)
@markusweimer who might want to weigh in on this.
int32 cores = 2; | ||
|
||
// name of the runtime | ||
string runtime_name = 3; |
We already have a predefined set of runtimes in ClientProtocol.proto - should we restrict our values here, too, e.g. using the enum?
makes sense.
see above reply to this.
message FailureInfo { | ||
string message = 1; | ||
repeated string failedContexts = 2; | ||
string failedTaskId = 3; |
naming conventions - maybe use failed_task_id instead?
yup, I'll go back and clean these up.
} | ||
|
||
message ClientMessageInfo { | ||
bytes payload = 1; |
We have raw payload fields here and in the TaskMessageInfo. Should we wrap it into some more abstract message - say, TaskMessage (with a payload field inside), and use it in both TaskMessageInfo and ClientMessageInfo? (or use TaskMessage instead of ClientMessageInfo?)
I'm not sure TaskMessage makes sense in the context of a ClientMessage. I'm not sure I understand the problem with using a raw bytes payload.
message RunningTaskRequest { | ||
string task_id = 1; | ||
|
||
bytes message = 2; |
Same as above - use TaskMessage instead of raw bytes message?
Not clear on why using raw bytes is a problem...
try { | ||
content = new String(Files.readAllBytes(Paths.get(args[0]))); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); |
instead of catch and throw here, just add IOException to the throws clause in main()
makes sense.
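For illustration, a minimal sketch of what the suggestion could look like. The class name ReadConfigSketch and the helper readContent() are hypothetical and not part of the PR; the sketch only shows the checked exception propagating through the throws clause instead of being wrapped in a RuntimeException.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical stand-in for the launcher's main(); not the PR's actual class.
final class ReadConfigSketch {

  /** Reads the whole file as UTF-8; I/O failures propagate to the caller unchanged. */
  static String readContent(final String path) throws IOException {
    return new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
  }

  // No try/catch: IOException is declared in the throws clause instead.
  public static void main(final String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("Usage: ReadConfigSketch <file>");
      return;
    }
    System.out.println(readContent(args[0]));
  }
}
```

With this shape, a missing or unreadable file surfaces as the original IOException with its own stack trace.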
final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto = | ||
driverClientConfigurationProtoBuilder.build(); | ||
final LauncherStatus status = launch(driverClientConfigurationProto); | ||
LOG.log(Level.INFO, "Status: " + status.toString()); |
no need to call .toString() explicitly; also, use string interpolation for logging, i.e.
LOG.log(Level.INFO, "Status: {0}", status);
indeed.
try { | ||
this.driverClientService.start(); | ||
this.driverClientService.awaitTermination(); | ||
} catch (IOException | InterruptedException e) { |
final
Few more nit picks. Will continue tomorrow!
final File driverClientConfigurationFile = File.createTempFile("driverclient", ".conf"); | ||
try { | ||
// Write driver client configuration to a file | ||
final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration); |
No need to call Tang.Factory.getTang() again - we already have a global TANG object
} | ||
|
||
private static IDriverLauncher getLocalDriverServiceLauncher() throws InjectionException { | ||
final Configuration localJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() |
Here and everywhere in the file: use the global TANG object
default: | ||
throw new RuntimeException("Unknown runtime"); | ||
} | ||
} catch (final BindException | InjectionException ex) { |
BindException is actually never thrown
Also, I think we can throw InjectionException out of launch() without wrapping it into RuntimeException
*/ | ||
public static void main(final String[] args) throws InvalidProtocolBufferException { | ||
if (args.length != 1) { | ||
LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() + |
use string interpolation ({0})
public void onNext(final String alarmId) { | ||
LOG.log(Level.INFO, "Alarm {0} triggered", alarmId); | ||
if (this.alarmMap.containsKey(alarmId)) { | ||
final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId); |
Map.remove() returns null if the key is not found. That is, we can do without the .containsKey() call:
final ClientAlarm clientAlarm = this.alarmMap.remove(alarmId);
if (clientAlarm != null) {
clientAlarm.run();
// ...
Optional.of(this.addFileList), | ||
this.addLibraryList.size() == 0 ? | ||
Optional.<List<File>>empty() : | ||
Optional.of(this.addLibraryList)); |
same as above
.setException(ExceptionInfo.newBuilder() | ||
.setName(ex.getCause() != null ? ex.getCause().toString() : ex.toString()) | ||
.setMessage(ex.getMessage() == null ? ex.toString() : ex.getMessage()) | ||
.setData(ByteString.copyFrom(exceptionCodec.toBytes(ex))) |
Looks like a common pattern. Should we add a helper function for it? like,
public static ExceptionInfo exceptionInfo(final Throwable ex) // ...
.build()); | ||
try { | ||
callComplete.get(5, TimeUnit.SECONDS); | ||
} catch (ExecutionException | TimeoutException | InterruptedException e) { |
final
.build()) | ||
.build()); | ||
try { | ||
callComplete.get(5, TimeUnit.SECONDS); |
Should we make gRPC timeout configurable?
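One possible shape for this, as a sketch only: the timeout is passed in at construction time instead of being hardcoded as 5 seconds. The class ConfigurableTimeoutSketch and the method awaitCompletion() are hypothetical; in REEF the value would more likely arrive through a Tang named parameter, which is left out here to keep the example self-contained.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; the names are illustrative and not taken from the PR.
final class ConfigurableTimeoutSketch {

  private final long timeout;
  private final TimeUnit unit;

  ConfigurableTimeoutSketch(final long timeout, final TimeUnit unit) {
    this.timeout = timeout;
    this.unit = unit;
  }

  /** Waits for the call to complete, using the configured limit instead of a literal. */
  <T> T awaitCompletion(final Future<T> callComplete) {
    try {
      return callComplete.get(this.timeout, this.unit);
    } catch (final ExecutionException | TimeoutException e) {
      throw new RuntimeException(
          "Call failed or timed out (limit: " + this.timeout + " " + this.unit + ")", e);
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status for callers
      throw new RuntimeException("Interrupted while waiting for the call", e);
    }
  }
}
```

A caller would then write something like new ConfigurableTimeoutSketch(5, TimeUnit.SECONDS).awaitCompletion(callComplete).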
try { | ||
callComplete.get(5, TimeUnit.SECONDS); | ||
} catch (ExecutionException | TimeoutException | InterruptedException e) { | ||
throw new RuntimeException(e); |
add a string description, e.g.
throw new RuntimeException("Cannot register DriverClient", e);
few more nitpicks - will continue tomorrow
} catch (Throwable t) { | ||
return t; | ||
} | ||
} |
makes sense
*/ | ||
public final class Builder extends EvaluatorRequest.Builder<DriverClientEvaluatorRequestor.Builder> { | ||
@Override | ||
public synchronized void submit() { |
does it have to be synchronized at this point?
|
||
return TANG.newInjector(clockArgConfig).getInstance(JavaDriverClientLauncher.class); | ||
|
||
} catch (final BindException ex) { |
BindException is actually never thrown
.build(), | ||
TANG.newConfigurationBuilder() | ||
.bindNamedParameter(ClockConfigurationPath.class, clockConfigPath) | ||
.build()); |
indent the two lines above for readability
throw fatal("Unable run clock.", t); | ||
} | ||
} | ||
} catch (InjectionException e) { |
final
final ActiveContextBridge context = this.activeContextBridgeMap.remove(request.getContextId()); | ||
final Optional<ActiveContext> parent = context.getParentId().isPresent() ? | ||
Optional.<ActiveContext>of(this.activeContextBridgeMap.get(context.getParentId().get())) : | ||
Optional.<ActiveContext>empty(); |
Same as above:
final Optional<ActiveContext> parent = Optional.<ActiveContext>ofNullable(
this.activeContextBridgeMap.get(context.getParentId().get()));
Optional.<ActiveContext>empty(); | ||
final Optional<Throwable> reason = !request.getException().getData().isEmpty() ? | ||
this.exceptionCodec.fromBytes(request.getException().getData().toByteArray()) : | ||
Optional.<Throwable>empty(); |
I wonder why DefaultExceptionCodec does not accept null and byte[0] input, like this:
@Override
public Optional<Throwable> fromBytes(final byte[] bytes) {
try {
if (bytes != null && bytes.length > 0) {
return Optional.of((Throwable) SerializationUtils.deserialize(bytes));
}
} catch (final SerializationException | IllegalArgumentException e) {
LOG.log(Level.WARNING, "Unable to deserialize a Throwable.", e);
}
return Optional.empty();
}
That would allow us to write
final Optional<Throwable> reason =
this.exceptionCodec.fromBytes(request.getException().getData().toByteArray());
final Optional<ActiveContext> context = | ||
this.activeContextBridgeMap.containsKey(request.getContext().getContextId()) ? | ||
Optional.<ActiveContext>of(this.activeContextBridgeMap.get(request.getContext().getContextId())) : | ||
Optional.<ActiveContext>empty(); |
Can simplify and reduce the number of trips to the map, e.g.
ActiveContextBridge context = null;
if (request.hasContext()) {
final String contextId = request.getContext().getContextId();
context = this.activeContextBridgeMap.get(contextId);
if (context == null) {
context = toActiveContext(request.getContext());
this.activeContextBridgeMap.put(contextId, context);
}
}
and then pass Optional.<ActiveContext>ofNullable(context) to the constructor of the FailedTask
final ContextInfo contextInfo = request.getContext(); | ||
if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId())) { | ||
this.activeContextBridgeMap.put(contextInfo.getContextId(), toActiveContext(contextInfo)); | ||
} |
one less trip to the map:
final ContextInfo contextInfo = request.getContext();
final String contextId = contextInfo.getContextId();
ActiveContextBridge context = this.activeContextBridgeMap.get(contextId);
if (context == null) {
context = toActiveContext(contextInfo);
this.activeContextBridgeMap.put(contextId, context);
}
final ContextInfo contextInfo = request.getContext(); | ||
if (!this.activeContextBridgeMap.containsKey(contextInfo.getContextId())) { | ||
this.activeContextBridgeMap.put(contextInfo.getContextId(), toActiveContext(contextInfo)); | ||
} |
same as above re: number of trips to the map
Done with the DriverClientSource. Will post more comments soon
request.getTaskId(), | ||
context, | ||
request.getResult() != null && !request.getResult().isEmpty() ? | ||
request.getResult().toByteArray() : null)); |
Looks like a common task - maybe have a utility function like
public static byte[] toByteArray(final ByteString bs) {
return bs == null || bs.isEmpty() ? null : bs.toByteArray();
}
so we can write
GRPCUtils.toByteArray(request.getResult())
(and, maybe, also have versions to handle/produce Optional data?)
} finally { | ||
responseObserver.onNext(null); | ||
responseObserver.onCompleted(); | ||
} |
I've just realized that we can have a small wrapper to make sure StreamObserver always gets closed:
public static class ObserverCleanup<T> implements AutoCloseable {
private final StreamObserver<T> observer;
private final T nextValue;
public static <V> ObserverCleanup<V> of(
final StreamObserver<V> observer) {
return of(observer, null);
}
public static <V> ObserverCleanup<V> of(
final StreamObserver<V> observer, final V nextValue) {
return new ObserverCleanup<>(observer, nextValue);
}
private ObserverCleanup(final StreamObserver<T> observer, final T nextValue) {
this.observer = observer;
this.nextValue = nextValue;
}
@Override
public void close() {
this.observer.onNext(this.nextValue);
this.observer.onCompleted();
}
}
then our try block will look like this:
try (final ObserverCleanup _cleanup = ObserverCleanup.of(responseObserver)) {
// ...
}
final ContextInfo request, | ||
final StreamObserver<Void> responseObserver) { | ||
try { | ||
LOG.log(Level.INFO, "Driver restarted active context " + request.getContextId()); |
use string interpolation
final TaskInfo request, | ||
final StreamObserver<Void> responseObserver) { | ||
try { | ||
LOG.log(Level.INFO, "Driver restarted running task " + request.getTaskId()); |
use {0}
request.getEvaluatorId(), | ||
request.getFailure() != null ? | ||
new EvaluatorException(request.getFailure().getMessage()) : | ||
new EvaluatorException("restart failed"), |
or,
new EvaluatorException(request.getFailure() == null ?
"restart failed" : request.getFailure().getMessage())
private boolean isIdle() { | ||
LOG.log(Level.INFO, "Clock idle {0}, outstanding evaluators {1}, current evaluators {2}", | ||
new Object[] { | ||
this.clock.get().isIdle(), |
fix the indentation
this.driverServiceClient, | ||
contextInfo.getContextId(), | ||
StringUtils.isNotEmpty(contextInfo.getParentId()) ? | ||
Optional.of(contextInfo.getParentId()) : Optional.<String>empty(), |
We can probably have a special method for strings, say Optional.ofString()
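A rough sketch of such a helper, under the assumption that "empty" means null or zero-length (the same check as StringUtils.isNotEmpty in the snippet). It uses java.util.Optional to stay self-contained; the PR itself works with REEF's own Optional type, and the class name OptionalStringSketch is hypothetical.

```java
import java.util.Optional;

// Hypothetical utility; uses java.util.Optional as a stand-in for REEF's Optional.
final class OptionalStringSketch {

  private OptionalStringSketch() {
  }

  /** Returns an empty Optional for null or "" and a present Optional otherwise. */
  static Optional<String> ofString(final String value) {
    return value == null || value.isEmpty()
        ? Optional.<String>empty()
        : Optional.of(value);
  }
}
```

The call site would then shrink to a single expression such as ofString(contextInfo.getParentId()).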
responseObserver.onNext(null); | ||
responseObserver.onCompleted(); | ||
} | ||
} |
In fact, we do so many similar lookups in activeContextBridgeMap that we can factor out the whole process into a helper method, e.g.
private ActiveContextBridge addContextIfMissing(final ContextInfo contextInfo) {
final String contextId = contextInfo.getContextId();
ActiveContextBridge context = this.activeContextBridgeMap.get(contextId);
if (context == null) {
context = toActiveContext(contextInfo);
this.activeContextBridgeMap.put(contextId, context);
}
return context;
}
Then, with all other helpers mentioned earlier, our method would become just
@Override
public void completedTaskHandler(
final TaskInfo request, final StreamObserver<Void> responseObserver) {
final ActiveContextBridge context = this.addContextIfMissing(request.getContext());
LOG.log(Level.INFO, "Completed task id {0}", request.getTaskId());
try (final ObserverCleanup _cleanup = ObserverCleanup.of(responseObserver)) {
this.clientDriverDispatcher.get().dispatch(new CompletedTaskBridge(
request.getTaskId(), context, toByteArray(request.getResult())));
}
}
Few more nit picks and suggestions. To be continued!
final ConfigurationSerializer configurationSerializer, | ||
final ExceptionCodec exceptionCodec, | ||
@Parameter(DriverServicePort.class) final Integer driverServicePort, | ||
@Parameter(DriverRegistrationTimeout.class) final Integer driverRegistrationTimeout) { |
no need for boxing - use int
.setTaskId(taskId) | ||
.setOperation(RunningTaskRequest.Operation.SUSPEND) | ||
.build()); | ||
} |
All three task operations are very similar - maybe it makes sense to factor out the common part into a helper function? say,
private void taskOp(final RunningTaskRequest.Operation op, final String taskId, final byte[] message) {
final RunningTaskRequest.Builder request = RunningTaskRequest.newBuilder().setTaskId(taskId).setOperation(op);
if (message != null && message.length > 0) {
request.setMessage(ByteString.copyFrom(message));
}
this.serviceStub.runningTaskOp(request.build());
}
@Override
public void onTaskClose(final String taskId, final Optional<byte[]> message) {
this.taskOp(RunningTaskRequest.Operation.CLOSE, taskId, message.orElse(null));
}
@Override
public void onTaskMessage(final String taskId, final byte[] message) {
this.taskOp(RunningTaskRequest.Operation.SEND_MESSAGE, taskId, message);
}
@Override
public void onSuspendTask(final String taskId, final Optional<byte[]> message) {
this.taskOp(RunningTaskRequest.Operation.SUSPEND, taskId, message.orElse(null));
}
?
.run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); | ||
} catch (InjectionException e) { | ||
throw fatal("local runtime", e); | ||
} |
we can throw InjectionException from launch() - it is being handled in every invocation context anyway. Then we don't need try/catch/fatal/LOG here at all
private static final Logger LOG = Logger.getLogger(YarnLauncher.class.getName()); | ||
|
||
@Inject | ||
private YarnLauncher(){ |
add space before {
|
||
} catch (final Exception e) { | ||
throw fatal("Failed to initialize configurations.", e); | ||
} |
same as with LocalLauncher: throw InjectionException out of the method and remove all error handling, including fatal()
} | ||
final LauncherStatus status = DriverLauncher.getLauncher(yarnConfiguration) | ||
.run(driverConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); | ||
LOG.log(Level.INFO, "Job complete status: " + status.toString()); |
use string interpolation, remove the .toString() call
LOG.log(Level.INFO, "Job complete status: " + status.toString()); | ||
if (status.getError().isPresent()) { | ||
LOG.log(Level.SEVERE, status.getError().get().getMessage()); | ||
status.getError().get().printStackTrace(); |
two lines above can be just
LOG.log(Level.SEVERE, "Job completed with error", status.getError().get());
this will print the exception and the stack trace to the log
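To make that concrete, here is a small self-contained sketch. The class ThrowableLoggingSketch and the capturing handler are hypothetical and exist only so the formatted record can be inspected; the sketch assumes the JDK's default SimpleFormatter layout, which appends the stack trace of the attached Throwable.

```java
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

// Hypothetical demo class; only the LOG.log(Level, String, Throwable) call mirrors the review.
final class ThrowableLoggingSketch {

  /** Logs the error with a single call and returns what the formatter produced. */
  static String logAndCapture(final Throwable error) {
    final StringBuilder out = new StringBuilder();
    final Logger log = Logger.getAnonymousLogger();
    log.setUseParentHandlers(false); // keep the demo output out of the console
    log.addHandler(new Handler() {
      @Override
      public void publish(final LogRecord record) {
        out.append(new SimpleFormatter().format(record));
      }

      @Override
      public void flush() {
      }

      @Override
      public void close() {
      }
    });
    // One call replaces both getMessage() logging and printStackTrace().
    log.log(Level.SEVERE, "Job completed with error", error);
    return out.toString();
  }
}
```

The captured text contains the message, the exception's class name and message, and its stack frames, so nothing from the two original lines is lost.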
try { | ||
return DriverLauncher.getLauncher(generateConfigurationFromJobSubmissionParameters(driverClientConfiguration)) | ||
.run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); | ||
} catch (InjectionException e) { |
let InjectionException fly out of the launch() method.
few more nit picks
.run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); | ||
} | ||
|
||
private static RuntimeException fatal(final String msg, final Throwable t) { |
We don't need that method anymore
*/ | ||
public final class LocalLauncher implements IDriverLauncher { | ||
|
||
private static final Logger LOG = Logger.getLogger(LocalLauncher.class.getName()); |
Once we remove the fatal() method, we don't need the logger, either.
} | ||
final LauncherStatus status = DriverLauncher.getLauncher(yarnConfiguration) | ||
.run(driverConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); | ||
LOG.log(Level.INFO, "Job complete status: {0}" + status); |
oops, replace + with ,
here's another chunk of comments. stay tuned! 😄
if (wasCalledViaHTTP || retry >= maxNumberOfRetries) { | ||
// No alarm necessary anymore. | ||
LOG.log(Level.INFO, | ||
"Not scheduling additional alarms after {0} out of max {1} retries. The HTTP handles was called: ", |
There are three parameters and only two interpolation placeholders. I guess we should have {2} at the end?
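To illustrate the two placeholder remarks (the + versus , mix-up and the missing {2}), here is a small sketch. java.util.logging formats parameterized messages with java.text.MessageFormat-style placeholders, so the sketch calls MessageFormat directly; the class and method names are made up for the example, and the message strings are shortened stand-ins for the ones in the PR.

```java
import java.text.MessageFormat;
import java.util.Locale;

final class PlaceholderSketch {

  /** Formats a message the way a {0}-style log pattern is expanded. */
  static String format(final String pattern, final Object... params) {
    // Locale.ROOT keeps number formatting predictable for the example.
    return new MessageFormat(pattern, Locale.ROOT).format(params);
  }

  /** Shows the bug from the review: concatenation leaves the placeholder unexpanded. */
  static String concatenated(final Object status) {
    return format("Job complete status: {0}" + status);
  }
}
```

With three parameters and only {0} and {1} in the pattern, the third value is silently dropped; adding {2} makes it appear. With the + form, the literal text {0} ends up in the output, followed by the status.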
.start(); | ||
LOG.info("Server started, listening on " + port); | ||
break; | ||
} catch (IOException e) { |
final
.addService(new DriverBridgeServiceImpl()) | ||
.build() | ||
.start(); | ||
LOG.info("Server started, listening on " + port); |
use string interpolation
} | ||
|
||
private void start() throws IOException, InterruptedException { | ||
for (final Integer port : this.tcpPortProvider) { |
can use int instead of a boxed value
} | ||
if (this.server == null || this.server.isTerminated()) { | ||
throw new IOException("Unable to start gRPC server"); | ||
} else { |
no need for explicit else block
.setData(ByteString.copyFrom(exceptionCodec.toBytes(reason))) | ||
.build()); | ||
} else if (context.getData().isPresent()) { | ||
contextInfoBuilder.setException(ExceptionInfo.newBuilder() |
same as above
This represents an exception thrown from an alternative language that we cannot decode. GRPCUtils.createExceptionInfo deals with java only exceptions.
.build()); | ||
} else { | ||
final Throwable reason = context.asError(); | ||
contextInfoBuilder.setException(ExceptionInfo.newBuilder() |
same. we have to grep the code for all places where we still use ExceptionInfo.newBuilder() instead of .createExceptionInfo()
.setContextId(task.getActiveContext().get().getId()) | ||
.setEvaluatorId(task.getActiveContext().get().getEvaluatorId()) | ||
.setParentId(task.getActiveContext().get().getParentId().isPresent() ? | ||
task.getActiveContext().get().getParentId().get() : "") |
use
.setParentId(task.getActiveContext().get().getParentId().orElse(""))
} | ||
if (task.getReason().isPresent()) { | ||
final Throwable reason = task.getReason().get(); | ||
taskInfoBuilder.setException(ExceptionInfo.newBuilder() |
use GRPCUtils.createExceptionInfo()
.setContextId(task.getActiveContext().getId()) | ||
.setEvaluatorId(task.getActiveContext().getEvaluatorId()) | ||
.setParentId(task.getActiveContext().getParentId().isPresent() ? | ||
task.getActiveContext().getParentId().get() : "") |
use .orElse("")
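A short sketch of the .orElse("") suggestion. It uses java.util.Optional as a stand-in; the assumption is that the Optional type used in the PR offers an orElse() with the same meaning, as the review comment implies. Class and method names are illustrative.

```java
import java.util.Optional;

final class OrElseSketch {

  /** The verbose form from the diff: check presence, then unwrap. */
  static String parentIdVerbose(final Optional<String> parentId) {
    return parentId.isPresent() ? parentId.get() : "";
  }

  /** The suggested form: one call that supplies the default. */
  static String parentId(final Optional<String> parentId) {
    return parentId.orElse("");
  }
}
```

Both methods return the same value for every input; the second one avoids repeating the long getter chain twice.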
few more nit picks before the weekend 😄
.setParentId(context.getParentId().orElse("")) | ||
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo( | ||
context.getEvaluatorDescriptor())) | ||
.build()); |
looks like pretty common operation. should we have a method like
private static ContextInfo toContextInfo(final ContextBase context) {
return toContextInfo(context, null);
}
private static ContextInfo toContextInfo(final ContextBase context, final ExceptionInfo error) {
final ContextInfo.Builder builder = ContextInfo.newBuilder()
.setContextId(context.getId())
.setEvaluatorId(context.getEvaluatorId())
.setParentId(context.getParentId().orElse(""))
.setEvaluatorDescriptorInfo(toEvaluatorDescriptorInfo(
context.getEvaluatorDescriptor()));
if (error != null) {
builder.setException(error);
}
return builder.build();
}
then we can write
@Override
public synchronized void activeContextHandler(final ActiveContext context) {
this.activeContextMap.put(context.getId(), context);
this.clientStub.activeContextHandler(toContextInfo(context));
}
and
@Override
public synchronized void failedContextHandler(final FailedContext context) {
final ContextInfo contextInfo;
if (context.getReason().isPresent()) {
contextInfo = toContextInfo(context,
GRPCUtils.createExceptionInfo(this.exceptionCodec, context.getReason().get()));
} else if (context.getData().isPresent()) {
contextInfo = toContextInfo(context, ExceptionInfo.newBuilder()
.setName(context.toString())
.setMessage(context.getDescription().orElse(
context.getMessage() != null ? context.getMessage() : ""))
.setData(ByteString.copyFrom(context.getData().get()))
.build());
} else {
contextInfo = toContextInfo(context,
GRPCUtils.createExceptionInfo(this.exceptionCodec, context.asError()));
}
this.activeContextMap.remove(context.getId());
this.clientStub.failedContextHandler(contextInfo);
}
and so on? (same, BTW, applies to many other gRPC messages that we have)
return false; | ||
} | ||
|
||
private EvaluatorDescriptorInfo toEvaluatorDescriptorInfo(final EvaluatorDescriptor descriptor) { |
make it static
GRPCDriverService.this.clientStub = DriverClientGrpc.newFutureStub(channel); | ||
GRPCDriverService.this.notifyAll(); | ||
} | ||
LOG.log(Level.INFO, "Driver has registered on port " + request.getPort()); |
use string interpolation
if (request.hasException()) { | ||
final Optional<Throwable> exception = parseException(request.getException()); | ||
if (exception.isPresent()) { | ||
LOG.log(Level.INFO, "driver exception: " + exception.get().toString()); |
use string interpolation, remove explicit .toString() call
} | ||
} finally { | ||
responseObserver.onNext(null); | ||
responseObserver.onCompleted(); |
here and in other methods below: use
try (final ObserverCleanup _cleanup = ObserverCleanup.of(responseObserver)) {
// ...
}
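The thread does not show how ObserverCleanup is implemented, so the following is only a guess at the idea: an AutoCloseable wrapper that sends the empty response and completes the observer when the try block exits, replacing the hand-written finally block. The Observer interface below is a stand-in for gRPC's StreamObserver, and every name in the sketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ObserverCleanupSketch {

  /** Stand-in for gRPC's StreamObserver, reduced to the two calls used here. */
  interface Observer<T> {
    void onNext(T value);

    void onCompleted();
  }

  /** Hypothetical helper: completes the observer when the try block exits. */
  static final class Cleanup implements AutoCloseable {
    private final Observer<Void> observer;

    private Cleanup(final Observer<Void> observer) {
      this.observer = observer;
    }

    static Cleanup of(final Observer<Void> observer) {
      return new Cleanup(observer);
    }

    @Override
    public void close() {
      this.observer.onNext(null);
      this.observer.onCompleted();
    }
  }

  /** Runs the handler body and records which observer calls were made, in order. */
  static List<String> handle(final Runnable body) {
    final List<String> calls = new ArrayList<>();
    final Observer<Void> observer = new Observer<Void>() {
      @Override
      public void onNext(final Void value) {
        calls.add("onNext");
      }

      @Override
      public void onCompleted() {
        calls.add("onCompleted");
      }
    };
    try (final Cleanup cleanup = Cleanup.of(observer)) {
      body.run();
    } catch (final RuntimeException e) {
      // The resource is closed before this catch block runs.
      calls.add("failed");
    }
    return calls;
  }
}
```

The observer is completed whether the body returns normally or throws, which is the behavior the finally blocks in the diff were written to guarantee.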
final StreamObserver<Void> responseObserver) { | ||
synchronized (GRPCDriverService.this) { | ||
if (!GRPCDriverService.this.activeContextMap.containsKey(request.getContextId())) { | ||
LOG.log(Level.SEVERE, "Context does not exist with id " + request.getContextId()); |
use string interpolation (here and everywhere)
final ActiveContextRequest request, | ||
final StreamObserver<Void> responseObserver) { | ||
synchronized (GRPCDriverService.this) { | ||
if (!GRPCDriverService.this.activeContextMap.containsKey(request.getContextId())) { |
one less trip to the map:
final String contextId = request.getContextId();
final ActiveContext context = GRPCDriverService.this.activeContextMap.get(contextId);
if (context == null) {
LOG.log(Level.SEVERE, "Context does not exist with id {0}", contextId);
responseObserver.onError(Status.INTERNAL
.withDescription("Context does not exist with id " + contextId)
.asRuntimeException());
return;
}
// ...
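The same "one less trip to the map" pattern, reduced to a self-contained sketch with a plain Map<String, String> (the names are illustrative). One caveat worth stating: replacing containsKey() plus get() with a single get() and a null check is only equivalent when the map never stores null values.

```java
import java.util.HashMap;
import java.util.Map;

final class SingleLookupSketch {

  /** Builds a tiny map standing in for activeContextMap. */
  static Map<String, String> sampleContexts() {
    final Map<String, String> contexts = new HashMap<>();
    contexts.put("ctx-1", "evaluator-7");
    return contexts;
  }

  /** Looks the entry up once and branches on the result. */
  static String describe(final Map<String, String> contexts, final String contextId) {
    final String context = contexts.get(contextId);
    if (context == null) {
      return "Context does not exist with id " + contextId;
    }
    return "Context " + contextId + " runs on " + context;
  }
}
```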
.asRuntimeException()); | ||
} else { | ||
final ActiveContext context = GRPCDriverService.this.activeContextMap.get(request.getContextId()); | ||
if (request.getOperationCase() == ActiveContextRequest.OperationCase.CLOSE_CONTEXT) { |
do
switch (request.getOperationCase()) {
case CLOSE_CONTEXT:
// ...
}
if (request.getOperationCase() == ActiveContextRequest.OperationCase.CLOSE_CONTEXT) { | ||
if (request.getCloseContext()) { | ||
try { | ||
LOG.log(Level.INFO, "closing context " + context.getId()); |
use string interpolation
context.close(); | ||
} finally { | ||
responseObserver.onNext(null); | ||
responseObserver.onCompleted(); |
also,
try (final ObserverCleanup _cleanup = ObserverCleanup.of(responseObserver)) {
// ...
}
Final round of nit picks - finally made it to the end of the PR! 😄
// Set file dependencies | ||
final List<String> localLibraries = new ArrayList<>(); | ||
localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class)); | ||
if (driverConfiguration.getLocalLibrariesCount() > 0) { |
No need to check for .getLocalLibrariesCount() - the call
localLibraries.addAll(driverConfiguration.getLocalLibrariesList());
will work correctly (i.e. be a no-op) even when the list is empty
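A quick sketch confirming the point: List.addAll() with an empty argument leaves the target unchanged, so the size check is redundant. The class and method names are illustrative, and a plain List<String> stands in for the protobuf-generated list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class AddAllSketch {

  /** Collects the mandatory library plus any extras, with no size check. */
  static List<String> collectLibraries(final String required, final List<String> extras) {
    final List<String> localLibraries = new ArrayList<>();
    localLibraries.add(required);
    localLibraries.addAll(extras); // no-op when extras is empty
    return localLibraries;
  }

  /** Convenience for the empty case. */
  static List<String> collectLibraries(final String required) {
    return collectLibraries(required, Collections.<String>emptyList());
  }
}
```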
|
||
private Configuration getDriverRestartConfiguration( | ||
final ClientProtocol.DriverClientConfiguration driverConfiguration) { | ||
ConfigurationModule restartConfModule = DriverRestartConfiguration.CONF |
can be final
builder.setOperatingSystem(ClientProtocol.DriverClientConfiguration.OS.LINUX); | ||
builder.setAzbatchRuntime(ClientProtocol.AzureBatchRuntimeParameters.newBuilder() | ||
.build()); | ||
builder.addGlobalLibraries(EnvironmentUtils.getClassLocation(HelloDriver.class)); |
maybe just
private static final ClientProtocol.DriverClientConfiguration CLIENT_CONFIG =
ClientProtocol.DriverClientConfiguration.newBuilder()
.setJobid("HelloREEF")
.setEnableHttpDriver(false)
.setOperatingSystem(ClientProtocol.DriverClientConfiguration.OS.LINUX)
.setAzbatchRuntime(ClientProtocol.AzureBatchRuntimeParameters.newBuilder().build())
.addGlobalLibraries(EnvironmentUtils.getClassLocation(HelloDriver.class))
.build();
// Set file dependencies | ||
final List<String> localLibraries = new ArrayList<>(); | ||
localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class)); | ||
if (driverConfiguration.getLocalLibrariesCount() > 0) { |
can remove the if statement: there is no need to check for .getLocalLibrariesCount() - the call localLibraries.addAll() will work correctly even when the input list is empty.
|
||
private Configuration getDriverRestartConfiguration( | ||
final ClientProtocol.DriverClientConfiguration driverConfiguration) { | ||
ConfigurationModule restartConfModule = DriverRestartConfiguration.CONF |
final
.setClassPath(driverClientConfigurationProto.getOperatingSystem() == | ||
ClientProtocol.DriverClientConfiguration.OS.WINDOWS ? | ||
StringUtils.join(classpathProvider.getDriverClasspath(), ";") : | ||
StringUtils.join(classpathProvider.getDriverClasspath(), ":")) |
or,
.setClassPath(StringUtils.join(classpathProvider.getDriverClasspath(),
driverClientConfigurationProto.getOperatingSystem() ==
ClientProtocol.DriverClientConfiguration.OS.WINDOWS ? ";" : ":"))
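The refactoring above moves the conditional from the whole join expression to just the separator. A self-contained sketch of the same shape, assuming String.join as a stand-in for StringUtils.join and a boolean flag in place of the protobuf OS enum (both substitutions are mine, for the sake of a runnable example):

```java
import java.util.List;

final class ClasspathJoinSketch {

  /** Joins classpath entries, choosing only the separator by operating system. */
  static String joinClasspath(final List<String> entries, final boolean isWindows) {
    return String.join(isWindows ? ";" : ":", entries);
  }
}
```

The entries are evaluated once, and the only thing that varies between the two branches is the one-character separator.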
ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto); | ||
final File driverClientConfigurationFile = File.createTempFile("driverclient", ".conf"); | ||
// Write driver client configuration to a file | ||
final Injector driverClientInjector = Tang.Factory.getTang().newInjector(driverClientConfiguration); |
there's already a static TANG object
GRPCDriverServiceConfigurationProvider.class) | ||
.build()) | ||
.getInstance(IDriverServiceConfigurationProvider.class); | ||
return driverServiceConfigurationProvider.getDriverServiceConfiguration(builder.build()); |
maybe,
final ClientProtocol.DriverClientConfiguration driverServiceConfiguration =
ClientProtocol.DriverClientConfiguration.newBuilder(driverClientConfigurationProto)
.setDriverClientLaunchCommand(cmd)
.addLocalFiles(driverClientConfigurationFile.getAbsolutePath())
.build();
return driverServiceConfigurationProvider.getDriverServiceConfiguration(driverServiceConfiguration);
@motus Just finished with your final round of reviews. Thank you for helping improve the code quality! Please let me know if you have any further change requests. If not, then perhaps we can roll this into master, possibly after REEF-2024 if you like it. |
I did a surface skim for issues with docs, APIs and project structure. Please have a look and generalize my comments to other parts of the PR as appropriate.
|
||
package driverbridge; | ||
|
||
message LocalRuntimeParameters { |
Please add documentation to the new message types introduced in this PR.
<protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory> | ||
|
||
<!-- library versions --> | ||
<maven.assembly>3.1.0</maven.assembly> |
Can you please push all of these into the parent pom? Also, please add the dependencies to <dependencyManagement> in that parent POM and drop the versions in here.
/** | ||
* Driver Service Launcher - main class. | ||
*/ | ||
public final class DriverServiceLauncher { |
For all new public types: Please decide whether they truly need to be public. If so, decide whether they create new APIs of REEF. If not, mark them @Private, not to introduce new support surface.
throw new RuntimeException("Do not instantiate this class!"); | ||
} | ||
|
||
public static LauncherStatus submit( |
Please document all public types & members with JavaDoc
} | ||
|
||
/** | ||
* Main method that launches the REEF job. |
Does this launch the job or the driver?
I'm thinking they are the same thing
I see your point now. This is launching the REEF job, which is a DriverService + application DriverClient. I reuse the REEFLauncher to start the DriverService.Driver, which in turn will fire up the application DriverClient. Please let me know if you think things could be more clear.
*/ | ||
public static void main(final String[] args) throws IOException { | ||
if (args.length != 1) { | ||
LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() + |
Shouldn't it also fail here?
/** | ||
* Driver client configuration. | ||
*/ | ||
public final class DriverClientConfiguration extends ConfigurationModuleBuilder { |
Consider marking this, and other legitimately public new APIs @Unstable to mark their status of development.
/** | ||
* A 'hello REEF' Task. | ||
*/ | ||
public final class HelloTask implements Task { |
Do we need a copy of the Driver / Task? I'd like to see a re-use of existing examples to validate that existing REEF apps can be ported over to this new runtime.
The driver adds a couple of extra event handlers, but I see your point and will target reef-examples/Driver/Task implementations.
This Jira introduces a new Java bridge for Drivers implemented in alternative languages. It provides the following artifacts (note: client driver refers to the application driver implemented in an alternative programming language):
1. A generic framework for passing information between the Java driver and the client driver.
2. A gRPC based implementation of the bridge that passes information via protocol buffers over gRPC.
3. Protocol buffer definitions for all information that flows between the Java driver and the client driver.
4. A Java implementation of the driver client that can be used for developing unit tests and serve as a template for implementing a driver client (say in C#).
5. Fail-based unit test cases that cover the Java bridge and client.
Pull Request: Closes apache#1466
@motus @markusweimer FYI: finished merging with #1467. Let me know if you guys think this PR is ready to go. I'd like to start syncing with another branch containing the C# side of the bridge. |
@tcondie I will take another pass through the code now, and post my approval here. @markusweimer, I think the sheer size of the change requires a second pair of eyeballs to review this PR before I merge it. WDYT? |
@motus I did a surface scan for maintainability issues. Assuming they are addressed, I am fine with merging this. |
Thanks @markusweimer for looking at it. I am running the tests now and doing a final scan over the code, and will merge it soon afterwards. |
left a few more minor nit picks. about 50% through the final review
ClientProtocol.DriverClientConfiguration.newBuilder(); | ||
JsonFormat.parser() | ||
.usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry()) | ||
.merge(content, driverClientConfigurationProtoBuilder); |
maybe,
try (final Reader reader = new FileReader(args[0])) {
JsonFormat.parser()
.usingTypeRegistry(JsonFormat.TypeRegistry.getEmptyTypeRegistry())
.merge(reader, driverClientConfigurationProtoBuilder);
}
(otherwise I am not sure about encoding from bytes to String)
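On the encoding concern: FileReader without a charset argument decodes with the platform default charset on older Java versions, so one option is to name the charset explicitly. The sketch below assumes the configuration file is UTF-8 (the thread does not say) and uses Files.newBufferedReader; the class and method names are illustrative, and the JsonFormat call is left out so the example stays free of protobuf dependencies.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class CharsetReaderSketch {

  /** Reads a whole file through a Reader with an explicit charset (UTF-8 assumed). */
  static String readUtf8(final Path path) {
    try (final Reader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
      final StringBuilder text = new StringBuilder();
      final char[] buffer = new char[4096];
      for (int n = reader.read(buffer); n != -1; n = reader.read(buffer)) {
        text.append(buffer, 0, n);
      }
      return text.toString();
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Writes the content to a temp file as UTF-8 and reads it back. */
  static String roundTrip(final String content) {
    try {
      final Path file = Files.createTempFile("driverclient", ".json");
      try {
        Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        return readUtf8(file);
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The Reader returned here could be handed to a parser's merge(reader, builder) call in the same try-with-resources block.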
public static void main(final String[] args) throws IOException { | ||
if (args.length != 1) { | ||
LOG.log(Level.SEVERE, DriverServiceLauncher.class.getName() + | ||
" accepts single argument referencing a file that contains a client protocol buffer driver configuration"); |
no need to print the class name - it is included in the log message anyway. I would just write something like
LOG.log(Level.SEVERE,
"Expected a single command line argument with a file containing client protobuf driver configuration");
LOG.log(Level.SEVERE, "Job configuration error", ex); | ||
throw new RuntimeException(ex); | ||
} | ||
} |
we can actually remove some code duplication here. e.g. instead of having three functions get*DriverServiceLauncher(), we can write something like this:
private static LauncherStatus launch(
final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto)
throws InjectionException {
final ClientProtocol.DriverClientConfiguration.RuntimeCase runtime =
driverClientConfigurationProto.getRuntimeCase();
final Class<? extends IDriverLauncher> launcherClass;
switch (runtime) {
case YARN_RUNTIME:
launcherClass = YarnLauncher.class;
break;
case LOCAL_RUNTIME:
launcherClass = LocalLauncher.class;
break;
case AZBATCH_RUNTIME:
launcherClass = AzureBatchLauncher.class;
break;
default:
throw new RuntimeException("Unknown runtime: " + runtime);
}
final Configuration jobSubmissionClientConfig = TANG.newConfigurationBuilder()
.bindImplementation(IDriverLauncher.class, launcherClass)
.bindImplementation(IDriverServiceConfigurationProvider.class,
GRPCDriverServiceConfigurationProvider.class)
.build();
final IDriverLauncher driverServiceLauncher =
TANG.newInjector(jobSubmissionClientConfig).getInstance(launcherClass);
return driverServiceLauncher.launch(driverClientConfigurationProto);
}
*/ | ||
@Private | ||
@DefaultImplementation(DriverClientClock.class) | ||
public interface IAlarmDispatchHandler extends EventHandler<String> { |
Prefix I for interfaces is standard in C#, but not in Java. Our coding conventions are:
- Do not use one-letter prefixes; name interfaces as some general entities they represent, e.g. Failure, AlarmDispatchHandler
- Prefix abstract classes with Abstract, e.g. AbstractFailure
- Use single letters T, U, V for generic type parameters; in rare cases it is OK to give prefix T to such arguments, e.g. Map<TKey, TValue>
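The conventions above, applied in one small sketch. Failure and AbstractFailure are the names from the comment; TaskFailure, Entry and the member names are made up for illustration and do not refer to existing REEF classes.

```java
final class NamingSketch {

  /** Interface named after the entity it represents, with no I prefix. */
  interface Failure {
    String getDescription();
  }

  /** Abstract base class carries the Abstract prefix. */
  abstract static class AbstractFailure implements Failure {
    private final String description;

    AbstractFailure(final String description) {
      this.description = description;
    }

    @Override
    public final String getDescription() {
      return this.description;
    }
  }

  /** Hypothetical concrete implementation. */
  static final class TaskFailure extends AbstractFailure {
    TaskFailure(final String description) {
      super(description);
    }
  }

  /** Generic type using the T-prefixed parameter names mentioned as acceptable. */
  static final class Entry<TKey, TValue> {
    private final TKey key;
    private final TValue value;

    Entry(final TKey key, final TValue value) {
      this.key = key;
      this.value = value;
    }

    TKey getKey() {
      return this.key;
    }

    TValue getValue() {
      return this.value;
    }
  }
}
```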
this.clock = clock; | ||
} | ||
|
||
|
remove extra blank line. run the code formatter to make sure there are no double blank lines, empty lines before closing }, trailing spaces etc.
Few more comments regarding C#-style naming for Java interfaces
*/ | ||
@Private | ||
@DefaultImplementation(DriverClientService.class) | ||
public interface IDriverClientService { |
Noticed few more C#-style namings in Java. I would call the interface DriverClientService, and the implementation DefaultDriverClientService or DriverClientServiceImpl
*/ | ||
@Private | ||
@DefaultImplementation(DriverServiceClient.class) | ||
public interface IDriverServiceClient { |
same as above. use DriverServiceClient for the interface, and e.g. DefaultDriverServiceClient for the implementation
* All driver launchers implement this method. | ||
*/ | ||
@Unstable | ||
public interface IDriverLauncher { |
same as above. I would call the interface DriverLauncher, but this name is already taken by another class at the client side. Ideally, we should've renamed the existing DriverLauncher into something more appropriate (say, ReefClient) and use the DriverLauncher name here. Unfortunately, renaming the existing files is not an option for this PR, so we have to come up with a better name for our interface. I really don't know what to suggest - maybe, BridgeDriverLauncher (reflecting the fact that its method takes a protobuf config)?
* Interface implemented by a Driver Service. | ||
*/ | ||
@Private | ||
public interface IDriverService extends DriverIdlenessSource { |
Same as above. I would just call it DriverService
*/ | ||
@Private | ||
@DefaultImplementation(GRPCDriverServiceConfigurationProvider.class) | ||
public interface IDriverServiceConfigurationProvider { |
same as above. remove the I from the name
OK, I am done with my final round! I've left a few very minor comments. Unfortunately, now that we are on gitbox, I cannot fix these issues myself, rebase, and push everything to the master. Hence those petty comments. Once they are fixed, I'll be happy to test and merge this PR immediately.
|
||
@Override | ||
public List<String> getCommandLine() { | ||
throw new UnsupportedOperationException(); |
add an error message, e.g. "Getting command line on the client is not supported." or something
|
||
private final ActiveContext context; | ||
|
||
|
remove extra blank line
public List<String> getOptions() { | ||
return this.optionList; | ||
} | ||
|
remove extra blank line
.bindNamedParameter(DriverServicePort.class, DRIVER_SERVICE_PORT) | ||
.bindNamedParameter(DriverRegistrationTimeout.class, DRIVER_CLIENT_REGISTRATION_TIMEOUT) | ||
.build(); | ||
|
remove the empty line
} | ||
return builder.build(); | ||
} | ||
|
remove the empty line
*/ | ||
private final DispatchingEStage driverRestartDispatcher; | ||
|
||
|
remove empty line
.bindSetEntry(DriverIdleSources.class, DriverService.class) | ||
.build(); | ||
|
||
|
remove empty line
private GRPCDriverServiceConfigurationProvider() { | ||
} | ||
|
||
|
remove empty line
|
||
package org.apache.reef.tests.fail.driver; | ||
|
||
|
remove empty line
  return TestDriverLauncher.getLauncher(runtimeConfig).run(driverServiceConfiguration, timeOut);
}
remove empty line
This Jira introduces a new Java bridge for Drivers implemented in alternative languages. It provides the following artifacts (note: "client driver" refers to the application driver implemented in an alternative programming language):
1. A generic framework for passing information between the Java driver and the client driver.
2. A gRPC-based implementation of the bridge that passes information via protocol buffers over gRPC.
3. Protocol buffer definitions for all information that flows between the Java driver and the client driver.
4. A Java implementation of the driver client that can be used for developing unit tests and can serve as a template for implementing a driver client (say, in C#).
5. Test cases, modeled on the fail-based unit tests, that cover the Java bridge and client.
Pull Request: Closes apache#1466