Skip to content

Commit

Permalink
Merge pull request #3931 from Aaronontheweb/v1.3.15-backport
Browse files Browse the repository at this point in the history
V1.3.15 backport
  • Loading branch information
Aaronontheweb committed Sep 23, 2019
2 parents 14a26ef + 93d9139 commit 537f818
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 2,057 deletions.
1,876 changes: 6 additions & 1,870 deletions RELEASE_NOTES.md

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions src/common.props
@@ -1,50 +1,62 @@
<Project>
<PropertyGroup>
<Copyright>Copyright © 2013-2018 Akka.NET Team</Copyright>
<Copyright>Copyright © 2013-2019 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.3.14</VersionPrefix>
<VersionPrefix>1.4.0</VersionPrefix>
<PackageIconUrl>http://getakka.net/images/akkalogo.png</PackageIconUrl>
<PackageProjectUrl>https://github.com/akkadotnet/akka.net</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/akka.net/blob/master/LICENSE</PackageLicenseUrl>
<NoWarn>$(NoWarn);CS1591;xUnit1013</NoWarn>
</PropertyGroup>
<PropertyGroup>
<XunitVersion>2.3.1</XunitVersion>
<TestSdkVersion>15.7.2</TestSdkVersion>
<TestSdkVersion>15.9.0</TestSdkVersion>
<HyperionVersion>0.9.8</HyperionVersion>
<NewtonsoftJsonVersion>9.0.1</NewtonsoftJsonVersion>
<NBenchVersion>1.2.2</NBenchVersion>
<ProtobufVersion>3.9.1</ProtobufVersion>
<NetCoreTestVersion>netcoreapp2.1</NetCoreTestVersion>
<NetFrameworkTestVersion>net461</NetFrameworkTestVersion>
<NetStandardLibVersion>netstandard2.0</NetStandardLibVersion>
<NetFrameworkLibVersion>net452</NetFrameworkLibVersion>
<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
<FsCheckVersion>2.9.0</FsCheckVersion>
<AkkaPackageTags>akka;actors;actor model;Akka;concurrency</AkkaPackageTags>
</PropertyGroup>
<PropertyGroup>
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>
<PropertyGroup>
<PackageReleaseNotes>Maintenance Release for Akka.NET 1.3**
You know what? We're going to stop promising that _this_ is the last 1.3.x release, because even though we've said that twice... We now have _another_ 1.3.x release.
1.3.14 consists of non-breaking bugfixes and additions that have been contributed against the [Akka.NET v1.4.0 milestone](https://github.com/akkadotnet/akka.net/milestone/17) thus far. These include:
Akka.Cluster.Sharding: default "persistent" mode has been stabilized and errors that users have ran into during `ShardCoordinator` recovery, such as [Exception in PersistentShardCoordinator ReceiveRecover](https://github.com/akkadotnet/akka.net/issues/3414);
[Akka.Remote: no longer disassociates when serialization errors are thrown](https://github.com/akkadotnet/akka.net/pull/3782) in the remoting pipeline - the connection will now stay open;
[Akka.Cluster.Tools: mission-critical `ClusterClient` and `ClusterClientReceptionist` fixes](https://github.com/akkadotnet/akka.net/pull/3866);
[SourceLink debugging support for Akka.NET](https://github.com/akkadotnet/akka.net/pull/3848); and
[Akka.Persistence: Allow AtLeastOnceDelivery parameters to be set from deriving classes](https://github.com/akkadotnet/akka.net/pull/3810);
[Akka.Persistence.Sql: BatchingSqlJournal now preserves Sender in PersistCallback](https://github.com/akkadotnet/akka.net/pull/3779); and
[Akka: bugfix - coordinated shutdown timesout when exit-clr = on](https://github.com/akkadotnet/akka.net/issues/3815).
To [see the full set of changes for Akka.NET v1.3.14, click here](https://github.com/akkadotnet/akka.net/pull/3869).
<PackageReleaseNotes>Second pre-release candidate for Akka.NET 1.4**
Akka.NET v1.4.0 is still moving along and this release contains some new and important changes.
We've added a new package, the Akka.Persistence.TestKit - this is designed to allow users to test their `PersistentActor` implementations under real-world conditions such as database connection failures, serialization errors, and so forth. It works alongside the standard Akka.NET TestKit distributions and offers a simple, in-place API to do so.
Akka.Streams now supports [Stream Context propagation](https://github.com/akkadotnet/akka.net/pull/3741), designed to make it really easy to work with tools such as Kafka, Amazon SQS, and more - where you might want to have one piece of context (such as the partition offset in Kafka) and propagate it from the very front of an Akka.Stream all the way to the end, and then finally process it once the rest of the stream has completed processing. In the case of Kakfa, this might be updating the local consumer's partition offset only once we've been able to fully guarantee the processing of the message.
Fixed
To [follow our progress on the Akka.NET v1.4 milestone, click here](https://github.com/akkadotnet/akka.net/milestone/17).
We expect to release more beta versions in the future, and if you wish to [get access to nightly Akka.NET builds then click here](https://getakka.net/community/getting-access-to-nightly-builds.html).
| COMMITS | LOC+ | LOC- | AUTHOR |
| --- | --- | --- | --- |
| 22 | 2893 | 828 | Aaron Stannard |
| 3 | 1706 | 347 | zbynek001 |
| 97 | 6527 | 3729 | Aaron Stannard |
| 13 | 11671 | 1059 | Bartosz Sypytkowski |
| 4 | 1708 | 347 | zbynek001 |
| 2 | 7 | 7 | jdsartori |
| 2 | 4 | 6 | Onur Gumus |
| 2 | 37 | 114 | Ismael Hamed |
| 1 | 65 | 47 | Ondrej Pialek |
| 1 | 3020 | 2 | Valdis Zobēla |
| 1 | 3 | 3 | Abi |
| 1 | 3 | 1 | jg11jg |
| 1 | 18 | 16 | Peter Huang |
| 1 | 1 | 2 | Maciej Wódke |
| 1 | 1 | 1 | Wessel Kranenborg |
| 1 | 1 | 1 | Kaiwei Li |
| 1 | 1 | 1 | jdsartori |</PackageReleaseNotes>
| 1 | 1 | 1 | Greatsamps |
| 1 | 1 | 1 | Arjen Smits |
| 1 | 1 | 1 | Andre |</PackageReleaseNotes>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19351-01" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19367-01" PrivateAssets="All" />
</ItemGroup>
<PropertyGroup>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
Expand Down
155 changes: 155 additions & 0 deletions src/core/Akka.Remote.Tests.MultiNode/TransportFailSpec.cs
@@ -0,0 +1,155 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.Remote.TestKit;
using Akka.Util;

namespace Akka.Remote.Tests.MultiNode
{
public class TransportFailSpecConfig : MultiNodeConfig
{
public RoleName First { get; }
public RoleName Second { get; }

public TransportFailSpecConfig()
{
First = Role("first");
Second = Role("second");

CommonConfig = DebugConfig(true).WithFallback(ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.remote{
transport-failure-detector {
implementation-class = """+ typeof(TestFailureDetector).AssemblyQualifiedName + @"""
heartbeat-interval = 1 s
}
retry-gate-closed-for = 3 s
# Don't trigger watch Terminated
watch-failure-detector.acceptable-heartbeat-pause = 60 s
#use-passive-connections = off
}
"));
}

internal static AtomicBoolean FdAvailable = new AtomicBoolean(true);

/// <summary>
/// Failure detector implementation that will fail when <see cref="FdAvailable"/> is false.
/// </summary>
public class TestFailureDetector : FailureDetector
{
public TestFailureDetector(Config config, EventStream eventStream)
{

}

private volatile bool _active = false;

public override bool IsAvailable => _active ? FdAvailable.Value : true;

public override bool IsMonitoring => _active;

public override void HeartBeat()
{
_active = true;
}
}

public class Subject : ReceiveActor
{
public Subject()
{
ReceiveAny(_ => Sender.Tell(_));
}
}
}

public class TransportFailSpec : MultiNodeSpec
{
private readonly TransportFailSpecConfig _config;

public TransportFailSpec() : this(new TransportFailSpecConfig()) { }

private TransportFailSpec(TransportFailSpecConfig config) : base(config, typeof(TransportFailSpecConfig))
{
_config = config;
}

protected override int InitialParticipantsValueFactory => 2;

private IActorRef Identify(RoleName role, string actorName)
{
var p = CreateTestProbe();
Sys.ActorSelection(Node(role) / "user" / actorName).Tell(new Identify(actorName), p.Ref);
return p.ExpectMsg<ActorIdentity>(RemainingOrDefault).Subject;
}

[MultiNodeFact]
public void TransportFail_should_reconnect()
{
RunOn(() =>
{
EnterBarrier("actors-started");
var subject = Identify(_config.Second, "subject");
Watch(subject);
subject.Tell("hello");
ExpectMsg("hello");
}, _config.First);

RunOn(() =>
{
Sys.ActorOf(Props.Create(() => new TransportFailSpecConfig.Subject()), "subject");
EnterBarrier("actors-started");
}, _config.Second);

EnterBarrier("watch-established");

// trigger transport failure detector
TransportFailSpecConfig.FdAvailable.GetAndSet(false);

// wait for ungated (also later awaitAssert retry)
Task.Delay(RARP.For(Sys).Provider.RemoteSettings.RetryGateClosedFor).Wait();
TransportFailSpecConfig.FdAvailable.GetAndSet(true);

RunOn(() =>
{
EnterBarrier("actors-started2");
var quarantineProbe = CreateTestProbe();
Sys.EventStream.Subscribe(quarantineProbe.Ref, typeof(QuarantinedEvent));
IActorRef subject2 = null;
AwaitAssert(() =>
{
// TODO: harden
Within(TimeSpan.FromSeconds(3), () =>
{
AwaitCondition(() =>
{
subject2 = Identify(_config.Second, "subject2");
return subject2 != null;
}, RemainingOrDefault, TimeSpan.FromSeconds(1));
});
}, TimeSpan.FromSeconds(5));
Watch(subject2);
quarantineProbe.ExpectNoMsg(TimeSpan.FromSeconds(1));
subject2.Tell("hello2");
ExpectMsg("hello2");
EnterBarrier("watch-established2");
ExpectTerminated(subject2);
}, _config.First);

RunOn(() =>
{
var subject2 = Sys.ActorOf(Props.Create(() => new TransportFailSpecConfig.Subject()), "subject2");
EnterBarrier("actors-started2");
EnterBarrier("watch-established2");
subject2.Tell(PoisonPill.Instance);
}, _config.Second);

EnterBarrier("done");
}
}
}
26 changes: 13 additions & 13 deletions src/core/Akka.Remote.Tests/AckedDeliverySpec.cs
Expand Up @@ -164,7 +164,7 @@ public void SendBuffer_must_remove_messages_from_buffer_when_cumulative_ack_rece
var b8 = b7.Acknowledge(new Ack(new SeqNo(2)));
Assert.True(b8.NonAcked.SequenceEqual(new[] { msg3, msg4 }));

var b9 = b8.Acknowledge(new Ack(new SeqNo(5)));
var b9 = b8.Acknowledge(new Ack(new SeqNo(4)));
Assert.True(b9.NonAcked.Count == 0);
}

Expand Down Expand Up @@ -199,7 +199,7 @@ public void SendBuffer_must_keep_NACKed_messages_in_buffer_if_selective_nacks_ar
Assert.True(b6.NonAcked.Count == 0);
Assert.True(b6.Nacked.SequenceEqual(new[] { msg2, msg3 }));

var b7 = b6.Acknowledge(new Ack(new SeqNo(5)));
var b7 = b6.Acknowledge(new Ack(new SeqNo(4)));
Assert.True(b7.NonAcked.Count == 0);
Assert.True(b7.Nacked.Count == 0);
}
Expand All @@ -226,36 +226,36 @@ public void ReceiveBuffer_must_enqueue_message_in_buffer_if_needed_return_the_li
var msg4 = Msg(4);
var msg5 = Msg(5);

var d1 = b0.Receive(msg1).ExtractDeliverable;
var d1 = b0.Receive(msg1).ExtractDeliverable();
Assert.True(d1.Deliverables.Count == 0);
Assert.Equal(new SeqNo(1), d1.Ack.CumulativeAck);
Assert.True(d1.Ack.Nacks.SequenceEqual(new[]{ new SeqNo(0) }));
var b1 = d1.Buffer;

var d2 = b1.Receive(msg0).ExtractDeliverable;
var d2 = b1.Receive(msg0).ExtractDeliverable();
Assert.True(d2.Deliverables.SequenceEqual(new[] { msg0, msg1 }));
Assert.Equal(new SeqNo(1), d2.Ack.CumulativeAck);
var b3 = d2.Buffer;

var d3 = b3.Receive(msg4).ExtractDeliverable;
var d3 = b3.Receive(msg4).ExtractDeliverable();
Assert.True(d3.Deliverables.Count == 0);
Assert.Equal(new SeqNo(4), d3.Ack.CumulativeAck);
Assert.True(d3.Ack.Nacks.SequenceEqual(new[] { new SeqNo(2), new SeqNo(3) }));
var b4 = d3.Buffer;

var d4 = b4.Receive(msg2).ExtractDeliverable;
var d4 = b4.Receive(msg2).ExtractDeliverable();
Assert.True(d4.Deliverables.SequenceEqual(new[] { msg2 }));
Assert.Equal(new SeqNo(4), d4.Ack.CumulativeAck);
Assert.True(d4.Ack.Nacks.SequenceEqual(new[] { new SeqNo(3) }));
var b5 = d4.Buffer;

var d5 = b5.Receive(msg5).ExtractDeliverable;
var d5 = b5.Receive(msg5).ExtractDeliverable();
Assert.True(d5.Deliverables.Count == 0);
Assert.Equal(new SeqNo(5), d5.Ack.CumulativeAck);
Assert.True(d5.Ack.Nacks.SequenceEqual(new[] { new SeqNo(3) }));
var b6 = d5.Buffer;

var d6 = b6.Receive(msg3).ExtractDeliverable;
var d6 = b6.Receive(msg3).ExtractDeliverable();
Assert.True(d6.Deliverables.SequenceEqual(new[] { msg3, msg4, msg5 }));
Assert.Equal(new SeqNo(5), d6.Ack.CumulativeAck);
}
Expand All @@ -268,11 +268,11 @@ public void ReceiveBuffer_must_handle_duplicate_arrivals_correctly()
var msg1 = Msg(1);
var msg2 = Msg(2);

var buf2 = buf.Receive(msg0).Receive(msg1).Receive(msg2).ExtractDeliverable.Buffer;
var buf2 = buf.Receive(msg0).Receive(msg1).Receive(msg2).ExtractDeliverable().Buffer;

var buf3 = buf2.Receive(msg0).Receive(msg1).Receive(msg2);

var d = buf3.ExtractDeliverable;
var d = buf3.ExtractDeliverable();
Assert.True(d.Deliverables.Count == 0);
Assert.Equal(new SeqNo(2), d.Ack.CumulativeAck);
}
Expand All @@ -290,8 +290,8 @@ public void ReceiveBuffer_must_be_able_to_correctly_merge_with_another_receive_b

var buf = buf1.Receive(msg1a).Receive(msg2).MergeFrom(buf2.Receive(msg1b).Receive(msg3));

var d = buf.Receive(msg0).ExtractDeliverable;
Assert.True(d.Deliverables.SequenceEqual(new []{ msg0, msg1b, msg2, msg3 }));
var d = buf.Receive(msg0).ExtractDeliverable();
Assert.True(d.Deliverables.SequenceEqual(new []{ msg0, msg1a, msg2, msg3 }));
Assert.Equal(new SeqNo(3), d.Ack.CumulativeAck);
}

Expand Down Expand Up @@ -341,7 +341,7 @@ public void SendBuffer_and_ReceiveBuffer_must_correctly_cooperate_with_each_othe
if (sends.Contains(msg)) sndBuf = sndBuf.Buffer(msg);
if (Happened(p))
{
var del = rcvBuf.Receive(msg).ExtractDeliverable;
var del = rcvBuf.Receive(msg).ExtractDeliverable();
rcvBuf = del.Buffer;
dbLog(string.Format("{0} -- {1} --> {2}", sndBuf, msg, rcvBuf));
lastAck = del.Ack;
Expand Down
27 changes: 17 additions & 10 deletions src/core/Akka.Remote.Tests/ActorsLeakSpec.cs
Expand Up @@ -25,7 +25,7 @@ namespace Akka.Remote.Tests
public class ActorsLeakSpec : AkkaSpec
{
public static readonly Config Confg = ConfigurationFactory.ParseString(@"
akka.actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
akka.actor.provider = remote
akka.loglevel = DEBUG
akka.remote.dot-netty.tcp.applied-adapters = [trttl]
akka.remote.dot-netty.tcp.hostname = 127.0.0.1
Expand All @@ -44,13 +44,20 @@ private static ImmutableList<IActorRef> Recurse(IActorRef @ref)
{
var empty = new List<IActorRef>();
var list = empty;
if (@ref is ActorRefWithCell)
if (@ref is ActorRefWithCell wc)
{
var cell = @ref.AsInstanceOf<ActorRefWithCell>().Underlying;
if (cell.ChildrenContainer is EmptyChildrenContainer ||
cell.ChildrenContainer is TerminatedChildrenContainer ||
cell.ChildrenContainer is TerminatingChildrenContainer) list = empty;
else list = cell.ChildrenContainer.Children.Cast<IActorRef>().ToList();
var cell = wc.Underlying;
switch (cell.ChildrenContainer)
{
case TerminatingChildrenContainer _:
case TerminatedChildrenContainer _:
case EmptyChildrenContainer _:
list = empty;
break;
case NormalChildrenContainer n:
list = n.Children.Cast<IActorRef>().ToList();
break;
}
}

return ImmutableList<IActorRef>.Empty.Add(@ref).AddRange(list.SelectMany(Recurse));
Expand All @@ -62,7 +69,7 @@ private static ImmutableList<IActorRef> CollectLiveActors(IActorRef root)
return Recurse(root);
}

class StoppableActor : ReceiveActor
private class StoppableActor : ReceiveActor
{
public StoppableActor()
{
Expand All @@ -75,7 +82,7 @@ public StoppableActor()

private void AssertActors(ImmutableHashSet<IActorRef> expected, ImmutableHashSet<IActorRef> actual)
{
Assert.True(expected.SetEquals(actual));
expected.Should().BeEquivalentTo(actual);
}

[Fact]
Expand Down Expand Up @@ -227,7 +234,7 @@ public void Remoting_must_not_leak_actors()
AwaitAssert(() =>
{
AssertActors(initialActors, targets.SelectMany(CollectLiveActors).ToImmutableHashSet());
}, 5.Seconds());
}, 10.Seconds());
}
}
}
Expand Down

0 comments on commit 537f818

Please sign in to comment.