Skip to content
Permalink
Browse files

Switch to using github.com/rancher/remotedialer for the transport

Fundamentally Inlets must perform tunneling and reverse proxying. Prior
to this commit both functions in Inlets are custom implementations tied
quite heavily to HTTP 1.1 request/response with no bi-directional
communication.

This patch replaces both tunneling and proxying with external libraries.
The transport layer is switched to using
github.com/rancher/remotedialer. Remote dialer allows tunneling
connections at a net.Conn level so higher level protocols such
as HTTP, HTTP2, WebSockets, gRPC, etc need not require specific
tunneling logic only proxy logic.

For proxying logic we use Kubernetes UpgradeAwareHandler which builds on
top of go's built in httputil.ReverseProxy but adds support for
"Connection: Upgrade" requests and HTTP response HTML rewriting. Once
Inlets switches to go 1.12 it might be possible to drop the Kubernetes
library and instead use the bare httputil.ReverseProxy as go 1.12 adds
support for proxying WebSockets.

Signed-off-by: Darren Shepherd <darren@rancher.com>
  • Loading branch information...
ibuildthecloud authored and alexellis committed Mar 28, 2019
1 parent 395844d commit 44bb61cb9b1488042d01f7fd4fa24f7d347e558f
Showing with 253 additions and 387 deletions.
  1. +3 −11 cmd/client.go
  2. +2 −16 cmd/server.go
  3. +17 −164 pkg/client/client.go
  4. +174 −0 pkg/router/router.go
  5. +53 −184 pkg/server/server.go
  6. +4 −12 pkg/transport/transport.go
