Skip to content

Commit

Permalink
Additional testing and adjustments.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 14, 2024
1 parent 8dc1655 commit c138511
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 43 deletions.
16 changes: 8 additions & 8 deletions src/HouseofCat.Dataflows/BaseDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ TState WrapAction(TState state)
}
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
state.IsFaulted = true;
state.EDI = ExceptionDispatchInfo.Capture(ex);
return state;
Expand All @@ -101,8 +101,8 @@ async Task<TState> WrapActionAsync(TState state)
}
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
state.IsFaulted = true;
state.EDI = ExceptionDispatchInfo.Capture(ex);
return state;
Expand All @@ -126,12 +126,12 @@ void WrapAction(TState state)
}
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
}

childSpan?.End();
state.EndRootSpan();
state.EndRootSpan(true);
}

return new ActionBlock<TState>(WrapAction, options);
Expand All @@ -151,12 +151,12 @@ void WrapAction(TState state)
}
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
}

childSpan?.End();
state.EndRootSpan();
state.EndRootSpan(true);
}

return new ActionBlock<TState>(WrapAction, options);
Expand All @@ -176,12 +176,12 @@ async Task WrapActionAsync(TState state)
}
catch (Exception ex)
{
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
childSpan?.RecordException(ex);
childSpan?.SetStatus(Status.Error.WithDescription(ex.Message));
}

childSpan?.End();
state.EndRootSpan();
state.EndRootSpan(true);
}

return new ActionBlock<TState>(WrapActionAsync, options);
Expand Down
5 changes: 5 additions & 0 deletions src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public static void SetSpanAsError(this IWorkState state, TelemetrySpan span, str
attributes: attributes);
}

public static void AddEvent(this IWorkState state, string name, string description)
{
state.AddEvent(name, description);
}

public static void EndRootSpan(
this IWorkState state,
bool includeErrorWhenFaulted = false)
Expand Down
30 changes: 15 additions & 15 deletions src/HouseofCat.RabbitMQ/Dataflows/ConsumerDataflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,19 @@ public ConsumerDataflow<TState> SetEncryptionProvider(IEncryptionProvider provid

#region Step Adders

protected static readonly string _defaultSpanNameFormat = "{0}.{1}";
protected static readonly string _defaultStepSpanNameFormat = "{0}.{1}.{2}";

protected string GetSpanName(string stepName)
{
return string.Format(_defaultSpanNameFormat, WorkflowName, stepName);
}

protected string GetStepSpanName(string stepName)
{
return string.Format(_defaultStepSpanNameFormat, WorkflowName, _suppliedTransforms.Count, stepName);
}

protected virtual ITargetBlock<TState> CreateTargetBlock(
int boundedCapacity, TaskScheduler taskScheduler = null) =>
new BufferBlock<TState>(
Expand All @@ -212,7 +225,7 @@ public ConsumerDataflow<TState> SetEncryptionProvider(IEncryptionProvider provid
{
_errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler);
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.ErrorHandler");
_errorAction = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("error_handler"));
}
return this;
}
Expand All @@ -229,7 +242,7 @@ public ConsumerDataflow<TState> SetEncryptionProvider(IEncryptionProvider provid
{
_errorBuffer = CreateTargetBlock(boundedCapacity, taskScheduler);
var executionOptions = GetExecuteStepOptions(maxDoP, ensureOrdered, boundedCapacity, taskScheduler ?? _taskScheduler);
_errorAction = GetLastWrappedActionBlock(action, executionOptions, $"{WorkflowName}.ErrorHandler");
_errorAction = GetLastWrappedActionBlock(action, executionOptions, GetSpanName("error_handler"));
}
return this;
}
Expand All @@ -240,19 +253,6 @@ public ConsumerDataflow<TState> WithReadyToProcessBuffer(int boundedCapacity, Ta
return this;
}

protected static readonly string _defaultSpanNameFormat = "{0}.{1}";
protected static readonly string _defaultStepSpanNameFormat = "{0}.{1}.{2}";

protected string GetSpanName(string stepName)
{
return string.Format(_defaultSpanNameFormat, WorkflowName, stepName);
}

protected string GetStepSpanName(string stepName)
{
return string.Format(_defaultStepSpanNameFormat, WorkflowName, _suppliedTransforms.Count, stepName);
}

public ConsumerDataflow<TState> AddStep(
Func<TState, TState> suppliedStep,
string stepName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace HouseofCat.Utilities.Extensions;

public static class ServiceCollectionExtensions
{
public static string DeploymentEnvironmentKey { get; set; } = "deployment.environment";

public static void AddOpenTelemetryExporter(
this IServiceCollection services,
IConfiguration config)
Expand All @@ -22,6 +25,7 @@ public static class ServiceCollectionExtensions
bool.TryParse(config["OpenTelemetry:Enabled"] ?? "false", out var enabled);
if (!enabled) return;

var environment = config["OpenTelemetry:Environment"] ?? "Dev";
var otlpServiceName = config["OpenTelemetry:ServiceName"] ?? sourceName;
var otlpServiceNamespace = config["OpenTelemetry:ServiceNamespace"] ?? "hoc";
var otlpServiceVersion = config["OpenTelemetry:ServiceVersion"] ?? sourceVersion;
Expand All @@ -39,7 +43,12 @@ public static class ServiceCollectionExtensions
resource => resource.AddService(
serviceName: otlpServiceName,
serviceNamespace: otlpServiceNamespace,
serviceVersion: otlpServiceVersion));
serviceVersion: otlpServiceVersion)
.AddAttributes(
new[]
{
new KeyValuePair<string, object>(DeploymentEnvironmentKey, environment)
}));

