Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-1428] Validate Task Stop failure => FailedTask Event #1038

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -166,6 +166,7 @@ under the License.
<Compile Include="Runtime\Evaluator\Task\TaskStartImpl.cs" />
<Compile Include="Runtime\Evaluator\Task\TaskState.cs" />
<Compile Include="Runtime\Evaluator\Task\TaskStatus.cs" />
<Compile Include="Runtime\Evaluator\Task\TaskStopHandlerException.cs" />
<Compile Include="Runtime\Evaluator\Task\TaskStopImpl.cs" />
<Compile Include="Runtime\Evaluator\Utils\NamedparameterAlias.cs" />
<Compile Include="runtime\MachineStatus.cs" />
Expand Down
Expand Up @@ -350,7 +350,6 @@ private void StartTask(StartTaskProto startTaskProto)
/// <param name="e"></param>
private void HandleTaskException(TaskClientCodeException e)
{
LOGGER.Log(Level.Error, "TaskClientCodeException", e);
byte[] error;
try
{
Expand Down
Expand Up @@ -17,6 +17,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Tang.Annotations;
Expand All @@ -29,24 +30,18 @@ internal sealed class TaskLifeCycle
{
private readonly IReadOnlyCollection<IObserver<ITaskStop>> _taskStopHandlers;
private readonly IReadOnlyCollection<IObserver<ITaskStart>> _taskStartHandlers;
private readonly Optional<ITaskStart> _taskStart;
private readonly Optional<ITaskStop> _taskStop;
private readonly ITaskStart _taskStart;
private readonly ITaskStop _taskStop;

private int _startHasBeenInvoked = 0;
private int _stopHasBeenInvoked = 0;

[Inject]
private TaskLifeCycle(
[Parameter(typeof(TaskConfigurationOptions.StartHandlers))] ISet<IObserver<ITaskStart>> taskStartHandlers,
[Parameter(typeof(TaskConfigurationOptions.StopHandlers))] ISet<IObserver<ITaskStop>> taskStopHandlers,
ITaskStart taskStart,
ITaskStop taskStop)
: this(taskStartHandlers, taskStopHandlers, Optional<ITaskStart>.Of(taskStart), Optional<ITaskStop>.Of(taskStop))
{
}

private TaskLifeCycle(
IEnumerable<IObserver<ITaskStart>> taskStartHandlers,
IEnumerable<IObserver<ITaskStop>> taskStopHandlers,
Optional<ITaskStart> taskStart,
Optional<ITaskStop> taskStop)
{
_taskStartHandlers = new ReadOnlySet<IObserver<ITaskStart>>(taskStartHandlers);
_taskStopHandlers = new ReadOnlySet<IObserver<ITaskStop>>(taskStopHandlers);
Expand All @@ -56,27 +51,30 @@ internal sealed class TaskLifeCycle

public void Start()
{
if (!_taskStart.IsPresent())
if (Interlocked.Exchange(ref _startHasBeenInvoked, 1) == 0)
{
return;
}

foreach (var startHandler in _taskStartHandlers)
{
startHandler.OnNext(_taskStart.Value);
foreach (var startHandler in _taskStartHandlers)
{
startHandler.OnNext(_taskStart);
}
}
}

public void Stop()
{
if (!_taskStop.IsPresent())
try
{
return;
if (Interlocked.Exchange(ref _stopHasBeenInvoked, 1) == 0)
{
foreach (var stopHandler in _taskStopHandlers)
{
stopHandler.OnNext(_taskStop);
}
}
}

foreach (var stopHandler in _taskStopHandlers)
catch (Exception e)
{
stopHandler.OnNext(_taskStop.Value);
throw new TaskStopHandlerException("Encountered Exception on TaskStopHandler.", e);
Copy link
Contributor

@jwang98052 jwang98052 Jun 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why catch exception in Stop but not start? Any specific exception you would expect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start is addressed in another JIRA.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we serialize TaskStopHandlerException to evalautor or eventually replaced it with SystemException in ContextRuntime? This is actually the exception from client's handler, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the client's Exception. We serialize the client's Exception using InnerException, as seen in TaskRuntime.cs.

}
}
}
Expand Down
Expand Up @@ -110,6 +110,10 @@ public Thread StartTaskOnNewThread()
"Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
}
}
catch (TaskStopHandlerException e)
{
_currentStatus.SetException(e.InnerException);
}
catch (Exception e)
{
_currentStatus.SetException(e);
Expand Down Expand Up @@ -156,11 +160,11 @@ public bool HasEnded()

public void Close(byte[] message)
{
Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
Logger.Log(Level.Info, "Trying to close Task {0}", TaskId);

if (_currentStatus.IsNotRunning())
{
Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
Logger.Log(Level.Warning, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State);
return;
}
try
Expand All @@ -171,8 +175,7 @@ public void Close(byte[] message)
catch (Exception e)
{
Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger);
_currentStatus.SetException(TaskClientCodeException.Create(
TaskId, ContextId, "Error during Close().", e));
_currentStatus.SetException(e);
}
}

Expand Down
Expand Up @@ -100,16 +100,6 @@ public void SetException(Exception e)
{
try
{
if (HasEnded())
{
// Note that this is possible if the job is already DONE, but a
// Task Close is triggered prior to the DONE signal propagates to the
// Driver. If the Task Close handler is not implemented, the Handler will
// mark the Task with an Exception, although for all intents and purposes
// the Task is already done and should not be affected.
return;
}

if (!_lastException.IsPresent())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not a valid scenarios?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. In this PR I've moved SetException s to only be called prior to SetResult, so this should not be a valid scenario anymore. If TaskStopHandler fails, we would not have called SetResult yet and we should directly move here. If TaskCloseHandler fails, the Task would not have been completed yet, provided that they lock on the same object, so that will not be a valid scenario either.

{
_lastException = Optional<Exception>.Of(e);
Expand All @@ -130,6 +120,8 @@ public void SetResult(byte[] result)
lock (_heartBeatManager)
{
_result = Optional<byte[]>.OfNullable(result);
_taskLifeCycle.Stop();

switch (State)
{
case TaskState.SuspendRequested:
Expand All @@ -140,7 +132,6 @@ public void SetResult(byte[] result)
State = TaskState.Done;
break;
}
_taskLifeCycle.Stop();
Heartbeat();
}
}
Expand Down
@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;

namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
{
/// <summary>
/// An Exception that indicates that the TaskStopHandlers have triggered an Exception.
/// </summary>
internal sealed class TaskStopHandlerException : Exception
{
internal TaskStopHandlerException(string message, Exception inner) : base(message, inner)
{
}
}
}
Expand Up @@ -106,8 +106,8 @@ public void OnNext(IActiveContext value)
{
case ContextOneId:
var contextConfig =
Common.Context.ContextConfiguration.ConfigurationModule.Set(
Common.Context.ContextConfiguration.Identifier, ContextTwoId)
REEF.Common.Context.ContextConfiguration.ConfigurationModule.Set(
REEF.Common.Context.ContextConfiguration.Identifier, ContextTwoId)
.Build();
var stackingContextConfig =
TangFactory.GetTang()
Expand Down Expand Up @@ -193,9 +193,9 @@ public void OnNext(IActiveContext value)
{
case ContextOneId:
var contextConfig =
Common.Context.ContextConfiguration.ConfigurationModule
.Set(Common.Context.ContextConfiguration.Identifier, ContextTwoId)
.Set(Common.Context.ContextConfiguration.OnContextStart, GenericType<TestContextStackContextStartHandler>.Class)
REEF.Common.Context.ContextConfiguration.ConfigurationModule
.Set(REEF.Common.Context.ContextConfiguration.Identifier, ContextTwoId)
.Set(REEF.Common.Context.ContextConfiguration.OnContextStart, GenericType<TestContextStackContextStartHandler>.Class)
.Build();

var stackingContextConfig =
Expand Down Expand Up @@ -287,8 +287,8 @@ public void OnNext(IDriverStarted value)

public void OnNext(IAllocatedEvaluator value)
{
value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule
.Set(Common.Context.ContextConfiguration.Identifier, ContextOneId)
value.SubmitContext(REEF.Common.Context.ContextConfiguration.ConfigurationModule
.Set(REEF.Common.Context.ContextConfiguration.Identifier, ContextOneId)
.Build());
}

Expand Down
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;

namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers
{
/// <summary>
/// A helper test class that implements <see cref="IObserver{T}"/>, which throws an
/// Exception after executing an optional Action provided by the caller of the constructor.
/// </summary>
internal abstract class ExceptionThrowingHandler<T> : IObserver<T>
{
private readonly Exception _exceptionToThrow;
private readonly Action<T> _action;

protected ExceptionThrowingHandler(
Exception exceptionToThrow, Action<T> action = null)
{
_exceptionToThrow = exceptionToThrow;
_action = action;
}

public void OnNext(T value)
{
if (_action != null)
{
_action(value);
}

throw _exceptionToThrow;
}

public void OnError(Exception error)
{
throw new NotImplementedException();
}

public void OnCompleted()
{
throw new NotImplementedException();
}
}
}
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers
{
/// <summary>
/// A helper test class that implements <see cref="IObserver{T}"/>, which logs
/// a message provided by the caller of the constructor.
/// </summary>
public abstract class LoggingHandler<T> : IObserver<T>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

internal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test classes don't particularly matter because they are not a part of our API anyway.

{
private static readonly Logger Logger = Logger.GetLogger(typeof(LoggingHandler<>));

private readonly string _messageToLog;

protected LoggingHandler(string messageToLog)
{
_messageToLog = messageToLog;
}

public void OnNext(T value)
{
Logger.Log(Level.Info, _messageToLog);
}

public void OnError(Exception error)
{
throw new NotImplementedException();
}

public void OnCompleted()
{
throw new NotImplementedException();
}
}
}
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Tests.Functional.Common.Task
{
/// <summary>
/// A helper test class that implements <see cref="ITask"/>, which logs
/// a message provided by the caller of the constructor.
/// </summary>
public abstract class LoggingTask : ITask
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

internal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See other comment.

{
private static readonly Logger Logger = Logger.GetLogger(typeof(LoggingTask));

private readonly string _messageToLog;

protected LoggingTask(string messageToLog)
{
_messageToLog = messageToLog;
}

public void Dispose()
{
}

public byte[] Call(byte[] memento)
{
Logger.Log(Level.Info, _messageToLog);
return null;
}
}
}