Skip to content

Commit

Permalink
Kubediag fixes (#2007)
Browse files Browse the repository at this point in the history
* improve kubediag
* harden k8s monitor
* ignore blocked gossips
  • Loading branch information
rogeralsing committed May 15, 2023
1 parent 9a2e872 commit 814e151
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 147 deletions.
204 changes: 99 additions & 105 deletions benchmarks/KubernetesDiagnostics/Program.cs
Original file line number Diff line number Diff line change
@@ -1,126 +1,106 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Gossip;
using Proto.Cluster.Kubernetes;
using Proto.Cluster.Partition;
using Proto.Cluster.PartitionActivator;
using Proto.Remote;
using Proto.Remote.GrpcNet;

namespace KubernetesDiagnostics;
var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC");

public static class Program
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddLogging(x => x.AddSimpleConsole(c =>
{
public static async Task Main()
{
ThreadPool.SetMinThreads(100, 100);
Console.WriteLine("Starting...");

/*
* docker build . -t rogeralsing/kubdiagg
* kubectl apply --filename service.yaml
* kubectl get pods -l app=kubdiag
* kubectl logs -l app=kubdiag --all-containers
*
*/

var l = LoggerFactory.Create(c => c.AddConsole().SetMinimumLevel(LogLevel.Information));
Log.SetLoggerFactory(l);
var log = Log.CreateLogger("main");

var identity = new PartitionIdentityLookup(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(2)
);

/*
- name: "REDIS"
value: "redis"
- name: PROTOPORT
value: "8080"
- name: PROTOHOST
value: "0.0.0.0"
- name: "PROTOHOSTPUBLIC"
*/

var port = int.Parse(Environment.GetEnvironmentVariable("PROTOPORT") ?? "0");
var host = Environment.GetEnvironmentVariable("PROTOHOST") ?? "127.0.0.1";
var advertisedHost = Environment.GetEnvironmentVariable("PROTOHOSTPUBLIC");

log.LogInformation("Host {host}", host);
log.LogInformation("Port {port}", port);
log.LogInformation("Advertised Host {advertisedHost}", advertisedHost);

var clusterProvider = GetProvider();

var noOpsProps = Props.FromFunc(ctx => Task.CompletedTask);
var echoKind = new ClusterKind("echo", noOpsProps);
var system = new ActorSystem(new ActorSystemConfig())
.WithRemote(GrpcNetRemoteConfig
.BindTo(host, port)
.WithAdvertisedHost(advertisedHost)
.WithEndpointWriterMaxRetries(2)
)
.WithCluster(ClusterConfig
.Setup("mycluster", clusterProvider, identity)
.WithClusterKind("empty", Props.Empty)
.WithClusterKind(echoKind)
);

// system.EventStream.Subscribe<GossipUpdate>(e => { Console.WriteLine($"{DateTime.Now:O} Gossip update Member {e.MemberId} Key {e.Key}"); });

system.EventStream.Subscribe<ClusterTopology>(e => {
c.SingleLine = true;
}));

var hash = e.TopologyHash;

Console.WriteLine($"{DateTime.Now:O} My members {hash}");
}
);
builder.Services.AddProtoCluster((_, x) =>
{
x.Port = 0;
x.ConfigureRemote = r =>
r.WithAdvertisedHost(advertisedHost);
var cts = new CancellationTokenSource();
x.ConfigureCluster = c => c
.WithClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask))
.WithClusterKind("empty", Props.FromFunc(ctx => Task.CompletedTask))
.WithExitOnShutdown()
.WithHeartbeatExpirationDisabled();
Console.CancelKeyPress += (_, _) => { cts.Cancel(); };
x.ClusterProvider = new KubernetesProvider();
x.IdentityLookup = new PartitionActivatorLookup();
});

await system
.Cluster()
.StartMemberAsync();

system.Shutdown.Register(() =>
{
Console.WriteLine("Shutting down...");
Environment.Exit(0);
});
builder.Services.AddHealthChecks().AddCheck<ClusterHealthCheck>("proto", null, new[] { "ready", "live" });
builder.Services.AddHostedService<DummyHostedService>();

var app = builder.Build();

app.MapGet("/", async (Cluster cluster) =>
{
});

app.MapHealthChecks("/health");

