Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Redis Profiler registration to ConditionalWeakTable #2148

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 35 additions & 26 deletions sample/StackExchangeRedisSample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,48 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Apm;
using Elastic.Apm.Api;
using Elastic.Apm.StackExchange.Redis;
using StackExchange.Redis;
using Testcontainers.Redis;

namespace StackExchangeRedisSample
// handle control+c gracefully
var ctx = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
public class Program
{
public static async Task Main(string[] args)
{
await using var container = new RedisBuilder().Build();
await container.StartAsync();

var connection = await ConnectionMultiplexer.ConnectAsync(container.GetConnectionString());
connection.UseElasticApm();
eventArgs.Cancel = true;
ctx.Cancel();
};

for (var i = 0; i < 10; i++)
{
// async
await Agent.Tracer.CaptureTransaction("Set and Get String", ApiConstants.TypeDb, async () =>
{
var database = connection.GetDatabase();
await database.StringSetAsync($"string{i}", i);
await database.StringGetAsync($"string{i}");
await using var container = new RedisBuilder().Build();
await container.StartAsync();

// fire and forget commands may not end up in the profiling session before
// transaction end, and the profiling session is finished.
await database.StringSetAsync($"string{i}", i, flags: CommandFlags.FireAndForget);
await database.StringGetAsync($"string{i}", CommandFlags.FireAndForget);
});
}
var connection = await ConnectionMultiplexer.ConnectAsync(container.GetConnectionString());
connection.UseElasticApm();

await container.StopAsync();
}
for (var i = 0; i < 1_000_000; i++)
{
if (ctx.IsCancellationRequested)
{
Console.WriteLine("\n Cancellation Requested.. halting...");
break;
}
Console.Write($"\rExecuting iteration: {i}");
// async
await Agent.Tracer.CaptureTransaction("Set and Get String", ApiConstants.TypeDb, async () =>
{
var database = connection.GetDatabase();
await database.StringSetAsync($"string{i}", i);
await database.StringGetAsync($"string{i}");

// fire and forget commands may not end up in the profiling session before
// transaction end, and the profiling session is finished.
await database.StringSetAsync($"string{i}", i, flags: CommandFlags.FireAndForget);
await database.StringGetAsync($"string{i}", CommandFlags.FireAndForget);
});
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
Console.WriteLine("Stopping Redis Container...");
await container.StopAsync();
Console.WriteLine("Exiting");
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net7.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Globalization;
using System.Net;
using System.Reflection;
using System.Runtime.CompilerServices;
using Elastic.Apm.Api;
using Elastic.Apm.Helpers;
using Elastic.Apm.Logging;
Expand All @@ -22,29 +23,10 @@ namespace Elastic.Apm.StackExchange.Redis
/// </summary>
public class ElasticApmProfiler
{
private readonly ConcurrentDictionary<string, ProfilingSession> _executionSegmentSessions = new();
private readonly ConditionalWeakTable<IExecutionSegment, ProfilingSession> _executionSegmentSessions = new();

private readonly Lazy<IApmLogger> _logger;
private readonly Lazy<IApmAgent> _agent;
private static readonly Func<object, object> MessageFetcher;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a continuation of #2104 since we no longer set db.instance cc @stevejgordon

private static readonly Func<object, object> CommandAndKeyFetcher;
private static readonly Type _profiledCommandType;

static ElasticApmProfiler()
{
var messageType = Type.GetType("StackExchange.Redis.Message,StackExchange.Redis", false);
_profiledCommandType = Type.GetType("StackExchange.Redis.Profiling.ProfiledCommand,StackExchange.Redis", false);
if (messageType != null && _profiledCommandType != null)
{
var commandAndKey = messageType.GetProperty("CommandAndKey", BindingFlags.Public | BindingFlags.Instance);
var messageProperty = _profiledCommandType.GetField("Message", BindingFlags.NonPublic | BindingFlags.Instance);
if (commandAndKey != null && messageProperty != null)
{
MessageFetcher = ExpressionBuilder.BuildFieldGetter(_profiledCommandType, messageProperty);
CommandAndKeyFetcher = ExpressionBuilder.BuildPropertyGetter(messageType, commandAndKey);
}
}
}

public ElasticApmProfiler(Func<IApmAgent> agentGetter)
{
Expand Down Expand Up @@ -78,26 +60,20 @@ public ProfilingSession GetProfilingSession()
}

var isSpan = realSpan != null;
if (!_executionSegmentSessions.TryGetValue(executionSegment.Id, out var session))
{
_logger.Value.Trace()?.Log("Creating profiling session for {ExecutionSegment} {Id}",
isSpan ? "span" : "transaction",
executionSegment.Id);
if (_executionSegmentSessions.TryGetValue(executionSegment, out var session))
return session;

session = new ProfilingSession();
_logger.Value.Trace()?.Log("Creating profiling session for {ExecutionSegment} {Id}",
isSpan ? "span" : "transaction", executionSegment.Id);

if (!_executionSegmentSessions.TryAdd(executionSegment.Id, session))
{
_logger.Value.Debug()?.Log("could not add profiling session to tracked sessions for {ExecutionSegment} {Id}",
isSpan ? "span" : "transaction",
executionSegment.Id);
}
session = new ProfilingSession();

if (isSpan)
realSpan.Ended += (sender, _) => EndProfilingSession(sender, session);
else
realTransaction.Ended += (sender, _) => EndProfilingSession(sender, session);
}
_executionSegmentSessions.Add(executionSegment, session);

if (isSpan)
realSpan.Ended += (sender, _) => EndProfilingSession(sender, session);
else
realTransaction.Ended += (sender, _) => EndProfilingSession(sender, session);

return session;
}
Expand All @@ -121,7 +97,7 @@ private void EndProfilingSession(object sender, ProfilingSession session)
{
// Remove the session. Use session passed to EndProfilingSession rather than the removed session in the event
// there was an issue in adding or removing the session
if (!_executionSegmentSessions.TryRemove(executionSegment.Id, out _))
if (!_executionSegmentSessions.Remove(executionSegment))
{
_logger.Value.Debug()?.Log(
"could not remove profiling session from tracked sessions for {ExecutionSegment} {Id}",
Expand Down Expand Up @@ -201,13 +177,5 @@ private static void ProcessCommand(IProfiledCommand profiledCommand, IExecutionS
? profiledCommand.Command
: "UNKNOWN";

private static string GetCommandAndKey(IProfiledCommand profiledCommand)
{
if (profiledCommand.GetType() != _profiledCommandType || MessageFetcher == null)
return null;

var message = MessageFetcher.Invoke(profiledCommand);
return CommandAndKeyFetcher.Invoke(message) as string;
}
}
}
Loading