From 3d7343e1c1c7c8e859929954fea99f3ed5151819 Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Tue, 21 May 2024 16:54:26 -0700 Subject: [PATCH] Improve `ActivationData` shutdown process (#9018) * Fix termination condition in ActivationMigrationManager.AcceptMigratingGrains * Improve ActivationData shutdown process * Use file-scoped namespaces in ActivationData --- .../Core/DeactivationReason.cs | 11 + src/Orleans.Runtime/Catalog/ActivationData.cs | 2994 ++++++++--------- .../Catalog/ActivationState.cs | 6 +- src/Orleans.Runtime/Catalog/Catalog.cs | 12 - 4 files changed, 1508 insertions(+), 1515 deletions(-) diff --git a/src/Orleans.Core.Abstractions/Core/DeactivationReason.cs b/src/Orleans.Core.Abstractions/Core/DeactivationReason.cs index a9839fd033..55d01b878a 100644 --- a/src/Orleans.Core.Abstractions/Core/DeactivationReason.cs +++ b/src/Orleans.Core.Abstractions/Core/DeactivationReason.cs @@ -56,5 +56,16 @@ public DeactivationReason(DeactivationReasonCode code, Exception exception, stri /// Gets the exception which resulted in deactivation. /// public Exception Exception { get; } + + /// + public override string ToString() + { + if (Exception is not null) + { + return $"{ReasonCode}: {Description}. Exception: {Exception}"; + } + + return $"{ReasonCode}: {Description}"; + } } } diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index 595af00e55..575abffa60 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -18,2054 +18,2048 @@ using Orleans.Serialization.Session; using Orleans.Serialization.TypeSystem; -namespace Orleans.Runtime +namespace Orleans.Runtime; + +/// +/// Maintains additional per-activation state that is required for Orleans internal operations. +/// MUST lock this object for any concurrent access +/// Consider: compartmentalize by usage, e.g., using separate interfaces for data for catalog, etc. +/// +internal sealed class ActivationData : IGrainContext, ICollectibleGrainContext, IGrainExtensionBinder, IActivationWorkingSetMember, IGrainTimerRegistry, IGrainManagementExtension, ICallChainReentrantGrainContext, IAsyncDisposable { - /// - /// Maintains additional per-activation state that is required for Orleans internal operations. - /// MUST lock this object for any concurrent access - /// Consider: compartmentalize by usage, e.g., using separate interfaces for data for catalog, etc. - /// - internal sealed class ActivationData : IGrainContext, ICollectibleGrainContext, IGrainExtensionBinder, IActivationWorkingSetMember, IGrainTimerRegistry, IGrainManagementExtension, ICallChainReentrantGrainContext, IAsyncDisposable - { - private const string GrainAddressMigrationContextKey = "sys.addr"; - private readonly GrainTypeSharedContext _shared; - private readonly IServiceScope _serviceScope; - private readonly WorkItemGroup _workItemGroup; - private readonly List<(Message Message, CoarseStopwatch QueuedTime)> _waitingRequests = new(); - private readonly Dictionary _runningRequests = new(); - private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true }; - private GrainLifecycle _lifecycle; - private List _pendingOperations; - private Message _blockingRequest; - private bool _isInWorkingSet; - private CoarseStopwatch _busyDuration; - private CoarseStopwatch _idleDuration; - private GrainReference _selfReference; - - // Values which are needed less frequently and do not warrant living directly on activation for object size reasons. - // The values in this field are typically used to represent termination state of an activation or features which are not - // used by all grains, such as grain timers. - private ActivationDataExtra _extras; - - // The task representing this activation's message loop. - // This field is assigned and never read and exists only for debugging purposes (eg, in memory dumps, to associate a loop task with an activation). + private const string GrainAddressMigrationContextKey = "sys.addr"; + private readonly GrainTypeSharedContext _shared; + private readonly IServiceScope _serviceScope; + private readonly WorkItemGroup _workItemGroup; + private readonly List<(Message Message, CoarseStopwatch QueuedTime)> _waitingRequests = new(); + private readonly Dictionary _runningRequests = new(); + private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true }; + private GrainLifecycle _lifecycle; + private List _pendingOperations; + private Message _blockingRequest; + private bool _isInWorkingSet; + private CoarseStopwatch _busyDuration; + private CoarseStopwatch _idleDuration; + private GrainReference _selfReference; + + // Values which are needed less frequently and do not warrant living directly on activation for object size reasons. + // The values in this field are typically used to represent termination state of an activation or features which are not + // used by all grains, such as grain timers. + private ActivationDataExtra _extras; + + // The task representing this activation's message loop. + // This field is assigned and never read and exists only for debugging purposes (eg, in memory dumps, to associate a loop task with an activation). #pragma warning disable IDE0052 // Remove unread private members - private readonly Task _messageLoopTask; + private readonly Task _messageLoopTask; #pragma warning restore IDE0052 // Remove unread private members - public ActivationData( - GrainAddress addr, - Func createWorkItemGroup, - IServiceProvider applicationServices, - GrainTypeSharedContext shared) - { - _shared = shared; - Address = addr ?? throw new ArgumentNullException(nameof(addr)); - State = ActivationState.Create; - _serviceScope = applicationServices.CreateScope(); - _isInWorkingSet = true; - _workItemGroup = createWorkItemGroup(this); - _messageLoopTask = this.RunOrQueueTask(RunMessageLoop); - } + public ActivationData( + GrainAddress addr, + Func createWorkItemGroup, + IServiceProvider applicationServices, + GrainTypeSharedContext shared) + { + _shared = shared; + Address = addr ?? throw new ArgumentNullException(nameof(addr)); + State = ActivationState.Create; + _serviceScope = applicationServices.CreateScope(); + _isInWorkingSet = true; + _workItemGroup = createWorkItemGroup(this); + _messageLoopTask = this.RunOrQueueTask(RunMessageLoop); + } - public IGrainRuntime GrainRuntime => _shared.Runtime; - public object GrainInstance { get; private set; } - public GrainAddress Address { get; } - public GrainReference GrainReference => _selfReference ??= _shared.GrainReferenceActivator.CreateReference(GrainId, default); - public ActivationState State { get; private set; } - public PlacementStrategy PlacementStrategy => _shared.PlacementStrategy; - public DateTime CollectionTicket { get; set; } - public IServiceProvider ActivationServices => _serviceScope.ServiceProvider; - public ActivationId ActivationId => Address.ActivationId; - public IGrainLifecycle ObservableLifecycle => Lifecycle; - internal GrainLifecycle Lifecycle + public IGrainRuntime GrainRuntime => _shared.Runtime; + public object GrainInstance { get; private set; } + public GrainAddress Address { get; } + public GrainReference GrainReference => _selfReference ??= _shared.GrainReferenceActivator.CreateReference(GrainId, default); + public ActivationState State { get; private set; } + public PlacementStrategy PlacementStrategy => _shared.PlacementStrategy; + public DateTime CollectionTicket { get; set; } + public IServiceProvider ActivationServices => _serviceScope.ServiceProvider; + public ActivationId ActivationId => Address.ActivationId; + public IGrainLifecycle ObservableLifecycle => Lifecycle; + internal GrainLifecycle Lifecycle + { + get { - get - { - if (_lifecycle is { } lifecycle) return lifecycle; - lock (this) { return _lifecycle ??= new GrainLifecycle(_shared.Logger); } - } + if (_lifecycle is { } lifecycle) return lifecycle; + lock (this) { return _lifecycle ??= new GrainLifecycle(_shared.Logger); } } + } - public GrainId GrainId => Address.GrainId; - public bool IsExemptFromCollection => _shared.CollectionAgeLimit == Timeout.InfiniteTimeSpan; - public DateTime KeepAliveUntil { get; set; } = DateTime.MinValue; - public bool IsValid => State is ActivationState.Valid; + public GrainId GrainId => Address.GrainId; + public bool IsExemptFromCollection => _shared.CollectionAgeLimit == Timeout.InfiniteTimeSpan; + public DateTime KeepAliveUntil { get; set; } = DateTime.MinValue; + public bool IsValid => State is ActivationState.Valid; - // Currently, the only supported multi-activation grain is one using the StatelessWorkerPlacement strategy. - internal bool IsStatelessWorker => PlacementStrategy is StatelessWorkerPlacement; + // Currently, the only supported multi-activation grain is one using the StatelessWorkerPlacement strategy. + internal bool IsStatelessWorker => PlacementStrategy is StatelessWorkerPlacement; - /// - /// Returns a value indicating whether or not this placement strategy requires activations to be registered in - /// the grain directory. - /// - internal bool IsUsingGrainDirectory => PlacementStrategy.IsUsingGrainDirectory; + /// + /// Returns a value indicating whether or not this placement strategy requires activations to be registered in + /// the grain directory. + /// + internal bool IsUsingGrainDirectory => PlacementStrategy.IsUsingGrainDirectory; - public int WaitingCount => _waitingRequests.Count; - public bool IsInactive => !IsCurrentlyExecuting && _waitingRequests.Count == 0; - public bool IsCurrentlyExecuting => _runningRequests.Count > 0; - public IWorkItemScheduler Scheduler => _workItemGroup; - public Task Deactivated => GetDeactivationCompletionSource().Task; + public int WaitingCount => _waitingRequests.Count; + public bool IsInactive => !IsCurrentlyExecuting && _waitingRequests.Count == 0; + public bool IsCurrentlyExecuting => _runningRequests.Count > 0; + public IWorkItemScheduler Scheduler => _workItemGroup; + public Task Deactivated => GetDeactivationCompletionSource().Task; - public SiloAddress ForwardingAddress + public SiloAddress ForwardingAddress + { + get => _extras?.ForwardingAddress; + set { - get => _extras?.ForwardingAddress; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.ForwardingAddress = value; - } + _extras ??= new(); + _extras.ForwardingAddress = value; } } + } - /// - /// Gets the previous directory registration for this grain, if known. - /// This is used to update the grain directory to point to the new registration during activation. - /// - public GrainAddress PreviousRegistration + /// + /// Gets the previous directory registration for this grain, if known. + /// This is used to update the grain directory to point to the new registration during activation. + /// + public GrainAddress PreviousRegistration + { + get => _extras?.PreviousRegistration; + set { - get => _extras?.PreviousRegistration; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.PreviousRegistration = value; - } + _extras ??= new(); + _extras.PreviousRegistration = value; } } + } - private Exception DeactivationException => _extras?.DeactivationReason.Exception; + private Exception DeactivationException => _extras?.DeactivationReason.Exception; - private DeactivationReason DeactivationReason + private DeactivationReason DeactivationReason + { + get => _extras?.DeactivationReason ?? default; + set { - get => _extras?.DeactivationReason ?? default; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.DeactivationReason = value; - } + _extras ??= new(); + _extras.DeactivationReason = value; } } + } - private HashSet Timers + private HashSet Timers + { + get => _extras?.Timers; + set { - get => _extras?.Timers; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.Timers = value; - } + _extras ??= new(); + _extras.Timers = value; } } + } - private DateTime? DeactivationStartTime + private DateTime? DeactivationStartTime + { + get => _extras?.DeactivationStartTime; + set { - get => _extras?.DeactivationStartTime; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.DeactivationStartTime = value; - } + _extras ??= new(); + _extras.DeactivationStartTime = value; } } + } - private bool IsStuckDeactivating + private bool IsStuckDeactivating + { + get => _extras?.IsStuckDeactivating ?? false; + set { - get => _extras?.IsStuckDeactivating ?? false; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.IsStuckDeactivating = value; - } + _extras ??= new(); + _extras.IsStuckDeactivating = value; } } + } - private bool IsStuckProcessingMessage + private bool IsStuckProcessingMessage + { + get => _extras?.IsStuckProcessingMessage ?? false; + set { - get => _extras?.IsStuckProcessingMessage ?? false; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.IsStuckProcessingMessage = value; - } + _extras ??= new(); + _extras.IsStuckProcessingMessage = value; } } + } - private DehydrationContextHolder DehydrationContext + private DehydrationContextHolder DehydrationContext + { + get => _extras?.DehydrationContext; + set { - get => _extras?.DehydrationContext; - set + lock (this) { - lock (this) - { - _extras ??= new(); - _extras.DehydrationContext = value; - } + _extras ??= new(); + _extras.DehydrationContext = value; } } + } - public TimeSpan CollectionAgeLimit => _shared.CollectionAgeLimit; + public TimeSpan CollectionAgeLimit => _shared.CollectionAgeLimit; - public TTarget GetTarget() where TTarget : class => (TTarget)GrainInstance; + public TTarget GetTarget() where TTarget : class => (TTarget)GrainInstance; - TComponent ITargetHolder.GetComponent() + TComponent ITargetHolder.GetComponent() + { + var result = GetComponent(); + if (result is null && typeof(IGrainExtension).IsAssignableFrom(typeof(TComponent))) { - var result = GetComponent(); - if (result is null && typeof(IGrainExtension).IsAssignableFrom(typeof(TComponent))) + var implementation = ActivationServices.GetKeyedService(typeof(TComponent)); + if (implementation is not TComponent typedResult) { - var implementation = ActivationServices.GetKeyedService(typeof(TComponent)); - if (implementation is not TComponent typedResult) - { - throw new GrainExtensionNotInstalledException($"No extension of type {typeof(TComponent)} is installed on this instance and no implementations are registered for automated install"); - } - - SetComponent(typedResult); - result = typedResult; + throw new GrainExtensionNotInstalledException($"No extension of type {typeof(TComponent)} is installed on this instance and no implementations are registered for automated install"); } - return result; + SetComponent(typedResult); + result = typedResult; } - public TComponent GetComponent() where TComponent : class - { - TComponent result; - if (GrainInstance is TComponent grainResult) - { - result = grainResult; - } - else if (this is TComponent contextResult) - { - result = contextResult; - } - else if (_extras is { } components && components.TryGetValue(typeof(TComponent), out var resultObj)) - { - result = (TComponent)resultObj; - } - else if (ActivationServices.GetService() is { } component) - { - SetComponent(component); - result = component; - } - else - { - result = _shared.GetComponent(); - } + return result; + } - return result; + public TComponent GetComponent() where TComponent : class + { + TComponent result; + if (GrainInstance is TComponent grainResult) + { + result = grainResult; } - - public void SetComponent(TComponent instance) where TComponent : class + else if (this is TComponent contextResult) { - if (GrainInstance is TComponent) - { - throw new ArgumentException("Cannot override a component which is implemented by this grain"); - } - - if (this is TComponent) - { - throw new ArgumentException("Cannot override a component which is implemented by this grain context"); - } + result = contextResult; + } + else if (_extras is { } components && components.TryGetValue(typeof(TComponent), out var resultObj)) + { + result = (TComponent)resultObj; + } + else if (ActivationServices.GetService() is { } component) + { + SetComponent(component); + result = component; + } + else + { + result = _shared.GetComponent(); + } - lock (this) - { - if (instance == null) - { - _extras?.Remove(typeof(TComponent)); - return; - } + return result; + } - _extras ??= new(); - _extras[typeof(TComponent)] = instance; - } + public void SetComponent(TComponent instance) where TComponent : class + { + if (GrainInstance is TComponent) + { + throw new ArgumentException("Cannot override a component which is implemented by this grain"); } - internal void SetGrainInstance(object grainInstance) + if (this is TComponent) { - switch (GrainInstance, grainInstance) - { - case (null, not null): - _shared.OnCreateActivation(this); - GetComponent()?.OnCreateActivation(this); - break; - case (not null, null): - _shared.OnDestroyActivation(this); - GetComponent()?.OnDestroyActivation(this); - break; - } + throw new ArgumentException("Cannot override a component which is implemented by this grain context"); + } - if (grainInstance is ILifecycleParticipant participant) + lock (this) + { + if (instance == null) { - participant.Participate(ObservableLifecycle); + _extras?.Remove(typeof(TComponent)); + return; } - GrainInstance = grainInstance; + _extras ??= new(); + _extras[typeof(TComponent)] = instance; } + } - public void SetState(ActivationState state) + internal void SetGrainInstance(object grainInstance) + { + switch (GrainInstance, grainInstance) { - State = state; + case (null, not null): + _shared.OnCreateActivation(this); + GetComponent()?.OnCreateActivation(this); + break; + case (not null, null): + _shared.OnDestroyActivation(this); + GetComponent()?.OnDestroyActivation(this); + break; } - /// - /// Check whether this activation is overloaded. - /// Returns LimitExceededException if overloaded, otherwise nullc> - /// - /// Returns LimitExceededException if overloaded, otherwise nullc> - public LimitExceededException CheckOverloaded() + if (grainInstance is ILifecycleParticipant participant) { - string limitName = nameof(SiloMessagingOptions.MaxEnqueuedRequestsHardLimit); - int maxRequestsHardLimit = _shared.MessagingOptions.MaxEnqueuedRequestsHardLimit; - int maxRequestsSoftLimit = _shared.MessagingOptions.MaxEnqueuedRequestsSoftLimit; - if (IsStatelessWorker) - { - limitName = nameof(SiloMessagingOptions.MaxEnqueuedRequestsHardLimit_StatelessWorker); - maxRequestsHardLimit = _shared.MessagingOptions.MaxEnqueuedRequestsHardLimit_StatelessWorker; - maxRequestsSoftLimit = _shared.MessagingOptions.MaxEnqueuedRequestsSoftLimit_StatelessWorker; - } + participant.Participate(ObservableLifecycle); + } - if (maxRequestsHardLimit <= 0 && maxRequestsSoftLimit <= 0) return null; // No limits are set + GrainInstance = grainInstance; + } - int count = GetRequestCount(); + public void SetState(ActivationState state) + { + State = state; + } - if (maxRequestsHardLimit > 0 && count > maxRequestsHardLimit) // Hard limit - { - _shared.Logger.LogWarning( - (int)ErrorCode.Catalog_Reject_ActivationTooManyRequests, - "Overload - {Count} enqueued requests for activation {Activation}, exceeding hard limit rejection threshold of {HardLimit}", - count, - this, - maxRequestsHardLimit); + /// + /// Check whether this activation is overloaded. + /// Returns LimitExceededException if overloaded, otherwise nullc> + /// + /// Returns LimitExceededException if overloaded, otherwise nullc> + public LimitExceededException CheckOverloaded() + { + string limitName = nameof(SiloMessagingOptions.MaxEnqueuedRequestsHardLimit); + int maxRequestsHardLimit = _shared.MessagingOptions.MaxEnqueuedRequestsHardLimit; + int maxRequestsSoftLimit = _shared.MessagingOptions.MaxEnqueuedRequestsSoftLimit; + if (IsStatelessWorker) + { + limitName = nameof(SiloMessagingOptions.MaxEnqueuedRequestsHardLimit_StatelessWorker); + maxRequestsHardLimit = _shared.MessagingOptions.MaxEnqueuedRequestsHardLimit_StatelessWorker; + maxRequestsSoftLimit = _shared.MessagingOptions.MaxEnqueuedRequestsSoftLimit_StatelessWorker; + } - return new LimitExceededException(limitName, count, maxRequestsHardLimit, ToString()); - } + if (maxRequestsHardLimit <= 0 && maxRequestsSoftLimit <= 0) return null; // No limits are set - if (maxRequestsSoftLimit > 0 && count > maxRequestsSoftLimit) // Soft limit - { - _shared.Logger.LogWarning( - (int)ErrorCode.Catalog_Warn_ActivationTooManyRequests, - "Hot - {Count} enqueued requests for activation {Activation}, exceeding soft limit warning threshold of {SoftLimit}", - count, - this, - maxRequestsSoftLimit); - return null; - } + int count = GetRequestCount(); + + if (maxRequestsHardLimit > 0 && count > maxRequestsHardLimit) // Hard limit + { + _shared.Logger.LogWarning( + (int)ErrorCode.Catalog_Reject_ActivationTooManyRequests, + "Overload - {Count} enqueued requests for activation {Activation}, exceeding hard limit rejection threshold of {HardLimit}", + count, + this, + maxRequestsHardLimit); + return new LimitExceededException(limitName, count, maxRequestsHardLimit, ToString()); + } + + if (maxRequestsSoftLimit > 0 && count > maxRequestsSoftLimit) // Soft limit + { + _shared.Logger.LogWarning( + (int)ErrorCode.Catalog_Warn_ActivationTooManyRequests, + "Hot - {Count} enqueued requests for activation {Activation}, exceeding soft limit warning threshold of {SoftLimit}", + count, + this, + maxRequestsSoftLimit); return null; } - internal int GetRequestCount() + return null; + } + + internal int GetRequestCount() + { + lock (this) { - lock (this) - { - return _runningRequests.Count + WaitingCount; - } + return _runningRequests.Count + WaitingCount; } + } - internal List DequeueAllWaitingRequests() + internal List DequeueAllWaitingRequests() + { + lock (this) { - lock (this) - { - var tmp = _waitingRequests.Select(m => m.Item1).ToList(); - _waitingRequests.Clear(); - return tmp; - } + var tmp = _waitingRequests.Select(m => m.Item1).ToList(); + _waitingRequests.Clear(); + return tmp; } + } - /// - /// Returns how long this activation has been idle. - /// - public TimeSpan GetIdleness() => _idleDuration.Elapsed; + /// + /// Returns how long this activation has been idle. + /// + public TimeSpan GetIdleness() => _idleDuration.Elapsed; - /// - /// Returns whether this activation has been idle long enough to be collected. - /// - public bool IsStale() => GetIdleness() >= _shared.CollectionAgeLimit; + /// + /// Returns whether this activation has been idle long enough to be collected. + /// + public bool IsStale() => GetIdleness() >= _shared.CollectionAgeLimit; - public void DelayDeactivation(TimeSpan timespan) + public void DelayDeactivation(TimeSpan timespan) + { + if (timespan == TimeSpan.MaxValue || timespan == Timeout.InfiniteTimeSpan) { - if (timespan == TimeSpan.MaxValue || timespan == Timeout.InfiniteTimeSpan) - { - // otherwise creates negative time. - KeepAliveUntil = DateTime.MaxValue; - } - else if (timespan <= TimeSpan.Zero) - { - // reset any current keepAliveUntil - ResetKeepAliveRequest(); - } - else - { - KeepAliveUntil = DateTime.UtcNow + timespan; - } + // otherwise creates negative time. + KeepAliveUntil = DateTime.MaxValue; } - - public void ResetKeepAliveRequest() + else if (timespan <= TimeSpan.Zero) { - KeepAliveUntil = DateTime.MinValue; + // reset any current keepAliveUntil + ResetKeepAliveRequest(); } - - private void ScheduleOperation(object operation) + else { - lock (this) - { - _pendingOperations ??= new(); - _pendingOperations.Add(operation); - } - - _workSignal.Signal(); + KeepAliveUntil = DateTime.UtcNow + timespan; } + } + + public void ResetKeepAliveRequest() + { + KeepAliveUntil = DateTime.MinValue; + } - public void Migrate(Dictionary requestContext, CancellationToken? cancellationToken = default) + private void ScheduleOperation(object operation) + { + lock (this) { - if (!cancellationToken.HasValue) - { - cancellationToken = new CancellationTokenSource(_shared.InternalRuntime.CollectionOptions.Value.DeactivationTimeout).Token; - } + _pendingOperations ??= new(); + _pendingOperations.Add(operation); + } + + _workSignal.Signal(); + } - // We use a named work item since it is cheaper than allocating a Task and has the benefit of being named. - _workItemGroup.QueueWorkItem(new MigrateWorkItem(this, requestContext, cancellationToken.Value)); + public void Migrate(Dictionary requestContext, CancellationToken? cancellationToken = default) + { + if (!cancellationToken.HasValue) + { + cancellationToken = new CancellationTokenSource(_shared.InternalRuntime.CollectionOptions.Value.DeactivationTimeout).Token; } - private async Task StartMigratingAsync(Dictionary requestContext, CancellationToken cancellationToken) + // We use a named work item since it is cheaper than allocating a Task and has the benefit of being named. + _workItemGroup.QueueWorkItem(new MigrateWorkItem(this, requestContext, cancellationToken.Value)); + } + + private async Task StartMigratingAsync(Dictionary requestContext, CancellationToken cancellationToken) + { + lock (this) { - lock (this) + // Avoid the cost of selecting a new location if the activation is not currently valid. + if (State is not ActivationState.Valid) { - // Avoid the cost of selecting a new location if the activation is not currently valid. - if (State is not ActivationState.Valid) - { - return; - } + return; } + } - SiloAddress newLocation; - try + SiloAddress newLocation; + try + { + // Run placement to select a new host. If a new (different) host is not selected, do not migrate. + var placementService = _shared.Runtime.ServiceProvider.GetRequiredService(); + newLocation = await placementService.PlaceGrainAsync(GrainId, requestContext, PlacementStrategy); + if (newLocation == Address.SiloAddress || newLocation is null) { - // Run placement to select a new host. If a new (different) host is not selected, do not migrate. - var placementService = _shared.Runtime.ServiceProvider.GetRequiredService(); - newLocation = await placementService.PlaceGrainAsync(GrainId, requestContext, PlacementStrategy); - if (newLocation == Address.SiloAddress || newLocation is null) + // No more appropriate silo was selected for this grain. The migration attempt will be aborted. + // This could be because this is the only (compatible) silo for the grain or because the placement director chose this + // silo for some other reason. + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - // No more appropriate silo was selected for this grain. The migration attempt will be aborted. - // This could be because this is the only (compatible) silo for the grain or because the placement director chose this - // silo for some other reason. - if (_shared.Logger.IsEnabled(LogLevel.Debug)) + if (newLocation is null) { - if (newLocation is null) - { - _shared.Logger.LogDebug("Placement strategy {PlacementStrategy} failed to select a destination for migration of {GrainId}", PlacementStrategy, GrainId); - } - else - { - _shared.Logger.LogDebug("Placement strategy {PlacementStrategy} selected the current silo as the destination for migration of {GrainId}", PlacementStrategy, GrainId); - } + _shared.Logger.LogDebug("Placement strategy {PlacementStrategy} failed to select a destination for migration of {GrainId}", PlacementStrategy, GrainId); } - - // Will not deactivate/migrate. - return; - } - - lock (this) - { - if (!StartDeactivating(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location"))) + else { - // Grain is already deactivating, ignore the migration request. - return; + _shared.Logger.LogDebug("Placement strategy {PlacementStrategy} selected the current silo as the destination for migration of {GrainId}", PlacementStrategy, GrainId); } + } - if (DehydrationContext is not null) - { - // Migration has already started. - return; - } + // Will not deactivate/migrate. + return; + } - // Set a migration context to capture any state which should be transferred. - // Doing this signals to the deactivation process that a migration is occurring, so it is important that this happens before we begin deactivation. - DehydrationContext = new(_shared.SerializerSessionPool, requestContext); - ForwardingAddress = newLocation; + lock (this) + { + if (!StartDeactivating(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location"))) + { + // Grain is already deactivating, ignore the migration request. + return; } - if (_shared.Logger.IsEnabled(LogLevel.Debug)) + if (DehydrationContext is not null) { - _shared.Logger.LogDebug("Migrating {GrainId} to {SiloAddress}", GrainId, newLocation); + // Migration has already started. + return; } - // Start deactivation to prevent any other. - ScheduleOperation(new Command.Deactivate(cancellationToken)); - } - catch (Exception exception) - { - _shared.Logger.LogError(exception, "Error while selecting a migration destination for {GrainId}", GrainId); - return; + // Set a migration context to capture any state which should be transferred. + // Doing this signals to the deactivation process that a migration is occurring, so it is important that this happens before we begin deactivation. + DehydrationContext = new(_shared.SerializerSessionPool, requestContext); + ForwardingAddress = newLocation; } - } - public void Deactivate(DeactivationReason reason, CancellationToken? token = default) - { - if (!token.HasValue) + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - token = new CancellationTokenSource(_shared.InternalRuntime.CollectionOptions.Value.DeactivationTimeout).Token; + _shared.Logger.LogDebug("Migrating {GrainId} to {SiloAddress}", GrainId, newLocation); } - StartDeactivating(reason); - ScheduleOperation(new Command.Deactivate(token.Value)); + // Start deactivation to prevent any other. + ScheduleOperation(new Command.Deactivate(cancellationToken)); } + catch (Exception exception) + { + _shared.Logger.LogError(exception, "Error while selecting a migration destination for {GrainId}", GrainId); + return; + } + } - private void DeactivateStuckActivation() + public void Deactivate(DeactivationReason reason, CancellationToken? token = default) + { + if (!token.HasValue) { - IsStuckProcessingMessage = true; - var msg = $"Activation {this} has been processing request {_blockingRequest} since {_busyDuration} and is likely stuck."; - var reason = new DeactivationReason(DeactivationReasonCode.ActivationUnresponsive, msg); + token = new CancellationTokenSource(_shared.InternalRuntime.CollectionOptions.Value.DeactivationTimeout).Token; + } - // Mark the grain as deactivating so that messages are forwarded instead of being invoked - Deactivate(reason, token: default); + StartDeactivating(reason); + ScheduleOperation(new Command.Deactivate(token.Value)); + } - // Try to remove this activation from the catalog and directory - // This leaves this activation dangling, stuck processing the current request until it eventually completes - // (which likely will never happen at this point, since if the grain was deemed stuck then there is probably some kind of - // application bug, perhaps a deadlock) - UnregisterMessageTarget(); - _shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force).Ignore(); - } + private void DeactivateStuckActivation() + { + IsStuckProcessingMessage = true; + var msg = $"Activation {this} has been processing request {_blockingRequest} since {_busyDuration} and is likely stuck."; + var reason = new DeactivationReason(DeactivationReasonCode.ActivationUnresponsive, msg); + + // Mark the grain as deactivating so that messages are forwarded instead of being invoked + Deactivate(reason, token: default); + + // Try to remove this activation from the catalog and directory + // This leaves this activation dangling, stuck processing the current request until it eventually completes + // (which likely will never happen at this point, since if the grain was deemed stuck then there is probably some kind of + // application bug, perhaps a deadlock) + UnregisterMessageTarget(); + _shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force).Ignore(); + } - void IGrainTimerRegistry.OnTimerCreated(IGrainTimer timer) + void IGrainTimerRegistry.OnTimerCreated(IGrainTimer timer) + { + lock (this) { - lock (this) - { - Timers ??= new HashSet(); - Timers.Add(timer); - } + Timers ??= new HashSet(); + Timers.Add(timer); } + } - void IGrainTimerRegistry.OnTimerDisposed(IGrainTimer orleansTimerInsideGrain) + void IGrainTimerRegistry.OnTimerDisposed(IGrainTimer orleansTimerInsideGrain) + { + lock (this) // need to lock since dispose can be called on finalizer thread, outside grain context (not single threaded). { - lock (this) // need to lock since dispose can be called on finalizer thread, outside grain context (not single threaded). + if (Timers is null) { - if (Timers is null) - { - return; - } + return; + } - Timers.Remove(orleansTimerInsideGrain); - if (Timers.Count == 0) - { - Timers = null; - } + Timers.Remove(orleansTimerInsideGrain); + if (Timers.Count == 0) + { + Timers = null; } } + } - private void StopAllTimers() + private void StopAllTimers() + { + lock (this) { - lock (this) + if (Timers is null) { - if (Timers is null) - { - return; - } + return; + } - foreach (var timer in Timers) - { - timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); - } + foreach (var timer in Timers) + { + timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); } } + } - private Task WaitForAllTimersToFinish(CancellationToken cancellationToken) + private Task WaitForAllTimersToFinish(CancellationToken cancellationToken) + { + lock (this) { - lock (this) + if (Timers is null) { - if (Timers is null) - { - return Task.CompletedTask; - } + return Task.CompletedTask; + } - var tasks = new List(); - var timerCopy = Timers.ToList(); // need to copy since OnTimerDisposed will change the timers set. - foreach (var timer in timerCopy) + var tasks = new List(); + var timerCopy = Timers.ToList(); // need to copy since OnTimerDisposed will change the timers set. + foreach (var timer in timerCopy) + { + if (timer is IAsyncDisposable asyncDisposable) { - if (timer is IAsyncDisposable asyncDisposable) - { - var task = asyncDisposable.DisposeAsync(); - if (!task.IsCompletedSuccessfully) - { - tasks.Add(task.AsTask()); - } - } - else + var task = asyncDisposable.DisposeAsync(); + if (!task.IsCompletedSuccessfully) { - timer.Dispose(); + tasks.Add(task.AsTask()); } } - - return Task.WhenAll(tasks).WithCancellation(cancellationToken); + else + { + timer.Dispose(); + } } + + return Task.WhenAll(tasks).WithCancellation(cancellationToken); } + } - public void AnalyzeWorkload(DateTime now, IMessageCenter messageCenter, MessageFactory messageFactory, SiloMessagingOptions options) + public void AnalyzeWorkload(DateTime now, IMessageCenter messageCenter, MessageFactory messageFactory, SiloMessagingOptions options) + { + var slowRunningRequestDuration = options.RequestProcessingWarningTime; + var longQueueTimeDuration = options.RequestQueueDelayWarningTime; + + List diagnostics = null; + lock (this) { - var slowRunningRequestDuration = options.RequestProcessingWarningTime; - var longQueueTimeDuration = options.RequestQueueDelayWarningTime; + if (State != ActivationState.Valid) + { + return; + } - List diagnostics = null; - lock (this) + if (_blockingRequest is not null) { - if (State != ActivationState.Valid) + var message = _blockingRequest; + TimeSpan? timeSinceQueued = default; + if (_runningRequests.TryGetValue(message, out var waitTime)) { - return; + timeSinceQueued = waitTime.Elapsed; } - if (_blockingRequest is not null) + var executionTime = _busyDuration.Elapsed; + if (executionTime >= slowRunningRequestDuration) { - var message = _blockingRequest; - TimeSpan? timeSinceQueued = default; - if (_runningRequests.TryGetValue(message, out var waitTime)) + GetStatusList(ref diagnostics); + if (timeSinceQueued.HasValue) { - timeSinceQueued = waitTime.Elapsed; + diagnostics.Add($"Message {message} was enqueued {timeSinceQueued} ago and has now been executing for {executionTime}."); } - - var executionTime = _busyDuration.Elapsed; - if (executionTime >= slowRunningRequestDuration) + else { - GetStatusList(ref diagnostics); - if (timeSinceQueued.HasValue) - { - diagnostics.Add($"Message {message} was enqueued {timeSinceQueued} ago and has now been executing for {executionTime}."); - } - else - { - diagnostics.Add($"Message {message} has been executing for {executionTime}."); - } - - var response = messageFactory.CreateDiagnosticResponseMessage(message, isExecuting: true, isWaiting: false, diagnostics); - messageCenter.SendMessage(response); + diagnostics.Add($"Message {message} has been executing for {executionTime}."); } + + var response = messageFactory.CreateDiagnosticResponseMessage(message, isExecuting: true, isWaiting: false, diagnostics); + messageCenter.SendMessage(response); } + } - foreach (var running in _runningRequests) - { - var message = running.Key; - var runDuration = running.Value; - if (ReferenceEquals(message, _blockingRequest)) continue; + foreach (var running in _runningRequests) + { + var message = running.Key; + var runDuration = running.Value; + if (ReferenceEquals(message, _blockingRequest)) continue; - // Check how long they've been executing. - var executionTime = runDuration.Elapsed; - if (executionTime >= slowRunningRequestDuration) + // Check how long they've been executing. + var executionTime = runDuration.Elapsed; + if (executionTime >= slowRunningRequestDuration) + { + // Interleaving message X has been executing for a long time + GetStatusList(ref diagnostics); + var messageDiagnostics = new List(diagnostics) { - // Interleaving message X has been executing for a long time - GetStatusList(ref diagnostics); - var messageDiagnostics = new List(diagnostics) - { - $"Interleaving message {message} has been executing for {executionTime}." - }; + $"Interleaving message {message} has been executing for {executionTime}." + }; - var response = messageFactory.CreateDiagnosticResponseMessage(message, isExecuting: true, isWaiting: false, messageDiagnostics); - messageCenter.SendMessage(response); - } + var response = messageFactory.CreateDiagnosticResponseMessage(message, isExecuting: true, isWaiting: false, messageDiagnostics); + messageCenter.SendMessage(response); } + } - var queueLength = 1; - foreach (var pair in _waitingRequests) + var queueLength = 1; + foreach (var pair in _waitingRequests) + { + var message = pair.Message; + var queuedTime = pair.QueuedTime.Elapsed; + if (queuedTime >= longQueueTimeDuration) { - var message = pair.Message; - var queuedTime = pair.QueuedTime.Elapsed; - if (queuedTime >= longQueueTimeDuration) + // Message X has been enqueued on the target grain for Y and is currently position QueueLength in queue for processing. + GetStatusList(ref diagnostics); + var messageDiagnostics = new List(diagnostics) { - // Message X has been enqueued on the target grain for Y and is currently position QueueLength in queue for processing. - GetStatusList(ref diagnostics); - var messageDiagnostics = new List(diagnostics) - { - $"Message {message} has been enqueued on the target grain for {queuedTime} and is currently position {queueLength} in queue for processing." - }; + $"Message {message} has been enqueued on the target grain for {queuedTime} and is currently position {queueLength} in queue for processing." + }; - var response = messageFactory.CreateDiagnosticResponseMessage(message, isExecuting: false, isWaiting: true, messageDiagnostics); - messageCenter.SendMessage(response); - } - - queueLength++; + var response = messageFactory.CreateDiagnosticResponseMessage(message, isExecuting: false, isWaiting: true, messageDiagnostics); + messageCenter.SendMessage(response); } - } - - void GetStatusList(ref List diagnostics) - { - if (diagnostics is not null) return; - diagnostics = new List - { - ToDetailedString(), - $"TaskScheduler status: {_workItemGroup.DumpStatus()}" - }; + queueLength++; } } - public override string ToString() => $"[Activation: {Address.SiloAddress}/{GrainId}{ActivationId}{GetActivationInfoString()} State={State}]"; - - internal string ToDetailedString(bool includeExtraDetails = false) + void GetStatusList(ref List diagnostics) { - lock (this) + if (diagnostics is not null) return; + + diagnostics = new List { - var currentlyExecuting = includeExtraDetails ? _blockingRequest : null; - return @$"[Activation: {Address.SiloAddress}/{GrainId}{ActivationId} {GetActivationInfoString()} State={State} NonReentrancyQueueSize={WaitingCount} NumRunning={_runningRequests.Count} IdlenessTimeSpan={GetIdleness()} CollectionAgeLimit={_shared.CollectionAgeLimit}{(currentlyExecuting != null ? " CurrentlyExecuting=" : null)}{currentlyExecuting}]"; - } + ToDetailedString(), + $"TaskScheduler status: {_workItemGroup.DumpStatus()}" + }; } + } - private string GetActivationInfoString() + public override string ToString() => $"[Activation: {Address.SiloAddress}/{GrainId}{ActivationId}{GetActivationInfoString()} State={State}]"; + + internal string ToDetailedString(bool includeExtraDetails = false) + { + lock (this) { - var placement = PlacementStrategy?.GetType().Name; - var grainTypeName = _shared.GrainTypeName ?? GrainInstance switch - { - { } grainInstance => RuntimeTypeNameFormatter.Format(grainInstance.GetType()), - _ => null - }; - return grainTypeName is null ? $"#Placement={placement}" : $"#GrainType={grainTypeName} Placement={placement}"; + var currentlyExecuting = includeExtraDetails ? _blockingRequest : null; + return @$"[Activation: {Address.SiloAddress}/{GrainId}{ActivationId} {GetActivationInfoString()} State={State} NonReentrancyQueueSize={WaitingCount} NumRunning={_runningRequests.Count} IdlenessTimeSpan={GetIdleness()} CollectionAgeLimit={_shared.CollectionAgeLimit}{(currentlyExecuting != null ? " CurrentlyExecuting=" : null)}{currentlyExecuting}]"; } + } - public async ValueTask DisposeAsync() + private string GetActivationInfoString() + { + var placement = PlacementStrategy?.GetType().Name; + var grainTypeName = _shared.GrainTypeName ?? GrainInstance switch { - _extras ??= new(); - if (_extras.IsDisposing) return; - _extras.IsDisposing = true; + { } grainInstance => RuntimeTypeNameFormatter.Format(grainInstance.GetType()), + _ => null + }; + return grainTypeName is null ? $"#Placement={placement}" : $"#GrainType={grainTypeName} Placement={placement}"; + } - try - { - var activator = GetComponent(); - if (activator != null) - { - await activator.DisposeInstance(this, GrainInstance); - } - } - catch (ObjectDisposedException) - { - } + public async ValueTask DisposeAsync() + { + _extras ??= new(); + if (_extras.IsDisposing) return; + _extras.IsDisposing = true; - switch (_serviceScope) + try + { + var activator = GetComponent(); + if (activator != null) { - case IAsyncDisposable asyncDisposable: - await asyncDisposable.DisposeAsync(); - break; - case IDisposable disposable: - disposable.Dispose(); - break; + await activator.DisposeInstance(this, GrainInstance); } } + catch (ObjectDisposedException) + { + } + + try + { + SetGrainInstance(null); + } + catch (ObjectDisposedException) + { + } + + switch (_serviceScope) + { + case IAsyncDisposable asyncDisposable: + await asyncDisposable.DisposeAsync(); + break; + case IDisposable disposable: + disposable.Dispose(); + break; + } + } - bool IEquatable.Equals(IGrainContext other) => ReferenceEquals(this, other); + bool IEquatable.Equals(IGrainContext other) => ReferenceEquals(this, other); - public (TExtension, TExtensionInterface) GetOrSetExtension(Func newExtensionFunc) - where TExtension : class, TExtensionInterface - where TExtensionInterface : class, IGrainExtension + public (TExtension, TExtensionInterface) GetOrSetExtension(Func newExtensionFunc) + where TExtension : class, TExtensionInterface + where TExtensionInterface : class, IGrainExtension + { + TExtension implementation; + if (GetComponent() is object existing) { - TExtension implementation; - if (GetComponent() is object existing) + if (existing is TExtension typedResult) { - if (existing is TExtension typedResult) - { - implementation = typedResult; - } - else - { - throw new InvalidCastException($"Cannot cast existing extension of type {existing.GetType()} to target type {typeof(TExtension)}"); - } + implementation = typedResult; } else { - implementation = newExtensionFunc(); - SetComponent(implementation); + throw new InvalidCastException($"Cannot cast existing extension of type {existing.GetType()} to target type {typeof(TExtension)}"); } - - var reference = GrainReference.Cast(); - return (implementation, reference); } - - public TExtensionInterface GetExtension() - where TExtensionInterface : class, IGrainExtension + else { - if (GetComponent() is TExtensionInterface result) - { - return result; - } + implementation = newExtensionFunc(); + SetComponent(implementation); + } - var implementation = ActivationServices.GetKeyedService(typeof(TExtensionInterface)); - if (!(implementation is TExtensionInterface typedResult)) - { - throw new GrainExtensionNotInstalledException($"No extension of type {typeof(TExtensionInterface)} is installed on this instance and no implementations are registered for automated install"); - } + var reference = GrainReference.Cast(); + return (implementation, reference); + } - SetComponent(typedResult); - return typedResult; + public TExtensionInterface GetExtension() + where TExtensionInterface : class, IGrainExtension + { + if (GetComponent() is TExtensionInterface result) + { + return result; } - bool IActivationWorkingSetMember.IsCandidateForRemoval(bool wouldRemove) + var implementation = ActivationServices.GetKeyedService(typeof(TExtensionInterface)); + if (!(implementation is TExtensionInterface typedResult)) { - const int IdlenessLowerBound = 10_000; - lock (this) - { - var inactive = IsInactive && _idleDuration.ElapsedMilliseconds > IdlenessLowerBound; + throw new GrainExtensionNotInstalledException($"No extension of type {typeof(TExtensionInterface)} is installed on this instance and no implementations are registered for automated install"); + } - // This instance will remain in the working set if it is either not pending removal or if it is currently active. - _isInWorkingSet = !wouldRemove || !inactive; - return inactive; - } + SetComponent(typedResult); + return typedResult; + } + + bool IActivationWorkingSetMember.IsCandidateForRemoval(bool wouldRemove) + { + const int IdlenessLowerBound = 10_000; + lock (this) + { + var inactive = IsInactive && _idleDuration.ElapsedMilliseconds > IdlenessLowerBound; + + // This instance will remain in the working set if it is either not pending removal or if it is currently active. + _isInWorkingSet = !wouldRemove || !inactive; + return inactive; } + } - private async Task RunMessageLoop() + private async Task RunMessageLoop() + { + // Note that this loop never terminates. That might look strange, but there is a reason for it: + // a grain must always accept and process any incoming messages. How a grain processes + // those messages is up to the grain's state to determine. If the grain has not yet + // completed activating, it will let the messages continue to queue up until it completes activation. + // If the grain failed to activate, messages will be responded to with a rejection. + // If the grain has terminated, messages will be forwarded on to a new instance of this grain. + // The loop will eventually be garbage collected when the grain gets deactivated and there are no + // rooted references to it. + while (true) { - // Note that this loop never terminates. That might look strange, but there is a reason for it: - // a grain must always accept and process any incoming messages. How a grain processes - // those messages is up to the grain's state to determine. If the grain has not yet - // completed activating, it will let the messages continue to queue up until it completes activation. - // If the grain failed to activate, messages will be responded to with a rejection. - // If the grain has terminated, messages will be forwarded on to a new instance of this grain. - // The loop will eventually be garbage collected when the grain gets deactivated and there are no - // rooted references to it. - while (true) + try { - try + if (!IsCurrentlyExecuting) { - if (!IsCurrentlyExecuting) + List operations = null; + lock (this) { - List operations = null; - lock (this) + if (_pendingOperations is { Count: > 0 }) { - if (_pendingOperations is { Count: > 0 }) - { - operations = _pendingOperations; - _pendingOperations = null; - } + operations = _pendingOperations; + _pendingOperations = null; } + } - if (operations is not null) - { - await ProcessOperationsAsync(operations); - } + if (operations is not null) + { + await ProcessOperationsAsync(operations); } + } - ProcessPendingRequests(); + ProcessPendingRequests(); - await _workSignal.WaitAsync(); - } - catch (Exception exception) - { - _shared.InternalRuntime.MessagingTrace.LogError(exception, "Error in grain message loop"); - } + await _workSignal.WaitAsync(); } - - void ProcessPendingRequests() + catch (Exception exception) { - var i = 0; + _shared.InternalRuntime.MessagingTrace.LogError(exception, "Error in grain message loop"); + } + } + + void ProcessPendingRequests() + { + var i = 0; - do + do + { + Message message = null; + lock (this) { - Message message = null; - lock (this) + if (_waitingRequests.Count <= i) { - if (_waitingRequests.Count <= i) - { - break; - } + break; + } - if (State != ActivationState.Valid) - { - ProcessRequestsToInvalidActivation(); - break; - } + if (State != ActivationState.Valid) + { + ProcessRequestsToInvalidActivation(); + break; + } - message = _waitingRequests[i].Message; - try + message = _waitingRequests[i].Message; + try + { + if (!MayInvokeRequest(message)) { - if (!MayInvokeRequest(message)) - { - // The activation is not able to process this message right now, so try the next message. - ++i; + // The activation is not able to process this message right now, so try the next message. + ++i; - if (_blockingRequest != null) + if (_blockingRequest != null) + { + var currentRequestActiveTime = _busyDuration.Elapsed; + if (currentRequestActiveTime > _shared.MaxRequestProcessingTime && !IsStuckProcessingMessage) { - var currentRequestActiveTime = _busyDuration.Elapsed; - if (currentRequestActiveTime > _shared.MaxRequestProcessingTime && !IsStuckProcessingMessage) - { - DeactivateStuckActivation(); - } - else if (currentRequestActiveTime > _shared.MaxWarningRequestProcessingTime) - { - // Consider: Handle long request detection for reentrant activations -- this logic only works for non-reentrant activations - _shared.Logger.LogWarning( - (int)ErrorCode.Dispatcher_ExtendedMessageProcessing, - "Current request has been active for {CurrentRequestActiveTime} for grain {Grain}. Currently executing {BlockingRequest}. Trying to enqueue {Message}.", - currentRequestActiveTime, - ToDetailedString(), - _blockingRequest, - message); - } + DeactivateStuckActivation(); + } + else if (currentRequestActiveTime > _shared.MaxWarningRequestProcessingTime) + { + // Consider: Handle long request detection for reentrant activations -- this logic only works for non-reentrant activations + _shared.Logger.LogWarning( + (int)ErrorCode.Dispatcher_ExtendedMessageProcessing, + "Current request has been active for {CurrentRequestActiveTime} for grain {Grain}. Currently executing {BlockingRequest}. Trying to enqueue {Message}.", + currentRequestActiveTime, + ToDetailedString(), + _blockingRequest, + message); } - - continue; } - // If the current message is incompatible, deactivate this activation and eventually forward the message to a new incarnation. - if (message.InterfaceVersion > 0) + continue; + } + + // If the current message is incompatible, deactivate this activation and eventually forward the message to a new incarnation. + if (message.InterfaceVersion > 0) + { + var compatibilityDirector = _shared.InternalRuntime.CompatibilityDirectorManager.GetDirector(message.InterfaceType); + var currentVersion = _shared.InternalRuntime.GrainVersionManifest.GetLocalVersion(message.InterfaceType); + if (!compatibilityDirector.IsCompatible(message.InterfaceVersion, currentVersion)) { - var compatibilityDirector = _shared.InternalRuntime.CompatibilityDirectorManager.GetDirector(message.InterfaceType); - var currentVersion = _shared.InternalRuntime.GrainVersionManifest.GetLocalVersion(message.InterfaceType); - if (!compatibilityDirector.IsCompatible(message.InterfaceVersion, currentVersion)) - { - // Add this activation to cache invalidation headers. - message.CacheInvalidationHeader ??= new List(); - message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(Address, validAddress: null)); + // Add this activation to cache invalidation headers. + message.CacheInvalidationHeader ??= new List(); + message.CacheInvalidationHeader.Add(new GrainAddressCacheUpdate(Address, validAddress: null)); - var reason = new DeactivationReason( - DeactivationReasonCode.IncompatibleRequest, - $"Received incompatible request for interface {message.InterfaceType} version {message.InterfaceVersion}. This activation supports interface version {currentVersion}."); + var reason = new DeactivationReason( + DeactivationReasonCode.IncompatibleRequest, + $"Received incompatible request for interface {message.InterfaceType} version {message.InterfaceVersion}. This activation supports interface version {currentVersion}."); - Deactivate(reason, token: default); - return; - } + Deactivate(reason, token: default); + return; } } - catch (Exception exception) - { - _shared.InternalRuntime.MessageCenter.RejectMessage(message, Message.RejectionTypes.Transient, exception); - _waitingRequests.RemoveAt(i); - continue; - } - - // Process this message, removing it from the queue. + } + catch (Exception exception) + { + _shared.InternalRuntime.MessageCenter.RejectMessage(message, Message.RejectionTypes.Transient, exception); _waitingRequests.RemoveAt(i); - - Debug.Assert(State == ActivationState.Valid); - RecordRunning(message, message.IsAlwaysInterleave); + continue; } - // Start invoking the message outside of the lock - InvokeIncomingRequest(message); + // Process this message, removing it from the queue. + _waitingRequests.RemoveAt(i); + + Debug.Assert(State == ActivationState.Valid); + RecordRunning(message, message.IsAlwaysInterleave); } - while (true); + + // Start invoking the message outside of the lock + InvokeIncomingRequest(message); } + while (true); + } - void RecordRunning(Message message, bool isInterleavable) - { - var stopwatch = CoarseStopwatch.StartNew(); - _runningRequests.Add(message, stopwatch); + void RecordRunning(Message message, bool isInterleavable) + { + var stopwatch = CoarseStopwatch.StartNew(); + _runningRequests.Add(message, stopwatch); - if (_blockingRequest != null || isInterleavable) return; + if (_blockingRequest != null || isInterleavable) return; - // This logic only works for non-reentrant activations - // Consider: Handle long request detection for reentrant activations. - _blockingRequest = message; - _busyDuration = stopwatch; - } + // This logic only works for non-reentrant activations + // Consider: Handle long request detection for reentrant activations. + _blockingRequest = message; + _busyDuration = stopwatch; + } - void ProcessRequestsToInvalidActivation() + void ProcessRequestsToInvalidActivation() + { + if (State is ActivationState.Create or ActivationState.Activating) { - if (State is ActivationState.Create or ActivationState.Activating) - { - // Do nothing until the activation becomes either valid or invalid - return; - } + // Do nothing until the activation becomes either valid or invalid + return; + } - if (State is ActivationState.Deactivating) + if (State is ActivationState.Deactivating) + { + // Determine whether to declare this activation as stuck + var deactivatingTime = DateTime.UtcNow - DeactivationStartTime.Value; + if (deactivatingTime > _shared.MaxRequestProcessingTime && !IsStuckDeactivating) { - // Determine whether to declare this activation as stuck - var deactivatingTime = DateTime.UtcNow - DeactivationStartTime.Value; - if (deactivatingTime > _shared.MaxRequestProcessingTime && !IsStuckDeactivating) - { - IsStuckDeactivating = true; - if (DeactivationReason.Description is { Length: > 0 } && DeactivationReason.ReasonCode != DeactivationReasonCode.ActivationUnresponsive) - { - DeactivationReason = new(DeactivationReasonCode.ActivationUnresponsive, - $"{DeactivationReason.Description}. Activation {this} has been deactivating since {DeactivationStartTime.Value} and is likely stuck"); - } - } - - if (!IsStuckDeactivating && !IsStuckProcessingMessage) + IsStuckDeactivating = true; + if (DeactivationReason.Description is { Length: > 0 } && DeactivationReason.ReasonCode != DeactivationReasonCode.ActivationUnresponsive) { - // Do not forward messages while the grain is still deactivating and has not been declared stuck, since they - // will be forwarded to the same grain instance. - return; + DeactivationReason = new(DeactivationReasonCode.ActivationUnresponsive, + $"{DeactivationReason.Description}. Activation {this} has been deactivating since {DeactivationStartTime.Value} and is likely stuck"); } } - if (DeactivationException is null || ForwardingAddress is { }) - { - // Either this was a duplicate activation or it was at some point successfully activated - // Forward all pending messages - RerouteAllQueuedMessages(); - } - else + if (!IsStuckDeactivating && !IsStuckProcessingMessage) { - // Reject all pending messages - RejectAllQueuedMessages(); + // Do not forward messages while the grain is still deactivating and has not been declared stuck, since they + // will be forwarded to the same grain instance. + return; } } - bool MayInvokeRequest(Message incoming) + if (DeactivationException is null || ForwardingAddress is { }) { - if (!IsCurrentlyExecuting) - { - return true; - } + // Either this was a duplicate activation or it was at some point successfully activated + // Forward all pending messages + RerouteAllQueuedMessages(); + } + else + { + // Reject all pending messages + RejectAllQueuedMessages(); + } + } - // Otherwise, allow request invocation if the grain is reentrant or the message can be interleaved - if (incoming.IsAlwaysInterleave) - { - return true; - } + bool MayInvokeRequest(Message incoming) + { + if (!IsCurrentlyExecuting) + { + return true; + } - if (_blockingRequest is null) - { - return true; - } + // Otherwise, allow request invocation if the grain is reentrant or the message can be interleaved + if (incoming.IsAlwaysInterleave) + { + return true; + } - if (_blockingRequest.IsReadOnly && incoming.IsReadOnly) - { - return true; - } + if (_blockingRequest is null) + { + return true; + } + + if (_blockingRequest.IsReadOnly && incoming.IsReadOnly) + { + return true; + } + + // Handle call-chain reentrancy + if (incoming.GetReentrancyId() is Guid id + && IsReentrantSection(id)) + { + return true; + } - // Handle call-chain reentrancy - if (incoming.GetReentrancyId() is Guid id - && IsReentrantSection(id)) + if (GetComponent() is GrainCanInterleave canInterleave) + { + try { - return true; + return canInterleave.MayInterleave(GrainInstance, incoming); } - - if (GetComponent() is GrainCanInterleave canInterleave) + catch (Exception exception) { - try - { - return canInterleave.MayInterleave(GrainInstance, incoming); - } - catch (Exception exception) - { - _shared.Logger?.LogError(exception, "Error invoking MayInterleave predicate on grain {Grain} for message {Message}", this, incoming); - throw; - } + _shared.Logger?.LogError(exception, "Error invoking MayInterleave predicate on grain {Grain} for message {Message}", this, incoming); + throw; } - - return false; } - async Task ProcessOperationsAsync(List operations) + return false; + } + + async Task ProcessOperationsAsync(List operations) + { + foreach (var op in operations) { - foreach (var op in operations) + try { - try + switch (op) { - switch (op) - { - case Command.Rehydrate command: - RehydrateInternal(command.Context); - break; - case Command.Activate command: - await ActivateAsync(command.RequestContext, command.CancellationToken); - break; - case Command.Deactivate command: - await FinishDeactivating(command.CancellationToken); - break; - case Command.Delay command: - await Task.Delay(command.Duration); - break; - case Command.UnregisterFromCatalog: - UnregisterMessageTarget(); - break; - default: - throw new NotSupportedException($"Encountered unknown operation of type {op?.GetType().ToString() ?? "null"} {op}"); - } - } - catch (Exception exception) - { - _shared.Logger.LogError(exception, "Error in RunOnInactive for grain activation {Activation}", this); + case Command.Rehydrate command: + RehydrateInternal(command.Context); + break; + case Command.Activate command: + await ActivateAsync(command.RequestContext, command.CancellationToken); + break; + case Command.Deactivate command: + await FinishDeactivating(command.CancellationToken); + break; + case Command.Delay command: + await Task.Delay(command.Duration); + break; + case Command.UnregisterFromCatalog: + UnregisterMessageTarget(); + break; + default: + throw new NotSupportedException($"Encountered unknown operation of type {op?.GetType().ToString() ?? "null"} {op}"); } } + catch (Exception exception) + { + _shared.Logger.LogError(exception, "Error in RunOnInactive for grain activation {Activation}", this); + } } } + } - private void RehydrateInternal(IRehydrationContext context) + private void RehydrateInternal(IRehydrationContext context) + { + try { - try + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - if (_shared.Logger.IsEnabled(LogLevel.Debug)) + _shared.Logger.LogDebug("Rehydrating grain from previous activation"); + } + + lock (this) + { + if (State != ActivationState.Create) { - _shared.Logger.LogDebug("Rehydrating grain from previous activation"); + throw new InvalidOperationException($"Attempted to rehydrate a grain in the {State} state"); } - lock (this) + if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress previousRegistration) && previousRegistration is not null) { - if (State != ActivationState.Create) + // Propagate the previous registration, so that the new activation can atomically replace it with its new address. + PreviousRegistration = previousRegistration; + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - throw new InvalidOperationException($"Attempted to rehydrate a grain in the {State} state"); + _shared.Logger.LogDebug("Previous activation address was {PreviousRegistration}", previousRegistration); } + } - if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress previousRegistration) && previousRegistration is not null) + if (_lifecycle is { } lifecycle) + { + foreach (var participant in lifecycle.GetMigrationParticipants()) { - // Propagate the previous registration, so that the new activation can atomically replace it with its new address. - PreviousRegistration = previousRegistration; - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - { - _shared.Logger.LogDebug("Previous activation address was {PreviousRegistration}", previousRegistration); - } + participant.OnRehydrate(context); } - - if (_lifecycle is { } lifecycle) - { - foreach (var participant in lifecycle.GetMigrationParticipants()) - { - participant.OnRehydrate(context); - } - } - - (GrainInstance as IGrainMigrationParticipant)?.OnRehydrate(context); } - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - { - _shared.Logger.LogDebug("Rehydrated grain from previous activation"); - } + (GrainInstance as IGrainMigrationParticipant)?.OnRehydrate(context); } - catch (Exception exception) - { - _shared.Logger.LogError(exception, "Error while rehydrating activation"); - } - finally + + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - (context as IDisposable)?.Dispose(); + _shared.Logger.LogDebug("Rehydrated grain from previous activation"); } } + catch (Exception exception) + { + _shared.Logger.LogError(exception, "Error while rehydrating activation"); + } + finally + { + (context as IDisposable)?.Dispose(); + } + } - private void OnDehydrate(IDehydrationContext context) + private void OnDehydrate(IDehydrationContext context) + { + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - { - _shared.Logger.LogDebug("Dehydrating grain activation"); - } + _shared.Logger.LogDebug("Dehydrating grain activation"); + } - lock (this) - { - Debug.Assert(context is not null); + lock (this) + { + Debug.Assert(context is not null); - // Note that these calls are in reverse order from Rehydrate, not for any particular reason other than symmetry. - (GrainInstance as IGrainMigrationParticipant)?.OnDehydrate(context); + // Note that these calls are in reverse order from Rehydrate, not for any particular reason other than symmetry. + (GrainInstance as IGrainMigrationParticipant)?.OnDehydrate(context); - if (_lifecycle is { } lifecycle) - { - foreach (var participant in lifecycle.GetMigrationParticipants()) - { - participant.OnDehydrate(context); - } - } - - if (IsUsingGrainDirectory) + if (_lifecycle is { } lifecycle) + { + foreach (var participant in lifecycle.GetMigrationParticipants()) { - context.TryAddValue(GrainAddressMigrationContextKey, Address); + participant.OnDehydrate(context); } } - if (_shared.Logger.IsEnabled(LogLevel.Debug)) + if (IsUsingGrainDirectory) { - _shared.Logger.LogDebug("Dehydrated grain activation"); + context.TryAddValue(GrainAddressMigrationContextKey, Address); } } - /// - /// Handle an incoming message and queue/invoke appropriate handler - /// - /// - private void InvokeIncomingRequest(Message message) + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - MessagingProcessingInstruments.OnDispatcherMessageProcessedOk(message); - _shared.InternalRuntime.MessagingTrace.OnScheduleMessage(message); + _shared.Logger.LogDebug("Dehydrated grain activation"); + } + } - try - { - var task = _shared.InternalRuntime.RuntimeClient.Invoke(this, message); + /// + /// Handle an incoming message and queue/invoke appropriate handler + /// + /// + private void InvokeIncomingRequest(Message message) + { + MessagingProcessingInstruments.OnDispatcherMessageProcessedOk(message); + _shared.InternalRuntime.MessagingTrace.OnScheduleMessage(message); - // Note: This runs for all outcomes - both Success or Fault - if (task.IsCompleted) - { - OnCompletedRequest(message); - } - else - { - _ = OnCompleteAsync(this, message, task); - } - } - catch + try + { + var task = _shared.InternalRuntime.RuntimeClient.Invoke(this, message); + + // Note: This runs for all outcomes - both Success or Fault + if (task.IsCompleted) { OnCompletedRequest(message); } - - static async ValueTask OnCompleteAsync(ActivationData activation, Message message, Task task) + else { - try - { - await task; - } - catch - { - } - finally - { - activation.OnCompletedRequest(message); - } + _ = OnCompleteAsync(this, message, task); } } + catch + { + OnCompletedRequest(message); + } - /// - /// Invoked when an activation has finished a transaction and may be ready for additional transactions - /// - /// The message that has just completed processing. - /// This will be null for the case of completion of Activate/Deactivate calls. - private void OnCompletedRequest(Message message) + static async ValueTask OnCompleteAsync(ActivationData activation, Message message, Task task) { - lock (this) + try { - _runningRequests.Remove(message); - - if (_runningRequests.Count == 0) - { - _idleDuration = CoarseStopwatch.StartNew(); - } - - if (!_isInWorkingSet) - { - _isInWorkingSet = true; - _shared.InternalRuntime.ActivationWorkingSet.OnActive(this); - } - - // The below logic only works for non-reentrant activations - if (_blockingRequest is null || message.Equals(_blockingRequest)) - { - _blockingRequest = null; - _busyDuration = default; - } + await task; + } + catch + { + } + finally + { + activation.OnCompletedRequest(message); } - - // Signal the message pump to see if there is another request which can be processed now that this one has completed - _workSignal.Signal(); } + } - public void ReceiveMessage(object message) => ReceiveMessage((Message)message); - public void ReceiveMessage(Message message) + /// + /// Invoked when an activation has finished a transaction and may be ready for additional transactions + /// + /// The message that has just completed processing. + /// This will be null for the case of completion of Activate/Deactivate calls. + private void OnCompletedRequest(Message message) + { + lock (this) { - _shared.InternalRuntime.MessagingTrace.OnDispatcherReceiveMessage(message); + _runningRequests.Remove(message); - // Don't process messages that have already timed out - if (message.IsExpired) + if (_runningRequests.Count == 0) { - MessagingProcessingInstruments.OnDispatcherMessageProcessedError(message); - _shared.InternalRuntime.MessagingTrace.OnDropExpiredMessage(message, MessagingInstruments.Phase.Dispatch); - return; + _idleDuration = CoarseStopwatch.StartNew(); } - if (message.Direction == Message.Directions.Response) + if (!_isInWorkingSet) { - ReceiveResponse(message); + _isInWorkingSet = true; + _shared.InternalRuntime.ActivationWorkingSet.OnActive(this); } - else // Request or OneWay + + // The below logic only works for non-reentrant activations + if (_blockingRequest is null || message.Equals(_blockingRequest)) { - ReceiveRequest(message); + _blockingRequest = null; + _busyDuration = default; } } - private void ReceiveResponse(Message message) - { - lock (this) - { - if (State == ActivationState.Invalid || State == ActivationState.FailedToActivate) - { - _shared.InternalRuntime.MessagingTrace.OnDispatcherReceiveInvalidActivation(message, State); + // Signal the message pump to see if there is another request which can be processed now that this one has completed + _workSignal.Signal(); + } - // Always process responses - _shared.InternalRuntime.RuntimeClient.ReceiveResponse(message); - return; - } + public void ReceiveMessage(object message) => ReceiveMessage((Message)message); + public void ReceiveMessage(Message message) + { + _shared.InternalRuntime.MessagingTrace.OnDispatcherReceiveMessage(message); - MessagingProcessingInstruments.OnDispatcherMessageProcessedOk(message); - _shared.InternalRuntime.RuntimeClient.ReceiveResponse(message); - } + // Don't process messages that have already timed out + if (message.IsExpired) + { + MessagingProcessingInstruments.OnDispatcherMessageProcessedError(message); + _shared.InternalRuntime.MessagingTrace.OnDropExpiredMessage(message, MessagingInstruments.Phase.Dispatch); + return; } - private void ReceiveRequest(Message message) + if (message.Direction == Message.Directions.Response) { - var overloadException = CheckOverloaded(); - if (overloadException != null) + ReceiveResponse(message); + } + else // Request or OneWay + { + ReceiveRequest(message); + } + } + + private void ReceiveResponse(Message message) + { + lock (this) + { + if (State == ActivationState.Invalid || State == ActivationState.FailedToActivate) { - MessagingProcessingInstruments.OnDispatcherMessageProcessedError(message); - _shared.InternalRuntime.MessageCenter.RejectMessage(message, Message.RejectionTypes.Overloaded, overloadException, "Target activation is overloaded " + this); + _shared.InternalRuntime.MessagingTrace.OnDispatcherReceiveInvalidActivation(message, State); + + // Always process responses + _shared.InternalRuntime.RuntimeClient.ReceiveResponse(message); return; } - lock (this) - { - _waitingRequests.Add((message, CoarseStopwatch.StartNew())); - } + MessagingProcessingInstruments.OnDispatcherMessageProcessedOk(message); + _shared.InternalRuntime.RuntimeClient.ReceiveResponse(message); + } + } - _workSignal.Signal(); + private void ReceiveRequest(Message message) + { + var overloadException = CheckOverloaded(); + if (overloadException != null) + { + MessagingProcessingInstruments.OnDispatcherMessageProcessedError(message); + _shared.InternalRuntime.MessageCenter.RejectMessage(message, Message.RejectionTypes.Overloaded, overloadException, "Target activation is overloaded " + this); + return; } - /// - /// Rejects all messages enqueued for the provided activation. - /// - private void RejectAllQueuedMessages() + lock (this) { - lock (this) - { - List msgs = DequeueAllWaitingRequests(); - if (msgs == null || msgs.Count <= 0) return; + _waitingRequests.Add((message, CoarseStopwatch.StartNew())); + } - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - _shared.Logger.LogDebug( - (int)ErrorCode.Catalog_RerouteAllQueuedMessages, - "RejectAllQueuedMessages: {Count} messages from invalid activation {Activation}.", - msgs.Count, - this); - _shared.InternalRuntime.GrainLocator.InvalidateCache(Address); - _shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation( - msgs, - Address, - forwardingAddress: ForwardingAddress, - failedOperation: DeactivationReason.Description, - exc: DeactivationException, - rejectMessages: true); - } + _workSignal.Signal(); + } + + /// + /// Rejects all messages enqueued for the provided activation. + /// + private void RejectAllQueuedMessages() + { + lock (this) + { + List msgs = DequeueAllWaitingRequests(); + if (msgs == null || msgs.Count <= 0) return; + + if (_shared.Logger.IsEnabled(LogLevel.Debug)) + _shared.Logger.LogDebug( + (int)ErrorCode.Catalog_RerouteAllQueuedMessages, + "RejectAllQueuedMessages: {Count} messages from invalid activation {Activation}.", + msgs.Count, + this); + _shared.InternalRuntime.GrainLocator.InvalidateCache(Address); + _shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation( + msgs, + Address, + forwardingAddress: ForwardingAddress, + failedOperation: DeactivationReason.Description, + exc: DeactivationException, + rejectMessages: true); } + } - private void RerouteAllQueuedMessages() + private void RerouteAllQueuedMessages() + { + lock (this) { - lock (this) + List msgs = DequeueAllWaitingRequests(); + if (msgs is not { Count: > 0 }) { - List msgs = DequeueAllWaitingRequests(); - if (msgs is not { Count: > 0 }) + return; + } + + if (_shared.Logger.IsEnabled(LogLevel.Debug)) + { + if (ForwardingAddress is { } address) { - return; + _shared.Logger.LogDebug((int)ErrorCode.Catalog_RerouteAllQueuedMessages, "Rerouting {NumMessages} messages from invalid grain activation {Grain} to {ForwardingAddress}.", msgs.Count, this, address); + } + else + { + _shared.Logger.LogDebug((int)ErrorCode.Catalog_RerouteAllQueuedMessages, "Rerouting {NumMessages} messages from invalid grain activation {Grain}.", msgs.Count, this); } - - if (_shared.Logger.IsEnabled(LogLevel.Debug)) _shared.Logger.LogDebug((int)ErrorCode.Catalog_RerouteAllQueuedMessages, "Rerouting {NumMessages} messages from invalid grain activation {Grain}", msgs.Count, this); - _shared.InternalRuntime.GrainLocator.InvalidateCache(Address); - _shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation(msgs, Address, ForwardingAddress, DeactivationReason.Description, DeactivationException); } + + _shared.InternalRuntime.GrainLocator.InvalidateCache(Address); + _shared.InternalRuntime.MessageCenter.ProcessRequestsToInvalidActivation(msgs, Address, ForwardingAddress, DeactivationReason.Description, DeactivationException); } + } - #region Activation + #region Activation + + public void Rehydrate(IRehydrationContext context) + { + ScheduleOperation(new Command.Rehydrate(context)); + } - public void Rehydrate(IRehydrationContext context) + public void Activate(Dictionary requestContext, CancellationToken? cancellationToken) + { + if (!cancellationToken.HasValue) { - ScheduleOperation(new Command.Rehydrate(context)); + cancellationToken = new CancellationTokenSource(_shared.InternalRuntime.CollectionOptions.Value.ActivationTimeout).Token; } - public void Activate(Dictionary requestContext, CancellationToken? cancellationToken) + ScheduleOperation(new Command.Activate(requestContext, cancellationToken.Value)); + } + + private async Task ActivateAsync(Dictionary requestContextData, CancellationToken cancellationToken) + { + // A chain of promises that will have to complete in order to complete the activation + // Register with the grain directory, register with the store if necessary and call the Activate method on the new activation. + try { - if (!cancellationToken.HasValue) + var success = await RegisterActivationInGrainDirectoryAndValidate(); + if (!success) { - cancellationToken = new CancellationTokenSource(_shared.InternalRuntime.CollectionOptions.Value.ActivationTimeout).Token; + // If registration failed, bail out. + return; } - ScheduleOperation(new Command.Activate(requestContext, cancellationToken.Value)); + lock (this) + { + SetState(ActivationState.Activating); + } + + success = await CallActivateAsync(requestContextData, cancellationToken); + if (!success) + { + // If activation failed, bail out. + return; + } + + _shared.InternalRuntime.ActivationWorkingSet.OnActivated(this); + if (_shared.Logger.IsEnabled(LogLevel.Debug)) + { + _shared.Logger.LogDebug("InitActivation is done: {Address}", Address); + } + } + catch (Exception exception) + { + _shared.Logger.LogError(exception, "Activation of grain {Grain} failed", this); + } + finally + { + _workSignal.Signal(); } - private async Task ActivateAsync(Dictionary requestContextData, CancellationToken cancellationToken) + async Task CallActivateAsync(Dictionary requestContextData, CancellationToken cancellationToken) { - // A chain of promises that will have to complete in order to complete the activation - // Register with the grain directory, register with the store if necessary and call the Activate method on the new activation. + if (_shared.Logger.IsEnabled(LogLevel.Debug)) + { + _shared.Logger.LogDebug((int)ErrorCode.Catalog_BeforeCallingActivate, "Activating grain {Grain}", this); + } + + // Start grain lifecycle within try-catch wrapper to safely capture any exceptions thrown from called function try { - var success = await RegisterActivationInGrainDirectoryAndValidate(); - if (!success) + RequestContextExtensions.Import(requestContextData); + await Lifecycle.OnStart(cancellationToken).WithCancellation("Timed out waiting for grain lifecycle to complete activation", cancellationToken); + if (GrainInstance is IGrainBase grainBase) { - // If registration failed, bail out. - return; + await grainBase.OnActivateAsync(cancellationToken).WithCancellation($"Timed out waiting for {nameof(IGrainBase.OnActivateAsync)} to complete", cancellationToken); } lock (this) { - SetState(ActivationState.Activating); - } - - success = await CallActivateAsync(requestContextData, cancellationToken); - if (!success) - { - // If activation failed, bail out. - return; + if (State == ActivationState.Activating) + { + SetState(ActivationState.Valid); // Activate calls on this activation are finished + } } - _shared.InternalRuntime.ActivationWorkingSet.OnActivated(this); if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - _shared.Logger.LogDebug("InitActivation is done: {Address}", Address); + _shared.Logger.LogDebug((int)ErrorCode.Catalog_AfterCallingActivate, "Finished activating grain {Grain}", this); } + + return true; } catch (Exception exception) { - _shared.Logger.LogError(exception, "Activation of grain {Grain} failed", this); - } - finally - { - _workSignal.Signal(); - } + CatalogInstruments.ActivationFailedToActivate.Add(1); - async Task CallActivateAsync(Dictionary requestContextData, CancellationToken cancellationToken) - { - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - { - _shared.Logger.LogDebug((int)ErrorCode.Catalog_BeforeCallingActivate, "Activating grain {Grain}", this); - } + // Capture the exception so that it can be propagated to rejection messages + var sourceException = (exception as OrleansLifecycleCanceledException)?.InnerException ?? exception; + _shared.Logger.LogError((int)ErrorCode.Catalog_ErrorCallingActivate, sourceException, "Error activating grain {Grain}", this); - // Start grain lifecycle within try-catch wrapper to safely capture any exceptions thrown from called function - try + // Unregister the activation from the directory so other silo don't keep sending message to it + lock (this) { - RequestContextExtensions.Import(requestContextData); - await Lifecycle.OnStart(cancellationToken).WithCancellation("Timed out waiting for grain lifecycle to complete activation", cancellationToken); - if (GrainInstance is IGrainBase grainBase) - { - await grainBase.OnActivateAsync(cancellationToken).WithCancellation($"Timed out waiting for {nameof(IGrainBase.OnActivateAsync)} to complete", cancellationToken); - } - - lock (this) - { - if (State == ActivationState.Activating) - { - SetState(ActivationState.Valid); // Activate calls on this activation are finished - } - } - - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - { - _shared.Logger.LogDebug((int)ErrorCode.Catalog_AfterCallingActivate, "Finished activating grain {Grain}", this); - } - - return true; + SetState(ActivationState.FailedToActivate); + DeactivationReason = new(DeactivationReasonCode.ActivationFailed, sourceException, "Failed to activate grain."); } - catch (Exception exception) - { - CatalogInstruments.ActivationFailedToActivate.Add(1); - // Capture the exception so that it can be propagated to rejection messages - var sourceException = (exception as OrleansLifecycleCanceledException)?.InnerException ?? exception; - _shared.Logger.LogError((int)ErrorCode.Catalog_ErrorCallingActivate, sourceException, "Error activating grain {Grain}", this); + GetDeactivationCompletionSource().TrySetResult(true); - // Unregister the activation from the directory so other silo don't keep sending message to it - lock (this) - { - SetState(ActivationState.FailedToActivate); - DeactivationReason = new(DeactivationReasonCode.ActivationFailed, sourceException, "Failed to activate grain."); - } - - GetDeactivationCompletionSource().TrySetResult(true); - - if (IsUsingGrainDirectory && ForwardingAddress is null) + if (IsUsingGrainDirectory && ForwardingAddress is null) + { + try { - try - { - await _shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force); - } - catch (Exception ex) - { - _shared.Logger.LogWarning( - (int)ErrorCode.Catalog_UnregisterAsync, - ex, - "Failed to unregister grain activation {Grain} after activation failed", - this); - } + await _shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force); } - - // Unregister this as a message target after some period of time. - // This is delayed so that consistently failing activation, perhaps due to an application bug or network - // issue, does not cause a flood of doomed activations. - // If the cancellation token was canceled, there is no need to wait an additional time, since the activation - // has already waited some significant amount of time. - if (!cancellationToken.IsCancellationRequested) + catch (Exception ex) { - ScheduleOperation(new Command.Delay(TimeSpan.FromSeconds(5))); + _shared.Logger.LogWarning( + (int)ErrorCode.Catalog_UnregisterAsync, + ex, + "Failed to unregister grain activation {Grain} after activation failed", + this); } + } - ScheduleOperation(new Command.UnregisterFromCatalog()); + // Unregister this as a message target after some period of time. + // This is delayed so that consistently failing activation, perhaps due to an application bug or network + // issue, does not cause a flood of doomed activations. + // If the cancellation token was canceled, there is no need to wait an additional time, since the activation + // has already waited some significant amount of time. + if (!cancellationToken.IsCancellationRequested) + { + ScheduleOperation(new Command.Delay(TimeSpan.FromSeconds(5))); + } - lock (this) - { - SetState(ActivationState.Invalid); - } + ScheduleOperation(new Command.UnregisterFromCatalog()); - return false; + lock (this) + { + SetState(ActivationState.Invalid); } + + return false; } } + } - private async ValueTask RegisterActivationInGrainDirectoryAndValidate() - { - bool success; + private async ValueTask RegisterActivationInGrainDirectoryAndValidate() + { + bool success; - // Currently, the only grain type that is not registered in the Grain Directory is StatelessWorker. - // Among those that are registered in the directory, we currently do not have any multi activations. - if (!IsUsingGrainDirectory) - { - // Grains which do not use the grain directory do not need to do anything here - success = true; - } - else + // Currently, the only grain type that is not registered in the Grain Directory is StatelessWorker. + // Among those that are registered in the directory, we currently do not have any multi activations. + if (!IsUsingGrainDirectory) + { + // Grains which do not use the grain directory do not need to do anything here + success = true; + } + else + { + Exception registrationException; + var previousRegistration = PreviousRegistration; + try { - Exception registrationException; - var previousRegistration = PreviousRegistration; - try + while (true) { - while (true) + var result = await _shared.InternalRuntime.GrainLocator.Register(Address, previousRegistration); + if (Address.Matches(result)) { - var result = await _shared.InternalRuntime.GrainLocator.Register(Address, previousRegistration); - if (Address.Matches(result)) + success = true; + } + else if (result?.SiloAddress is { } registeredSilo && registeredSilo.Equals(Address.SiloAddress)) + { + if (_shared.Logger.IsEnabled(LogLevel.Debug)) { - success = true; + _shared.Logger.LogDebug( + "The grain directory has an existing entry pointing to a different activation of this grain on this silo, {PreviousRegistration}." + + " This may indicate that the previous activation was deactivated but the directory was not successfully updated." + + " The directory will be updated to point to this activation.", + previousRegistration); } - else if (result?.SiloAddress is { } registeredSilo && registeredSilo.Equals(Address.SiloAddress)) - { - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - { - _shared.Logger.LogDebug( - "The grain directory has an existing entry pointing to a different activation of this grain on this silo, {PreviousRegistration}." - + " This may indicate that the previous activation was deactivated but the directory was not successfully updated." - + " The directory will be updated to point to this activation.", - previousRegistration); - } - // Attempt to register this activation again, using the registration of the previous instance of this grain, - // which is registered to this silo. That activation must be a defunct predecessor of this activation, - // since the catalog only allows one activation of a given grain at a time. - // This could occur if the previous activation failed to unregister itself from the grain directory. - previousRegistration = result; - continue; - } - else + // Attempt to register this activation again, using the registration of the previous instance of this grain, + // which is registered to this silo. That activation must be a defunct predecessor of this activation, + // since the catalog only allows one activation of a given grain at a time. + // This could occur if the previous activation failed to unregister itself from the grain directory. + previousRegistration = result; + continue; + } + else + { + // Set the forwarding address so that messages enqueued on this activation can be forwarded to + // the existing activation. + ForwardingAddress = result?.SiloAddress; + if (ForwardingAddress is { } address) { - // Set the forwarding address so that messages enqueued on this activation can be forwarded to - // the existing activation. - ForwardingAddress = result?.SiloAddress; - if (ForwardingAddress is { } address) - { - DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address})."); - } - - success = false; - CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1); - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - { - // If this was a duplicate, it's not an error, just a race. - // Forward on all of the pending messages, and then forget about this activation. - var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId); - _shared.Logger.LogDebug( - (int)ErrorCode.Catalog_DuplicateActivation, - "Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. " - + "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}" - + "Full activation address is {Address}. We have {WaitingCount} messages to forward.", - Address, - ForwardingAddress, - GrainInstance?.GetType(), - primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty, - Address.ToFullString(), - WaitingCount); - } + DeactivationReason = new(DeactivationReasonCode.DuplicateActivation, $"This grain is active on another host ({address})."); } - break; + success = false; + CatalogInstruments.ActivationConcurrentRegistrationAttempts.Add(1); + if (_shared.Logger.IsEnabled(LogLevel.Debug)) + { + // If this was a duplicate, it's not an error, just a race. + // Forward on all of the pending messages, and then forget about this activation. + var primary = _shared.InternalRuntime.LocalGrainDirectory.GetPrimaryForGrain(GrainId); + _shared.Logger.LogDebug( + (int)ErrorCode.Catalog_DuplicateActivation, + "Tried to create a duplicate activation {Address}, but we'll use {ForwardingAddress} instead. " + + "GrainInstance type is {GrainInstanceType}. {PrimaryMessage}" + + "Full activation address is {Address}. We have {WaitingCount} messages to forward.", + Address, + ForwardingAddress, + GrainInstance?.GetType(), + primary != null ? "Primary Directory partition for this grain is " + primary + ". " : string.Empty, + Address.ToFullString(), + WaitingCount); + } } - registrationException = null; - } - catch (Exception exception) - { - registrationException = exception; - _shared.Logger.LogWarning((int)ErrorCode.Runtime_Error_100064, registrationException, "Failed to register grain {Grain} in grain directory", ToString()); - success = false; + break; } - if (!success) - { - if (DeactivationReason.ReasonCode == DeactivationReasonCode.None) - { - DeactivationReason = new(DeactivationReasonCode.InternalFailure, registrationException, "Failed to register activation in grain directory."); - } - - lock (this) - { - SetState(ActivationState.Invalid); - } - - UnregisterMessageTarget(); - } + registrationException = null; } - - return success; - } - #endregion - - #region Deactivation - - /// - /// Starts the deactivation process. - /// - public bool StartDeactivating(DeactivationReason reason) - { - lock (this) + catch (Exception exception) { - if (State is ActivationState.Deactivating or ActivationState.Invalid or ActivationState.FailedToActivate) - { - return false; - } - - if (State is ActivationState.Activating or ActivationState.Create) - { - throw new InvalidOperationException("Calling DeactivateOnIdle from within OnActivateAsync is not supported"); - } - - // If State is Valid, then begin deactivation. + registrationException = exception; + _shared.Logger.LogWarning((int)ErrorCode.Runtime_Error_100064, registrationException, "Failed to register grain {Grain} in grain directory", ToString()); + success = false; + } + if (!success) + { if (DeactivationReason.ReasonCode == DeactivationReasonCode.None) { - DeactivationReason = reason; + DeactivationReason = new(DeactivationReasonCode.InternalFailure, registrationException, "Failed to register activation in grain directory."); } - DeactivationStartTime = DateTime.UtcNow; - SetState(ActivationState.Deactivating); - if (!IsCurrentlyExecuting) + lock (this) { - StopAllTimers(); + SetState(ActivationState.Invalid); } - _shared.InternalRuntime.ActivationWorkingSet.OnDeactivating(this); + UnregisterMessageTarget(); } - - return true; } - /// - /// Completes the deactivation process. - /// - /// A cancellation which terminates graceful deactivation when cancelled. - private async Task FinishDeactivating(CancellationToken cancellationToken) + return success; + } + #endregion + + #region Deactivation + + /// + /// Starts the deactivation process. + /// + public bool StartDeactivating(DeactivationReason reason) + { + lock (this) { - var migrated = false; - try + if (State is ActivationState.Deactivating or ActivationState.Invalid or ActivationState.FailedToActivate) { - if (_shared.Logger.IsEnabled(LogLevel.Trace)) - { - _shared.Logger.LogTrace("FinishDeactivating activation {Activation}", this.ToDetailedString()); - } + return false; + } + + if (State is ActivationState.Activating or ActivationState.Create) + { + throw new InvalidOperationException("Calling DeactivateOnIdle from within OnActivateAsync is not supported"); + } + + // If State is Valid, then begin deactivation. + + if (DeactivationReason.ReasonCode == DeactivationReasonCode.None) + { + DeactivationReason = reason; + } + DeactivationStartTime = DateTime.UtcNow; + SetState(ActivationState.Deactivating); + if (!IsCurrentlyExecuting) + { StopAllTimers(); + } - // Wait timers and call OnDeactivateAsync(reason, cancellationToken) - await WaitForAllTimersToFinish(cancellationToken); - await CallGrainDeactivate(cancellationToken); + _shared.InternalRuntime.ActivationWorkingSet.OnDeactivating(this); + } - if (DehydrationContext is { } context && _shared.MigrationManager is { } migrationManager) - { - Debug.Assert(ForwardingAddress is not null); + return true; + } - try - { - // Populate the dehydration context. - if (context.RequestContext is { } requestContext) - { - RequestContextExtensions.Import(requestContext); - } - else - { - RequestContext.Clear(); - } + /// + /// Completes the deactivation process. + /// + /// A cancellation which terminates graceful deactivation when cancelled. + private async Task FinishDeactivating(CancellationToken cancellationToken) + { + var migrated = false; + try + { + if (_shared.Logger.IsEnabled(LogLevel.Trace)) + { + _shared.Logger.LogTrace("FinishDeactivating activation {Activation}", this.ToDetailedString()); + } - OnDehydrate(context.Value); + StopAllTimers(); - // Send the dehydration context to the target host. - await migrationManager.MigrateAsync(ForwardingAddress, GrainId, context.Value); - migrated = true; - } - catch (Exception exception) + // Wait timers and call OnDeactivateAsync(reason, cancellationToken) + await WaitForAllTimersToFinish(cancellationToken); + await CallGrainDeactivate(cancellationToken); + + if (DehydrationContext is { } context + && ForwardingAddress is { } forwardingAddress + && _shared.MigrationManager is { } migrationManager) + { + try + { + // Populate the dehydration context. + if (context.RequestContext is { } requestContext) { - _shared.Logger.LogWarning(exception, "Failed to migrate grain {GrainId} to {SiloAddress}", GrainId, ForwardingAddress); + RequestContextExtensions.Import(requestContext); } - finally + else { RequestContext.Clear(); } - } - if (!migrated) + OnDehydrate(context.MigrationContext); + + // Send the dehydration context to the target host. + await migrationManager.MigrateAsync(forwardingAddress, GrainId, context.MigrationContext); + _shared.InternalRuntime.GrainLocator.UpdateCache(GrainId, forwardingAddress); + migrated = true; + } + catch (Exception exception) { - // Unregister from directory - await _shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force); + _shared.Logger.LogWarning(exception, "Failed to migrate grain {GrainId} to {SiloAddress}", GrainId, forwardingAddress); } - - if (_shared.Logger.IsEnabled(LogLevel.Trace)) + finally { - _shared.Logger.LogTrace("Completed async portion of FinishDeactivating for activation {Activation}", this.ToDetailedString()); + RequestContext.Clear(); } } - catch (Exception ex) - { - _shared.Logger.LogWarning((int)ErrorCode.Catalog_DeactivateActivation_Exception, ex, "Exception when trying to deactivate {Activation}", this); - } - lock (this) + if (!migrated) { - SetState(ActivationState.Invalid); + // Unregister from directory + await _shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force); } - if (IsStuckDeactivating) - { - CatalogInstruments.ActiviationShutdownViaDeactivateStuckActivation(); - } - else if (migrated) - { - CatalogInstruments.ActiviationShutdownViaMigration(); - } - else if (_isInWorkingSet) - { - CatalogInstruments.ActiviationShutdownViaDeactivateOnIdle(); - } - else + if (_shared.Logger.IsEnabled(LogLevel.Trace)) { - CatalogInstruments.ActiviationShutdownViaCollection(); + _shared.Logger.LogTrace("Completed async portion of FinishDeactivating for activation {Activation}", this.ToDetailedString()); } + } + catch (Exception ex) + { + _shared.Logger.LogWarning((int)ErrorCode.Catalog_DeactivateActivation_Exception, ex, "Exception when trying to deactivate {Activation}", this); + } - _shared.InternalRuntime.ActivationWorkingSet.OnDeactivated(this); + lock (this) + { + SetState(ActivationState.Invalid); + } - try - { - UnregisterMessageTarget(); - await DisposeAsync(); - } - catch (Exception exception) - { - _shared.Logger.LogWarning(exception, "Exception disposing activation {Activation}", (ActivationData)this); - } + if (IsStuckDeactivating) + { + CatalogInstruments.ActiviationShutdownViaDeactivateStuckActivation(); + } + else if (migrated) + { + CatalogInstruments.ActiviationShutdownViaMigration(); + } + else if (_isInWorkingSet) + { + CatalogInstruments.ActiviationShutdownViaDeactivateOnIdle(); + } + else + { + CatalogInstruments.ActiviationShutdownViaCollection(); + } - // Signal deactivation - GetDeactivationCompletionSource().TrySetResult(true); - _workSignal.Signal(); + _shared.InternalRuntime.ActivationWorkingSet.OnDeactivated(this); - if (_shared.Logger.IsEnabled(LogLevel.Trace)) - { - _shared.Logger.LogTrace("Completed final portion of FinishDeactivating for activation {Activation}", this.ToDetailedString()); - } + try + { + await DisposeAsync(); + } + catch (Exception exception) + { + _shared.Logger.LogWarning(exception, "Exception disposing activation {Activation}", (ActivationData)this); + } + + UnregisterMessageTarget(); - async Task CallGrainDeactivate(CancellationToken ct) + // Signal deactivation + GetDeactivationCompletionSource().TrySetResult(true); + _workSignal.Signal(); + + if (_shared.Logger.IsEnabled(LogLevel.Trace)) + { + _shared.Logger.LogTrace("Completed final portion of FinishDeactivating for activation {Activation}", this.ToDetailedString()); + } + + async Task CallGrainDeactivate(CancellationToken ct) + { + try { + // Note: This call is being made from within Scheduler.Queue wrapper, so we are already executing on worker thread + if (_shared.Logger.IsEnabled(LogLevel.Debug)) + _shared.Logger.LogDebug( + (int)ErrorCode.Catalog_BeforeCallingDeactivate, + "About to call {Activation} grain's OnDeactivateAsync(...) method {GrainInstanceType}", + this, + GrainInstance?.GetType().FullName); + + // Call OnDeactivateAsync inline, but within try-catch wrapper to safely capture any exceptions thrown from called function try { - // Note: This call is being made from within Scheduler.Queue wrapper, so we are already executing on worker thread - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - _shared.Logger.LogDebug( - (int)ErrorCode.Catalog_BeforeCallingDeactivate, - "About to call {Activation} grain's OnDeactivateAsync(...) method {GrainInstanceType}", - this, - GrainInstance?.GetType().FullName); - - // Call OnDeactivateAsync inline, but within try-catch wrapper to safely capture any exceptions thrown from called function - try + // just check in case this activation data is already Invalid or not here at all. + if (State == ActivationState.Deactivating) { - // just check in case this activation data is already Invalid or not here at all. - if (State == ActivationState.Deactivating) + RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake. + if (GrainInstance is IGrainBase grainBase) { - RequestContext.Clear(); // Clear any previous RC, so it does not leak into this call by mistake. - if (GrainInstance is IGrainBase grainBase) - { - await grainBase.OnDeactivateAsync(DeactivationReason, ct).WithCancellation($"Timed out waiting for {nameof(IGrainBase.OnDeactivateAsync)} to complete", ct); - } - - await Lifecycle.OnStop(ct).WithCancellation("Timed out waiting for grain lifecycle to complete deactivation", ct); + await grainBase.OnDeactivateAsync(DeactivationReason, ct).WithCancellation($"Timed out waiting for {nameof(IGrainBase.OnDeactivateAsync)} to complete", ct); } - if (_shared.Logger.IsEnabled(LogLevel.Debug)) - _shared.Logger.LogDebug( - (int)ErrorCode.Catalog_AfterCallingDeactivate, - "Returned from calling {Activation} grain's OnDeactivateAsync(...) method {GrainInstanceType}", - this, - GrainInstance?.GetType().FullName); - } - catch (Exception exc) - { - _shared.Logger.LogError( - (int)ErrorCode.Catalog_ErrorCallingDeactivate, - exc, - "Error calling grain's OnDeactivateAsync(...) method - Grain type = {GrainType} Activation = {Activation}", - GrainInstance?.GetType().FullName, - this); + await Lifecycle.OnStop(ct).WithCancellation("Timed out waiting for grain lifecycle to complete deactivation", ct); } + + if (_shared.Logger.IsEnabled(LogLevel.Debug)) + _shared.Logger.LogDebug( + (int)ErrorCode.Catalog_AfterCallingDeactivate, + "Returned from calling {Activation} grain's OnDeactivateAsync(...) method {GrainInstanceType}", + this, + GrainInstance?.GetType().FullName); } catch (Exception exc) { _shared.Logger.LogError( - (int)ErrorCode.Catalog_FinishGrainDeactivateAndCleanupStreams_Exception, + (int)ErrorCode.Catalog_ErrorCallingDeactivate, exc, - "CallGrainDeactivateAndCleanupStreams Activation = {Activation} failed.", + "Error calling grain's OnDeactivateAsync(...) method - Grain type = {GrainType} Activation = {Activation}", + GrainInstance?.GetType().FullName, this); } } - } - - private TaskCompletionSource GetDeactivationCompletionSource() - { - lock (this) + catch (Exception exc) { - _extras ??= new(); - return _extras.DeactivationTask ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _shared.Logger.LogError( + (int)ErrorCode.Catalog_FinishGrainDeactivateAndCleanupStreams_Exception, + exc, + "CallGrainDeactivateAndCleanupStreams Activation = {Activation} failed.", + this); } } + } - ValueTask IGrainManagementExtension.DeactivateOnIdle() + private TaskCompletionSource GetDeactivationCompletionSource() + { + lock (this) { - Deactivate(new(DeactivationReasonCode.ApplicationRequested, $"{nameof(IGrainManagementExtension.DeactivateOnIdle)} was called.")); - return default; + _extras ??= new(); + return _extras.DeactivationTask ??= new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); } + } + + ValueTask IGrainManagementExtension.DeactivateOnIdle() + { + Deactivate(new(DeactivationReasonCode.ApplicationRequested, $"{nameof(IGrainManagementExtension.DeactivateOnIdle)} was called.")); + return default; + } - ValueTask IGrainManagementExtension.MigrateOnIdle() + ValueTask IGrainManagementExtension.MigrateOnIdle() + { + Migrate(RequestContext.CallContextData?.Value.Values); + return default; + } + + private void UnregisterMessageTarget() + { + _shared.InternalRuntime.Catalog.UnregisterMessageTarget(this); + } + + void ICallChainReentrantGrainContext.OnEnterReentrantSection(Guid reentrancyId) + { + var tracker = GetComponent(); + if (tracker is null) { - Migrate(RequestContext.CallContextData?.Value.Values); - return default; + tracker = new ReentrantRequestTracker(); + SetComponent(tracker); } - private void UnregisterMessageTarget() + tracker.EnterReentrantSection(reentrancyId); + } + + void ICallChainReentrantGrainContext.OnExitReentrantSection(Guid reentrancyId) + { + var tracker = GetComponent(); + if (tracker is null) { - _shared.InternalRuntime.Catalog.UnregisterMessageTarget(this); - if (GrainInstance is not null) - { - SetGrainInstance(null); - } + throw new InvalidOperationException("Attempted to exit reentrant section without entering it."); } - void ICallChainReentrantGrainContext.OnEnterReentrantSection(Guid reentrancyId) - { - var tracker = GetComponent(); - if (tracker is null) - { - tracker = new ReentrantRequestTracker(); - SetComponent(tracker); - } + tracker.LeaveReentrantSection(reentrancyId); + } - tracker.EnterReentrantSection(reentrancyId); + private bool IsReentrantSection(Guid reentrancyId) + { + if (reentrancyId == Guid.Empty) + { + return false; } - void ICallChainReentrantGrainContext.OnExitReentrantSection(Guid reentrancyId) + var tracker = GetComponent(); + if (tracker is null) { - var tracker = GetComponent(); - if (tracker is null) - { - throw new InvalidOperationException("Attempted to exit reentrant section without entering it."); - } - - tracker.LeaveReentrantSection(reentrancyId); + return false; } - private bool IsReentrantSection(Guid reentrancyId) - { - if (reentrancyId == Guid.Empty) - { - return false; - } + return tracker.IsReentrantSectionActive(reentrancyId); + } - var tracker = GetComponent(); - if (tracker is null) - { - return false; - } + #endregion - return tracker.IsReentrantSectionActive(reentrancyId); - } + /// + /// Additional properties which are not needed for the majority of an activation's lifecycle. + /// + private class ActivationDataExtra : Dictionary + { + private const int IsStuckProcessingMessageFlag = 1 << 0; + private const int IsStuckDeactivatingFlag = 1 << 1; + private const int IsDisposingFlag = 1 << 2; + private byte _flags; - #endregion + public HashSet Timers { get => GetValueOrDefault>(nameof(Timers)); set => SetOrRemoveValue(nameof(Timers), value); } /// - /// Additional properties which are not needed for the majority of an activation's lifecycle. + /// During rehydration, this may contain the address for the previous (recently dehydrated) activation of this grain. /// - private class ActivationDataExtra : Dictionary - { - private const int IsStuckProcessingMessageFlag = 1 << 0; - private const int IsStuckDeactivatingFlag = 1 << 1; - private const int IsDisposingFlag = 1 << 2; - private byte _flags; - - public HashSet Timers { get => GetValueOrDefault>(nameof(Timers)); set => SetOrRemoveValue(nameof(Timers), value); } - - /// - /// During rehydration, this may contain the address for the previous (recently dehydrated) activation of this grain. - /// - public GrainAddress PreviousRegistration { get => GetValueOrDefault(nameof(PreviousRegistration)); set => SetOrRemoveValue(nameof(PreviousRegistration), value); } + public GrainAddress PreviousRegistration { get => GetValueOrDefault(nameof(PreviousRegistration)); set => SetOrRemoveValue(nameof(PreviousRegistration), value); } - /// - /// If State == Invalid, this may contain a forwarding address for incoming messages - /// - public SiloAddress ForwardingAddress { get => GetValueOrDefault(nameof(ForwardingAddress)); set => SetOrRemoveValue(nameof(ForwardingAddress), value); } + /// + /// If State == Invalid, this may contain a forwarding address for incoming messages + /// + public SiloAddress ForwardingAddress { get => GetValueOrDefault(nameof(ForwardingAddress)); set => SetOrRemoveValue(nameof(ForwardingAddress), value); } - /// - /// A which completes when a grain has deactivated. - /// - public TaskCompletionSource DeactivationTask { get => GetDeactivationInfoOrDefault()?.DeactivationTask; set => EnsureDeactivationInfo().DeactivationTask = value; } + /// + /// A which completes when a grain has deactivated. + /// + public TaskCompletionSource DeactivationTask { get => GetDeactivationInfoOrDefault()?.DeactivationTask; set => EnsureDeactivationInfo().DeactivationTask = value; } - public DateTime? DeactivationStartTime { get => GetDeactivationInfoOrDefault()?.DeactivationStartTime; set => EnsureDeactivationInfo().DeactivationStartTime = value; } + public DateTime? DeactivationStartTime { get => GetDeactivationInfoOrDefault()?.DeactivationStartTime; set => EnsureDeactivationInfo().DeactivationStartTime = value; } - public DeactivationReason DeactivationReason { get => GetDeactivationInfoOrDefault()?.DeactivationReason ?? default; set => EnsureDeactivationInfo().DeactivationReason = value; } + public DeactivationReason DeactivationReason { get => GetDeactivationInfoOrDefault()?.DeactivationReason ?? default; set => EnsureDeactivationInfo().DeactivationReason = value; } - /// - /// When migrating to another location, this contains the information to preserve across activations. - /// - public DehydrationContextHolder DehydrationContext { get => GetValueOrDefault(nameof(DehydrationContext)); set => SetOrRemoveValue(nameof(DehydrationContext), value); } + /// + /// When migrating to another location, this contains the information to preserve across activations. + /// + public DehydrationContextHolder DehydrationContext { get => GetValueOrDefault(nameof(DehydrationContext)); set => SetOrRemoveValue(nameof(DehydrationContext), value); } - private DeactivationInfo GetDeactivationInfoOrDefault() => GetValueOrDefault(nameof(DeactivationInfo)); - private DeactivationInfo EnsureDeactivationInfo() + private DeactivationInfo GetDeactivationInfoOrDefault() => GetValueOrDefault(nameof(DeactivationInfo)); + private DeactivationInfo EnsureDeactivationInfo() + { + if (!TryGetValue(nameof(DeactivationInfo), out var info)) { - if (!TryGetValue(nameof(DeactivationInfo), out var info)) - { - info = base[nameof(DeactivationInfo)] = new DeactivationInfo(); - } - - return (DeactivationInfo)info; + info = base[nameof(DeactivationInfo)] = new DeactivationInfo(); } - public bool IsStuckProcessingMessage { get => GetFlag(IsStuckProcessingMessageFlag); set => SetFlag(IsStuckProcessingMessageFlag, value); } - public bool IsStuckDeactivating { get => GetFlag(IsStuckDeactivatingFlag); set => SetFlag(IsStuckDeactivatingFlag, value); } - public bool IsDisposing { get => GetFlag(IsDisposingFlag); set => SetFlag(IsDisposingFlag, value); } - - private void SetFlag(int flag, bool value) - { - if (value) - { - _flags |= (byte)flag; - } - else - { - _flags &= (byte)~flag; - } - } + return (DeactivationInfo)info; + } - private bool GetFlag(int flag) => (_flags & flag) != 0; - private T GetValueOrDefault(object key) - { - TryGetValue(key, out var result); - return (T)result; - } + public bool IsStuckProcessingMessage { get => GetFlag(IsStuckProcessingMessageFlag); set => SetFlag(IsStuckProcessingMessageFlag, value); } + public bool IsStuckDeactivating { get => GetFlag(IsStuckDeactivatingFlag); set => SetFlag(IsStuckDeactivatingFlag, value); } + public bool IsDisposing { get => GetFlag(IsDisposingFlag); set => SetFlag(IsDisposingFlag, value); } - private void SetOrRemoveValue(object key, object value) + private void SetFlag(int flag, bool value) + { + if (value) { - if (value is null) - { - Remove(key); - } - else - { - base[key] = value; - } + _flags |= (byte)flag; } - - private sealed class DeactivationInfo + else { - public DateTime? DeactivationStartTime; - public DeactivationReason DeactivationReason; - public TaskCompletionSource DeactivationTask; + _flags &= (byte)~flag; } } - private class Command + private bool GetFlag(int flag) => (_flags & flag) != 0; + private T GetValueOrDefault(object key) { - protected Command() { } + TryGetValue(key, out var result); + return (T)result; + } - public class Deactivate : Command + private void SetOrRemoveValue(object key, object value) + { + if (value is null) { - public Deactivate(CancellationToken cancellation) => CancellationToken = cancellation; - public CancellationToken CancellationToken { get; } + Remove(key); } - - public class Activate : Command + else { - public Activate(Dictionary requestContext, CancellationToken cancellationToken) - { - RequestContext = requestContext; - CancellationToken = cancellationToken; - } - - public Dictionary RequestContext { get; } - public CancellationToken CancellationToken { get; } + base[key] = value; } + } - public class Rehydrate : Command - { - public readonly IRehydrationContext Context; + private sealed class DeactivationInfo + { + public DateTime? DeactivationStartTime; + public DeactivationReason DeactivationReason; + public TaskCompletionSource DeactivationTask; + } + } - public Rehydrate(IRehydrationContext context) - { - Context = context; - } - } + private class Command + { + protected Command() { } - public class Delay : Command - { - public Delay(TimeSpan duration) - { - Duration = duration; - } + public class Deactivate(CancellationToken cancellation) : Command + { + public CancellationToken CancellationToken { get; } = cancellation; + } - public TimeSpan Duration { get; } - } + public class Activate(Dictionary requestContext, CancellationToken cancellationToken) : Command + { + public Dictionary RequestContext { get; } = requestContext; + public CancellationToken CancellationToken { get; } = cancellationToken; + } - public class UnregisterFromCatalog : Command - { - } + public class Rehydrate(IRehydrationContext context) : Command + { + public readonly IRehydrationContext Context = context; } - internal class ReentrantRequestTracker : Dictionary + public class Delay(TimeSpan duration) : Command { - public void EnterReentrantSection(Guid reentrancyId) - { - Debug.Assert(reentrancyId != Guid.Empty); - ref var count = ref CollectionsMarshal.GetValueRefOrAddDefault(this, reentrancyId, out _); - ++count; - } + public TimeSpan Duration { get; } = duration; + } - public void LeaveReentrantSection(Guid reentrancyId) - { - Debug.Assert(reentrancyId != Guid.Empty); - ref var count = ref CollectionsMarshal.GetValueRefOrNullRef(this, reentrancyId); - if (Unsafe.IsNullRef(ref count)) - { - return; - } + public class UnregisterFromCatalog : Command + { + } + } - if (--count <= 0) - { - Remove(reentrancyId); - } - } + internal class ReentrantRequestTracker : Dictionary + { + public void EnterReentrantSection(Guid reentrancyId) + { + Debug.Assert(reentrancyId != Guid.Empty); + ref var count = ref CollectionsMarshal.GetValueRefOrAddDefault(this, reentrancyId, out _); + ++count; + } - public bool IsReentrantSectionActive(Guid reentrancyId) + public void LeaveReentrantSection(Guid reentrancyId) + { + Debug.Assert(reentrancyId != Guid.Empty); + ref var count = ref CollectionsMarshal.GetValueRefOrNullRef(this, reentrancyId); + if (Unsafe.IsNullRef(ref count)) { - Debug.Assert(reentrancyId != Guid.Empty); - return TryGetValue(reentrancyId, out var count) && count > 0; + return; } - } - private class DehydrationContextHolder - { - public readonly MigrationContext Value; - public readonly Dictionary RequestContext; - public DehydrationContextHolder(SerializerSessionPool sessionPool, Dictionary requestContext) + if (--count <= 0) { - RequestContext = requestContext; - Value = new MigrationContext(sessionPool); + Remove(reentrancyId); } } - private class MigrateWorkItem(ActivationData activation, Dictionary requestContext, CancellationToken cancellationToken) : WorkItemBase + public bool IsReentrantSectionActive(Guid reentrancyId) { - public override string Name => "Migrate"; + Debug.Assert(reentrancyId != Guid.Empty); + return TryGetValue(reentrancyId, out var count) && count > 0; + } + } + + private class DehydrationContextHolder(SerializerSessionPool sessionPool, Dictionary requestContext) + { + public readonly MigrationContext MigrationContext = new(sessionPool); + public readonly Dictionary RequestContext = requestContext; + } + + private class MigrateWorkItem(ActivationData activation, Dictionary requestContext, CancellationToken cancellationToken) : WorkItemBase + { + public override string Name => "Migrate"; - public override IGrainContext GrainContext => activation; + public override IGrainContext GrainContext => activation; - public override void Execute() => activation.StartMigratingAsync(requestContext, cancellationToken).Ignore(); - } + public override void Execute() => activation.StartMigratingAsync(requestContext, cancellationToken).Ignore(); } } diff --git a/src/Orleans.Runtime/Catalog/ActivationState.cs b/src/Orleans.Runtime/Catalog/ActivationState.cs index 15cbdd6269..10bc3c3a56 100644 --- a/src/Orleans.Runtime/Catalog/ActivationState.cs +++ b/src/Orleans.Runtime/Catalog/ActivationState.cs @@ -7,9 +7,9 @@ internal enum ActivationState /// Create, - //// - //// Activation is in the middle of activation process. - //// + /// + /// Activation is in the middle of activation process. + /// Activating, /// diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index b56c6e283b..be1c5f99c4 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -480,17 +480,5 @@ private void OnSiloStatusChange(SiloAddress updatedSilo, SiloStatus status) } } } - - public ValueTask AcceptMigratingGrains(List migratingGrains) - { - foreach (var package in migratingGrains) - { - // If the activation does not exist, create it and provide it with the migration context while doing so. - // If the activation already exists or cannot be created, it is too late to perform migration, so ignore the request. - GetOrCreateActivation(package.GrainId, requestContextData: null, package.MigrationContext); - } - - return default; - } } }