Skip to content

Commit

Permalink
rpc: add new client, use it everywhere
Browse files Browse the repository at this point in the history
The new client implementation supports concurrent requests,
subscriptions and replaces the various ad hoc RPC clients
throughout go-ethereum.
  • Loading branch information
fjl committed Jul 22, 2016
1 parent bb01bea commit 91b7690
Show file tree
Hide file tree
Showing 30 changed files with 2,002 additions and 751 deletions.
160 changes: 18 additions & 142 deletions accounts/abi/bind/backends/remote.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@
package backends package backends


import ( import (
"encoding/json"
"fmt"
"math/big" "math/big"
"sync"
"sync/atomic"


"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
Expand All @@ -37,183 +33,78 @@ var _ bind.ContractBackend = (*rpcBackend)(nil)
// rpcBackend implements bind.ContractBackend, and acts as the data provider to // rpcBackend implements bind.ContractBackend, and acts as the data provider to
// Ethereum contracts bound to Go structs. It uses an RPC connection to delegate // Ethereum contracts bound to Go structs. It uses an RPC connection to delegate
// all its functionality. // all its functionality.
//
// Note: The current implementation is a blocking one. This should be replaced
// by a proper async version when a real RPC client is created.
type rpcBackend struct { type rpcBackend struct {
client rpc.Client // RPC client connection to interact with an API server client *rpc.Client // RPC client connection to interact with an API server
autoid uint32 // ID number to use for the next API request
lock sync.Mutex // Singleton access until we get to request multiplexing
} }


// NewRPCBackend creates a new binding backend to an RPC provider that can be // NewRPCBackend creates a new binding backend to an RPC provider that can be
// used to interact with remote contracts. // used to interact with remote contracts.
func NewRPCBackend(client rpc.Client) bind.ContractBackend { func NewRPCBackend(client *rpc.Client) bind.ContractBackend {
return &rpcBackend{ return &rpcBackend{client: client}
client: client,
}
}

// request is a JSON RPC request package assembled internally from the client
// method calls.
type request struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Method string `json:"method"` // Remote procedure name to invoke on the server
Params []interface{} `json:"params"` // List of parameters to pass through (keep types simple)
}

// response is a JSON RPC response package sent back from the API server.
type response struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Error *failure `json:"error"` // Any error returned by the remote side
Result json.RawMessage `json:"result"` // Whatever the remote side sends us in reply
}

// failure is a JSON RPC response error field sent back from the API server.
type failure struct {
Code int `json:"code"` // JSON RPC error code associated with the failure
Message string `json:"message"` // Specific error message of the failure
}

// request forwards an API request to the RPC server, and parses the response.
//
// This is currently painfully non-concurrent, but it will have to do until we
// find the time for niceties like this :P
func (b *rpcBackend) request(ctx context.Context, method string, params []interface{}) (json.RawMessage, error) {
b.lock.Lock()
defer b.lock.Unlock()

if ctx == nil {
ctx = context.Background()
}

// Ugly hack to serialize an empty list properly
if params == nil {
params = []interface{}{}
}
// Assemble the request object
reqID := int(atomic.AddUint32(&b.autoid, 1))
req := &request{
JSONRPC: "2.0",
ID: reqID,
Method: method,
Params: params,
}
if err := b.client.Send(req); err != nil {
return nil, err
}
res := new(response)
errc := make(chan error, 1)
go func() {
errc <- b.client.Recv(res)
}()
select {
case err := <-errc:
if err != nil {
return nil, err
}
case <-ctx.Done():
return nil, ctx.Err()
}
if res.Error != nil {
if res.Error.Message == bind.ErrNoCode.Error() {
return nil, bind.ErrNoCode
}
return nil, fmt.Errorf("remote error: %s", res.Error.Message)
}
return res.Result, nil
} }


// HasCode implements ContractVerifier.HasCode by retrieving any code associated // HasCode implements ContractVerifier.HasCode by retrieving any code associated
// with the contract from the remote node, and checking its size. // with the contract from the remote node, and checking its size.
func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) { func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) {
// Execute the RPC code retrieval
block := "latest" block := "latest"
if pending { if pending {
block = "pending" block = "pending"
} }
res, err := b.request(ctx, "eth_getCode", []interface{}{contract.Hex(), block})
if err != nil {
return false, err
}
var hex string var hex string
if err := json.Unmarshal(res, &hex); err != nil { err := b.client.CallContext(ctx, &hex, "eth_getCode", contract, block)
if err != nil {
return false, err return false, err
} }
// Convert the response back to a Go byte slice and return
return len(common.FromHex(hex)) > 0, nil return len(common.FromHex(hex)) > 0, nil
} }


