Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve grain directory cache consistency #8696

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Orleans.Core.Abstractions/IDs/GrainAddress.cs
Expand Up @@ -62,6 +62,11 @@ public bool Matches(GrainAddress other)
&& (_activationId.IsDefault || other._activationId.IsDefault || _activationId.Equals(other._activationId));
}

internal static bool MatchesGrainIdAndSilo(GrainAddress address, GrainAddress other)
{
return other is not null && address.GrainId.Equals(other.GrainId) && (address.SiloAddress?.Equals(other.SiloAddress) ?? other.SiloAddress is null);
}

public override int GetHashCode() => HashCode.Combine(SiloAddress, _grainId, _activationId);

public override string ToString() => $"[{nameof(GrainAddress)} GrainId {_grainId}, ActivationId: {_activationId}, SiloAddress: {SiloAddress}]";
Expand Down
109 changes: 109 additions & 0 deletions src/Orleans.Core.Abstractions/IDs/GrainAddressCacheUpdate.cs
@@ -0,0 +1,109 @@
using System;
using System.Diagnostics.CodeAnalysis;

#nullable enable
namespace Orleans.Runtime;

/// <summary>
/// Represents a directive to update an invalid, cached <see cref="GrainAddress"/> to a valid <see cref="GrainAddress"/>.
/// </summary>
[GenerateSerializer, Immutable]
public sealed class GrainAddressCacheUpdate : ISpanFormattable
{
[Id(0)]
private readonly GrainId _grainId;

[Id(1)]
private readonly ActivationId _invalidActivationId;

[Id(2)]
private readonly SiloAddress? _invalidSiloAddress;

[Id(3)]
private readonly MembershipVersion _invalidMembershipVersion = MembershipVersion.MinValue;

[Id(4)]
private readonly ActivationId _validActivationId;

[Id(5)]
private readonly SiloAddress? _validSiloAddress;

[Id(6)]
private readonly MembershipVersion _validMembershipVersion = MembershipVersion.MinValue;

public GrainAddressCacheUpdate(GrainAddress invalidAddress, GrainAddress? validAddress)
{
ArgumentNullException.ThrowIfNull(invalidAddress);

_grainId = invalidAddress.GrainId;
_invalidActivationId = invalidAddress.ActivationId;
_invalidSiloAddress = invalidAddress.SiloAddress;
_invalidMembershipVersion = invalidAddress.MembershipVersion;

if (validAddress is not null)
{
if (invalidAddress.GrainId != validAddress.GrainId)
{
ThrowGrainIdDoesNotMatch(invalidAddress, validAddress);
return;
}

_validActivationId = validAddress.ActivationId;
_validSiloAddress = validAddress.SiloAddress;
_validMembershipVersion = validAddress.MembershipVersion;
}
}

/// <summary>
/// Identifier of the Grain.
/// </summary>
public GrainId GrainId => _grainId;

/// <summary>
/// Identifier of the invalid grain activation.
/// </summary>
public ActivationId InvalidActivationId => _invalidActivationId;

/// <summary>
/// Address of the silo indicated by the invalid grain activation cache entry.
/// </summary>
public SiloAddress? InvalidSiloAddress => _invalidSiloAddress;

/// <summary>
/// Gets the valid grain activation address.
/// </summary>
public GrainAddress? ValidGrainAddress => _validSiloAddress switch
{
null => null,
_ => new()
{
GrainId = _grainId,
ActivationId = _validActivationId,
SiloAddress = _validSiloAddress,
MembershipVersion = _validMembershipVersion,
}
};

/// <summary>
/// Gets the invalid grain activation address.
/// </summary>
public GrainAddress InvalidGrainAddress => new()
{
GrainId = _grainId,
ActivationId = _invalidActivationId,
SiloAddress = _invalidSiloAddress,
MembershipVersion = _invalidMembershipVersion,
};

public override string ToString() => $"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}]";

string IFormattable.ToString(string? format, IFormatProvider? formatProvider) => ToString();

bool ISpanFormattable.TryFormat(Span<char> destination, out int charsWritten, ReadOnlySpan<char> format, IFormatProvider? provider)
=> destination.TryWrite($"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}]", out charsWritten);

public string ToFullString() => $"[{nameof(GrainAddressCacheUpdate)} GrainId {_grainId}, InvalidActivationId: {_invalidActivationId}, InvalidSiloAddress: {_invalidSiloAddress}, ValidGrainAddress: {ValidGrainAddress}, MembershipVersion: {_invalidMembershipVersion}]";

[DoesNotReturn]
private static void ThrowGrainIdDoesNotMatch(GrainAddress invalidAddress, GrainAddress validAddress) => throw new ArgumentException($"Invalid grain address grain id {invalidAddress.GrainId} does not match valid grain address grain id {validAddress.GrainId}.", nameof(validAddress));
}
8 changes: 4 additions & 4 deletions src/Orleans.Core/GrainDirectory/IGrainLocator.cs
Expand Up @@ -33,11 +33,11 @@ public interface IGrainLocator
ValueTask<GrainAddress?> Lookup(GrainId grainId);

