diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index 38cfeacf549..7ee5df94f4c 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1811,6 +1811,19 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source WithBackoff(System.Func> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { } public static Akka.Streams.Dsl.Source WithBackoff(System.Func> sourceFactory, Akka.Streams.RestartSettings settings) { } } + public class RestartWithBackoffFlow + { + public RestartWithBackoffFlow() { } + public class Delay : Akka.Streams.Attributes.IAttribute, System.IEquatable + { + public readonly System.TimeSpan Duration; + public Delay(System.TimeSpan duration) { } + public bool Equals(Akka.Streams.Dsl.RestartWithBackoffFlow.Delay other) { } + public override bool Equals(object obj) { } + public override int GetHashCode() { } + public override string ToString() { } + } + } public class static Retry { [Akka.Annotations.ApiMayChangeAttribute()] diff --git a/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs b/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs index 3bf123c84f8..e5cab69987a 100644 --- a/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Akka.Event; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.Streams.TestKit.Tests; @@ -132,11 +133,11 @@ public void A_restart_with_backoff_source_should_backoff_before_restart() { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => - { - created.IncrementAndGet(); - return Source.From(new List { "a", "b" }); - }, _restartSettings) - .RunWith(this.SinkProbe(), Materializer); + { + created.IncrementAndGet(); + return Source.From(new List { "a", "b" }); + }, _restartSettings) + .RunWith(this.SinkProbe(), Materializer); probe.RequestNext("a"); probe.RequestNext("b"); @@ -161,11 +162,11 @@ public void A_restart_with_backoff_source_should_reset_exponential_backoff_back_ { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => - { - created.IncrementAndGet(); - return Source.From(new List { "a", "b" }); - }, _restartSettings) - .RunWith(this.SinkProbe(), Materializer); + { + created.IncrementAndGet(); + return Source.From(new List { "a", "b" }); + }, _restartSettings) + .RunWith(this.SinkProbe(), Materializer); probe.RequestNext("a"); probe.RequestNext("b"); @@ -316,11 +317,11 @@ public void A_restart_with_backoff_source_should_not_restart_the_source_when_max { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => - { - created.IncrementAndGet(); - return Source.Single("a"); - }, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff)) - .RunWith(this.SinkProbe(), Materializer); + { + created.IncrementAndGet(); + return Source.Single("a"); + }, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff)) + .RunWith(this.SinkProbe(), Materializer); probe.RequestNext("a"); probe.RequestNext("a"); @@ -339,11 +340,11 @@ public void A_restart_with_backoff_source_should_reset_maxRestarts_when_source_r { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => - { - created.IncrementAndGet(); - return Source.Single("a"); - }, _restartSettings.WithMaxRestarts(2, _minBackoff)) - .RunWith(this.SinkProbe(), Materializer); + { + created.IncrementAndGet(); + return Source.Single("a"); + }, _restartSettings.WithMaxRestarts(2, _minBackoff)) + .RunWith(this.SinkProbe(), Materializer); probe.RequestNext("a"); // There should be minBackoff delay @@ -371,11 +372,11 @@ public void A_restart_with_backoff_source_should_allow_using_withMaxRestarts_ins { var created = new AtomicCounter(0); var probe = RestartSource.WithBackoff(() => - { - created.IncrementAndGet(); - return Source.From(new List { "a", "b" }).TakeWhile(c => c != "b"); - }, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1))) - .RunWith(this.SinkProbe(), Materializer); + { + created.IncrementAndGet(); + return Source.From(new List { "a", "b" }).TakeWhile(c => c != "b"); + }, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1))) + .RunWith(this.SinkProbe(), Materializer); probe.RequestNext("a"); probe.RequestNext("a"); @@ -765,6 +766,70 @@ public void A_restart_with_backoff_flow_should_run_normally() }, Materializer); } + [Fact] + public void Simplified_restart_flow_restarts_stages_test() + { + var created = new AtomicCounter(0); + var restarts = 4; + this.AssertAllStagesStopped(() => + { + var flow = RestartFlowFactory(() => + { + created.IncrementAndGet(); + return Flow.Create() + .Select(i => + { + if (i == 6) + { + throw new ArgumentException($"BOOM"); + } + + return i; + }); + }, true, + //defaults to unlimited restarts + RestartSettings.Create(TimeSpan.FromMilliseconds(10), TimeSpan.FromSeconds(30), 0)); + + var (source, sink) = this.SourceProbe().Select(x => + { + Log.Debug($"Processing: {x}"); + return x; + }) + .Via(flow) + .ToMaterialized(this.SinkProbe(), Keep.Both) + .Run(Materializer); + + source.SendNext(1); + source.SendNext(2); + source.SendNext(3); + source.SendNext(4); + source.SendNext(5); + for (int i = 0; i < restarts; i++) + { + source.SendNext(6); + } + + source.SendNext(7); + source.SendNext(8); + source.SendNext(9); + source.SendNext(10); + + sink.RequestNext(1); + sink.RequestNext(2); + sink.RequestNext(3); + sink.RequestNext(4); + sink.RequestNext(5); + //6 is never received since RestartFlow's do not retry + sink.RequestNext(7); + sink.RequestNext(8); + sink.RequestNext(9); + sink.RequestNext(10); + + source.SendComplete(); + }, Materializer); + created.Current.Should().Be(restarts + 1); + } + [Fact] public void A_restart_with_backoff_flow_should_restart_on_cancellation() { @@ -1010,8 +1075,8 @@ public void A_restart_with_backoff_flow_should_restart_on_failure_when_using_onl flowInProbe.RequestNext("c"); flowOutProbe.SendNext("d"); sink.RequestNext("d"); - + sink.Request(1); created.Current.Should().Be(2); } } -} +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/Restart.cs b/src/core/Akka.Streams/Dsl/RestartFlow.cs similarity index 51% rename from src/core/Akka.Streams/Dsl/Restart.cs rename to src/core/Akka.Streams/Dsl/RestartFlow.cs index d15aeeea535..e459cce657b 100644 --- a/src/core/Akka.Streams/Dsl/Restart.cs +++ b/src/core/Akka.Streams/Dsl/RestartFlow.cs @@ -7,306 +7,12 @@ using System; using Akka.Pattern; +using Akka.Streams.Implementation.Fusing; using Akka.Streams.Stage; namespace Akka.Streams.Dsl { - /// - /// A RestartSource wraps a that gets restarted when it completes or fails. - /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for - /// example, for streams that depend on a remote server that may crash or become partitioned. The - /// RestartSource ensures that the graph can continue running while the restarts. - /// - public static class RestartSource - { - /// - /// Wrap the given with a that will restart it when it fails or complete using an exponential - /// backoff. - /// This will never emit a complete or failure, since the completion or failure of the wrapped - /// is always handled by restarting it. The wrapped can however be cancelled by cancelling this . - /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. - /// This can be triggered simply by the downstream cancelling, or externally by introducing a right - /// after this in the graph. - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// Minimum (initial) duration until the child actor will started again, if it is terminated - /// The exponential back-off is capped to this duration - /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. - [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] - public static Source WithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) - { - var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor); - return WithBackoff(sourceFactory, settings); - } - - /// - /// Wrap the given with a that will restart it when it fails or complete using an exponential - /// backoff. - /// This will never emit a complete or failure, since the completion or failure of the wrapped - /// is always handled by restarting it. The wrapped can however be cancelled by cancelling this . - /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. - /// This can be triggered simply by the downstream cancelling, or externally by introducing a right - /// after this in the graph. - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// Minimum (initial) duration until the child actor will started again, if it is terminated - /// The exponential back-off is capped to this duration - /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. - /// The amount of restarts is capped to this amount within a time frame of minBackoff. Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] - public static Source WithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor, int maxRestarts) - { - var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor).WithMaxRestarts(maxRestarts, minBackoff); - return WithBackoff(sourceFactory, settings); - } - - /// - /// Wrap the given with a that will restart it when it fails or complete using an exponential - /// backoff. - /// - /// This will never emit a complete or failure, since the completion or failure of the wrapped - /// is always handled by restarting it. The wrapped can however be cancelled by cancelling this . - /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. - /// This can be triggered simply by the downstream cancelling, or externally by introducing a right - /// after this in the graph. - /// - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// defining restart configuration - public static Source WithBackoff(Func> sourceFactory, RestartSettings settings) - => Source.FromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures: false)); - - /// - /// Wrap the given with a that will restart it when it fails using an exponential backoff. - /// This will never emit a failure, since the failure of the wrapped is always handled by - /// restarting. The wrapped can be cancelled by cancelling this . - /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. - /// This can be triggered simply by the downstream cancelling, or externally by introducing a right - /// after this in the graph. - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// Minimum (initial) duration until the child actor will started again, if it is terminated - /// The exponential back-off is capped to this duration - /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. - [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] - public static Source OnFailuresWithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) - { - var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor); - return OnFailuresWithBackoff(sourceFactory, settings); - } - - /// - /// Wrap the given with a that will restart it when it fails using an exponential backoff. - /// This will never emit a failure, since the failure of the wrapped is always handled by - /// restarting. The wrapped can be cancelled by cancelling this . - /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. - /// This can be triggered simply by the downstream cancelling, or externally by introducing a right - /// after this in the graph. - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// Minimum (initial) duration until the child actor will started again, if it is terminated - /// The exponential back-off is capped to this duration - /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. - /// The amount of restarts is capped to this amount within a time frame of minBackoff. Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] - public static Source OnFailuresWithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor, int maxRestarts) - { - var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor).WithMaxRestarts(maxRestarts, minBackoff); - return OnFailuresWithBackoff(sourceFactory, settings); - } - - /// - /// Wrap the given with a that will restart it when it fails using an exponential backoff. - /// - /// This will never emit a failure, since the failure of the wrapped is always handled by - /// restarting. The wrapped can be cancelled by cancelling this . - /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. - /// This can be triggered simply by the downstream cancelling, or externally by introducing a right - /// after this in the graph. - /// - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// defining restart configuration - public static Source OnFailuresWithBackoff(Func> sourceFactory, RestartSettings settings) - => Source.FromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures: true)); - } - - internal sealed class RestartWithBackoffSource : GraphStage> - { - public Func> SourceFactory { get; } - public RestartSettings Settings { get; } - public bool OnlyOnFailures { get; } - - public RestartWithBackoffSource(Func> sourceFactory, RestartSettings settings, bool onlyOnFailures) - { - SourceFactory = sourceFactory; - Settings = settings; - OnlyOnFailures = onlyOnFailures; - Shape = new SourceShape(Out); - } - - public Outlet Out { get; } = new Outlet("RestartWithBackoffSource.out"); - - public override SourceShape Shape { get; } - - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, "Source"); - - private sealed class Logic : RestartWithBackoffLogic, T, T> - { - private readonly RestartWithBackoffSource _stage; - - public Logic(RestartWithBackoffSource stage, string name) - : base(name, stage.Shape, null, stage.Out, stage.Settings, stage.OnlyOnFailures) - { - _stage = stage; - Backoff(); - } - - protected override void StartGraph() - { - var sinkIn = CreateSubInlet(_stage.Out); - _stage.SourceFactory().RunWith(sinkIn.Sink, SubFusingMaterializer); - if (IsAvailable(_stage.Out)) - sinkIn.Pull(); - } - - protected override void Backoff() => SetHandler(_stage.Out, () => - { - // do nothing - }); - } - } - - /// - /// A RestartSink wraps a that gets restarted when it completes or fails. - /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for - /// example, for streams that depend on a remote server that may crash or become partitioned. The - /// RestartSink ensures that the graph can continue running while the restarts. - /// - public static class RestartSink - { - /// - /// Wrap the given with a that will restart it when it fails or complete using an exponential - /// backoff. - /// - /// This will never cancel, since cancellation by the wrapped is always handled by restarting it. - /// The wrapped can however be completed by feeding a completion or error into this . When that - /// happens, the , if currently running, will terminate and will not be restarted. This can be triggered - /// simply by the upstream completing, or externally by introducing a right before this in the - /// graph. - /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - /// messages. When the wrapped does cancel, this will backpressure, however any elements already - /// sent may have been lost. - /// - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// Minimum (initial) duration until the child actor will started again, if it is terminated - /// The exponential back-off is capped to this duration - /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. - [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] - public static Sink WithBackoff(Func> sinkFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) - { - var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor); - return WithBackoff(sinkFactory, settings); - } - - /// - /// Wrap the given with a that will restart it when it fails or complete using an exponential - /// backoff. - /// - /// This will never cancel, since cancellation by the wrapped is always handled by restarting it. - /// The wrapped can however be completed by feeding a completion or error into this . When that - /// happens, the , if currently running, will terminate and will not be restarted. This can be triggered - /// simply by the upstream completing, or externally by introducing a right before this in the - /// graph. - /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - /// messages. When the wrapped does cancel, this will backpressure, however any elements already - /// sent may have been lost. - /// - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// Minimum (initial) duration until the child actor will started again, if it is terminated - /// The exponential back-off is capped to this duration - /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. - /// The amount of restarts is capped to this amount within a time frame of minBackoff. Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. - [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] - public static Sink WithBackoff(Func> sinkFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor, int maxRestarts) - { - var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor).WithMaxRestarts(maxRestarts, minBackoff); - return WithBackoff(sinkFactory, settings); - } - - /// - /// Wrap the given with a that will restart it when it fails or complete using an exponential - /// backoff. - /// - /// This will never cancel, since cancellation by the wrapped is always handled by restarting it. - /// The wrapped can however be completed by feeding a completion or error into this . When that - /// happens, the , if currently running, will terminate and will not be restarted. This can be triggered - /// simply by the upstream completing, or externally by introducing a right before this in the - /// graph. - /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of - /// messages. When the wrapped does cancel, this will backpressure, however any elements already - /// sent may have been lost. - /// - /// This uses the same exponential backoff algorithm as . - /// - /// A factory for producing the to wrap. - /// defining restart configuration - public static Sink WithBackoff(Func> sinkFactory, RestartSettings settings) - => Sink.FromGraph(new RestartWithBackoffSink(sinkFactory, settings)); - } - - internal sealed class RestartWithBackoffSink : GraphStage> - { - public Func> SinkFactory { get; } - public RestartSettings Settings { get; } - - public RestartWithBackoffSink(Func> sinkFactory, RestartSettings settings) - { - SinkFactory = sinkFactory; - Settings = settings; - Shape = new SinkShape(In); - } - - public Inlet In { get; } = new Inlet("RestartWithBackoffSink.in"); - - public override SinkShape Shape { get; } - - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, "Sink"); - - private sealed class Logic : RestartWithBackoffLogic, T, T> - { - private readonly RestartWithBackoffSink _stage; - - public Logic(RestartWithBackoffSink stage, string name) - : base(name, stage.Shape, stage.In, null, stage.Settings, onlyOnFailures: false) - { - _stage = stage; - Backoff(); - } - - protected override void StartGraph() - { - var sourceOut = CreateSubOutlet(_stage.In); - Source.FromGraph(sourceOut.Source).RunWith(_stage.SinkFactory(), SubFusingMaterializer); - } - - protected override void Backoff() => SetHandler(_stage.In, () => - { - // do nothing - }); - } - } - + /// /// A RestartFlow wraps a that gets restarted when it completes or fails. /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for @@ -463,7 +169,7 @@ internal sealed class RestartWithBackoffFlow : GraphStage> FlowFactory { get; } public RestartSettings Settings { get; } public bool OnlyOnFailures { get; } - + public RestartWithBackoffFlow( Func> flowFactory, RestartSettings settings, @@ -481,16 +187,20 @@ internal sealed class RestartWithBackoffFlow : GraphStage Shape { get; } - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, "Flow"); + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, inheritedAttributes, "Flow"); private sealed class Logic : RestartWithBackoffLogic, TIn, TOut> { private readonly RestartWithBackoffFlow _stage; + private readonly Attributes _inheritedAttributes; private Tuple, SubSinkInlet> _activeOutIn; - - public Logic(RestartWithBackoffFlow stage, string name) + private TimeSpan _delay; + + public Logic(RestartWithBackoffFlow stage, Attributes inheritedAttributes, string name) : base(name, stage.Shape, stage.In, stage.Out, stage.Settings, stage.OnlyOnFailures) { + _inheritedAttributes = inheritedAttributes; + _delay = _inheritedAttributes.GetAttribute(new RestartWithBackoffFlow.Delay(TimeSpan.FromMilliseconds(50))).Duration; _stage = stage; Backoff(); } @@ -499,13 +209,21 @@ protected override void StartGraph() { var sourceOut = CreateSubOutlet(_stage.In); var sinkIn = CreateSubInlet(_stage.Out); - Source.FromGraph(sourceOut.Source).Via(_stage.FlowFactory()).RunWith(sinkIn.Sink, SubFusingMaterializer); + + var graph = Source.FromGraph(sourceOut.Source) + //temp fix becaues the proper fix would be to have a concept of cause of cancellation. See https://github.com/akka/akka/pull/23909 + //TODO register issue to track this + .Via(DelayCancellation(_delay)) + .Via(_stage.FlowFactory()) + .To(sinkIn.Sink); + SubFusingMaterializer.Materialize(graph, _inheritedAttributes); + if (IsAvailable(_stage.Out)) sinkIn.Pull(); _activeOutIn = Tuple.Create(sourceOut, sinkIn); } - + protected override void Backoff() { SetHandler(_stage.In, () => @@ -516,7 +234,7 @@ protected override void Backoff() { // do nothing }); - + // We need to ensure that the other end of the sub flow is also completed, so that we don't // receive any callbacks from it. if (_activeOutIn != null) @@ -531,6 +249,8 @@ protected override void Backoff() _activeOutIn = null; } } + + private Flow DelayCancellation(TimeSpan duration) => Flow.FromGraph(new DelayCancellationStage(duration, null)); } } @@ -542,7 +262,7 @@ internal abstract class RestartWithBackoffLogic : TimerGraphS private readonly string _name; private readonly RestartSettings _settings; private readonly bool _onlyOnFailures; - + protected Inlet In { get; } protected Outlet Out { get; } @@ -587,10 +307,12 @@ protected SubSinkInlet CreateSubInlet(Outlet outlet) Complete(Out); else { - Log.Debug("Restarting graph due to finished upstream"); ScheduleRestartTimer(); } }, + /* + * upstream in this context is the wrapped stage + */ onUpstreamFailure: ex => { if (_finishing || MaxRestartsReached()) @@ -633,7 +355,6 @@ protected SubSourceOutlet CreateSubOutlet(Inlet inlet) Cancel(In); else { - Log.Debug("Graph in finished"); ScheduleRestartTimer(); } } @@ -697,7 +418,95 @@ protected internal override void OnTimer(object timerKey) /// public override void PreStart() => StartGraph(); } + + public class RestartWithBackoffFlow { + /// + /// Temporary attribute that can override the time a [[RestartWithBackoffFlow]] waits + /// for a failure before cancelling. + /// See https://github.com/akka/akka/issues/24529 + /// Should be removed if/when cancellation can include a cause. + /// + public class Delay : Attributes.IAttribute, IEquatable + { + /// + /// Delay duration + /// + public readonly TimeSpan Duration; + public Delay(TimeSpan duration) + { + Duration = duration; + } + + /// + public bool Equals(Delay other) => !ReferenceEquals(other, null) && Equals(Duration, other.Duration); + + /// + public override bool Equals(object obj) => obj is Delay && Equals((Delay)obj); + + /// + public override int GetHashCode() => Duration.GetHashCode(); + + /// + public override string ToString() => $"Duration({Duration})"; + } + } + + /// + /// Returns a flow that is almost identical but delays propagation of cancellation from downstream to upstream. + /// Once the down stream is finished calls to onPush are ignored + /// + /// + internal class DelayCancellationStage : SimpleLinearGraphStage + { + private readonly TimeSpan _delay; + + public DelayCancellationStage(TimeSpan delay, string name = null) : base(name) + { + _delay = delay; + } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, inheritedAttributes); + + private sealed class Logic : TimerGraphStageLogic + { + private readonly DelayCancellationStage _stage; + + public Logic(DelayCancellationStage stage, Attributes inheritedAttributes) : base(stage.Shape) + { + _stage = stage; + + SetHandler(stage.Inlet, onPush: () => Push(stage.Outlet, Grab(stage.Inlet))); + + SetHandler(stage.Outlet, onPull: () => Pull(stage.Inlet), onDownstreamFinish: OnDownStreamFinished ); + } + + /// + /// We should really. port the Cause parameter functionality for the OnDownStreamFinished delegate + /// + private void OnDownStreamFinished() + { + //_cause = new Option(/*cause*/); + ScheduleOnce("CompleteState", _stage._delay); + SetHandler(_stage.Inlet, onPush:DoNothing); + } + + protected internal override void OnTimer(object timerKey) + { + Log.Debug($"Stage was cancelled after delay of {_stage._delay}"); + CompleteStage(); + + // this code will replace the CompleteStage() call once we port the Exception Cause parameter for the OnDownStreamFinished delegate + /*if(_cause != null) + FailStage(_cause.Value); //<-- is this the same as cancelStage ? + else + { + throw new IllegalStateException("Timer hitting without first getting a cancel cannot happen"); + }*/ + } + } + } + internal sealed class Deadline { public Deadline(TimeSpan time) => Time = time; diff --git a/src/core/Akka.Streams/Dsl/RestartSink.cs b/src/core/Akka.Streams/Dsl/RestartSink.cs new file mode 100644 index 00000000000..8d7b34dbb9d --- /dev/null +++ b/src/core/Akka.Streams/Dsl/RestartSink.cs @@ -0,0 +1,140 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Pattern; +using Akka.Streams.Stage; + +namespace Akka.Streams.Dsl +{ + /// + /// A RestartSink wraps a that gets restarted when it completes or fails. + /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for + /// example, for streams that depend on a remote server that may crash or become partitioned. The + /// RestartSink ensures that the graph can continue running while the restarts. + /// + public static class RestartSink + { + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// + /// This will never cancel, since cancellation by the wrapped is always handled by restarting it. + /// The wrapped can however be completed by feeding a completion or error into this . When that + /// happens, the , if currently running, will terminate and will not be restarted. This can be triggered + /// simply by the upstream completing, or externally by introducing a right before this in the + /// graph. + /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + /// messages. When the wrapped does cancel, this will backpressure, however any elements already + /// sent may have been lost. + /// + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. + [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] + public static Sink WithBackoff(Func> sinkFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) + { + var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor); + return WithBackoff(sinkFactory, settings); + } + + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// + /// This will never cancel, since cancellation by the wrapped is always handled by restarting it. + /// The wrapped can however be completed by feeding a completion or error into this . When that + /// happens, the , if currently running, will terminate and will not be restarted. This can be triggered + /// simply by the upstream completing, or externally by introducing a right before this in the + /// graph. + /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + /// messages. When the wrapped does cancel, this will backpressure, however any elements already + /// sent may have been lost. + /// + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. + /// The amount of restarts is capped to this amount within a time frame of minBackoff. Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] + public static Sink WithBackoff(Func> sinkFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor, int maxRestarts) + { + var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor).WithMaxRestarts(maxRestarts, minBackoff); + return WithBackoff(sinkFactory, settings); + } + + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// + /// This will never cancel, since cancellation by the wrapped is always handled by restarting it. + /// The wrapped can however be completed by feeding a completion or error into this . When that + /// happens, the , if currently running, will terminate and will not be restarted. This can be triggered + /// simply by the upstream completing, or externally by introducing a right before this in the + /// graph. + /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + /// messages. When the wrapped does cancel, this will backpressure, however any elements already + /// sent may have been lost. + /// + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// defining restart configuration + public static Sink WithBackoff(Func> sinkFactory, RestartSettings settings) + => Sink.FromGraph(new RestartWithBackoffSink(sinkFactory, settings)); + } + + internal sealed class RestartWithBackoffSink : GraphStage> + { + public Func> SinkFactory { get; } + public RestartSettings Settings { get; } + + public RestartWithBackoffSink(Func> sinkFactory, RestartSettings settings) + { + SinkFactory = sinkFactory; + Settings = settings; + Shape = new SinkShape(In); + } + + public Inlet In { get; } = new Inlet("RestartWithBackoffSink.in"); + + public override SinkShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, inheritedAttributes,"Sink"); + + private sealed class Logic : RestartWithBackoffLogic, T, T> + { + private readonly RestartWithBackoffSink _stage; + private readonly Attributes _inheritedAttributes; + + public Logic(RestartWithBackoffSink stage, Attributes inheritedAttributes, string name) + : base(name, stage.Shape, stage.In, null, stage.Settings, onlyOnFailures: false) + { + _stage = stage; + _inheritedAttributes = inheritedAttributes; + Backoff(); + } + + protected override void StartGraph() + { + var sourceOut = CreateSubOutlet(_stage.In); + SubFusingMaterializer.Materialize(Source.FromGraph(sourceOut.Source).To(_stage.SinkFactory()), _inheritedAttributes); + } + + protected override void Backoff() => SetHandler(_stage.In, () => + { + // do nothing + }); + } + } + +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/RestartSource.cs b/src/core/Akka.Streams/Dsl/RestartSource.cs new file mode 100644 index 00000000000..bca1bd32283 --- /dev/null +++ b/src/core/Akka.Streams/Dsl/RestartSource.cs @@ -0,0 +1,188 @@ +// //----------------------------------------------------------------------- +// // +// // Copyright (C) 2009-2021 Lightbend Inc. +// // Copyright (C) 2013-2021 .NET Foundation +// // +// //----------------------------------------------------------------------- + +using System; +using Akka.Pattern; +using Akka.Streams.Stage; + +namespace Akka.Streams.Dsl +{ + /// + /// A RestartSource wraps a that gets restarted when it completes or fails. + /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for + /// example, for streams that depend on a remote server that may crash or become partitioned. The + /// RestartSource ensures that the graph can continue running while the restarts. + /// + public static class RestartSource + { + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// This will never emit a complete or failure, since the completion or failure of the wrapped + /// is always handled by restarting it. The wrapped can however be cancelled by cancelling this . + /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. + /// This can be triggered simply by the downstream cancelling, or externally by introducing a right + /// after this in the graph. + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. + [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] + public static Source WithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) + { + var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor); + return WithBackoff(sourceFactory, settings); + } + + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// This will never emit a complete or failure, since the completion or failure of the wrapped + /// is always handled by restarting it. The wrapped can however be cancelled by cancelling this . + /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. + /// This can be triggered simply by the downstream cancelling, or externally by introducing a right + /// after this in the graph. + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. + /// The amount of restarts is capped to this amount within a time frame of minBackoff. Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] + public static Source WithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor, int maxRestarts) + { + var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor).WithMaxRestarts(maxRestarts, minBackoff); + return WithBackoff(sourceFactory, settings); + } + + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// + /// This will never emit a complete or failure, since the completion or failure of the wrapped + /// is always handled by restarting it. The wrapped can however be cancelled by cancelling this . + /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. + /// This can be triggered simply by the downstream cancelling, or externally by introducing a right + /// after this in the graph. + /// + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// defining restart configuration + public static Source WithBackoff(Func> sourceFactory, RestartSettings settings) + => Source.FromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures: false)); + + /// + /// Wrap the given with a that will restart it when it fails using an exponential backoff. + /// This will never emit a failure, since the failure of the wrapped is always handled by + /// restarting. The wrapped can be cancelled by cancelling this . + /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. + /// This can be triggered simply by the downstream cancelling, or externally by introducing a right + /// after this in the graph. + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. + [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] + public static Source OnFailuresWithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) + { + var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor); + return OnFailuresWithBackoff(sourceFactory, settings); + } + + /// + /// Wrap the given with a that will restart it when it fails using an exponential backoff. + /// This will never emit a failure, since the failure of the wrapped is always handled by + /// restarting. The wrapped can be cancelled by cancelling this . + /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. + /// This can be triggered simply by the downstream cancelling, or externally by introducing a right + /// after this in the graph. + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. + /// The amount of restarts is capped to this amount within a time frame of minBackoff. Passing `0` will cause no restarts and a negative number will not cap the amount of restarts. + [Obsolete("Use the overloaded method which accepts Akka.Stream.RestartSettings instead.")] + public static Source OnFailuresWithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor, int maxRestarts) + { + var settings = RestartSettings.Create(minBackoff, maxBackoff, randomFactor).WithMaxRestarts(maxRestarts, minBackoff); + return OnFailuresWithBackoff(sourceFactory, settings); + } + + /// + /// Wrap the given with a that will restart it when it fails using an exponential backoff. + /// + /// This will never emit a failure, since the failure of the wrapped is always handled by + /// restarting. The wrapped can be cancelled by cancelling this . + /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. + /// This can be triggered simply by the downstream cancelling, or externally by introducing a right + /// after this in the graph. + /// + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// defining restart configuration + public static Source OnFailuresWithBackoff(Func> sourceFactory, RestartSettings settings) + => Source.FromGraph(new RestartWithBackoffSource(sourceFactory, settings, onlyOnFailures: true)); + } + + internal sealed class RestartWithBackoffSource : GraphStage> + { + public Func> SourceFactory { get; } + public RestartSettings Settings { get; } + public bool OnlyOnFailures { get; } + + public RestartWithBackoffSource(Func> sourceFactory, RestartSettings settings, bool onlyOnFailures) + { + SourceFactory = sourceFactory; + Settings = settings; + OnlyOnFailures = onlyOnFailures; + Shape = new SourceShape(Out); + } + + public Outlet Out { get; } = new Outlet("RestartWithBackoffSource.out"); + + public override SourceShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, inheritedAttributes, "Source"); + + private sealed class Logic : RestartWithBackoffLogic, T, T> + { + private readonly RestartWithBackoffSource _stage; + private readonly Attributes _inheritedAttributes; + + public Logic(RestartWithBackoffSource stage, Attributes inheritedAttributes, string name) + : base(name, stage.Shape, null, stage.Out, stage.Settings, stage.OnlyOnFailures) + { + _stage = stage; + _inheritedAttributes = inheritedAttributes; + Backoff(); + } + + protected override void StartGraph() + { + var sinkIn = CreateSubInlet(_stage.Out); + SubFusingMaterializer.Materialize(_stage.SourceFactory().To(sinkIn.Sink), _inheritedAttributes); + if (IsAvailable(_stage.Out)) + sinkIn.Pull(); + } + + protected override void Backoff() => SetHandler(_stage.Out, () => + { + // do nothing + }); + } + } + +} \ No newline at end of file