@@ -3,7 +3,6 @@ package cmd
import (
"log"
"strings"
"time"

"github.com/alexellis/inlets/pkg/client"
"github.com/pkg/errors"
@@ -15,7 +14,6 @@ func init() {
clientCmd.Flags().StringP("remote", "r", "127.0.0.1:8000", "server address i.e. 127.0.0.1:8000")
clientCmd.Flags().StringP("upstream", "u", "", "upstream server i.e. http://127.0.0.1:3000")
clientCmd.Flags().StringP("token", "t", "", "token for authentication")
clientCmd.Flags().DurationP("ping", "p", time.Second*10, "ping internal")
}

type UpstreamParser interface {
@@ -84,16 +82,10 @@ func runClient(cmd *cobra.Command, _ []string) error {
return errors.Wrap(err, "failed to get 'token' value.")
}

pingDuration, err := cmd.Flags().GetDuration("ping")
if err != nil {
return errors.Wrap(err, "failed to get 'ping' value.")
}

inletsClient := client.Client{
Remote: remote,
UpstreamMap: upstreamMap,
Token: token,
PingWaitDuration: pingDuration,
Remote: remote,
UpstreamMap: upstreamMap,
Token: token,
}

if err := inletsClient.Connect(); err != nil {
@@ -6,15 +6,13 @@ import (
"github.com/alexellis/inlets/pkg/server"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"time"
)

func init() {
inletsCmd.AddCommand(serverCmd)
serverCmd.Flags().IntP("port", "p", 8000, "port for server")
serverCmd.Flags().StringP("token", "t", "", "token for authentication")
serverCmd.Flags().Bool("print-token", true, "prints the token in server mode")
serverCmd.Flags().String("gateway-timeout", "5s", "timeout for upstream gateway")
}

// serverCmd represents the server sub command.
@@ -44,26 +42,14 @@ func runServer(cmd *cobra.Command, _ []string) error {
log.Printf("Server token: %s", token)
}

gatewayTimeoutRaw, err := cmd.Flags().GetString("gateway-timeout")
if err != nil {
return errors.Wrap(err, "failed to get the 'gateway-timeout' value.")
}

gatewayTimeout, gatewayTimeoutErr := time.ParseDuration(gatewayTimeoutRaw)
if gatewayTimeoutErr != nil {
return gatewayTimeoutErr
}
log.Printf("Gateway timeout: %f secs\n", gatewayTimeout.Seconds())

port, err := cmd.Flags().GetInt("port")
if err != nil {
return errors.Wrap(err, "failed to get the 'port' value.")
}

inletsServer := server.Server{
Port: port,
GatewayTimeout: gatewayTimeout,
Token: token,
Port: port,
Token: token,
}

inletsServer.Serve()
@@ -1,23 +1,15 @@
package client

import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"time"

"github.com/alexellis/inlets/pkg/transport"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/rancher/remotedialer"
"github.com/twinj/uuid"
)

var httpClient *http.Client

// Client for inlets
type Client struct {
// Remote site for websocket address
@@ -28,168 +20,29 @@ type Client struct {

// Token for authentication
Token string
}

// PingWaitDuration duration to wait between pings
PingWaitDuration time.Duration
func allowsAllow(network, address string) bool {
return true
}

// Connect connect and serve traffic through websocket
func (c *Client) Connect() error {

httpClient = http.DefaultClient
httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
headers := http.Header{}
headers.Set(transport.InletsHeader, uuid.Formatter(uuid.NewV4(), uuid.FormatHex))
for k, v := range c.UpstreamMap {
headers.Set(transport.UpstreamHeader, fmt.Sprintf("%s=%s", k, v))
}

remote := c.Remote
if !strings.HasPrefix(remote, "ws") {
remote = "ws://" + remote
if c.Token != "" {
headers.Add("Authorization", "Bearer "+c.Token)
}

remoteURL, urlErr := url.Parse(remote)
if urlErr != nil {
return errors.Wrap(urlErr, "bad remote URL")
fmt.Println(headers)
url := c.Remote
if !strings.HasPrefix(url, "ws") {
url = "ws://" + url
}

u := url.URL{Scheme: remoteURL.Scheme, Host: remoteURL.Host, Path: "/tunnel"}

log.Printf("connecting to %s with ping=%s", u.String(), c.PingWaitDuration.String())

wsc, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{
"Authorization": []string{"Bearer " + c.Token},
})

ws := transport.NewWebsocketConn(wsc, c.PingWaitDuration)

if err != nil {
return err
for {
remotedialer.ClientConnect(url+"/tunnel", headers, nil, allowsAllow, nil)
}

log.Printf("Connected to websocket: %s", ws.LocalAddr())

defer wsc.Close()

// Send pings
tickerDone := make(chan bool)

go func() {
log.Printf("Writing pings")

ticker := time.NewTicker((c.PingWaitDuration * 9) / 10) // send on a period which is around 9/10ths of original value
for {
select {
case <-ticker.C:
if err := ws.Ping(); err != nil {
close(tickerDone)
}
break
case <-tickerDone:
log.Printf("tickerDone, no more pings will be sent from client\n")
return
}
}
}()

// Work with websocket
done := make(chan struct{})

go func() {
defer close(done)

for {
messageType, message, err := ws.ReadMessage()
fmt.Printf("Read a message from websocket.\n")
if err != nil {
fmt.Printf("Read error: %s.\n", err)
return
}

switch messageType {
case websocket.TextMessage:
log.Printf("TextMessage: %s\n", message)

break
case websocket.BinaryMessage:
// proxyToUpstream

buf := bytes.NewBuffer(message)
bufReader := bufio.NewReader(buf)
req, readReqErr := http.ReadRequest(bufReader)
if readReqErr != nil {
log.Println(readReqErr)
return
}

inletsID := req.Header.Get(transport.InletsHeader)
// log.Printf("[%s] recv: %d", requestID, len(message))

log.Printf("[%s] %s", inletsID, req.RequestURI)

body, _ := ioutil.ReadAll(req.Body)

proxyHost := ""
if val, ok := c.UpstreamMap[req.Host]; ok {
proxyHost = val
} else if val, ok := c.UpstreamMap[""]; ok {
proxyHost = val
}

requestURI := fmt.Sprintf("%s%s", proxyHost, req.URL.String())
if len(req.URL.RawQuery) > 0 {
requestURI = requestURI + "?" + req.URL.RawQuery
}

log.Printf("[%s] proxy => %s", inletsID, requestURI)

newReq, newReqErr := http.NewRequest(req.Method, requestURI, bytes.NewReader(body))
if newReqErr != nil {
log.Printf("[%s] newReqErr: %s", inletsID, newReqErr.Error())
return
}

transport.CopyHeaders(newReq.Header, &req.Header)

res, resErr := httpClient.Do(newReq)

if resErr != nil {
log.Printf("[%s] Upstream tunnel err: %s", inletsID, resErr.Error())

errRes := http.Response{
StatusCode: http.StatusBadGateway,
Body: ioutil.NopCloser(strings.NewReader(resErr.Error())),
Header: http.Header{},
}

errRes.Header.Set(transport.InletsHeader, inletsID)
buf2 := new(bytes.Buffer)
errRes.Write(buf2)
if errRes.Body != nil {
errRes.Body.Close()
}

ws.WriteMessage(websocket.BinaryMessage, buf2.Bytes())

} else {
log.Printf("[%s] tunnel res.Status => %s", inletsID, res.Status)

buf2 := new(bytes.Buffer)
res.Header.Set(transport.InletsHeader, inletsID)

res.Write(buf2)
if res.Body != nil {
res.Body.Close()
}

log.Printf("[%s] %d bytes", inletsID, buf2.Len())

ws.WriteMessage(websocket.BinaryMessage, buf2.Bytes())
}
}

}
}()

<-done

return nil
}
Oops, something went wrong.

0 comments on commit 44bb61c

Please sign in to comment.
You can’t perform that action at this time.