Skip to content
Permalink
Browse files
feat: implement "the leaf hack" fixing estuary uploads
This allows us to lie to estuary and uploads our dangling links by hiding them under raw leaves with identical multihashes.
  • Loading branch information
Jorropo committed Jan 23, 2022
1 parent 2c12035 commit bebaee67513e484c6b0f48fd66a2e04406cf6cb6
Showing 1 changed file with 83 additions and 110 deletions.
193 main.go
@@ -160,56 +160,75 @@ func (r *recursiveTraverser) send(job sendJobs) error {
return nil
}

// Write a linking root if we have multiple roots
var data []byte
var root cid.Cid
if len(job.roots) != 1 {
var err error
links := make([]*pb.PBLink, len(job.roots))

for i, v := range job.roots {
sSize := uint64(v.DagSize)
n := strconv.FormatUint(uint64(i), 32)
links[i] = &pb.PBLink{
Name: &n,
Tsize: &sSize,
Hash: v.Cid.Bytes(),
}
cidsToLink := make([]*pb.PBLink, len(job.roots))
var nameCounter uint64
for _, v := range job.roots {
sSize := uint64(v.DagSize)
n := strconv.FormatUint(uint64(nameCounter), 32)
cidsToLink[nameCounter] = &pb.PBLink{
Name: &n,
Tsize: &sSize,
Hash: v.Cid.Bytes(),
}
nameCounter++
}

data, err = proto.Marshal(&pb.PBNode{
Links: links,
Data: directoryData,
})
// Write a linking root if we have multiple roots
var data []byte
for len(cidsToLink) != 1 {
candidateCount := 2
sSize := *cidsToLink[0].Tsize + *cidsToLink[1].Tsize
lastBlockData, err := makeFakeRoot(cidsToLink[:candidateCount], sSize)
if err != nil {
return fmt.Errorf("can't Marshal rooting directory: %e", err)
return fmt.Errorf("serialising fake root: %e", err)
}
if len(data) > blockTarget {
return fmt.Errorf("rooting directory exceed block limit, TODO: support sharding directories")
for len(cidsToLink) > candidateCount {
sSize += *cidsToLink[candidateCount].Tsize
candidateCount++
newBlockData, err := makeFakeRoot(cidsToLink[:candidateCount], sSize)
if err != nil {
return fmt.Errorf("serialising fake root: %e", err)
}

if len(newBlockData) > blockTarget {
candidateCount--
sSize -= *cidsToLink[candidateCount].Tsize
break
}
lastBlockData = newBlockData
}
cidsToLink = cidsToLink[candidateCount-1:] // Saving space to overwrite the first element with the new directory

// Making block header
varuintHeader := make([]byte, binary.MaxVarintLen64+dagPBCIDLength+len(data))
uvarintSize := binary.PutUvarint(varuintHeader, uint64(dagPBCIDLength)+uint64(len(data)))
varuintHeader := make([]byte, binary.MaxVarintLen64+dagPBCIDLength+len(data)+len(lastBlockData))
uvarintSize := binary.PutUvarint(varuintHeader, uint64(dagPBCIDLength)+uint64(len(lastBlockData)))
varuintHeader = varuintHeader[:uvarintSize]

h := sha256.Sum256(data)
h := sha256.Sum256(lastBlockData)
mhash, err := mh.Encode(h[:], mh.SHA2_256)
if err != nil {
return fmt.Errorf("encoding multihash: %e", err)
}
c := cid.NewCidV1(cid.DagProtobuf, mhash)
data = append(append(varuintHeader, c.Bytes()...), data...)
root = c
} else {
root = job.roots[0].Cid
data = append(append(append(varuintHeader, c.Bytes()...), lastBlockData...), data...)
n := strconv.FormatUint(uint64(nameCounter), 32)
cidsToLink[0] = &pb.PBLink{
Name: &n,
Tsize: &sSize,
Hash: c.Bytes(),
}
nameCounter++
}

var buff io.Reader
// Writing CAR header
{
c, err := cid.Cast(cidsToLink[0].Hash)
if err != nil {
return fmt.Errorf("casting CID back from bytes: %e", err)
}
headerBuffer, err := cbor.DumpObject(&car.CarHeader{
Roots: []cid.Cid{root},
Roots: []cid.Cid{c},
Version: 1,
})
if err != nil {
@@ -252,23 +271,40 @@ func (r *recursiveTraverser) send(job sendJobs) error {
return nil
}

func makeFakeRoot(links []*pb.PBLink, sSize uint64) ([]byte, error) {
data, err := proto.Marshal(&pb.PBNode{
Links: links,
Data: directoryData,
})
if err != nil {
return nil, fmt.Errorf("can't Marshal rooting directory: %e", err)
}

return data, nil
}

func (r *recursiveTraverser) writePBNode(data []byte) (cid.Cid, error) {
// Making block header
varuintHeader := make([]byte, binary.MaxVarintLen64+dagPBCIDLength+len(data))
uvarintSize := binary.PutUvarint(varuintHeader, uint64(dagPBCIDLength)+uint64(len(data)))
varuintHeader = varuintHeader[:uvarintSize]

fullSize := len(data) + uvarintSize + dagPBCIDLength
fullSize := int64(len(data)) + int64(uvarintSize) + int64(rawleafCIDLength)

h := sha256.Sum256(data)
mhash, err := mh.Encode(h[:], mh.SHA2_256)
if err != nil {
return cid.Cid{}, fmt.Errorf("encoding multihash: %e", err)
}
c := cid.NewCidV1(cid.DagProtobuf, mhash)
rootBlock := append(append(varuintHeader, c.Bytes()...), data...)
fakeLeaf := cid.NewCidV1(cid.Raw, mhash)
rootBlock := append(append(varuintHeader, fakeLeaf.Bytes()...), data...)
r.newBlock(&cidSizePair{
Cid: fakeLeaf,
FileSize: fullSize,
DagSize: fullSize,
})

off, err := r.takeOffset(int64(fullSize))
off, err := r.takeOffset(fullSize)
if err != nil {
return cid.Cid{}, fmt.Errorf("taking offset: %e", err)
}
@@ -277,7 +313,7 @@ func (r *recursiveTraverser) writePBNode(data []byte) (cid.Cid, error) {
return cid.Cid{}, fmt.Errorf("writing root's header: %e", err)
}

return c, nil
return cid.NewCidV1(cid.DagProtobuf, mhash), nil
}

type swapAbleFile struct {
@@ -302,76 +338,25 @@ type recursiveTraverser struct {
estuaryKey string
estuaryShuttle string

rootsLen uint
roots *cidRootsNode
toSend []*cidSizePair

client http.Client
}

type cidRootsNode struct {
c *cidSizePair
next *cidRootsNode
prev *cidRootsNode
func (rt *recursiveTraverser) newBlock(c *cidSizePair) {
rt.toSend = append(rt.toSend, c)
}

func (rt *recursiveTraverser) addRoot(c *cidSizePair) {
if c.rootNode != nil {
panic("Internal bug, added same root object twice!")
}

oldFirstNode := rt.roots
r := &cidRootsNode{
c: c,
next: oldFirstNode,
}
c.rootNode = r
rt.roots = r
if oldFirstNode != nil {
oldFirstNode.prev = r
}
rt.rootsLen++
}

func (rt *recursiveTraverser) removeRoot(c *cidSizePair) {
r := c.rootNode
if r == nil {
return
}

n := r.next
p := r.prev
if p == nil {
rt.roots = n
} else {
p.next = n
}
if n != nil {
n.prev = p
}
rt.rootsLen--
}

func (r *recursiveTraverser) pullRoots() sendJobs {
cids := make([]*cidSizePair, r.rootsLen)
var i uint
cur := r.roots
for cur != nil {
o := cur.c
cids[i] = o
o.rootNode = nil
cur = cur.next
i++
}
r.roots = nil
r.rootsLen = 0
return sendJobs{cids, r.tempCarOffset}
func (r *recursiveTraverser) pullBlock() sendJobs {
j := sendJobs{r.toSend, r.tempCarOffset}
r.toSend = nil
return j
}

type cidSizePair struct {
Cid cid.Cid
FileSize int64
DagSize int64
rootNode *cidRootsNode
}

func (r *recursiveTraverser) do(task string, entry os.FileInfo) (*cidSizePair, error) {
@@ -458,14 +443,7 @@ func (r *recursiveTraverser) do(task string, entry os.FileInfo) (*cidSizePair, e
return nil, fmt.Errorf("writing directory %s: %e", task, err)
}

for _, v := range sCids {
r.removeRoot(v)
}

cp := &cidSizePair{c, fileSum, dagSum, nil}
r.addRoot(cp)

return cp, nil
return &cidSizePair{c, fileSum, dagSum}, nil
default:
// File
f, err := os.Open(task)
@@ -537,7 +515,7 @@ func (r *recursiveTraverser) do(task string, entry os.FileInfo) (*cidSizePair, e
FileSize: workSize,
DagSize: workSize,
}
r.addRoot(cp)
r.newBlock(cp)
CIDs[i] = cp

err = fullWriteAt(r.tempCarChunk.File, append(varuintHeader, c.Bytes()...), carOffset)
@@ -588,7 +566,7 @@ func (r *recursiveTraverser) do(task string, entry os.FileInfo) (*cidSizePair, e
if err != nil {
return nil, fmt.Errorf("building a root for %s: %e", task, err)
}
for len(lastRoot) < blockTarget && len(CIDs) > CIDCountAttempt {
for len(CIDs) > CIDCountAttempt {
fileSum += CIDs[CIDCountAttempt].FileSize
CIDCountAttempt++
newRoot, err := makeFileRoot(CIDs[:CIDCountAttempt], uint64(fileSum))
@@ -609,14 +587,9 @@ func (r *recursiveTraverser) do(task string, entry os.FileInfo) (*cidSizePair, e
if err != nil {
return nil, fmt.Errorf("writing root for %s: %e", task, err)
}

for _, v := range CIDs[:CIDCountAttempt] {
r.removeRoot(v)
}
CIDs = CIDs[CIDCountAttempt:]

cp := &cidSizePair{c, fileSum, dagSum, nil}
r.addRoot(cp)
cp := &cidSizePair{c, fileSum, dagSum}
newRoots = append(newRoots, cp)
}
CIDs = newRoots
@@ -629,7 +602,7 @@ func (r *recursiveTraverser) do(task string, entry os.FileInfo) (*cidSizePair, e
func (r *recursiveTraverser) swap() error {
<-r.chunkT
r.tempCarSend, r.tempCarChunk = r.tempCarChunk, r.tempCarSend
r.sendT <- r.pullRoots()
r.sendT <- r.pullBlock()
err := r.tempCarChunk.File.Truncate(0)
if err != nil {
return err

0 comments on commit bebaee6

Please sign in to comment.