Skip to content

Commit

Permalink
Move Redis Profiler registration to ConditionalWeakTable (#2148)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Aug 4, 2023
1 parent 61f5448 commit c4b73fe
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 73 deletions.
61 changes: 35 additions & 26 deletions sample/StackExchangeRedisSample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,48 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm;
using Elastic.Apm.Api;
using Elastic.Apm.StackExchange.Redis;
using StackExchange.Redis;
using Testcontainers.Redis;

namespace StackExchangeRedisSample
// handle control+c gracefully
var ctx = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
public class Program
{
public static async Task Main(string[] args)
{
await using var container = new RedisBuilder().Build();
await container.StartAsync();

var connection = await ConnectionMultiplexer.ConnectAsync(container.GetConnectionString());
connection.UseElasticApm();
eventArgs.Cancel = true;
ctx.Cancel();
};

for (var i = 0; i < 10; i++)
{
// async
await Agent.Tracer.CaptureTransaction("Set and Get String", ApiConstants.TypeDb, async () =>
{
var database = connection.GetDatabase();
await database.StringSetAsync($"string{i}", i);
await database.StringGetAsync($"string{i}");
await using var container = new RedisBuilder().Build();
await container.StartAsync();

// fire and forget commands may not end up in the profiling session before
// transaction end, and the profiling session is finished.
await database.StringSetAsync($"string{i}", i, flags: CommandFlags.FireAndForget);
await database.StringGetAsync($"string{i}", CommandFlags.FireAndForget);
});
}
var connection = await ConnectionMultiplexer.ConnectAsync(container.GetConnectionString());
connection.UseElasticApm();

await container.StopAsync();
}
for (var i = 0; i < 1_000_000; i++)
{
if (ctx.IsCancellationRequested)
{
Console.WriteLine("\n Cancellation Requested.. halting...");
break;
}
Console.Write($"\rExecuting iteration: {i}");
// async
await Agent.Tracer.CaptureTransaction("Set and Get String", ApiConstants.TypeDb, async () =>
{
var database = connection.GetDatabase();
await database.StringSetAsync($"string{i}", i);
await database.StringGetAsync($"string{i}");
// fire and forget commands may not end up in the profiling session before
// transaction end, and the profiling session is finished.
await database.StringSetAsync($"string{i}", i, flags: CommandFlags.FireAndForget);
await database.StringGetAsync($"string{i}", CommandFlags.FireAndForget);
});
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
Console.WriteLine("Stopping Redis Container...");
await container.StopAsync();
Console.WriteLine("Exiting");
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net7.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Globalization;
using System.Net;
using System.Reflection;
using System.Runtime.CompilerServices;
using Elastic.Apm.Api;
using Elastic.Apm.Helpers;
using Elastic.Apm.Logging;
Expand All @@ -22,29 +23,10 @@ namespace Elastic.Apm.StackExchange.Redis
/// </summary>
public class ElasticApmProfiler
{
private readonly ConcurrentDictionary<string, ProfilingSession> _executionSegmentSessions = new();
private readonly ConditionalWeakTable<IExecutionSegment, ProfilingSession> _executionSegmentSessions = new();

private readonly Lazy<IApmLogger> _logger;
private readonly Lazy<IApmAgent> _agent;
private static readonly Func<object, object> MessageFetcher;
private static readonly Func<object, object> CommandAndKeyFetcher;
private static readonly Type _profiledCommandType;

static ElasticApmProfiler()
{
var messageType = Type.GetType("StackExchange.Redis.Message,StackExchange.Redis", false);
_profiledCommandType = Type.GetType("StackExchange.Redis.Profiling.ProfiledCommand,StackExchange.Redis", false);
if (messageType != null && _profiledCommandType != null)
{
var commandAndKey = messageType.GetProperty("CommandAndKey", BindingFlags.Public | BindingFlags.Instance);
var messageProperty = _profiledCommandType.GetField("Message", BindingFlags.NonPublic | BindingFlags.Instance);
if (commandAndKey != null && messageProperty != null)
{
MessageFetcher = ExpressionBuilder.BuildFieldGetter(_profiledCommandType, messageProperty);
CommandAndKeyFetcher = ExpressionBuilder.BuildPropertyGetter(messageType, commandAndKey);
}
}
}

public ElasticApmProfiler(Func<IApmAgent> agentGetter)
{
Expand Down Expand Up @@ -78,26 +60,20 @@ public ProfilingSession GetProfilingSession()
}

var isSpan = realSpan != null;
if (!_executionSegmentSessions.TryGetValue(executionSegment.Id, out var session))
{
_logger.Value.Trace()?.Log("Creating profiling session for {ExecutionSegment} {Id}",
isSpan ? "span" : "transaction",
executionSegment.Id);
if (_executionSegmentSessions.TryGetValue(executionSegment, out var session))
return session;

session = new ProfilingSession();
_logger.Value.Trace()?.Log("Creating profiling session for {ExecutionSegment} {Id}",
isSpan ? "span" : "transaction", executionSegment.Id);

if (!_executionSegmentSessions.TryAdd(executionSegment.Id, session))
{
_logger.Value.Debug()?.Log("could not add profiling session to tracked sessions for {ExecutionSegment} {Id}",
isSpan ? "span" : "transaction",
executionSegment.Id);
}
session = new ProfilingSession();

if (isSpan)
realSpan.Ended += (sender, _) => EndProfilingSession(sender, session);
else
realTransaction.Ended += (sender, _) => EndProfilingSession(sender, session);
}
_executionSegmentSessions.Add(executionSegment, session);

if (isSpan)
realSpan.Ended += (sender, _) => EndProfilingSession(sender, session);
else
realTransaction.Ended += (sender, _) => EndProfilingSession(sender, session);

return session;
}
Expand All @@ -121,7 +97,7 @@ private void EndProfilingSession(object sender, ProfilingSession session)
{
// Remove the session. Use session passed to EndProfilingSession rather than the removed session in the event
// there was an issue in adding or removing the session
if (!_executionSegmentSessions.TryRemove(executionSegment.Id, out _))
if (!_executionSegmentSessions.Remove(executionSegment))
{
_logger.Value.Debug()?.Log(
"could not remove profiling session from tracked sessions for {ExecutionSegment} {Id}",
Expand Down Expand Up @@ -201,13 +177,5 @@ private static void ProcessCommand(IProfiledCommand profiledCommand, IExecutionS
? profiledCommand.Command
: "UNKNOWN";

private static string GetCommandAndKey(IProfiledCommand profiledCommand)
{
if (profiledCommand.GetType() != _profiledCommandType || MessageFetcher == null)
return null;

var message = MessageFetcher.Invoke(profiledCommand);
return CommandAndKeyFetcher.Invoke(message) as string;
}
}
}

0 comments on commit c4b73fe

Please sign in to comment.