app.Run();

public class DummyHostedService : IHostedService
{
private readonly ActorSystem _system;
private readonly ILogger<DummyHostedService> _logger;
private bool _running;

public DummyHostedService(ActorSystem system, ILogger<DummyHostedService> logger)
{
_system = system;
_logger = logger;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Starting DummyHostedService");
_running = true;

_system.EventStream.Subscribe<ClusterTopology>(e => {
var hash = e.TopologyHash;
_logger.LogInformation($"{DateTime.Now:O} My members {hash}");
}
);

var props = Props.FromFunc(ctx => Task.CompletedTask);
system.Root.SpawnNamed(props, "dummy");
_system.Root.SpawnNamed(props, "dummy");

var clusterIdentity = ClusterIdentity.Create("some-id", echoKind.Name);
_ = SafeTask.Run(RunLoop);
_ = SafeTask.Run(PrintMembersLoop);
}

while (!cts.IsCancellationRequested)
{
var m = system.Cluster().MemberList.GetAllMembers();
var hash = Member.TopologyHash(m);
private async Task RunLoop()
{
var clusterIdentity =
ClusterIdentity.Create("some-id", new ClusterKind("echo", Props.FromFunc(ctx => Task.CompletedTask)).Name);

Console.WriteLine($"{DateTime.Now:O} Hash {hash} Count {m.Length}");
while (_running)
{
var m = _system.Cluster().MemberList.GetAllMembers();

try
{
var t = await system.Cluster().RequestAsync<Touched>(clusterIdentity, new Touch(), CancellationTokens.FromSeconds(1));
var t = await _system.Cluster()
.RequestAsync<Touched>(clusterIdentity, new Touch(), CancellationTokens.FromSeconds(1));

if (t != null)
{
Console.WriteLine($"called cluster actor {t.Who}");
}
else
{
Console.WriteLine($"call to cluster actor returned null");
}
_logger.LogInformation($"called cluster actor {t.Who}");
}
catch (Exception e)
{
Console.WriteLine($"Could not call cluster actor: {e}");
_logger.LogError(e, "Could not call cluster actor");
}

foreach (var member in m)
Expand All @@ -129,30 +109,44 @@ await system

try
{
var t = await system.Root.RequestAsync<Touched>(pid, new Touch(), CancellationTokens.FromSeconds(1));
var t = await _system.Root.RequestAsync<Touched>(pid, new Touch(), CancellationTokens.FromSeconds(1));

if (t != null)
{
Console.WriteLine($"called dummy actor {pid}");
_logger.LogInformation("called dummy actor {PID}", pid);
}
else
{
Console.WriteLine($"call to dummy actor timed out {pid}");
_logger.LogInformation("call to dummy actor timed out {PID}", pid);
}
}
catch
{
Console.WriteLine($"Could not call dummy actor {pid}");
_logger.LogInformation("Could not call dummy actor {PID}", pid);
}
}

await Task.Delay(3000);
await Task.Delay(5000);
}

await system
.Cluster()
.ShutdownAsync();
}

private async Task PrintMembersLoop()
{

while (_running)
{
var m = _system.Cluster().MemberList.GetAllMembers();
var hash = Member.TopologyHash(m);

_logger.LogInformation($"{DateTime.Now:O} Hash {hash} Count {m.Length}");

private static IClusterProvider GetProvider() => new KubernetesProvider();
await Task.Delay(2000);
}
}

public Task StopAsync(CancellationToken cancellationToken)
{
_running = false;
return Task.CompletedTask;
}
}
6 changes: 6 additions & 0 deletions benchmarks/KubernetesDiagnostics/build2.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
docker login
kubectl delete --filename service.yaml
dotnet publish --os linux -c Release --arch amd64 -p:PublishProfile=DefaultContainer
docker tag kubernetesdiagnostics:1.0.0 rogeralsing/kubediag
docker push rogeralsing/kubediag
kubectl apply --filename service.yaml
12 changes: 6 additions & 6 deletions src/Proto.Actor/Context/ActorLoggingContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ public override async Task<T> RequestAsync<T>(PID target, object message, Cancel
{
if (_exceptionLogLevel != LogLevel.None && _logger.IsEnabled(_exceptionLogLevel))
{
_logger.Log(_exceptionLogLevel, x,
"Actor {Self} {ActorType} Got exception waiting for RequestAsync response of {MessageType}:{MessagePayload} from {Target}",
Self,
ActorType,
message.GetMessageTypeName(), message, target
);
// _logger.Log(_exceptionLogLevel, x,
// "Actor {Self} {ActorType} Got exception waiting for RequestAsync response of {MessageType}:{MessagePayload} from {Target}",
// Self,
// ActorType,
// message.GetMessageTypeName(), message, target
// );
}

throw;
Expand Down
1 change: 1 addition & 0 deletions src/Proto.Cluster.Kubernetes/KubernetesClusterMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ private void UpdateTopology()

var memberStatuses = _clusterPods.Values
.Select(x => x.GetMemberStatus())
.Where(x => x is not null)
.Where(x => x.IsRunning && (x.IsReady || x.Member.Id == _cluster.System.Id))
.Select(x => x.Member)
.ToList();
Expand Down
10 changes: 9 additions & 1 deletion src/Proto.Cluster.Kubernetes/KubernetesExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Json.Patch;
using k8s;
using k8s.Models;
Expand Down Expand Up @@ -57,9 +58,16 @@ internal static class KubernetesExtensions
/// </summary>
/// <param name="pod">Kubernetes Pod object</param>
/// <returns></returns>
[CanBeNull]
internal static MemberStatus GetMemberStatus(this V1Pod pod)
{
var isRunning = pod.Status.Phase == "Running" && pod.Status.PodIP is not null;
var isRunning = pod.Status is { Phase: "Running", PodIP: not null };

if (pod.Status?.ContainerStatuses is null)
return null;

if (pod.Metadata?.Labels is null)
return null;

var kinds = pod
.Metadata
Expand Down
6 changes: 6 additions & 0 deletions src/Proto.Cluster/ClusterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,12 @@ private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIde
/// <returns></returns>
public ClusterConfig WithHeartbeatExpiration(TimeSpan expiration) => this with { HeartbeatExpiration = expiration };

/// <summary>
/// Disables gossip heartbeat expiration.
/// </summary>
/// <returns></returns>
public ClusterConfig WithHeartbeatExpirationDisabled() => this with { HeartbeatExpiration = TimeSpan.Zero };

/// <summary>
/// Configuration for the PubSub extension.
/// </summary>
Expand Down
49 changes: 28 additions & 21 deletions src/Proto.Cluster/Gossip/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,38 +114,45 @@ public void SetState(string key, IMessage message)
//TODO: this does not need to use a callback, it can return a list of MemberStates
public void SendState(SendStateAction sendStateToMember)
{
var logger = _logger?.BeginMethodScope();

foreach (var member in _otherMembers)
try
{
GossipStateManagement.EnsureMemberStateExists(_state, member.Id);
}
var logger = _logger?.BeginMethodScope();

var randomMembers = _otherMembers.OrderByRandom(_rnd);
foreach (var member in _otherMembers)
{
GossipStateManagement.EnsureMemberStateExists(_state, member.Id);
}

var fanoutCount = 0;
var randomMembers = _otherMembers.OrderByRandom(_rnd);

foreach (var member in randomMembers)
{
//TODO: we can chunk up sends here
//instead of sending less state, we can send all of it, but in chunks
var memberState = GetMemberStateDelta(member.Id);
var fanoutCount = 0;

if (!memberState.HasState)
foreach (var member in randomMembers)
{
continue;
}
//TODO: we can chunk up sends here
//instead of sending less state, we can send all of it, but in chunks
var memberState = GetMemberStateDelta(member.Id);

//fire and forget, we handle results in ReenterAfter
sendStateToMember(memberState, member, logger);
if (!memberState.HasState)
{
continue;
}

fanoutCount++;
//fire and forget, we handle results in ReenterAfter
sendStateToMember(memberState, member, logger);

if (fanoutCount == _gossipFanout)
{
break;
fanoutCount++;

if (fanoutCount == _gossipFanout)
{
break;
}
}
}
catch (Exception x)
{
Logger.LogError(x, "SendState failed");
}
}

public MemberStateDelta GetMemberStateDelta(string targetMemberId)
Expand Down
Loading

0 comments on commit 814e151

Please sign in to comment.