Skip to content

Commit

Permalink
Cluster kind as class (#813)
Browse files Browse the repository at this point in the history
* cluster kinds as a real thing
  • Loading branch information
rogeralsing committed Mar 24, 2021
1 parent ab24545 commit 276b6cf
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 25 deletions.
24 changes: 18 additions & 6 deletions src/Proto.Cluster/Cluster.cs
Expand Up @@ -4,6 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -64,13 +65,13 @@ public Cluster(ActorSystem system, ClusterConfig config)

public PidCache PidCache { get; }

public string[] GetClusterKinds() => Config.ClusterKinds.Keys.ToArray();
public string[] GetClusterKinds() =>_clusterKinds.Keys.ToArray();

public async Task StartMemberAsync()
{
await BeginStartAsync(false);
Provider = Config.ClusterProvider;
var kinds = GetClusterKinds();

await Provider.StartMemberAsync(this);

Logger.LogInformation("Started as cluster member");
Expand All @@ -88,6 +89,7 @@ public async Task StartClientAsync()

private async Task BeginStartAsync(bool client)
{
InitClusterKinds();
//default to partition identity lookup
IdentityLookup = Config.IdentityLookup ?? new PartitionIdentityLookup();

Expand All @@ -104,6 +106,14 @@ private async Task BeginStartAsync(bool client)
await _clusterHeartBeat.StartAsync();
}

private void InitClusterKinds()
{
foreach (var (name, props) in Config.ClusterKinds)
{
_clusterKinds.Add(name, new ClusterKind(name, props));
}
}

public async Task ShutdownAsync(bool graceful = true)
{
await System.ShutdownAsync();
Expand All @@ -128,12 +138,14 @@ public async Task ShutdownAsync(bool graceful = true)
public Task<T> RequestAsync<T>(string identity, string kind, object message, ISenderContext context, CancellationToken ct) =>
ClusterContext.RequestAsync<T>(new ClusterIdentity {Identity = identity, Kind = kind}, message, context, ct);

public Props GetClusterKind(string kind)

private Dictionary<string, ClusterKind> _clusterKinds = new();
public ClusterKind GetClusterKind(string kind)
{
if (!Config.ClusterKinds.TryGetValue(kind, out var props))
throw new ArgumentException($"No Props found for kind '{kind}'");
if (!_clusterKinds.TryGetValue(kind, out var clusterKind))
throw new ArgumentException($"No cluster kind '{kind}' was not found");

return props;
return clusterKind;
}
}
}
26 changes: 13 additions & 13 deletions src/Proto.Cluster/ClusterExtension.cs
Expand Up @@ -6,11 +6,13 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto.Cluster.Metrics;
using Proto.Deduplication;

namespace Proto.Cluster
{
[PublicAPI]
public static class Extensions
{
public static ActorSystem WithCluster(this ActorSystem system, ClusterConfig config)
Expand All @@ -32,23 +34,21 @@ public static Task<T> ClusterRequestAsync<T>(this IContext context, string ident
return cluster.RequestAsync<T>(identity, kind, message, context, ct);
}

public static Props WithClusterInit(this Props props, Cluster cluster, ClusterIdentity clusterIdentity)
public static Props WithClusterInit(this Props props, Cluster cluster, ClusterIdentity clusterIdentity, ClusterKind clusterKind)
{
return props.WithReceiverMiddleware(
baseReceive =>
(ctx, env) => {
return env.Message switch
{
Started => HandleStart(cluster, clusterIdentity, baseReceive, ctx, env),
Stopped => HandleStopped(cluster, clusterIdentity, baseReceive, ctx, env),
Started => HandleStart(baseReceive, ctx, env),
Stopped => HandleStopped(baseReceive, ctx, env),
_ => baseReceive(ctx, env)
};
}
);

static async Task HandleStart(
Cluster cluster,
ClusterIdentity clusterIdentity,
async Task HandleStart(
Receiver baseReceive,
IReceiverContext ctx,
MessageEnvelope startEnvelope
Expand All @@ -57,21 +57,21 @@ MessageEnvelope startEnvelope
await baseReceive(ctx, startEnvelope);
var grainInit = new ClusterInit(clusterIdentity, cluster);
var grainInitEnvelope = new MessageEnvelope(grainInit, null);
cluster.System.Metrics.Get<ClusterMetrics>().ClusterActorCount
.Inc(new[] {cluster.System.Id, cluster.System.Address, clusterIdentity.Kind});
var count = clusterKind.Inc();
cluster.System.Metrics.Get<ClusterMetrics>().ClusterActorGauge
.Set(count,new[] {cluster.System.Id, cluster.System.Address, clusterIdentity.Kind});
await baseReceive(ctx, grainInitEnvelope);
}

static async Task HandleStopped(
Cluster cluster,
ClusterIdentity clusterIdentity,
async Task HandleStopped(
Receiver baseReceive,
IReceiverContext ctx,
MessageEnvelope startEnvelope
)
{
cluster.System.Metrics.Get<ClusterMetrics>().ClusterActorCount
.Inc(new[] {cluster.System.Id, cluster.System.Address, clusterIdentity.Kind}, -1);
var count = clusterKind.Dec();
cluster.System.Metrics.Get<ClusterMetrics>().ClusterActorGauge
.Set(count, new[] {cluster.System.Id, cluster.System.Address, clusterIdentity.Kind});
await baseReceive(ctx, startEnvelope);
}
}
Expand Down
17 changes: 17 additions & 0 deletions src/Proto.Cluster/ClusterKind.cs
@@ -0,0 +1,17 @@
// -----------------------------------------------------------------------
// <copyright file="ClusterKind.cs" company="Asynkron AB">
// Copyright (C) 2015-2021 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System.Threading;

namespace Proto.Cluster
{
public record ClusterKind(string Name, Props Props)
{
private int _count;

public int Inc() => Interlocked.Increment(ref _count);
public int Dec() => Interlocked.Decrement(ref _count);
}
}
5 changes: 3 additions & 2 deletions src/Proto.Cluster/Identity/IdentityStoragePlacementActor.cs
Expand Up @@ -83,7 +83,7 @@ private async Task Terminated(IContext context, Terminated msg)

private Task ActivationRequest(IContext context, ActivationRequest msg)
{
var props = _cluster.GetClusterKind(msg.Kind);


try
{
Expand All @@ -94,12 +94,13 @@ private Task ActivationRequest(IContext context, ActivationRequest msg)
}
else
{
var clusterKind = _cluster.GetClusterKind(msg.Kind);
//this actor did not exist, lets spawn a new activation

//spawn and remember this actor
//as this id is unique for this activation (id+counter)
//we cannot get ProcessNameAlreadyExists exception here
var clusterProps = props.WithClusterInit(_cluster, msg.ClusterIdentity);
var clusterProps = clusterKind.Props.WithClusterInit(_cluster, msg.ClusterIdentity, clusterKind);

var sw = Stopwatch.StartNew();
var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.ToString());
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Cluster/Metrics/ClusterMetrics.cs
Expand Up @@ -10,15 +10,15 @@ namespace Proto.Cluster.Metrics
{
public class ClusterMetrics
{
public readonly ICountMetric ClusterActorCount;
public readonly IGaugeMetric ClusterActorGauge;
public readonly IHistogramMetric ClusterActorSpawnHistogram;
public readonly IHistogramMetric ClusterRequestHistogram;
public readonly ICountMetric ClusterRequestRetryCount;
public readonly IGaugeMetric ClusterTopologyEventGauge;

public ClusterMetrics(ProtoMetrics metrics)
{
ClusterActorCount = metrics.CreateCount("protocluster_virtualactor_count", "", "id", "address", "clusterkind");
ClusterActorGauge = metrics.CreateGauge("protocluster_virtualactors", "", "id", "address", "clusterkind");

ClusterActorSpawnHistogram =
metrics.CreateHistogram("protocluster_virtualactor_spawn_duration_seconds", "", "id", "address", "clusterkind");
Expand Down
5 changes: 3 additions & 2 deletions src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Expand Up @@ -134,7 +134,7 @@ private Task IdentityHandoverRequest(IContext context, IdentityHandoverRequest m

private Task ActivationRequest(IContext context, ActivationRequest msg)
{
var props = _cluster.GetClusterKind(msg.ClusterIdentity.Kind);


try
{
Expand All @@ -149,13 +149,14 @@ private Task ActivationRequest(IContext context, ActivationRequest msg)
}
else
{
var clusterKind = _cluster.GetClusterKind(msg.ClusterIdentity.Kind);
//this actor did not exist, lets spawn a new activation

//spawn and remember this actor
//as this id is unique for this activation (id+counter)
//we cannot get ProcessNameAlreadyExists exception here

var clusterProps = props.WithClusterInit(_cluster, msg.ClusterIdentity);
var clusterProps = clusterKind.Props.WithClusterInit(_cluster, msg.ClusterIdentity, clusterKind);

var pid = context.SpawnPrefix(clusterProps, msg.ClusterIdentity.Identity);

Expand Down

0 comments on commit 276b6cf

Please sign in to comment.