-
Notifications
You must be signed in to change notification settings - Fork 20
/
telemetry.go
84 lines (75 loc) · 2.18 KB
/
telemetry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package telemetry
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/google/uuid"
opensearch "github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"github.com/algorand/conduit/version"
)
// MakeTelemetryConfig returns an enabled telemetry Config for the given
// endpoint URI, index name and credentials. Each call stamps the config with
// a newly generated GUID.
func MakeTelemetryConfig(telemetryURI, index, username, password string) Config {
	var cfg Config
	cfg.Enable = true
	cfg.URI = telemetryURI
	// The GUID comes from google/uuid rather than the go-algorand utils.
	cfg.GUID = uuid.New().String()
	cfg.Index = index
	cfg.UserName = username
	cfg.Password = password
	return cfg
}
// initializeOpenSearchClient builds an OpenSearch client that targets cfg.URI
// and authenticates with cfg.UserName / cfg.Password. If the client cannot be
// constructed, the underlying error is wrapped together with the URI.
func initializeOpenSearchClient(cfg Config) (*opensearch.Client, error) {
	// NOTE(review): InsecureSkipVerify disables TLS certificate verification
	// for every telemetry request. Confirm this is still required before
	// relying on the channel for anything sensitive.
	transport := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}

	osCfg := opensearch.Config{
		Transport: transport,
		Addresses: []string{cfg.URI},
		// These credentials are here intentionally. Not a bug.
		Username: cfg.UserName,
		Password: cfg.Password,
	}

	client, err := opensearch.NewClient(osCfg)
	if err == nil {
		return client, nil
	}
	return nil, fmt.Errorf("unable to create new OpenSearch client with URI %s: %w", cfg.URI, err)
}
// MakeOpenSearchClient creates an OpenSearchClient for cfg. The returned value
// holds both the underlying OpenSearch client and a copy of cfg. Any error
// from client construction is returned unchanged.
func MakeOpenSearchClient(cfg Config) (*OpenSearchClient, error) {
	osClient, err := initializeOpenSearchClient(cfg)
	if err == nil {
		return &OpenSearchClient{Client: osClient, TelemetryConfig: cfg}, nil
	}
	return nil, err
}
// MakeTelemetryStartupEvent builds the "starting conduit" Event, carrying this
// client's configured GUID, the current time and the conduit long version
// string. It only constructs the event; it does not send it.
func (t *OpenSearchClient) MakeTelemetryStartupEvent() Event {
	var startup Event
	startup.Message = "starting conduit"
	startup.GUID = t.TelemetryConfig.GUID
	startup.Time = time.Now()
	startup.Version = version.LongVersion()
	return startup
}
// SendEvent JSON-encodes event and indexes it into the configured OpenSearch
// index.
//
// It returns an error if the event cannot be marshalled, if the request
// cannot be performed, or if OpenSearch answers with an error status. The
// response body is always closed so the underlying HTTP connection is
// released.
func (t *OpenSearchClient) SendEvent(event Event) error {
	data, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	req := opensearchapi.IndexRequest{
		Index: t.TelemetryConfig.Index,
		Body:  bytes.NewReader(data),
	}
	res, err := req.Do(context.Background(), t.Client)
	if err != nil {
		return fmt.Errorf("failed to insert event: %w", err)
	}
	if res.Body != nil {
		// Not closing the body would leak the connection held by the transport.
		defer res.Body.Close()
	}

	// A transport-level success can still carry a 4xx/5xx from OpenSearch
	// (bad credentials, missing index, ...); surface it instead of
	// reporting success.
	if res.IsError() {
		return fmt.Errorf("failed to insert event: %s", res.Status())
	}
	return nil
}