Skip to content

Commit

Permalink
read and log a response message from the gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
apetruhin committed Jul 21, 2023
1 parent 677ba26 commit 8df6461
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 21 deletions.
43 changes: 28 additions & 15 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/hashicorp/yamux"
"github.com/jpillora/backoff"
"io"
"io/ioutil"
"k8s.io/klog"
"net"
"net/http"
Expand Down Expand Up @@ -62,7 +61,8 @@ func (t *Tunnel) keepConnected(ctx context.Context) {
t.gwConn, err = connect(t.address, t.serverName, t.token, t.config)
if err != nil {
d := b.Duration()
klog.Errorf("%s, reconnecting to %s in %.0fs", err, t.address, d.Seconds())
klog.Errorln(err)
klog.Errorf("reconnecting to %s in %.0fs", t.address, d.Seconds())
time.Sleep(d)
continue
}
Expand All @@ -83,7 +83,7 @@ func (t *Tunnel) Close() {
func main() {
resolverUrl := os.Getenv("RESOLVER_URL")
if resolverUrl == "" {
resolverUrl = "https://gw.coroot.com/promtun/resolve"
resolverUrl = "https://gw.coroot.com/connect/resolve"
}
token := mustEnv("PROJECT_TOKEN")
if len(token) != 36 {
Expand Down Expand Up @@ -160,17 +160,22 @@ func getEndpoints(resolverUrl, token string) ([]string, error) {
return strings.Split(strings.TrimSpace(string(payload)), ";"), nil
}

type Header struct {
type RequestHeader struct {
Token [36]byte
Version [16]byte
ConfigSize uint32
}

type ResponseHeader struct {
Status uint16
MessageSize uint16
}

func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error) {
h := Header{}
copy(h.Token[:], token)
copy(h.Version[:], version)
h.ConfigSize = uint32(len(config))
requestHeader := RequestHeader{}
copy(requestHeader.Token[:], token)
copy(requestHeader.Version[:], version)
requestHeader.ConfigSize = uint32(len(config))

klog.Infof("connecting to %s (%s)", gwAddr, serverName)
deadline := time.Now().Add(timeout)
Expand All @@ -183,25 +188,33 @@ func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error)
klog.Infof("connected to gateway %s", gwAddr)

_ = gwConn.SetDeadline(deadline)
if err = binary.Write(gwConn, binary.LittleEndian, h); err != nil {
if err = binary.Write(gwConn, binary.LittleEndian, requestHeader); err != nil {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err)
}
if _, err = gwConn.Write(config); err != nil {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err)
}
var resp uint16
if err := binary.Read(gwConn, binary.LittleEndian, &resp); err != nil {
var responseHeader ResponseHeader
if err := binary.Read(gwConn, binary.LittleEndian, &responseHeader); err != nil {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to read the response from %s: %s", gwAddr, err)
}
var responseMessage string
if responseHeader.MessageSize > 0 {
buf := make([]byte, responseHeader.MessageSize)
if _, err := gwConn.Read(buf); err != nil {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to read the response from %s: %s", gwAddr, err)
}
responseMessage = string(buf)
}
_ = gwConn.SetDeadline(time.Time{})
klog.Infof(`got "%d" from the gateway %s`, resp, gwAddr)

if resp != 200 {
if responseHeader.Status != 200 {
_ = gwConn.Close()
return nil, fmt.Errorf("failed to authenticate project on %s: %d", gwAddr, resp)
return nil, fmt.Errorf("got %d from %s: %s", responseHeader.Status, gwAddr, responseMessage)
}
klog.Infof("ready to proxy requests from %s", gwAddr)
return gwConn, nil
Expand All @@ -210,7 +223,7 @@ func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error)
func proxy(ctx context.Context, gwConn net.Conn) {
cfg := yamux.DefaultConfig()
cfg.KeepAliveInterval = time.Second
cfg.LogOutput = ioutil.Discard
cfg.LogOutput = io.Discard
session, err := yamux.Server(gwConn, cfg)
if err != nil {
klog.Errorln("failed to start a TCP multiplexing server:", err)
Expand Down
15 changes: 9 additions & 6 deletions connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func TestHandshakeError(t *testing.T) {
conn, err := listener.Accept()
require.NoError(t, err)
readHeaderAndConfig(t, conn, token, []byte("config_data"))
writeStatus(t, conn, 500)
writeResponse(t, conn, 500, "internal server error")
})
defer stop()
_, err := connect(addr, "", token, []byte("config_data"))
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to authenticate project")
assert.Contains(t, err.Error(), "internal server error")
}

func TestProxy(t *testing.T) {
Expand All @@ -59,7 +59,7 @@ func TestProxy(t *testing.T) {
conn, err := listener.Accept()
require.NoError(t, err)
readHeaderAndConfig(t, conn, token, []byte("config_data"))
writeStatus(t, conn, 200)
writeResponse(t, conn, 200, "")

cfg := yamux.DefaultConfig()
cfg.KeepAliveInterval = time.Second
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestProxy(t *testing.T) {
}

func readHeaderAndConfig(t *testing.T, conn net.Conn, token string, config []byte) {
h := Header{}
h := RequestHeader{}
require.NoError(t, binary.Read(conn, binary.LittleEndian, &h))
require.Equal(t, token, string(h.Token[:]))
require.Equal(t, version, string(bytes.Trim(h.Version[:], "\x00")))
Expand All @@ -150,8 +150,11 @@ func readHeaderAndConfig(t *testing.T, conn net.Conn, token string, config []byt
require.Equal(t, config, buf)
}

func writeStatus(t *testing.T, conn net.Conn, status uint16) {
require.NoError(t, binary.Write(conn, binary.LittleEndian, status))
func writeResponse(t *testing.T, conn net.Conn, status uint16, message string) {
err := binary.Write(conn, binary.LittleEndian, ResponseHeader{Status: status, MessageSize: uint16(len(message))})
require.NoError(t, err)
_, err = conn.Write([]byte(message))
require.NoError(t, err)
}

func gateway(t *testing.T, handler func(g net.Listener)) (string, func()) {
Expand Down

0 comments on commit 8df6461

Please sign in to comment.