Skip to content

Commit

Permalink
Merge pull request ipfs#32 from ipfs/ensure-shard-manipulations-have-same-hashes
Browse files Browse the repository at this point in the history

fix: make sure hashes are the same after shard changes
  • Loading branch information
achingbrain committed Dec 1, 2018
2 parents 71fd8fa + 4e68e8d commit 1d26cf8
Show file tree
Hide file tree
Showing 12 changed files with 705 additions and 269 deletions.
215 changes: 88 additions & 127 deletions src/core/utils/add-link.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded')
const series = require('async/series')
const log = require('debug')('ipfs:mfs:core:utils:add-link')
const UnixFS = require('ipfs-unixfs')
const Bucket = require('hamt-sharding')
const {
generatePath,
updateHamtDirectory
} = require('./hamt-utils')

const defaultOptions = {
parent: undefined,
Expand Down Expand Up @@ -78,25 +81,33 @@ const addLink = (context, options, callback) => {
return convertToShardedDirectory(context, options, callback)
}

log('Adding to regular directory')
log(`Adding ${options.name} to regular directory`)

addToDirectory(context, options, callback)
}

/**
 * Converts a regular directory into a HAMT-sharded directory.
 *
 * Builds a shard containing every link the parent already holds plus the
 * new entry being added, via `createShard`.
 *
 * @param {Object} context - context holding the `ipld` instance
 * @param {Object} options - expects `parent` (DAGNode), `name`, `size`, `cid`
 * @param {Function} callback - invoked with `(err, result)` where `result`
 *   has a `cid` (logged here); shape otherwise determined by `createShard`
 */
const convertToShardedDirectory = (context, options, callback) => {
  // Every existing link, plus the entry we are adding, goes into the shard
  const entries = options.parent.links.map((link) => ({
    name: link.name,
    size: link.size,
    multihash: link.cid.buffer
  }))

  entries.push({
    name: options.name,
    size: options.size,
    multihash: options.cid.buffer
  })

  createShard(context, entries, {}, (err, result) => {
    if (!err) {
      log('Converted directory to sharded directory', result.cid.toBaseEncodedString())
    }

    callback(err, result)
  })
}