/// <summary>
/// Records a grain placement decision.
/// Updates the cache with a grain placement decision or known activation address.
/// </summary>
/// <param name="grainId">The newly placed grain.</param>
/// <param name="siloAddress">The placement result.</param>
void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress);
/// <param name="grainId">The grain identifier.</param>
/// <param name="siloAddress">The silo which may host the grain.</param>
void UpdateCache(GrainId grainId, SiloAddress siloAddress);

/// <summary>
/// Invalidates any lookup cache entry associated with the provided grain id.
Expand Down
10 changes: 5 additions & 5 deletions src/Orleans.Core/Messaging/Message.cs
Expand Up @@ -32,7 +32,7 @@ internal sealed class Message : ISpanFormattable
public ushort _interfaceVersion;
public GrainInterfaceType _interfaceType;

public List<GrainAddress> _cacheInvalidationHeader;
public List<GrainAddressCacheUpdate> _cacheInvalidationHeader;

public PackedHeaders Headers { get => _headers; set => _headers = value; }

Expand Down Expand Up @@ -204,7 +204,7 @@ internal void SetInfiniteTimeToLive()
_timeToExpiry = default;
}

public List<GrainAddress> CacheInvalidationHeader
public List<GrainAddressCacheUpdate> CacheInvalidationHeader
{
get => _cacheInvalidationHeader;
set
Expand Down Expand Up @@ -245,15 +245,15 @@ public bool IsExpirableMessage(bool dropExpiredMessages)
return Direction != Directions.OneWay && !id.IsSystemTarget();
}

internal void AddToCacheInvalidationHeader(GrainAddress address)
internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAddress validAddress)
{
var list = new List<GrainAddress>();
var list = new List<GrainAddressCacheUpdate>();
if (CacheInvalidationHeader != null)
{
list.AddRange(CacheInvalidationHeader);
}

list.Add(address);
list.Add(new GrainAddressCacheUpdate(invalidAddress, validAddress));
CacheInvalidationHeader = list;
}

Expand Down
14 changes: 7 additions & 7 deletions src/Orleans.Core/Messaging/MessageSerializer.cs
Expand Up @@ -27,7 +27,7 @@ internal sealed class MessageSerializer
private const int MessageSizeHint = 4096;
private readonly Dictionary<Type, ResponseCodec> _rawResponseCodecs = new();
private readonly CodecProvider _codecProvider;
private readonly IFieldCodec<GrainAddress> _activationAddressCodec;
private readonly IFieldCodec<GrainAddressCacheUpdate> _activationAddressCodec;
private readonly CachingSiloAddressCodec _readerSiloAddressCodec = new();
private readonly CachingSiloAddressCodec _writerSiloAddressCodec = new();
private readonly CachingIdSpanCodec _idSpanCodec = new();
Expand All @@ -49,7 +49,7 @@ internal sealed class MessageSerializer
_maxBodyLength = options.MaxMessageBodySize;
_codecProvider = sessionPool.CodecProvider;
_requestContextCodec = OrleansGeneratedCodeHelper.GetService<DictionaryCodec<string, object>>(this, sessionPool.CodecProvider);
_activationAddressCodec = OrleansGeneratedCodeHelper.GetService<IFieldCodec<GrainAddress>>(this, sessionPool.CodecProvider);
_activationAddressCodec = OrleansGeneratedCodeHelper.GetService<IFieldCodec<GrainAddressCacheUpdate>>(this, sessionPool.CodecProvider);
_bufferWriter = new(FramingLength, MessageSizeHint, memoryPool.Pool);
}

Expand Down Expand Up @@ -299,12 +299,12 @@ private void Deserialize<TInput>(ref Reader<TInput> reader, Message result)
}
}