// ContractCall implements ContractCaller.ContractCall, delegating the execution of // ContractCall implements ContractCaller.ContractCall, delegating the execution of
// a contract call to the remote node, returning the reply to for local processing. // a contract call to the remote node, returning the reply to for local processing.
func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) { func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) {
// Pack up the request into an RPC argument
args := struct { args := struct {
To common.Address `json:"to"` To common.Address `json:"to"`
Data string `json:"data"` Data string `json:"data"`
}{ }{
To: contract, To: contract,
Data: common.ToHex(data), Data: common.ToHex(data),
} }
// Execute the RPC call and retrieve the response
block := "latest" block := "latest"
if pending { if pending {
block = "pending" block = "pending"
} }
res, err := b.request(ctx, "eth_call", []interface{}{args, block})
if err != nil {
return nil, err
}
var hex string var hex string
if err := json.Unmarshal(res, &hex); err != nil { err := b.client.CallContext(ctx, &hex, "eth_call", args, block)
if err != nil {
return nil, err return nil, err
} }
// Convert the response back to a Go byte slice and return
return common.FromHex(hex), nil return common.FromHex(hex), nil

} }


// PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating // PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating
// the current account nonce retrieval to the remote node. // the current account nonce retrieval to the remote node.
func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) { func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) {
res, err := b.request(ctx, "eth_getTransactionCount", []interface{}{account.Hex(), "pending"}) var hex rpc.HexNumber
err := b.client.CallContext(ctx, &hex, "eth_getTransactionCount", account.Hex(), "pending")
if err != nil { if err != nil {
return 0, err return 0, err
} }
var hex string return hex.Uint64(), nil
if err := json.Unmarshal(res, &hex); err != nil {
return 0, err
}
nonce, ok := new(big.Int).SetString(hex, 0)
if !ok {
return 0, fmt.Errorf("invalid nonce hex: %s", hex)
}
return nonce.Uint64(), nil
} }


// SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the // SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the
// gas price oracle request to the remote node. // gas price oracle request to the remote node.
func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) { func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
res, err := b.request(ctx, "eth_gasPrice", nil) var hex rpc.HexNumber
if err != nil { if err := b.client.CallContext(ctx, &hex, "eth_gasPrice"); err != nil {
return nil, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err return nil, err
} }
price, ok := new(big.Int).SetString(hex, 0) return (*big.Int)(&hex), nil
if !ok {
return nil, fmt.Errorf("invalid price hex: %s", hex)
}
return price, nil
} }


// EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating // EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating
// the gas estimation to the remote node. // the gas estimation to the remote node.
func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) { func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) {
// Pack up the request into an RPC argument
args := struct { args := struct {
From common.Address `json:"from"` From common.Address `json:"from"`
To *common.Address `json:"to"` To *common.Address `json:"to"`
Expand All @@ -226,19 +117,12 @@ func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address
Value: rpc.NewHexNumber(value), Value: rpc.NewHexNumber(value),
} }
// Execute the RPC call and retrieve the response // Execute the RPC call and retrieve the response
res, err := b.request(ctx, "eth_estimateGas", []interface{}{args}) var hex rpc.HexNumber
err := b.client.CallContext(ctx, &hex, "eth_estimateGas", args)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var hex string return (*big.Int)(&hex), nil
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
estimate, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, fmt.Errorf("invalid estimate hex: %s", hex)
}
return estimate, nil
} }


// SendTransaction implements ContractTransactor.SendTransaction, delegating the // SendTransaction implements ContractTransactor.SendTransaction, delegating the
Expand All @@ -248,13 +132,5 @@ func (b *rpcBackend) SendTransaction(ctx context.Context, tx *types.Transaction)
if err != nil { if err != nil {
return err return err
} }
res, err := b.request(ctx, "eth_sendRawTransaction", []interface{}{common.ToHex(data)}) return b.client.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data))
if err != nil {
return err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return err
}
return nil
} }
19 changes: 18 additions & 1 deletion cmd/geth/consolecmd.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package main
import ( import (
"os" "os"
"os/signal" "os/signal"
"strings"


"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/console" "github.com/ethereum/go-ethereum/console"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"gopkg.in/urfave/cli.v1" "gopkg.in/urfave/cli.v1"
) )