const addToDirectory = (context, options, callback) => {
waterfall([
(done) => {
if (options.name) {
// Remove the old link if necessary
return DAGNode.rmLink(options.parent, options.name, done)
}

done(null, options.parent)
},
(parent, done) => {
// Add the new link to the parent
DAGNode.addLink(parent, new DAGLink(options.name, options.size, options.cid), done)
},
(done) => DAGNode.rmLink(options.parent, options.name, done),
(parent, done) => DAGNode.addLink(parent, new DAGLink(options.name, options.size, options.cid), done),
(parent, done) => {
// Persist the new parent DAGNode
context.ipld.put(parent, {
Expand All @@ -112,125 +123,67 @@ const addToDirectory = (context, options, callback) => {
], callback)
}

const addToShardedDirectory = async (context, options, callback) => {
const bucket = new Bucket({
hashFn: DirSharded.hashFn
})
const position = await bucket._findNewBucketAndPos(options.name)
const prefix = position.pos
.toString('16')
.toUpperCase()
.padStart(2, '0')
.substring(0, 2)

const existingSubShard = options.parent.links
.filter(link => link.name === prefix)
.pop()

if (existingSubShard) {
log(`Descending into sub-shard ${prefix} to add link ${options.name}`)

return addLink(context, {
...options,
parent: null,
parentCid: existingSubShard.cid
}, (err, { cid, node }) => {
if (err) {
return callback(err)
}

// make sure parent is updated with new sub-shard cid
addToDirectory(context, {
...options,
parent: options.parent,
parentCid: options.parentCid,
name: prefix,
size: node.size,
cid: cid
}, callback)
})
}

const existingFile = options.parent.links
.filter(link => link.name.substring(2) === options.name)
.pop()

if (existingFile) {
log(`Updating file ${existingFile.name}`)

return addToDirectory(context, {
...options,
name: existingFile.name
}, callback)
}
/**
 * Adds a link to a HAMT-sharded directory.
 *
 * Resolves the bucket path for the new name with `generatePath`, updates
 * the target (sub-)shard via `updateShard`, then recomputes the root HAMT
 * directory node with `updateHamtDirectory` so its hash stays consistent.
 *
 * @param {Object} context - context holding the `ipld` instance
 * @param {Object} options - expects `parent`, `name`, `size`, `cid`
 * @param {Function} callback - receives the result of `updateHamtDirectory`
 */
const addToShardedDirectory = (context, options, callback) => {
  return waterfall([
    (cb) => generatePath(context, options.name, options.parent, cb),
    ({ rootBucket, path }, cb) => {
      const child = {
        name: options.name,
        cid: options.cid,
        size: options.size
      }

      // Carry `rootBucket` forward alongside whatever updateShard produces
      updateShard(context, path, child, options, (err, result = {}) => {
        cb(err, { rootBucket, ...result })
      })
    },
    ({ rootBucket, node }, cb) => {
      updateHamtDirectory(context, node.links, rootBucket, options, cb)
    }
  ], callback)
}

const existingUnshardedFile = options.parent.links
.filter(link => link.name.substring(0, 2) === prefix)
.pop()

if (existingUnshardedFile) {
log(`Replacing file ${existingUnshardedFile.name} with sub-shard`)

return createShard(context, [{
name: existingUnshardedFile.name.substring(2),
size: existingUnshardedFile.size,
multihash: existingUnshardedFile.cid.buffer
}, {
name: options.name,
size: options.size,
multihash: options.cid.buffer
}], {
root: false
}, (err, result) => {
if (err) {
return callback(err)
const updateShard = (context, positions, child, options, callback) => {
const {
bucket,
prefix,
node
} = positions.pop()

const link = node.links
.find(link => link.name.substring(0, 2) === prefix && link.name !== `${prefix}${child.name}`)

return waterfall([
(cb) => {
if (link && link.name.length > 2) {
log(`Converting existing file ${link.name} into sub-shard for ${child.name}`)

return waterfall([
(done) => createShard(context, [{
name: link.name.substring(2),
size: link.size,
multihash: link.cid.buffer
}, {
name: child.name,
size: child.size,
multihash: child.cid.buffer
}], {}, done),
({ node: { links: [ shard ] } }, done) => {
return context.ipld.get(shard.cid, (err, result) => {
done(err, { cid: shard.cid, node: result && result.value })
})
},
(result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
], cb)
}

const newShard = result.node.links[0]

waterfall([
(done) => DAGNode.rmLink(options.parent, existingUnshardedFile.name, done),
(parent, done) => DAGNode.addLink(parent, newShard, done),
(parent, done) => {
// Persist the new parent DAGNode
context.ipld.put(parent, {
version: options.cidVersion,
format: options.codec,
hashAlg: options.hashAlg,
hashOnly: !options.flush
}, (error, cid) => done(error, {
node: parent,
cid
}))
}
], callback)
})
}

log(`Appending ${prefix + options.name} to shard`)
if (link && link.name.length === 2) {
log(`Descending into sub-shard ${link.name} for ${child.name}`)

return addToDirectory(context, {
...options,
name: prefix + options.name
}, callback)
}
return waterfall([
(cb) => updateShard(context, positions, child, options, cb),
(result, cb) => updateShardParent(context, bucket, node, link.name, result.node, result.cid, prefix, options, cb)
], cb)
}

const convertToShardedDirectory = (context, options, callback) => {
createShard(context, options.parent.links.map(link => ({
name: link.name,
size: link.size,
multihash: link.cid.buffer
})).concat({
name: options.name,
size: options.size,
multihash: options.cid.buffer
}), {}, (err, result) => {
if (!err) {
log('Converted directory to sharded directory', result.cid.toBaseEncodedString())
log(`Adding or replacing file`, prefix + child.name)
updateShardParent(context, bucket, node, prefix + child.name, child, child.cid, prefix + child.name, options, cb)
}

callback(err, result)
})
], callback)
}

const createShard = (context, contents, options, callback) => {
Expand Down Expand Up @@ -267,4 +220,12 @@ const createShard = (context, contents, options, callback) => {
)
}

/**
 * Swaps the link `name` in `parent` for a new link `prefix` -> `cid`
 * (sized from `node.size`), then rebuilds the HAMT directory node from the
 * updated links via `updateHamtDirectory`.
 *
 * @param {Object} context - context holding the `ipld` instance
 * @param {Object} bucket - HAMT bucket the parent belongs to
 * @param {Object} parent - DAGNode to modify
 * @param {string} name - name of the existing link to remove
 * @param {Object} node - node being linked; only `.size` is read here
 * @param {Object} cid - CID the new link points at
 * @param {string} prefix - name to give the replacement link
 * @param {Object} options - passed through to `updateHamtDirectory`
 * @param {Function} callback - receives the result of `updateHamtDirectory`
 */
const updateShardParent = (context, bucket, parent, name, node, cid, prefix, options, callback) => {
  waterfall([
    // Drop the old link before attaching its replacement
    (next) => DAGNode.rmLink(parent, name, next),
    (updated, next) => DAGNode.addLink(updated, new DAGLink(prefix, node.size, cid), next),
    (updated, next) => updateHamtDirectory(context, updated.links, bucket, options, next)
  ], callback)
}

module.exports = addLink

0 comments on commit 1d26cf8

Please sign in to comment.