private List<GrainAddress> ReadCacheInvalidationHeaders<TInput>(ref Reader<TInput> reader)
internal List<GrainAddressCacheUpdate> ReadCacheInvalidationHeaders<TInput>(ref Reader<TInput> reader)
{
var n = (int)reader.ReadVarUInt32();
if (n > 0)
{
var list = new List<GrainAddress>(n);
var list = new List<GrainAddressCacheUpdate>(n);
for (int i = 0; i < n; i++)
{
list.Add(_activationAddressCodec.ReadValue(ref reader, reader.ReadFieldHeader()));
Expand All @@ -313,15 +313,15 @@ private List<GrainAddress> ReadCacheInvalidationHeaders<TInput>(ref Reader<TInpu
return list;
}

return new List<GrainAddress>();
return new List<GrainAddressCacheUpdate>();
}

private void WriteCacheInvalidationHeaders<TBufferWriter>(ref Writer<TBufferWriter> writer, List<GrainAddress> value) where TBufferWriter : IBufferWriter<byte>
internal void WriteCacheInvalidationHeaders<TBufferWriter>(ref Writer<TBufferWriter> writer, List<GrainAddressCacheUpdate> value) where TBufferWriter : IBufferWriter<byte>
{
writer.WriteVarUInt32((uint)value.Count);
foreach (var entry in value)
{
_activationAddressCodec.WriteField(ref writer, 0, typeof(GrainAddress), entry);
_activationAddressCodec.WriteField(ref writer, 0, typeof(GrainAddressCacheUpdate), entry);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Expand Up @@ -894,8 +894,8 @@ void ProcessPendingRequests()
if (!compatibilityDirector.IsCompatible(message.InterfaceVersion, currentVersion))
{
// Add this activation to cache invalidation headers.
message.CacheInvalidationHeader ??= new();
message.CacheInvalidationHeader.Add(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress });
message.CacheInvalidationHeader ??= new List<GrainAddressCacheUpdate>();
message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(new GrainAddress { GrainId = GrainId, SiloAddress = Address.SiloAddress }, validAddress: null));

var reason = new DeactivationReason(
DeactivationReasonCode.IncompatibleRequest,
Expand Down Expand Up @@ -1302,7 +1302,7 @@ private void RejectAllQueuedMessages()
"RejectAllQueuedMessages: {Count} messages from invalid activation {Activation}.",
msgs.Count,
this);
_shared.InternalRuntime.LocalGrainDirectory.InvalidateCacheEntry(Address);
_shared.InternalRuntime.GrainLocator.InvalidateCache(Address);
_shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation(
msgs,
Address,
Expand All @@ -1324,7 +1324,7 @@ private void RerouteAllQueuedMessages()
}

if (_shared.Logger.IsEnabled(LogLevel.Debug)) _shared.Logger.LogDebug((int)ErrorCode.Catalog_RerouteAllQueuedMessages, "Rerouting {NumMessages} messages from invalid grain activation {Grain}", msgs.Count, this);
_shared.InternalRuntime.LocalGrainDirectory.InvalidateCacheEntry(Address);
_shared.InternalRuntime.GrainLocator.InvalidateCache(Address);
_shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation(msgs, Address, ForwardingAddress, DeactivationReason.Description, DeactivationException);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Catalog/Catalog.cs
Expand Up @@ -312,7 +312,7 @@ static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId)

CatalogInstruments.NonExistentActivations.Add(1);

self.directory.InvalidateCacheEntry(grainId);
self.grainLocator.InvalidateCache(grainId);

// Unregister the target activation so we don't keep getting spurious messages.
// The time delay (one minute, as of this writing) is to handle the unlikely but possible race where
Expand Down
6 changes: 3 additions & 3 deletions src/Orleans.Runtime/Core/InsideRuntimeClient.cs
Expand Up @@ -200,9 +200,9 @@ public void SniffIncomingMessage(Message message)
{
if (message.CacheInvalidationHeader != null)
{
foreach (GrainAddress address in message.CacheInvalidationHeader)
foreach (var update in message.CacheInvalidationHeader)
{
GrainLocator.InvalidateCache(address);
GrainLocator.UpdateCache(update);
}
}

Expand Down Expand Up @@ -392,7 +392,7 @@ public void ReceiveResponse(Message message)
if (message.CacheInvalidationHeader is null)
{
// Remove from local directory cache. Note that SendingGrain is the original target, since message is the rejection response.
// If CacheInvalidationHeader is present, we already did this. Otherwise, we left this code for backward compatability.
// If CacheInvalidationHeader is present, we already did this. Otherwise, we left this code for backward compatibility.
// It should be retired as we move to use CacheMgmtHeader in all relevant places.
this.GrainLocator.InvalidateCache(message.SendingGrain);
}
Expand Down
Expand Up @@ -41,7 +41,7 @@ internal void Refresh(TimeSpan newExpirationTimer)
}
}

private static readonly Func<GrainAddress, GrainDirectoryCacheEntry, bool> ActivationAddressesMatches = (addr, entry) => addr.Matches(entry.Address);
private static readonly Func<GrainAddress, GrainDirectoryCacheEntry, bool> ActivationAddressesMatches = (addr, entry) => GrainAddress.MatchesGrainIdAndSilo(addr, entry.Address);

private readonly LRU<GrainId, GrainDirectoryCacheEntry> cache;
/// controls the time the new entry is considered "fresh" (unit: ms)
Expand Down Expand Up @@ -88,7 +88,7 @@ public bool LookUp(GrainId key, out GrainAddress result, out int version)

// Here we do not check whether the found entry is expired.
// It will be done by the thread managing the cache.
// This is to avoid situation where the entry was just expired, but the manager still have not run and have not refereshed it.
// This is to avoid situation where the entry was just expired, but the manager still have not run and have not refreshed it.
if (!cache.TryGetValue(key, out var tmp))
{
result = default;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs
Expand Up @@ -117,7 +117,7 @@ public async Task Unregister(GrainAddress address, UnregistrationCause cause)
await GetGrainDirectory(address.GrainId.Type).Unregister(address);

// There is the potential for a lookup to race with the Unregister and add the bad entry back to the cache.
if (this.cache.LookUp(address.GrainId, out var entry, out _) && entry == address)
if (this.cache.LookUp(address.GrainId, out var entry, out _) && entry.Equals(address))
{
this.cache.Remove(address);
}
Expand Down Expand Up @@ -192,7 +192,7 @@ private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membersh

private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}");

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0);
public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0);
public void InvalidateCache(GrainId grainId) => cache.Remove(grainId);
public void InvalidateCache(GrainAddress address) => cache.Remove(address);
public bool TryLookupInCache(GrainId grainId, out GrainAddress address)
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/GrainDirectory/ClientGrainLocator.cs
Expand Up @@ -61,7 +61,7 @@ private GrainAddress SelectAddress(List<GrainAddress> results, GrainId grainId)

