-
-
Notifications
You must be signed in to change notification settings - Fork 69
/
BaseProducer.cs
50 lines (36 loc) · 2.03 KB
/
BaseProducer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.
using System.Diagnostics;
using Eventuous.Diagnostics;
using static Eventuous.Diagnostics.TelemetryTags;
namespace Eventuous.Producers;
using Diagnostics;
/// <summary>
/// Base class for event producers. It materializes the outgoing messages, starts a
/// tracing activity for them via <see cref="ProducerActivity"/>, and then delegates
/// the actual send to <see cref="ProduceMessages"/>.
/// </summary>
/// <typeparam name="TProduceOptions">Options type forwarded unchanged to <see cref="ProduceMessages"/>.</typeparam>
public abstract class BaseProducer<TProduceOptions> : IEventProducer<TProduceOptions> where TProduceOptions : class {
    /// <summary>
    /// Builds <see cref="DefaultTags"/> from the supplied tracing options.
    /// </summary>
    /// <param name="tracingOptions">
    /// Tracing options; when <c>null</c>, a default <see cref="ProducerTracingOptions"/> instance is used.
    /// </param>
    protected BaseProducer(ProducerTracingOptions? tracingOptions = null)
        => DefaultTags = (tracingOptions ?? new ProducerTracingOptions())
            .AllTags
            .Concat(EventuousDiagnostics.Tags)
            .ToArray();

    /// <summary>
    /// Tags handed to <see cref="ProducerActivity"/> on every produce call: the tags from the
    /// tracing options followed by <see cref="EventuousDiagnostics.Tags"/>.
    /// </summary>
    protected KeyValuePair<string, object?>[] DefaultTags { get; }

    /// <summary>
    /// Sends the messages to the underlying transport. Invoked by <c>Produce</c> only when there
    /// is at least one message, and after the tracing activity has been created.
    /// </summary>
    /// <param name="stream">Target stream.</param>
    /// <param name="messages">Messages as returned by <see cref="ProducerActivity"/> for this call.</param>
    /// <param name="options">Produce options passed by the caller; may be <c>null</c>.</param>
    /// <param name="cancellationToken">Cancellation token passed by the caller.</param>
    protected abstract Task ProduceMessages(StreamName stream, IEnumerable<ProducedMessage> messages, TProduceOptions? options, CancellationToken cancellationToken = default);

    /// <summary>
    /// Produces the messages without options; forwards to the options-accepting overload with <c>null</c> options.
    /// </summary>
    /// <param name="stream">Target stream.</param>
    /// <param name="messages">Messages to produce.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    public Task Produce(StreamName stream, IEnumerable<ProducedMessage> messages, CancellationToken cancellationToken = default)
        => Produce(stream, messages, null, cancellationToken);

    /// <inheritdoc />
    public async Task Produce(StreamName stream, IEnumerable<ProducedMessage> messages, TProduceOptions? options, CancellationToken cancellationToken = default) {
        // Enumerate the input exactly once; the count decides which tracing path is taken.
        var batch = messages.ToArray();

        // Nothing to send: no activity is created and the transport is not called.
        if (batch.Length == 0) return;

        var (started, outgoing) = batch.Length == 1
            ? StartSingle(batch[0])
            : ProducerActivity.Start(batch, DefaultTags);

        // The activity (if any) stays open until the send has completed or failed.
        using var activity = started;

        if (activity?.IsAllDataRequested == true) {
            var destination = stream.ToString();

            activity
                .SetTag(Messaging.Destination, destination)
                .SetTag(TelemetryTags.Eventuous.Stream, destination);
        }

        await ProduceMessages(stream, outgoing, options, cancellationToken).NoContext();
    }

    /// <summary>
    /// Single-message tracing path: obtains the activity for one message, starts it when present,
    /// and wraps the returned message in a one-element array.
    /// </summary>
    (Activity? act, ProducedMessage[] msgs) StartSingle(ProducedMessage message) {
        var (created, tracedMessage) = ProducerActivity.Start(message, DefaultTags);

        return (created?.Start(), new[] { tracedMessage });
    }
}