Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/BuslyCLI.Console/Commands/NsbTimeout/SendTimeout.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System.Text;
using BuslyCLI.Config;
using BuslyCLI.Config.Transports;
using BuslyCLI.Infrastructure.Factories;
using NServiceBus.DelayedDelivery;
using NServiceBus.Routing;
using NServiceBus.Transport;
using Spectre.Console;
using Spectre.Console.Cli;

namespace BuslyCLI.Commands.NsbTimeout;

public class SendTimeout(IAnsiConsole console, IRawEndpointFactory rawEndpointFactory, INServiceBusConfiguration nServiceBusConfiguration) : AsyncCommand<SendTimeoutCommandSettings>
{
private static readonly HashSet<Type> UnsupportedTransportTypes =
[
typeof(SqlServerTransportConfig),
typeof(PostgreSqlTransportConfig),
typeof(AzureStorageQueuesTransportConfig)
];

protected override async Task<int> ExecuteAsync(CommandContext context, SendTimeoutCommandSettings settings, CancellationToken cancellationToken)
{
var config = await nServiceBusConfiguration.GetValidatedConfigurationAsync(settings.Config.Path);

if (UnsupportedTransportTypes.Contains(config.CurrentTransportConfig.Config.GetType()))
{
console.MarkupLine($"[red]Error:[/] The [bold]{config.CurrentTransportConfig.Config.GetType().Name.Replace("Config", "")}[/] transport does not support sending timeouts.");
console.MarkupLine("This transport relies on an in-process poller to forward deferred messages, which is incompatible with the CLI's fire-and-forget execution model.");
console.MarkupLine("For details see: [link]https://tragiccode.com/busly-cli/docs/cli-reference/timeout/send[/]");
return 1;
}

var rawEndpoint = await rawEndpointFactory.CreateRawSendOnlyEndpoint(Constants.DefaultOriginatingEndpoint, config.CurrentTransportConfig);
// TODO: Validate body is valid json/xml
var headers = new Dictionary<string, string>
{
["NServiceBus.OriginatingEndpoint"] = Constants.DefaultOriginatingEndpoint,
["NServiceBus.OriginatingMachine"] = Environment.MachineName,
["NServiceBus.ConversationId"] = Guid.NewGuid().ToString(),
["NServiceBus.CorrelationId"] = Guid.NewGuid().ToString(),
["NServiceBus.MessageIntent"] = Constants.NServiceBus.CommandMessageIntent,
["NServiceBus.ContentType"] = settings.ContentType,
["NServiceBus.EnclosedMessageTypes"] = settings.EnclosedMessageType
};
var message = new OutgoingMessage(
Guid.NewGuid().ToString(),
headers,
Encoding.ASCII.GetBytes(settings.MessageBody)
);

var dispatchProperties = new DispatchProperties();

if (settings.DoNotDeliverBefore is not null)
{
dispatchProperties.DoNotDeliverBefore = new DoNotDeliverBefore(settings.DoNotDeliverBefore.Value);
}
else if (settings.DelayDeliveryWith is not null)
{
dispatchProperties.DelayDeliveryWith = new DelayDeliveryWith(settings.DelayDeliveryWith.Value);
}

var transportOperation = new TransportOperation(
message,
new UnicastAddressTag(settings.DestinationEndpoint),
dispatchProperties
);

await rawEndpoint.Dispatch(
new TransportOperations(transportOperation),
new TransportTransaction(),
cancellationToken);

await rawEndpoint.ShutDownAndCleanUp();

return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System.ComponentModel;
using Spectre.Console;
using Spectre.Console.Cli;

namespace BuslyCLI.Commands.NsbTimeout;

public class SendTimeoutCommandSettings : CommonCommandSettings
{
[CommandOption("--do-not-deliver-before <do-not-deliver-before>")]
[Description("Allows specifying a date before which the delivery should not occur, using ISO-8601 format (YYYY-MM-DDTHH:mm:ssZ)")]
public DateTime? DoNotDeliverBefore { get; init; }

[CommandOption("--delay-delivery-with <delay-delivery-with>")]
// ([days.]hh:mm:ss[.fffffff])
[Description("Specifies the delay before the timeout is delivered, using a TimeSpan format")]
public TimeSpan? DelayDeliveryWith { get; init; }

public override ValidationResult Validate()
{
var baseResult = base.Validate();
if (baseResult.Successful == false) return baseResult;
// Neither provided
if (DelayDeliveryWith is null && DoNotDeliverBefore is null)
{
return ValidationResult.Error(
"You must specify either --do-not-deliver-before or --delay-delivery-with.");
}

// Both provided
if (DelayDeliveryWith is not null && DoNotDeliverBefore is not null)
{
return ValidationResult.Error(
"--do-not-deliver-before and --delay-delivery-with cannot be used together.");
}

return ValidationResult.Success();
}
}
8 changes: 8 additions & 0 deletions src/BuslyCLI.Console/Infrastructure/AppConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using BuslyCLI.Commands.Demo;
using BuslyCLI.Commands.NsbCommand;
using BuslyCLI.Commands.NsbEvent;
using BuslyCLI.Commands.NsbTimeout;
using BuslyCLI.Commands.Transport;
using Spectre.Console;
using Spectre.Console.Cli;
Expand Down Expand Up @@ -48,6 +49,13 @@ public static Action<IConfigurator> GetSpectreCommandConfiguration()
.WithAlias("p")
.WithDescription("Publish an event to subscribing endpoints.");
});
config.AddBranch("timeout", timeout =>
{
timeout.SetDescription("Operations related to NServiceBus timeouts.");
timeout.AddCommand<SendTimeout>("send")
.WithAlias("s")
.WithDescription("Send a timeout message to an endpoint.");
});
config.AddBranch("demo", demo =>
{
demo.SetDescription("Demo mode for the busly quick start guide.");
Expand Down
6 changes: 0 additions & 6 deletions src/BuslyCLI.Console/Infrastructure/Endpoints/RawEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ public Task<ErrorHandleResult> OnError(ErrorContext errorContext, CancellationTo
return Task.FromResult(ErrorHandleResult.Handled);
}

public IncomingMessage TryReceiveMessageWithTimeout()
{
if (_receivedMessages.TryTake(out var incomingMessage, IncomingMessageTimeout)) return incomingMessage;
throw new TimeoutException($"The message did not arrive within {IncomingMessageTimeout.TotalSeconds} seconds.");
}

public IncomingMessage TryReceiveMessage()
{
if (_receivedMessages.TryTake(out var incomingMessage, IncomingMessageTimeout)) return incomingMessage;
Expand Down
4 changes: 0 additions & 4 deletions tests/BuslyCLI.Console.Tests/BuslyCLI.Console.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@
<ProjectReference Include="..\..\src\BuslyCLI.Console\BuslyCLI.Console.csproj" />
</ItemGroup>

<ItemGroup>
<Folder Include="Commands\Event\" />
</ItemGroup>

<ItemGroup>
<Reference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using BuslyCLI.Console.Tests.TestHelpers;

namespace BuslyCLI.Console.Tests.Commands.NsbTimeout;

public class SendTimeoutTests : CommandTestBase
{
[Test]
public void ShouldOutputAnErrorWhenTransportIsSqlServer()
{
// Arrange
var yamlFile = """
---
current-transport: local-sql-server
transports:
- name: local-sql-server
sql-server-transport-config:
connection-string: Server=localhost;Database=test;
""";
using var configFile = new TestableNServiceBusConfigurationFile(yamlFile);

// Act
var result = Sut.Run(
"timeout", "send",
"--content-type", "application/json",
"--enclosed-message-type", "MessageContracts.Timeouts.OrderTimeout",
"--destination-endpoint", "Sales",
"--message-body", "{}",
"--delay-delivery-with", "00:00:01",
"--config", configFile.FilePath);

// Assert
Assert.That(result.ExitCode, Is.EqualTo(1));
Assert.That(result.Output, Does.Contain("SqlServerTransport"));
Assert.That(result.Output, Does.Contain("does not support sending timeouts"));
Assert.That(result.Output, Does.Contain("https://tragiccode.com/busly-cli/docs/cli-reference/timeout/send"));
}

[Test]
public void ShouldOutputAnErrorWhenTransportIsPostgreSql()
{
// Arrange
var yamlFile = """
---
current-transport: local-postgresql
transports:
- name: local-postgresql
postgre-sql-transport-config:
connection-string: Host=localhost;Database=test;
""";
using var configFile = new TestableNServiceBusConfigurationFile(yamlFile);

// Act
var result = Sut.Run(
"timeout", "send",
"--content-type", "application/json",
"--enclosed-message-type", "MessageContracts.Timeouts.OrderTimeout",
"--destination-endpoint", "Sales",
"--message-body", "{}",
"--delay-delivery-with", "00:00:01",
"--config", configFile.FilePath);

// Assert
Assert.That(result.ExitCode, Is.EqualTo(1));
Assert.That(result.Output, Does.Contain("PostgreSqlTransport"));
Assert.That(result.Output, Does.Contain("does not support sending timeouts"));
Assert.That(result.Output, Does.Contain("https://tragiccode.com/busly-cli/docs/cli-reference/timeout/send"));
}

[Test]
public void ShouldOutputAnErrorWhenTransportIsAzureStorageQueues()
{
// Arrange
var yamlFile = """
---
current-transport: local-azure-storage-queues
transports:
- name: local-azure-storage-queues
azure-storage-queues-transport-config:
connection-string: UseDevelopmentStorage=true
""";
using var configFile = new TestableNServiceBusConfigurationFile(yamlFile);

// Act
var result = Sut.Run(
"timeout", "send",
"--content-type", "application/json",
"--enclosed-message-type", "MessageContracts.Timeouts.OrderTimeout",
"--destination-endpoint", "Sales",
"--message-body", "{}",
"--delay-delivery-with", "00:00:01",
"--config", configFile.FilePath);

// Assert
Assert.That(result.ExitCode, Is.EqualTo(1));
Assert.That(result.Output, Does.Contain("AzureStorageQueuesTransport"));
Assert.That(result.Output, Does.Contain("does not support sending timeouts"));
Assert.That(result.Output, Does.Contain("https://tragiccode.com/busly-cli/docs/cli-reference/timeout/send"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Text.Json;
using BuslyCLI.Console.Tests.TestHelpers;

namespace BuslyCLI.Console.Tests.EndToEnd.AmazonSQS;

[TestFixture]
public class PublishEventCommandAmazonSqsEndToEndTests : AmazonSqsEndToEndTestBase
{
[Test]
public async Task ShouldPublishEvent()
{
// Arrange
await TestEndpoint.Subscribe("MessageContracts.Events.OrderCreated");
var messageBody = new { OrderNumber = Guid.NewGuid() };

var json = JsonSerializer.Serialize(messageBody, _jsonObjectOptions);
var yamlFile = $"""
---
current-transport: local-amazonsqs
transports:
- name: local-amazonsqs
amazonsqs-transport-config:
service-url: {Container.GetConnectionString()}
region-name: us-east-1
""";
using var configFile = new TestableNServiceBusConfigurationFile(yamlFile);

// Act
var result = Sut.Run(
"event",
"publish",
"--content-type", "application/json",
"--enclosed-message-type", "MessageContracts.Events.OrderCreated",
"--message-body", json,
"--config", configFile.FilePath);

// Assert
Assert.That(result.ExitCode, Is.EqualTo(0));
AssertMessageReceived(TestEndpoint.TryReceiveMessage(), "MessageContracts.Events.OrderCreated", json);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,36 +39,4 @@ public async Task ShouldSendCommand()
AssertMessageReceived(TestEndpoint.TryReceiveMessage(), "MessageContracts.Commands.CreateOrder", json);
}

[Test]
public async Task ShouldPublishEvent()
{
// Arrange
await TestEndpoint.Subscribe("MessageContracts.Events.OrderCreated");
var messageBody = new { OrderNumber = Guid.NewGuid() };

var json = JsonSerializer.Serialize(messageBody, _jsonObjectOptions);
var yamlFile = $"""
---
current-transport: local-amazonsqs
transports:
- name: local-amazonsqs
amazonsqs-transport-config:
service-url: {Container.GetConnectionString()}
region-name: us-east-1
""";
using var configFile = new TestableNServiceBusConfigurationFile(yamlFile);

// Act
var result = Sut.Run(
"event",
"publish",
"--content-type", "application/json",
"--enclosed-message-type", "MessageContracts.Events.OrderCreated",
"--message-body", json,
"--config", configFile.FilePath);

// Assert
Assert.That(result.ExitCode, Is.EqualTo(0));
AssertMessageReceived(TestEndpoint.TryReceiveMessage(), "MessageContracts.Events.OrderCreated", json);
}
}
Loading
Loading