Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

Commit

Permalink
[REEF-881] Create a ProgressProvider on the C# side
Browse files Browse the repository at this point in the history
This addressed the issue by
  * Creating the ProgressProvider class in C# and piping it to Java.

JIRA:
  [REEF-881](https://issues.apache.org/jira/browse/REEF-881)

Pull Request:
  This closes #607
  • Loading branch information
afchung authored and dongjoon-hyun committed Nov 3, 2015
1 parent 7d1ccf1 commit 5b65c73
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 18 deletions.
39 changes: 31 additions & 8 deletions lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ void populateJavaBridgeHandlerManager(JNIEnv * env, jobject jbridgeHandlerManage
env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartCompletedHandlerMid, bridgeHandlerManager->DriverRestartCompletedHandler);
jmethodID jsetDriverRestartFailedEvaluatorHandlerMid = env->GetMethodID(cls, "setDriverRestartFailedEvaluatorHandler", "(J)V");
env->CallVoidMethod(jbridgeHandlerManager, jsetDriverRestartFailedEvaluatorHandlerMid, bridgeHandlerManager->DriverRestartFailedEvaluatorHandler);
jmethodID jsetProgressProviderMid = env->GetMethodID(cls, "setProgressProvider", "(J)V");
env->CallVoidMethod(jbridgeHandlerManager, jsetProgressProviderMid, bridgeHandlerManager->ProgressProvider);
}

// Loading Clr Assembly. Note that we do not use ManagerLogger in this method since the
Expand Down Expand Up @@ -528,10 +530,10 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemDr
}

/*
* Class: org_apache_reef_javabridge_NativeInterop
* Method: clrSystemDriverRestartCompletedHandlerOnNext
* Signature: (JLorg/apache/reef/javabridge/generic/DriverRestartCompletedBridge;)V
*/
* Class: org_apache_reef_javabridge_NativeInterop
* Method: clrSystemDriverRestartCompletedHandlerOnNext
* Signature: (JLorg/apache/reef/javabridge/generic/DriverRestartCompletedBridge;)V
*/
JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartCompletedHandlerOnNext
(JNIEnv * env, jclass cls , jlong handler, jobject jdriverRestartCompleted) {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartCompletedHandlerOnNext");
Expand All @@ -546,10 +548,10 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemDr
}

/*
* Class: org_apache_reef_javabridge_NativeInterop
* Method: clrSystemDriverRestartFailedEvaluatorHandlerOnNext
* Signature: (JLorg/apache/reef/javabridge/FailedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V
*/
* Class: org_apache_reef_javabridge_NativeInterop
* Method: clrSystemDriverRestartFailedEvaluatorHandlerOnNext
* Signature: (JLorg/apache/reef/javabridge/FailedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V
*/
JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartFailedEvaluatorHandlerOnNext
(JNIEnv * env, jclass cls, jlong handler, jobject jfailedEvaluator, jobject jlogger) {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartFailedEvaluatorHandlerOnNext");
Expand All @@ -563,6 +565,24 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemDr
}
}

/*
* Class: org_apache_reef_javabridge_NativeInterop
* Method: clrSystemProgressProviderGetProgress
* Signature: (J)F
*/
JNIEXPORT jfloat JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress
(JNIEnv * env, jclass cls, jlong handler) {
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress");
try {
return (jfloat)ClrSystemHandlerWrapper::Call_ProgressProvider_GetProgress(handler);
}
catch (System::Exception^ ex) {
String^ errorMessage = "Exception in Call_ProgressProvider_GetProgress";
ManagedLog::LOGGER->LogError(errorMessage, ex);
return 0;
}
}

static JNINativeMethod methods[] = {
{ "loadClrAssembly", "(Ljava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_loadClrAssembly },

Expand Down Expand Up @@ -621,6 +641,9 @@ static JNINativeMethod methods[] = {

{ "clrSystemDriverRestartCompletedHandlerOnNext", "(JLorg/apache/reef/javabridge/generic/DriverRestartCompletedBridge;)V",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartCompletedHandlerOnNext },

{ "clrSystemProgressProviderGetProgress", "(J)F",
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemProgressProviderGetProgress },
};

JNIEXPORT void JNICALL
Expand Down
2 changes: 2 additions & 0 deletions lang/cs/Org.Apache.REEF.Driver/Bridge/BridgeHandlerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,7 @@ public sealed class BridgeHandlerManager
public ulong DriverRestartFailedEvaluatorHandler { get; internal set; }

public ulong HttpServerHandler { get; internal set; }

public ulong ProgressProvider { get; internal set; }
}
}
10 changes: 10 additions & 0 deletions lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ public static void Call_ClrSystemDriverRestartFailedEvaluator_OnNext(ulong handl
}
}

