Skip to content

Hypercore replication fails to complete #41

@hackergrrl

Description

@hackergrrl

I ran into this bug during Mapeo development. I managed to reproduce it with multifeed, and then again with just hypercore + hypercore-protocol.

The repro setup is:

  1. create two feeds (A, B)
  2. sync A<->B
  3. create two more feeds (C, D)
  4. sync A<->C
  5. sync B<->D
  6. sync AC<->BD
  7. process ends, but replication stream does not

I tried to get the code as minimal as possible, but there's a lot of scaffolding needed to set up replication of several hypercores:

var ram = require('random-access-memory')
var protocol = require('hypercore-protocol')
var hypercore = require('hypercore')
var pump = require('pump')

// 32-byte key, hex-decoded, that FeedSet.prototype.replicate passes to
// stream.feed() on every protocol stream it creates.
// Buffer.from() replaces the deprecated `new Buffer(string, encoding)` form.
var defaultEncryptionKey = Buffer.from('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex')

/**
 * A bare-bones collection of hypercores that are all replicated over one
 * shared hypercore-protocol stream.
 */
function FeedSet () {
  // Every feed held by this set: cores created through writer() plus the
  // cores replicate() creates for remote keys.
  this.feeds = []
}

/**
 * Create a new hypercore on `ram` storage and add it to the set once it is
 * ready.
 *
 * @param {function(null, Object)} cb - called as cb(null, core) after the
 *   core has been pushed onto this.feeds.
 */
FeedSet.prototype.writer = function (cb) {
  var core = hypercore(ram, { valueEncoding: 'json' })
  core.ready(() => {
    this.feeds.push(core)
    cb(null, core)
  })
}

/**
 * @returns {Array} the `key` of every feed currently in the set, in
 *   insertion order.
 */
FeedSet.prototype.getFeedKeys = function () {
  return this.feeds.map(f => f.key)
}

/**
 * Create a protocol stream and replicate every feed in the set over it.
 *
 * A hypercore is created for each entry of `opts.remoteKeys` and added to
 * this.feeds; once all of them are ready, every feed in the set is attached
 * to the stream with `live: false`.
 *
 * NOTE(review): remote keys are not checked against feeds already in the
 * set, so passing a key the set already holds adds a second core for it.
 *
 * @param {Object} [opts] - forwarded to the protocol constructor, with
 *   `extensions` forced to an empty array.
 * @param {Array} [opts.remoteKeys] - keys of the feeds the other side holds.
 *   Missing or empty means "local feeds only".
 * @returns {Object} the protocol stream.
 */
FeedSet.prototype.replicate = function (opts) {
  var self = this
  var options = opts || {}
  var remoteKeys = options.remoteKeys || []
  var stream = protocol(Object.assign({}, options, {
    extensions: []
  }))

  stream.feed(defaultEncryptionKey)

  stream.on('prefinalize', function (cb) {
    console.log('prefinal')
    cb()
  })

  stream.on('error', console.log)
  stream.on('end', () => console.log('stream end'))

  // Announce how many feeds will be attached to this stream.
  // NOTE(review): presumably hypercore-protocol uses this count to decide
  // when the stream may finalize -- confirm against the library.
  stream.expectedFeeds += self.feeds.length + remoteKeys.length

  var pending = remoteKeys.length

  // With no remote keys there is no ready() callback to wait for. Without
  // this guard the `!--pending` check below is never reached and the local
  // feeds would never be attached to the stream.
  if (pending === 0) {
    sync()
    return stream
  }

  // create remote key hypercores
  remoteKeys.forEach(key => {
    var core = hypercore(ram, key, { valueEncoding: 'json' })
    core.ready(() => {
      self.feeds.push(core)
      if (!--pending) sync()
    })
  })

  // set up protocol stream + feeds
  function sync () {
    self.feeds.forEach(feed => {
      console.log('replicating', feed.key.toString('hex'), feed.writable)
      var fStream = feed.replicate({
        live: false,
        stream: stream
      })
      fStream.once('error', console.log)
    })
  }

  return stream
}

/**
 * Build an empty FeedSet and deliver it on the next tick.
 *
 * @param {function(null, FeedSet)} cb - called as cb(null, feedSet).
 */
function create (cb) {
  // The set is constructed right away; only the callback is deferred.
  process.nextTick(cb, null, new FeedSet())
}

