From 8df6461c93ab8659c107302f8b75bd45131c0e8f Mon Sep 17 00:00:00 2001 From: Anton Petruhin Date: Fri, 21 Jul 2023 15:52:00 +0300 Subject: [PATCH] read and log a response message from the gateway --- connect.go | 43 ++++++++++++++++++++++++++++--------------- connect_test.go | 15 +++++++++------ 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/connect.go b/connect.go index 847c687..3b61ac7 100644 --- a/connect.go +++ b/connect.go @@ -8,7 +8,6 @@ import ( "github.com/hashicorp/yamux" "github.com/jpillora/backoff" "io" - "io/ioutil" "k8s.io/klog" "net" "net/http" @@ -62,7 +61,8 @@ func (t *Tunnel) keepConnected(ctx context.Context) { t.gwConn, err = connect(t.address, t.serverName, t.token, t.config) if err != nil { d := b.Duration() - klog.Errorf("%s, reconnecting to %s in %.0fs", err, t.address, d.Seconds()) + klog.Errorln(err) + klog.Errorf("reconnecting to %s in %.0fs", t.address, d.Seconds()) time.Sleep(d) continue } @@ -83,7 +83,7 @@ func (t *Tunnel) Close() { func main() { resolverUrl := os.Getenv("RESOLVER_URL") if resolverUrl == "" { - resolverUrl = "https://gw.coroot.com/promtun/resolve" + resolverUrl = "https://gw.coroot.com/connect/resolve" } token := mustEnv("PROJECT_TOKEN") if len(token) != 36 { @@ -160,17 +160,22 @@ func getEndpoints(resolverUrl, token string) ([]string, error) { return strings.Split(strings.TrimSpace(string(payload)), ";"), nil } -type Header struct { +type RequestHeader struct { Token [36]byte Version [16]byte ConfigSize uint32 } +type ResponseHeader struct { + Status uint16 + MessageSize uint16 +} + func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error) { - h := Header{} - copy(h.Token[:], token) - copy(h.Version[:], version) - h.ConfigSize = uint32(len(config)) + requestHeader := RequestHeader{} + copy(requestHeader.Token[:], token) + copy(requestHeader.Version[:], version) + requestHeader.ConfigSize = uint32(len(config)) klog.Infof("connecting to %s (%s)", gwAddr, serverName) deadline := time.Now().Add(timeout) @@ -183,7 +188,7 @@ func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error) klog.Infof("connected to gateway %s", gwAddr) _ = gwConn.SetDeadline(deadline) - if err = binary.Write(gwConn, binary.LittleEndian, h); err != nil { + if err = binary.Write(gwConn, binary.LittleEndian, requestHeader); err != nil { _ = gwConn.Close() return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err) } @@ -191,17 +196,25 @@ func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error) _ = gwConn.Close() return nil, fmt.Errorf("failed to send config to %s: %s", gwAddr, err) } - var resp uint16 - if err := binary.Read(gwConn, binary.LittleEndian, &resp); err != nil { + var responseHeader ResponseHeader + if err := binary.Read(gwConn, binary.LittleEndian, &responseHeader); err != nil { _ = gwConn.Close() return nil, fmt.Errorf("failed to read the response from %s: %s", gwAddr, err) } + var responseMessage string + if responseHeader.MessageSize > 0 { + buf := make([]byte, responseHeader.MessageSize) + if _, err := gwConn.Read(buf); err != nil { + _ = gwConn.Close() + return nil, fmt.Errorf("failed to read the response from %s: %s", gwAddr, err) + } + responseMessage = string(buf) + } _ = gwConn.SetDeadline(time.Time{}) - klog.Infof(`got "%d" from the gateway %s`, resp, gwAddr) - if resp != 200 { + if responseHeader.Status != 200 { _ = gwConn.Close() - return nil, fmt.Errorf("failed to authenticate project on %s: %d", gwAddr, resp) + return nil, fmt.Errorf("got %d from %s: %s", responseHeader.Status, gwAddr, responseMessage) } klog.Infof("ready to proxy requests from %s", gwAddr) return gwConn, nil @@ -210,7 +223,7 @@ func connect(gwAddr, serverName, token string, config []byte) (net.Conn, error) func proxy(ctx context.Context, gwConn net.Conn) { cfg := yamux.DefaultConfig() cfg.KeepAliveInterval = time.Second - cfg.LogOutput = ioutil.Discard + cfg.LogOutput = io.Discard session, err := yamux.Server(gwConn, cfg) if err != nil { klog.Errorln("failed to start a TCP multiplexing server:", err) diff --git a/connect_test.go b/connect_test.go index 67941ec..27590cc 100644 --- a/connect_test.go +++ b/connect_test.go @@ -42,12 +42,12 @@ func TestHandshakeError(t *testing.T) { conn, err := listener.Accept() require.NoError(t, err) readHeaderAndConfig(t, conn, token, []byte("config_data")) - writeStatus(t, conn, 500) + writeResponse(t, conn, 500, "internal server error") }) defer stop() _, err := connect(addr, "", token, []byte("config_data")) require.Error(t, err) - assert.Contains(t, err.Error(), "failed to authenticate project") + assert.Contains(t, err.Error(), "internal server error") } func TestProxy(t *testing.T) { @@ -59,7 +59,7 @@ func TestProxy(t *testing.T) { conn, err := listener.Accept() require.NoError(t, err) readHeaderAndConfig(t, conn, token, []byte("config_data")) - writeStatus(t, conn, 200) + writeResponse(t, conn, 200, "") cfg := yamux.DefaultConfig() cfg.KeepAliveInterval = time.Second @@ -139,7 +139,7 @@ func TestProxy(t *testing.T) { } func readHeaderAndConfig(t *testing.T, conn net.Conn, token string, config []byte) { - h := Header{} + h := RequestHeader{} require.NoError(t, binary.Read(conn, binary.LittleEndian, &h)) require.Equal(t, token, string(h.Token[:])) require.Equal(t, version, string(bytes.Trim(h.Version[:], "\x00"))) @@ -150,8 +150,11 @@ func readHeaderAndConfig(t *testing.T, conn net.Conn, token string, config []byt require.Equal(t, config, buf) } -func writeStatus(t *testing.T, conn net.Conn, status uint16) { - require.NoError(t, binary.Write(conn, binary.LittleEndian, status)) +func writeResponse(t *testing.T, conn net.Conn, status uint16, message string) { + err := binary.Write(conn, binary.LittleEndian, ResponseHeader{Status: status, MessageSize: uint16(len(message))}) + require.NoError(t, err) + _, err = conn.Write([]byte(message)) + require.NoError(t, err) } func gateway(t *testing.T, handler func(g net.Listener)) (string, func()) {