/
client.go
136 lines (121 loc) · 4.28 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package keeltemporal
import (
"context"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/namespace/v1"
"go.temporal.io/api/replication/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/contrib/opentelemetry"
"go.uber.org/zap"
"github.com/foomo/keel/env"
"github.com/foomo/keel/log"
"github.com/foomo/keel/telemetry"
)
type (
ClientOptions struct {
Logger *zap.Logger
Namespace string
RegisterNamespace *workflowservice.RegisterNamespaceRequest
OtelEnabled bool
}
ClientOption func(o *ClientOptions)
)
func ClientWithOtelEnabled(v bool) ClientOption {
return func(o *ClientOptions) {
o.OtelEnabled = v
}
}
func ClientWithNamespace(v string) ClientOption {
return func(o *ClientOptions) {
o.Namespace = v
}
}
func ClientWithRegisterNamespace(v *workflowservice.RegisterNamespaceRequest) ClientOption {
return func(o *ClientOptions) {
o.RegisterNamespace = v
}
}
func DefaultClientOptions() ClientOptions {
return ClientOptions{
Logger: log.Logger(),
Namespace: "default",
RegisterNamespace: nil,
OtelEnabled: env.GetBool("OTEL_TEMPORAL_ENABLED", env.GetBool("OTEL_ENABLED", false)),
}
}
func NewClient(ctx context.Context, endpoint string, opts ...ClientOption) (client.Client, error) {
o := DefaultClientOptions()
// apply options
for _, opt := range opts {
opt(&o)
}
clientOpts := client.Options{
HostPort: endpoint,
Namespace: o.Namespace,
Logger: NewLogger(o.Logger),
}
nsc, err := client.NewNamespaceClient(clientOpts)
if err != nil {
return nil, errors.Wrap(err, "failed to create temporal namespace client")
}
// setup namespace
if o.RegisterNamespace != nil {
var notFoundErr *serviceerror.NotFound
if ns, err := nsc.Describe(ctx, o.RegisterNamespace.Namespace); errors.As(err, ¬FoundErr) {
if err := nsc.Register(ctx, o.RegisterNamespace); err != nil {
return nil, errors.Wrap(err, "failed to register temporal namespace")
}
} else if err != nil {
return nil, errors.Wrap(err, "failed to retrieve temporal namespace info")
} else if nsInfo := ns.GetNamespaceInfo(); nsInfo.State != enums.NAMESPACE_STATE_REGISTERED { //nolint:nosnakecase
return nil, errors.New("Could not register namespace due to existing state: " + nsInfo.State.String())
} else if err := nsc.Update(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: o.RegisterNamespace.Namespace,
UpdateInfo: &namespace.UpdateNamespaceInfo{
Description: o.RegisterNamespace.Description,
OwnerEmail: o.RegisterNamespace.OwnerEmail,
Data: o.RegisterNamespace.Data,
State: nsInfo.State,
},
Config: &namespace.NamespaceConfig{
WorkflowExecutionRetentionTtl: o.RegisterNamespace.WorkflowExecutionRetentionPeriod,
BadBinaries: ns.Config.BadBinaries,
HistoryArchivalState: o.RegisterNamespace.HistoryArchivalState,
HistoryArchivalUri: o.RegisterNamespace.HistoryArchivalUri,
VisibilityArchivalState: o.RegisterNamespace.VisibilityArchivalState,
VisibilityArchivalUri: o.RegisterNamespace.VisibilityArchivalUri,
},
ReplicationConfig: &replication.NamespaceReplicationConfig{
ActiveClusterName: o.RegisterNamespace.ActiveClusterName,
Clusters: o.RegisterNamespace.Clusters,
State: ns.ReplicationConfig.State,
},
SecurityToken: o.RegisterNamespace.SecurityToken,
DeleteBadBinary: "",
PromoteNamespace: false,
}); err != nil {
return nil, errors.Wrap(err, "failed to register temporal namespace")
}
clientOpts.Namespace = o.RegisterNamespace.Namespace
}
// setup otel
if o.OtelEnabled {
tracingInterceptor, err := opentelemetry.NewTracingInterceptor(opentelemetry.TracerOptions{
Tracer: telemetry.Tracer(),
TextMapPropagator: otel.GetTextMapPropagator(),
SpanContextKey: nil,
HeaderKey: "",
SpanStarter: nil,
})
if err != nil {
return nil, errors.Wrap(err, "failed to create new opentracing interceptor")
}
clientOpts.Interceptors = append(clientOpts.Interceptors, tracingInterceptor)
clientOpts.MetricsHandler = NewMetricsHandler(telemetry.Meter())
}
return client.Dial(clientOpts)
}