From 1291f600112e0f41db19cd0f738f0a09c19b0e91 Mon Sep 17 00:00:00 2001 From: Andrew Chung Date: Fri, 30 Oct 2015 17:48:44 -0700 Subject: [PATCH] [REEF-881] Create a ProgressProvider on the C# side This addressed the issue by * Creating the ProgressProvider class in C# and piping it to Java. JIRA: [REEF-881](https://issues.apache.org/jira/browse/REEF-881) --- .../Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 23 +++++++++++ .../Bridge/BridgeHandlerManager.cs | 2 + .../Bridge/ClrSystemHandlerWrapper.cs | 10 +++++ .../Bridge/DriverBridge.cs | 15 +++++-- .../DriverBridgeConfigurationOptions.cs | 1 - .../Defaults/DefaultProgressProvider.cs | 39 +++++++++++++++++++ .../DriverConfiguration.cs | 6 +++ .../IProgressProvider.cs | 30 ++++++++++++++ .../Org.Apache.REEF.Driver.csproj | 2 + .../apache/reef/bridge/client/Constants.java | 1 + .../reef/javabridge/BridgeHandlerManager.java | 9 +++++ .../apache/reef/javabridge/NativeInterop.java | 2 + .../reef/javabridge/generic/JobDriver.java | 14 +++++++ .../yarn/driver/YarnContainerManager.java | 9 +++-- .../webserver/HttpServerReefEventHandler.java | 7 ++-- 15 files changed, 160 insertions(+), 10 deletions(-) create mode 100644 lang/cs/Org.Apache.REEF.Driver/Defaults/DefaultProgressProvider.cs create mode 100644 lang/cs/Org.Apache.REEF.Driver/IProgressProvider.cs diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp index 06bf5a348d..7b205fbf86 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp @@ -93,6 +93,8 @@ void populateJavaBridgeHandlerManager(JNIEnv * env, jobject jbridgeHandlerManage env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartCompletedHandlerMid, bridgeHandlerManager->DriverRestartCompletedHandler); jmethodID jsetDriverRestartFailedEvaluatorHandlerMid = env->GetMethodID(cls, "setDriverRestartFailedEvaluatorHandler", "(J)V"); env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartFailedEvaluatorHandlerMid, bridgeHandlerManager->DriverRestartFailedEvaluatorHandler); + jmethodID jsetProgressProviderMid = env->GetMethodID(cls, "setProgressProvider", "(J)V"); + env->CallVoidMethod(jbridgeHandlerManager, jsetProgressProviderMid, bridgeHandlerManager->ProgressProvider); } // Loading Clr Assembly. Note that we do not use ManagerLogger in this method since the @@ -563,6 +565,24 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemDr } } +/* +* Class: org_apache_reef_javabridge_NativeInterop +* Method: clrSystemProgressProviderGetProgress +* Signature: (J)F +*/ +JNIEXPORT jfloat JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress +(JNIEnv * env, jclass cls, jlong handler) { + ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress"); + try { + return (jfloat)ClrSystemHandlerWrapper::Call_ProgressProvider_GetProgress(handler); + } + catch (System::Exception^ ex) { + String^ errorMessage = "Exception in Call_ProgressProvider_GetProgress"; + ManagedLog::LOGGER->LogError(errorMessage, ex); + return 0; + } +} + static JNINativeMethod methods[] = { { "loadClrAssembly", "(Ljava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_loadClrAssembly }, @@ -621,6 +641,9 @@ static JNINativeMethod methods[] = { { "clrSystemDriverRestartCompletedHandlerOnNext", "(JLorg/apache/reef/javabridge/generic/DriverRestartCompletedBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartCompletedHandlerOnNext }, + + { "clrSystemProgressProviderGetProgress", "(J)F", + (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress }, }; JNIEXPORT void JNICALL diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs index bf0b04c2d1..377ad94a07 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs @@ -57,5 +57,7 @@ public sealed class BridgeHandlerManager public ulong DriverRestartFailedEvaluatorHandler { get; internal set; } public ulong HttpServerHandler { get; internal set; } + + public ulong ProgressProvider { get; internal set; } } } diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs index 381aebae34..d67d8f48e6 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs @@ -230,6 +230,16 @@ public static void Call_ClrSystemDriverRestartFailedEvaluator_OnNext(ulong handl } } + public static float Call_ProgressProvider_GetProgress(ulong handle) + { + using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ProgressProvider_GetProgress")) + { + GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); + IProgressProvider obj = (IProgressProvider)gc.Target; + return obj.Progress; + } + } + public static BridgeHandlerManager Call_ClrSystemStartHandler_OnStart( DateTime startTime, string httpServerPort, diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs index d770d3ae53..35db431119 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs @@ -21,7 +21,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Globalization; -using System.Linq; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; @@ -29,10 +28,10 @@ using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Time.Event; using Org.Apache.REEF.Common.Evaluator.Parameters; using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Bridge.Events; +using Org.Apache.REEF.Driver.Defaults; namespace Org.Apache.REEF.Driver.Bridge { @@ -118,6 +117,8 @@ public class DriverBridge private readonly ISet _configurationProviders; + private readonly IProgressProvider _progressProvider; + [Inject] public DriverBridge( [Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverStartedHandlers))] ISet> driverStartHandlers, @@ -142,7 +143,8 @@ public DriverBridge( [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet traceListeners, [Parameter(Value = typeof(EvaluatorConfigurationProviders))] ISet configurationProviders, [Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel, - HttpServerHandler httpServerHandler) + HttpServerHandler httpServerHandler, + IProgressProvider progressProvider) { foreach (TraceListener listener in traceListeners) { @@ -182,6 +184,7 @@ public DriverBridge( _driverRestartFailedEvaluatorHandlers = driverRestartFailedEvaluatorHandlers; _httpServerHandler = httpServerHandler; _configurationProviders = configurationProviders; + _progressProvider = progressProvider; _allocatedEvaluatorSubscriber = new ClrSystemHandler(); _completedEvaluatorSubscriber = new ClrSystemHandler(); @@ -340,6 +343,12 @@ public BridgeHandlerManager Subscribe() _logger.Log(Level.Verbose, "subscribed to IHttpMessage handler :" + _httpServerHandler); bridgeHandlerManager.HttpServerHandler = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber); + // bind progress provider to provide application progress + // Bind null handler if user does not specify their own implementation of IProgressProvider. This is + // used to get around the overhead of Interop calls since the Java side checks for null handler here. + bridgeHandlerManager.ProgressProvider = _progressProvider is DefaultProgressProvider ? + ClrHandlerHelper.CreateNullHandler() : ClrHandlerHelper.CreateHandler(_progressProvider); + return bridgeHandlerManager; } diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs index d48a6ce5df..16940d2d2a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs @@ -27,7 +27,6 @@ using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Time.Event; [module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "allow name parameter class to be embedded")] diff --git a/lang/cs/Org.Apache.REEF.Driver/Defaults/DefaultProgressProvider.cs b/lang/cs/Org.Apache.REEF.Driver/Defaults/DefaultProgressProvider.cs new file mode 100644 index 0000000000..7b1941810b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Defaults/DefaultProgressProvider.cs @@ -0,0 +1,39 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Driver.Defaults +{ + internal sealed class DefaultProgressProvider : IProgressProvider + { + [Inject] + private DefaultProgressProvider() + { + } + + public float Progress + { + get + { + return 0f; + } + } + } +} diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs index 821517e93e..a055dc1034 100644 --- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs @@ -184,6 +184,11 @@ public sealed class DriverConfiguration : ConfigurationModuleBuilder /// public static readonly OptionalParameter DriverRestartEvaluatorRecoverySeconds = new OptionalParameter(); + /// + /// The progress provider that will be injected at the Driver. + /// + public static readonly OptionalImpl ProgressProvider = new OptionalImpl(); + public static ConfigurationModule ConfigurationModule { get @@ -233,6 +238,7 @@ public static ConfigurationModule ConfigurationModule MaxApplicationSubmissions) .BindNamedParameter(GenericType.Class, DriverRestartEvaluatorRecoverySeconds) + .BindImplementation(GenericType.Class, ProgressProvider) .Build(); } } diff --git a/lang/cs/Org.Apache.REEF.Driver/IProgressProvider.cs b/lang/cs/Org.Apache.REEF.Driver/IProgressProvider.cs new file mode 100644 index 0000000000..24087c8fb5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/IProgressProvider.cs @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Driver.Defaults; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Driver +{ + [DefaultImplementation(typeof(DefaultProgressProvider))] + public interface IProgressProvider + { + float Progress { get; } + } +} diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index 1316cb620a..0c0a47e35d 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -103,6 +103,7 @@ under the License. + @@ -144,6 +145,7 @@ under the License. + diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java index e5b397ccce..ac41f9fbf3 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/Constants.java @@ -51,6 +51,7 @@ public final class Constants { .set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class) .set(DriverConfiguration.ON_TASK_SUSPENDED, JobDriver.SuspendedTaskHandler.class) .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class) + .set(DriverConfiguration.PROGRESS_PROVIDER, JobDriver.ProgressProvider.class) .build(); /** diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java index ce04454e28..1684e2649d 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/BridgeHandlerManager.java @@ -44,6 +44,7 @@ public final class BridgeHandlerManager { private long driverRestartRunningTaskHandler = 0; private long driverRestartCompletedHandler = 0; private long driverRestartFailedEvaluatorHandler = 0; + private long progressProvider = 0; public BridgeHandlerManager() { } @@ -183,4 +184,12 @@ public long getDriverRestartFailedEvaluatorHandler() { public void setDriverRestartFailedEvaluatorHandler(final long driverRestartFailedEvaluatorHandler) { this.driverRestartFailedEvaluatorHandler = driverRestartFailedEvaluatorHandler; } + + public long getProgressProvider() { + return progressProvider; + } + + public void setProgressProvider(final long progressProvider) { + this.progressProvider = progressProvider; + } } diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java index 28a7faf871..32b10d04e8 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java @@ -139,6 +139,8 @@ public static native void clrSystemDriverRestartFailedEvaluatorHandlerOnNext( final InteropLogger interopLogger ); + public static native float clrSystemProgressProviderGetProgress(final long handle); + /** * Empty private constructor to prohibit instantiation of utility class. */ diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index 422cb620c2..03ce2022c1 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -776,4 +776,18 @@ public void onNext(final ContextMessage message) { } } } + + /** + * Gets the progress of the application from .NET side. + */ + public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider { + @Override + public float getProgress() { + if (JobDriver.this.handlerManager != null && JobDriver.this.handlerManager.getProgressProvider() != 0) { + return NativeInterop.clrSystemProgressProviderGetProgress(JobDriver.this.handlerManager.getProgressProvider()); + } + + return 0f; + } + } } diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index 51a8022bf7..5ff9313d50 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -41,6 +41,7 @@ import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; +import org.apache.reef.tang.InjectionFuture; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.util.Optional; import org.apache.reef.wake.remote.Encoder; @@ -82,7 +83,7 @@ final class YarnContainerManager private final String jobSubmissionDirectory; private final REEFFileNames reefFileNames; private final RackNameFormatter rackNameFormatter; - private final ProgressProvider progressProvider; + private final InjectionFuture progressProvider; @Inject YarnContainerManager( @@ -97,7 +98,7 @@ final class YarnContainerManager @Parameter(JobSubmissionDirectory.class) final String jobSubmissionDirectory, final TrackingURLProvider trackingURLProvider, final RackNameFormatter rackNameFormatter, - final ProgressProvider progressProvider) throws IOException { + final InjectionFuture progressProvider) throws IOException { this.reefEventHandlers = reefEventHandlers; this.driverStatusManager = driverStatusManager; @@ -162,7 +163,9 @@ public void onNodesUpdated(final List nodeReports) { @Override public float getProgress() { - return progressProvider.getProgress(); + float f = progressProvider.get().getProgress(); + LOG.log(Level.SEVERE, "Progress Provider returning " + f); + return Math.max(Math.min(1, f), 0); } @Override diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java index aa7dd42c08..7f68277efd 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java @@ -22,6 +22,7 @@ import org.apache.reef.driver.evaluator.EvaluatorDescriptor; import org.apache.reef.driver.parameters.ClientCloseHandlers; import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.tang.InjectionFuture; import org.apache.reef.tang.Tang; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.InjectionException; @@ -62,7 +63,7 @@ public final class HttpServerReefEventHandler implements HttpHandler { private final ReefEventStateManager reefStateManager; private final Set> clientCloseHandlers; private final LoggingScopeFactory loggingScopeFactory; - private final ProgressProvider progressProvider; + private final InjectionFuture progressProvider; /** * Log level string prefix in the log lines. @@ -81,7 +82,7 @@ public HttpServerReefEventHandler( @Parameter(LogLevelName.class) final String logLevel, final LoggingScopeFactory loggingScopeFactory, final REEFFileNames reefFileNames, - final ProgressProvider progressProvider) { + final InjectionFuture progressProvider) { this.reefStateManager = reefStateManager; this.clientCloseHandlers = clientCloseHandlers; this.loggingScopeFactory = loggingScopeFactory; @@ -194,7 +195,7 @@ public void onHttpRequest( } break; case "progress": - response.getWriter().println(progressProvider.getProgress()); + response.getWriter().println(progressProvider.get().getProgress()); break; default: response.getWriter().println(String.format("Unsupported query for entity: [%s].", target));