-
-
Notifications
You must be signed in to change notification settings - Fork 80
/
auth.go
131 lines (114 loc) · 3.37 KB
/
auth.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package proxy
import (
"context"
"encoding/binary"
"fmt"
"github.com/grepplabs/kafka-proxy/pkg/apis"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"io"
"strings"
"time"
)
// AuthClient holds the settings used for the client side of the gateway
// handshake (see sendAndReceiveGatewayAuth).
type AuthClient struct {
// enabled is not read by the code in this part of the file; presumably it
// toggles gateway authentication - confirm against the callers.
enabled bool
// magic is written big endian as the first 8 bytes of the handshake frame.
magic uint64
// method is sent as the first element of the payload, before the "\x00" separator.
method string
// timeout is added to time.Now() to form the connection deadline for the handshake I/O.
timeout time.Duration
// tokenProvider supplies the token that is sent as the second payload element.
tokenProvider apis.TokenProvider
}
// sendAndReceiveGatewayAuth performs the client side of the gateway handshake
// on conn.
//
// It requests a token from b.tokenProvider and writes a single frame:
//
//	8 bytes  magic (big endian)
//	4 bytes  payload length (big endian)
//	payload  method + "\x00" + token
//
// It then waits for a 4 byte reply. The token request is bounded by b.timeout
// when that value is positive, and the connection deadline is set to
// now+b.timeout before the frame is written.
//
// It returns an error when the token cannot be obtained (provider error,
// unsuccessful response, empty token), when the payload does not fit the
// 32-bit length prefix, or when any connection operation fails. An EOF while
// waiting for the reply is reported as "Gateway auth failed".
//
// TODO: reset deadlines after method - ok
func (b *AuthClient) sendAndReceiveGatewayAuth(conn DeadlineReaderWriter) error {
	// Largest payload size representable in the 4 byte length prefix.
	const maxUint32 = 1<<32 - 1

	// Bound the token request so a stalled provider cannot block the
	// handshake forever. A non-positive timeout keeps the unbounded context.
	ctx := context.Background()
	if b.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, b.timeout)
		defer cancel()
	}

	resp, err := b.tokenProvider.GetToken(ctx, apis.TokenRequest{})
	if err != nil {
		return err
	}
	if !resp.Success {
		return fmt.Errorf("get token failed with status: %d", resp.Status)
	}
	if resp.Token == "" {
		return errors.New("get token returned empty token")
	}
	data := resp.Token

	length := len(b.method) + 1 + len(data)
	// Refuse to send a frame whose length prefix would be silently truncated.
	if uint64(length) > maxUint32 {
		return fmt.Errorf("gateway handshake payload too large: %d bytes", length)
	}

	// 8 - bytes magic, 4 bytes length
	buf := make([]byte, 12+length)
	binary.BigEndian.PutUint64(buf[:8], b.magic)
	binary.BigEndian.PutUint32(buf[8:], uint32(length))
	// Payload is method, a NUL separator (already zero from make), then the token.
	sep := 12 + copy(buf[12:], b.method)
	copy(buf[sep+1:], data)

	if err = conn.SetDeadline(time.Now().Add(b.timeout)); err != nil {
		return err
	}
	if _, err = conn.Write(buf); err != nil {
		return errors.Wrap(err, "Failed to write gateway handshake")
	}

	header := make([]byte, 4)
	_, err = io.ReadFull(conn, header)
	// If the credentials are valid, we would get a 4 byte response filled with null characters.
	// Otherwise, the broker closes the connection and we get an EOF
	if err != nil {
		if err == io.EOF {
			return errors.New("Gateway auth failed")
		}
		return errors.Wrap(err, "Failed to read response while gateway authenticating")
	}
	return nil
}
// AuthServer holds the settings used for the server side of the gateway
// handshake (see receiveAndSendGatewayAuth).
type AuthServer struct {
// enabled is not read by the code in this part of the file; presumably it
// toggles gateway authentication - confirm against the callers.
enabled bool
// magic is the value the first 8 bytes (big endian) of an incoming handshake must equal.
magic uint64
// method is the value the first payload element of an incoming handshake must equal.
method string
// timeout is added to time.Now() to form the connection deadline for the handshake I/O.
timeout time.Duration
// tokenInfo verifies the token received as the second payload element.
tokenInfo apis.TokenInfo
}
// receiveAndSendGatewayAuth performs the server side of the gateway handshake
// on conn.
//
// It sets the connection deadline to now+b.timeout, then reads one frame:
//
//	8 bytes  magic (big endian), must equal b.magic
//	4 bytes  payload length (big endian)
//	payload  method + "\x00" + token, method must equal b.method
//
// The token is verified through b.tokenInfo, and on success 4 zero bytes are
// written back to the peer. When b.timeout is positive the verification call
// shares the connection deadline.
//
// It returns an error when a connection operation fails, the magic or method
// does not match, the payload is larger than the accepted maximum or is not
// exactly two NUL-separated elements, or the token is rejected.
//
// TODO: reset deadlines after method - ok
func (b *AuthServer) receiveAndSendGatewayAuth(conn DeadlineReaderWriter) error {
	// Upper bound on the peer-declared payload size. The length field comes
	// from the network before the peer is authenticated, so it must not be
	// trusted as an allocation size.
	const maxPayloadLen = 1 << 20

	deadline := time.Now().Add(b.timeout)
	if err := conn.SetDeadline(deadline); err != nil {
		return err
	}

	headerBuf := make([]byte, 12) // magic 8 + length 4
	if _, err := io.ReadFull(conn, headerBuf); err != nil {
		return errors.Wrap(err, "Failed to read gateway bytes magic")
	}
	if magic := binary.BigEndian.Uint64(headerBuf[:8]); magic != b.magic {
		return errors.New("gateway handshake magic bytes mismatch")
	}

	length := binary.BigEndian.Uint32(headerBuf[8:])
	if length > maxPayloadLen {
		return fmt.Errorf("gateway handshake payload too large: %d bytes (max %d)", length, maxPayloadLen)
	}
	payload := make([]byte, length)
	if _, err := io.ReadFull(conn, payload); err != nil {
		return errors.Wrap(err, "failed to read gateway handshake payload")
	}

	tokens := strings.Split(string(payload), "\x00")
	if len(tokens) != 2 {
		return fmt.Errorf("invalid gateway handshake: expected 2 tokens, got %d", len(tokens))
	}
	if tokens[0] != b.method {
		return fmt.Errorf("gateway handshake method mismatch: expected %s , got %s", b.method, tokens[0])
	}
	data := tokens[1]

	// Bound verification by the same deadline as the connection: once it has
	// passed, the reply below could not be written anyway. A non-positive
	// timeout keeps the unbounded context.
	ctx := context.Background()
	if b.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}
	resp, err := b.tokenInfo.VerifyToken(ctx, apis.VerifyRequest{Token: data})
	if err != nil {
		return err
	}
	if !resp.Success {
		return fmt.Errorf("gateway server verify token failed with status: %d", resp.Status)
	}
	// The token is a credential, so only its size is logged.
	logrus.Debugf("gateway handshake verified: method %s, payload %d bytes", b.method, length)

	// Success reply: 4 zero bytes.
	header := make([]byte, 4)
	if _, err := conn.Write(header); err != nil {
		return err
	}
	return nil
}