This repository has been archived by the owner on Mar 28, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 284
/
core.go
383 lines (326 loc) · 10.5 KB
/
core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package core
import (
"errors"
"fmt"
"net/http"
"path"
"sync"
"time"
"gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht"
libp2p "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto"
ma "gx/ipfs/QmTZBfrPJmjWsCvHEtX5FE6KimVJhsJg5sBbqEFYf4UZtL/go-multiaddr"
cid "gx/ipfs/QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN/go-cid"
peer "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer"
routing "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing"
"github.com/OpenBazaar/multiwallet"
"github.com/OpenBazaar/openbazaar-go/ipfs"
"github.com/OpenBazaar/openbazaar-go/net"
rep "github.com/OpenBazaar/openbazaar-go/net/repointer"
ret "github.com/OpenBazaar/openbazaar-go/net/retriever"
"github.com/OpenBazaar/openbazaar-go/pb"
"github.com/OpenBazaar/openbazaar-go/repo"
sto "github.com/OpenBazaar/openbazaar-go/storage"
"github.com/btcsuite/btcutil/hdkeychain"
"github.com/ipfs/go-ipfs/core"
logging "github.com/op/go-logging"
"golang.org/x/net/context"
"golang.org/x/net/proxy"
)
const (
// VERSION - current version
VERSION = "0.14.r-rc1"
// USERAGENT - user-agent header string
USERAGENT = "/openbazaar-go:" + VERSION + "/"
)
var log = logging.MustGetLogger("core")
// Node - ob node
var Node *OpenBazaarNode
var inflightPublishRequests int
// OpenBazaarNode - represent ob node which encapsulates ipfsnode, wallet etc
type OpenBazaarNode struct {
// IPFS node object
IpfsNode *core.IpfsNode
// An implementation of the custom DHT used by OpenBazaar
DHT *dht.IpfsDHT
// The roothash of the node directory inside the openbazaar repo.
// This directory hash is published on IPNS at our peer ID making
// the directory publicly viewable on the network.
RootHash string
// The path to the openbazaar repo in the file system
RepoPath string
// The OpenBazaar network service for direct communication between peers
Service net.NetworkService
// Database for storing node specific data
Datastore repo.Datastore
// Websocket channel used for pushing data to the UI
Broadcast chan repo.Notifier
// A map of cryptocurrency wallets
Multiwallet multiwallet.MultiWallet
// Storage for our outgoing messages
MessageStorage sto.OfflineMessagingStorage
// A service that periodically checks the dht for outstanding messages
MessageRetriever *ret.MessageRetriever
// OfflineMessageFailoverTimeout is the duration until the protocol
// will stop looking for the peer to send a direct message and failover to
// sending an offline message
OfflineMessageFailoverTimeout time.Duration
// A service that periodically republishes active pointers
PointerRepublisher *rep.PointerRepublisher
// Optional nodes to push user data to
PushNodes []peer.ID
// The user-agent for this node
UserAgent string
// A dialer for Tor if available
TorDialer proxy.Dialer
// Manage blocked peers
BanManager *net.BanManager
// Allow other nodes to push data to this node for storage
AcceptStoreRequests bool
// RecordAgingNotifier is a worker that walks the cases datastore to
// notify the user as disputes age past certain thresholds
RecordAgingNotifier *recordAgingNotifier
// Generic pubsub interface
Pubsub ipfs.Pubsub
// The master private key derived from the mnemonic
MasterPrivateKey *hdkeychain.ExtendedKey
// The number of DHT records to collect before returning. The larger the number
// the slower the query but the less likely we will get an old record.
IPNSQuorumSize uint
TestnetEnable bool
RegressionTestEnable bool
PublishLock sync.Mutex
seedLock sync.Mutex
InitalPublishComplete bool
// InboundMsgScanner is a worker that scans the messages
// table and tries to retry a failed order message
InboundMsgScanner *inboundMessageScanner
}
// TestNetworkEnabled indicates whether the node is operating with test parameters
func (n *OpenBazaarNode) TestNetworkEnabled() bool { return n.TestnetEnable }
// RegressionNetworkEnabled indicates whether the node is operating with regression parameters
func (n *OpenBazaarNode) RegressionNetworkEnabled() bool { return n.RegressionTestEnable }
// SeedNode - publish to IPNS
func (n *OpenBazaarNode) SeedNode() error {
n.seedLock.Lock()
err := ipfs.UnPinDir(n.IpfsNode, n.RootHash)
if err != nil {
log.Errorf("unpinning old root: %s", err.Error())
}
var aerr error
var rootHash string
// There's an IPFS bug on Windows that might be related to the Windows indexer that could cause this to fail
// If we fail the first time, let's retry a couple times before giving up.
for i := 0; i < 3; i++ {
rootHash, aerr = ipfs.AddDirectory(n.IpfsNode, path.Join(n.RepoPath, "root"))
if aerr == nil {
break
}
time.Sleep(time.Millisecond * 500)
}
if aerr != nil {
n.seedLock.Unlock()
return aerr
}
n.RootHash = rootHash
n.seedLock.Unlock()
n.InitalPublishComplete = true
go n.publish(rootHash)
return nil
}
func (n *OpenBazaarNode) publish(hash string) {
// Multiple publishes may have been queued
// We only need to publish the most recent
n.PublishLock.Lock()
defer n.PublishLock.Unlock()
if hash != n.RootHash {
return
}
if inflightPublishRequests == 0 {
n.Broadcast <- repo.StatusNotification{Status: "publishing"}
}
err := n.sendToPushNodes(hash)
if err != nil {
log.Error(err)
return
}
go func() {
// Update search endpoint with published hash
peerId, _ := n.GetNodeID()
endpoint := fmt.Sprintf("https://search.ob1.io/update/%s/%s", peerId.PeerID, hash)
log.Infof("Publishing new rootHash to: %s\n", endpoint)
var client *http.Client
if n.TorDialer != nil {
tbTransport := &http.Transport{Dial: n.TorDialer.Dial}
client = &http.Client{Transport: tbTransport, Timeout: time.Second * 30}
} else {
client = &http.Client{Timeout: time.Second * 30}
}
resp, err := client.Get(endpoint)
if err != nil {
log.Errorf("Search update did not succeed. %v\n", err)
}
log.Debugf("%s response: %v", endpoint, resp)
}()
inflightPublishRequests++
err = ipfs.Publish(n.IpfsNode, hash)
inflightPublishRequests--
if inflightPublishRequests == 0 {
if err != nil {
log.Error(err)
n.Broadcast <- repo.StatusNotification{Status: "error publishing"}
} else {
n.Broadcast <- repo.StatusNotification{Status: "publish complete"}
}
}
}
func (n *OpenBazaarNode) sendToPushNodes(hash string) error {
id, err := cid.Decode(hash)
if err != nil {
return err
}
var graph []cid.Cid
if len(n.PushNodes) > 0 {
graph, err = ipfs.FetchGraph(n.IpfsNode, &id)
if err != nil {
return err
}
pointers, err := n.Datastore.Pointers().GetByPurpose(ipfs.MESSAGE)
if err != nil {
return err
}
// Check if we're seeding any outgoing messages and add their CIDs to the graph
for _, p := range pointers {
if len(p.Value.Addrs) > 0 {
s, err := p.Value.Addrs[0].ValueForProtocol(ma.P_IPFS)
if err != nil {
continue
}
c, err := cid.Decode(s)
if err != nil {
continue
}
graph = append(graph, c)
}
}
}
for _, p := range n.PushNodes {
go n.retryableSeedStoreToPeer(p, hash, graph)
}
return nil
}
func (n *OpenBazaarNode) retryableSeedStoreToPeer(pid peer.ID, graphHash string, graph []cid.Cid) {
var retryTimeout = 2 * time.Second
for {
if graphHash != n.RootHash {
log.Errorf("root hash has changed, aborting push to %s", pid.Pretty())
return
}
err := n.SendStore(pid.Pretty(), graph)
if err != nil {
if retryTimeout > 8*time.Second {
log.Errorf("error pushing to peer %s: %s", pid.Pretty(), err.Error())
return
}
log.Errorf("error pushing to peer %s...backing off: %s", pid.Pretty(), err.Error())
time.Sleep(retryTimeout)
retryTimeout *= 2
continue
}
return
}
}
// SetUpRepublisher - periodic publishing to IPNS
func (n *OpenBazaarNode) SetUpRepublisher(interval time.Duration) {
if interval == 0 {
return
}
ticker := time.NewTicker(interval)
go func() {
for range ticker.C {
err := n.UpdateFollow()
if err != nil {
log.Error(err)
}
err = n.SeedNode()
if err != nil {
log.Error(err)
}
}
}()
}
/*EncryptMessage This is a placeholder until the libsignal is operational.
For now we will just encrypt outgoing offline messages with the long lived identity key.
Optionally you may provide a public key, to avoid doing an IPFS lookup */
func (n *OpenBazaarNode) EncryptMessage(peerID peer.ID, peerKey *libp2p.PubKey, message []byte) (ct []byte, rerr error) {
ctx, cancel := context.WithTimeout(context.Background(), n.OfflineMessageFailoverTimeout)
defer cancel()
if peerKey == nil {
var (
pubKey libp2p.PubKey
store = n.IpfsNode.Repo.Datastore()
)
keyval, err := ipfs.GetCachedPubkey(store, peerID.Pretty())
if err != nil {
pubKey, err = routing.GetPublicKey(n.IpfsNode.Routing, ctx, peerID)
if err != nil {
log.Errorf("Failed to find public key for %s", peerID.Pretty())
return nil, err
}
} else {
pubKey, err = libp2p.UnmarshalPublicKey(keyval)
if err != nil {
log.Errorf("Failed to find public key for %s", peerID.Pretty())
return nil, err
}
}
peerKey = &pubKey
}
if peerID.MatchesPublicKey(*peerKey) {
ciphertext, err := net.Encrypt(*peerKey, message)
if err != nil {
return nil, err
}
return ciphertext, nil
}
log.Errorf("peer public key and id do not match for peer: %s", peerID.Pretty())
return nil, errors.New("peer public key and id do not match")
}
// IPFSIdentityString - IPFS identifier
func (n *OpenBazaarNode) IPFSIdentityString() string {
return n.IpfsNode.Identity.Pretty()
}
// GetNodeID returns the protobuf representing the node's identity and crypto
// keys with the peer ID
func (n *OpenBazaarNode) GetNodeID() (*pb.ID, error) {
var id = new(pb.ID)
id.PeerID = n.IpfsNode.Identity.Pretty()
if p, err := n.GetProfile(); err == nil {
id.Handle = p.Handle
}
p := new(pb.ID_Pubkeys)
pubkey, err := n.IpfsNode.PrivateKey.GetPublic().Bytes()
if err != nil {
return nil, fmt.Errorf("ipfs pubkey bytes: %s", err.Error())
}
p.Identity = pubkey
coinPubkey, err := n.MasterPrivateKey.ECPubKey()
if err != nil {
return nil, fmt.Errorf("master pubkey: %s", err.Error())
}
p.Bitcoin = coinPubkey.SerializeCompressed()
id.Pubkeys = p
coinPrivKey, err := n.MasterPrivateKey.ECPrivKey()
if err != nil {
return nil, fmt.Errorf("master privkey: %s", err.Error())
}
coinSig, err := coinPrivKey.Sign([]byte(id.PeerID))
if err != nil {
return nil, fmt.Errorf("sign id: %s", err.Error())
}
id.BitcoinSig = coinSig.Serialize()
return id, nil
}
// Sign returns a signature for the payload signed by the IPFS private key
func (n *OpenBazaarNode) Sign(payload []byte) ([]byte, error) {
return n.IpfsNode.PrivateKey.Sign(payload)
}