Skip to content

Commit

Permalink
Merge pull request #3734 from EventStore/cherry-pick/3726/incorrect-c…
Browse files Browse the repository at this point in the history
…heckpoint-filtered-subscriptions-release/oss-v21.10

[release/oss-v21.10] Checkpoint does not update when filtered subscription becomes Live
  • Loading branch information
hayley-jean committed Feb 16, 2023
2 parents 20941d6 + ca24844 commit 80c1657
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,19 @@ public class when_subscribing_to_all_with_a_filter_live<TLogFormat, TStreamId>

Assert.AreEqual(ReadResp.ContentOneofCase.Confirmation, call.ResponseStream.Current.ContentCase);

var success = (await AppendToStreamBatch(new BatchAppendReq {
Options = new() {
Any = new(),
StreamIdentifier = new() {StreamName = ByteString.CopyFromUtf8(StreamName)}
},
IsFinal = true,
ProposedMessages = {CreateEvents(1)},
CorrelationId = Uuid.NewUuid().ToDto()
})).Success;

var success = new BatchAppendResp.Types.Success();
for (int i = 0; i <= _checkpointInterval ; i++) {
success = (await AppendToStreamBatch(new BatchAppendReq {
Options = new() {
Any = new(),
StreamIdentifier = new() {StreamName = ByteString.CopyFromUtf8(StreamName)}
},
IsFinal = true,
ProposedMessages = {CreateEvents(1)},
CorrelationId = Uuid.NewUuid().ToDto()
})).Success;
}

_position = new Position(success.Position.CommitPosition, success.Position.PreparePosition);

while (await call.ResponseStream.MoveNext()) {
Expand All @@ -231,15 +234,18 @@ public class when_subscribing_to_all_with_a_filter_live<TLogFormat, TStreamId>
}

if (response.ContentCase == ReadResp.ContentOneofCase.Event) {
Assert.AreEqual(StreamName, response.Event.Event.StreamIdentifier.StreamName.ToStringUtf8());
return;
Assert.AreEqual(StreamName,
response.Event.Event.StreamIdentifier.StreamName.ToStringUtf8());
if (!(response.Event.CommitPosition < _position.CommitPosition)) {
return;
}
}
}
}

[Test]
public void receives_the_correct_number_of_checkpoints() {
Assert.AreEqual(1, CheckpointCount);
Assert.AreEqual(2, CheckpointCount);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,21 @@ await foreach (var message in liveEvents.Reader.ReadAllAsync(_cancellationToken)
.ConfigureAwait(false)) {

if (message is ClientMessage.CheckpointReached checkpoint) {
if (checkpoint.Position == null) {
_channel.Writer.Complete(
new Exception("Unexpected error, checkpoint position is Null"));
break;
}

var checkpointPosition = Position.FromInt64(
checkpoint.Position.Value.CommitPosition,
checkpoint.Position.Value.PreparePosition);
await _channel.Writer
.WriteAsync(
new ReadResp {
Checkpoint = new ReadResp.Types.Checkpoint {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
CommitPosition = checkpointPosition.CommitPosition,
PreparePosition = checkpointPosition.PreparePosition
}
},
_cancellationToken)
Expand Down Expand Up @@ -439,8 +448,6 @@ await _channel.Writer
return;
}
case ClientMessage.CheckpointReached checkpointReached:
var position = Position.FromInt64(checkpointReached.Position.Value.CommitPosition,
checkpointReached.Position.Value.PreparePosition);
if (!liveEvents.Writer.TryWrite(checkpointReached)) {
ConsumerTooSlow(null);
}
Expand Down

0 comments on commit 80c1657

Please sign in to comment.