-
Notifications
You must be signed in to change notification settings - Fork 2k
/
SimpleQueueCacheCursor.cs
168 lines (150 loc) · 5.79 KB
/
SimpleQueueCacheCursor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Streams;
namespace Orleans.Providers.Streams.Common
{
/// <summary>
/// Cursor into a simple queue cache
/// </summary>
public class SimpleQueueCacheCursor : IQueueCacheCursor
{
private readonly IStreamIdentity streamIdentity;
private readonly SimpleQueueCache cache;
private readonly ILogger logger;
private IBatchContainer current; // this is a pointer to the current element in the cache. It is what will be returned by GetCurrent().
// This is also a pointer to the current element in the cache. It differs from current, in
// that current is just the batch, and is null before the first call to MoveNext after
// construction. (Or after refreshing if we had previously run out of batches). Upon MoveNext
// being called in that situation, current gets set to the batch included in Element. That is
// needed to implement the Enumerator pattern properly, since in that pattern MoveNext gets called
// before the first access of (Get)Current.
internal LinkedListNode<SimpleQueueCacheItem> Element { get; private set; }
internal StreamSequenceToken SequenceToken { get; private set; }
internal bool IsSet => Element != null;
internal void Set(LinkedListNode<SimpleQueueCacheItem> item)
{
if (item == null) throw new NullReferenceException(nameof(item));
Element = item;
SequenceToken = item.Value.SequenceToken;
}
internal void UnSet(StreamSequenceToken token)
{
Element = null;
SequenceToken = token;
}
/// <summary>
/// Cursor into a simple queue cache
/// </summary>
/// <param name="cache"></param>
/// <param name="streamIdentity"></param>
/// <param name="logger"></param>
public SimpleQueueCacheCursor(SimpleQueueCache cache, IStreamIdentity streamIdentity, ILogger logger)
{
if (cache == null)
{
throw new ArgumentNullException(nameof(cache));
}
this.cache = cache;
this.streamIdentity = streamIdentity;
this.logger = logger;
current = null;
SimpleQueueCache.Log(logger, "SimpleQueueCacheCursor New Cursor for {Guid}, {NameSpace}", streamIdentity.Guid, streamIdentity.Namespace);
}
/// <summary>
/// Get the current value.
/// </summary>
/// <param name="exception"></param>
/// <returns>
/// Returns the current batch container.
/// If null then the stream has completed or there was a stream error.
/// If there was a stream error, an error exception will be provided in the output.
/// </returns>
public virtual IBatchContainer GetCurrent(out Exception exception)
{
SimpleQueueCache.Log(logger, "SimpleQueueCacheCursor.GetCurrent: {Current}", current);
exception = null;
return current;
}
/// <summary>
/// Move to next message in the stream.
/// If it returns false, there are no more messages. The enumerator is still
/// valid however and can be called again when more data has come in on this
/// stream.
/// </summary>
/// <returns></returns>
public virtual bool MoveNext()
{
if (current == null && IsSet && IsInStream(Element.Value.Batch))
{
current = Element.Value.Batch;
return true;
}
IBatchContainer next;
while (cache.TryGetNextMessage(this, out next))
{
if(IsInStream(next))
break;
}
current = next;
if (!IsInStream(next))
return false;
return true;
}
/// <summary>
/// Refresh that cache cursor. Called when new data is added into a cache.
/// </summary>
/// <returns></returns>
public virtual void Refresh(StreamSequenceToken sequenceToken)
{
if (!IsSet)
{
cache.RefreshCursor(this, sequenceToken);
}
}
/// <summary>
/// Record that delivery of the current event has failed
/// </summary>
public void RecordDeliveryFailure()
{
if (IsSet && current != null)
{
Element.Value.DeliveryFailure = true;
}
}
private bool IsInStream(IBatchContainer batchContainer)
{
return batchContainer != null &&
batchContainer.StreamGuid.Equals(streamIdentity.Guid) &&
string.Equals(batchContainer.StreamNamespace, streamIdentity.Namespace);
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
}
/// <summary>
/// Clean up cache data when done
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
cache.UnsetCursor(this, null);
current = null;
}
}
/// <summary>
/// Convert object to string
/// </summary>
/// <returns></returns>
public override string ToString()
{
return $"<SimpleQueueCacheCursor: Element={Element?.Value.Batch.ToString() ?? "null"}, SequenceToken={SequenceToken?.ToString() ?? "null"}>";
}
}
}