Skip to content

Commit

Permalink
Drop the janky scala routing and use Ceen built in routing instead (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Dec 29, 2022
1 parent 3a99116 commit 5b8b6c4
Show file tree
Hide file tree
Showing 61 changed files with 518 additions and 2,168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using Akka.Configuration;
using Akka.Event;
using Akka.Management.Dsl;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
using System.Threading.Tasks;
using Akka.Cluster;
using Akka.Configuration;
using Akka.Http.Dsl.Model;
using Akka.Http.Dsl.Server;
using Akka.Http.Extensions;
using Akka.Http.Dsl;
using Akka.Management.Cluster.Bootstrap.ContactPoint;
using Akka.Management.Dsl;
using Ceen;
using FluentAssertions;
using Newtonsoft.Json;
using Xunit;
using Xunit.Abstractions;
using HttpRequest = Akka.Http.Dsl.Model.HttpRequest;
using static Akka.Management.Cluster.Bootstrap.ContactPoint.HttpBootstrapJsonProtocol;

namespace Akka.Management.Cluster.Bootstrap.Tests.ContactPoint
Expand Down Expand Up @@ -57,46 +56,76 @@ public async Task EmptyListIfNotPartOfCluster()
context.FakeRequest.Method = "GET";
context.FakeRequest.Path = ClusterBootstrapRequests.BootstrapSeedNodes("").ToString();

var requestContext = new RequestContext(await HttpRequest.CreateAsync(context.Request), Sys);
var response = (RouteResult.Complete) await _httpBootstrap.Routes.Concat()(requestContext);
response.Response.Entity.DataBytes.ToString().Should().Contain("\"Nodes\":[]");
var requestContext = new AkkaHttpContext(Sys, context);
var handled = false;
foreach (var (path, handler) in _httpBootstrap.Routes)
{
if (path == context.Request.Path)
{
if (await handler.HandleAsync(requestContext))
{
handled = true;
var response = (FakeResponse)context.Response;
response.Response.Should().Contain("\"Nodes\":[]");
}
}
}

handled.Should().BeTrue("At least one handler has to handle the request");
}

[Fact(
Skip = "Extremely racy in CI/CD",
DisplayName = "Http Bootstrap routes should include seed nodes when part of a cluster")]
public async Task IncludeSeedsWhenPartOfCluster()
{
var tcs = new TaskCompletionSource<Done>();
var cluster = Akka.Cluster.Cluster.Get(Sys);
cluster.RegisterOnMemberUp(() =>
{
tcs.SetResult(Done.Instance);
});
cluster.Join(cluster.SelfAddress);

var p = CreateTestProbe();
cluster.Subscribe(
p.Ref,
ClusterEvent.SubscriptionInitialStateMode.InitialStateAsEvents,
typeof(ClusterEvent.MemberUp));

var up = p.ExpectMsg<ClusterEvent.MemberUp>();
up.Member.Should().Be(cluster.SelfMember);

var context = new DefaultHttpContext();
context.FakeRequest.Method = "GET";
context.FakeRequest.Path = ClusterBootstrapRequests.BootstrapSeedNodes("");

var requestContext = new RequestContext(await HttpRequest.CreateAsync(context.Request), Sys);
var response = (RouteResult.Complete) await _httpBootstrap.Routes.Concat()(requestContext);

var responseString = response.Response.Entity.DataBytes.ToString();
var nodes = JsonConvert.DeserializeObject<SeedNodes>(responseString);
nodes.Should().NotBeNull();

var seedNodes = nodes!.Nodes.Select(n => n.Node).ToList();
seedNodes.Contains(cluster.SelfAddress).Should()
.BeTrue(
"Seed nodes should contain self address but it does not. Self address: [{0}], seed nodes: [{1}], response string: [{2}]",
cluster.SelfAddress,
string.Join(", ", seedNodes),
responseString);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

await Task.WhenAny(Task.Delay(Timeout.Infinite, cts.Token), tcs.Task);
if (cts.IsCancellationRequested)
throw new TimeoutException("Cluster failed to form");
cts.Dispose();

var context = new DefaultHttpContext
{
FakeRequest =
{
Method = "GET",
Path = ClusterBootstrapRequests.BootstrapSeedNodes("")
}
};

var requestContext = new AkkaHttpContext(Sys, context);
var handled = false;
foreach (var (path, handler) in _httpBootstrap.Routes)
{
if (path == context.Request.Path)
{
if (await handler.HandleAsync(requestContext))
{
handled = true;
var response = (FakeResponse)context.Response;
var nodes = JsonConvert.DeserializeObject<SeedNodes>(response.Response);
var seedNodes = nodes.Nodes.Select(n => n.Node).ToList();
seedNodes.Contains(cluster.SelfAddress).Should()
.BeTrue(
"Seed nodes should contain self address but it does not. Self address: [{0}], seed nodes: [{1}], response string: [{2}]",
cluster.SelfAddress,
string.Join(", ", seedNodes),
response.Response);
}
}
}

handled.Should().BeTrue("At least one handler has to handle the request");
}
}

Expand All @@ -110,12 +139,11 @@ public Task LogMessageAsync(LogLevel level, string message, Exception ex)
// ReSharper disable UnassignedGetOnlyAutoProperty
public FakeRequest FakeRequest { get; } = new FakeRequest();
public IHttpRequest Request => FakeRequest;
public IHttpResponse? Response { get; }
public IStorageCreator? Storage { get; }
public IDictionary<string, string>? Session { get; set; }
public IDictionary<string, string>? LogData { get; }
public ILoadedModuleInfo? LoadedModules { get; }
// ReSharper restore UnassignedGetOnlyAutoProperty
public IHttpResponse Response { get; } = new FakeResponse();
public IStorageCreator Storage { get; }
public IDictionary<string, string> Session { get; set; }
public IDictionary<string, string> LogData { get; }
public ILoadedModuleInfo LoadedModules { get; }
}

internal class FakeRequest : IHttpRequest
Expand Down Expand Up @@ -178,7 +206,15 @@ public void ThrowIfTimeout()

internal class FakeResponse : IHttpResponse
{
public IResponseCookie AddCookie(string name, string value, string? path = null, string? domain = null, DateTime? expires = null,
public string Response { get; private set; }

public IResponseCookie AddCookie(string name, string value, string path = null, string domain = null, DateTime? expires = null,
long maxage = -1, bool secure = false, bool httponly = false, string samesite = null)
{
throw new NotImplementedException();
}

public IResponseCookie AddCookie(string name, string value, string path = null, string domain = null, DateTime? expires = null,
long maxage = -1, bool secure = false, bool httponly = false)
{
throw new NotImplementedException();
Expand All @@ -199,29 +235,34 @@ public Task FlushHeadersAsync()
throw new NotImplementedException();
}

public Task WriteAllAsync(Stream data, string? contenttype = null)
public async Task WriteAllAsync(Stream data, string? contenttype = null)
{
throw new NotImplementedException();
using var reader = new StreamReader(data);
Response = await reader.ReadToEndAsync();
}

public Task WriteAllAsync(byte[] data, string? contenttype = null)
{
throw new NotImplementedException();
Response = Encoding.UTF8.GetString(data);
return Task.CompletedTask;
}

public Task WriteAllAsync(string data, string? contenttype = null)
{
throw new NotImplementedException();
Response = data;
return Task.CompletedTask;
}

public Task WriteAllAsync(string data, Encoding encoding, string? contenttype = null)
{
throw new NotImplementedException();
Response = data;
return Task.CompletedTask;
}

public Task WriteAllJsonAsync(string data)
{
throw new NotImplementedException();
Response = data;
return Task.CompletedTask;
}

public void Redirect(string newurl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Actor;
using Akka.Cluster.Hosting;
using Akka.Hosting;
using Akka.Management.Dsl;
using Akka.Remote.Hosting;
using FluentAssertions;
using FluentAssertions.Extensions;
Expand Down Expand Up @@ -45,7 +46,13 @@ public class HostingSpecs
});
builder.WithRemoting(hostname: "localhost", port: 12552);
builder.WithClustering();
builder.WithAkkaManagement("localhost", 18558, "localhost", 18558);
builder.WithAkkaManagement(config =>
{
config.Http.HostName = "localhost";
config.Http.Port = 18558;
config.Http.BindHostName = "localhost";
config.Http.BindPort = 18558;
});
builder.WithConfigDiscovery(
new Dictionary<string, List<string>>
{
Expand Down Expand Up @@ -85,6 +92,8 @@ public HostingSpecs(ITestOutputHelper output)
});

tcs.Task.Wait(30.Seconds()).Should().BeTrue();

await host.StopAsync();
}

public static IEnumerable<object[]> StartupFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System.Threading.Tasks;
using Akka.Actor;
using Akka.Management.Dsl;
using Xunit;

namespace Akka.Management.Cluster.Bootstrap.Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Discovery;
using Akka.Event;
using Akka.Management.Cluster.Bootstrap.Internal;
using Akka.Management.Dsl;
using Akka.Util;
using FluentAssertions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Akka.Configuration;
using Akka.Event;
using Akka.Management.Cluster.Bootstrap.Util;
using Akka.Management.Dsl;
using Akka.TestKit.Xunit2.Internals;
using FluentAssertions;
using Xunit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
using Akka.Http.Dsl;
using Akka.Management.Cluster.Bootstrap.ContactPoint;
using Akka.Management.Cluster.Bootstrap.Internal;
using Akka.Management.Dsl;
using Akka.Util;
using Route = System.ValueTuple<string, Akka.Http.Dsl.HttpModuleBase>;

namespace Akka.Management.Cluster.Bootstrap
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ public ClusterMember(Address node, long nodeUid, MemberStatus status, ImmutableH

public sealed class SeedNodes
{
public SeedNodes(Address selfNode, ImmutableHashSet<ClusterMember> nodes)
public SeedNodes(Address selfNode, ImmutableList<ClusterMember> nodes)
{
SelfNode = selfNode;
Nodes = nodes;
}

public Address SelfNode { get; }
public ImmutableHashSet<ClusterMember> Nodes { get; }
public ImmutableList<ClusterMember> Nodes { get; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Cluster;
using Akka.Http;
using Akka.Http.Dsl;
using Akka.Http.Dsl.Model;
using Akka.Http.Dsl.Server;
using Akka.IO;
using Akka.Http.Extensions;
using Newtonsoft.Json;
using static Akka.Management.Cluster.Bootstrap.ContactPoint.HttpBootstrapJsonProtocol;
using HttpResponse = Akka.Http.Dsl.Model.HttpResponse;
using Route = System.ValueTuple<string, Akka.Http.Dsl.HttpModuleBase>;

namespace Akka.Management.Cluster.Bootstrap.ContactPoint
{
public class HttpClusterBootstrapRoutes
public class HttpClusterBootstrapRoutes: HttpModuleBase
{
private readonly ClusterBootstrapSettings _settings;

Expand All @@ -29,48 +28,33 @@ public HttpClusterBootstrapRoutes(ClusterBootstrapSettings settings)
_settings = settings;
}

public Route[] Routes
{
get
{
return new Route[]{ async context =>
{
if (context.Request is { Method: "GET", Path: "/bootstrap/seed-nodes" })
{
return await GetSeedNodes()(context);
}
return null;
}};
}
}
public Route[] Routes => new Route[] { ("/bootstrap/seed-nodes", this) };

private Route GetSeedNodes()
public override async Task<bool> HandleAsync(IAkkaHttpContext context)
{
return context =>
{
var cluster = Akka.Cluster.Cluster.Get(context.ActorSystem);
if (context.HttpContext.Request.Method.ToLowerInvariant() != "get")
return false;

var cluster = Akka.Cluster.Cluster.Get(context.ActorSystem);

ClusterMember MemberToClusterMember(Member m) =>
new (m.UniqueAddress.Address, m.UniqueAddress.Uid, m.Status, m.Roles);
ClusterMember MemberToClusterMember(Member m) =>
new (m.UniqueAddress.Address, m.UniqueAddress.Uid, m.Status, m.Roles);

var state = cluster.State;
var state = cluster.State;

// TODO shuffle the members so in a big deployment nodes start joining different ones and not all the same?
var members = state.Members
.Where(m => !state.Unreachable.Contains(m))
.Where(m => m.Status is MemberStatus.Up or MemberStatus.WeaklyUp or MemberStatus.Joining)
.Take(_settings.ContactPoint.MaxSeedNodesToExpose)
.Select(MemberToClusterMember).ToImmutableHashSet();
var members = state.Members
.Where(m => !state.Unreachable.Contains(m))
.Where(m => m.Status is MemberStatus.Up or MemberStatus.WeaklyUp or MemberStatus.Joining)
.Take(_settings.ContactPoint.MaxSeedNodesToExpose)
.Select(MemberToClusterMember).ToList().Shuffle();

var json = JsonConvert.SerializeObject(
new SeedNodes(cluster.SelfMember.UniqueAddress.Address, members));
var json = JsonConvert.SerializeObject(
new SeedNodes(cluster.SelfMember.UniqueAddress.Address, members.ToImmutableList()));

return Task.FromResult((RouteResult.IRouteResult?) new RouteResult.Complete(HttpResponse.Create(
entity: new ResponseEntity(ContentTypes.ApplicationJson, ByteString.FromString(json)))));
};
await context.HttpContext.Response.WriteAllJsonAsync(json);

return true;
}

}

public static class ClusterBootstrapRequests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Akka.Actor;
using Akka.Discovery;
using Akka.Event;
using Akka.Management.Dsl;
using Akka.Util;
using static Akka.Discovery.ServiceDiscovery;

Expand Down
1 change: 1 addition & 0 deletions src/cluster.bootstrap/examples/StressTest/StressTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Akka.Discovery;
using Akka.Management;
using Akka.Management.Cluster.Bootstrap;
using Akka.Management.Dsl;
using Akka.Util.Internal;

namespace StressTest
Expand Down

0 comments on commit 5b8b6c4

Please sign in to comment.