Skip to content

Commit

Permalink
Merge c3e661f into 630e4c6
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Jun 30, 2017
2 parents 630e4c6 + c3e661f commit f61ed21
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 14 deletions.
78 changes: 72 additions & 6 deletions api_definition_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"io"
"net"
"net/http"
"net/http/httptest"
"sync"
Expand Down Expand Up @@ -315,14 +317,10 @@ func startRPCMock(dispatcher *gorpc.Dispatcher) *gorpc.Server {
config.SlaveOptions.APIKey = "test"

server := gorpc.NewTCPServer(":9090", dispatcher.NewHandlerFunc())
go server.Serve()
server.Listener = &customListener{}
config.SlaveOptions.ConnectionString = server.Addr

RPCCLientSingleton = gorpc.NewTCPClient(server.Addr)
RPCCLientSingleton.Conns = 1
RPCCLientSingleton.Start()
RPCClientIsConnected = true
RPCFuncClientSingleton = getDispatcher().NewFuncClient(RPCCLientSingleton)
go server.Serve()

return server
}
Expand Down Expand Up @@ -437,3 +435,71 @@ func TestRoundRobin(t *testing.T) {
}
}
}

func setupKeepalive(conn net.Conn) error {
tcpConn := conn.(*net.TCPConn)
if err := tcpConn.SetKeepAlive(true); err != nil {
return err
}
if err := tcpConn.SetKeepAlivePeriod(30 * time.Second); err != nil {
return err
}
return nil
}

type customListener struct {
L net.Listener
}

func (ln *customListener) Init(addr string) (err error) {
ln.L, err = net.Listen("tcp", addr)
return
}

func (ln *customListener) ListenAddr() net.Addr {
if ln.L != nil {
return ln.L.Addr()
}
return nil
}

func (ln *customListener) Accept() (conn io.ReadWriteCloser, clientAddr string, err error) {
c, err := ln.L.Accept()

if err != nil {
return
}

if err = setupKeepalive(c); err != nil {
c.Close()
return
}

handshake := make([]byte, 8)
_, err = c.Read(handshake)

// Let gorpc handle it
if err != nil {
return
}

idLenBuf := make([]byte, 1)
_, err = c.Read(idLenBuf)
if err != nil {
return
}

idLen := uint8(idLenBuf[0])
id := make([]byte, idLen)
_, err = c.Read(id)

if err != nil {
return
}

return c, string(id), nil
}

func (ln *customListener) Close() error {
return ln.L.Close()
}
37 changes: 29 additions & 8 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"io"
"net"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -130,6 +131,7 @@ func (r *RPCStorageHandler) ReConnect() {
}

var RPCCLientSingleton *gorpc.Client
var RPCClientSingletonConnectionID string
var RPCFuncClientSingleton *gorpc.DispatcherClient
var RPCGlobalCache = cache.New(30*time.Second, 15*time.Second)
var RPCClientIsConnected bool
Expand All @@ -145,22 +147,41 @@ func (r *RPCStorageHandler) Connect() bool {
// RPC Client is unset
// Set up the cache
log.Info("Setting new RPC connection!")
if config.SlaveOptions.UseSSL {
clientCfg := &tls.Config{
InsecureSkipVerify: config.SlaveOptions.SSLInsecureSkipVerify,
}

RPCCLientSingleton = gorpc.NewTLSClient(r.Address, clientCfg)
} else {
RPCCLientSingleton = gorpc.NewTCPClient(r.Address)
}
RPCCLientSingleton = &gorpc.Client{Addr: r.Address}
RPCClientSingletonConnectionID = uuid.NewV4().String()

if log.Level != logrus.DebugLevel {
gorpc.SetErrorLogger(gorpc.NilErrorLogger)
}

RPCCLientSingleton.OnConnect = r.OnConnectFunc
RPCCLientSingleton.Conns = 50
RPCCLientSingleton.Dial = func(addr string) (conn io.ReadWriteCloser, err error) {
dialer := &net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}

if config.SlaveOptions.UseSSL {
cfg := &tls.Config{
InsecureSkipVerify: config.SlaveOptions.SSLInsecureSkipVerify,
}

conn, err = tls.DialWithDialer(dialer, "tcp", addr, cfg)
} else {
conn, err = dialer.Dial("tcp", addr)
}

if err != nil {
return
}

conn.Write([]byte("register"))
conn.Write([]byte{byte(len(RPCClientSingletonConnectionID))})
conn.Write([]byte(RPCClientSingletonConnectionID))
return conn, nil
}
RPCCLientSingleton.Start()
d := getDispatcher()

Expand Down

0 comments on commit f61ed21

Please sign in to comment.