/**
 * Add a new writer feed to `mf` and append ten entries, { foo: 0 } through
 * { foo: 9 }, to it.
 *
 * @param {Object} mf - object exposing writer(cb), e.g. a FeedSet.
 * @param {function(Error=)} cb - receives the writer error if creating the
 *   feed failed; otherwise it is handed to feed.append() as its callback.
 */
function addFeed (mf, cb) {
  var batch = new Array(10).fill(0).map((_, n) => ({ foo: n }))
  mf.writer((err, feed) => {
    // Surface a failed writer instead of crashing on `feed.append` of
    // undefined.
    if (err) return cb(err)
    feed.append(batch, cb)
  })
}

/**
 * Create a feed set and populate it with `n` feeds via addFeed().
 *
 * @param {number} n - number of feeds to add. Zero or less yields an empty
 *   set.
 * @param {function(?Error, Object=)} cb - called once: with the first error
 *   encountered, or with (null, mf) after all feeds have been added.
 */
function createWithFeeds (n, cb) {
  create((createErr, mf) => {
    if (createErr) return cb(createErr)

    // Nothing to add: the loop below would never run and cb would never
    // fire, so finish here.
    if (!(n > 0)) return cb(null, mf)

    var pending = n
    var failed = false
    for (var i = 0; i < n; i++) {
      addFeed(mf, err => {
        if (failed) return // an earlier feed already reported an error
        if (err) {
          failed = true
          return cb(err)
        }
        if (!--pending) cb(null, mf)
      })
    }
  })
}

/**
 * Create `numMultifeeds` feed sets, each populated with
 * `numFeedsPerMultifeed` feeds.
 *
 * @param {number} numMultifeeds - how many feed sets to create.
 * @param {number} numFeedsPerMultifeed - feeds per set, passed to
 *   createWithFeeds().
 * @param {function(?Error, Array=)} cb - called once: with the first error
 *   encountered, or with (null, mfs), where mfs lists the sets in the order
 *   they finished.
 */
function createMultifeeds (numMultifeeds, numFeedsPerMultifeed, cb) {
  var mfs = []

  // Nothing to create: the loop below would never run and cb would never
  // fire. Defer the callback so it is asynchronous like the normal path.
  if (!(numMultifeeds > 0)) {
    process.nextTick(cb, null, mfs)
    return
  }

  var pending = numMultifeeds
  var failed = false
  for (var i = 0; i < numMultifeeds; i++) {
    createWithFeeds(numFeedsPerMultifeed, (err, mf) => {
      if (failed) return // an earlier set already reported an error
      if (err) {
        failed = true
        return cb(err)
      }
      mfs.push(mf)
      if (!--pending) cb(null, mfs)
    })
  }
}

/**
 * Replicate two feed sets with each other, non-live.
 *
 * Each side is told the other side's current feed keys, and the two
 * resulting streams are piped into each other with pump.
 *
 * @param {Object} a - first feed set (needs replicate() and getFeedKeys()).
 * @param {Object} b - second feed set.
 * @param {function} cb - passed to pump as its completion callback.
 */
function sync (a, b, cb) {
  var keysFromB = b.getFeedKeys()
  var streamA = a.replicate({ live: false, remoteKeys: keysFromB })

  // a's keys are read only after a.replicate() has been called.
  var keysFromA = a.getFeedKeys()
  var streamB = b.replicate({ live: false, remoteKeys: keysFromA })

  pump(streamA, streamB, streamA, cb)
}

// Repro driver: create four single-feed sets, then sync 0<->1, 2<->3 and
// finally 1<->2, one after another. Progress goes to stderr. Errors from
// every step are logged, not only those from the first sync.
createMultifeeds(4, 1, (setupErr, mfs) => {
  // Without the feed sets there is nothing to sync.
  if (setupErr) return console.log(setupErr)
  console.error('0 <-> 1')
  sync(mfs[0], mfs[1], err01 => {
    if (err01) console.log(err01)
    console.error('DONE 0 <-> 1')
    sync(mfs[2], mfs[3], err23 => {
      if (err23) console.log(err23)
      console.error('DONE 2 <-> 3')
      sync(mfs[1], mfs[2], err12 => {
        if (err12) console.log(err12)
        console.error('DONE 1 <-> 2')
      })
    })
  })
})

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions