-
Notifications
You must be signed in to change notification settings - Fork 78
/
tokens.go
203 lines (165 loc) · 5.63 KB
/
tokens.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package client
import (
"context"
"encoding/base64"
"errors"
"fmt"
"github.com/apache/arrow/go/v8/arrow/flight"
configpb2 "github.com/deephaven/deephaven-core/go/internal/proto/config"
"google.golang.org/grpc/metadata"
"log"
"strconv"
"sync"
"time"
)
// makeAuthString creates an authentication string from an authentication type and an authentication token.
func makeAuthString(authType string, authToken string) string {
if authType == "Anonymous" {
return authType
} else if authType == "Basic" {
return "Basic " + base64.StdEncoding.EncodeToString([]byte(authToken))
} else {
return authType + " " + authToken
}
}
// withAuth returns a context decorated with authentication data.
func withAuth(ctx context.Context, authString string) context.Context {
return metadata.NewOutgoingContext(ctx, metadata.Pairs("authorization", authString))
}
// withAuthToken returns a context decorated with an authentication token.
func withAuthToken(ctx context.Context, token []byte) context.Context {
return withAuth(ctx, "Bearer "+string(token))
}
// requestToken requests a new token from flight.
func requestToken(handshakeClient flight.FlightService_HandshakeClient, handshakeReq *flight.HandshakeRequest) ([]byte, error) {
err := handshakeClient.Send(handshakeReq)
if err != nil {
return nil, err
}
handshakeResp, err := handshakeClient.Recv()
if err != nil {
return nil, err
}
return handshakeResp.Payload, nil
}
// tokenResp protects the current session token (or an error in getting the session token).
type tokenResp struct {
Lock sync.Mutex
Token []byte
Error error
}
// getToken returns the current token, or an error if an error has occurred at some point.
func (tk *tokenResp) getToken() ([]byte, error) {
tk.Lock.Lock()
defer tk.Lock.Unlock()
if tk.Error != nil {
return nil, tk.Error
} else {
return tk.Token, nil
}
}
// setToken sets the session token to a new value.
func (tk *tokenResp) setToken(tok []byte) {
tk.Lock.Lock()
tk.Token = tok
tk.Error = nil
tk.Lock.Unlock()
}
// setError sets an error value for the session token.
func (tk *tokenResp) setError(err error) {
tk.Lock.Lock()
tk.Error = err
tk.Lock.Unlock()
}
// A tokenManager stores the current client token and sends periodic keepalive messages to refresh the client token.
type tokenManager struct {
token *tokenResp // the actual client token, which gets periodically updated.
close func() error
}
// getToken returns the current token, or an error if an error has occurred at some point.
func (tr *tokenManager) getToken() ([]byte, error) {
return tr.token.getToken()
}
// withToken attaches the current session token to a context as metadata.
func (tr *tokenManager) withToken(ctx context.Context) (context.Context, error) {
tok, err := tr.getToken()
if err != nil {
return nil, err
}
return withAuthToken(ctx, tok), nil
}
func (tr *tokenManager) Close() error {
return tr.close()
}
// newTokenManager creates a tokenManager that begins a background goroutine that continually refreshes
// the token so that it does not time out.
//
// authType is the type of authentication to use. This can be 'Anonymous', 'Basic', or any custom-built
// authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler", The default is 'Anonymous'.
// To see what authentication methods are available on the Deephaven server, navigate to: http://<host>:<port>/jsapi/authentication/.
//
// authToken is the authentication token string. When authType is 'Basic', it must be
// "user:password"; when auth_type is DefaultAuth, it will be ignored; when auth_type is a custom-built
// authenticator, it must conform to the specific requirement of the authenticator.
func newTokenManager(ctx context.Context, fs *flightStub, cfg configpb2.ConfigServiceClient, authType string, authToken string) (*tokenManager, error) {
authString := makeAuthString(authType, authToken)
handshakeClient, err := fs.handshake(withAuth(ctx, authString))
if err != nil {
return nil, err
}
tkn, err := requestToken(handshakeClient, &flight.HandshakeRequest{Payload: []byte(authString)})
if err != nil {
return nil, err
}
ac, err := cfg.GetConfigurationConstants(withAuthToken(ctx, tkn), &configpb2.ConfigurationConstantsRequest{})
if err != nil {
return nil, err
}
sessionDurationStr, ok := ac.ConfigValues[TokenTimeoutConfigConstant]
if !ok {
return nil, errors.New(fmt.Sprintf("server configuration constants do not contain: %v", TokenTimeoutConfigConstant))
}
maxTimeoutMillis, err := strconv.Atoi(sessionDurationStr.GetStringValue())
if err != nil {
return nil, err
}
timeout := time.Duration(maxTimeoutMillis/2) * time.Millisecond
token := &tokenResp{Token: tkn}
done := make(chan bool)
ticker := time.NewTicker(timeout)
go func() {
for {
select {
case <-done:
// Make sure that nobody accidentally tries
// to use a token after the client has closed.
token.setError(ErrClosedClient)
return
case <-ticker.C:
oldToken, err := token.getToken()
var tkn []byte
if err == nil {
tkn, err = requestToken(handshakeClient, &flight.HandshakeRequest{Payload: oldToken})
} else {
log.Println("Old token has an error during token update. Attempting to acquire a fresh token. err=", err)
tkn, err = requestToken(handshakeClient, &flight.HandshakeRequest{Payload: []byte(authString)})
}
if err != nil {
token.setError(err)
log.Println("Error when updating token. err=", err)
} else {
token.setToken(tkn)
}
}
}
}()
ref := &tokenManager{
token: token,
close: func() error {
ticker.Stop()
done <- true
return handshakeClient.CloseSend()
},
}
return ref, nil
}