otlpBuilder
.WithTracing(
Expand Down
51 changes: 34 additions & 17 deletions tests/RabbitMQ.Console.Tests/Program.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,46 @@
using RabbitMQ.ConsoleTests;
using Microsoft.Extensions.Logging;
using HouseofCat.Utilities.Extensions;
using HouseofCat.Utilities.Helpers;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using RabbitMQ.ConsoleTests;

var loggerFactory = LogHelpers.CreateConsoleLoggerFactory(LogLevel.Information);
LogHelpers.LoggerFactory = loggerFactory;
var logger = loggerFactory.CreateLogger<Program>();

// Basic Tests
//await BasicGetTests.RunBasicGetAsync(logger, "./RabbitMQ.BasicGetTests.json");
var builder = WebApplication.CreateBuilder(args);
var configuration = new ConfigurationBuilder()
.SetBasePath(AppContext.BaseDirectory)
.AddJsonFile("appsettings.json")
.Build();

builder.Services.AddOpenTelemetryExporter(configuration);

using var app = builder.Build();

logger.LogInformation("Tests complete! Press CTRL+C to gracefully exit....");

// Publisher Tests
//await PublisherTests.RunSlowPublisherTestAsync(logger, "./RabbitMQ.PublisherTests.json");
//await PublisherTests.RunAutoPublisherStandaloneAsync();
app.Lifetime.ApplicationStarted.Register(
async () =>
{
// Basic Tests
//await BasicGetTests.RunBasicGetAsync(logger, "./RabbitMQ.BasicGetTests.json");
// Consumer Tests
//await ConsumerTests.RunConsumerTestAsync(logger, "./RabbitMQ.ConsumerTests.json");
// Publisher Tests
//await PublisherTests.RunSlowPublisherTestAsync(logger, "./RabbitMQ.PublisherTests.json");
//await PublisherTests.RunAutoPublisherStandaloneAsync();
// PubSub Tests
//await PubSubTests.RunPubSubTestAsync(logger, "./RabbitMQ.PubSubTests.json");
//await PubSubTests.RunPubSubCheckForDuplicateTestAsync(logger, "./RabbitMQ.PubSubTests.json");
// Consumer Tests
//await ConsumerTests.RunConsumerTestAsync(logger, "./RabbitMQ.ConsumerTests.json");
// RabbitService Tests
await RabbitServiceTests.RunRabbitServicePingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json");
//await RabbitServiceTests.RunRabbitServiceAltPingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json");
// PubSub Tests
//await PubSubTests.RunPubSubTestAsync(logger, "./RabbitMQ.PubSubTests.json");
//await PubSubTests.RunPubSubCheckForDuplicateTestAsync(logger, "./RabbitMQ.PubSubTests.json");
logger.LogInformation("Tests complete! Press return to exit....");
// RabbitService Tests
await RabbitServiceTests.RunRabbitServicePingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json");
//await RabbitServiceTests.RunRabbitServiceAltPingPongTestAsync(loggerFactory, "./RabbitMQ.RabbitServiceTests.json");
});

Console.ReadLine();
await app.RunAsync();
3 changes: 3 additions & 0 deletions tests/RabbitMQ.Console.Tests/RabbitMQ.Console.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="RabbitMQ.ConnectionPoolTests.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand Down
11 changes: 11 additions & 0 deletions tests/RabbitMQ.Console.Tests/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"OpenTelemetry": {
"Enabled": false,
"ServiceVersion": "v1.0.0",
"Environment": "production",
"EndpointUrl": "https://ingest.us.signoz.cloud:443",
"HeaderFormat": "{0}={1}",
"HeaderKey": "signoz-access-token",
"ApiKey": "*"
}
}
8 changes: 6 additions & 2 deletions tests/RabbitMQ.ConsumerDataflows.Tests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
"write_message_to_console",
(state) =>
{
Console.WriteLine(Encoding.UTF8.GetString(state.ReceivedMessage.Body.Span));
var message = Encoding.UTF8.GetString(state.ReceivedMessage.Body.Span);
if (message == "throw")
{
throw new Exception("Throwing an exception!");
}
Console.WriteLine(message);
return state;
});

Expand Down Expand Up @@ -75,7 +80,6 @@
(state) =>
{
logger.LogError(state?.EDI?.SourceException, "Error Step!");
state?.ReceivedMessage?.NackMessage(requeue: true);
state?.ReceivedMessage?.Complete();
});

Expand Down

0 comments on commit c138511

Please sign in to comment.