Skip to content
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
169 lines (148 sloc) 5.63 KB
package rpc
import (
ethrpc ""
peer ""
peerstore ""
ma ""
log ""
var (
	// minHeartbeatInterval is the interval at which heartbeat events are
	// emitted to a subscriber.
	minHeartbeatInterval = 5 * time.Second
)
// rpcService is an /ethereum/go-ethereum/rpc compatible service. Its Orders,
// AddOrders, GetOrders, AddPeer and GetStats methods delegate to rpcHandler.
type rpcService struct {
// rpcHandler is the handler that incoming requests are forwarded to.
rpcHandler RPCHandler
// RPCHandler is used to respond to incoming requests from the client.
// rpcService calls these methods from its Orders, AddOrders, GetOrders,
// AddPeer and GetStats methods.
type RPCHandler interface {
// AddOrders is called when the client sends an AddOrders request. It
// receives the raw JSON of each signed order and the options for the
// request.
AddOrders(signedOrdersRaw []*json.RawMessage, opts AddOrdersOpts) (*ordervalidator.ValidationResults, error)
// GetOrders is called when the client sends a GetOrders request.
GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error)
// AddPeer is called when the client sends an AddPeer request.
AddPeer(peerInfo peerstore.PeerInfo) error
// GetStats is called when the client sends a GetStats request.
GetStats() (*GetStatsResponse, error)
// SubscribeToOrders is called when a client sends a Subscribe to `orders` request.
SubscribeToOrders(ctx context.Context) (*rpc.Subscription, error)
// Orders calls rpcHandler.SubscribeToOrders with the given context and
// returns the rpc subscription and error it produces, unchanged.
func (s *rpcService) Orders(ctx context.Context) (*rpc.Subscription, error) {
return s.rpcHandler.SubscribeToOrders(ctx)
// Heartbeat calls SetupHeartbeat with the given context and returns the
// resulting rpc subscription. If SetupHeartbeat fails, the underlying error is
// logged and constants.ErrInternal is returned to the caller instead of it.
func (s *rpcService) Heartbeat(ctx context.Context) (*rpc.Subscription, error) {
log.Debug("received heartbeat subscription request via RPC")
subscription, err := SetupHeartbeat(ctx)
if err != nil {
// Log the real cause here; the client only sees the generic internal error.
log.WithField("error", err.Error()).Error("internal error in `mesh_subscribe` to `heartbeat` RPC call")
return nil, constants.ErrInternal
return subscription, nil
// SetupHeartbeat sets up the heartbeat for a subscription. It takes the
// notifier attached to ctx, creates a subscription on it, and starts a
// goroutine that sends "tick" notifications for that subscription. If ctx
// carries no notifier, it returns an empty subscription together with
// ethrpc.ErrNotificationsUnsupported.
func SetupHeartbeat(ctx context.Context) (*ethrpc.Subscription, error) {
notifier, supported := ethrpc.NotifierFromContext(ctx)
if !supported {
return &ethrpc.Subscription{}, ethrpc.ErrNotificationsUnsupported
rpcSub := notifier.CreateSubscription()
// The heartbeat is emitted from a background goroutine.
go func() {
for {
// Check the subscription's error channel and the notifier's closed channel.
// NOTE(review): what each case does after this point is not visible in this
// view of the file — presumably the goroutine exits; confirm.
select {
case err := <-rpcSub.Err():
if err != nil {
log.WithField("err", err).Error("rpcSub returned an error")
} else {
log.Debug("rpcSub was closed without error")
case <-notifier.Closed():
// Continue
// Record when this heartbeat started so the time spent notifying can be
// deducted from the wait below.
start := time.Now()
err := notifier.Notify(rpcSub.ID, "tick")
if err != nil {
// TODO(fabio): The current implementation of `notifier.Notify` returns a
// `write: broken pipe` error when it is called _after_ the client has
// disconnected but before the corresponding error is received on the
// `rpcSub.Err()` channel. This race-condition is not problematic beyond
// the unnecessary computation and log spam resulting from it. Once this is
// fixed upstream, give all logs an `Error` severity.
logEntry := log.WithFields(map[string]interface{}{
"error": err.Error(),
"subscriptionType": "heartbeat",
message := "error while calling notifier.Notify"
// If the network connection disconnects for longer than ~2mins and then comes
// back up, we've noticed the call to `notifier.Notify` return `i/o timeout`
// `net.OpError` errors every time it's called and no values are sent over
// `rpcSub.Err()` nor `notifier.Closed()`. In order to stop the error from
// endlessly recurring, we unsubscribe and return upon encountering this type of
// error.
// NOTE(review): the bodies of the two branches below are not visible in this
// view of the file; confirm they match the behaviour described above.
if _, ok := err.(*net.OpError); ok {
if strings.Contains(err.Error(), "write: broken pipe") {
} else {
// Wait minHeartbeatInterval before emitting the next heartbeat, counted from
// `start`. If the notification took longer than the interval, the duration is
// not positive and time.Sleep returns immediately.
time.Sleep(minHeartbeatInterval - time.Since(start))
return rpcSub, nil
// defaultAddOrdersOpts holds the options that rpcService.AddOrders uses when
// it is called with nil opts. Orders are pinned by default.
var defaultAddOrdersOpts = AddOrdersOpts{
Pinned: true,
// AddOrders calls rpcHandler.AddOrders and returns the validation results.
// opts may be nil, in which case defaultAddOrdersOpts is used.
func (s *rpcService) AddOrders(signedOrdersRaw []*json.RawMessage, opts *AddOrdersOpts) (*ordervalidator.ValidationResults, error) {
if opts == nil {
opts = &defaultAddOrdersOpts
// The handler takes the options by value, so it receives a copy.
return s.rpcHandler.AddOrders(signedOrdersRaw, *opts)
// GetOrders calls rpcHandler.GetOrders with the given page, perPage and
// snapshotID and returns the resulting GetOrdersResponse and error unchanged.
func (s *rpcService) GetOrders(page, perPage int, snapshotID string) (*GetOrdersResponse, error) {
return s.rpcHandler.GetOrders(page, perPage, snapshotID)
// AddPeer builds PeerInfo out of the given peer ID and multiaddresses and
// calls rpcHandler.AddPeer. If there is an error, it returns it: a peer ID
// that fails to decode or a multiaddress that fails to parse is reported
// without calling the handler.
func (s *rpcService) AddPeer(peerID string, multiaddrs []string) error {
// Parse peer ID.
parsedPeerID, err := peer.IDB58Decode(peerID)
if err != nil {
return err
peerInfo := peerstore.PeerInfo{
ID: parsedPeerID,
// Parse each given multiaddress, keeping the order in which they were given.
parsedMultiaddrs := make([]ma.Multiaddr, len(multiaddrs))
for i, addr := range multiaddrs {
parsed, err := ma.NewMultiaddr(addr)
if err != nil {
// A single invalid multiaddress rejects the whole request.
return err
parsedMultiaddrs[i] = parsed
peerInfo.Addrs = parsedMultiaddrs
return s.rpcHandler.AddPeer(peerInfo)
// GetStats calls rpcHandler.GetStats and returns its response. If there is an
// error, it returns it.
func (s *rpcService) GetStats() (*GetStatsResponse, error) {
return s.rpcHandler.GetStats()
You can’t perform that action at this time.