/
core.go
317 lines (270 loc) · 8.82 KB
/
core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
package core
import (
"errors"
routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing"
dshelp "gx/ipfs/QmTmqJGRQfuH8eKWD1FjThwPRipt1QhqJQNZ8MpzmfAAxo/go-ipfs-ds-help"
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
libp2p "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
"net/url"
"path"
"strings"
"sync"
"time"
"github.com/OpenBazaar/multiwallet"
"github.com/btcsuite/btcutil/hdkeychain"
"github.com/evenfound/even-go/node/ipfs"
"github.com/evenfound/even-go/node/namesys"
"github.com/evenfound/even-go/node/net"
rep "github.com/evenfound/even-go/node/net/repointer"
ret "github.com/evenfound/even-go/node/net/retriever"
"github.com/evenfound/even-go/node/repo"
sto "github.com/evenfound/even-go/node/storage"
"github.com/ipfs/go-ipfs/core"
"github.com/kennygrant/sanitize"
logging "github.com/op/go-logging"
"golang.org/x/net/context"
"golang.org/x/net/proxy"
)
const (
// VERSION is the current version string of this software.
VERSION = "0.13.0"
// USERAGENT is the user-agent string; it is built from VERSION, so it
// changes whenever the version is bumped.
USERAGENT = "/openbazaar-go:" + VERSION + "/"
)
// log is the package-level logger, created for the "core" module.
var log = logging.MustGetLogger("core")
// Node is a package-level pointer to an EvenNode.
// NOTE(review): it is not assigned anywhere in this part of the file —
// presumably set during start-up; confirm against the caller.
var Node *EvenNode
// inflightPublishRequests counts IPNS publishes that are currently running.
// In the code visible here it is only read and modified inside publish,
// while PublishLock is held.
var inflightPublishRequests int
// EvenNode represents a node: it bundles the IPFS node, the wallets, the
// datastore and the services that run on top of them.
type EvenNode struct {
// IpfsNode is the underlying IPFS node object.
IpfsNode *core.IpfsNode
/* RootHash is the root hash of the node directory inside the openbazaar repo.
This directory hash is published on IPNS at our peer ID, making
the directory publicly viewable on the network. */
RootHash string
// RepoPath is the path to the openbazaar repo in the file system.
RepoPath string
// Service is the EvenNetwork network service for direct communication between peers.
Service net.NetworkService
// Datastore is the database for storing node specific data.
Datastore repo.Datastore
// Broadcast is the websocket channel used for pushing data to the UI;
// publish sends its status notifications on it.
Broadcast chan repo.Notifier
// Multiwallet is a map of cryptocurrency wallets.
Multiwallet multiwallet.MultiWallet
// MessageStorage is the storage for our outgoing messages.
MessageStorage sto.OfflineMessagingStorage
// MessageRetriever is a service that periodically checks the DHT for outstanding messages.
MessageRetriever *ret.MessageRetriever
// OfflineMessageFailoverTimeout is the duration until the protocol
// will stop looking for the peer to send a direct message and fail over to
// sending an offline message.
OfflineMessageFailoverTimeout time.Duration
// PointerRepublisher is a service that periodically republishes active pointers.
PointerRepublisher *rep.PointerRepublisher
// NameSystem is used to resolve domains to EvenNetwork IDs.
NameSystem *namesys.NameSystem
// A service that periodically fetches and caches the bitcoin exchange rates
// (currently disabled: the field below is commented out).
//ExchangeRates wallet.ExchangeRates
// PushNodes are optional nodes to push user data to.
PushNodes []peer.ID
// UserAgent is the user-agent for this node.
UserAgent string
// TorDialer is a dialer for Tor, if available.
TorDialer proxy.Dialer
// BanManager manages blocked peers.
BanManager *net.BanManager
// AcceptStoreRequests allows other nodes to push data to this node for storage.
AcceptStoreRequests bool
// IPNSBackupAPI is a last-ditch API to find records that dropped out of the DHT.
IPNSBackupAPI string
// RecordAgingNotifier is a worker that walks the cases datastore to
// notify the user as disputes age past certain thresholds.
RecordAgingNotifier *recordAgingNotifier
// Pubsub is a generic pubsub interface.
Pubsub ipfs.Pubsub
// MasterPrivateKey is the master private key derived from the mnemonic.
MasterPrivateKey *hdkeychain.ExtendedKey
// TestnetEnable is the flag returned by TestNetworkEnabled.
TestnetEnable bool
// RegressionTestEnable is the flag returned by RegressionNetworkEnabled.
RegressionTestEnable bool
}
// PublishLock is held by publish for the whole duration of a publish, so
// queued publishes run one at a time.
var PublishLock sync.Mutex
// seedLock is held by SeedNode while the root directory is unpinned and
// re-added and while RootHash is updated.
var seedLock sync.Mutex
// InitalPublishComplete is set to true by SeedNode after the root directory
// has been re-added successfully, just before the publish goroutine is
// started. Its zero value is false.
var InitalPublishComplete bool // = false
// TestNetworkEnabled reports the node's TestnetEnable flag, i.e. whether the
// node is operating with test parameters.
func (n *EvenNode) TestNetworkEnabled() bool {
	return n.TestnetEnable
}
// RegressionNetworkEnabled reports the node's RegressionTestEnable flag, i.e.
// whether the node is operating with regression parameters.
func (n *EvenNode) RegressionNetworkEnabled() bool {
	return n.RegressionTestEnable
}
// SeedNode unpins the current root directory, re-adds <RepoPath>/root to IPFS
// and schedules an IPNS publish of the resulting hash.
//
// Adding the directory is attempted up to three times, 500ms apart, because an
// IPFS bug on Windows (possibly related to the Windows indexer) can make the
// first attempt fail. If every attempt fails the last error is returned and
// neither RootHash nor InitalPublishComplete is modified.
//
// On success RootHash is updated, InitalPublishComplete is set and publish is
// started in its own goroutine; SeedNode does not wait for the publish to
// finish, so a nil return only means the directory was re-added.
func (n *EvenNode) SeedNode() error {
	const (
		maxAttempts = 3
		retryDelay  = 500 * time.Millisecond
	)

	// Deferred so the lock is released on every path, including a panic in
	// one of the ipfs helpers.
	seedLock.Lock()
	defer seedLock.Unlock()

	// NOTE(review): any result of UnPinDir is not inspected here, as before.
	ipfs.UnPinDir(n.IpfsNode, n.RootHash)

	var (
		rootHash string
		err      error
	)
	for attempt := 1; ; attempt++ {
		rootHash, err = ipfs.AddDirectory(n.IpfsNode, path.Join(n.RepoPath, "root"))
		if err == nil || attempt == maxAttempts {
			// No point sleeping after the final attempt.
			break
		}
		time.Sleep(retryDelay)
	}
	if err != nil {
		return err
	}

	n.RootHash = rootHash
	InitalPublishComplete = true
	// publish takes PublishLock, not seedLock, so starting it while seedLock
	// is still held cannot deadlock.
	go n.publish(rootHash)
	return nil
}
// publish pushes the graph for hash to the push nodes and then publishes hash
// through ipfs.Publish, reporting progress on n.Broadcast.
//
// Several publishes may be queued behind PublishLock; only the one whose hash
// still equals n.RootHash does any work, the others return immediately.
// A failure in sendToPushNodes is logged and aborts the publish without any
// further status notification.
func (n *EvenNode) publish(hash string) {
	PublishLock.Lock()
	defer PublishLock.Unlock()

	// A newer root hash has been seeded since this publish was queued.
	if n.RootHash != hash {
		return
	}

	if inflightPublishRequests == 0 {
		n.Broadcast <- repo.StatusNotification{Status: "publishing"}
	}

	if pushErr := n.sendToPushNodes(hash); pushErr != nil {
		log.Error(pushErr)
		return
	}

	inflightPublishRequests++
	publishErr := ipfs.Publish(n.IpfsNode, hash)
	inflightPublishRequests--

	// Only the last publish in flight reports the final status.
	if inflightPublishRequests != 0 {
		return
	}
	if publishErr == nil {
		n.Broadcast <- repo.StatusNotification{Status: "publish complete"}
		return
	}
	log.Error(publishErr)
	n.Broadcast <- repo.StatusNotification{Status: "error publishing"}
}
// sendToPushNodes starts one background push per configured push node for the
// graph rooted at hash.
//
// hash is always decoded first, so an undecodable hash is reported even when
// no push nodes are configured. With push nodes present, the graph returned by
// ipfs.FetchGraph is extended with the CIDs of the ipfs.MESSAGE pointers found
// in the datastore; pointers with no address, or whose first address cannot be
// turned into a CID, are skipped. The pushes run in their own goroutines, so a
// nil return does not mean they succeeded.
func (n *EvenNode) sendToPushNodes(hash string) error {
	rootCid, err := cid.Decode(hash)
	if err != nil {
		return err
	}
	if len(n.PushNodes) == 0 {
		return nil
	}

	var graph []cid.Cid
	if graph, err = ipfs.FetchGraph(n.IpfsNode, rootCid); err != nil {
		return err
	}

	msgPointers, ptrErr := n.Datastore.Pointers().GetByPurpose(ipfs.MESSAGE)
	if ptrErr != nil {
		return ptrErr
	}

	// Check if we're seeding any outgoing messages and add their CIDs to the graph.
	for _, ptr := range msgPointers {
		if len(ptr.Value.Addrs) == 0 {
			continue
		}
		encoded, err := ptr.Value.Addrs[0].ValueForProtocol(ma.P_IPFS)
		if err != nil {
			continue
		}
		msgCid, err := cid.Decode(encoded)
		if err != nil {
			continue
		}
		graph = append(graph, *msgCid)
	}

	for _, pushNode := range n.PushNodes {
		go n.retryableSeedStoreToPeer(pushNode, hash, graph)
	}
	return nil
}
// retryableSeedStoreToPeer sends graph to pid through SendStore, retrying with
// a doubling delay that starts at two seconds.
//
// It stops when the send succeeds, when n.RootHash no longer equals graphHash
// (the graph is stale), or when a send fails after the delay has grown past
// 60 seconds. Every failure is logged.
func (n *EvenNode) retryableSeedStoreToPeer(pid peer.ID, graphHash string, graph []cid.Cid) {
	const maxBackoff = 60 * time.Second

	for backoff := 2 * time.Second; ; backoff *= 2 {
		if n.RootHash != graphHash {
			log.Errorf("root hash has changed, aborting push to %s", pid.Pretty())
			return
		}

		err := n.SendStore(pid.Pretty(), graph)
		if err == nil {
			return
		}
		if backoff > maxBackoff {
			log.Errorf("error pushing to peer %s: %s", pid.Pretty(), err.Error())
			return
		}

		log.Errorf("error pushing to peer %s...backing off: %s", pid.Pretty(), err.Error())
		time.Sleep(backoff)
	}
}
// SetUpRepublisher starts a background goroutine that, every interval, calls
// UpdateFollow and then SeedNode to republish the node to IPNS.
//
// A zero or negative interval disables republishing: the function returns
// without starting anything (time.NewTicker would panic on such a value).
// The goroutine and its ticker are never stopped. A SeedNode failure is logged
// and the loop carries on with the next tick.
func (n *EvenNode) SetUpRepublisher(interval time.Duration) {
	if interval <= 0 {
		return
	}
	ticker := time.NewTicker(interval)
	go func() {
		for range ticker.C {
			// NOTE(review): any result of UpdateFollow is not inspected here;
			// its signature is not visible in this part of the file.
			n.UpdateFollow()
			if err := n.SeedNode(); err != nil {
				log.Error(err)
			}
		}
	}()
}
/*EncryptMessage This is a placeholder until the libsignal is operational.
For now we will just encrypt outgoing offline messages with the long lived identity key.
Optionally you may provide a public key, to avoid doing an IPFS lookup.

When peerKey is nil the key is read from the IPFS repo datastore under
KeyCachePrefix + the peer ID; if that read fails the key is fetched through
routing.GetPublicKey instead. A cached value that is not a byte slice, or that
cannot be unmarshalled, is reported as an error rather than causing a panic.

The key must match peerID, otherwise an error is returned and nothing is
encrypted. On success the ciphertext produced by net.Encrypt is returned. */
func (n *EvenNode) EncryptMessage(peerID peer.ID, peerKey *libp2p.PubKey, message []byte) (ct []byte, rerr error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if peerKey == nil {
		var pubKey libp2p.PubKey
		keyval, err := n.IpfsNode.Repo.Datastore().Get(dshelp.NewKeyFromBinary([]byte(KeyCachePrefix + peerID.Pretty())))
		if err != nil {
			// Not in the local cache: ask the routing system.
			pubKey, err = routing.GetPublicKey(n.IpfsNode.Routing, ctx, []byte(peerID))
			if err != nil {
				log.Errorf("Failed to find public key for %s", peerID.Pretty())
				return nil, err
			}
		} else {
			// Checked assertion: an unexpected cached value must not panic.
			keyBytes, ok := keyval.([]byte)
			if !ok {
				log.Errorf("Cached public key for %s has unexpected type %T", peerID.Pretty(), keyval)
				return nil, errors.New("cached public key is not a byte slice")
			}
			pubKey, err = libp2p.UnmarshalPublicKey(keyBytes)
			if err != nil {
				log.Errorf("Failed to unmarshal cached public key for %s", peerID.Pretty())
				return nil, err
			}
		}
		peerKey = &pubKey
	}

	if !peerID.MatchesPublicKey(*peerKey) {
		log.Errorf("peer public key and id do not match for peer: %s", peerID.Pretty())
		return nil, errors.New("peer public key and id do not match")
	}

	ciphertext, err := net.Encrypt(*peerKey, message)
	if err != nil {
		return nil, err
	}
	return ciphertext, nil
}
// IPFSIdentityString returns the Pretty() string form of the IPFS node's
// Identity.
func (n *EvenNode) IPFSIdentityString() string {
	identity := n.IpfsNode.Identity
	return identity.Pretty()
}
// createSlugFor creates a slug from slugName.
//
// The name is truncated to at most SentenceMaxCharacters-SlugBuffer bytes,
// lower-cased, passed through sanitize.Path and finally query-escaped.
// The truncation never splits a multi-byte UTF-8 character: when the byte
// limit falls inside one, the cut moves back to the start of that character,
// so the whole character is dropped and the result may be a few bytes shorter
// than the limit.
func createSlugFor(slugName string) string {
	limit := SentenceMaxCharacters - SlugBuffer
	if len(slugName) <= limit {
		limit = len(slugName)
	} else {
		// Bytes of the form 10xxxxxx are UTF-8 continuation bytes; while the
		// first dropped byte is one, the cut is in the middle of a character.
		for limit > 0 && slugName[limit]&0xC0 == 0x80 {
			limit--
		}
	}
	return url.QueryEscape(sanitize.Path(strings.ToLower(slugName[:limit])))
}