Permalink
Browse files

Resolve issues with cursor ids crossing streams.

- Take the stream index the into account when checking for
  overrun buffers.
- Only process the next cursor if it's bigger than what we've
  already seen.
- Only update cursors relevant to the stream being processed.

#1973
  • Loading branch information...
1 parent 5d70038 commit d0a283091ce027ed0f09479a505356c2f238f3aa @davidfowl davidfowl committed May 11, 2013
@@ -94,10 +94,15 @@ public Message(string source, string key, ArraySegment<byte> value)
public Encoding Encoding { get; private set; }
/// <summary>
- /// The scaleout mapping id. Only used in scaleout scenarios
+ /// The payload id. Only used in scaleout scenarios
/// </summary>
public ulong MappingId { get; set; }
+ /// <summary>
+ /// The stream index this message came from. Only used the scaleout scenarios.
+ /// </summary>
+ public int StreamIndex { get; set; }
+
public bool IsCommand
{
get
@@ -178,7 +178,10 @@ private void OnReceivedCore(int streamIndex, ulong id, ScaleoutMessage scaleoutM
for (var i = 0; i < scaleoutMessage.Messages.Count; ++i)
{
Message message = scaleoutMessage.Messages[i];
+
+ // Remember where this message came from
message.MappingId = id;
+ message.StreamIndex = streamIndex;
IList<LocalEventKeyInfo> keyInfo;
if (!localMapping.TryGetValue(message.Key, out keyInfo))
@@ -187,7 +190,6 @@ private void OnReceivedCore(int streamIndex, ulong id, ScaleoutMessage scaleoutM
localMapping.Add(message.Key, keyInfo);
}
-
ulong localId = Save(message);
MessageStore<Message> messageStore = Topics[message.Key].Store;
@@ -83,16 +83,16 @@ protected override void PerformWork(IList<ArraySegment<Message>> items, out int
ScaleoutMapping mapping = enumerator.Current.Item1;
int streamIndex = enumerator.Current.Item2;
- ulong mappingId = ExtractMessages(mapping, items, ref totalCount);
+ ulong? nextCursor = nextCursors[streamIndex];
- // Update the cursor id
- nextCursors[streamIndex] = mappingId;
-
- // If the mapping id of the message we received is bigger than our current mapping id
- // it means we missed messages and we need to jump ahead.
- if (mappingId > mapping.Id)
+ // Only keep going with this stream if the cursor we're looking at is bigger than
+ // anything we already processed
+ if (nextCursor == null || mapping.Id > nextCursor)
{
- break;
+ ulong mappingId = ExtractMessages(streamIndex, mapping, items, ref totalCount);
+
+ // Update the cursor id
+ nextCursors[streamIndex] = mappingId;
}
}
@@ -167,7 +167,7 @@ protected override void BeforeInvoke(object state)
}
}
- private ulong ExtractMessages(ScaleoutMapping mapping, IList<ArraySegment<Message>> items, ref int totalCount)
+ private ulong ExtractMessages(int streamIndex, ScaleoutMapping mapping, IList<ArraySegment<Message>> items, ref int totalCount)
{
// For each of the event keys we care about, extract all of the messages
// from the payload
@@ -186,14 +186,27 @@ private ulong ExtractMessages(ScaleoutMapping mapping, IList<ArraySegment<Messag
if (storeResult.Messages.Count > 0)
{
- items.Add(storeResult.Messages);
- totalCount += storeResult.Messages.Count;
-
- ulong mappingId = storeResult.Messages.Array[storeResult.Messages.Offset].MappingId;
+ // TODO: Figure out what to do when we have multiple event keys per mapping
+ Message message = storeResult.Messages.Array[storeResult.Messages.Offset];
- if (mappingId > mapping.Id)
+ // Only add the message to the list if the stream index matches
+ if (message.StreamIndex == streamIndex)
+ {
+ items.Add(storeResult.Messages);
+ totalCount += storeResult.Messages.Count;
+
+ // We got a mapping id bigger than what we expected which
+ // means we missed messages. Use the new mappingId.
+ if (message.MappingId > mapping.Id)
+ {
+ return message.MappingId;
+ }
+ }
+ else
{
- return mappingId;
+ // REVIEW: When the stream indexes don't match should we leave the mapping id as is?
+ // If we do nothing then we'll end up querying old cursor ids until
+ // we eventually find a message id that matches this stream index.
}
}
}
@@ -223,6 +223,64 @@ public void SubscriptionGetsNewMessagesWhenTopicStoreOverrun()
}
[Fact]
+ public void SubscriptionDoesNotGetNewMessagesWhenTopicStoreOverrunByOtherStream()
+ {
+ var dr = new DefaultDependencyResolver();
+ dr.Resolve<IConfigurationManager>().DefaultMessageBufferSize = 10;
+
+ using (var bus = new TestScaleoutBus(dr, streams: 2))
+ {
+ var subscriber = new TestSubscriber(new[] { "key" });
+ IDisposable subscription = null;
+
+ // The min fragment size is 8 and min fragments is 5
+ var expectedValues = Enumerable.Range(171, 8);
+ var cd = new OrderedCountDownRange<int>(expectedValues);
+
+ // This will overwrite the buffer ending up with (40 - 79) for stream 2
+ for (int i = 0; i < 80; i++)
+ {
+ bus.Publish(0, (ulong)i, new[] {
+ new Message("test", "key", i.ToString())
+ });
+ }
+
+ // This will overwrite the buffer with (140 - 179) for stream 1
+ for (int i = 100; i < 180; i++)
+ {
+ bus.Publish(1, (ulong)i, new[] {
+ new Message("test", "key", i.ToString())
+ });
+ }
+
+ try
+ {
+ subscription = bus.Subscribe(subscriber, "0,27|1,AA", (result, state) =>
+ {
+ foreach (var m in result.GetMessages())
+ {
+ int n = Int32.Parse(m.GetString());
+
+ cd.Expect(n);
+ }
+
+ return TaskAsyncHelper.True;
+
+ }, 100, null);
+
+ Assert.True(cd.Wait(TimeSpan.FromSeconds(10)));
+ }
+ finally
+ {
+ if (subscription != null)
+ {
+ subscription.Dispose();
+ }
+ }
+ }
+ }
+
+ [Fact]
public void SubscriptionPublishingAfter()
{
var dr = new DefaultDependencyResolver();

0 comments on commit d0a2830

Please sign in to comment.