diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp index f57dab2f87..06bf5a348d 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp @@ -57,6 +57,43 @@ static void MarshalErrorToJava ( env->SetObjectField(jerrorInfo, fieldID, jexceptionString); } +void populateJavaBridgeHandlerManager(JNIEnv * env, jobject jbridgeHandlerManager, BridgeHandlerManager^ bridgeHandlerManager) { + jclass cls = env->GetObjectClass(jbridgeHandlerManager); + jmethodID jsetAllocatedEvaluatorHandlerMid = env->GetMethodID(cls, "setAllocatedEvaluatorHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetAllocatedEvaluatorHandlerMid, bridgeHandlerManager->AllocatedEvaluatorHandler); + jmethodID jsetActiveContextHandlerMid = env->GetMethodID(cls, "setActiveContextHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetActiveContextHandlerMid, bridgeHandlerManager->ActiveContextHandler); + jmethodID jsetTaskMessageHandlerMid = env->GetMethodID(cls, "setTaskMessageHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetTaskMessageHandlerMid, bridgeHandlerManager->TaskMessageHandler); + jmethodID jsetFailedTaskHandlerMid = env->GetMethodID(cls, "setFailedTaskHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetFailedTaskHandlerMid, bridgeHandlerManager->FailedTaskHandler); + jmethodID jsetFailedEvaluatorHandlerMid = env->GetMethodID(cls, "setFailedEvaluatorHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetFailedEvaluatorHandlerMid, bridgeHandlerManager->FailedEvaluatorHandler); + jmethodID jsetHttpServerEventHandlerMid = env->GetMethodID(cls, "setHttpServerEventHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetHttpServerEventHandlerMid, bridgeHandlerManager->HttpServerHandler); + jmethodID jsetCompletedTaskHandlerMid = env->GetMethodID(cls, "setCompletedTaskHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetCompletedTaskHandlerMid, bridgeHandlerManager->CompletedTaskHandler); + jmethodID jsetRunningTaskHandlerMid = env->GetMethodID(cls, "setRunningTaskHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetRunningTaskHandlerMid, bridgeHandlerManager->RunningTaskHandler); + jmethodID jsetSuspendedTaskHandlerMid = env->GetMethodID(cls, "setSuspendedTaskHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetSuspendedTaskHandlerMid, bridgeHandlerManager->SuspendedTaskHandler); + jmethodID jsetCompletedEvaluatorHandlerMid = env->GetMethodID(cls, "setCompletedEvaluatorHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetCompletedEvaluatorHandlerMid, bridgeHandlerManager->CompletedEvaluatorHandler); + jmethodID jsetClosedContextHandlerMid = env->GetMethodID(cls, "setClosedContextHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetClosedContextHandlerMid, bridgeHandlerManager->ClosedContextHandler); + jmethodID jsetFailedContextHandlerMid = env->GetMethodID(cls, "setFailedContextHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetFailedContextHandlerMid, bridgeHandlerManager->FailedContextHandler); + jmethodID jsetContextMessageHandlerMid = env->GetMethodID(cls, "setContextMessageHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetContextMessageHandlerMid, bridgeHandlerManager->ContextMessageHandler); + jmethodID jsetDriverRestartActiveContextHandlerMid = env->GetMethodID(cls, "setDriverRestartActiveContextHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartActiveContextHandlerMid, bridgeHandlerManager->DriverRestartActiveContextHandler); + jmethodID jsetDriverRestartRunningTaskHandlerMid = env->GetMethodID(cls, "setDriverRestartRunningTaskHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartRunningTaskHandlerMid, bridgeHandlerManager->DriverRestartRunningTaskHandler); + jmethodID jsetDriverRestartCompletedHandlerMid = env->GetMethodID(cls, "setDriverRestartCompletedHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartCompletedHandlerMid, bridgeHandlerManager->DriverRestartCompletedHandler); + jmethodID jsetDriverRestartFailedEvaluatorHandlerMid = env->GetMethodID(cls, "setDriverRestartFailedEvaluatorHandler", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartFailedEvaluatorHandlerMid, bridgeHandlerManager->DriverRestartFailedEvaluatorHandler); +} // Loading Clr Assembly. Note that we do not use ManagerLogger in this method since the // logger assembly needs to be loaded by this method before it can be used. @@ -97,10 +134,10 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_loadClrAsse /* * Class: org_apache_reef_javabridge_NativeInterop * Method: callClrSystemOnStartHandler - * Signature: (Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)[J + * Signature: (Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V */ -JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler -(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort, jobject jevaluatorRequestorBridge) { +JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler +(JNIEnv * env, jclass jclassx, jstring dateTimeString, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge) { try { ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler"); DateTime dt = DateTime::Now; @@ -108,13 +145,12 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callC String^ strPort = ManagedStringFromJavaString(env, httpServerPort); EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge); - array^ handlers = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort, evaluatorRequestorBridge); - return JavaLongArrayFromManagedLongArray(env, handlers); + BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart(dt, strPort, evaluatorRequestorBridge); + populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager); } catch (System::Exception^ ex) { // we cannot get error back to java here since we don't have an object to call back (although we ideally should...) ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler", ex); - return NULL; } } @@ -432,25 +468,24 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemCo } /* -* Class: org_apache_reef_javabridge_NativeInterop -* Method: callClrSystemOnRestartHandler -* Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J -*/ -JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler -(JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject jevaluatorRequestorBridge, jobject jdriverRestartedBridge) { + * Class: org_apache_reef_javabridge_NativeInterop + * Method: callClrSystemOnRestartHandler + * Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V + */ +JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler +(JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject jbridgeHandlerManager, jobject jevaluatorRequestorBridge, jobject jdriverRestartedBridge) { try { ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler"); String^ strPort = ManagedStringFromJavaString(env, httpServerPort); EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge); DriverRestartedClr2Java^ driverRestartedBridge = gcnew DriverRestartedClr2Java(env, jdriverRestartedBridge); - array^ handlers = ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(strPort, evaluatorRequestorBridge, driverRestartedBridge); - return JavaLongArrayFromManagedLongArray(env, handlers); + BridgeHandlerManager^ handlerManager = ClrSystemHandlerWrapper::Call_ClrSystemRestartHandler_OnRestart(strPort, evaluatorRequestorBridge, driverRestartedBridge); + populateJavaBridgeHandlerManager(env, jbridgeHandlerManager, handlerManager); } catch (System::Exception^ ex) { // we cannot get error back to java here since we don't have an object to call back (although we ideally should...) ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler", ex); - return NULL; } } @@ -533,7 +568,7 @@ static JNINativeMethod methods[] = { { "clrBufferedLog", "(ILjava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrBufferedLog }, - { "callClrSystemOnStartHandler", "(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)[J", + { "callClrSystemOnStartHandler", "(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler }, { "clrSystemAllocatedEvaluatorHandlerOnNext", "(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V", @@ -575,7 +610,7 @@ static JNINativeMethod methods[] = { { "clrSystemContextMessageHandlerOnNext", "(JLorg/apache/reef/javabridge/ContextMessageBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemContextMessageHandlerOnNext }, - { "callClrSystemOnRestartHandler", "(Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J", + { "callClrSystemOnRestartHandler", "(Ljava/lang/String;Lorg/apache/reef/javabridge/BridgeHandlerManager;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler }, { "clrSystemDriverRestartActiveContextHandlerOnNext", "(JLorg/apache/reef/javabridge/ActiveContextBridge;)V", diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs new file mode 100644 index 0000000000..bf0b04c2d1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Driver.Bridge +{ + /// + /// A class that holds all .NET handles for Java InterOp calls. + /// + public sealed class BridgeHandlerManager + { + public ulong AllocatedEvaluatorHandler { get; internal set; } + + public ulong TaskMessageHandler { get; internal set; } + + public ulong ActiveContextHandler { get; internal set; } + + public ulong FailedTaskHandler { get; internal set; } + + public ulong RunningTaskHandler { get; internal set; } + + public ulong CompletedTaskHandler { get; internal set; } + + public ulong SuspendedTaskHandler { get; internal set; } + + public ulong FailedEvaluatorHandler { get; internal set; } + + public ulong CompletedEvaluatorHandler { get; internal set; } + + public ulong ClosedContextHandler { get; internal set; } + + public ulong FailedContextHandler { get; internal set; } + + public ulong ContextMessageHandler { get; internal set; } + + public ulong DriverRestartActiveContextHandler { get; internal set; } + + public ulong DriverRestartRunningTaskHandler { get; internal set; } + + public ulong DriverRestartCompletedHandler { get; internal set; } + + public ulong DriverRestartFailedEvaluatorHandler { get; internal set; } + + public ulong HttpServerHandler { get; internal set; } + } +} diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs index 5f7b61b17a..381aebae34 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs @@ -230,20 +230,7 @@ public static void Call_ClrSystemDriverRestartFailedEvaluator_OnNext(ulong handl } } - //Deprecate, remove after both Java and C# code gets checked in - public static ulong[] Call_ClrSystemStartHandler_OnStart( - DateTime startTime, - IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java) - { - IEvaluatorRequestor evaluatorRequestor = new EvaluatorRequestor(evaluatorRequestorClr2Java); - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemStartHandler_OnStart")) - { - LOGGER.Log(Level.Info, "*** Start time is " + startTime); - return GetHandlers(null, evaluatorRequestor); - } - } - - public static ulong[] Call_ClrSystemStartHandler_OnStart( + public static BridgeHandlerManager Call_ClrSystemStartHandler_OnStart( DateTime startTime, string httpServerPort, IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java) @@ -260,7 +247,7 @@ public static void Call_ClrSystemDriverRestartFailedEvaluator_OnNext(ulong handl } } - public static ulong[] Call_ClrSystemRestartHandler_OnRestart( + public static BridgeHandlerManager Call_ClrSystemRestartHandler_OnRestart( string httpServerPort, IEvaluatorRequestorClr2Java evaluatorRequestorClr2Java, IDriverRestartedClr2Java driverRestartedClr2Java) @@ -277,7 +264,7 @@ public static void Call_ClrSystemDriverRestartFailedEvaluator_OnNext(ulong handl } } - private static ulong[] GetHandlers(string httpServerPortNumber, IEvaluatorRequestor evaluatorRequestor) + private static BridgeHandlerManager GetHandlers(string httpServerPortNumber, IEvaluatorRequestor evaluatorRequestor) { var injector = BridgeConfigurationProvider.GetBridgeInjector(evaluatorRequestor); diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs index 6e02bff733..d770d3ae53 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs @@ -203,9 +203,9 @@ public class DriverBridge _driverRestartFailedEvaluatorSubscriber = new ClrSystemHandler(); } - public ulong[] Subscribe() + public BridgeHandlerManager Subscribe() { - ulong[] handlers = Enumerable.Repeat(Constants.NullHandler, Constants.HandlersNumber).ToArray(); + var bridgeHandlerManager = new BridgeHandlerManager(); // subscribe to Allocated Evaluator foreach (var handler in _allocatedEvaluatorHandlers) @@ -213,7 +213,7 @@ public ulong[] Subscribe() _allocatedEvaluatorSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IAllocatedEvaluator handler: " + handler); } - handlers[Constants.Handlers[Constants.AllocatedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorSubscriber); + bridgeHandlerManager.AllocatedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_allocatedEvaluatorSubscriber); // subscribe to TaskMessage foreach (var handler in _taskMessageHandlers) @@ -221,7 +221,7 @@ public ulong[] Subscribe() _taskMessageSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to ITaskMessage handler: " + handler); } - handlers[Constants.Handlers[Constants.TaskMessageHandler]] = ClrHandlerHelper.CreateHandler(_taskMessageSubscriber); + bridgeHandlerManager.TaskMessageHandler = ClrHandlerHelper.CreateHandler(_taskMessageSubscriber); // subscribe to Active Context foreach (var handler in _activeContextHandlers) @@ -229,7 +229,7 @@ public ulong[] Subscribe() _activeContextSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IActiveContext handler: " + handler); } - handlers[Constants.Handlers[Constants.ActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_activeContextSubscriber); + bridgeHandlerManager.ActiveContextHandler = ClrHandlerHelper.CreateHandler(_activeContextSubscriber); // subscribe to Failed Task foreach (var handler in _failedTaskHandlers) @@ -237,7 +237,7 @@ public ulong[] Subscribe() _failedTaskSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IFailedTask handler: " + handler); } - handlers[Constants.Handlers[Constants.FailedTaskHandler]] = ClrHandlerHelper.CreateHandler(_failedTaskSubscriber); + bridgeHandlerManager.FailedTaskHandler = ClrHandlerHelper.CreateHandler(_failedTaskSubscriber); // subscribe to Running Task foreach (var handler in _runningTaskHandlers) @@ -245,7 +245,7 @@ public ulong[] Subscribe() _runningTaskSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IRunningTask handler: " + handler); } - handlers[Constants.Handlers[Constants.RunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_runningTaskSubscriber); + bridgeHandlerManager.RunningTaskHandler = ClrHandlerHelper.CreateHandler(_runningTaskSubscriber); // subscribe to Completed Task foreach (var handler in _completedTaskHandlers) @@ -253,7 +253,7 @@ public ulong[] Subscribe() _completedTaskSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to ICompletedTask handler: " + handler); } - handlers[Constants.Handlers[Constants.CompletedTaskHandler]] = ClrHandlerHelper.CreateHandler(_completedTaskSubscriber); + bridgeHandlerManager.CompletedTaskHandler = ClrHandlerHelper.CreateHandler(_completedTaskSubscriber); // subscribe to Suspended Task foreach (var handler in _suspendedTaskHandlers) @@ -261,7 +261,7 @@ public ulong[] Subscribe() _suspendedTaskSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to ISuspendedTask handler: " + handler); } - handlers[Constants.Handlers[Constants.SuspendedTaskHandler]] = ClrHandlerHelper.CreateHandler(_suspendedTaskSubscriber); + bridgeHandlerManager.SuspendedTaskHandler = ClrHandlerHelper.CreateHandler(_suspendedTaskSubscriber); // subscribe to Failed Evaluator foreach (var handler in _failedEvaluatorHandlers) @@ -269,7 +269,7 @@ public ulong[] Subscribe() _failedEvaluatorSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IFailedEvaluator handler: " + handler); } - handlers[Constants.Handlers[Constants.FailedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_failedEvaluatorSubscriber); + bridgeHandlerManager.FailedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_failedEvaluatorSubscriber); // subscribe to Completed Evaluator foreach (var handler in _completedEvaluatorHandlers) @@ -277,7 +277,7 @@ public ulong[] Subscribe() _completedEvaluatorSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to ICompletedEvaluator handler: " + handler); } - handlers[Constants.Handlers[Constants.CompletedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_completedEvaluatorSubscriber); + bridgeHandlerManager.CompletedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_completedEvaluatorSubscriber); // subscribe to Closed Context foreach (var handler in _closedContextHandlers) @@ -285,7 +285,7 @@ public ulong[] Subscribe() _closedContextSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IClosedContext handler: " + handler); } - handlers[Constants.Handlers[Constants.ClosedContextHandler]] = ClrHandlerHelper.CreateHandler(_closedContextSubscriber); + bridgeHandlerManager.ClosedContextHandler = ClrHandlerHelper.CreateHandler(_closedContextSubscriber); // subscribe to Failed Context foreach (var handler in _failedContextHandlers) @@ -293,7 +293,7 @@ public ulong[] Subscribe() _failedContextSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IFailedContext handler: " + handler); } - handlers[Constants.Handlers[Constants.FailedContextHandler]] = ClrHandlerHelper.CreateHandler(_failedContextSubscriber); + bridgeHandlerManager.FailedContextHandler = ClrHandlerHelper.CreateHandler(_failedContextSubscriber); // subscribe to Context Message foreach (var handler in _contextMessageHandlers) @@ -301,7 +301,7 @@ public ulong[] Subscribe() _contextMessageSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to IContextMesage handler: " + handler); } - handlers[Constants.Handlers[Constants.ContextMessageHandler]] = ClrHandlerHelper.CreateHandler(_contextMessageSubscriber); + bridgeHandlerManager.ContextMessageHandler = ClrHandlerHelper.CreateHandler(_contextMessageSubscriber); // subscribe to Active Context received during driver restart foreach (var handler in _driverRestartActiveContextHandlers) @@ -309,7 +309,7 @@ public ulong[] Subscribe() _driverRestartActiveContextSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to handler for IActiveContext received during driver restart: " + handler); } - handlers[Constants.Handlers[Constants.DriverRestartActiveContextHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartActiveContextSubscriber); + bridgeHandlerManager.DriverRestartActiveContextHandler = ClrHandlerHelper.CreateHandler(_driverRestartActiveContextSubscriber); // subscribe to Running Task received during driver restart foreach (var handler in _driverRestartRunningTaskHandlers) @@ -317,7 +317,7 @@ public ulong[] Subscribe() _driverRestartRunningTaskSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to handler for IRunningTask received during driver restart: " + handler); } - handlers[Constants.Handlers[Constants.DriverRestartRunningTaskHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartRunningTaskSubscriber); + bridgeHandlerManager.DriverRestartRunningTaskHandler = ClrHandlerHelper.CreateHandler(_driverRestartRunningTaskSubscriber); // subscribe to Restart Completed received during driver restart foreach (var handler in _driverRestartCompletedHandlers) @@ -325,7 +325,7 @@ public ulong[] Subscribe() _driverRestartCompletedSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to handler for IRestartCompleted received during driver restart: " + handler); } - handlers[Constants.Handlers[Constants.DriverRestartCompletedHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartCompletedSubscriber); + bridgeHandlerManager.DriverRestartCompletedHandler = ClrHandlerHelper.CreateHandler(_driverRestartCompletedSubscriber); // subscribe to Failed Evaluator received during driver restart foreach (var handler in _driverRestartFailedEvaluatorHandlers) @@ -333,14 +333,14 @@ public ulong[] Subscribe() _driverRestartFailedEvaluatorSubscriber.Subscribe(handler); _logger.Log(Level.Verbose, "subscribed to handler for IFailedEvaluator received during driver restart: " + handler); } - handlers[Constants.Handlers[Constants.DriverRestartFailedEvaluatorHandler]] = ClrHandlerHelper.CreateHandler(_driverRestartFailedEvaluatorSubscriber); + bridgeHandlerManager.DriverRestartFailedEvaluatorHandler = ClrHandlerHelper.CreateHandler(_driverRestartFailedEvaluatorSubscriber); // subscribe to Http message _httpServerEventSubscriber.Subscribe(_httpServerHandler); _logger.Log(Level.Verbose, "subscribed to IHttpMessage handler :" + _httpServerHandler); - handlers[Constants.Handlers[Constants.HttpServerHandler]] = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber); + bridgeHandlerManager.HttpServerHandler = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber); - return handlers; + return bridgeHandlerManager; } /// diff --git a/lang/cs/Org.Apache.REEF.Driver/Constants.cs b/lang/cs/Org.Apache.REEF.Driver/Constants.cs index ef8b87dab5..aea8164ceb 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Constants.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Constants.cs @@ -18,7 +18,6 @@ */ using System; -using System.Collections.Generic; namespace Org.Apache.REEF.Driver { @@ -51,101 +50,6 @@ public class Constants /// public const int DefaultMemoryGranularity = 1024; - /// - /// The number of handlers total. Tightly coupled with Java. - /// - public const int HandlersNumber = 18; - - /// - /// The name for EvaluatorRequestorHandler. Tightly coupled with Java. - /// - public const string EvaluatorRequestorHandler = "EvaluatorRequestor"; - - /// - /// The name for AllocatedEvaluatorHandler. Tightly coupled with Java. - /// - public const string AllocatedEvaluatorHandler = "AllocatedEvaluator"; - - /// - /// The name for CompletedEvaluatorHandler. Tightly coupled with Java. - /// - public const string CompletedEvaluatorHandler = "CompletedEvaluator"; - - /// - /// The name for ActiveContextHandler. Tightly coupled with Java. - /// - public const string ActiveContextHandler = "ActiveContext"; - - /// - /// The name for ClosedContextHandler. Tightly coupled with Java. - /// - public const string ClosedContextHandler = "ClosedContext"; - - /// - /// The name for FailedContextHandler. Tightly coupled with Java. - /// - public const string FailedContextHandler = "FailedContext"; - - /// - /// The name for ContextMessageHandler. Tightly coupled with Java. - /// - public const string ContextMessageHandler = "ContextMessage"; - - /// - /// The name for TaskMessageHandler. Tightly coupled with Java. - /// - public const string TaskMessageHandler = "TaskMessage"; - - /// - /// The name for FailedTaskHandler. Tightly coupled with Java. - /// - public const string FailedTaskHandler = "FailedTask"; - - /// - /// The name for RunningTaskHandler. Tightly coupled with Java. - /// - public const string RunningTaskHandler = "RunningTask"; - - /// - /// The name for FailedEvaluatorHandler. Tightly coupled with Java. - /// - public const string FailedEvaluatorHandler = "FailedEvaluator"; - - /// - /// The name for CompletedTaskHandler. Tightly coupled with Java. - /// - public const string CompletedTaskHandler = "CompletedTask"; - - /// - /// The name for SuspendedTaskHandler. Tightly coupled with Java. - /// - public const string SuspendedTaskHandler = "SuspendedTask"; - - /// - /// The name for HttpServerHandler. Tightly coupled with Java. - /// - public const string HttpServerHandler = "HttpServerHandler"; - - /// - /// The name for DriverRestartActiveContextHandler. Tightly coupled with Java. - /// - public const string DriverRestartActiveContextHandler = "DriverRestartActiveContext"; - - /// - /// The name for DriverRestartRunningTaskHandler. Tightly coupled with Java. - /// - public const string DriverRestartRunningTaskHandler = "DriverRestartRunningTask"; - - /// - /// The name for DriverRestartCompletedHandler. Tightly coupled with Java. - /// - public const string DriverRestartCompletedHandler = "DriverRestartCompleted"; - - /// - /// The name for DriverRestartFailedEvaluatorHandler. Tightly coupled with Java - /// - public const string DriverRestartFailedEvaluatorHandler = "DriverRestartFailedEvaluator"; - [Obsolete(message:"Use REEFFileNames instead.")] public const string DriverBridgeConfiguration = Common.Constants.ClrBridgeRuntimeConfiguration; @@ -184,37 +88,5 @@ public class Constants /// Configuration for Java verbose logging. /// public const string JavaVerboseLoggingConfig = "-Djava.util.logging.config.class=org.apache.reef.util.logging.Config"; - - /// - /// A dictionary of handler constants to handler descriptors. - /// - public static Dictionary Handlers - { - get - { - return - new Dictionary() - { - { EvaluatorRequestorHandler, 0 }, - { AllocatedEvaluatorHandler, 1 }, - { ActiveContextHandler, 2 }, - { TaskMessageHandler, 3 }, - { FailedTaskHandler, 4 }, - { FailedEvaluatorHandler, 5 }, - { HttpServerHandler, 6 }, - { CompletedTaskHandler, 7 }, - { RunningTaskHandler, 8 }, - { SuspendedTaskHandler, 9 }, - { CompletedEvaluatorHandler, 10 }, - { ClosedContextHandler, 11 }, - { FailedContextHandler, 12 }, - { ContextMessageHandler, 13 }, - { DriverRestartActiveContextHandler, 14 }, - { DriverRestartRunningTaskHandler, 15 }, - { DriverRestartCompletedHandler, 16 }, - { DriverRestartFailedEvaluatorHandler, 17 } - }; - } - } } } diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index 9fb3613169..1316cb620a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -39,6 +39,7 @@ under the License. + diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java new file mode 100644 index 0000000000..ce04454e28 --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.javabridge; + +import org.apache.reef.annotations.audience.Private; + +/** + * A class that holds all handles to the .NET side. + * USED BY UNMANAGED CODE! PLEASE DO NOT CHANGE ANY FUNCTION SIGNATURES + * UNLESS YOU KNOW WHAT YOU ARE DOING! + */ +@Private +public final class BridgeHandlerManager { + private long allocatedEvaluatorHandler = 0; + private long activeContextHandler = 0; + private long taskMessageHandler = 0; + private long failedTaskHandler = 0; + private long failedEvaluatorHandler = 0; + private long httpServerEventHandler = 0; + private long completedTaskHandler = 0; + private long runningTaskHandler = 0; + private long suspendedTaskHandler = 0; + private long completedEvaluatorHandler = 0; + private long closedContextHandler = 0; + private long failedContextHandler = 0; + private long contextMessageHandler = 0; + private long driverRestartActiveContextHandler = 0; + private long driverRestartRunningTaskHandler = 0; + private long driverRestartCompletedHandler = 0; + private long driverRestartFailedEvaluatorHandler = 0; + + public BridgeHandlerManager() { + } + + public long getAllocatedEvaluatorHandler() { + return allocatedEvaluatorHandler; + } + + public void setAllocatedEvaluatorHandler(final long allocatedEvaluatorHandler) { + this.allocatedEvaluatorHandler = allocatedEvaluatorHandler; + } + + public long getActiveContextHandler() { + return activeContextHandler; + } + + public void setActiveContextHandler(final long activeContextHandler) { + this.activeContextHandler = activeContextHandler; + } + + public long getTaskMessageHandler() { + return taskMessageHandler; + } + + public void setTaskMessageHandler(final long taskMessageHandler) { + this.taskMessageHandler = taskMessageHandler; + } + + public long getFailedTaskHandler() { + return failedTaskHandler; + } + + public void setFailedTaskHandler(final long failedTaskHandler) { + this.failedTaskHandler = failedTaskHandler; + } + + public long getFailedEvaluatorHandler() { + return failedEvaluatorHandler; + } + + public void setFailedEvaluatorHandler(final long failedEvaluatorHandler) { + this.failedEvaluatorHandler = failedEvaluatorHandler; + } + + public long getHttpServerEventHandler() { + return httpServerEventHandler; + } + + public void setHttpServerEventHandler(final long httpServerEventHandler) { + this.httpServerEventHandler = httpServerEventHandler; + } + + public long getCompletedTaskHandler() { + return completedTaskHandler; + } + + public void setCompletedTaskHandler(final long completedTaskHandler) { + this.completedTaskHandler = completedTaskHandler; + } + + public long getRunningTaskHandler() { + return runningTaskHandler; + } + + public void setRunningTaskHandler(final long runningTaskHandler) { + this.runningTaskHandler = runningTaskHandler; + } + + public long getSuspendedTaskHandler() { + return suspendedTaskHandler; + } + + public void setSuspendedTaskHandler(final long suspendedTaskHandler) { + this.suspendedTaskHandler = suspendedTaskHandler; + } + + public long getCompletedEvaluatorHandler() { + return completedEvaluatorHandler; + } + + public void setCompletedEvaluatorHandler(final long completedEvaluatorHandler) { + this.completedEvaluatorHandler = completedEvaluatorHandler; + } + + public long getClosedContextHandler() { + return closedContextHandler; + } + + public void setClosedContextHandler(final long closedContextHandler) { + this.closedContextHandler = closedContextHandler; + } + + public long getFailedContextHandler() { + return failedContextHandler; + } + + public void setFailedContextHandler(final long failedContextHandler) { + this.failedContextHandler = failedContextHandler; + } + + public long getContextMessageHandler() { + return contextMessageHandler; + } + + public void setContextMessageHandler(final long contextMessageHandler) { + this.contextMessageHandler = contextMessageHandler; + } + + public long getDriverRestartActiveContextHandler() { + return driverRestartActiveContextHandler; + } + + public void setDriverRestartActiveContextHandler(final long driverRestartActiveContextHandler) { + this.driverRestartActiveContextHandler = driverRestartActiveContextHandler; + } + + public long getDriverRestartRunningTaskHandler() { + return driverRestartRunningTaskHandler; + } + + public void setDriverRestartRunningTaskHandler(final long driverRestartRunningTaskHandler) { + this.driverRestartRunningTaskHandler = driverRestartRunningTaskHandler; + } + + public long getDriverRestartCompletedHandler() { + return driverRestartCompletedHandler; + } + + public void setDriverRestartCompletedHandler(final long driverRestartCompletedHandler) { + this.driverRestartCompletedHandler = driverRestartCompletedHandler; + } + + public long getDriverRestartFailedEvaluatorHandler() { + return driverRestartFailedEvaluatorHandler; + } + + public void setDriverRestartFailedEvaluatorHandler(final long driverRestartFailedEvaluatorHandler) { + this.driverRestartFailedEvaluatorHandler = driverRestartFailedEvaluatorHandler; + } +} diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java index 59a8d85b2e..28a7faf871 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java @@ -20,63 +20,21 @@ import org.apache.reef.javabridge.generic.DriverRestartCompletedBridge; -import java.util.HashMap; - /** * Java interfaces of CLR/Java bridge. * Implementations of the methods can be found at lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp. */ public final class NativeInterop { - public static final String CLASS_HIERARCHY_FILENAME = "clrClassHierarchy.bin"; public static final String GLOBAL_LIBRARIES_FILENAME = "userSuppliedGlobalLibraries.txt"; - public static final String ALLOCATED_EVALUATOR_KEY = "AllocatedEvaluator"; - public static final String ACTIVE_CONTEXT_KEY = "ActiveContext"; - public static final String TASK_MESSAGE_KEY = "TaskMessage"; - public static final String FAILED_TASK_KEY = "FailedTask"; - public static final String FAILED_EVALUATOR_KEY = "FailedEvaluator"; - public static final String HTTP_SERVER_KEY = "HttpServerKey"; - public static final String COMPLETED_TASK_KEY = "CompletedTask"; - public static final String RUNNING_TASK_KEY = "RunningTask"; - public static final String SUSPENDED_TASK_KEY = "SuspendedTask"; - public static final String COMPLETED_EVALUATOR_KEY = "CompletedEvaluator"; - public static final String CLOSED_CONTEXT_KEY = "ClosedContext"; - public static final String FAILED_CONTEXT_KEY = "FailedContext"; - public static final String CONTEXT_MESSAGE_KEY = "ContextMessage"; - public static final String DRIVER_RESTART_ACTIVE_CONTEXT_KEY = "DriverRestartActiveContext"; - public static final String DRIVER_RESTART_RUNNING_TASK_KEY = "DriverRestartRunningTask"; - public static final String DRIVER_RESTART_COMPLETED_KEY = "DriverRestartCompleted"; - public static final String DRIVER_RESTART_FAILED_EVALUATOR_KEY = "DriverRestartFailedEvaluator"; - public static final HashMap HANDLERS = new HashMap() { - { - put(ALLOCATED_EVALUATOR_KEY, 1); - put(ACTIVE_CONTEXT_KEY, 2); - put(TASK_MESSAGE_KEY, 3); - put(FAILED_TASK_KEY, 4); - put(FAILED_EVALUATOR_KEY, 5); - put(HTTP_SERVER_KEY, 6); - put(COMPLETED_TASK_KEY, 7); - put(RUNNING_TASK_KEY, 8); - put(SUSPENDED_TASK_KEY, 9); - put(COMPLETED_EVALUATOR_KEY, 10); - put(CLOSED_CONTEXT_KEY, 11); - put(FAILED_CONTEXT_KEY, 12); - put(CONTEXT_MESSAGE_KEY, 13); - put(DRIVER_RESTART_ACTIVE_CONTEXT_KEY, 14); - put(DRIVER_RESTART_RUNNING_TASK_KEY, 15); - put(DRIVER_RESTART_COMPLETED_KEY, 16); - put(DRIVER_RESTART_FAILED_EVALUATOR_KEY, 17); - } - }; - - public static final int N_HANDLERS = 18; public static native void loadClrAssembly(final String filePath); public static native void clrBufferedLog(final int level, final String message); - public static native long[] callClrSystemOnStartHandler( + public static native void callClrSystemOnStartHandler( final String dateTime, final String httpServerPortNumber, + final BridgeHandlerManager bridgeHandlerManager, final EvaluatorRequestorBridge javaEvaluatorRequestorBridge); public static native void clrSystemAllocatedEvaluatorHandlerOnNext( @@ -153,8 +111,9 @@ public static native void clrSystemContextMessageHandlerOnNext( final ContextMessageBridge contextMessageBridge ); - public static native long[] callClrSystemOnRestartHandler( + public static native void callClrSystemOnRestartHandler( final String httpServerPortNumber, + final BridgeHandlerManager bridgeHandlerManager, final EvaluatorRequestorBridge javaEvaluatorRequestorBridge, final DriverRestartedBridge driverRestartedBridge ); diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java index 34968f9fb6..a151fd15ec 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/ClrHandlersInitializer.java @@ -21,6 +21,7 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.javabridge.BridgeHandlerManager; import org.apache.reef.javabridge.EvaluatorRequestorBridge; /** @@ -34,5 +35,5 @@ interface ClrHandlersInitializer { /** * Returns the set of CLR handles. */ - long[] getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge); + BridgeHandlerManager getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge); } diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java index d97a1cd490..1da5347fdb 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.restart.DriverRestarted; +import org.apache.reef.javabridge.BridgeHandlerManager; import org.apache.reef.javabridge.DriverRestartedBridge; import org.apache.reef.javabridge.EvaluatorRequestorBridge; import org.apache.reef.javabridge.NativeInterop; @@ -41,9 +42,12 @@ final class DriverRestartClrHandlersInitializer implements ClrHandlersInitialize } @Override - public long[] getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge) { - return NativeInterop.callClrSystemOnRestartHandler( - portNumber, - evaluatorRequestorBridge, new DriverRestartedBridge(driverRestarted)); + public BridgeHandlerManager getClrHandlers(final String portNumber, + final EvaluatorRequestorBridge evaluatorRequestorBridge) { + final BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager(); + NativeInterop.callClrSystemOnRestartHandler(portNumber, bridgeHandlerManager, evaluatorRequestorBridge, + new DriverRestartedBridge(driverRestarted)); + + return bridgeHandlerManager; } } diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java index 117bfbd324..765869c014 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverStartClrHandlersInitializer.java @@ -21,6 +21,7 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.javabridge.BridgeHandlerManager; import org.apache.reef.javabridge.EvaluatorRequestorBridge; import org.apache.reef.javabridge.NativeInterop; import org.apache.reef.wake.time.event.StartTime; @@ -40,7 +41,12 @@ final class DriverStartClrHandlersInitializer implements ClrHandlersInitializer } @Override - public long[] getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge) { - return NativeInterop.callClrSystemOnStartHandler(startTime.toString(), portNumber, evaluatorRequestorBridge); + public BridgeHandlerManager getClrHandlers(final String portNumber, + final EvaluatorRequestorBridge evaluatorRequestorBridge) { + BridgeHandlerManager bridgeHandlerManager = new BridgeHandlerManager(); + NativeInterop.callClrSystemOnStartHandler(startTime.toString(), portNumber, bridgeHandlerManager, + evaluatorRequestorBridge); + + return bridgeHandlerManager; } } diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index 36f576c9da..422cb620c2 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -120,24 +120,7 @@ public final class JobDriver { */ private final LoggingScopeFactory loggingScopeFactory; - private long allocatedEvaluatorHandler = 0; - private long activeContextHandler = 0; - private long taskMessageHandler = 0; - private long failedTaskHandler = 0; - private long failedEvaluatorHandler = 0; - private long httpServerEventHandler = 0; - private long completedTaskHandler = 0; - private long runningTaskHandler = 0; - private long suspendedTaskHandler = 0; - private long completedEvaluatorHandler = 0; - private long closedContextHandler = 0; - private long failedContextHandler = 0; - private long contextMessageHandler = 0; - private long driverRestartActiveContextHandler = 0; - private long driverRestartRunningTaskHandler = 0; - private long driverRestartCompletedHandler = 0; - private long driverRestartFailedEvaluatorHandler = 0; - private boolean clrBridgeSetup = false; + private BridgeHandlerManager handlerManager = null; private boolean isRestarted = false; // We are holding on to following on bridge side. // Need to add references here so that GC does not collect them. @@ -211,42 +194,13 @@ private void setupBridge(final ClrHandlersInitializer initializer) { this.evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory); - final long[] handlers = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge); - if (handlers != null) { - if (handlers.length != NativeInterop.N_HANDLERS) { - throw new RuntimeException( - String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers", - String.valueOf(handlers.length), - String.valueOf(NativeInterop.N_HANDLERS))); - } - this.allocatedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ALLOCATED_EVALUATOR_KEY)]; - this.activeContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.ACTIVE_CONTEXT_KEY)]; - this.taskMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.TASK_MESSAGE_KEY)]; - this.failedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_TASK_KEY)]; - this.failedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_EVALUATOR_KEY)]; - this.httpServerEventHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.HTTP_SERVER_KEY)]; - this.completedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_TASK_KEY)]; - this.runningTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.RUNNING_TASK_KEY)]; - this.suspendedTaskHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.SUSPENDED_TASK_KEY)]; - this.completedEvaluatorHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.COMPLETED_EVALUATOR_KEY)]; - this.closedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CLOSED_CONTEXT_KEY)]; - this.failedContextHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.FAILED_CONTEXT_KEY)]; - this.contextMessageHandler = handlers[NativeInterop.HANDLERS.get(NativeInterop.CONTEXT_MESSAGE_KEY)]; - this.driverRestartActiveContextHandler = - handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_ACTIVE_CONTEXT_KEY)]; - this.driverRestartRunningTaskHandler = - handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_RUNNING_TASK_KEY)]; - this.driverRestartCompletedHandler = - handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_COMPLETED_KEY)]; - this.driverRestartFailedEvaluatorHandler = - handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_FAILED_EVALUATOR_KEY)]; - } + JobDriver.this.handlerManager = initializer.getClrHandlers(portNumber, evaluatorRequestorBridge); try (final LoggingScope lp = this.loggingScopeFactory.getNewLoggingScope("setupBridge::clrSystemHttpServerHandlerOnNext")) { final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC"); - NativeInterop.clrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge, - this.interopLogger); + NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(), + httpServerEventBridge, this.interopLogger); final String specList = httpServerEventBridge.getUriSpecification(); LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList); if (specList != null) { @@ -258,7 +212,6 @@ private void setupBridge(final ClrHandlersInitializer initializer) { } } } - this.clrBridgeSetup = true; } LOG.log(Level.INFO, "CLR Bridge setup."); } @@ -277,14 +230,14 @@ private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProce eval.setProcess(process); LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}", new Object[]{eval.getId(), JobDriver.this.contexts.size()}); - if (JobDriver.this.allocatedEvaluatorHandler == 0) { + if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) { throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR."); } final AllocatedEvaluatorBridge allocatedEvaluatorBridge = this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo); allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge); - NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext(JobDriver.this.allocatedEvaluatorHandler, - allocatedEvaluatorBridge, this.interopLogger); + NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext( + JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger); } } @@ -302,9 +255,11 @@ private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isR JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); if (isRestartFailed) { - evaluatorFailedHandlerWaitForCLRBridgeSetup(driverRestartFailedEvaluatorHandler, eval, isRestartFailed); + evaluatorFailedHandlerWaitForCLRBridgeSetup( + JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed); } else { - evaluatorFailedHandlerWaitForCLRBridgeSetup(failedEvaluatorHandler, eval, isRestartFailed); + evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(), + eval, isRestartFailed); } } } @@ -314,7 +269,7 @@ private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle, final FailedEvaluator eval, final boolean isRestartFailed) { if (handle == 0) { - if (JobDriver.this.clrBridgeSetup) { + if (JobDriver.this.handlerManager != null) { final String message = "No CLR FailedEvaluator handler was set, exiting now"; LOG.log(Level.WARNING, message); JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); @@ -322,7 +277,7 @@ private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle, clock.scheduleAlarm(0, new EventHandler() { @Override public void onNext(final Alarm time) { - if (JobDriver.this.clrBridgeSetup) { + if (JobDriver.this.handlerManager != null) { handleFailedEvaluatorInCLR(eval, isRestartFailed); } else { LOG.log(Level.INFO, "Waiting for CLR bridge to be set up"); @@ -344,9 +299,12 @@ private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolea JobDriver.this.isRestarted, loggingScopeFactory); if (isRestartFailed) { NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext( - JobDriver.this.driverRestartFailedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger); + JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), + failedEvaluatorBridge, JobDriver.this.interopLogger); } else { - NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge, + NativeInterop.clrSystemFailedEvaluatorHandlerOnNext( + JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), + failedEvaluatorBridge, JobDriver.this.interopLogger); } @@ -365,12 +323,12 @@ private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final boolea private void submit(final ActiveContext context) { try { LOG.log(Level.INFO, "Send task to context: {0}", new Object[]{context}); - if (JobDriver.this.activeContextHandler == 0) { + if (JobDriver.this.handlerManager.getActiveContextHandler() == 0) { throw new RuntimeException("Active Context Handler not initialized by CLR."); } final ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context); - NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge, - JobDriver.this.interopLogger); + NativeInterop.clrSystemActiveContextHandlerOnNext(JobDriver.this.handlerManager.getActiveContextHandler(), + activeContextBridge, JobDriver.this.interopLogger); } catch (final Exception ex) { LOG.log(Level.SEVERE, "Fail to submit task to active context"); context.close(); @@ -427,13 +385,13 @@ public void onNext(final CompletedTask task) { } LOG.log(Level.INFO, "Return results to the client:\n{0}", result); JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result)); - if (JobDriver.this.completedTaskHandler == 0) { + if (JobDriver.this.handlerManager.getCompletedTaskHandler() == 0) { LOG.log(Level.INFO, "No CLR handler bound to handle completed task."); } else { LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler."); final CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory); - NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, - JobDriver.this.interopLogger); + NativeInterop.clrSystemCompletedTaskHandlerOnNext(JobDriver.this.handlerManager.getCompletedTaskHandler(), + completedTaskBridge, JobDriver.this.interopLogger); } } } @@ -488,12 +446,11 @@ public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpS final String requestString = httpSerializer.toString(avroHttpRequest); final byte[] requestBytes = requestString.getBytes(Charset.forName(AvroHttpSerializer.JSON_CHARSET)); - //final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest); try { final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes); - NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge, - JobDriver.this.interopLogger); + NativeInterop.clrSystemHttpServerHandlerOnNext(JobDriver.this.handlerManager.getHttpServerEventHandler(), + httpServerEventBridge, JobDriver.this.interopLogger); final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8"); response.getWriter().println(responseBody); LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody); @@ -512,14 +469,14 @@ public final class FailedTaskHandler implements EventHandler { @Override public void onNext(final FailedTask task) throws RuntimeException { LOG.log(Level.SEVERE, "FailedTask received, will be handle in CLR handler, if set."); - if (JobDriver.this.failedTaskHandler == 0) { + if (JobDriver.this.handlerManager.getFailedTaskHandler() == 0) { LOG.log(Level.SEVERE, "Failed Task Handler not initialized by CLR, fail for real."); throw new RuntimeException("Failed Task Handler not initialized by CLR."); } try { final FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory); - NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge, - JobDriver.this.interopLogger); + NativeInterop.clrSystemFailedTaskHandlerOnNext(JobDriver.this.handlerManager.getFailedTaskHandler(), + failedTaskBridge, JobDriver.this.interopLogger); } catch (final Exception ex) { LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler"); throw new RuntimeException(ex); @@ -534,14 +491,14 @@ public final class RunningTaskHandler implements EventHandler { @Override public void onNext(final RunningTask task) { try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) { - if (JobDriver.this.runningTaskHandler == 0) { + if (JobDriver.this.handlerManager.getRunningTaskHandler() == 0) { LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler."); } else { LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId()); try { final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory); - NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, - JobDriver.this.interopLogger); + NativeInterop.clrSystemRunningTaskHandlerOnNext(JobDriver.this.handlerManager.getRunningTaskHandler(), + runningTaskBridge, JobDriver.this.interopLogger); } catch (final Exception ex) { LOG.log(Level.WARNING, "Fail to invoke CLR running task handler"); throw new RuntimeException(ex); @@ -561,11 +518,11 @@ public void onNext(final RunningTask task) { clock.scheduleAlarm(0, new EventHandler() { @Override public void onNext(final Alarm time) { - if (JobDriver.this.clrBridgeSetup) { - if (JobDriver.this.driverRestartRunningTaskHandler != 0) { + if (JobDriver.this.handlerManager != null) { + if (JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler() != 0) { LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR."); NativeInterop.clrSystemDriverRestartRunningTaskHandlerOnNext( - JobDriver.this.driverRestartRunningTaskHandler, + JobDriver.this.handlerManager.getDriverRestartRunningTaskHandler(), new RunningTaskBridge(task, activeContextBridgeFactory)); } else { LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, " + @@ -594,11 +551,11 @@ public void onNext(final ActiveContext context) { clock.scheduleAlarm(0, new EventHandler() { @Override public void onNext(final Alarm time) { - if (JobDriver.this.clrBridgeSetup) { - if (JobDriver.this.driverRestartActiveContextHandler != 0) { + if (JobDriver.this.handlerManager != null) { + if (JobDriver.this.handlerManager.getDriverRestartActiveContextHandler() != 0) { LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR."); NativeInterop.clrSystemDriverRestartActiveContextHandlerOnNext( - JobDriver.this.driverRestartActiveContextHandler, + JobDriver.this.handlerManager.getDriverRestartActiveContextHandler(), activeContextBridgeFactory.getActiveContextBridge(context)); } else { LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, " + @@ -660,11 +617,12 @@ public void onNext(final DriverRestartCompleted driverRestartCompleted) { driverRestartCompleted.getCompletedTime()); try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted( driverRestartCompleted.getCompletedTime().getTimeStamp())) { - if (JobDriver.this.driverRestartCompletedHandler != 0) { + if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) { LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR."); NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext( - JobDriver.this.driverRestartCompletedHandler, new DriverRestartCompletedBridge(driverRestartCompleted)); + JobDriver.this.handlerManager.getDriverRestartCompletedHandler(), + new DriverRestartCompletedBridge(driverRestartCompleted)); } else { LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler."); } @@ -696,11 +654,11 @@ public void onNext(final TaskMessage taskMessage) { final String msg = new String(taskMessage.get(), StandardCharsets.UTF_8); LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg); //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) { - if (JobDriver.this.taskMessageHandler != 0) { + if (JobDriver.this.handlerManager.getTaskMessageHandler() != 0) { final TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage); // if CLR implements the task message handler, handle the bytes in CLR handler - NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.taskMessageHandler, taskMessage.get(), - taskMessageBridge, JobDriver.this.interopLogger); + NativeInterop.clrSystemTaskMessageHandlerOnNext(JobDriver.this.handlerManager.getTaskMessageHandler(), + taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger); } //} } @@ -715,11 +673,12 @@ public void onNext(final SuspendedTask task) { final String message = "Received notification that task [" + task.getId() + "] has been suspended."; LOG.log(Level.INFO, message); try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) { - if (JobDriver.this.suspendedTaskHandler != 0) { + if (JobDriver.this.handlerManager.getSuspendedTaskHandler() != 0) { final SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory); // if CLR implements the suspended task handler, handle it in CLR LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge."); - NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge); + NativeInterop.clrSystemSuspendedTaskHandlerOnNext(JobDriver.this.handlerManager.getSuspendedTaskHandler(), + suspendedTaskBridge); } JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message)); } @@ -734,11 +693,12 @@ public final class CompletedEvaluatorHandler implements EventHandler { public void onNext(final ClosedContext context) { LOG.log(Level.INFO, "Completed Context: {0}", context.getId()); try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) { - if (JobDriver.this.closedContextHandler != 0) { + if (JobDriver.this.handlerManager.getClosedContextHandler() != 0) { final ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory); // if CLR implements the closed context handler, handle it in CLR LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge."); - NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge); + NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(), + closedContextBridge); } synchronized (JobDriver.this) { JobDriver.this.contexts.remove(context.getId()); @@ -778,11 +739,12 @@ public final class FailedContextHandler implements EventHandler { public void onNext(final FailedContext context) { LOG.log(Level.SEVERE, "FailedContext", context); try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) { - if (JobDriver.this.failedContextHandler != 0) { + if (JobDriver.this.handlerManager.getFailedContextHandler() != 0) { final FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory); // if CLR implements the failed context handler, handle it in CLR LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge."); - NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge); + NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(), + failedContextBridge); } synchronized (JobDriver.this) { JobDriver.this.contexts.remove(context.getId()); @@ -804,11 +766,11 @@ public void onNext(final ContextMessage message) { LOG.log(Level.SEVERE, "Received ContextMessage:", message.get()); try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(new String(message.get(), StandardCharsets.UTF_8))) { - if (JobDriver.this.contextMessageHandler != 0) { + if (JobDriver.this.handlerManager.getContextMessageHandler() != 0) { final ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message); // if CLR implements the context message handler, handle it in CLR LOG.log(Level.INFO, "Handling the event of context message in CLR bridge."); - NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler, + NativeInterop.clrSystemContextMessageHandlerOnNext(JobDriver.this.handlerManager.getContextMessageHandler(), contextMessageBridge); } }