Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Startup context decorator #2028

Merged
merged 2 commits into from
Jul 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion src/Proto.Actor/Context/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using Proto.Future;
using Proto.Mailbox;
using Proto.Metrics;
using Proto.Utils;

namespace Proto.Context;

Expand All @@ -30,6 +31,12 @@ public class ActorContext : IMessageInvoker, IContext, ISupervisor
private ActorContextExtras? _extras;
private object? _messageOrEnvelope;
private ContextState _state;

private ShouldThrottle shouldThrottleStartLogs = Throttle.Create(1000,TimeSpan.FromSeconds(1), droppedLogs =>
{
Logger.LogInformation("[ActorContext] Throttled {LogCount} logs", droppedLogs);
} );


public ActorContext(ActorSystem system, Props props, PID? parent, PID self, IMailbox mailbox)
{
Expand Down Expand Up @@ -383,7 +390,7 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
{
return msg switch
{
Started s => InvokeUserMessageAsync(s),
Started s => HandleStartedAsync(),
Stop _ => HandleStopAsync(),
Terminated t => HandleTerminatedAsync(t),
Watch w => HandleWatch(w),
Expand All @@ -405,6 +412,32 @@ public ValueTask InvokeSystemMessageAsync(SystemMessage msg)
}
}

private ValueTask HandleStartedAsync()
{
if (_props.StartDeadline != TimeSpan.Zero)
{
return Await();
}

return InvokeUserMessageAsync(Started.Instance);

async ValueTask Await()
{
var sw = Stopwatch.StartNew();
await InvokeUserMessageAsync(Started.Instance);
sw.Stop();
if (sw.Elapsed > _props.StartDeadline)
{
if (shouldThrottleStartLogs().IsOpen())
{
Logger.LogCritical(
"Actor {Self} took too long to start, deadline is {Deadline}, actual start time is {ActualStart}, your system might suffer from incorrect design, please consider reaching out to https://proto.actor/docs/training/ for help",
Self, _props.StartDeadline, sw.Elapsed);
}
}
}
}

public ValueTask InvokeUserMessageAsync(object msg)
{
if (!System.Metrics.Enabled)
Expand Down
76 changes: 76 additions & 0 deletions src/Proto.Actor/Context/StartupDeadlineContextDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// -----------------------------------------------------------------------
// <copyright file="DeadlineContextDecorator.cs" company="Asynkron AB">
// Copyright (C) 2015-2022 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Proto.Utils;

namespace Proto.Context;

[PublicAPI]
public static class StartupDeadlineContextExtensions
{
/// <summary>
/// Adds a decorator for a <see cref="ActorContext" /> that logs warning message if Receive takes more time than
/// specified timeout.
/// </summary>
/// <param name="props"></param>
/// <param name="deadline">The timeout for Receive to complete</param>
/// <param name="logger"></param>
/// <returns></returns>
public static Props WithStartupDeadlineDecorator(
this Props props,
TimeSpan deadline,
ILogger logger
) =>
props.WithContextDecorator(ctx => new StartupDeadlineContextDecorator(ctx, deadline, logger));
}

/// <summary>
/// A decorator for a <see cref="ActorContext" /> that logs warning message if Receive takes more time than specified
/// timeout.
/// </summary>
public class StartupDeadlineContextDecorator : ActorContextDecorator
{
private readonly IContext _context;
private readonly TimeSpan _deadline;
private readonly ILogger _logger;

public StartupDeadlineContextDecorator(IContext context, TimeSpan deadline, ILogger logger) : base(context)
{
_deadline = deadline;
_logger = logger;
_context = context;
}

public override async Task Receive(MessageEnvelope envelope)
{
var (m,_,_) = MessageEnvelope.Unwrap(envelope);
if (m is Started)
{
var t = base.Receive(envelope);

if (t.IsCompleted)
{
return;
}

var ok = await t.WaitUpTo(_deadline).ConfigureAwait(false);

if (!ok)
{
_logger.LogWarning("Actor {Self} deadline {Deadline}, exceeded on actor Started",
_context.Self, _deadline);

// keep waiting, we cannot just ignore and continue as an async task might still be running and updating state of the actor
// if we return here, actor concurrency guarantees could break
await t.ConfigureAwait(false);
}
}
}
}
7 changes: 7 additions & 0 deletions src/Proto.Actor/Props/Props.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public sealed record Props
/// </summary>
public ProducerWithSystemAndContext Producer { get; init; } = NullProducer;

/// <summary>
/// Time to wait for the actor to start before logging warning message.
/// </summary>
public TimeSpan StartDeadline { get; init; } = TimeSpan.FromMilliseconds(100);

/// <summary>
/// Delegate used to create the mailbox.
/// </summary>
Expand Down Expand Up @@ -146,6 +151,8 @@ public static PID SystemSpawner(ActorSystem system, string name, Props props, PI
/// Delegate used to create the actor.
/// </summary>
public Props WithProducer(Producer producer) => this with { Producer = (_, _) => producer() };

public Props PropsWithStartDeadline(TimeSpan deadline) => this with { StartDeadline = deadline };

/// <summary>
/// Delegate used to create the actor.
Expand Down
2 changes: 0 additions & 2 deletions src/Proto.Remote/BlockList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Microsoft.Extensions.Logging;

namespace Proto.Remote;

Expand All @@ -22,7 +21,6 @@ public class BlockList
{
private readonly object _lock = new();
private readonly ActorSystem _system;
private static readonly ILogger Logger = Log.CreateLogger<BlockList>();

private ImmutableDictionary<string, DateTime> _blockedMembers = ImmutableDictionary<string, DateTime>.Empty;

Expand Down
Loading