-
Notifications
You must be signed in to change notification settings - Fork 7
/
apex.go
172 lines (145 loc) · 4.28 KB
/
apex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package apex
import (
"context"
"errors"
stdlog "log"
"os"
"sync"
"time"
"github.com/apex/log"
"github.com/axiomhq/axiom-go/axiom"
"github.com/axiomhq/axiom-go/axiom/ingest"
)
var _ log.Handler = (*Handler)(nil)
const defaultBatchSize = 10_000
// ErrMissingDatasetName is raised when a dataset name is not provided. Set it
// manually using the [SetDataset] option or export "AXIOM_DATASET".
var ErrMissingDatasetName = errors.New("missing dataset name")
// An Option modifies the behaviour of the Axiom handler.
type Option func(*Handler) error
// SetClient specifies the Axiom client to use for ingesting the logs.
func SetClient(client *axiom.Client) Option {
return func(h *Handler) error {
h.client = client
return nil
}
}
// SetClientOptions specifies the Axiom client options to pass to
// [axiom.NewClient] which is only called if no [axiom.Client] was specified by
// the [SetClient] option.
func SetClientOptions(options ...axiom.Option) Option {
return func(h *Handler) error {
h.clientOptions = options
return nil
}
}
// SetDataset specifies the dataset to ingest the logs into. Can also be
// specified using the "AXIOM_DATASET" environment variable.
func SetDataset(datasetName string) Option {
return func(h *Handler) error {
h.datasetName = datasetName
return nil
}
}
// SetIngestOptions specifies the ingestion options to use for ingesting the
// logs.
func SetIngestOptions(opts ...ingest.Option) Option {
return func(h *Handler) error {
h.ingestOptions = opts
return nil
}
}
// Handler implements a [log.Handler] used for shipping logs to Axiom.
type Handler struct {
client *axiom.Client
datasetName string
clientOptions []axiom.Option
ingestOptions []ingest.Option
eventCh chan axiom.Event
closeCh chan struct{}
closeOnce sync.Once
}
// New creates a new handler that ingests logs into Axiom. It automatically
// takes its configuration from the environment. To connect, export the
// following environment variables:
//
// - AXIOM_TOKEN
// - AXIOM_ORG_ID (only when using a personal token)
// - AXIOM_DATASET
//
// The configuration can be set manually using options which are prefixed with
// "Set".
//
// An API token with "ingest" permission is sufficient enough.
//
// A handler needs to be closed properly to make sure all logs are sent by
// calling [Handler.Close].
func New(options ...Option) (*Handler, error) {
handler := &Handler{
eventCh: make(chan axiom.Event, defaultBatchSize),
closeCh: make(chan struct{}),
}
// Apply supplied options.
for _, option := range options {
if option == nil {
continue
} else if err := option(handler); err != nil {
return nil, err
}
}
// Create client, if not set.
if handler.client == nil {
var err error
if handler.client, err = axiom.NewClient(handler.clientOptions...); err != nil {
return nil, err
}
}
// When the dataset name is not set, use "AXIOM_DATASET".
if handler.datasetName == "" {
handler.datasetName = os.Getenv("AXIOM_DATASET")
if handler.datasetName == "" {
return nil, ErrMissingDatasetName
}
}
// Run background ingest.
go func() {
defer close(handler.closeCh)
logger := stdlog.New(os.Stderr, "[AXIOM|APEX]", 0)
res, err := handler.client.IngestChannel(context.Background(), handler.datasetName, handler.eventCh, handler.ingestOptions...)
if err != nil {
logger.Printf("failed to ingest events: %s\n", err)
} else if res.Failed > 0 {
// Best effort on notifying the user about the ingest failure.
logger.Printf("event at %s failed to ingest: %s\n",
res.Failures[0].Timestamp, res.Failures[0].Error)
}
}()
return handler, nil
}
// Close the handler and make sure all events are flushed. Closing the handler
// renders it unusable for further use.
func (h *Handler) Close() {
h.closeOnce.Do(func() {
close(h.eventCh)
<-h.closeCh
})
}
// HandleLog implements [log.Handler].
func (h *Handler) HandleLog(entry *log.Entry) error {
event := axiom.Event{}
// Set fields first.
for k, v := range entry.Fields {
event[k] = v
}
// Set timestamp, severity and actual message.
event[ingest.TimestampField] = entry.Timestamp.Format(time.RFC3339Nano)
event["severity"] = entry.Level.String()
event["message"] = entry.Message
select {
case <-h.closeCh:
return errors.New("handler closed")
default:
h.eventCh <- event
return nil
}
}