Skip to content

Commit

Permalink
Merge branch 'release/2.3.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
MvRens committed Jan 18, 2021
2 parents 8a94223 + 0b7c84a commit dc9f097
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 10 deletions.
38 changes: 37 additions & 1 deletion Tapeti.Cmd/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ public class ShovelOptions : CommonOptions
}


[Verb("purge", HelpText = "Removes all messages from a queue destructively.")]
public class PurgeOptions : CommonOptions
{
[Option('q', "queue", Required = true, HelpText = "The queue to purge.")]
public string QueueName { get; set; }

[Option("confirm", HelpText = "Confirms the purging of the specified queue. If not provided, an interactive prompt will ask for confirmation.", Default = false)]
public bool Confirm { get; set; }
}


[Verb("example", HelpText = "Output an example SingleFileJSON formatted message.")]
public class ExampleOptions
{
Expand All @@ -129,11 +140,12 @@ public class ExampleOptions

public static int Main(string[] args)
{
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, ExampleOptions>(args)
return Parser.Default.ParseArguments<ExportOptions, ImportOptions, ShovelOptions, PurgeOptions, ExampleOptions>(args)
.MapResult(
(ExportOptions o) => ExecuteVerb(o, RunExport),
(ImportOptions o) => ExecuteVerb(o, RunImport),
(ShovelOptions o) => ExecuteVerb(o, RunShovel),
(PurgeOptions o) => ExecuteVerb(o, RunPurge),
(ExampleOptions o) => ExecuteVerb(o, RunExample),
errs =>
{
Expand Down Expand Up @@ -371,6 +383,30 @@ private static IConnection GetTargetConnection(ShovelOptions options)
}


private static void RunPurge(PurgeOptions options)
{
if (!options.Confirm)
{
Console.Write($"Do you want to purge the queue '{options.QueueName}'? (Y/N) ");
var answer = Console.ReadLine();

if (string.IsNullOrEmpty(answer) || !answer.Equals("Y", StringComparison.CurrentCultureIgnoreCase))
return;
}

uint messageCount;

using (var connection = GetConnection(options))
using (var channel = connection.CreateModel())
{
messageCount = channel.QueuePurge(options.QueueName);
}

Console.WriteLine($"{messageCount} message{(messageCount != 1 ? "s" : "")} purged from '{options.QueueName}'.");

}


private static void RunExample(ExampleOptions options)
{
using (var messageSerializer = new SingleFileJSONMessageSerializer(Console.OpenStandardOutput(), false, new UTF8Encoding(false)))
Expand Down
34 changes: 28 additions & 6 deletions Tapeti/Connection/TapetiConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Tapeti.Config;
using Tapeti.Default;
using System.Threading.Tasks;
Expand All @@ -14,6 +15,7 @@ namespace Tapeti.Connection
/// </summary>
internal class TapetiConsumer : IConsumer
{
private readonly CancellationToken cancellationToken;
private readonly ITapetiConfig config;
private readonly string queueName;
private readonly List<IBinding> bindings;
Expand All @@ -23,9 +25,9 @@ internal class TapetiConsumer : IConsumer
private readonly IMessageSerializer messageSerializer;


/// <inheritdoc />
public TapetiConsumer(ITapetiConfig config, string queueName, IEnumerable<IBinding> bindings)
public TapetiConsumer(CancellationToken cancellationToken, ITapetiConfig config, string queueName, IEnumerable<IBinding> bindings)
{
this.cancellationToken = cancellationToken;
this.config = config;
this.queueName = queueName;
this.bindings = bindings.ToList();
Expand Down Expand Up @@ -80,11 +82,8 @@ private async Task<ConsumeResult> DispatchMessage(object message, MessageContext
var messageType = message.GetType();
var validMessageType = false;

foreach (var binding in bindings)
foreach (var binding in bindings.Where(binding => binding.Accept(messageType)))
{
if (!binding.Accept(messageType))
continue;

var consumeResult = await InvokeUsingBinding(message, messageContextData, binding);
validMessageType = true;

Expand Down Expand Up @@ -135,6 +134,13 @@ await MiddlewareHelper.GoAsync(config.Middleware.Message,

private void HandleException(ExceptionStrategyContext exceptionContext)
{
if (cancellationToken.IsCancellationRequested && IgnoreExceptionDuringShutdown(exceptionContext.Exception))
{
// The service is most likely stopping, and the connection is gone anyways.
exceptionContext.SetConsumeResult(ConsumeResult.Requeue);
return;
}

try
{
exceptionStrategy.HandleException(exceptionContext);
Expand All @@ -150,6 +156,22 @@ private void HandleException(ExceptionStrategyContext exceptionContext)
}


private static bool IgnoreExceptionDuringShutdown(Exception e)
{
switch (e)
{
case AggregateException aggregateException:
return aggregateException.InnerExceptions.Any(IgnoreExceptionDuringShutdown);

case TaskCanceledException _:
case OperationCanceledException _: // thrown by CancellationTokenSource.ThrowIfCancellationRequested
return true;

default:
return e.InnerException != null && IgnoreExceptionDuringShutdown(e.InnerException);
}
}


private struct MessageContextData
{
Expand Down
2 changes: 1 addition & 1 deletion Tapeti/Connection/TapetiSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private async Task ConsumeQueues(CancellationToken cancellationToken)
consumerTags.AddRange(await Task.WhenAll(queues.Select(async group =>
{
var queueName = group.Key;
var consumer = new TapetiConsumer(config, queueName, group);
var consumer = new TapetiConsumer(cancellationToken, config, queueName, group);
return await clientFactory().Consume(cancellationToken, queueName, consumer);
})));
Expand Down
1 change: 0 additions & 1 deletion Tapeti/Default/ControllerBindingContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public class ControllerBindingResult : IBindingResult
public bool HasHandler => Handler != null;


/// <inheritdoc />
public ControllerBindingResult(ParameterInfo info)
{
Info = info;
Expand Down
1 change: 0 additions & 1 deletion Tapeti/Default/ControllerMethodBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public struct BindingInfo
public MethodInfo Method => bindingInfo.Method;


/// <inheritdoc />
public ControllerMethodBinding(IDependencyResolver dependencyResolver, BindingInfo bindingInfo)
{
this.dependencyResolver = dependencyResolver;
Expand Down

0 comments on commit dc9f097

Please sign in to comment.