// log.js (forked from orbitdb-archive/ipfs-log)
'use strict'
const pMap = require('p-map')
const GSet = require('./g-set')
const Entry = require('./entry')
const LogIO = require('./log-io')
const LogError = require('./log-errors')
const Clock = require('./lamport-clock')
const { LastWriteWins } = require('./log-sorting')
const AccessController = require('./default-access-controller')
const { isDefined, findUniques } = require('./utils')
const randomId = () => new Date().getTime().toString()
const getHash = e => e.hash
const flatMap = (res, acc) => res.concat(acc)
const getNextPointers = entry => entry.next
const maxClockTimeReducer = (res, acc) => Math.max(res, acc.clock.time)
const uniqueEntriesReducer = (res, acc) => {
res[acc.hash] = acc
return res
}
/**
* Log
*
* @description
* Log implements a G-Set CRDT and adds ordering
*
* From:
* "A comprehensive study of Convergent and Commutative Replicated Data Types"
* https://hal.inria.fr/inria-00555588
*/
class Log extends GSet {
/**
* Create a new Log instance
* @param {IPFS} ipfs An IPFS instance
* @param {Object} access AccessController instance (./default-access-controller)
* @param {Object} identity Identity instance (https://github.com/orbitdb/orbit-db-identity-provider/blob/master/src/identity.js)
* @param {String} [logId] ID of the log
* @param {Array<Entry>} [entries] An Array of Entries from which to create the log
* @param {Array<Entry>} [heads] Set the heads of the log
* @param {Clock} [clock] Set the clock of the log
* @return {Log} Log
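* @example
* // A minimal sketch; assumes `ipfs`, an access controller and an `identity`
* // (e.g. from orbit-db-identity-provider) are already available
* const log = new Log(ipfs, access, identity, 'my-log-id')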
*/
constructor (ipfs, access, identity, logId, entries, heads, clock) {
if (!isDefined(ipfs)) {
throw LogError.IPFSNotDefinedError()
}
if (!isDefined(access)) {
throw new Error('Access controller is required')
}
if (!isDefined(identity)) {
throw new Error('Identity is required')
}
if (isDefined(entries) && !Array.isArray(entries)) {
throw new Error(`'entries' argument must be an array of Entry instances`)
}
if (isDefined(heads) && !Array.isArray(heads)) {
throw new Error(`'heads' argument must be an array`)
}
super()
this._storage = ipfs
this._id = logId || randomId()
// Access Controller
this._access = access
// Identity
this._identity = identity
// Add entries to the internal cache
entries = entries || []
this._entryIndex = entries.reduce(uniqueEntriesReducer, {})
// Set heads if not passed as an argument
heads = heads || Log.findHeads(entries)
this._headsIndex = heads.reduce(uniqueEntriesReducer, {})
// Index of all next pointers in this log
this._nextsIndex = {}
const addToNextsIndex = e => e.next.forEach(a => (this._nextsIndex[a] = e.hash))
entries.forEach(addToNextsIndex)
// Set the length; the length is tracked manually internally
this._length = entries.length
// Set the clock
const maxTime = Math.max(clock ? clock.time : 0, this.heads.reduce(maxClockTimeReducer, 0))
// Use the identity's public key as the clock id
this._clock = new Clock(this._identity.publicKey, maxTime)
}
/**
* Returns the ID of the log
* @returns {string}
*/
get id () {
return this._id
}
/**
* Returns the clock of the log
* @returns {Clock}
*/
get clock () {
return this._clock
}
/**
* Returns the length of the log
* @return {Number} Length
*/
get length () {
return this._length
}
/**
* Returns the values in the log
* @returns {Array<Entry>}
*/
get values () {
return Object.values(this.traverse(this.heads)).reverse()
}
/**
* Returns the heads of the log, ie. entries that are not referenced by other entries, sorted latest first
* @returns {Array<Entry>}
*/
get heads () {
return Object.values(this._headsIndex).sort(LastWriteWins).reverse()
}
/**
* Returns an array of Entry objects that reference entries which
* are not in the log currently
* @returns {Array<Entry>}
*/
get tails () {
return Log.findTails(this.values)
}
/**
* Returns an array of hashes that are referenced from the log's entries
* but whose entries are not in the log currently
* @returns {Array<string>} Array of multihashes
*/
get tailHashes () {
return Log.findTailHashes(this.values)
}
/**
* Find an entry
* @param {string} hash The multihash of the entry as a Base58 encoded string
* @returns {Entry|undefined}
*/
get (hash) {
return this._entryIndex[hash]
}
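/**
* Check if the log contains an entry
* @param {Entry|string} entry An Entry or an entry hash
* @returns {boolean}
*/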
has (entry) {
return this._entryIndex[entry.hash || entry] !== undefined
}
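/**
* Traverse the log graph from the given entries towards their next pointers
* @param {Array<Entry>} rootEntries Entries to start the traversal from
* @param {Number} [amount=-1] How many entries to traverse at most; -1 traverses all
* @returns {Object<string, Entry>} The traversed entries, keyed by hash
* @example
* // Collect the two latest entries reachable from the current heads
* const latest = log.traverse(log.heads, 2)
*/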
traverse (rootEntries, amount = -1) {
// Sort the given root entries and use them as the starting stack
let stack = rootEntries.sort(LastWriteWins).reverse()
// Cache for checking if we've processed an entry already
let traversed = {}
// End result
let result = {}
// We keep a counter to check if we have traversed requested amount of entries
let count = 0
// Named function for getting an entry from the log
const getEntry = e => this.get(e)
// Add an entry to the stack and traversed nodes index
const addToStack = entry => {
// If we've already processed the entry, don't add it to the stack
if (!entry || traversed[entry.hash]) {
return
}
// Add the entry in front of the stack and sort
stack = [entry, ...stack]
.sort(LastWriteWins)
.reverse()
// Add to the cache of processed entries
traversed[entry.hash] = true
}
// Start traversal
// Process stack until it's empty (traversed the full log)
// or when we have the requested amount of entries
// If requested entry amount is -1, traverse all
while (stack.length > 0 && (amount === -1 || count < amount)) { // eslint-disable-line no-unmodified-loop-condition
// Get the next element from the stack
const entry = stack.shift()
// Add to the result
count++
result[entry.hash] = entry
// Add entry's next references to the stack
entry.next.map(getEntry)
.filter(isDefined)
.forEach(addToStack)
}
return result
}
/**
* Append an entry to the log
* @param {*} data Payload of the entry to add
* @param {Number} [pointerCount=1] How many next pointers to include in the new entry
* @return {Promise<Entry>} The appended Entry
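* @example
* // A minimal sketch; assumes `log` was constructed as shown above
* const entry = await log.append('hello')
* console.log(entry.hash) // the multihash of the new entry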
*/
async append (data, pointerCount = 1) {
// Update the clock (find the latest clock)
const newTime = Math.max(this.clock.time, this.heads.reduce(maxClockTimeReducer, 0)) + 1
this._clock = new Clock(this.clock.id, newTime)
// Get the required amount of hashes to next entries (as per current state of the log)
const references = this.traverse(this.heads, Math.max(pointerCount, this.heads.length))
const nexts = Object.keys(Object.assign({}, this._headsIndex, references))
// @TODO: Split Entry.create into creating object, checking permission, signing and then posting to IPFS
// Create the entry and add it to the internal cache
const entry = await Entry.create(
this._storage,
this._identity,
this.id,
data,
nexts,
this.clock
)
const canAppend = await this._access.canAppend(entry, this._identity.provider)
if (!canAppend) {
throw new Error(`Could not append entry, key "${this._identity.id}" is not allowed to write to the log`)
}
this._entryIndex[entry.hash] = entry
nexts.forEach(e => (this._nextsIndex[e] = entry.hash))
this._headsIndex = {}
this._headsIndex[entry.hash] = entry
// Update the length
this._length++
return entry
}
/**
* Join two logs
*
* @description Joins the given log into this log. Mutates and returns this log; the given log is not changed.
*
* @param {Log} log Log to join with this Log
* @param {Number} [size=-1] Max size of the joined log
*
* @example
* await log1.join(log2)
*
* @returns {Promise<Log>}
*/
async join (log, size = -1) {
if (!isDefined(log)) throw LogError.LogNotDefinedError()
if (!Log.isLog(log)) throw LogError.NotALogError()
if (this.id !== log.id) return
// Get the difference of the logs
const newItems = Log.difference(log, this)
const identityProvider = this._identity.provider
// Verify if entries are allowed to be added to the log and throws if
// there's an invalid entry
const permitted = async (entry) => {
const canAppend = await this._access.canAppend(entry, identityProvider)
if (!canAppend) {
throw new Error(`Could not append entry, key "${entry.identity.id}" is not allowed to write to the log`)
}
}
// Verify signature for each entry and throws if there's an invalid signature
const verify = async (entry) => {
const isValid = await Entry.verify(identityProvider, entry)
const publicKey = entry.identity ? entry.identity.publicKey : entry.key
if (!isValid) throw new Error(`Could not validate signature "${entry.sig}" for entry "${entry.hash}" and key "${publicKey}"`)
}
const entriesToJoin = Object.values(newItems)
await pMap(entriesToJoin, permitted, { concurrency: 1 })
await pMap(entriesToJoin, verify, { concurrency: 1 })
// Update the internal next pointers index
const addToNextsIndex = e => {
const entry = this.get(e.hash)
if (!entry) this._length++ /* istanbul ignore else */
e.next.forEach(a => (this._nextsIndex[a] = e.hash))
}
Object.values(newItems).forEach(addToNextsIndex)
// Update the internal entry index
this._entryIndex = Object.assign(this._entryIndex, newItems)
// Merge the heads
const notReferencedByNewItems = e => !nextsFromNewItems.find(a => a === e.hash)
const notInCurrentNexts = e => !this._nextsIndex[e.hash]
const nextsFromNewItems = Object.values(newItems).map(getNextPointers).reduce(flatMap, [])
const mergedHeads = Log.findHeads(Object.values(Object.assign({}, this._headsIndex, log._headsIndex)))
.filter(notReferencedByNewItems)
.filter(notInCurrentNexts)
.reduce(uniqueEntriesReducer, {})
this._headsIndex = mergedHeads
// Slice to the requested size
if (size > -1) {
let tmp = this.values
tmp = tmp.slice(-size)
this._entryIndex = tmp.reduce(uniqueEntriesReducer, {})
this._headsIndex = Log.findHeads(tmp).reduce(uniqueEntriesReducer, {}) // keep the heads index keyed by hash
this._length = Object.values(this._entryIndex).length
}
// Find the latest clock from the heads
const maxClock = Object.values(this._headsIndex).reduce(maxClockTimeReducer, 0)
this._clock = new Clock(this.clock.id, Math.max(this.clock.time, maxClock))
return this
}
/**
* Get the log in JSON format
* @returns {Object<{id, heads}>} An object with the log's id and the hashes of its heads
*/
toJSON () {
return {
id: this.id,
heads: this.heads
.sort(LastWriteWins) // default sorting
.reverse() // we want the latest as the first element
.map(getHash) // return only the head hashes
}
}
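/**
* Get the log as a snapshot object containing its id, heads and values
* @returns {Object<{id, heads, values}>}
*/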
toSnapshot () {
return {
id: this.id,
heads: this.heads,
values: this.values
}
}
/**
* Get the log as a Buffer
* @returns {Buffer}
*/
toBuffer () {
return Buffer.from(JSON.stringify(this.toJSON()))
}
/**
* Returns the log entries as a formatted string
* @example
* two
* └─one
* └─three
* @returns {string}
*/
toString (payloadMapper) {
return this.values
.slice()
.reverse()
.map((e, idx) => {
const parents = Entry.findChildren(e, this.values)
const len = parents.length
let padding = new Array(Math.max(len - 1, 0))
padding = len > 1 ? padding.fill(' ') : padding
padding = len > 0 ? padding.concat(['└─']) : padding
/* istanbul ignore next */
return padding.join('') + (payloadMapper ? payloadMapper(e.payload) : e.payload)
})
.join('\n')
}
/**
* Check whether an object is a Log instance
* @param {Object} log An object to check
* @returns {boolean}
*/
static isLog (log) {
return isDefined(log) &&
log.id !== undefined &&
log.heads !== undefined &&
log._entryIndex !== undefined
}
/**
* Get the log's multihash
* @returns {Promise<string>} Multihash of the Log as Base58 encoded string
*/
toMultihash () {
return LogIO.toMultihash(this._storage, this)
}
/**
* Create a log from a multihash
* @param {IPFS} ipfs An IPFS instance
* @param {Object} access AccessController instance
* @param {Object} identity Identity instance
* @param {string} hash Multihash (as a Base58 encoded string) to create the log from
* @param {Number} [length=-1] How many items to include in the log
* @param {Array<Entry|string>} [exclude] Entries or hashes not to fetch
* @param {Function(hash, entry, parent, depth)} [onProgressCallback]
* @return {Promise<Log>} New Log
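* @example
* // A round-trip sketch; assumes `log`, `ipfs`, `access` and `identity` exist
* const hash = await log.toMultihash()
* const copy = await Log.fromMultihash(ipfs, access, identity, hash)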
*/
static async fromMultihash (ipfs, access, identity, hash, length = -1, exclude, onProgressCallback) {
if (!isDefined(ipfs)) throw LogError.IPFSNotDefinedError()
if (!isDefined(hash)) throw new Error(`Invalid hash: ${hash}`)
// TODO: need to verify the entries with 'key'
const data = await LogIO.fromMultihash(ipfs, hash, length, exclude, onProgressCallback)
return new Log(ipfs, access, identity, data.id, data.values, data.heads, data.clock)
}
/**
* Create a log from a single entry's multihash
* @param {IPFS} ipfs An IPFS instance
* @param {Object} access AccessController instance
* @param {Object} identity Identity instance
* @param {string} hash Multihash (as a Base58 encoded string) of the Entry from which to create the log
* @param {string} id ID for the new log
* @param {Number} [length=-1] How many entries to include in the log
* @param {Array<Entry|string>} [exclude] Entries or hashes not to fetch
* @param {Function(hash, entry, parent, depth)} [onProgressCallback]
* @return {Promise<Log>} New Log
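* @example
* // A sketch; `entryHash` is assumed to be the hash of an existing entry
* const log = await Log.fromEntryHash(ipfs, access, identity, entryHash, 'my-log-id')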
*/
static async fromEntryHash (ipfs, access, identity, hash, id, length = -1, exclude, onProgressCallback) {
if (!isDefined(ipfs)) throw LogError.IPFSNotDefinedError()
if (!isDefined(hash)) throw new Error("'hash' must be defined")
// TODO: need to verify the entries with 'key'
const data = await LogIO.fromEntryHash(ipfs, hash, id, length, exclude, onProgressCallback)
return new Log(ipfs, access, identity, id, data.values)
}
/**
* Create a log from a Log Snapshot JSON
* @param {IPFS} ipfs An IPFS instance
* @param {Object} access AccessController instance
* @param {Object} identity Identity instance
* @param {Object} json Log snapshot as a JSON object
* @param {Number} [length=-1] How many entries to include in the log
* @param {Number} [timeout] Maximum time in milliseconds to wait for entries to be fetched
* @param {Function(hash, entry, parent, depth)} [onProgressCallback]
* @return {Promise<Log>} New Log
*/
static async fromJSON (ipfs, access, identity, json, length = -1, timeout, onProgressCallback) {
if (!isDefined(ipfs)) throw LogError.IPFSNotDefinedError()
// TODO: need to verify the entries with 'key'
const data = await LogIO.fromJSON(ipfs, json, length, timeout, onProgressCallback)
return new Log(ipfs, access, identity, data.id, data.values)
}
/**
* Create a new log from an Entry instance
* @param {IPFS} ipfs An IPFS instance
* @param {Object} access AccessController instance
* @param {Object} identity Identity instance
* @param {Entry|Array<Entry>} sourceEntries An Entry or an array of entries from which to fetch the log
* @param {Number} [length=-1] How many entries to include. Default: infinite.
* @param {Array<Entry|string>} [exclude] Entries or hashes not to fetch (e.g. cached entries)
* @param {Function(hash, entry, parent, depth)} [onProgressCallback]
* @return {Promise<Log>} New Log
*/
static async fromEntry (ipfs, access, identity, sourceEntries, length = -1, exclude, onProgressCallback) {
if (!isDefined(ipfs)) throw LogError.IPFSNotDefinedError()
if (!isDefined(sourceEntries)) throw new Error("'sourceEntries' must be defined")
// TODO: need to verify the entries with 'key'
const data = await LogIO.fromEntry(ipfs, sourceEntries, length, exclude, onProgressCallback)
return new Log(ipfs, access, identity, data.id, data.values)
}
/**
* Find heads from a collection of entries
*
* @description
* Finds entries that are the heads of this collection,
* ie. entries that are not referenced by other entries
*
* @param {Array<Entry>} entries Entries to search heads from
* @returns {Array<Entry>}
*/
static findHeads (entries) {
var indexReducer = (res, entry, idx, arr) => {
var addToResult = e => (res[e] = entry.hash)
entry.next.forEach(addToResult)
return res
}
var items = entries.reduce(indexReducer, {})
var exists = e => items[e.hash] === undefined
// Sort by clock id; a comparator must return a number, not a boolean
var compareIds = (a, b) => (a.clock.id > b.clock.id ? 1 : (a.clock.id < b.clock.id ? -1 : 0))
return entries.filter(exists).sort(compareIds)
}
/**
* Find entries that point to another entry that is not in the
* input array
* @param {Array<Entry>} entries Entries to search tails from
* @returns {Array<Entry>}
*/
static findTails (entries) {
// Reverse index { next -> entry }
var reverseIndex = {}
// Null index containing entries that have no parents (nexts)
var nullIndex = []
// Hashes for all entries for quick lookups
var hashes = {}
// Hashes of all next entries
var nexts = []
var addToIndex = (e) => {
if (e.next.length === 0) {
nullIndex.push(e)
}
var addToReverseIndex = (a) => {
/* istanbul ignore else */
if (!reverseIndex[a]) reverseIndex[a] = []
reverseIndex[a].push(e)
}
// Add all entries and their parents to the reverse index
e.next.forEach(addToReverseIndex)
// Get all next references
nexts = nexts.concat(e.next)
// Get the hashes of input entries
hashes[e.hash] = true
}
// Create our indices
entries.forEach(addToIndex)
var addUniques = (res, entries, idx, arr) => res.concat(findUniques(entries, 'hash'))
var exists = e => hashes[e] === undefined
var findFromReverseIndex = e => reverseIndex[e]
// Keep only the references that point outside the input entries
const tails = nexts // For every multihash in nexts:
.filter(exists) // Keep only hashes that are not among the input entries
.map(findFromReverseIndex) // Get the referencing Entry from the reverse index
.reduce(addUniques, []) // Flatten the result and take only uniques
.concat(nullIndex) // Combine with tails that have no next refs (ie. first-in-their-chain)
return findUniques(tails, 'hash').sort(Entry.compare)
}
/**
* Find the hashes of entries that are not in the collection
* but are referenced by other entries
* @param {Array<Entry>} entries
* @returns {Array<string>}
*/
static findTailHashes (entries) {
var hashes = {}
var addToIndex = e => (hashes[e.hash] = true)
var reduceTailHashes = (res, entry, idx, arr) => {
var addToResult = (e) => {
/* istanbul ignore else */
if (hashes[e] === undefined) {
res.splice(0, 0, e)
}
}
// Copy before reversing so the entry's next array isn't mutated
entry.next.slice().reverse().forEach(addToResult)
return res
}
entries.forEach(addToIndex)
return entries.reduce(reduceTailHashes, [])
}
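/**
* Find the entries from log `a` that are not in log `b`
* @param {Log} a The log to take entries from
* @param {Log} b The log to compare against
* @returns {Object<string, Entry>} Entries found only in `a`, keyed by hash
*/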
static difference (a, b) {
let stack = Object.keys(a._headsIndex)
let traversed = {}
let res = {}
const pushToStack = hash => {
if (!traversed[hash] && !b.get(hash)) {
stack.push(hash)
traversed[hash] = true
}
}
while (stack.length > 0) {
const hash = stack.shift()
const entry = a.get(hash)
if (entry && !b.get(hash) && entry.id === b.id) {
res[entry.hash] = entry
traversed[entry.hash] = true
entry.next.forEach(pushToStack)
}
}
return res
}
}
module.exports = Log
module.exports.AccessController = AccessController