Skip to content

Commit

Permalink
Merge pull request #83 from filecoin-project/raulk/fix-id
Browse files Browse the repository at this point in the history
make request and response ID handling spec-compliant.
  • Loading branch information
magik6k committed Nov 7, 2022
2 parents 236bc02 + 666479d commit 70f44a9
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 42 deletions.
28 changes: 20 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (e *ErrClient) Unwrap() error {
type clientResponse struct {
Jsonrpc string `json:"jsonrpc"`
Result json.RawMessage `json:"result"`
ID int64 `json:"id"`
ID interface{} `json:"id"`
Error *respError `json:"error,omitempty"`
}

Expand Down Expand Up @@ -167,12 +167,15 @@ func httpClient(ctx context.Context, addr string, namespace string, outs []inter
defer httpResp.Body.Close()

var resp clientResponse

if err := json.NewDecoder(httpResp.Body).Decode(&resp); err != nil {
return clientResponse{}, xerrors.Errorf("http status %s unmarshaling response: %w", httpResp.Status, err)
}

if resp.ID != *cr.req.ID {
if resp.ID, err = normalizeID(resp.ID); err != nil {
return clientResponse{}, xerrors.Errorf("failed to response ID: %w", err)
}

if resp.ID != cr.req.ID {
return clientResponse{}, xerrors.New("request and response id didn't match")
}

Expand Down Expand Up @@ -246,7 +249,7 @@ func websocketClient(ctx context.Context, addr string, namespace string, outs []
req: request{
Jsonrpc: "2.0",
Method: wsCancel,
Params: []param{{v: reflect.ValueOf(*cr.req.ID)}},
Params: []param{{v: reflect.ValueOf(cr.req.ID)}},
},
}
select {
Expand Down Expand Up @@ -468,7 +471,7 @@ func (fn *rpcFunc) processError(err error) []reflect.Value {
}

func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) {
id := atomic.AddInt64(&fn.client.idCtr, 1)
var id interface{} = atomic.AddInt64(&fn.client.idCtr, 1)
params := make([]param, len(args)-fn.hasCtx)
for i, arg := range args[fn.hasCtx:] {
enc, found := fn.client.paramEncoders[arg.Type()]
Expand Down Expand Up @@ -503,9 +506,19 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut)
}

// Prepare the ID to send on the wire.
// We track int64 ids as float64 in the inflight map (because that's what
// they'll be decoded to). encoding/json outputs numbers with their minimal
// encoding, avoding the decimal point when possible, i.e. 3 will never get
// converted to 3.0.
id, err := normalizeID(id)
if err != nil {
return fn.processError(fmt.Errorf("failed to normalize id")) // should probably panic
}

req := request{
Jsonrpc: "2.0",
ID: &id,
ID: id,
Method: fn.client.namespace + "." + fn.name,
Params: params,
}
Expand All @@ -526,7 +539,6 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
}

var resp clientResponse
var err error
// keep retrying if got a forced closed websocket conn and calling method
// has retry annotation
for attempt := 0; true; attempt++ {
Expand All @@ -535,7 +547,7 @@ func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value)
return fn.processError(fmt.Errorf("sendRequest failed: %w", err))
}

if resp.ID != *req.ID {
if resp.ID != req.ID {
return fn.processError(xerrors.New("request and response id didn't match"))
}

Expand Down
21 changes: 13 additions & 8 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type rpcHandler struct {

type request struct {
Jsonrpc string `json:"jsonrpc"`
ID *int64 `json:"id,omitempty"`
ID interface{} `json:"id,omitempty"`
Method string `json:"method"`
Params []param `json:"params"`
Meta map[string]string `json:"meta,omitempty"`
Expand Down Expand Up @@ -90,15 +90,15 @@ func (e *respError) val(errors *Errors) reflect.Value {
type response struct {
Jsonrpc string `json:"jsonrpc"`
Result interface{} `json:"result,omitempty"`
ID int64 `json:"id"`
ID interface{} `json:"id"`
Error *respError `json:"error,omitempty"`
}

// Register

func (s *RPCServer) register(namespace string, r interface{}) {
val := reflect.ValueOf(r)
//TODO: expect ptr
// TODO: expect ptr

for i := 0; i < val.NumMethod(); i++ {
method := val.Type().Method(i)
Expand Down Expand Up @@ -135,7 +135,7 @@ func (s *RPCServer) register(namespace string, r interface{}) {
// Handle

type rpcErrFunc func(w func(func(io.Writer)), req *request, code ErrorCode, err error)
type chanOut func(reflect.Value, int64) error
type chanOut func(reflect.Value, interface{}) error

func (s *RPCServer) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
wf := func(cb func(io.Writer)) {
Expand Down Expand Up @@ -174,6 +174,11 @@ func (s *RPCServer) handleReader(ctx context.Context, r io.Reader, w io.Writer,
return
}

if req.ID, err = normalizeID(req.ID); err != nil {
rpcError(wf, &req, rpcParseError, xerrors.Errorf("failed to parse ID: %w", err))
return
}

s.handle(ctx, req, wf, rpcError, func(bool) {}, nil)
}

Expand Down Expand Up @@ -304,7 +309,7 @@ func (s *RPCServer) handle(ctx context.Context, req request, w func(func(io.Writ
callParams[i+1+handler.hasCtx] = reflect.ValueOf(rp.Interface())
}

///////////////////
// /////////////////

callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
if err != nil {
Expand All @@ -316,11 +321,11 @@ func (s *RPCServer) handle(ctx context.Context, req request, w func(func(io.Writ
return // notification
}

///////////////////
// /////////////////

resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
ID: req.ID,
}

if handler.errOut != -1 {
Expand Down Expand Up @@ -350,7 +355,7 @@ func (s *RPCServer) handle(ctx context.Context, req request, w func(func(io.Writ
// sending channel messages before this rpc call returns

//noinspection GoNilness // already checked above
err = chOut(callResult[handler.valOut], *req.ID)
err = chOut(callResult[handler.valOut], req.ID)
if err == nil {
return // channel goroutine handles responding
}
Expand Down
34 changes: 33 additions & 1 deletion rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ func testControlChanDeadlock(t *testing.T) {
for i := 0; i < n; i++ {
if <-sub != i+1 {
panic("bad!")
//require.Equal(t, i+1, <-sub)
// require.Equal(t, i+1, <-sub)
}
}
}()
Expand Down Expand Up @@ -1107,3 +1107,35 @@ func TestUserError(t *testing.T) {

closer()
}

// Unit test for request/response ID translation.
func TestIDHandling(t *testing.T) {
var decoded request

cases := []struct {
str string
expect interface{}
expectErr bool
}{
{`{"id":"8116d306-56cc-4637-9dd7-39ce1548a5a0","jsonrpc":"2.0","method":"eth_blockNumber","params":[]}`, "8116d306-56cc-4637-9dd7-39ce1548a5a0", false},
{`{"id":1234,"jsonrpc":"2.0","method":"eth_blockNumber","params":[]}`, float64(1234), false},
{`{"id":null,"jsonrpc":"2.0","method":"eth_blockNumber","params":[]}`, nil, false},
{`{"id":1234.0,"jsonrpc":"2.0","method":"eth_blockNumber","params":[]}`, 1234.0, false},
{`{"id":1.2,"jsonrpc":"2.0","method":"eth_blockNumber","params":[]}`, 1.2, false},
{`{"id":["1"],"jsonrpc":"2.0","method":"eth_blockNumber","params":[]}`, nil, true},
{`{"id":{"a":"b"},"jsonrpc":"2.0","method":"eth_blockNumber","params":[]}`, nil, true},
}

for _, tc := range cases {
t.Run(fmt.Sprintf("%v", tc.expect), func(t *testing.T) {
dec := json.NewDecoder(strings.NewReader(tc.str))
require.NoError(t, dec.Decode(&decoded))
if id, err := normalizeID(decoded.ID); !tc.expectErr {
require.NoError(t, err)
require.Equal(t, tc.expect, id)
} else {
require.Error(t, err)
}
})
}
}
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func rpcError(wf func(func(io.Writer)), req *request, code ErrorCode, err error)

resp := response{
Jsonrpc: "2.0",
ID: *req.ID,
ID: req.ID,
Error: &respError{
Code: code,
Message: err.Error(),
Expand Down

0 comments on commit 70f44a9

Please sign in to comment.