Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix RestartFlow issue; #5165 #5181

Merged
merged 10 commits into from Aug 16, 2021
13 changes: 13 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Expand Up @@ -1811,6 +1811,19 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor, int maxRestarts) { }
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, Akka.Streams.RestartSettings settings) { }
}
public class RestartWithBackoffFlow
{
public RestartWithBackoffFlow() { }
public class Delay : Akka.Streams.Attributes.IAttribute, System.IEquatable<Akka.Streams.Dsl.RestartWithBackoffFlow.Delay>
{
public readonly System.TimeSpan Duration;
public Delay(System.TimeSpan duration) { }
public bool Equals(Akka.Streams.Dsl.RestartWithBackoffFlow.Delay other) { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
public override string ToString() { }
}
}
public class static Retry
{
[Akka.Annotations.ApiMayChangeAttribute()]
Expand Down
119 changes: 92 additions & 27 deletions src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs
Expand Up @@ -11,6 +11,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Event;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Streams.TestKit.Tests;
Expand Down Expand Up @@ -132,11 +133,11 @@ public void A_restart_with_backoff_source_should_backoff_before_restart()
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);

probe.RequestNext("a");
probe.RequestNext("b");
Expand All @@ -161,11 +162,11 @@ public void A_restart_with_backoff_source_should_reset_exponential_backoff_back_
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" });
}, _restartSettings)
.RunWith(this.SinkProbe<string>(), Materializer);

probe.RequestNext("a");
probe.RequestNext("b");
Expand Down Expand Up @@ -316,11 +317,11 @@ public void A_restart_with_backoff_source_should_not_restart_the_source_when_max
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.Single("a");
}, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.Single("a");
}, _shortRestartSettings.WithMaxRestarts(1, _shortMinBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);

probe.RequestNext("a");
probe.RequestNext("a");
Expand All @@ -339,11 +340,11 @@ public void A_restart_with_backoff_source_should_reset_maxRestarts_when_source_r
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.Single("a");
}, _restartSettings.WithMaxRestarts(2, _minBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.Single("a");
}, _restartSettings.WithMaxRestarts(2, _minBackoff))
.RunWith(this.SinkProbe<string>(), Materializer);

probe.RequestNext("a");
// There should be minBackoff delay
Expand Down Expand Up @@ -371,11 +372,11 @@ public void A_restart_with_backoff_source_should_allow_using_withMaxRestarts_ins
{
var created = new AtomicCounter(0);
var probe = RestartSource.WithBackoff(() =>
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" }).TakeWhile(c => c != "b");
}, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1)))
.RunWith(this.SinkProbe<string>(), Materializer);
{
created.IncrementAndGet();
return Source.From(new List<string> { "a", "b" }).TakeWhile(c => c != "b");
}, _shortRestartSettings.WithMaxRestarts(2, TimeSpan.FromSeconds(1)))
.RunWith(this.SinkProbe<string>(), Materializer);

probe.RequestNext("a");
probe.RequestNext("a");
Expand Down Expand Up @@ -765,6 +766,70 @@ public void A_restart_with_backoff_flow_should_run_normally()
}, Materializer);
}

[Fact]
public void Simplified_restart_flow_restarts_stages_test()
{
var created = new AtomicCounter(0);
var restarts = 4;
this.AssertAllStagesStopped(() =>
{
var flow = RestartFlowFactory<int, int, NotUsed>(() =>
{
created.IncrementAndGet();
return Flow.Create<int>()
.Select(i =>
{
if (i == 6)
{
throw new ArgumentException($"BOOM");
}

return i;
});
}, true,
//defaults to unlimited restarts
RestartSettings.Create(TimeSpan.FromMilliseconds(10), TimeSpan.FromSeconds(30), 0));

var (source, sink) = this.SourceProbe<int>().Select(x =>
{
Log.Debug($"Processing: {x}");
return x;
})
.Via(flow)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);

source.SendNext(1);
source.SendNext(2);
source.SendNext(3);
source.SendNext(4);
source.SendNext(5);
for (int i = 0; i < restarts; i++)
{
source.SendNext(6);
}

source.SendNext(7);
source.SendNext(8);
source.SendNext(9);
source.SendNext(10);

sink.RequestNext(1);
sink.RequestNext(2);
sink.RequestNext(3);
sink.RequestNext(4);
sink.RequestNext(5);
//6 is never received since RestartFlow's do not retry
sink.RequestNext(7);
sink.RequestNext(8);
sink.RequestNext(9);
sink.RequestNext(10);

source.SendComplete();
}, Materializer);
created.Current.Should().Be(restarts + 1);
}

[Fact]
public void A_restart_with_backoff_flow_should_restart_on_cancellation()
{
Expand Down Expand Up @@ -1010,8 +1075,8 @@ public void A_restart_with_backoff_flow_should_restart_on_failure_when_using_onl
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");

sink.Request(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

created.Current.Should().Be(2);
}
}
}
}