-
Notifications
You must be signed in to change notification settings - Fork 40
/
QueueSink.cs
155 lines (137 loc) · 5.9 KB
/
QueueSink.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
using System;
using System.Threading.Tasks;
using Akka.Streams.Azure.Utils;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Azure;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
namespace Akka.Streams.Azure.StorageQueue
{
/// <summary>
/// A <see cref="Sink{TIn,TMat}"/> for the Azure Storage Queue.
/// Every element pushed into the sink is sent to the queue as one message; the next element is
/// only requested after the pending send has completed. The materialized <see cref="Task"/>
/// completes once the upstream has finished and the last send is done, and fails when the upstream
/// fails or when a send fails and the supervision decider answers <see cref="Directive.Stop"/>.
/// </summary>
public class QueueSink : GraphStageWithMaterializedValue<SinkShape<string>, Task>
{
    #region Logic

    /// <summary>
    /// The stage logic. At most one send is in flight at any time.
    /// </summary>
    private sealed class Logic : GraphStageLogic
    {
        private readonly QueueSink _sink;
        private readonly TaskCompletionSource<NotUsed> _completion;
        private Action<(Task<Response<SendReceipt>>, string)> _messageAddedCallback;
        private bool _isAddInProgress;
        private readonly Decider _decider;

        /// <summary>
        /// Creates the logic and installs the inlet handlers.
        /// </summary>
        /// <param name="sink">The owning sink, which provides the queue client, the options and the inlet</param>
        /// <param name="attributes">The attributes the supervision decider is read from</param>
        /// <param name="completion">The source of the materialized task</param>
        public Logic(QueueSink sink, Attributes attributes, TaskCompletionSource<NotUsed> completion) : base(sink.Shape)
        {
            _sink = sink;
            _completion = completion;
            _decider = attributes.GetDeciderOrDefault();

            SetHandler(sink.In,
                onPush: () => TryAdd(Grab(_sink.In)),
                onUpstreamFinish: () =>
                {
                    // It is most likely that we receive the finish event before the task from the last element has finished
                    // so if the task is still running we need to complete the stage later (see PullOrComplete)
                    if (!_isAddInProgress)
                        Finish();
                },
                onUpstreamFailure: ex =>
                {
                    _completion.TrySetException(ex);
                    // We have set KeepGoing to true so we need to fail the stage manually
                    FailStage(ex);
                });
        }

        /// <summary>
        /// Registers the async callback used by the send continuation and requests the first element.
        /// </summary>
        public override void PreStart()
        {
            // Keep going even if the upstream has finished so that we can process the task from the last element
            SetKeepGoing(true);
            _messageAddedCallback = GetAsyncCallback<(Task<Response<SendReceipt>>, string)>(OnMessageAdded);
            // Request the first element
            Pull(_sink.In);
        }

        /// <summary>
        /// Starts sending <paramref name="message"/> to the queue. The finished task is handed back
        /// to the stage through the async callback together with the message, so the message can be retried.
        /// </summary>
        /// <param name="message">The message text to send</param>
        private void TryAdd(string message)
        {
            _isAddInProgress = true;
            _sink._queue.SendMessageAsync(
                    message,
                    _sink._options.InitialVisibilityDelay,
                    _sink._options.TimeToLive)
                .ContinueWith(t => _messageAddedCallback((t, message)));
        }

        /// <summary>
        /// Handles the outcome of a send. On success the next element is pulled (or the stage is completed).
        /// On failure or cancellation the supervision decider chooses between failing the stage,
        /// retrying the same message, or skipping it.
        /// </summary>
        /// <param name="t">The finished send task and the message it was sending</param>
        private void OnMessageAdded((Task<Response<SendReceipt>>, string) t)
        {
            _isAddInProgress = false;
            var (task, message) = t;

            if (!task.IsFaulted && !task.IsCanceled)
            {
                PullOrComplete();
                return;
            }

            // Task.Exception is null for a canceled task; passing null on to the decider,
            // TrySetException or FailStage would throw and leave the materialized task pending,
            // so a cancellation is represented by a TaskCanceledException instead.
            Exception cause = task.IsFaulted
                ? (Exception)task.Exception
                : new TaskCanceledException(task);

            switch (_decider(cause))
            {
                case Directive.Stop:
                    // Throw
                    _completion.TrySetException(cause);
                    FailStage(cause);
                    break;
                case Directive.Resume:
                    // Try again with the same message
                    TryAdd(message);
                    break;
                case Directive.Restart:
                    // Drop the message: take the next element or complete
                    PullOrComplete();
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        /// <summary>
        /// Completes the stage if the upstream has already finished, otherwise requests the next element.
        /// </summary>
        private void PullOrComplete()
        {
            if (IsClosed(_sink.In))
                Finish();
            else
                Pull(_sink.In);
        }

        /// <summary>
        /// Completes the materialized task successfully and then completes the stage.
        /// </summary>
        private void Finish()
        {
            _completion.TrySetResult(NotUsed.Instance);
            CompleteStage();
        }
    }

    #endregion

    /// <summary>
    /// Creates a <see cref="Sink{TIn,TMat}"/> for the Azure Storage Queue
    /// </summary>
    /// <param name="queue">The queue</param>
    /// <param name="options">The options for the <see cref="QueueClient.SendMessageAsync(string)"/> call</param>
    /// <returns>The <see cref="Sink{TIn,TMat}"/> for the Azure Storage Queue</returns>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null</exception>
    public static Sink<string, Task> Create(QueueClient queue, AddRequestOptions options = null)
    {
        return Sink.FromGraph(new QueueSink(queue, options));
    }

    private readonly QueueClient _queue;
    private readonly AddRequestOptions _options;

    /// <summary>
    /// Create a new instance of the <see cref="QueueSink"/>
    /// </summary>
    /// <param name="queue">The queue</param>
    /// <param name="options">
    /// The options for the <see cref="QueueClient.SendMessageAsync(string)"/> call;
    /// a new <see cref="AddRequestOptions"/> instance is used when null
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null</exception>
    public QueueSink(QueueClient queue, AddRequestOptions options = null)
    {
        // Fail at construction instead of with a NullReferenceException on the first pushed element
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        _options = options ?? new AddRequestOptions();
        Shape = new SinkShape<string>(In);
    }

    /// <summary>
    /// The inlet the messages arrive on
    /// </summary>
    public Inlet<string> In { get; } = new Inlet<string>("QueueSink.In");

    /// <summary>
    /// The shape of the sink, consisting of <see cref="In"/> only
    /// </summary>
    public override SinkShape<string> Shape { get; }

    /// <summary>
    /// The initial attributes, which name the stage "QueueSink"
    /// </summary>
    protected override Attributes InitialAttributes { get; } = Attributes.CreateName("QueueSink");

    /// <summary>
    /// Creates the stage logic together with the task that is materialized for this sink.
    /// </summary>
    /// <param name="inheritedAttributes">The attributes passed on to the logic</param>
    /// <returns>The logic and the materialized <see cref="Task"/></returns>
    public override ILogicAndMaterializedValue<Task> CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
    {
        // Run continuations asynchronously so that code awaiting the materialized task
        // is never executed inline inside the stage while it completes or fails
        var completion = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);
        return new LogicAndMaterializedValue<Task>(new Logic(this, inheritedAttributes, completion), completion.Task);
    }
}
}