/
rpc_handler.go
179 lines (164 loc) · 6.04 KB
/
rpc_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// +build !js
package main
import (
"context"
"encoding/json"
"fmt"
"net"
"strings"
"time"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/core"
"github.com/0xProject/0x-mesh/rpc"
"github.com/0xProject/0x-mesh/zeroex"
"github.com/0xProject/0x-mesh/zeroex/ordervalidator"
ethrpc "github.com/ethereum/go-ethereum/rpc"
peerstore "github.com/libp2p/go-libp2p-peerstore"
log "github.com/sirupsen/logrus"
)
type rpcHandler struct {
app *core.App
}
// listenRPC starts the RPC server and listens on config.RPCAddr. It blocks
// until there is an error or the RPC server is closed.
func listenRPC(app *core.App, config standaloneConfig, ctx context.Context) error {
// Initialize the JSON RPC WebSocket server (but don't start it yet).
rpcAddr := fmt.Sprintf("%s", config.RPCAddr)
rpcHandler := &rpcHandler{
app: app,
}
rpcServer, err := rpc.NewServer(rpcAddr, rpcHandler)
if err != nil {
return nil
}
go func() {
// Wait for the server to start listening and select an address.
for rpcServer.Addr() == nil {
select {
case <-ctx.Done():
return
default:
}
time.Sleep(10 * time.Millisecond)
}
log.WithField("address", rpcServer.Addr().String()).Info("started RPC server")
}()
return rpcServer.Listen(ctx)
}
// GetOrders is called when an RPC client calls GetOrders.
func (handler *rpcHandler) GetOrders(page, perPage int, snapshotID string) (*rpc.GetOrdersResponse, error) {
log.WithFields(map[string]interface{}{
"page": page,
"perPage": perPage,
"snapshotID": snapshotID,
}).Debug("received GetOrders request via RPC")
getOrdersResponse, err := handler.app.GetOrders(page, perPage, snapshotID)
if err != nil {
if _, ok := err.(core.ErrSnapshotNotFound); ok {
return nil, err
}
// We don't want to leak internal error details to the RPC client.
log.WithField("error", err.Error()).Error("internal error in AddOrders RPC call")
return nil, constants.ErrInternal
}
return getOrdersResponse, nil
}
// AddOrders is called when an RPC client calls AddOrders.
func (handler *rpcHandler) AddOrders(signedOrdersRaw []*json.RawMessage, opts rpc.AddOrdersOpts) (*ordervalidator.ValidationResults, error) {
log.WithFields(log.Fields{
"count": len(signedOrdersRaw),
"pinned": opts.Pinned,
}).Info("received AddOrders request via RPC")
validationResults, err := handler.app.AddOrders(signedOrdersRaw, opts.Pinned)
if err != nil {
// We don't want to leak internal error details to the RPC client.
log.WithField("error", err.Error()).Error("internal error in AddOrders RPC call")
return nil, constants.ErrInternal
}
return validationResults, nil
}
// AddPeer is called when an RPC client calls AddPeer,
func (handler *rpcHandler) AddPeer(peerInfo peerstore.PeerInfo) error {
log.Debug("received AddPeer request via RPC")
if err := handler.app.AddPeer(peerInfo); err != nil {
log.WithField("error", err.Error()).Error("internal error in AddPeer RPC call")
return constants.ErrInternal
}
return nil
}
// GetStats is called when an RPC client calls GetStats,
func (handler *rpcHandler) GetStats() (*rpc.GetStatsResponse, error) {
log.Debug("received GetStats request via RPC")
getStatsResponse, err := handler.app.GetStats()
if err != nil {
log.WithField("error", err.Error()).Error("internal error in GetStats RPC call")
return nil, constants.ErrInternal
}
return getStatsResponse, nil
}
// SubscribeToOrders is called when an RPC client sends a `mesh_subscribe` request with the `orders` topic parameter
func (handler *rpcHandler) SubscribeToOrders(ctx context.Context) (*ethrpc.Subscription, error) {
log.Debug("received order event subscription request via RPC")
subscription, err := SetupOrderStream(ctx, handler.app)
if err != nil {
log.WithField("error", err.Error()).Error("internal error in `mesh_subscribe` to `orders` RPC call")
return nil, constants.ErrInternal
}
return subscription, nil
}
// SetupOrderStream sets up the order stream for a subscription
func SetupOrderStream(ctx context.Context, app *core.App) (*ethrpc.Subscription, error) {
notifier, supported := ethrpc.NotifierFromContext(ctx)
if !supported {
return ðrpc.Subscription{}, ethrpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
orderEventsChan := make(chan []*zeroex.OrderEvent)
orderWatcherSub := app.SubscribeToOrderEvents(orderEventsChan)
for {
select {
case orderEvents := <-orderEventsChan:
err := notifier.Notify(rpcSub.ID, orderEvents)
if err != nil {
// TODO(fabio): The current implementation of `notifier.Notify` returns a
// `write: broken pipe` error when it is called _after_ the client has
// disconnected but before the corresponding error is received on the
// `rpcSub.Err()` channel. This race-condition is not problematic beyond
// the unnecessary computation and log spam resulting from it. Once this is
// fixed upstream, give all logs an `Error` severity.
logEntry := log.WithFields(map[string]interface{}{
"error": err.Error(),
"subscriptionType": "orders",
"orderEvents": len(orderEvents),
})
message := "error while calling notifier.Notify"
// If the network connection disconnects for longer then ~2mins and then comes
// back up, we've noticed the call to `notifier.Notify` return `i/o timeout`
// `net.OpError` errors everytime it's called and no values are sent over
// `rpcSub.Err()` nor `notifier.Closed()`. In order to stop the error from
// endlessly re-occuring, we unsubscribe and return for encountering this type of
// error.
if _, ok := err.(*net.OpError); ok {
logEntry.Trace(message)
orderWatcherSub.Unsubscribe()
return
}
if strings.Contains(err.Error(), "write: broken pipe") {
logEntry.Trace(message)
} else {
logEntry.Error(message)
}
}
case err := <-rpcSub.Err():
log.WithField("err", err).Error("rpcSub returned an error")
orderWatcherSub.Unsubscribe()
return
case <-notifier.Closed():
orderWatcherSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}