Skip to content

Commit

Permalink
Test connect to Kafka through HTTP Proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed May 5, 2018
1 parent 13e4f8b commit 760bc31
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 14 deletions.
4 changes: 0 additions & 4 deletions README.md
Expand Up @@ -219,7 +219,6 @@ Connect through test SOCKS5 Proxy server
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
--forward-proxy socks5://localhost:1080
```

```
Expand All @@ -229,7 +228,6 @@ Connect through test SOCKS5 Proxy server
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
--forward-proxy socks5://my-proxy-user:my-proxy-password@localhost:1080
```

### Connect to Kafka through HTTP Proxy example
Expand All @@ -243,7 +241,6 @@ Connect through test HTTP Proxy server using CONNECT method
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
--forward-proxy http://localhost:3128
```

```
Expand All @@ -253,7 +250,6 @@ Connect through test HTTP Proxy server using CONNECT method
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
--forward-proxy http://my-proxy-user:my-proxy-password@localhost:3128
```

### Kubernetes sidecar container example
Expand Down
9 changes: 7 additions & 2 deletions proxy/auth_test.go
Expand Up @@ -17,9 +17,14 @@ func TestAuthHandshake(t *testing.T) {
testAuthHandshake(a, makePipe)
}

func TestAuthHandshakeSocks5(t *testing.T) {
func TestAuthHandshakeSocks5Proxy(t *testing.T) {
a := assert.New(t)
testAuthHandshake(a, makeSocks5Pipe)
testAuthHandshake(a, makeSocks5ProxyPipe)
}

func TestAuthHandshakeHttpProxy(t *testing.T) {
a := assert.New(t)
testAuthHandshake(a, makeHttpProxyPipe)
}

func testAuthHandshake(a *assert.Assertions, mp func() (c1, c2 net.Conn, stop func(), err error)) {
Expand Down
65 changes: 59 additions & 6 deletions proxy/tls_test.go
Expand Up @@ -120,7 +120,7 @@ func TestTLSSelfSigned(t *testing.T) {
pingPong(t, c1, c2)
}

func TestTLSThroughSocks5(t *testing.T) {
func TestTLSThroughSocks5Proxy(t *testing.T) {
a := assert.New(t)

bundle := NewCertsBundle()
Expand All @@ -131,15 +131,34 @@ func TestTLSThroughSocks5(t *testing.T) {
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Kafka.TLS.CAChainCertFile = bundle.ServerCert.Name()

c1, c2, stop, err := makeTLSSocks5Pipe(c, nil, "", "")
c1, c2, stop, err := makeTLSSocks5ProxyPipe(c, nil, "", "")
if err != nil {
a.FailNow(err.Error())
}
defer stop()
pingPong(t, c1, c2)
}

func TestTLSThroughSocks5WithCredentials(t *testing.T) {
func TestTLSThroughHttpProxy(t *testing.T) {
a := assert.New(t)

bundle := NewCertsBundle()
defer bundle.Close()

c := new(config.Config)
c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Kafka.TLS.CAChainCertFile = bundle.ServerCert.Name()

c1, c2, stop, err := makeTLSHttpProxyPipe(c, "", "", "", "")
if err != nil {
a.FailNow(err.Error())
}
defer stop()
pingPong(t, c1, c2)
}

func TestTLSThroughSocks5ProxyWithCredentials(t *testing.T) {
a := assert.New(t)

bundle := NewCertsBundle()
Expand All @@ -156,15 +175,33 @@ func TestTLSThroughSocks5WithCredentials(t *testing.T) {
password: "test-password",
},
}
c1, c2, stop, err := makeTLSSocks5Pipe(c, authenticator, "test-user", "test-password")
c1, c2, stop, err := makeTLSSocks5ProxyPipe(c, authenticator, "test-user", "test-password")
if err != nil {
a.FailNow(err.Error())
}
defer stop()
pingPong(t, c1, c2)
}

func TestTLSThroughHttpProxyWithCredentials(t *testing.T) {
a := assert.New(t)

bundle := NewCertsBundle()
defer bundle.Close()

c := new(config.Config)
c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Kafka.TLS.CAChainCertFile = bundle.ServerCert.Name()
c1, c2, stop, err := makeTLSHttpProxyPipe(c, "test-user", "test-password", "test-user", "test-password")
if err != nil {
a.FailNow(err.Error())
}
defer stop()
pingPong(t, c1, c2)
}

func TestTLSThroughSocks5WithBadCredentials(t *testing.T) {
func TestTLSThroughSocks5ProxyWithBadCredentials(t *testing.T) {
a := assert.New(t)

bundle := NewCertsBundle()
Expand All @@ -181,12 +218,28 @@ func TestTLSThroughSocks5WithBadCredentials(t *testing.T) {
password: "test-password",
},
}
_, _, _, err := makeTLSSocks5Pipe(c, authenticator, "test-user", "bad-password")
_, _, _, err := makeTLSSocks5ProxyPipe(c, authenticator, "test-user", "bad-password")
a.NotNil(err)
a.True(strings.HasPrefix(err.Error(), "proxy: SOCKS5 proxy at"))
a.True(strings.HasSuffix(err.Error(), "rejected username/password"))
}

func TestTLSThroughHttpProxyWithBadCredentials(t *testing.T) {
a := assert.New(t)

bundle := NewCertsBundle()
defer bundle.Close()

c := new(config.Config)
c.Proxy.TLS.ListenerCertFile = bundle.ServerCert.Name()
c.Proxy.TLS.ListenerKeyFile = bundle.ServerKey.Name()
c.Kafka.TLS.CAChainCertFile = bundle.ServerCert.Name()

_, _, _, err := makeTLSHttpProxyPipe(c, "test-user", "test-password", "test-user", "bad-password")
a.NotNil(err)
a.Equal(err.Error(), "connect server using proxy error, statuscode [407]")
}

func TestTLSVerifyClientCertDifferentCAs(t *testing.T) {
a := assert.New(t)

Expand Down
185 changes: 183 additions & 2 deletions proxy/util_test.go
Expand Up @@ -8,11 +8,14 @@ import (
"crypto/x509/pkix"
"encoding/pem"
"github.com/armon/go-socks5"
"github.com/elazarl/goproxy"
"github.com/elazarl/goproxy/ext/auth"
"github.com/grepplabs/kafka-proxy/config"
"github.com/pkg/errors"
"io/ioutil"
"math/big"
"net"
"net/http"
"os"
"time"
)
Expand Down Expand Up @@ -101,7 +104,7 @@ func (s testCredentials) Valid(username, password string) bool {
return s.username == username && s.password == password
}

func makeTLSSocks5Pipe(conf *config.Config, authenticator socks5.Authenticator, username, password string) (c1, c2 net.Conn, stop func(), err error) {
func makeTLSSocks5ProxyPipe(conf *config.Config, authenticator socks5.Authenticator, username, password string) (c1, c2 net.Conn, stop func(), err error) {
socks5Conf := &socks5.Config{}
if authenticator != nil {
socks5Conf.AuthMethods = []socks5.Authenticator{authenticator}
Expand Down Expand Up @@ -209,6 +212,113 @@ func makeTLSSocks5Pipe(conf *config.Config, authenticator socks5.Authenticator,
}
}

func makeTLSHttpProxyPipe(conf *config.Config, proxyusername, proxypassword string, username, password string) (c1, c2 net.Conn, stop func(), err error) {
server := goproxy.NewProxyHttpServer()

if proxyusername != "" && proxypassword != "" {
server.OnRequest().HandleConnect(auth.BasicConnect("", func(user, passwd string) bool {
return user == proxyusername && passwd == proxypassword
}))
}

if err != nil {
return nil, nil, nil, err
}
clientConfig, err := newTLSClientConfig(conf)
if err != nil {
return nil, nil, nil, err
}
serverConfig, err := newTLSListenerConfig(conf)
if err != nil {
return nil, nil, nil, err
}

proxy, err := net.Listen("tcp4", "127.0.0.1:0")
if err != nil {
return nil, nil, nil, err
}
httpProxy := &httpProxy{
forwardDialer: directDialer{
dialTimeout: 2 * time.Second,
keepAlive: 60 * time.Second,
},
network: proxy.Addr().Network(),
hostPort: proxy.Addr().String(),
username: username,
password: password,
}

tlsDialer := tlsDialer{
timeout: 3 * time.Second,
rawDialer: httpProxy,
config: clientConfig,
}

target, err := tls.Listen("tcp", "127.0.0.1:0", serverConfig)
if err != nil {
proxy.Close()
return nil, nil, nil, err
}
// Start a connection between two endpoints.
var err0, err1, err2 error
go func() {
err0 = http.Serve(proxy, server)
}()
done := make(chan bool)
go func() {
c2, err2 = target.Accept()
close(done)
if err2 != nil {
return
}
// will force handshake completion
buf := make([]byte, 0)
c2.Read(buf)

tlscon, ok := c2.(*tls.Conn)
if ok {
state := tlscon.ConnectionState()
for _, v := range state.PeerCertificates {
_ = v
//fmt.Println(x509.MarshalPKIXPublicKey(v.PublicKey))
}
}
}()
stop = func() {
if err1 == nil {
c1.Close()
}
if err2 == nil {
c2.Close()
}
target.Close()
proxy.Close()
}

c1, err1 = tlsDialer.Dial(target.Addr().Network(), target.Addr().String())
if err1 != nil {
target.Close()
return nil, nil, nil, err1
}
select {
case <-done:
case <-time.After(4 * time.Second):
target.Close()
return nil, nil, nil, errors.New("Accept timeout ")
}

switch {
case err1 != nil:
stop()
return nil, nil, nil, err1
case err2 != nil:
stop()
return nil, nil, nil, err2
default:
return c1, c2, stop, nil
}
}

func makePipe() (c1, c2 net.Conn, stop func(), err error) {
dialer := directDialer{
dialTimeout: 2 * time.Second,
Expand Down Expand Up @@ -257,7 +367,7 @@ func makePipe() (c1, c2 net.Conn, stop func(), err error) {
}
}

func makeSocks5Pipe() (c1, c2 net.Conn, stop func(), err error) {
func makeSocks5ProxyPipe() (c1, c2 net.Conn, stop func(), err error) {
server, err := socks5.New(&socks5.Config{})
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -330,6 +440,77 @@ func makeSocks5Pipe() (c1, c2 net.Conn, stop func(), err error) {
}
}

func makeHttpProxyPipe() (c1, c2 net.Conn, stop func(), err error) {
server := goproxy.NewProxyHttpServer()
//server.Verbose = true

if err != nil {
return nil, nil, nil, err
}
proxy, err := net.Listen("tcp4", "127.0.0.1:0")
if err != nil {
return nil, nil, nil, err
}
target, err := net.Listen("tcp4", "127.0.0.1:0")
if err != nil {
proxy.Close()
return nil, nil, nil, err
}
httpProxy := httpProxy{
forwardDialer: directDialer{
dialTimeout: 2 * time.Second,
keepAlive: 60 * time.Second,
},
network: proxy.Addr().Network(),
hostPort: proxy.Addr().String(),
}

var err0, err1, err2 error
go func() {
err0 = http.Serve(proxy, server)
}()

done := make(chan bool)
go func() {
c2, err2 = target.Accept()
close(done)
}()

c1, err1 = httpProxy.Dial(target.Addr().Network(), target.Addr().String())

stop = func() {
if err1 == nil {
c1.Close()
}
if err2 == nil {
c2.Close()
}
target.Close()
proxy.Close()
}

select {
case <-done:
case <-time.After(3 * time.Second):
stop()
return nil, nil, nil, errors.New("Accept timeout")
}

switch {
case err0 != nil:
stop()
return nil, nil, nil, err0
case err1 != nil:
stop()
return nil, nil, nil, err1
case err2 != nil:
stop()
return nil, nil, nil, err2
default:
return c1, c2, stop, nil
}
}

func generateCert(catls *tls.Certificate, certFile *os.File, keyFile *os.File) error {
// Prepare certificate
cert := &x509.Certificate{
Expand Down

0 comments on commit 760bc31

Please sign in to comment.