private static void ThrowNotClientGrainId(GrainId grainId) => throw new InvalidOperationException($"{grainId} is not a client id");

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) { }
public void UpdateCache(GrainId grainId, SiloAddress siloAddress) { }

public void InvalidateCache(GrainId grainId) { }

Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/DhtGrainLocator.cs
Expand Up @@ -39,7 +39,7 @@ public Task Unregister(GrainAddress address, UnregistrationCause cause)
{
UnregistrationCause.Force => _forceWorker,
UnregistrationCause.NonexistentActivation => _neaWorker,
_ => throw new ArgumentOutOfRangeException($"Unregistration cause {cause} is unknown and is not supported. This is a bug."),
_ => throw new ArgumentOutOfRangeException($"Deregistration cause {cause} is unknown and is not supported. This is a bug."),
};

return worker.Unregister(address);
Expand Down Expand Up @@ -70,7 +70,7 @@ void EnsureInitialized()
public static DhtGrainLocator FromLocalGrainDirectory(LocalGrainDirectory localGrainDirectory)
=> new(localGrainDirectory, localGrainDirectory.RemoteGrainDirectory);

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => _localGrainDirectory.CachePlacementDecision(grainId, siloAddress);
public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => _localGrainDirectory.AddOrUpdateCacheEntry(grainId, siloAddress);
public void InvalidateCache(GrainId grainId) => _localGrainDirectory.InvalidateCacheEntry(grainId);
public void InvalidateCache(GrainAddress address) => _localGrainDirectory.InvalidateCacheEntry(address);
public bool TryLookupInCache(GrainId grainId, out GrainAddress address) => _localGrainDirectory.TryCachedLookup(grainId, out address);
Expand Down
18 changes: 16 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/GrainLocator.cs
@@ -1,4 +1,5 @@
#nullable enable
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Orleans.GrainDirectory;
Expand Down Expand Up @@ -29,8 +30,21 @@ public GrainLocator(GrainLocatorResolver grainLocatorResolver)

public void InvalidateCache(GrainAddress address) => GetGrainLocator(address.GrainId.Type).InvalidateCache(address);

public void CachePlacementDecision(GrainId grainId, SiloAddress siloAddress) => GetGrainLocator(grainId.Type).CachePlacementDecision(grainId, siloAddress);

private IGrainLocator GetGrainLocator(GrainType grainType) => _grainLocatorResolver.GetGrainLocator(grainType);

public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => GetGrainLocator(grainId.Type).UpdateCache(grainId, siloAddress);

public void UpdateCache(GrainAddressCacheUpdate update)
{
if (update.ValidGrainAddress is { } validAddress)
{
Debug.Assert(validAddress.SiloAddress is not null);
UpdateCache(validAddress.GrainId, validAddress.SiloAddress);
}
else
{
InvalidateCache(update.InvalidGrainAddress);
}
}
}
}