/
peer_wrapper.go
151 lines (133 loc) · 4.64 KB
/
peer_wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package offchainreporting
import (
"strings"
"github.com/pkg/errors"
"github.com/GoPlugin/Plugin/core/logger"
"github.com/GoPlugin/Plugin/core/services/keystore"
"github.com/GoPlugin/Plugin/core/services/keystore/keys/p2pkey"
"github.com/GoPlugin/Plugin/core/store/orm"
"github.com/GoPlugin/Plugin/core/utils"
ocrnetworking "github.com/smartcontractkit/libocr/networking"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting/types"
"go.uber.org/multierr"
"gorm.io/gorm"
)
// peer is the subset of libocr peer behaviour the wrapper relies on: both
// libocr factory interfaces plus the ability to be closed.
type peer interface {
	ocrtypes.BootstrapperFactory
	ocrtypes.BinaryNetworkEndpointFactory
	Close() error
}

// SingletonPeerWrapper manages all libocr peers for the application.
type SingletonPeerWrapper struct {
	// Dependencies supplied through NewSingletonPeerWrapper.
	keyStore *keystore.OCR
	config   *orm.Config
	db       *gorm.DB

	// pstoreWrapper is created and started by Start and closed by Close.
	pstoreWrapper *Pstorewrapper

	// PeerID is the peer ID of the key selected by Start.
	PeerID p2pkey.PeerID
	// Peer is the libocr peer built by Start; it remains unset when Start
	// finds no P2P keys.
	Peer peer

	// StartStopOnce supplies the StartOnce/StopOnce/State helpers used by
	// Start, Close and IsStarted.
	utils.StartStopOnce
}
// NewSingletonPeerWrapper returns a wrapper holding the given keystore, config
// and database handle. No peer is created here; that happens in Start, which
// picks a P2P key from the keystore.
// It currently only supports one peerID/key.
// It should be fairly easy to modify it to support multiple peerIDs/keys using e.g. a map.
func NewSingletonPeerWrapper(keyStore *keystore.OCR, config *orm.Config, db *gorm.DB) *SingletonPeerWrapper {
	wrapper := new(SingletonPeerWrapper)
	wrapper.keyStore = keyStore
	wrapper.config = config
	wrapper.db = db
	return wrapper
}
// IsStarted reports whether the wrapper's start/stop state is currently
// StartStopOnce_Started.
func (p *SingletonPeerWrapper) IsStarted() bool {
	switch p.State() {
	case utils.StartStopOnce_Started:
		return true
	default:
		return false
	}
}
// Start selects the P2P key whose peer ID matches the configured one, builds
// the peerstore wrapper and the libocr peer from it, and finally starts the
// peerstore wrapper. All of the work runs through StartOnce under the name
// "SingletonPeerWrapper".
//
// It returns an error when the P2P listen port is zero, when the configured
// peer ID cannot be read, when no key in the keystore matches it, or when the
// peerstore wrapper or peer cannot be constructed or started. When the
// keystore holds no decrypted P2P keys it returns nil without creating a peer.
func (p *SingletonPeerWrapper) Start() error {
	return p.StartOnce("SingletonPeerWrapper", func() (err error) {
		p2pkeys := p.keyStore.DecryptedP2PKeys()
		listenPort := p.config.P2PListenPort()
		if listenPort == 0 {
			return errors.New("failed to instantiate oracle or bootstrapper service. If FEATURE_OFFCHAIN_REPORTING is on, then P2P_LISTEN_PORT is required and must be set to a non-zero value")
		}
		if len(p2pkeys) == 0 {
			return nil
		}

		configuredPeerID, err := p.config.P2PPeerID(nil)
		if err != nil {
			return errors.Wrap(err, "failed to start peer wrapper")
		}

		// Look for the key whose peer ID equals the configured one, keeping
		// the IDs that did not match for the error message below.
		var (
			key        p2pkey.Key
			selectedID p2pkey.PeerID
			matched    bool
		)
		checkedKeys := make([]string, 0, len(p2pkeys))
		for _, k := range p2pkeys {
			var peerID p2pkey.PeerID
			if peerID, err = k.GetPeerID(); err != nil {
				return errors.Wrap(err, "unexpectedly failed to get peer ID from key")
			}
			if peerID == configuredPeerID {
				key, selectedID, matched = k, peerID, true
				break
			}
			checkedKeys = append(checkedKeys, peerID.String())
		}
		if !matched {
			// The joined list is only needed for these messages.
			keys := strings.Join(checkedKeys, ", ")
			if configuredPeerID == "" {
				return errors.Errorf("multiple p2p keys found but peer ID was not set. You must specify P2P_PEER_ID if you have more than one key. Keys available: %s", keys)
			}
			return errors.Errorf("multiple p2p keys found but none matched the given P2P_PEER_ID of '%s'. Keys available: %s", configuredPeerID, keys)
		}
		// Reuse the ID computed in the loop rather than deriving it from the
		// same key a second time.
		p.PeerID = selectedID

		p.pstoreWrapper, err = NewPeerstoreWrapper(p.db, p.config.P2PPeerstoreWriteInterval(), p.PeerID)
		if err != nil {
			return errors.Wrap(err, "could not make new pstorewrapper")
		}

		// If the P2PAnnounceIP is set we must also set the P2PAnnouncePort.
		// Fall back to P2PListenPort if it wasn't made explicit.
		announceIP := p.config.P2PAnnounceIP()
		var announcePort uint16
		if announceIP != nil {
			announcePort = p.config.P2PAnnouncePort()
			if announcePort == 0 {
				announcePort = listenPort
			}
		}

		peerLogger := NewLogger(logger.Default, p.config.OCRTraceLogging(), func(string) {})
		// Build into a local first and publish it only on success. Assigning
		// the result directly to the interface-typed p.Peer could leave it
		// non-nil after a failure (an interface wrapping a nil pointer), which
		// would defeat the `p.Peer != nil` guard in Close.
		newPeer, err := ocrnetworking.NewPeer(ocrnetworking.PeerConfig{
			PrivKey:      key.PrivKey,
			ListenIP:     p.config.P2PListenIP(),
			ListenPort:   listenPort,
			AnnounceIP:   announceIP,
			AnnouncePort: announcePort,
			Logger:       peerLogger,
			Peerstore:    p.pstoreWrapper.Peerstore,
			EndpointConfig: ocrnetworking.EndpointConfig{
				IncomingMessageBufferSize: p.config.OCRIncomingMessageBufferSize(),
				OutgoingMessageBufferSize: p.config.OCROutgoingMessageBufferSize(),
				NewStreamTimeout:          p.config.OCRNewStreamTimeout(),
				DHTLookupInterval:         p.config.OCRDHTLookupInterval(),
				BootstrapCheckInterval:    p.config.OCRBootstrapCheckInterval(),
			},
			DHTAnnouncementCounterUserPrefix: p.config.P2PDHTAnnouncementCounterUserPrefix(),
		})
		if err != nil {
			return errors.Wrap(err, "error calling NewPeer")
		}
		p.Peer = newPeer

		return p.pstoreWrapper.Start()
	})
}
// Close shuts down the peer and then the peerstore wrapper, skipping whichever
// of the two was never set. It runs through StopOnce under the name
// "SingletonPeerWrapper" and returns the combination of both close errors.
func (p *SingletonPeerWrapper) Close() error {
	shutdown := func() error {
		var peerErr, storeErr error
		if p.Peer != nil {
			peerErr = p.Peer.Close()
		}
		if p.pstoreWrapper != nil {
			storeErr = p.pstoreWrapper.Close()
		}
		// Combine drops nil entries and returns a lone error unchanged.
		return multierr.Combine(peerErr, storeErr)
	}
	return p.StopOnce("SingletonPeerWrapper", shutdown)
}