Skip to content

Commit

Permalink
Merge pull request #1302 from shaan1337/fix-checkpoint-issue-1301
Browse files Browse the repository at this point in the history
Fixes checkpoint bug
  • Loading branch information
hayley-jean committed May 23, 2017
2 parents 494e0db + e0de61b commit af2e705
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 8 deletions.
Expand Up @@ -54,7 +54,7 @@ public void can_remove_duplicate()
cache.StartMessage(new OutstandingMessage(id, null, Helper.BuildFakeEvent(id, "type", "name", 1), 0), DateTime.Now);
cache.Remove(id);
Assert.AreEqual(0, cache.Count);
Assert.AreEqual(long.MinValue, cache.GetLowestPosition());
Assert.AreEqual(long.MaxValue, cache.GetLowestPosition());
}

[Test]
Expand All @@ -65,7 +65,7 @@ public void can_remove_existing_item()
cache.StartMessage(new OutstandingMessage(id, null, Helper.BuildFakeEvent(id, "type", "name", 0), 0), DateTime.Now);
cache.Remove(id);
Assert.AreEqual(0, cache.Count);
Assert.AreEqual(long.MinValue, cache.GetLowestPosition());
Assert.AreEqual(long.MaxValue, cache.GetLowestPosition());
}

[Test]
Expand All @@ -92,17 +92,17 @@ public void lowest_works_on_adds_then_remove()
}

[Test]
public void lowest_on_empty_cache_returns_min()
public void lowest_on_empty_cache_returns_max()
{
var cache = new OutstandingMessageCache();
Assert.AreEqual(long.MinValue, cache.GetLowestPosition());
Assert.AreEqual(long.MaxValue, cache.GetLowestPosition());
}
[Test]
public void get_expired_messages_returns_min_value_on_empty_cache()
public void get_expired_messages_returns_max_value_on_empty_cache()
{
var cache = new OutstandingMessageCache();
Assert.AreEqual(0, cache.GetMessagesExpiringBefore(DateTime.Now).Count());
Assert.AreEqual(long.MinValue, cache.GetLowestPosition());
Assert.AreEqual(long.MaxValue, cache.GetLowestPosition());
}

[Test]
Expand Down
Expand Up @@ -671,6 +671,61 @@ public void subscription_does_write_checkpoint_for_disconnected_clients_on_time_
sub.NotifyClockTick(DateTime.UtcNow);
Assert.AreEqual(1, cp);
}

[Test]
public void subscription_writes_correct_checkpoint_when_outstanding_messages_is_empty_and_retry_buffer_is_non_empty()
{
long cp = -1;
var reader = new FakeCheckpointReader();
var sub = new Core.Services.PersistentSubscription.PersistentSubscription(
PersistentSubscriptionParamsBuilder.CreateFor("streamName", "groupName")
.WithEventLoader(new FakeStreamReader(x => { }))
.WithCheckpointReader(reader)
.WithCheckpointWriter(new FakeCheckpointWriter(i => cp = i))
.WithMessageParker(new FakeMessageParker())
.StartFromBeginning()
.MinimumToCheckPoint(1)
.MaximumToCheckPoint(1));
reader.Load(null);

var eventId1 = Guid.NewGuid();
var eventId2 = Guid.NewGuid();
var eventId3 = Guid.NewGuid();

var clientConnectionId = Guid.NewGuid();
var clientCorrelationId = Guid.NewGuid();
sub.AddClient(clientCorrelationId, clientConnectionId, new FakeEnvelope(), 10, "foo", "bar");

//send events 1-3, ACK event 1 only and Mark checkpoint
sub.HandleReadCompleted(new[]
{
Helper.BuildFakeEvent(eventId1, "type", "streamName", 1),
Helper.BuildFakeEvent(eventId2, "type", "streamName", 2),
Helper.BuildFakeEvent(eventId3, "type", "streamName", 3)
}, 1, false);
sub.GetNextNOrLessMessages(3).ToArray();
sub.AcknowledgeMessagesProcessed(clientCorrelationId, new [] {eventId1});
sub.TryMarkCheckpoint(false);

//checkpoint should be at event 2
Assert.AreEqual(2, cp);

//events 2 & 3 should still be in _outstandingMessages buffer
Assert.AreEqual(sub.OutstandingMessageCount,2);
//retry queue should be empty
Assert.AreEqual(sub._streamBuffer.RetryBufferCount, 0);

//Disconnect the client
sub.RemoveClientByConnectionId(clientConnectionId);

//this should empty the _outstandingMessages buffer and move events 2 & 3 to the retry queue
Assert.AreEqual(sub.OutstandingMessageCount,0);
Assert.AreEqual(sub._streamBuffer.RetryBufferCount, 2);

//mark the checkpoint which should still be at event 2 although the _lastKnownMessage value is 3.
sub.TryMarkCheckpoint(false);
Assert.AreEqual(2, cp);
}
}

[TestFixture]
Expand Down
Expand Up @@ -91,7 +91,7 @@ public IEnumerable<OutstandingMessage> GetMessagesExpiringBefore(DateTime time)
public long GetLowestPosition()
{
//TODO is there a better way of doing this?
if (_bySequences.Count == 0) return long.MinValue;
if (_bySequences.Count == 0) return long.MaxValue;
return _bySequences.Values[0];
}

Expand Down
Expand Up @@ -294,7 +294,7 @@ public void TryMarkCheckpoint(bool isTimeCheck)
//TODO? COMPETING better to make -1? as of now we are inclusive of checkpoint.
var lowestBufferedRetry = _streamBuffer.GetLowestRetry();
lowest = Math.Min(lowest, lowestBufferedRetry);
if (lowest == long.MinValue) lowest = _lastKnownMessage;
if (lowest == long.MaxValue) lowest = _lastKnownMessage;
if (lowest == 0) return;
//no outstanding messages. in this case we can say that the last known
//event would be our checkpoint place (we have already completed it)
Expand Down

0 comments on commit af2e705

Please sign in to comment.