Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop the janky scala routing and use Ceen built in routing instead #1152

Merged
merged 2 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using Akka.Configuration;
using Akka.Event;
using Akka.Management.Dsl;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
using System.Threading.Tasks;
using Akka.Cluster;
using Akka.Configuration;
using Akka.Http.Dsl.Model;
using Akka.Http.Dsl.Server;
using Akka.Http.Extensions;
using Akka.Http.Dsl;
using Akka.Management.Cluster.Bootstrap.ContactPoint;
using Akka.Management.Dsl;
using Ceen;
using FluentAssertions;
using Newtonsoft.Json;
using Xunit;
using Xunit.Abstractions;
using HttpRequest = Akka.Http.Dsl.Model.HttpRequest;
using static Akka.Management.Cluster.Bootstrap.ContactPoint.HttpBootstrapJsonProtocol;

namespace Akka.Management.Cluster.Bootstrap.Tests.ContactPoint
Expand Down Expand Up @@ -57,46 +56,76 @@ public async Task EmptyListIfNotPartOfCluster()
context.FakeRequest.Method = "GET";
context.FakeRequest.Path = ClusterBootstrapRequests.BootstrapSeedNodes("").ToString();

var requestContext = new RequestContext(await HttpRequest.CreateAsync(context.Request), Sys);
var response = (RouteResult.Complete) await _httpBootstrap.Routes.Concat()(requestContext);
response.Response.Entity.DataBytes.ToString().Should().Contain("\"Nodes\":[]");
var requestContext = new AkkaHttpContext(Sys, context);
var handled = false;
foreach (var (path, handler) in _httpBootstrap.Routes)
{
if (path == context.Request.Path)
{
if (await handler.HandleAsync(requestContext))
{
handled = true;
var response = (FakeResponse)context.Response;
response.Response.Should().Contain("\"Nodes\":[]");
}
}
}

handled.Should().BeTrue("At least one handler has to handle the request");
}

[Fact(
Skip = "Extremely racy in CI/CD",
DisplayName = "Http Bootstrap routes should include seed nodes when part of a cluster")]
public async Task IncludeSeedsWhenPartOfCluster()
{
var tcs = new TaskCompletionSource<Done>();
var cluster = Akka.Cluster.Cluster.Get(Sys);
cluster.RegisterOnMemberUp(() =>
{
tcs.SetResult(Done.Instance);
});
cluster.Join(cluster.SelfAddress);

var p = CreateTestProbe();
cluster.Subscribe(
p.Ref,
ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents,
typeof(ClusterEvent.MemberUp));

var up = p.ExpectMsg<ClusterEvent.MemberUp>();
up.Member.Should().Be(cluster.SelfMember);

var context = new DefaultHttpContext();
context.FakeRequest.Method = "GET";
context.FakeRequest.Path = ClusterBootstrapRequests.BootstrapSeedNodes("");

var requestContext = new RequestContext(await HttpRequest.CreateAsync(context.Request), Sys);
var response = (RouteResult.Complete) await _httpBootstrap.Routes.Concat()(requestContext);

var responseString = response.Response.Entity.DataBytes.ToString();
var nodes = JsonConvert.DeserializeObject<SeedNodes>(responseString);
nodes.Should().NotBeNull();

var seedNodes = nodes!.Nodes.Select(n => n.Node).ToList();
seedNodes.Contains(cluster.SelfAddress).Should()
.BeTrue(
"Seed nodes should contain self address but it does not. Self address: [{0}], seed nodes: [{1}], response string: [{2}]",
cluster.SelfAddress,
string.Join(", ", seedNodes),
responseString);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

await Task.WhenAny(Task.Delay(Timeout.Infinite, cts.Token), tcs.Task);
if (cts.IsCancellationRequested)
throw new TimeoutException("Cluster failed to form");
cts.Dispose();

var context = new DefaultHttpContext
{
FakeRequest =
{
Method = "GET",
Path = ClusterBootstrapRequests.BootstrapSeedNodes("")
}
};

var requestContext = new AkkaHttpContext(Sys, context);
var handled = false;
foreach (var (path, handler) in _httpBootstrap.Routes)
{
if (path == context.Request.Path)
{
if (await handler.HandleAsync(requestContext))
{
handled = true;
var response = (FakeResponse)context.Response;
var nodes = JsonConvert.DeserializeObject<SeedNodes>(response.Response);
var seedNodes = nodes.Nodes.Select(n => n.Node).ToList();
seedNodes.Contains(cluster.SelfAddress).Should()
.BeTrue(
"Seed nodes should contain self address but it does not. Self address: [{0}], seed nodes: [{1}], response string: [{2}]",
cluster.SelfAddress,
string.Join(", ", seedNodes),
response.Response);
}
}
}

handled.Should().BeTrue("At least one handler has to handle the request");
}
}

