-
Notifications
You must be signed in to change notification settings - Fork 3
/
inserter.go
101 lines (89 loc) · 2.88 KB
/
inserter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package chstorage
import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"github.com/go-faster/errors"
"github.com/go-faster/oteldb/internal/tracestorage"
)
var _ tracestorage.Inserter = (*Inserter)(nil)
// Inserter implements tracestorage.Inserter using Clickhouse.
type Inserter struct {
ch ClickhouseClient
tables Tables
insertedPoints metric.Int64Counter
insertedSpans metric.Int64Counter
insertedTags metric.Int64Counter
insertedRecords metric.Int64Counter
inserts metric.Int64Counter
tracer trace.Tracer
}
// InserterOptions is Inserter's options.
type InserterOptions struct {
// Tables provides table paths to query.
Tables Tables
// MeterProvider provides OpenTelemetry meter for this querier.
MeterProvider metric.MeterProvider
// TracerProvider provides OpenTelemetry tracer for this querier.
TracerProvider trace.TracerProvider
}
func (opts *InserterOptions) setDefaults() {
if opts.Tables == (Tables{}) {
opts.Tables = DefaultTables()
}
if opts.MeterProvider == nil {
opts.MeterProvider = otel.GetMeterProvider()
}
if opts.TracerProvider == nil {
opts.TracerProvider = otel.GetTracerProvider()
}
}
// NewInserter creates new Inserter.
func NewInserter(c ClickhouseClient, opts InserterOptions) (*Inserter, error) {
// HACK(ernado): for some reason, we are getting no-op here.
opts.TracerProvider = otel.GetTracerProvider()
opts.MeterProvider = otel.GetMeterProvider()
opts.setDefaults()
meter := opts.MeterProvider.Meter("chstorage.Inserter")
insertedSpans, err := meter.Int64Counter("chstorage.traces.inserted_spans",
metric.WithDescription("Number of inserted spans"),
)
if err != nil {
return nil, errors.Wrap(err, "create inserted_spans")
}
insertedTags, err := meter.Int64Counter("chstorage.traces.inserted_tags",
metric.WithDescription("Number of inserted tags"),
)
if err != nil {
return nil, errors.Wrap(err, "create inserted_tags")
}
insertedRecords, err := meter.Int64Counter("chstorage.logs.inserted_records",
metric.WithDescription("Number of inserted log records"),
)
if err != nil {
return nil, errors.Wrap(err, "create inserted_records")
}
insertedPoints, err := meter.Int64Counter("chstorage.metrics.inserted_points",
metric.WithDescription("Number of inserted points"),
)
if err != nil {
return nil, errors.Wrap(err, "create inserted_points")
}
inserts, err := meter.Int64Counter("chstorage.inserts",
metric.WithDescription("Number of insert invocations"),
)
if err != nil {
return nil, errors.Wrap(err, "create inserts")
}
inserter := &Inserter{
ch: c,
tables: opts.Tables,
insertedSpans: insertedSpans,
insertedTags: insertedTags,
insertedRecords: insertedRecords,
insertedPoints: insertedPoints,
inserts: inserts,
tracer: opts.TracerProvider.Tracer("chstorage.Inserter"),
}
return inserter, nil
}