-
|
Hello, I'm trying to add a filter that would serve function of logging and measuring time required to finish call to Consume for all consumers registered in my sln. Example of flow with working filter: controller -> _bus.Publish()-> [LoggingFilter] -> next() -> MessageConsumer.Consume() -> [LoggingFilter] Example of flow with filter omitted: API_1 [ controller -> _bus.Publish()-> [LoggingFilter] -> next() -> MessageConsumer.Consume() -> topicProducer.Produce() -> [LoggingFilter] ] -> [API_2] MessageConsumer.Consume() All help would be really appreciated. Thanks! MT Registration builder.Services.AddMassTransit(busRegistrationConfig =>
{
busRegistrationConfig.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
//filters registration
cfg.UseConsumeFilter(typeof(LoggingFilter<>), context);
});
busRegistrationConfig.AddConsumers(Assembly.GetExecutingAssembly());
busRegistrationConfig.AddRider(riderConfig =>
{
riderConfig.AddProducer<string, SyncCatalogCompletedEvent>("catalog.sync.completed");
riderConfig.UsingKafka((context, kafkaFactoryConfig) =>
{
kafkaFactoryConfig.Host("localhost:9093");
kafkaFactoryConfig.TopicEndpoint<string, AuditAction>("audit", "booking-api", endpointConfig =>
{
endpointConfig.SetKeyDeserializer(new CustomJsonSingleTypeDeserializer<string>().AsSyncOverAsync());
endpointConfig.SetValueDeserializer(new CustomJsonMultiTypeDeserializer<AuditAction>().AsSyncOverAsync());
endpointConfig.UseConsumeFilter(typeof(LoggingFilter<>), context);
endpointConfig.ConfigureConsumer<AuditEventConsumer>(context);
});
});
});
});Kafka event consumer public class AuditEventConsumer : IConsumer<AuditAction>
{
private readonly ILogger<AuditEventConsumer> _logger;
public AuditEventConsumer(ILogger<AuditEventConsumer> logger)
{
_logger = logger;
}
public async Task Consume(ConsumeContext<AuditAction> context)
{
_logger.LogInformation("Received event of type ({type})", nameof(AuditAction));
return Task.CompletedTask;
}
}LoggingFilter public class LoggingFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
private readonly ILogger<LoggingFilter<T>> _logger;
public LoggingFilter(ILogger<LoggingFilter<T>> logger)
{
_logger = logger;
}
public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
{
var timer = new Stopwatch();
timer.Start();
_logger.LogInformation("[FILTER] Start consuming message ID=({id}) of type {type}", context.MessageId, typeof(T));
try
{
await next.Send(context);
}
catch (Exception ex)
{
_logger.LogError("[FILTER] Message ID=({id}) of type {type} failed due to exception: {ex}", context.MessageId, typeof(T), ex.Message);
}
finally
{
timer.Stop();
_logger.LogInformation("[FILTER] Message ID=({id}) of type {type} finished execution. Ellapsed time {time} ms", context.MessageId, typeof(T), timer.ElapsedMilliseconds);
}
}
public void Probe(ProbeContext context)
{
}
} |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 4 replies
-
|
This came up a couple weeks ago, and seems to be an issue. I believe @NooNameR was looking into it. |
Beta Was this translation helpful? Give feedback.
-
|
So this actually wasn't bad, endpoints were not validating when the rider was built, so none of the configuration observers were called. I've committed a fix, and it should be in the next develop packages shortly. |
Beta Was this translation helpful? Give feedback.
So this actually wasn't bad, endpoints were not validating when the rider was built, so none of the configuration observers were called. I've committed a fix, and it should be in the next develop packages shortly.