Expand All @@ -110,12 +139,11 @@ public Task LogMessageAsync(LogLevel level, string message, Exception ex)
// ReSharper disable UnassignedGetOnlyAutoProperty
public FakeRequest FakeRequest { get; } = new FakeRequest();
public IHttpRequest Request => FakeRequest;
public IHttpResponse? Response { get; }
public IStorageCreator? Storage { get; }
public IDictionary<string, string>? Session { get; set; }
public IDictionary<string, string>? LogData { get; }
public ILoadedModuleInfo? LoadedModules { get; }
// ReSharper restore UnassignedGetOnlyAutoProperty
public IHttpResponse Response { get; } = new FakeResponse();
public IStorageCreator Storage { get; }
public IDictionary<string, string> Session { get; set; }
public IDictionary<string, string> LogData { get; }
public ILoadedModuleInfo LoadedModules { get; }
}

internal class FakeRequest : IHttpRequest
Expand Down Expand Up @@ -178,7 +206,15 @@ public void ThrowIfTimeout()

internal class FakeResponse : IHttpResponse
{
public IResponseCookie AddCookie(string name, string value, string? path = null, string? domain = null, DateTime? expires = null,
public string Response { get; private set; }

public IResponseCookie AddCookie(string name, string value, string path = null, string domain = null, DateTime? expires = null,
long maxage = -1, bool secure = false, bool httponly = false, string samesite = null)
{
throw new NotImplementedException();
}

public IResponseCookie AddCookie(string name, string value, string path = null, string domain = null, DateTime? expires = null,
long maxage = -1, bool secure = false, bool httponly = false)
{
throw new NotImplementedException();
Expand All @@ -199,29 +235,34 @@ public Task FlushHeadersAsync()
throw new NotImplementedException();
}

public Task WriteAllAsync(Stream data, string? contenttype = null)
public async Task WriteAllAsync(Stream data, string? contenttype = null)
{
throw new NotImplementedException();
using var reader = new StreamReader(data);
Response = await reader.ReadToEndAsync();
}

public Task WriteAllAsync(byte[] data, string? contenttype = null)
{
throw new NotImplementedException();
Response = Encoding.UTF8.GetString(data);
return Task.CompletedTask;
}

public Task WriteAllAsync(string data, string? contenttype = null)
{
throw new NotImplementedException();
Response = data;
return Task.CompletedTask;
}

public Task WriteAllAsync(string data, Encoding encoding, string? contenttype = null)
{
throw new NotImplementedException();
Response = data;
return Task.CompletedTask;
}

public Task WriteAllJsonAsync(string data)
{
throw new NotImplementedException();
Response = data;
return Task.CompletedTask;
}

public void Redirect(string newurl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Actor;
using Akka.Cluster.Hosting;
using Akka.Hosting;
using Akka.Management.Dsl;
using Akka.Remote.Hosting;
using FluentAssertions;
using FluentAssertions.Extensions;
Expand Down Expand Up @@ -45,7 +46,13 @@ public class HostingSpecs
});
builder.WithRemoting(hostname: "localhost", port: 12552);
builder.WithClustering();
builder.WithAkkaManagement("localhost", 18558, "localhost", 18558);
builder.WithAkkaManagement(config =>
{
config.Http.HostName = "localhost";
config.Http.Port = 18558;
config.Http.BindHostName = "localhost";
config.Http.BindPort = 18558;
});
builder.WithConfigDiscovery(
new Dictionary<string, List<string>>
{
Expand Down Expand Up @@ -85,6 +92,8 @@ public HostingSpecs(ITestOutputHelper output)
});

tcs.Task.Wait(30.Seconds()).Should().BeTrue();

await host.StopAsync();
}

public static IEnumerable<object[]> StartupFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Management.Dsl;
using Xunit;

namespace Akka.Management.Cluster.Bootstrap.Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Discovery;
using Akka.Event;
using Akka.Management.Cluster.Bootstrap.Internal;
using Akka.Management.Dsl;
using Akka.Util;
using FluentAssertions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Configuration;
using Akka.Event;
using Akka.Management.Cluster.Bootstrap.Util;
using Akka.Management.Dsl;
using Akka.TestKit.Xunit2.Internals;
using FluentAssertions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
using Akka.Http.Dsl;
using Akka.Management.Cluster.Bootstrap.ContactPoint;
using Akka.Management.Cluster.Bootstrap.Internal;
using Akka.Management.Dsl;
using Akka.Util;
using Route = System.ValueTuple<string, Akka.Http.Dsl.HttpModuleBase>;