public static float Call_ProgressProvider_GetProgress(ulong handle)
{
using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ProgressProvider_GetProgress"))
{
GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
IProgressProvider obj = (IProgressProvider)gc.Target;
return obj.Progress;
}
}

public static BridgeHandlerManager Call_ClrSystemStartHandler_OnStart(
DateTime startTime,
string httpServerPort,
Expand Down
15 changes: 12 additions & 3 deletions lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Time.Event;
using Org.Apache.REEF.Common.Evaluator.Parameters;
using Org.Apache.REEF.Driver.Bridge.Clr2java;
using Org.Apache.REEF.Driver.Bridge.Events;
using Org.Apache.REEF.Driver.Defaults;

namespace Org.Apache.REEF.Driver.Bridge
{
Expand Down Expand Up @@ -118,6 +117,8 @@ public class DriverBridge

private readonly ISet<IConfigurationProvider> _configurationProviders;

private readonly IProgressProvider _progressProvider;

[Inject]
public DriverBridge(
[Parameter(Value = typeof(DriverBridgeConfigurationOptions.DriverStartedHandlers))] ISet<IObserver<IDriverStarted>> driverStartHandlers,
Expand All @@ -142,7 +143,8 @@ public class DriverBridge
[Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] ISet<TraceListener> traceListeners,
[Parameter(Value = typeof(EvaluatorConfigurationProviders))] ISet<IConfigurationProvider> configurationProviders,
[Parameter(Value = typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel,
HttpServerHandler httpServerHandler)
HttpServerHandler httpServerHandler,
IProgressProvider progressProvider)
{
foreach (TraceListener listener in traceListeners)
{
Expand Down Expand Up @@ -182,6 +184,7 @@ public class DriverBridge
_driverRestartFailedEvaluatorHandlers = driverRestartFailedEvaluatorHandlers;
_httpServerHandler = httpServerHandler;
_configurationProviders = configurationProviders;
_progressProvider = progressProvider;

_allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>();
_completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>();
Expand Down Expand Up @@ -340,6 +343,12 @@ public BridgeHandlerManager Subscribe()
_logger.Log(Level.Verbose, "subscribed to IHttpMessage handler :" + _httpServerHandler);
bridgeHandlerManager.HttpServerHandler = ClrHandlerHelper.CreateHandler(_httpServerEventSubscriber);

// bind progress provider to provide application progress
// Bind null handler if user does not specify their own implementation of IProgressProvider. This is
// used to get around the overhead of Interop calls since the Java side checks for null handler here.
bridgeHandlerManager.ProgressProvider = _progressProvider is DefaultProgressProvider ?
ClrHandlerHelper.CreateNullHandler() : ClrHandlerHelper.CreateHandler(_progressProvider);

return bridgeHandlerManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Wake.Time.Event;

[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1402:FileMayOnlyContainASingleClass", Justification = "allow name parameter class to be embedded")]

Expand Down
45 changes: 45 additions & 0 deletions lang/cs/Org.Apache.REEF.Driver/Defaults/DefaultProgressProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Driver.Defaults
{
/// <summary>
/// The default implementation of <see cref="IProgressProvider"/>.
/// </summary>
internal sealed class DefaultProgressProvider : IProgressProvider
{
[Inject]
private DefaultProgressProvider()
{
}

/// <summary>
/// Always returns a progress of 0 to the Resource Manager.
/// </summary>
public float Progress
{
get
{
return 0f;
}
}
}
}
6 changes: 6 additions & 0 deletions lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ public sealed class DriverConfiguration : ConfigurationModuleBuilder
/// </summary>
public static readonly OptionalParameter<int> DriverRestartEvaluatorRecoverySeconds = new OptionalParameter<int>();

/// <summary>
/// The progress provider that will be injected at the Driver.
/// </summary>
public static readonly OptionalImpl<IProgressProvider> ProgressProvider = new OptionalImpl<IProgressProvider>();

public static ConfigurationModule ConfigurationModule
{
get
Expand Down Expand Up @@ -233,6 +238,7 @@ public static ConfigurationModule ConfigurationModule
MaxApplicationSubmissions)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds>.Class,
DriverRestartEvaluatorRecoverySeconds)
.BindImplementation(GenericType<IProgressProvider>.Class, ProgressProvider)
.Build();
}
}
Expand Down
38 changes: 38 additions & 0 deletions lang/cs/Org.Apache.REEF.Driver/IProgressProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

using Org.Apache.REEF.Driver.Defaults;
using Org.Apache.REEF.Tang.Annotations;

namespace Org.Apache.REEF.Driver
{
/// <summary>
/// Provides the progress of the job to the Resource Manager
/// </summary>
[DefaultImplementation(typeof(DefaultProgressProvider))]
public interface IProgressProvider
{
/// <summary>
/// Provides the progress of the job to the Resource Manager.
/// Value should be a float between 0 and 1. If value greater than 1,
/// 1 will be reported. If value less than 0, 0 will be reported.
/// </summary>
float Progress { get; }
}
}
2 changes: 2 additions & 0 deletions lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ under the License.
<Compile Include="Context\IClosedContext.cs" />
<Compile Include="Context\IContext.cs" />
<Compile Include="Context\IFailedContext.cs" />
<Compile Include="Defaults\DefaultProgressProvider.cs" />
<Compile Include="Defaults\DefaultClientCloseHandler.cs" />
<Compile Include="Defaults\DefaultClientCloseWithMessageHandler.cs" />
<Compile Include="Defaults\DefaultClientMessageHandler.cs" />
Expand Down Expand Up @@ -144,6 +145,7 @@ under the License.
<Compile Include="IDriverRestarted.cs" />
<Compile Include="IDriverStarted.cs" />
<Compile Include="IDriverRestartCompleted.cs" />
<Compile Include="IProgressProvider.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Task\ICompletedTask.cs" />
<Compile Include="Task\IFailedTask.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public final class Constants {
.set(DriverConfiguration.ON_DRIVER_STARTED, JobDriver.StartHandler.class)
.set(DriverConfiguration.ON_TASK_SUSPENDED, JobDriver.SuspendedTaskHandler.class)
.set(DriverConfiguration.ON_EVALUATOR_COMPLETED, JobDriver.CompletedEvaluatorHandler.class)
.set(DriverConfiguration.PROGRESS_PROVIDER, JobDriver.ProgressProvider.class)
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public final class BridgeHandlerManager {
private long driverRestartRunningTaskHandler = 0;
private long driverRestartCompletedHandler = 0;
private long driverRestartFailedEvaluatorHandler = 0;
private long progressProvider = 0;

public BridgeHandlerManager() {
}
Expand Down Expand Up @@ -183,4 +184,12 @@ public long getDriverRestartFailedEvaluatorHandler() {
public void setDriverRestartFailedEvaluatorHandler(final long driverRestartFailedEvaluatorHandler) {
this.driverRestartFailedEvaluatorHandler = driverRestartFailedEvaluatorHandler;
}

public long getProgressProvider() {
return progressProvider;
}

public void setProgressProvider(final long progressProvider) {
this.progressProvider = progressProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public static native void clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
final InteropLogger interopLogger
);

public static native float clrSystemProgressProviderGetProgress(final long handle);

/**
* Empty private constructor to prohibit instantiation of utility class.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,4 +776,18 @@ public void onNext(final ContextMessage message) {
}
}
}

/**
* Gets the progress of the application from .NET side.
*/
public final class ProgressProvider implements org.apache.reef.driver.ProgressProvider {
@Override
public float getProgress() {
if (JobDriver.this.handlerManager != null && JobDriver.this.handlerManager.getProgressProvider() != 0) {
return NativeInterop.clrSystemProgressProviderGetProgress(JobDriver.this.handlerManager.getProgressProvider());
}

return 0f;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.remote.Encoder;
Expand Down Expand Up @@ -82,7 +83,7 @@ final class YarnContainerManager
private final String jobSubmissionDirectory;
private final REEFFileNames reefFileNames;
private final RackNameFormatter rackNameFormatter;
private final ProgressProvider progressProvider;
private final InjectionFuture<ProgressProvider> progressProvider;

@Inject
YarnContainerManager(
Expand All @@ -97,7 +98,7 @@ final class YarnContainerManager
@Parameter(JobSubmissionDirectory.class) final String jobSubmissionDirectory,
final TrackingURLProvider trackingURLProvider,
final RackNameFormatter rackNameFormatter,
final ProgressProvider progressProvider) throws IOException {
final InjectionFuture<ProgressProvider> progressProvider) throws IOException {
this.reefEventHandlers = reefEventHandlers;
this.driverStatusManager = driverStatusManager;

Expand Down Expand Up @@ -162,7 +163,14 @@ public void onNodesUpdated(final List<NodeReport> nodeReports) {

@Override
public float getProgress() {
return progressProvider.getProgress();
try {
return Math.max(Math.min(1, progressProvider.get().getProgress()), 0);
} catch (final Exception e) {
// An Exception must be caught and logged here because YARN swallows the Exception and fails the job.
LOG.log(Level.WARNING, "An exception occurred in ProgressProvider.getProgress(), with message : " +
e.getMessage() + ". Returning 0 as progress.");
return 0f;
}
}

@Override
Expand Down

0 comments on commit 5b65c73

Please sign in to comment.