-
Notifications
You must be signed in to change notification settings - Fork 2k
/
PersistentStreamProvider.cs
184 lines (162 loc) · 8.87 KB
/
PersistentStreamProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Streams;
using Orleans.Streams.Core;
namespace Orleans.Providers.Streams.Common
{
[Serializable]
public enum PersistentStreamProviderCommand
{
None,
StartAgents,
StopAgents,
GetAgentsState,
GetNumberRunningAgents,
AdapterCommandStartRange = 10000,
AdapterCommandEndRange = AdapterCommandStartRange + 9999,
AdapterFactoryCommandStartRange = AdapterCommandEndRange + 1,
AdapterFactoryCommandEndRange = AdapterFactoryCommandStartRange + 9999,
}
/// <summary>
/// Persistent stream provider that uses an adapter for persistence
/// </summary>
public class PersistentStreamProvider : IStreamProvider, IInternalStreamProvider, IControllable, IStreamSubscriptionManagerRetriever, ILifecycleParticipant<ILifecycleObservable>
{
private readonly ILogger logger;
private readonly IStreamProviderRuntime runtime;
private readonly SerializationManager serializationManager;
private readonly IRuntimeClient runtimeClient;
private readonly ProviderStateManager stateManager = new ProviderStateManager();
private IQueueAdapterFactory adapterFactory;
private IQueueAdapter queueAdapter;
private IPersistentStreamPullingManager pullingAgentManager;
private IStreamSubscriptionManager streamSubscriptionManager;
private readonly StreamPubSubOptions pubsubOptions;
private readonly StreamLifecycleOptions lifeCycleOptions;
public string Name { get; private set; }
public bool IsRewindable { get { return queueAdapter.IsRewindable; } }
public PersistentStreamProvider(string name, StreamPubSubOptions pubsubOptions, StreamLifecycleOptions lifeCycleOptions, IProviderRuntime runtime, SerializationManager serializationManager, ILogger<PersistentStreamProvider> logger)
{
if (String.IsNullOrEmpty(name)) throw new ArgumentNullException(nameof(name));
if (runtime == null) throw new ArgumentNullException(nameof(runtime));
this.pubsubOptions = pubsubOptions ?? throw new ArgumentNullException(nameof(pubsubOptions));
this.Name = name;
this.lifeCycleOptions = lifeCycleOptions ?? throw new ArgumentNullException(nameof(lifeCycleOptions));
this.runtime = runtime.ServiceProvider.GetRequiredService<IStreamProviderRuntime>();
this.runtimeClient = runtime.ServiceProvider.GetRequiredService<IRuntimeClient>();
this.serializationManager = serializationManager ?? throw new ArgumentNullException(nameof(serializationManager));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
private async Task Init(CancellationToken token)
{
if(!this.stateManager.PresetState(ProviderState.Initialized)) return;
this.adapterFactory = this.runtime.ServiceProvider.GetRequiredServiceByName<IQueueAdapterFactory>(this.Name);
this.queueAdapter = await adapterFactory.CreateAdapter();
if (this.pubsubOptions.PubSubType == StreamPubSubType.ExplicitGrainBasedAndImplicit
|| this.pubsubOptions.PubSubType == StreamPubSubType.ExplicitGrainBasedOnly)
{
this.streamSubscriptionManager = this.runtime.ServiceProvider
.GetService<IStreamSubscriptionManagerAdmin>().GetStreamSubscriptionManager(StreamSubscriptionManagerType.ExplicitSubscribeOnly);
}
this.stateManager.CommitState();
}
private async Task Start(CancellationToken token)
{
if (!this.stateManager.PresetState(ProviderState.Started)) return;
if (this.queueAdapter.Direction.Equals(StreamProviderDirection.ReadOnly) ||
this.queueAdapter.Direction.Equals(StreamProviderDirection.ReadWrite))
{
var siloRuntime = this.runtime as ISiloSideStreamProviderRuntime;
if (siloRuntime != null)
{
this.pullingAgentManager = await siloRuntime.InitializePullingAgents(this.Name, this.adapterFactory, this.queueAdapter);
// TODO: No support yet for DeliveryDisabled, only Stopped and Started
if (this.lifeCycleOptions.StartupState == StreamLifecycleOptions.RunState.AgentsStarted)
await pullingAgentManager.StartAgents();
}
}
stateManager.CommitState();
}
public IStreamSubscriptionManager GetStreamSubscriptionManager()
{
return this.streamSubscriptionManager;
}
private async Task Close(CancellationToken token)
{
if (!stateManager.PresetState(ProviderState.Closed)) return;
var manager = this.pullingAgentManager;
if (manager != null)
{
await manager.Stop();
}
stateManager.CommitState();
}
public IAsyncStream<T> GetStream<T>(Guid id, string streamNamespace)
{
var streamId = StreamId.GetStreamId(id, Name, streamNamespace);
return this.runtime.GetStreamDirectory().GetOrAddStream<T>(
streamId, () => new StreamImpl<T>(streamId, this, IsRewindable, this.runtimeClient));
}
IInternalAsyncBatchObserver<T> IInternalStreamProvider.GetProducerInterface<T>(IAsyncStream<T> stream)
{
if (queueAdapter.Direction == StreamProviderDirection.ReadOnly)
{
throw new InvalidOperationException($"Stream provider {queueAdapter.Name} is ReadOnly.");
}
return new PersistentStreamProducer<T>((StreamImpl<T>)stream, this.runtime, queueAdapter, IsRewindable, this.serializationManager);
}
IInternalAsyncObservable<T> IInternalStreamProvider.GetConsumerInterface<T>(IAsyncStream<T> streamId)
{
return GetConsumerInterfaceImpl(streamId);
}
private IInternalAsyncObservable<T> GetConsumerInterfaceImpl<T>(IAsyncStream<T> stream)
{
return new StreamConsumer<T>((StreamImpl<T>)stream, Name, this.runtime, this.runtime.PubSub(this.pubsubOptions.PubSubType), this.logger, IsRewindable);
}
public Task<object> ExecuteCommand(int command, object arg)
{
if (command >= (int)PersistentStreamProviderCommand.AdapterCommandStartRange &&
command <= (int)PersistentStreamProviderCommand.AdapterCommandEndRange &&
queueAdapter is IControllable)
{
return ((IControllable)queueAdapter).ExecuteCommand(command, arg);
}
if (command >= (int)PersistentStreamProviderCommand.AdapterFactoryCommandStartRange &&
command <= (int)PersistentStreamProviderCommand.AdapterFactoryCommandEndRange &&
adapterFactory is IControllable)
{
return ((IControllable)adapterFactory).ExecuteCommand(command, arg);
}
if (pullingAgentManager != null)
{
return pullingAgentManager.ExecuteCommand((PersistentStreamProviderCommand)command, arg);
}
logger.Warn(0, $"Got command {(PersistentStreamProviderCommand)command} with arg {arg}, but PullingAgentManager is not initialized yet. Ignoring the command.");
throw new ArgumentException("PullingAgentManager is not initialized yet.");
}
public void Participate(ILifecycleObservable lifecycle)
{
lifecycle.Subscribe(OptionFormattingUtilities.Name<PersistentStreamProvider>(this.Name), this.lifeCycleOptions.InitStage, Init);
lifecycle.Subscribe(OptionFormattingUtilities.Name<PersistentStreamProvider>(this.Name), this.lifeCycleOptions.StartStage, Start, Close);
}
public static IStreamProvider Create(IServiceProvider services, string name)
{
var pubsubOptions = services.GetRequiredService<IOptionsMonitor<StreamPubSubOptions>>().Get(name);
var initOptions = services.GetRequiredService<IOptionsMonitor<StreamLifecycleOptions>>().Get(name);
return ActivatorUtilities.CreateInstance<PersistentStreamProvider>(services, name, pubsubOptions, initOptions);
}
public static ILifecycleParticipant<TLifecycle> ParticipateIn<TLifecycle>(IServiceProvider serviceProvider, string name)
where TLifecycle : ILifecycleObservable
{
var provider = (PersistentStreamProvider)serviceProvider.GetRequiredServiceByName<IStreamProvider>(name);
return provider.ParticipateIn<TLifecycle>();
}
}
}