Expand Down Expand Up @@ -99,7 +102,7 @@ func localConsole(ctx *cli.Context) error {
// console to it. // console to it.
func remoteConsole(ctx *cli.Context) error { func remoteConsole(ctx *cli.Context) error {
// Attach to a remotely running geth instance and start the JavaScript console // Attach to a remotely running geth instance and start the JavaScript console
client, err := utils.NewRemoteRPCClient(ctx) client, err := dialRPC(ctx.Args().First())
if err != nil { if err != nil {
utils.Fatalf("Unable to attach to remote geth: %v", err) utils.Fatalf("Unable to attach to remote geth: %v", err)
} }
Expand Down Expand Up @@ -127,6 +130,20 @@ func remoteConsole(ctx *cli.Context) error {
return nil return nil
} }


// dialRPC returns a RPC client which connects to the given endpoint.
// The check for empty endpoint implements the defaulting logic
// for "geth attach" and "geth monitor" with no argument.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint()
} else if strings.HasPrefix(endpoint, "rpc:") || strings.HasPrefix(endpoint, "ipc:") {
// Backwards compatibility with geth < 1.5 which required
// these prefixes.
endpoint = endpoint[4:]
}
return rpc.Dial(endpoint)
}

// ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript // ephemeralConsole starts a new geth node, attaches an ephemeral JavaScript
// console to it, and each of the files specified as arguments and tears the // console to it, and each of the files specified as arguments and tears the
// everything down. // everything down.
Expand Down
39 changes: 9 additions & 30 deletions cmd/geth/monitorcmd.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
"math" "math"
"reflect" "reflect"
"runtime" "runtime"
"sort"
"strings" "strings"
"time" "time"


"sort"

"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
Expand All @@ -36,7 +35,7 @@ import (
var ( var (
monitorCommandAttachFlag = cli.StringFlag{ monitorCommandAttachFlag = cli.StringFlag{
Name: "attach", Name: "attach",
Value: "ipc:" + node.DefaultIPCEndpoint(), Value: node.DefaultIPCEndpoint(),
Usage: "API endpoint to attach to", Usage: "API endpoint to attach to",
} }
monitorCommandRowsFlag = cli.IntFlag{ monitorCommandRowsFlag = cli.IntFlag{
Expand Down Expand Up @@ -69,12 +68,12 @@ to display multiple metrics simultaneously.
// monitor starts a terminal UI based monitoring tool for the requested metrics. // monitor starts a terminal UI based monitoring tool for the requested metrics.
func monitor(ctx *cli.Context) error { func monitor(ctx *cli.Context) error {
var ( var (
client rpc.Client client *rpc.Client
err error err error
) )
// Attach to an Ethereum node over IPC or RPC // Attach to an Ethereum node over IPC or RPC
endpoint := ctx.String(monitorCommandAttachFlag.Name) endpoint := ctx.String(monitorCommandAttachFlag.Name)
if client, err = utils.NewRemoteRPCClientFromString(endpoint); err != nil { if client, err = dialRPC(endpoint); err != nil {
utils.Fatalf("Unable to attach to geth node: %v", err) utils.Fatalf("Unable to attach to geth node: %v", err)
} }
defer client.Close() defer client.Close()
Expand Down Expand Up @@ -159,30 +158,10 @@ func monitor(ctx *cli.Context) error {


// retrieveMetrics contacts the attached geth node and retrieves the entire set // retrieveMetrics contacts the attached geth node and retrieves the entire set
// of collected system metrics. // of collected system metrics.
func retrieveMetrics(client rpc.Client) (map[string]interface{}, error) { func retrieveMetrics(client *rpc.Client) (map[string]interface{}, error) {
req := map[string]interface{}{ var metrics map[string]interface{}
"id": new(int64), err := client.Call(&metrics, "debug_metrics", true)
"method": "debug_metrics", return metrics, err
"jsonrpc": "2.0",
"params": []interface{}{true},
}

if err := client.Send(req); err != nil {
return nil, err
}

var res rpc.JSONSuccessResponse
if err := client.Recv(&res); err != nil {
return nil, err
}

if res.Result != nil {
if mets, ok := res.Result.(map[string]interface{}); ok {
return mets, nil
}
}

return nil, fmt.Errorf("unable to retrieve metrics")
} }


// resolveMetrics takes a list of input metric patterns, and resolves each to one // resolveMetrics takes a list of input metric patterns, and resolves each to one
Expand Down Expand Up @@ -270,7 +249,7 @@ func fetchMetric(metrics map[string]interface{}, metric string) float64 {


// refreshCharts retrieves a next batch of metrics, and inserts all the new // refreshCharts retrieves a next batch of metrics, and inserts all the new
// values into the active datasets and charts // values into the active datasets and charts
func refreshCharts(client rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) { func refreshCharts(client *rpc.Client, metrics []string, data [][]float64, units []int, charts []*termui.LineChart, ctx *cli.Context, footer *termui.Par) (realign bool) {
values, err := retrieveMetrics(client) values, err := retrieveMetrics(client)
for i, metric := range metrics { for i, metric := range metrics {
if len(data) < 512 { if len(data) < 512 {
Expand Down
Loading

0 comments on commit 91b7690

Please sign in to comment.