Skip to content

Commit

Permalink
In PooledQueueCache, avoid cache miss if we know that we didn't misse…
Browse files Browse the repository at this point in the history
…d any event (#7060)

Co-authored-by: Reuben Bond <203839+ReubenBond@users.noreply.github.com>
  • Loading branch information
benjaminpetit and ReubenBond committed Jun 4, 2021
1 parent 8255e5a commit c3b1e6f
Show file tree
Hide file tree
Showing 14 changed files with 626 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class EventHubQueueCache : IEventHubQueueCache
private readonly ILogger logger;
private readonly AggregatedCachePressureMonitor cachePressureMonitor;
private readonly ICacheMonitor cacheMonitor;

private FixedSizeBuffer currentBuffer;

/// <summary>
Expand All @@ -46,6 +45,7 @@ public class EventHubQueueCache : IEventHubQueueCache
/// <param name="logger"></param>
/// <param name="cacheMonitor"></param>
/// <param name="cacheMonitorWriteInterval"></param>
/// <param name="metadataMinTimeInCache"></param>
public EventHubQueueCache(
string partition,
int defaultMaxAddCount,
Expand All @@ -55,14 +55,15 @@ public class EventHubQueueCache : IEventHubQueueCache
IStreamQueueCheckpointer<string> checkpointer,
ILogger logger,
ICacheMonitor cacheMonitor,
TimeSpan? cacheMonitorWriteInterval)
TimeSpan? cacheMonitorWriteInterval,
TimeSpan? metadataMinTimeInCache)
{
this.Partition = partition;
this.defaultMaxAddCount = defaultMaxAddCount;
this.bufferPool = bufferPool;
this.dataAdapter = dataAdapter;
this.checkpointer = checkpointer;
this.cache = new PooledQueueCache(dataAdapter, logger, cacheMonitor, cacheMonitorWriteInterval);
this.cache = new PooledQueueCache(dataAdapter, logger, cacheMonitor, cacheMonitorWriteInterval, metadataMinTimeInCache);
this.cacheMonitor = cacheMonitor;
this.evictionStrategy = evictionStrategy;
this.evictionStrategy.OnPurged = this.OnPurge;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Orleans.ServiceBus.Providers
public class EventHubQueueCacheFactory : IEventHubQueueCacheFactory
{
private readonly EventHubStreamCachePressureOptions cacheOptions;
private readonly StreamCacheEvictionOptions evictionOptions;
private readonly StreamStatisticOptions statisticOptions;
private readonly IEventHubDataAdapter dataAdater;
private readonly SerializationManager serializationManager;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class EventHubQueueCacheFactory : IEventHubQueueCacheFactory
Func<EventHubBlockPoolMonitorDimensions, ILoggerFactory, ITelemetryProducer, IBlockPoolMonitor> blockPoolMonitorFactory = null)
{
this.cacheOptions = cacheOptions;
this.evictionOptions = evictionOptions;
this.statisticOptions = statisticOptions;
this.dataAdater = dataAdater;
this.serializationManager = serializationManager;
Expand All @@ -66,7 +68,7 @@ public IEventHubQueueCache CreateCache(string partition, IStreamQueueCheckpointe
{
string blockPoolId;
var blockPool = CreateBufferPool(this.statisticOptions, loggerFactory, this.sharedDimensions, telemetryProducer, out blockPoolId);
var cache = CreateCache(partition, dataAdater, this.statisticOptions, checkpointer, loggerFactory, blockPool, blockPoolId, this.timePurge, this.serializationManager, this.sharedDimensions, telemetryProducer);
var cache = CreateCache(partition, dataAdater, this.statisticOptions, this.evictionOptions, checkpointer, loggerFactory, blockPool, blockPoolId, this.timePurge, this.serializationManager, this.sharedDimensions, telemetryProducer);
AddCachePressureMonitors(cache, this.cacheOptions, loggerFactory.CreateLogger($"{typeof(EventHubQueueCache).FullName}.{this.sharedDimensions.EventHubPath}.{partition}"));
return cache;
}
Expand Down Expand Up @@ -127,16 +129,26 @@ protected virtual IObjectPool<FixedSizeBuffer> CreateBufferPool(StreamStatisticO
/// Default function to be called to create an EventhubQueueCache in IEventHubQueueCacheFactory.CreateCache method. User can
/// override this method to add more customization.
/// </summary>
protected virtual IEventHubQueueCache CreateCache(string partition, IEventHubDataAdapter dataAdatper, StreamStatisticOptions statisticOptions, IStreamQueueCheckpointer<string> checkpointer,
ILoggerFactory loggerFactory, IObjectPool<FixedSizeBuffer> bufferPool, string blockPoolId, TimePurgePredicate timePurge,
SerializationManager serializationManager, EventHubMonitorAggregationDimensions sharedDimensions, ITelemetryProducer telemetryProducer)
protected virtual IEventHubQueueCache CreateCache(
string partition,
IEventHubDataAdapter dataAdatper,
StreamStatisticOptions statisticOptions,
StreamCacheEvictionOptions streamCacheEvictionOptions,
IStreamQueueCheckpointer<string> checkpointer,
ILoggerFactory loggerFactory,
IObjectPool<FixedSizeBuffer> bufferPool,
string blockPoolId,
TimePurgePredicate timePurge,
SerializationManager serializationManager,
EventHubMonitorAggregationDimensions sharedDimensions,
ITelemetryProducer telemetryProducer)
{
var cacheMonitorDimensions = new EventHubCacheMonitorDimensions(sharedDimensions, partition, blockPoolId);
var cacheMonitor = this.CacheMonitorFactory(cacheMonitorDimensions, loggerFactory, telemetryProducer);
var logger = loggerFactory.CreateLogger($"{typeof(EventHubQueueCache).FullName}.{sharedDimensions.EventHubPath}.{partition}");
var evictionStrategy = new ChronologicalEvictionStrategy(logger, timePurge, cacheMonitor, statisticOptions.StatisticMonitorWriteInterval);
return new EventHubQueueCache(partition, EventHubAdapterReceiver.MaxMessagesPerRead, bufferPool, dataAdatper, evictionStrategy, checkpointer, logger,
cacheMonitor, statisticOptions.StatisticMonitorWriteInterval);
cacheMonitor, statisticOptions.StatisticMonitorWriteInterval, streamCacheEvictionOptions.MetadataMinTimeInCache);
}
}
}
13 changes: 12 additions & 1 deletion src/Orleans.Core/Streams/Core/StreamIdentity.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@


using System;
using System.Collections.Generic;

namespace Orleans.Streams
{
Expand All @@ -25,5 +26,15 @@ public StreamIdentity(Guid streamGuid, string streamNamespace)
/// Stream namespace.
/// </summary>
public string Namespace { get; }

public override bool Equals(object obj) => obj is StreamIdentity identity && this.Guid.Equals(identity.Guid) && this.Namespace == identity.Namespace;

public override int GetHashCode()
{
var hashCode = -1455462324;
hashCode = hashCode * -1521134295 + this.Guid.GetHashCode();
hashCode = hashCode * -1521134295 + EqualityComparer<string>.Default.GetHashCode(this.Namespace);
return hashCode;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal class StreamConsumerCollection
{
private readonly Dictionary<GuidId, StreamConsumerData> queueData; // map of consumers for one stream: from Guid ConsumerId to StreamConsumerData
private DateTime lastActivityTime;

public bool StreamRegistered { get; set; }

public StreamConsumerCollection(DateTime now)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;
using Orleans.Internal;
using Orleans.Streams;

namespace Orleans.Providers.Streams.Common
Expand All @@ -29,7 +30,11 @@ public class PooledQueueCache: IPurgeObservable
private readonly ICacheDataAdapter cacheDataAdapter;
private readonly ILogger logger;
private readonly ICacheMonitor cacheMonitor;
private readonly TimeSpan purgeMetadataInterval;
private readonly PeriodicAction periodicMonitoring;
private readonly PeriodicAction periodicMetadaPurging;

private readonly Dictionary<IStreamIdentity, (DateTime TimeStamp, StreamSequenceToken Token)> lastPurgedToken = new Dictionary<IStreamIdentity, (DateTime TimeStamp, StreamSequenceToken Token)>();

/// <summary>
/// Cached message most recently added
Expand Down Expand Up @@ -69,19 +74,30 @@ public class PooledQueueCache: IPurgeObservable
/// <param name="logger"></param>
/// <param name="cacheMonitor"></param>
/// <param name="cacheMonitorWriteInterval">cache monitor write interval. Only triggered for active caches.</param>
public PooledQueueCache(ICacheDataAdapter cacheDataAdapter, ILogger logger, ICacheMonitor cacheMonitor, TimeSpan? cacheMonitorWriteInterval)
/// <param name="purgeMetadataInterval"></param>
public PooledQueueCache(
ICacheDataAdapter cacheDataAdapter,
ILogger logger,
ICacheMonitor cacheMonitor,
TimeSpan? cacheMonitorWriteInterval,
TimeSpan? purgeMetadataInterval = null)
{
this.cacheDataAdapter = cacheDataAdapter ?? throw new ArgumentNullException("cacheDataAdapter");
this.logger = logger ?? throw new ArgumentNullException("logger");
this.ItemCount = 0;
pool = new CachedMessagePool(cacheDataAdapter);
messageBlocks = new LinkedList<CachedMessageBlock>();
this.cacheMonitor = cacheMonitor;

if (this.cacheMonitor != null && cacheMonitorWriteInterval.HasValue)
{
this.periodicMonitoring = new PeriodicAction(cacheMonitorWriteInterval.Value, this.ReportCacheMessageStatistics);
}

if (purgeMetadataInterval.HasValue)
{
this.purgeMetadataInterval = purgeMetadataInterval.Value;
this.periodicMetadaPurging = new PeriodicAction(purgeMetadataInterval.Value.Divide(5), this.PurgeMetadata);
}
}

/// <summary>
Expand Down Expand Up @@ -121,6 +137,41 @@ private void ReportCacheMessageStatistics()
}
}

private void PurgeMetadata()
{
var now = DateTime.UtcNow;
var keys = new List<IStreamIdentity>();

// Get all keys older than this.purgeMetadataInterval
foreach (var kvp in this.lastPurgedToken)
{
if (kvp.Value.TimeStamp + this.purgeMetadataInterval < now)
{
keys.Add(kvp.Key);
}
}

// Remove the expired entries
foreach (var key in keys)
{
this.lastPurgedToken.Remove(key);
}
}

private void TrackAndPurgeMetadata(CachedMessage messageToRemove)
{
// If tracking of evicted message metadata is disabled, do nothing
if (this.periodicMetadaPurging == null)
return;

var now = DateTime.UtcNow;
var streamId = new StreamIdentity(messageToRemove.StreamGuid, messageToRemove.StreamNamespace);
var token = this.cacheDataAdapter.GetSequenceToken(ref messageToRemove);
this.lastPurgedToken[streamId] = (now, token);

this.periodicMetadaPurging.TryAction(now);
}

private void SetCursor(Cursor cursor, StreamSequenceToken sequenceToken)
{
// If nothing in cache, unset token, and wait for more data.
Expand Down Expand Up @@ -153,13 +204,27 @@ private void SetCursor(Cursor cursor, StreamSequenceToken sequenceToken)
}

// Check to see if sequenceToken is too old to be in cache
CachedMessage oldestMessage = messageBlocks.Last.Value.OldestMessage;
var oldestBlock = messageBlocks.Last;
var oldestMessage = oldestBlock.Value.OldestMessage;
if (oldestMessage.Compare(sequenceToken) > 0)
{
// throw cache miss exception
throw new QueueCacheMissException(sequenceToken,
messageBlocks.Last.Value.GetOldestSequenceToken(cacheDataAdapter),
messageBlocks.First.Value.GetNewestSequenceToken(cacheDataAdapter));
// Check if the sequenceToken correspond to the last message purged from cache
var streamIdentity = new StreamIdentity(cursor.StreamIdentity.Guid, cursor.StreamIdentity.Namespace);
if (this.lastPurgedToken.TryGetValue(streamIdentity, out var entry) && sequenceToken.Equals(entry.Token))
{
// If it maches, then we didn't lose anything. Start from the oldest message in cache
cursor.State = CursorStates.Set;
cursor.CurrentBlock = oldestBlock;
cursor.Index = oldestBlock.Value.OldestMessageIndex;
cursor.SequenceToken = oldestBlock.Value.GetOldestSequenceToken(cacheDataAdapter);
return;
}
else
{
throw new QueueCacheMissException(cursor.SequenceToken,
messageBlocks.Last.Value.GetOldestSequenceToken(cacheDataAdapter),
messageBlocks.First.Value.GetNewestSequenceToken(cacheDataAdapter));
}
}

// Find block containing sequence number, starting from the newest and working back to oldest
Expand Down Expand Up @@ -312,6 +377,8 @@ private void Add(CachedMessage message)
/// </summary>
public void RemoveOldestMessage()
{
TrackAndPurgeMetadata(this.messageBlocks.Last.Value.OldestMessage);

this.messageBlocks.Last.Value.Remove();
this.ItemCount--;
CachedMessageBlock lastCachedMessageBlock = this.messageBlocks.Last.Value;
Expand Down
15 changes: 14 additions & 1 deletion src/OrleansProviders/Streams/Common/RecoverableStreamOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Internal;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;
using Orleans.Streams;
Expand All @@ -25,6 +26,18 @@ public class StreamCacheEvictionOptions
/// Default DataMaxAgeInCache
/// </summary>
public static readonly TimeSpan DefaultDataMaxAgeInCache = TimeSpan.FromMinutes(30);

/// <summary>
/// Minimum time message metadata (<see cref="StreamSequenceToken"/>) will stay in cache before it is available for time based purge.
/// Used to avoid cache miss if the full message was purged.
/// Set to null to disable this tracking.
/// </summary>
public TimeSpan? MetadataMinTimeInCache { get; set; } = DefaultMetadataMinTimeInCache;

/// <summary>
/// Default MetadataMinTimeInCache
/// </summary>
public static readonly TimeSpan DefaultMetadataMinTimeInCache = DefaultDataMinTimeInCache.Multiply(2);
}

public class StreamStatisticOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class EventHubQueueCacheForTesting : EventHubQueueCache
{
public EventHubQueueCacheForTesting(IObjectPool<FixedSizeBuffer> bufferPool, IEventHubDataAdapter dataAdapter, IEvictionStrategy evictionStrategy, IStreamQueueCheckpointer<string> checkpointer,
ILogger logger)
:base("test", EventHubAdapterReceiver.MaxMessagesPerRead, bufferPool, dataAdapter, evictionStrategy, checkpointer, logger, null, null)
:base("test", EventHubAdapterReceiver.MaxMessagesPerRead, bufferPool, dataAdapter, evictionStrategy, checkpointer, logger, null, null, null)
{ }

public int ItemCount => this.cache.ItemCount;
Expand Down
Loading

0 comments on commit c3b1e6f

Please sign in to comment.