From 755650e8a1bc8f8b5940afb5b7fcdaee252635c0 Mon Sep 17 00:00:00 2001
From: Christopher Schinnerl
Date: Mon, 25 Jun 2018 18:00:27 -0400
Subject: [PATCH] move erasure code to chunks, remove the tracking map, and
 move Available out of metadata.go

---
 modules/renter/download.go         |  30 ++++--
 modules/renter/downloadstreamer.go |   8 +-
 modules/renter/files.go            |   4 +-
 modules/renter/files_test.go       |  31 +++---
 modules/renter/persist.go          |   6 +-
 modules/renter/persist_test.go     |   2 +-
 modules/renter/renter.go           |   9 --
 modules/renter/siafile/compat.go   |  21 ++--
 modules/renter/siafile/metadata.go | 151 +++++++--------------------
 modules/renter/siafile/siafile.go  | 160 ++++++++++++++++++++++++-----
 modules/renter/upload.go           |  24 ++++-
 modules/renter/uploadchunk.go      |   4 +-
 modules/renter/uploadheap.go       |  29 ++++--
 13 files changed, 264 insertions(+), 215 deletions(-)

diff --git a/modules/renter/download.go b/modules/renter/download.go
index cc4ff33ed0..018eab6c0d 100644
--- a/modules/renter/download.go
+++ b/modules/renter/download.go
@@ -377,8 +377,18 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
 	}
 
 	// Determine which chunks to download.
-	minChunk := params.offset / params.file.ChunkSize()
-	maxChunk := (params.offset + params.length - 1) / params.file.ChunkSize()
+	minChunk, minChunkOffset := params.file.ChunkIndexByOffset(params.offset)
+	maxChunk, maxChunkOffset := params.file.ChunkIndexByOffset(params.offset + params.length)
+	// If the maxChunkOffset is exactly 0 we need to subtract 1 chunk. e.g. if
+	// the chunkSize is 100 bytes and we want to download 100 bytes from offset
+	// 0, maxChunk would be 1 and maxChunkOffset would be 0. We want maxChunk
+	// to be 0 though since we don't actually need any data from chunk 1.
+	if maxChunk > 0 && maxChunkOffset == 0 {
+		maxChunk--
+	}
+	if minChunk == params.file.NumChunks() || maxChunk == params.file.NumChunks() {
+		return nil, errors.New("download is requesting a chunk that is past the boundary of the file")
+	}
 
 	// For each chunk, assemble a mapping from the contract id to the index of
 	// the piece within the chunk that the contract is responsible for.
@@ -413,13 +423,13 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
 	for i := minChunk; i <= maxChunk; i++ {
 		udc := &unfinishedDownloadChunk{
 			destination: params.destination,
-			erasureCode: params.file.ErasureCode(),
+			erasureCode: params.file.ErasureCode(i),
 			masterKey:   params.file.MasterKey(),
 
 			staticChunkIndex: i,
 			staticCacheID:    fmt.Sprintf("%v:%v", d.staticSiaPath, i),
 			staticChunkMap:   chunkMaps[i-minChunk],
-			staticChunkSize:  params.file.ChunkSize(),
+			staticChunkSize:  params.file.ChunkSize(i),
 			staticPieceSize:  params.file.PieceSize(),
 
 			// TODO: 25ms is just a guess for a good default. Really, we want to
@@ -435,8 +445,8 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
 			staticNeedsMemory: params.needsMemory,
 			staticPriority:    params.priority,
 
-			physicalChunkData: make([][]byte, params.file.ErasureCode().NumPieces()),
-			pieceUsage:        make([]bool, params.file.ErasureCode().NumPieces()),
+			physicalChunkData: make([][]byte, params.file.ErasureCode(i).NumPieces()),
+			pieceUsage:        make([]bool, params.file.ErasureCode(i).NumPieces()),
 
 			download:          d,
 			staticStreamCache: r.staticStreamCache,
@@ -445,16 +455,16 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
 		// Set the fetchOffset - the offset within the chunk that we start
 		// downloading from.
if i == minChunk { - udc.staticFetchOffset = params.offset % params.file.ChunkSize() + udc.staticFetchOffset = minChunkOffset } else { udc.staticFetchOffset = 0 } // Set the fetchLength - the number of bytes to fetch within the chunk // that we start downloading from. - if i == maxChunk && (params.length+params.offset)%params.file.ChunkSize() != 0 { - udc.staticFetchLength = ((params.length + params.offset) % params.file.ChunkSize()) - udc.staticFetchOffset + if i == maxChunk && maxChunkOffset != 0 { + udc.staticFetchLength = maxChunkOffset - udc.staticFetchOffset } else { - udc.staticFetchLength = params.file.ChunkSize() - udc.staticFetchOffset + udc.staticFetchLength = params.file.ChunkSize(i) - udc.staticFetchOffset } // Set the writeOffset within the destination for where the data should // be written. diff --git a/modules/renter/downloadstreamer.go b/modules/renter/downloadstreamer.go index 18dca3deab..544f79a18c 100644 --- a/modules/renter/downloadstreamer.go +++ b/modules/renter/downloadstreamer.go @@ -64,10 +64,14 @@ func (s *streamer) Read(p []byte) (n int, err error) { } // Calculate how much we can download. We never download more than a single chunk. - chunkSize := s.file.ChunkSize() + chunkIndex, chunkOffset := s.file.ChunkIndexByOffset(uint64(s.offset)) + if chunkIndex == s.file.NumChunks() { + return 0, io.EOF + } + chunkSize := s.file.ChunkSize(chunkIndex) remainingData := uint64(fileSize - s.offset) requestedData := uint64(len(p)) - remainingChunk := chunkSize - uint64(s.offset)%chunkSize + remainingChunk := chunkSize - chunkOffset length := min(remainingData, requestedData, remainingChunk) // Download data diff --git a/modules/renter/files.go b/modules/renter/files.go index e211bd4336..6e3d00516b 100644 --- a/modules/renter/files.go +++ b/modules/renter/files.go @@ -144,7 +144,7 @@ func (r *Renter) FileList() []modules.FileInfo { tf, exists := r.persist.Tracking[siaPath] r.mu.RUnlock(lockID) if exists { - localPath = tf.RepairPath + localPath = tf.LocalPath() } fileList = append(fileList, modules.FileInfo{ @@ -196,7 +196,7 @@ func (r *Renter) File(siaPath string) (modules.FileInfo, error) { var localPath string tf, exists := r.persist.Tracking[file.SiaPath()] if exists { - localPath = tf.RepairPath + localPath = tf.LocalPath() } fileInfo = modules.FileInfo{ SiaPath: file.SiaPath(), diff --git a/modules/renter/files_test.go b/modules/renter/files_test.go index 3d762c3e05..745f0a1fe2 100644 --- a/modules/renter/files_test.go +++ b/modules/renter/files_test.go @@ -7,7 +7,6 @@ import ( "github.com/NebulousLabs/Sia/crypto" "github.com/NebulousLabs/Sia/modules" - "github.com/NebulousLabs/Sia/modules/renter/siafile" "github.com/NebulousLabs/Sia/types" "github.com/NebulousLabs/errors" ) @@ -32,7 +31,7 @@ func TestFileNumChunks(t *testing.T) { for _, test := range tests { rsc, _ := NewRSCode(test.piecesPerChunk, 1) // can't use 0 - f := siafile.New(t.Name(), rsc, test.pieceSize, test.size) + f := newFile(t.Name(), rsc, test.pieceSize, test.size, 0777, "") if f.NumChunks() != test.expNumChunks { t.Errorf("Test %v: expected %v, got %v", test, test.expNumChunks, f.NumChunks()) } @@ -42,7 +41,7 @@ func TestFileNumChunks(t *testing.T) { // TestFileAvailable probes the available method of the file type. 
func TestFileAvailable(t *testing.T) { rsc, _ := NewRSCode(1, 1) // can't use 0 - f := siafile.New(t.Name(), rsc, pieceSize, 100) + f := newFile(t.Name(), rsc, pieceSize, 100, 0777, "") neverOffline := make(map[string]bool) if f.Available(neverOffline) { @@ -69,7 +68,7 @@ func TestFileAvailable(t *testing.T) { func TestFileUploadedBytes(t *testing.T) { // ensure that a piece fits within a sector rsc, _ := NewRSCode(1, 3) - f := siafile.New(t.Name(), rsc, modules.SectorSize/2, 1000) + f := newFile(t.Name(), rsc, modules.SectorSize/2, 1000, 0777, "") for i := uint64(0); i < 4; i++ { err := f.AddPiece(types.SiaPublicKey{}, uint64(0), i, crypto.Hash{}) if err != nil { @@ -85,7 +84,7 @@ func TestFileUploadedBytes(t *testing.T) { // 100%, even if more pieces have been uploaded, func TestFileUploadProgressPinning(t *testing.T) { rsc, _ := NewRSCode(1, 1) - f := siafile.New(t.Name(), rsc, 2, 4) + f := newFile(t.Name(), rsc, 2, 4, 0777, "") for i := uint64(0); i < 2; i++ { err1 := f.AddPiece(types.SiaPublicKey{Key: []byte{byte(0)}}, uint64(0), i, crypto.Hash{}) err2 := f.AddPiece(types.SiaPublicKey{Key: []byte{byte(1)}}, uint64(0), i, crypto.Hash{}) @@ -111,7 +110,7 @@ func TestFileRedundancy(t *testing.T) { for _, nData := range nDatas { rsc, _ := NewRSCode(nData, 10) - f := siafile.New(t.Name(), rsc, 100, 1000) + f := newFile(t.Name(), rsc, 100, 1000, 0777, "") // Test that an empty file has 0 redundancy. if r := f.Redundancy(neverOffline, goodForRenew); r != 0 { t.Error("expected 0 redundancy, got", r) @@ -145,33 +144,33 @@ func TestFileRedundancy(t *testing.T) { t.Fatal(err) } // 1.0 / MinPieces because the chunk with the least number of pieces has 1 piece. - expectedR := 1.0 / float64(f.ErasureCode().MinPieces()) + expectedR := 1.0 / float64(f.ErasureCode(0).MinPieces()) if r := f.Redundancy(neverOffline, goodForRenew); r != expectedR { t.Errorf("expected %f redundancy, got %f", expectedR, r) } // Test that adding a file contract that has erasureCode.MinPieces() pieces // per chunk for all chunks results in a file with redundancy > 1. for iChunk := uint64(0); iChunk < f.NumChunks(); iChunk++ { - for iPiece := uint64(1); iPiece < uint64(f.ErasureCode().MinPieces()); iPiece++ { + for iPiece := uint64(1); iPiece < uint64(f.ErasureCode(0).MinPieces()); iPiece++ { err := f.AddPiece(types.SiaPublicKey{Key: []byte{byte(3)}}, iChunk, iPiece, crypto.Hash{}) if err != nil { t.Fatal(err) } } - err := f.AddPiece(types.SiaPublicKey{Key: []byte{byte(4)}}, iChunk, uint64(f.ErasureCode().MinPieces()), crypto.Hash{}) + err := f.AddPiece(types.SiaPublicKey{Key: []byte{byte(4)}}, iChunk, uint64(f.ErasureCode(0).MinPieces()), crypto.Hash{}) if err != nil { t.Fatal(err) } } // 1+MinPieces / MinPieces because the chunk with the least number of pieces has 1+MinPieces pieces. 
- expectedR = float64(1+f.ErasureCode().MinPieces()) / float64(f.ErasureCode().MinPieces()) + expectedR = float64(1+f.ErasureCode(0).MinPieces()) / float64(f.ErasureCode(0).MinPieces()) if r := f.Redundancy(neverOffline, goodForRenew); r != expectedR { t.Errorf("expected %f redundancy, got %f", expectedR, r) } // verify offline file contracts are not counted in the redundancy for iChunk := uint64(0); iChunk < f.NumChunks(); iChunk++ { - for iPiece := uint64(0); iPiece < uint64(f.ErasureCode().MinPieces()); iPiece++ { + for iPiece := uint64(0); iPiece < uint64(f.ErasureCode(0).MinPieces()); iPiece++ { err := f.AddPiece(types.SiaPublicKey{Key: []byte{byte(5)}}, iChunk, iPiece, crypto.Hash{}) if err != nil { t.Fatal(err) @@ -191,7 +190,8 @@ func TestFileRedundancy(t *testing.T) { // TestFileExpiration probes the expiration method of the file type. func TestFileExpiration(t *testing.T) { - f := newTestingFile() + rsc, _ := NewRSCode(1, 2) + f := newFile(t.Name(), rsc, pieceSize, 1000, 0777, "") contracts := make(map[string]modules.RenterContract) if f.Expiration(contracts) != 0 { t.Error("file with no pieces should report as having no time remaining") @@ -245,10 +245,9 @@ func TestRenterFileListLocalPath(t *testing.T) { defer rt.Close() id := rt.renter.mu.Lock() f := newTestingFile() + f.SetLocalPath("TestPath") rt.renter.files[f.SiaPath()] = f - rt.renter.persist.Tracking[f.SiaPath()] = trackedFile{ - RepairPath: "TestPath", - } + rt.renter.persist.Tracking[f.SiaPath()] = f rt.renter.mu.Unlock(id) files := rt.renter.FileList() if len(files) != 1 { @@ -414,7 +413,7 @@ func TestRenterRenameFile(t *testing.T) { } // Renaming should also update the tracking set - rt.renter.persist.Tracking["1"] = trackedFile{"foo"} + rt.renter.persist.Tracking["1"] = f2 err = rt.renter.RenameFile("1", "1b") if err != nil { t.Fatal(err) diff --git a/modules/renter/persist.go b/modules/renter/persist.go index 56761fac16..5691302f57 100644 --- a/modules/renter/persist.go +++ b/modules/renter/persist.go @@ -55,7 +55,7 @@ type ( MaxDownloadSpeed int64 MaxUploadSpeed int64 StreamCacheSize uint64 - Tracking map[string]trackedFile + Tracking map[string]*siafile.SiaFile } ) @@ -247,7 +247,7 @@ func (r *Renter) loadSiaFiles() error { // load fetches the saved renter data from disk. func (r *Renter) loadSettings() error { r.persist = persistence{ - Tracking: make(map[string]trackedFile), + Tracking: make(map[string]*siafile.SiaFile), } err := persist.LoadJSON(settingsMetadata, &r.persist, filepath.Join(r.persistDir, PersistFilename)) if os.IsNotExist(err) { @@ -489,7 +489,7 @@ func convertPersistVersionFrom040To133(path string) error { Version: persistVersion040, } p := persistence{ - Tracking: make(map[string]trackedFile), + Tracking: make(map[string]*siafile.SiaFile), } err := persist.LoadJSON(metadata, &p, path) diff --git a/modules/renter/persist_test.go b/modules/renter/persist_test.go index 9c8b7ea0ee..504875ad67 100644 --- a/modules/renter/persist_test.go +++ b/modules/renter/persist_test.go @@ -23,7 +23,7 @@ func newTestingFile() *siafile.SiaFile { name := "testfile-" + strconv.Itoa(int(data[0])) - return siafile.New(name, rsc, pieceSize, 1000) + return newFile(name, rsc, pieceSize, 1000, 0777, "") } // equalFiles is a helper function that compares two files for equality. 
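
The converted tests above, along with persist.go's Tracking map (now a map[string]*siafile.SiaFile, with LocalPath/SetLocalPath replacing trackedFile.RepairPath), all build files through the newFile helper this patch adds to upload.go further down. A standalone sketch of that helper's chunk-count arithmetic; numChunks mirrors the helper's local variable, and the code is illustrative rather than part of the patch:

```go
package main

import "fmt"

// numChunks models the math inside the new newFile helper: every chunk gets
// the same erasure coder, and the chunk count is ceil(fileSize/chunkSize)
// with a minimum of one so that empty files still get a chunk entry.
func numChunks(pieceSize, fileSize uint64, minPieces int) int {
	n := 1
	chunkSize := pieceSize * uint64(minPieces)
	if fileSize > 0 {
		n = int(fileSize / chunkSize)
		if fileSize%chunkSize != 0 {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println(numChunks(100, 1000, 1)) // 10 full chunks
	fmt.Println(numChunks(100, 1001, 1)) // 11 chunks, the last one padded
	fmt.Println(numChunks(100, 0, 1))    // 1: empty files still need a chunk
}
```

The helper then fills a []modules.ErasureCoder of that length with the same coder and hands it to siafile.New, which is why the tests can keep using a single Reed-Solomon code even though the SiaFile type now supports one coder per chunk.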
diff --git a/modules/renter/renter.go b/modules/renter/renter.go index a29d2f4f62..5f6503d8e1 100644 --- a/modules/renter/renter.go +++ b/modules/renter/renter.go @@ -152,15 +152,6 @@ type hostContractor interface { SetRateLimits(int64, int64, uint64) } -// A trackedFile contains metadata about files being tracked by the Renter. -// Tracked files are actively repaired by the Renter. By default, files -// uploaded by the user are tracked, and files that are added (via loading a -// .sia file) are not. -type trackedFile struct { - // location of original file on disk - RepairPath string -} - // A Renter is responsible for tracking all of the files that a user has // uploaded to Sia, as well as the locations and health of these files. // diff --git a/modules/renter/siafile/compat.go b/modules/renter/siafile/compat.go index 5f046ce22a..1b099e9f48 100644 --- a/modules/renter/siafile/compat.go +++ b/modules/renter/siafile/compat.go @@ -40,18 +40,17 @@ func NewFromFileData(fd FileData) *SiaFile { pieceSize: fd.PieceSize, siaPath: fd.Name, }, - deleted: fd.Deleted, - erasureCode: fd.ErasureCode, - uid: fd.UID, + deleted: fd.Deleted, + uid: fd.UID, } - chunks := make([]Chunk, file.NumChunks()) - for i := range chunks { - chunks[i].erasureCodeType = [4]byte{0, 0, 0, 1} - binary.LittleEndian.PutUint32(chunks[i].erasureCodeParams[0:4], uint32(file.erasureCode.MinPieces())) - binary.LittleEndian.PutUint32(chunks[i].erasureCodeParams[4:8], uint32(file.erasureCode.NumPieces()-file.erasureCode.MinPieces())) - chunks[i].pieces = make([][]Piece, file.erasureCode.NumPieces()) + file.chunks = make([]Chunk, len(fd.Chunks)) + for i := range file.chunks { + file.chunks[i].erasureCode = fd.ErasureCode + file.chunks[i].erasureCodeType = [4]byte{0, 0, 0, 1} + binary.LittleEndian.PutUint32(file.chunks[i].erasureCodeParams[0:4], uint32(file.chunks[i].erasureCode.MinPieces())) + binary.LittleEndian.PutUint32(file.chunks[i].erasureCodeParams[4:8], uint32(file.chunks[i].erasureCode.NumPieces()-file.chunks[i].erasureCode.MinPieces())) + file.chunks[i].pieces = make([][]Piece, file.chunks[i].erasureCode.NumPieces()) } - file.chunks = chunks // Populate the pubKeyTable of the file and add the pieces. pubKeyMap := make(map[string]int) @@ -83,7 +82,7 @@ func (sf *SiaFile) ExportFileData() FileData { Name: sf.metadata.siaPath, FileSize: uint64(sf.metadata.fileSize), MasterKey: sf.metadata.masterKey, - ErasureCode: sf.erasureCode, + ErasureCode: sf.chunks[0].erasureCode, PieceSize: sf.metadata.pieceSize, Mode: sf.metadata.mode, Deleted: sf.deleted, diff --git a/modules/renter/siafile/metadata.go b/modules/renter/siafile/metadata.go index 1190ba70d4..bc80df911d 100644 --- a/modules/renter/siafile/metadata.go +++ b/modules/renter/siafile/metadata.go @@ -5,7 +5,6 @@ import ( "os" "time" - "github.com/NebulousLabs/Sia/build" "github.com/NebulousLabs/Sia/crypto" "github.com/NebulousLabs/Sia/modules" "github.com/NebulousLabs/Sia/types" @@ -14,12 +13,12 @@ import ( type ( // Metadata is the metadata of a SiaFile and is JSON encoded. 
Metadata struct { - version [16]byte // version of the sia file format used - fileSize int64 // total size of the file - masterKey crypto.TwofishKey // masterkey used to encrypt pieces - pieceSize uint64 // size of a single piece of the file - trackingPath string // file to the local copy of the file used for repairing - siaPath string + version [16]byte // version of the sia file format used + fileSize int64 // total size of the file + masterKey crypto.TwofishKey // masterkey used to encrypt pieces + pieceSize uint64 // size of a single piece of the file + localPath string // file to the local copy of the file used for repairing + siaPath string // the path of the file on the Sia network // The following fields are the usual unix timestamps of files. modTime time.Time // time of last content modification @@ -32,10 +31,10 @@ type ( uid int // id of the user who owns the file gid int // id of the group that owns the file - // chunkHeaderSize is the size of each of the following chunk's metadata. - chunkHeaderSize uint64 - // chunkBodySize is the size of each of the following chunk's bodies. - chunkBodySize uint64 + // chunkMetadataSize is the amount of space allocated within the + // siafile for the metadata of a single chunk. It allows us to do + // random access operations on the file in constant time. + chunkMetadataSize uint64 // The following fields are the offsets for data that is written to disk // after the pubKeyTable. We reserve a generous amount of space for the @@ -43,9 +42,9 @@ type ( // need to resize later on. // // chunkOffset is the offset of the first chunk, forced to be a factor of - // 4096, default 16kib + // 4096, default 4kib // - // pubKeyTableOffset is the office of the publicKeyTable within the + // pubKeyTableOffset is the offset of the publicKeyTable within the // file. // chunkOffset int64 @@ -53,37 +52,11 @@ type ( } ) -// Available indicates whether the file is ready to be downloaded. -func (sf *SiaFile) Available(offline map[string]bool) bool { - sf.mu.RLock() - defer sf.mu.RUnlock() - // We need to find at least erasureCode.MinPieces different pieces for each - // chunk for the file to be available. - for _, chunk := range sf.chunks { - piecesForChunk := 0 - for _, pieceSet := range chunk.pieces { - for _, piece := range pieceSet { - if !offline[string(piece.HostPubKey.Key)] { - piecesForChunk++ - break // break out since we only count unique pieces - } - } - if piecesForChunk >= sf.erasureCode.MinPieces() { - break // we already have enough pieces for this chunk. - } - } - if piecesForChunk < sf.erasureCode.MinPieces() { - return false // this chunk isn't available. - } - } - return true -} - // ChunkSize returns the size of a single chunk of the file. -func (sf *SiaFile) ChunkSize() uint64 { +func (sf *SiaFile) ChunkSize(chunkIndex uint64) uint64 { sf.mu.RLock() defer sf.mu.RUnlock() - return sf.chunkSize() + return sf.chunkSize(chunkIndex) } // Delete removes the file from disk and marks it as deleted. Once the file is @@ -131,6 +104,13 @@ func (sf *SiaFile) HostPublicKeys() []types.SiaPublicKey { return sf.pubKeyTable } +// LocalPath returns the path of the local data of the file. +func (sf *SiaFile) LocalPath() string { + sf.mu.RLock() + defer sf.mu.RUnlock() + return sf.metadata.localPath +} + // MasterKey returns the masterkey used to encrypt the file. 
func (sf *SiaFile) MasterKey() crypto.TwofishKey { sf.mu.RLock() @@ -152,78 +132,6 @@ func (sf *SiaFile) PieceSize() uint64 { return sf.metadata.pieceSize } -// Redundancy returns the redundancy of the least redundant chunk. A file -// becomes available when this redundancy is >= 1. Assumes that every piece is -// unique within a file contract. -1 is returned if the file has size 0. It -// takes one argument, a map of offline contracts for this file. -func (sf *SiaFile) Redundancy(offlineMap map[string]bool, goodForRenewMap map[string]bool) float64 { - sf.mu.RLock() - defer sf.mu.RUnlock() - if sf.metadata.fileSize == 0 { - return -1 - } - - minPiecesRenew := ^uint64(0) - minPiecesNoRenew := ^uint64(0) - for _, chunk := range sf.chunks { - // Loop over chunks and remember how many unique pieces of the chunk - // were goodForRenew and how many were not. - numPiecesRenew := uint64(0) - numPiecesNoRenew := uint64(0) - for _, pieceSet := range chunk.pieces { - // Remember if we encountered a goodForRenew piece or a - // !goodForRenew piece that was at least online. - foundGoodForRenew := false - foundOnline := false - for _, piece := range pieceSet { - offline, exists1 := offlineMap[string(piece.HostPubKey.Key)] - goodForRenew, exists2 := goodForRenewMap[string(piece.HostPubKey.Key)] - if exists1 != exists2 { - build.Critical("contract can't be in one map but not in the other") - } - if !exists1 || offline { - continue - } - // If we found a goodForRenew piece we can stop. - if goodForRenew { - foundGoodForRenew = true - break - } - // Otherwise we continue since there might be other hosts with - // the same piece that are goodForRenew. We still remember that - // we found an online piece though. - foundOnline = true - } - if foundGoodForRenew { - numPiecesRenew++ - numPiecesNoRenew++ - } else if foundOnline { - numPiecesNoRenew++ - } - } - // Remember the smallest number of goodForRenew pieces encountered. - if numPiecesRenew < minPiecesRenew { - minPiecesRenew = numPiecesRenew - } - // Remember the smallest number of !goodForRenew pieces encountered. - if numPiecesNoRenew < minPiecesNoRenew { - minPiecesNoRenew = numPiecesNoRenew - } - } - - // If the redundancy is smaller than 1x we return the redundancy that - // includes contracts that are not good for renewal. The reason for this is - // a better user experience. If the renter operates correctly, redundancy - // should never go above numPieces / minPieces and redundancyNoRenew should - // never go below 1. - redundancy := float64(minPiecesRenew) / float64(sf.erasureCode.MinPieces()) - redundancyNoRenew := float64(minPiecesNoRenew) / float64(sf.erasureCode.MinPieces()) - if redundancy < 1 { - return redundancyNoRenew - } - return redundancy -} - // Rename changes the name of the file to a new one. // TODO: This will actually rename the file on disk once we persist the new // file format. @@ -241,6 +149,14 @@ func (sf *SiaFile) SetMode(mode os.FileMode) { sf.metadata.mode = mode } +// SetLocalPath changes the local path of the file which is used to repair +// the file from disk. +func (sf *SiaFile) SetLocalPath(path string) { + sf.mu.Lock() + defer sf.mu.Unlock() + sf.metadata.localPath = path +} + // SiaPath returns the file's sia path. func (sf *SiaFile) SiaPath() string { sf.mu.RLock() @@ -278,11 +194,14 @@ func (sf *SiaFile) UploadedBytes() uint64 { // reaches 100%, and UploadProgress may report a value greater than 100%. 
func (sf *SiaFile) UploadProgress() float64 { uploaded := sf.UploadedBytes() - desired := modules.SectorSize * uint64(sf.ErasureCode().NumPieces()) * sf.NumChunks() + var desired uint64 + for i := uint64(0); i < sf.NumChunks(); i++ { + desired += modules.SectorSize * uint64(sf.ErasureCode(i).NumPieces()) + } return math.Min(100*(float64(uploaded)/float64(desired)), 100) } // ChunkSize returns the size of a single chunk of the file. -func (sf *SiaFile) chunkSize() uint64 { - return sf.metadata.pieceSize * uint64(sf.erasureCode.MinPieces()) +func (sf *SiaFile) chunkSize(chunkIndex uint64) uint64 { + return sf.metadata.pieceSize * uint64(sf.chunks[chunkIndex].erasureCode.MinPieces()) } diff --git a/modules/renter/siafile/siafile.go b/modules/renter/siafile/siafile.go index 3261f36314..f79185b239 100644 --- a/modules/renter/siafile/siafile.go +++ b/modules/renter/siafile/siafile.go @@ -2,11 +2,12 @@ package siafile import ( "bytes" - "encoding/base32" "encoding/binary" "fmt" + "os" "sync" + "github.com/NebulousLabs/Sia/build" "github.com/NebulousLabs/Sia/modules" "github.com/NebulousLabs/Sia/types" "github.com/NebulousLabs/fastrand" @@ -35,10 +36,9 @@ type ( chunks []Chunk // utility fields. These are not persisted. - deleted bool - erasureCode modules.ErasureCoder - mu sync.RWMutex - uid string + deleted bool + mu sync.RWMutex + uid string } // Chunk represents a single chunk of a file on disk @@ -56,6 +56,7 @@ type ( // erasureCodeType [4]byte erasureCodeParams [8]byte + erasureCode modules.ErasureCoder // extensionInfo is some reserved space for each chunk that allows us // to indicate if a chunk is special. @@ -74,30 +75,33 @@ type ( ) // New create a new SiaFile. -func New(siaPath string, erasureCode modules.ErasureCoder, pieceSize, fileSize uint64) *SiaFile { +// TODO needs changes once we move persistence over. +func New(siaPath string, erasureCode []modules.ErasureCoder, pieceSize, fileSize uint64, fileMode os.FileMode, source string) *SiaFile { file := &SiaFile{ metadata: Metadata{ fileSize: int64(fileSize), + localPath: source, masterKey: crypto.GenerateTwofishKey(), + mode: fileMode, pieceSize: pieceSize, siaPath: siaPath, }, - erasureCode: erasureCode, - uid: base32.StdEncoding.EncodeToString(fastrand.Bytes(20))[:20], + uid: string(fastrand.Bytes(20)), } - chunks := make([]Chunk, file.NumChunks()) - for i := range chunks { - chunks[i].erasureCodeType = [4]byte{0, 0, 0, 1} - binary.LittleEndian.PutUint32(chunks[i].erasureCodeParams[0:4], uint32(erasureCode.MinPieces())) - binary.LittleEndian.PutUint32(chunks[i].erasureCodeParams[4:8], uint32(erasureCode.NumPieces()-erasureCode.MinPieces())) - chunks[i].pieces = make([][]Piece, erasureCode.NumPieces()) + file.chunks = make([]Chunk, len(erasureCode)) + for i := range file.chunks { + file.chunks[i].erasureCode = erasureCode[i] + file.chunks[i].erasureCodeType = [4]byte{0, 0, 0, 1} + binary.LittleEndian.PutUint32(file.chunks[i].erasureCodeParams[0:4], uint32(erasureCode[i].MinPieces())) + binary.LittleEndian.PutUint32(file.chunks[i].erasureCodeParams[4:8], uint32(erasureCode[i].NumPieces()-erasureCode[i].MinPieces())) + file.chunks[i].pieces = make([][]Piece, erasureCode[i].NumPieces()) } - file.chunks = chunks return file } // AddPiece adds an uploaded piece to the file. It also updates the host table // if the public key of the host is not aleady known. +// TODO needs changes once we move persistence over. 
 func (sf *SiaFile) AddPiece(pk types.SiaPublicKey, chunkIndex, pieceIndex uint64, merkleRoot crypto.Hash) error {
 	sf.mu.Lock()
 	defer sf.mu.Unlock()
@@ -131,11 +135,50 @@ func (sf *SiaFile) AddPiece(pk types.SiaPublicKey, chunkIndex, pieceIndex uint64
 	return nil
 }
 
+// Available indicates whether the file is ready to be downloaded.
+func (sf *SiaFile) Available(offline map[string]bool) bool {
+	sf.mu.RLock()
+	defer sf.mu.RUnlock()
+	// We need to find at least erasureCode.MinPieces different pieces for each
+	// chunk for the file to be available.
+	for chunkIndex, chunk := range sf.chunks {
+		piecesForChunk := 0
+		for _, pieceSet := range chunk.pieces {
+			for _, piece := range pieceSet {
+				if !offline[string(piece.HostPubKey.Key)] {
+					piecesForChunk++
+					break // break out since we only count unique pieces
+				}
+			}
+			if piecesForChunk >= sf.chunks[chunkIndex].erasureCode.MinPieces() {
+				break // we already have enough pieces for this chunk.
+			}
+		}
+		if piecesForChunk < sf.chunks[chunkIndex].erasureCode.MinPieces() {
+			return false // this chunk isn't available.
+		}
+	}
+	return true
+}
+
+// ChunkIndexByOffset will return the chunkIndex that contains the provided
+// offset of a file and also the relative offset within the chunk. If the
+// offset is out of bounds, chunkIndex will be equal to NumChunks().
+func (sf *SiaFile) ChunkIndexByOffset(offset uint64) (chunkIndex uint64, off uint64) {
+	for chunkIndex = uint64(0); chunkIndex < uint64(len(sf.chunks)); chunkIndex++ {
+		if sf.chunkSize(chunkIndex) > offset {
+			return chunkIndex, offset
+		}
+		offset -= sf.chunkSize(chunkIndex)
+	}
+	return
+}
+
 // ErasureCode returns the erasure coder used by the file.
-func (sf *SiaFile) ErasureCode() modules.ErasureCoder {
+func (sf *SiaFile) ErasureCode(chunkIndex uint64) modules.ErasureCoder {
 	sf.mu.RLock()
 	defer sf.mu.RUnlock()
-	return sf.erasureCode
+	return sf.chunks[chunkIndex].erasureCode
 }
 
 // NumChunks returns the number of chunks the file consists of. This will
@@ -144,16 +187,7 @@ func (sf *SiaFile) ErasureCode() modules.ErasureCoder {
 func (sf *SiaFile) NumChunks() uint64 {
 	sf.mu.RLock()
 	defer sf.mu.RUnlock()
-	// empty files still need at least one chunk
-	if sf.metadata.fileSize == 0 {
-		return 1
-	}
-	n := uint64(sf.metadata.fileSize) / sf.chunkSize()
-	// last chunk will be padded, unless chunkSize divides file evenly.
-	if uint64(sf.metadata.fileSize)%sf.chunkSize() != 0 {
-		n++
-	}
-	return n
+	return uint64(len(sf.chunks))
 }
 
 // Pieces returns all the pieces for a chunk in a slice of slices that contains
@@ -173,6 +207,78 @@ func (sf *SiaFile) NumChunks() uint64 {
 	return pieces, nil
 }
 
+// Redundancy returns the redundancy of the least redundant chunk. A file
+// becomes available when this redundancy is >= 1. Assumes that every piece is
+// unique within a file contract. -1 is returned if the file has size 0. It
+// takes two maps, one of offline contracts and one of goodForRenew contracts.
+func (sf *SiaFile) Redundancy(offlineMap map[string]bool, goodForRenewMap map[string]bool) float64 {
+	sf.mu.RLock()
+	defer sf.mu.RUnlock()
+	if sf.metadata.fileSize == 0 {
+		return -1
+	}
+
+	minPiecesRenew := ^uint64(0)
+	minPiecesNoRenew := ^uint64(0)
+	for _, chunk := range sf.chunks {
+		// Loop over chunks and remember how many unique pieces of the chunk
+		// were goodForRenew and how many were not.
+ numPiecesRenew := uint64(0) + numPiecesNoRenew := uint64(0) + for _, pieceSet := range chunk.pieces { + // Remember if we encountered a goodForRenew piece or a + // !goodForRenew piece that was at least online. + foundGoodForRenew := false + foundOnline := false + for _, piece := range pieceSet { + offline, exists1 := offlineMap[string(piece.HostPubKey.Key)] + goodForRenew, exists2 := goodForRenewMap[string(piece.HostPubKey.Key)] + if exists1 != exists2 { + build.Critical("contract can't be in one map but not in the other") + } + if !exists1 || offline { + continue + } + // If we found a goodForRenew piece we can stop. + if goodForRenew { + foundGoodForRenew = true + break + } + // Otherwise we continue since there might be other hosts with + // the same piece that are goodForRenew. We still remember that + // we found an online piece though. + foundOnline = true + } + if foundGoodForRenew { + numPiecesRenew++ + numPiecesNoRenew++ + } else if foundOnline { + numPiecesNoRenew++ + } + } + // Remember the smallest number of goodForRenew pieces encountered. + if numPiecesRenew < minPiecesRenew { + minPiecesRenew = numPiecesRenew + } + // Remember the smallest number of !goodForRenew pieces encountered. + if numPiecesNoRenew < minPiecesNoRenew { + minPiecesNoRenew = numPiecesNoRenew + } + } + + // If the redundancy is smaller than 1x we return the redundancy that + // includes contracts that are not good for renewal. The reason for this is + // a better user experience. If the renter operates correctly, redundancy + // should never go above numPieces / minPieces and redundancyNoRenew should + // never go below 1. + redundancy := float64(minPiecesRenew) / float64(sf.chunks[0].erasureCode.MinPieces()) // TODO this shouldn't be chunks[0] + redundancyNoRenew := float64(minPiecesNoRenew) / float64(sf.chunks[0].erasureCode.MinPieces()) //TODO this shouldn't be chunks[0] + if redundancy < 1 { + return redundancyNoRenew + } + return redundancy +} + // UID returns a unique identifier for this file. func (sf *SiaFile) UID() string { sf.mu.RLock() diff --git a/modules/renter/upload.go b/modules/renter/upload.go index 576c28e238..bbb9eb1b8d 100644 --- a/modules/renter/upload.go +++ b/modules/renter/upload.go @@ -28,6 +28,23 @@ var ( errUploadDirectory = errors.New("cannot upload directory") ) +// newFile is a helper to more easily create a new Siafile for testing. +func newFile(name string, rsc modules.ErasureCoder, pieceSize, fileSize uint64, mode os.FileMode, source string) *siafile.SiaFile { + numChunks := 1 + chunkSize := pieceSize * uint64(rsc.MinPieces()) + if fileSize > 0 { + numChunks = int(fileSize / chunkSize) + if fileSize%chunkSize != 0 { + numChunks++ + } + } + ecs := make([]modules.ErasureCoder, numChunks) + for i := 0; i < numChunks; i++ { + ecs[i] = rsc + } + return siafile.New(name, ecs, pieceSize, fileSize, mode, source) +} + // validateSource verifies that a sourcePath meets the // requirements for upload. func validateSource(sourcePath string) error { @@ -82,15 +99,12 @@ func (r *Renter) Upload(up modules.FileUploadParams) error { } // Create file object. - f := siafile.New(up.SiaPath, up.ErasureCode, pieceSize, uint64(fileInfo.Size())) - f.SetMode(fileInfo.Mode()) + f := newFile(up.SiaPath, up.ErasureCode, pieceSize, uint64(fileInfo.Size()), fileInfo.Mode(), up.Source) // Add file to renter. 
lockID = r.mu.Lock() r.files[up.SiaPath] = f - r.persist.Tracking[up.SiaPath] = trackedFile{ - RepairPath: up.Source, - } + r.persist.Tracking[up.SiaPath] = f r.saveSync() err = r.saveFile(f) r.mu.Unlock(lockID) diff --git a/modules/renter/uploadchunk.go b/modules/renter/uploadchunk.go index 5daed93ad4..98e640128d 100644 --- a/modules/renter/uploadchunk.go +++ b/modules/renter/uploadchunk.go @@ -177,7 +177,7 @@ func (r *Renter) managedDownloadLogicalChunkData(chunk *unfinishedUploadChunk) e func (r *Renter) managedFetchAndRepairChunk(chunk *unfinishedUploadChunk) { // Calculate the amount of memory needed for erasure coding. This will need // to be released if there's an error before erasure coding is complete. - erasureCodingMemory := chunk.renterFile.PieceSize() * uint64(chunk.renterFile.ErasureCode().MinPieces()) + erasureCodingMemory := chunk.renterFile.PieceSize() * uint64(chunk.renterFile.ErasureCode(chunk.index).MinPieces()) // Calculate the amount of memory to release due to already completed // pieces. This memory gets released during encryption, but needs to be @@ -222,7 +222,7 @@ func (r *Renter) managedFetchAndRepairChunk(chunk *unfinishedUploadChunk) { // fact to reduce the total memory required to create the physical data. // That will also change the amount of memory we need to allocate, and the // number of times we need to return memory. - chunk.physicalChunkData, err = chunk.renterFile.ErasureCode().EncodeShards(chunk.logicalChunkData) + chunk.physicalChunkData, err = chunk.renterFile.ErasureCode(chunk.index).EncodeShards(chunk.logicalChunkData) chunk.logicalChunkData = nil r.memoryManager.Return(erasureCodingMemory) chunk.memoryReleased += erasureCodingMemory diff --git a/modules/renter/uploadheap.go b/modules/renter/uploadheap.go index ee0fa3c85d..39114ce8e9 100644 --- a/modules/renter/uploadheap.go +++ b/modules/renter/uploadheap.go @@ -111,7 +111,14 @@ func (r *Renter) buildUnfinishedChunks(f *siafile.SiaFile, hosts map[string]stru } // If we don't have enough workers for the file, don't repair it right now. - if len(r.workerPool) < f.ErasureCode().MinPieces() { + minWorkers := 0 + for i := uint64(0); i < f.NumChunks(); i++ { + minPieces := f.ErasureCode(i).MinPieces() + if minPieces > minWorkers { + minWorkers = minPieces + } + } + if len(r.workerPool) < minWorkers { return nil } @@ -125,7 +132,7 @@ func (r *Renter) buildUnfinishedChunks(f *siafile.SiaFile, hosts map[string]stru for i := uint64(0); i < chunkCount; i++ { newUnfinishedChunks[i] = &unfinishedUploadChunk{ renterFile: f, - localPath: trackedFile.RepairPath, + localPath: trackedFile.LocalPath(), id: uploadChunkID{ fileUID: f.UID(), @@ -133,8 +140,8 @@ func (r *Renter) buildUnfinishedChunks(f *siafile.SiaFile, hosts map[string]stru }, index: i, - length: f.ChunkSize(), - offset: int64(i * f.ChunkSize()), + length: f.ChunkSize(i), + offset: int64(i * f.ChunkSize(i)), // memoryNeeded has to also include the logical data, and also // include the overhead for encryption. @@ -145,13 +152,13 @@ func (r *Renter) buildUnfinishedChunks(f *siafile.SiaFile, hosts map[string]stru // TODO: Currently we request memory for all of the pieces as well // as the minimum pieces, but we perhaps don't need to request all // of that. 
- memoryNeeded: f.PieceSize()*uint64(f.ErasureCode().NumPieces()+f.ErasureCode().MinPieces()) + uint64(f.ErasureCode().NumPieces()*crypto.TwofishOverhead), - minimumPieces: f.ErasureCode().MinPieces(), - piecesNeeded: f.ErasureCode().NumPieces(), + memoryNeeded: f.PieceSize()*uint64(f.ErasureCode(i).NumPieces()+f.ErasureCode(i).MinPieces()) + uint64(f.ErasureCode(i).NumPieces()*crypto.TwofishOverhead), + minimumPieces: f.ErasureCode(i).MinPieces(), + piecesNeeded: f.ErasureCode(i).NumPieces(), - physicalChunkData: make([][]byte, f.ErasureCode().NumPieces()), + physicalChunkData: make([][]byte, f.ErasureCode(i).NumPieces()), - pieceUsage: make([]bool, f.ErasureCode().NumPieces()), + pieceUsage: make([]bool, f.ErasureCode(i).NumPieces()), unusedHosts: make(map[string]struct{}), } // Every chunk can have a different set of unused hosts. @@ -275,8 +282,8 @@ func (r *Renter) managedBuildChunkHeap(hosts map[string]struct{}) { if exists { // Check if local file is missing and redundancy is less than 1 // log warning to renter log - if _, err := os.Stat(tf.RepairPath); os.IsNotExist(err) && file.Redundancy(offline, goodForRenew) < 1 { - r.log.Println("File not found on disk and possibly unrecoverable:", tf.RepairPath) + if _, err := os.Stat(tf.LocalPath()); os.IsNotExist(err) && file.Redundancy(offline, goodForRenew) < 1 { + r.log.Println("File not found on disk and possibly unrecoverable:", tf.LocalPath()) } } }
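
A few of the mechanics above are subtle enough to deserve standalone sketches (all toy Go with stand-in types and values; none of it is part of the patch). First, ChunkIndexByOffset: since every chunk can now carry its own erasure code, chunk sizes may differ per chunk, so offset math walks the chunk list instead of dividing by a global ChunkSize(). Out-of-bounds offsets report the NumChunks() sentinel that download.go and downloadstreamer.go check for:

```go
package main

import "fmt"

// chunkIndexByOffset models SiaFile.ChunkIndexByOffset with a plain slice of
// per-chunk sizes. It returns the chunk containing the offset plus the offset
// relative to that chunk; past-the-end offsets return (len(chunkSizes), 0),
// the sentinel the callers compare against NumChunks().
func chunkIndexByOffset(chunkSizes []uint64, offset uint64) (chunkIndex, off uint64) {
	for chunkIndex = 0; chunkIndex < uint64(len(chunkSizes)); chunkIndex++ {
		if chunkSizes[chunkIndex] > offset {
			return chunkIndex, offset
		}
		offset -= chunkSizes[chunkIndex]
	}
	return chunkIndex, 0 // out of bounds: chunkIndex == len(chunkSizes)
}

func main() {
	sizes := []uint64{100, 100, 100}

	// Download 100 bytes at offset 0: the end offset lands exactly on a chunk
	// boundary, so maxChunkOffset is 0 and maxChunk gets decremented because
	// no data from chunk 1 is actually needed.
	minChunk, minChunkOffset := chunkIndexByOffset(sizes, 0)
	maxChunk, maxChunkOffset := chunkIndexByOffset(sizes, 0+100)
	if maxChunk > 0 && maxChunkOffset == 0 {
		maxChunk--
	}
	fmt.Println(minChunk, minChunkOffset, maxChunk, maxChunkOffset) // 0 0 0 0

	// An offset at or past the end of the file maps to the sentinel.
	past, _ := chunkIndexByOffset(sizes, 300)
	fmt.Println(past == uint64(len(sizes))) // true
}
```

Note the ordering this implies in managedNewDownload: a download whose end lands exactly on the file's last chunk boundary maps to (NumChunks(), 0), so the maxChunk decrement has to run before the past-the-boundary check or an aligned full-file download would be rejected.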
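
Second, the repair path now sizes everything per chunk. The worker gate in buildUnfinishedChunks becomes the largest MinPieces over all chunks, and memoryNeeded follows the per-chunk formula from uploadheap.go. A toy model; chunkCode and twofishOverhead are stand-ins (the real code uses crypto.TwofishOverhead):

```go
package main

import "fmt"

// chunkCode is a stand-in for one chunk's erasure-code parameters.
type chunkCode struct{ minPieces, numPieces int }

// twofishOverhead stands in for crypto.TwofishOverhead; the exact value is
// not important for the shape of the formula.
const twofishOverhead = 28

// minWorkers mirrors the new gate in buildUnfinishedChunks: the renter needs
// enough workers to reconstruct the most demanding chunk.
func minWorkers(chunks []chunkCode) int {
	workers := 0
	for _, c := range chunks {
		if c.minPieces > workers {
			workers = c.minPieces
		}
	}
	return workers
}

// memoryNeeded mirrors the per-chunk memory formula: logical plus physical
// pieces, plus encryption overhead per physical piece.
func memoryNeeded(pieceSize uint64, c chunkCode) uint64 {
	return pieceSize*uint64(c.numPieces+c.minPieces) + uint64(c.numPieces*twofishOverhead)
}

func main() {
	chunks := []chunkCode{{10, 30}, {10, 30}, {20, 40}}
	fmt.Println(minWorkers(chunks))             // 20: gated by the last chunk
	fmt.Println(memoryNeeded(1<<22, chunks[0])) // bytes reserved for chunk 0
}
```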
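
UploadProgress gets the same per-chunk treatment: its denominator becomes a sum of SectorSize * NumPieces(i) over all chunks rather than a single multiplication. In toy form, with sectorSize standing in for modules.SectorSize:

```go
package main

import "fmt"

const sectorSize = uint64(1 << 22) // stand-in for modules.SectorSize

// desiredBytes models the new UploadProgress denominator: each chunk
// contributes one sector per piece of its own erasure code.
func desiredBytes(numPiecesPerChunk []uint64) uint64 {
	var desired uint64
	for _, numPieces := range numPiecesPerChunk {
		desired += sectorSize * numPieces
	}
	return desired
}

func main() {
	fmt.Println(desiredBytes([]uint64{30, 30, 40})) // 100 sectors' worth
}
```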
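
Finally, Redundancy still divides both piece-count minima by chunks[0]'s MinPieces, and the two TODOs in that hunk flag the divisor as temporary. A hedged sketch of the per-chunk variant those TODOs point toward, dividing each chunk's piece count by that chunk's own MinPieces before taking the file-wide minimum (illustrative only; the patch deliberately defers this):

```go
package main

import "fmt"

// chunkState is a toy summary of one chunk: its count of unique goodForRenew
// pieces and its own erasure code's MinPieces.
type chunkState struct {
	piecesRenew uint64
	minPieces   uint64
}

// redundancy computes a per-chunk redundancy and returns the smallest one, so
// chunks with different erasure codes are each judged against their own
// reconstruction threshold.
func redundancy(chunks []chunkState) float64 {
	lowest := -1.0
	for _, c := range chunks {
		r := float64(c.piecesRenew) / float64(c.minPieces)
		if lowest == -1.0 || r < lowest {
			lowest = r
		}
	}
	return lowest
}

func main() {
	// The second chunk has fewer pieces relative to its own code, so it
	// bounds the redundancy of the whole file.
	chunks := []chunkState{{piecesRenew: 20, minPieces: 10}, {piecesRenew: 15, minPieces: 20}}
	fmt.Println(redundancy(chunks)) // 0.75
}
```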