Skip to content

Commit

Permalink
implement rpc helper in server pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
ruizeng committed Oct 26, 2015
1 parent 5b5e5a4 commit 694dfde
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
includes:

- tcp/http service framework
- rpc framework with ratelimit and metrics
- rpc helper
- stats api
- timer task interface
1 change: 1 addition & 0 deletions pkg/server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ const (
errNewConnection = "receive new connection error (%s)"
errWrongHostAddr = "wrong address : %s"
errWrongEtcdPath = "wrong path in etcd: %s"
errServerManagerNotInit = "sever manager not init!"
)
3 changes: 1 addition & 2 deletions pkg/server/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package server

import (
"github.com/Sirupsen/logrus"
"os"
)

var Log *logrus.Entry
Expand All @@ -15,7 +14,7 @@ func initLog(name string, level string) error {
logrus.SetFormatter(&logrus.JSONFormatter{})

// Output to stderr instead of stdout, could also be a file.
logrus.SetOutput(os.Stderr)
// logrus.SetOutput(os.Stderr)

// logging level
lvl, err := logrus.ParseLevel(level)
Expand Down
77 changes: 77 additions & 0 deletions pkg/server/rpc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// RPCClient implements a rpc client tool with reconnect and load balance.
package server

import (
"fmt"
"math/rand"
"net/rpc"
"time"
)

type RPCClient struct {
clients map[string]*rpc.Client
random *rand.Rand
}

func NewRPCClient() (*RPCClient, error) {
if serverInstance == nil {
return nil, errorf(errServerNotInit)
}
if serverInstance.svrmgr == nil {
return nil, errorf(errServerManagerNotInit)
}
return &RPCClient{
clients: make(map[string]*rpc.Client),
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}, nil
}

func rpcCallWithReconnect(client *rpc.Client, addr string, serverMethod string, args interface{}, reply interface{}) error {
err := client.Call(serverMethod, args, reply)
if err == rpc.ErrShutdown {
client, err = rpc.Dial("tcp", addr)
if err != nil {
return err
}
} else if err == nil {
return nil
}

err = client.Call(serverMethod, args, reply)

return err
}

//RPC call with reconnect and retry.
func (client *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
addrs, err := serverInstance.svrmgr.GetServerHosts(severName, FlagRPCHost)
if err != nil {
return err
}

// pick a random start index for round robin
total := len(addrs)
start := client.random.Intn(total)

for idx := 0; idx < total; idx++ {
addr := addrs[(start+idx)%total]
mapkey := fmt.Sprintf("%s[%s]", severName, addr)
if client.clients[mapkey] == nil {
client.clients[mapkey], err = rpc.Dial("tcp", addr)
if err != nil {
Log.Warnf("RPC dial error : %s", err)
continue
}
}

err = rpcCallWithReconnect(client.clients[mapkey], addr, serverMethod, args, reply)
if err != nil {
Log.Warn("RpcCallWithReconnect error : %s", err)
continue
}

return nil
}

return errorf("rpc all failed")
}
30 changes: 30 additions & 0 deletions pkg/server/rpc_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package server

import (
"testing"
)

func validateRPCClient(t *testing.T) {
rpccli, err := NewRPCClient()
if err != nil {
t.Fatal(err)
}

args := &Args{100, 200}
var reply int

err = rpccli.Call("test", "Arith.Multiply", args, &reply)
if err != nil {
t.Fatal(err)
}

if reply != testRPCArgs.A*testRPCArgs.B {
t.Fatalf("rpc client test faild, want %d, got %d", testRPCArgs.A*testRPCArgs.B, reply)
}

err = rpccli.Call("wrongtest", "Arith.Multiply", args, &reply)
t.Log(err)
if err == nil {
t.Fatalf("rpc client should return error when no server found!")
}
}
1 change: 1 addition & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,6 @@ func TestServer(t *testing.T) {
validateHTTPSServer(t, "https://"+*confHTTPHost)
validateTLSServer(t, *confTCPHost)
validateRPCServer(t, *confRPCHost, "Arith2.Multiply")
validateRPCClient(t)
validateServerManager(t)
}

0 comments on commit 694dfde

Please sign in to comment.