-
Notifications
You must be signed in to change notification settings - Fork 1k
/
ShardingConsumerController.cs
84 lines (76 loc) · 4.44 KB
/
ShardingConsumerController.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// -----------------------------------------------------------------------
// <copyright file="ShardingConsumerController.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------
#nullable enable
using System;
using Akka.Actor;
using Akka.Annotations;
using Akka.Cluster.Sharding.Delivery.Internal;
using Akka.Configuration;
using Akka.Delivery;
namespace Akka.Cluster.Sharding.Delivery;
/// <summary>
/// <see cref="ShardingConsumerController"/> is used together with <see cref="ShardingProducerController"/>.
///
/// <see cref="ShardingConsumerController"/> is the entity actor that is initialized via <see cref="ClusterSharding"/>.
/// It will manage the lifecycle and message delivery to the destination consumer actor (your actor type specified via <see cref="Props"/>
/// in the <see cref="ShardingConsumerController.Create{T}(System.Func{Akka.Actor.IActorRef,Akka.Actor.Props},Settings)"/> method.)
///
/// The destination consumer actor will start the flow by sending an initial <see cref="ConsumerController.Start{T}"/>
/// message to the <see cref="ShardingConsumerController"/>, its parent actor.
///
/// Received messages from the producer are wrapped in a <see cref="ConsumerController.Delivery{T}"/> message and sent to the destination consumer actor,
/// which is supposed to reply with <see cref="ConsumerController.Confirmed"/> when it has successfully processed the message.
///
/// Next message from the producer will not be delivered until the destination consumer actor has confirmed the previous message.
/// However, since there can be several producers, e.g. one per node, sending messages to the same destination entity actor there can
/// be multiple <see cref="ConsumerController.Delivery{T}"/> messages in flight at the same time.
///
/// More messages from a specific producer that arrive while waiting for the confirmation are stashed by the <see cref="ConsumerController"/>
/// and delivered when the previous messages are confirmed.
/// </summary>
[ApiMayChange]
public static class ShardingConsumerController
{
public sealed record Settings
{
private Settings(Config config, ConsumerController.Settings consumerControllerSettings)
{
AllowBypass = config.GetBoolean("allow-bypass");
BufferSize = config.GetInt("buffer-size");
ConsumerControllerSettings = consumerControllerSettings;
}
public bool AllowBypass { get; init; }
public int BufferSize { get; init; }
public ConsumerController.Settings ConsumerControllerSettings { get; init; }
public static Settings Create(ActorSystem system)
{
// TODO: remove work-around once substitutions + overrides work properly in HOCON
return Create(system.Settings.Config.GetConfig("akka.reliable-delivery.sharding.consumer-controller"),
system.Settings.Config.GetConfig("akka.reliable-delivery.consumer-controller"));
}
internal static Settings Create(Config config, Config consumerControllerConfig) // made internal so users can't foot-gun themselves
{
return new Settings(config, ConsumerController.Settings.Create(config.WithFallback(consumerControllerConfig)));
}
public override string ToString()
{
return $"ShardingConsumerController.Settings(BufferSize={BufferSize}, ConsumerControllerSettings={ConsumerControllerSettings})";
}
}
/// <summary>
/// Creates a new instance of <see cref="ShardingConsumerController"/> props for the given entity and type of message.
/// </summary>
/// <param name="consumerProps">A function that passes in the <see cref="ShardingConsumerController"/> actor reference
/// in exchange for the consumer's <see cref="Props"/>.</param>
/// <param name="settings">The settings for the <see cref="ShardingConsumerController"/>.</param>
/// <typeparam name="T">The type of message for which we will be guaranteeing delivery.</typeparam>
/// <returns>The props used to start this entity.</returns>
public static Props Create<T>(Func<IActorRef, Props> consumerProps, Settings settings)
{
return Props.Create(() => new ShardingConsumerController<T>(consumerProps, settings)).WithStashCapacity(settings.BufferSize);
}
}