Orleans Streaming: in the [SetCursor] method, if the last purged token does not exist, do not throw an exception; just start from the oldest message in cache #8863

Open
gusuchengnan opened this issue Feb 18, 2024 · 2 comments
@gusuchengnan

if (this.lastPurgedToken.TryGetValue(cursor.StreamId, out var entry) && sequenceToken.CompareTo(entry.Token) >= 0)

Should it be changed to this:

// Check if we missed an event since we last purged the cache
var isLastPurged = this.lastPurgedToken.TryGetValue(cursor.StreamId, out var entry);
if (!isLastPurged || sequenceToken.CompareTo(entry.Token) >= 0)
{
    // If there is no purge record for this stream, or the token is at least as recent as the last purged token, assume nothing was lost. Start from the oldest message in cache
    cursor.State = CursorStates.Set;
    cursor.CurrentBlock = oldestBlock;
    cursor.Index = oldestBlock.Value.OldestMessageIndex;
    cursor.SequenceToken = oldestBlock.Value.GetOldestSequenceToken(cacheDataAdapter);
    return;
}
else
{
    throw new QueueCacheMissException(sequenceToken,
        messageBlocks.Last.Value.GetOldestSequenceToken(cacheDataAdapter),
        messageBlocks.First.Value.GetNewestSequenceToken(cacheDataAdapter));
}
@ghost ghost added the Needs: triage 🔍 label Feb 18, 2024
gusuchengnan added a commit to gusuchengnan/fix-orleans-streaming that referenced this issue Mar 13, 2024
…es not exists, do not throw an exception, just start from the oldest message in cache dotnet#8863
@benjaminpetit
Member

We want to throw QueueCacheMissException whenever we cannot guarantee that we didn't miss any events. QueueCacheMissException should be handled by the application code, and should be read as "we may have missed some events for this stream".

The lastPurgedToken dictionary might not contain all the purged tokens (only those for streams recently purged by this streaming agent).
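To make the disagreement concrete, here is a minimal, self-contained sketch of the two conditions side by side. It is a simplified model rather than the Orleans code: `CursorModel`, `CursorDecision`, the string stream ids and the `long` tokens are all hypothetical stand-ins for the real cursor, stream id and sequence token types. The only case where the two conditions differ is a stream with no entry in the dictionary.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical outcome type: either start from the oldest cached message or report a cache miss.
enum CursorDecision { StartFromOldest, CacheMiss }

static class CursorModel
{
    // Current condition: a purge record must exist AND the requested token
    // must be at least as recent as the last purged token.
    public static CursorDecision Current(Dictionary<string, long> lastPurged, string streamId, long requested)
        => lastPurged.TryGetValue(streamId, out var purged) && requested.CompareTo(purged) >= 0
            ? CursorDecision.StartFromOldest
            : CursorDecision.CacheMiss;

    // Proposed condition: a missing purge record is treated as "nothing was lost".
    public static CursorDecision Proposed(Dictionary<string, long> lastPurged, string streamId, long requested)
        => !lastPurged.TryGetValue(streamId, out var purged) || requested.CompareTo(purged) >= 0
            ? CursorDecision.StartFromOldest
            : CursorDecision.CacheMiss;
}

static class Program
{
    static void Main()
    {
        // Assumed state: only "known" has a purge record, with last purged token 10.
        var lastPurged = new Dictionary<string, long> { ["known"] = 10 };

        // Token newer than the purge record: both conditions agree.
        Console.WriteLine(CursorModel.Current(lastPurged, "known", 12));    // StartFromOldest
        Console.WriteLine(CursorModel.Proposed(lastPurged, "known", 12));   // StartFromOldest

        // Token older than the purge record: both report a miss.
        Console.WriteLine(CursorModel.Current(lastPurged, "known", 5));     // CacheMiss
        Console.WriteLine(CursorModel.Proposed(lastPurged, "known", 5));    // CacheMiss

        // No purge record at all: this is where the two conditions diverge.
        Console.WriteLine(CursorModel.Current(lastPurged, "unknown", 12));  // CacheMiss
        Console.WriteLine(CursorModel.Proposed(lastPurged, "unknown", 12)); // StartFromOldest
    }
}
```

In this model the proposed condition cannot tell "never purged" apart from "purged, but the record is gone", which is the guarantee the current condition is protecting.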

@benjaminpetit benjaminpetit self-assigned this Mar 14, 2024
@gusuchengnan
Author

> We want to throw QueueCacheMissException whenever we cannot guarantee that we didn't miss any events. QueueCacheMissException should be handled by the application code, and should be read as "we may have missed some events for this stream".
>
> The lastPurgedToken dictionary might not contain all the purged tokens (only those for streams recently purged by this streaming agent).

During testing I found that when a stream is idle for a while (more than 20 minutes) with no messages, the next message sent through the stream triggers a QueueCacheMissException; the message is not delivered and is lost.
I suspect this is related to how [PurgeMetadata()] works:

private void PurgeMetadata()
{
    var now = DateTime.UtcNow;

    // Get all keys older than this.purgeMetadataInterval
    foreach (var kvp in this.lastPurgedToken)
    {
        if (kvp.Value.TimeStamp + this.purgeMetadataInterval < now)
        {
            lastPurgedToken.Remove(kvp.Key);
        }
    }
}

lastPurgedToken periodically removes entries that are considered expired based on [purgeMetadataInterval], so the last purged token for the stream can no longer be found when a new message arrives.
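The timeline I have in mind can be reproduced with a small stand-alone model. This is only a sketch under assumptions: `PurgeMetadataModel`, `RecordPurge` and `HasPurgeRecord` are hypothetical names, tokens are plain `long` values, the clock is passed in explicitly so the run is deterministic, and the 20-minute interval is chosen only to match the idle time I observed, not taken from the Orleans defaults. Unlike the snippet above, it snapshots the expired keys before removing them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the cache's purge metadata.
sealed class PurgeMetadataModel
{
    private readonly Dictionary<string, (long Token, DateTime TimeStamp)> lastPurgedToken =
        new Dictionary<string, (long Token, DateTime TimeStamp)>();
    private readonly TimeSpan purgeMetadataInterval;

    public PurgeMetadataModel(TimeSpan purgeMetadataInterval)
    {
        this.purgeMetadataInterval = purgeMetadataInterval;
    }

    // Remember the last token purged from the cache for a stream, and when that happened.
    public void RecordPurge(string streamId, long token, DateTime now)
    {
        lastPurgedToken[streamId] = (token, now);
    }

    // Drop every entry older than purgeMetadataInterval.
    // The expired keys are collected first, then removed in a second pass.
    public void PurgeMetadata(DateTime now)
    {
        var expired = lastPurgedToken
            .Where(kvp => kvp.Value.TimeStamp + purgeMetadataInterval < now)
            .Select(kvp => kvp.Key)
            .ToList();

        foreach (var key in expired)
        {
            lastPurgedToken.Remove(key);
        }
    }

    public bool HasPurgeRecord(string streamId)
    {
        return lastPurgedToken.ContainsKey(streamId);
    }
}

static class Program
{
    static void Main()
    {
        var model = new PurgeMetadataModel(TimeSpan.FromMinutes(20)); // assumed interval
        var t0 = new DateTime(2024, 2, 18, 0, 0, 0, DateTimeKind.Utc);

        // The stream's last cached message is purged at t0.
        model.RecordPurge("stream-a", 10, t0);

        // Five minutes later the record is still there.
        model.PurgeMetadata(t0.AddMinutes(5));
        Console.WriteLine(model.HasPurgeRecord("stream-a")); // True

        // After 25 idle minutes the record has expired and is removed.
        model.PurgeMetadata(t0.AddMinutes(25));
        Console.WriteLine(model.HasPurgeRecord("stream-a")); // False
    }
}
```

Once the record is gone, a lookup for that stream fails, so a condition that requires the record to exist can only report a cache miss for the next message.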