namespace Akka.Management.Cluster.Bootstrap
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ public ClusterMember(Address node, long nodeUid, MemberStatus status, ImmutableH

public sealed class SeedNodes
{
public SeedNodes(Address selfNode, ImmutableHashSet<ClusterMember> nodes)
public SeedNodes(Address selfNode, ImmutableList<ClusterMember> nodes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that this could allow duplicates?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it can't - comes from the Cluster object itself which already de-dupes them.

{
SelfNode = selfNode;
Nodes = nodes;
}

public Address SelfNode { get; }
public ImmutableHashSet<ClusterMember> Nodes { get; }
public ImmutableList<ClusterMember> Nodes { get; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Cluster;
using Akka.Http;
using Akka.Http.Dsl;
using Akka.Http.Dsl.Model;
using Akka.Http.Dsl.Server;
using Akka.IO;
using Akka.Http.Extensions;
using Newtonsoft.Json;
using static Akka.Management.Cluster.Bootstrap.ContactPoint.HttpBootstrapJsonProtocol;
using HttpResponse = Akka.Http.Dsl.Model.HttpResponse;
using Route = System.ValueTuple<string, Akka.Http.Dsl.HttpModuleBase>;

namespace Akka.Management.Cluster.Bootstrap.ContactPoint
{
public class HttpClusterBootstrapRoutes
public class HttpClusterBootstrapRoutes: HttpModuleBase
{
private readonly ClusterBootstrapSettings _settings;

Expand All @@ -29,48 +28,33 @@ public HttpClusterBootstrapRoutes(ClusterBootstrapSettings settings)
_settings = settings;
}

public Route[] Routes
{
get
{
return new Route[]{ async context =>
{
if (context.Request is { Method: "GET", Path: "/bootstrap/seed-nodes" })
{
return await GetSeedNodes()(context);
}

return null;
}};
}
}
public Route[] Routes => new Route[] { ("/bootstrap/seed-nodes", this) };

private Route GetSeedNodes()
public override async Task<bool> HandleAsync(IAkkaHttpContext context)
{
return context =>
{
var cluster = Akka.Cluster.Cluster.Get(context.ActorSystem);
if (context.HttpContext.Request.Method.ToLowerInvariant() != "get")
return false;

var cluster = Akka.Cluster.Cluster.Get(context.ActorSystem);

ClusterMember MemberToClusterMember(Member m) =>
new (m.UniqueAddress.Address, m.UniqueAddress.Uid, m.Status, m.Roles);
ClusterMember MemberToClusterMember(Member m) =>
new (m.UniqueAddress.Address, m.UniqueAddress.Uid, m.Status, m.Roles);

var state = cluster.State;
var state = cluster.State;

// TODO shuffle the members so in a big deployment nodes start joining different ones and not all the same?
var members = state.Members
.Where(m => !state.Unreachable.Contains(m))
.Where(m => m.Status is MemberStatus.Up or MemberStatus.WeaklyUp or MemberStatus.Joining)
.Take(_settings.ContactPoint.MaxSeedNodesToExpose)
.Select(MemberToClusterMember).ToImmutableHashSet();
var members = state.Members
.Where(m => !state.Unreachable.Contains(m))
.Where(m => m.Status is MemberStatus.Up or MemberStatus.WeaklyUp or MemberStatus.Joining)
.Take(_settings.ContactPoint.MaxSeedNodesToExpose)
.Select(MemberToClusterMember).ToList().Shuffle();

var json = JsonConvert.SerializeObject(
new SeedNodes(cluster.SelfMember.UniqueAddress.Address, members));
var json = JsonConvert.SerializeObject(
new SeedNodes(cluster.SelfMember.UniqueAddress.Address, members.ToImmutableList()));

return Task.FromResult((RouteResult.IRouteResult?) new RouteResult.Complete(HttpResponse.Create(
entity: new ResponseEntity(ContentTypes.ApplicationJson, ByteString.FromString(json)))));
};
await context.HttpContext.Response.WriteAllJsonAsync(json);

return true;
}

}

public static class ClusterBootstrapRequests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Akka.Actor;
using Akka.Discovery;
using Akka.Event;
using Akka.Management.Dsl;
using Akka.Util;
using static Akka.Discovery.ServiceDiscovery;

Expand Down
1 change: 1 addition & 0 deletions src/cluster.bootstrap/examples/StressTest/StressTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Akka.Discovery;
using Akka.Management;
using Akka.Management.Cluster.Bootstrap;
using Akka.Management.Dsl;
using Akka.Util.Internal;

namespace StressTest
Expand Down