This repository has been archived by the owner on Nov 2, 2018. It is now read-only.

Sia file format #3113

Open
wants to merge 15 commits into base: master
Changes from all commits

76 changes: 44 additions & 32 deletions modules/renter/download.go
@@ -132,6 +132,7 @@ import (
"time"

"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/Sia/modules/renter/siafile"
"github.com/NebulousLabs/Sia/persist"
"github.com/NebulousLabs/Sia/types"

@@ -178,7 +179,7 @@ type (
destination downloadDestination // The place to write the downloaded data.
destinationType string // "file", "buffer", "http stream", etc.
destinationString string // The string to report to the user for the destination.
file *file // The file to download.
file *siafile.SiaFile // The file to download.

latencyTarget time.Duration // Workers above this latency will be automatically put on standby initially.
length uint64 // Length of download. Cannot be 0.
@@ -285,16 +286,16 @@ func (r *Renter) managedDownload(p modules.RenterDownloadParameters) (*download,
if p.Destination != "" && !filepath.IsAbs(p.Destination) {
return nil, errors.New("destination must be an absolute path")
}
if p.Offset == file.size {
if p.Offset == file.Size() {
return nil, errors.New("offset equals filesize")
}
// Sentinel: if length == 0, download the entire file.
if p.Length == 0 {
p.Length = file.size - p.Offset
p.Length = file.Size() - p.Offset
}
// Check whether offset and length are valid.
if p.Offset < 0 || p.Offset+p.Length > file.size {
return nil, fmt.Errorf("offset and length combination invalid, max byte is at index %d", file.size-1)
if p.Offset < 0 || p.Offset+p.Length > file.Size() {
return nil, fmt.Errorf("offset and length combination invalid, max byte is at index %d", file.Size()-1)
}

// Instantiate the correct downloadWriter implementation.
@@ -304,7 +305,7 @@ func (r *Renter) managedDownload(p modules.RenterDownloadParameters) (*download,
dw = newDownloadDestinationWriteCloserFromWriter(p.Httpwriter)
destinationType = "http stream"
} else {
osFile, err := os.OpenFile(p.Destination, os.O_CREATE|os.O_WRONLY, os.FileMode(file.mode))
osFile, err := os.OpenFile(p.Destination, os.O_CREATE|os.O_WRONLY, file.Mode())
if err != nil {
return nil, err
}
@@ -352,7 +353,7 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
if params.offset < 0 {
return nil, errors.New("download offset cannot be a negative number")
}
if params.offset+params.length > params.file.size {
if params.offset+params.length > params.file.Size() {
return nil, errors.New("download is requesting data past the boundary of the file")
}

@@ -369,57 +370,68 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
staticLength: params.length,
staticOffset: params.offset,
staticOverdrive: params.overdrive,
staticSiaPath: params.file.name,
staticSiaPath: params.file.SiaPath(),
staticPriority: params.priority,

log: r.log,
memoryManager: r.memoryManager,
}

// Determine which chunks to download.
minChunk := params.offset / params.file.staticChunkSize()
maxChunk := (params.offset + params.length - 1) / params.file.staticChunkSize()
minChunk, minChunkOffset := params.file.ChunkIndexByOffset(params.offset)
maxChunk, maxChunkOffset := params.file.ChunkIndexByOffset(params.offset + params.length)
if minChunk == params.file.NumChunks() || maxChunk == params.file.NumChunks() {
return nil, errors.New("download is requesting a chunk that is past the boundary of the file")
}
// If the maxChunkOffset is exactly 0 we need to subtract 1 chunk. e.g. if
// the chunkSize is 100 bytes and we want to download 100 bytes from offset
// 0, maxChunk would be 1 and maxChunkOffset would be 0. We want maxChunk
// to be 0 though since we don't actually need any data from chunk 1.
if maxChunk > 0 && maxChunkOffset == 0 {
maxChunk--
}

// For each chunk, assemble a mapping from the contract id to the index of
// the piece within the chunk that the contract is responsible for.
chunkMaps := make([]map[string]downloadPieceInfo, maxChunk-minChunk+1)
for i := range chunkMaps {
chunkMaps[i] = make(map[string]downloadPieceInfo)
}
params.file.mu.Lock()
for id, contract := range params.file.contracts {
resolvedKey := r.hostContractor.ResolveIDToPubKey(id)
for _, piece := range contract.Pieces {
if piece.Chunk >= minChunk && piece.Chunk <= maxChunk {
for chunkIndex := minChunk; chunkIndex <= maxChunk; chunkIndex++ {
// Create the map.
chunkMaps[chunkIndex-minChunk] = make(map[string]downloadPieceInfo)
// Get the pieces for the chunk.
pieces, err := params.file.Pieces(uint64(chunkIndex))
if err != nil {
return nil, err
}
for pieceIndex, pieceSet := range pieces {
for _, piece := range pieceSet {
// Sanity check - the same worker should not have two pieces for
// the same chunk.
_, exists := chunkMaps[piece.Chunk-minChunk][string(resolvedKey.Key)]
_, exists := chunkMaps[chunkIndex-minChunk][string(piece.HostPubKey.Key)]
if exists {
r.log.Println("ERROR: Worker has multiple pieces uploaded for the same chunk.")
}
chunkMaps[piece.Chunk-minChunk][string(resolvedKey.Key)] = downloadPieceInfo{
index: piece.Piece,
chunkMaps[chunkIndex-minChunk][string(piece.HostPubKey.Key)] = downloadPieceInfo{
index: uint64(pieceIndex),
root: piece.MerkleRoot,
}
}
}
}
params.file.mu.Unlock()

// Queue the downloads for each chunk.
writeOffset := int64(0) // where to write a chunk within the download destination.
d.chunksRemaining += maxChunk - minChunk + 1
for i := minChunk; i <= maxChunk; i++ {
udc := &unfinishedDownloadChunk{
destination: params.destination,
erasureCode: params.file.erasureCode,
masterKey: params.file.masterKey,
erasureCode: params.file.ErasureCode(i),
masterKey: params.file.MasterKey(),

staticChunkIndex: i,
staticCacheID: fmt.Sprintf("%v:%v", d.staticSiaPath, i),
staticChunkMap: chunkMaps[i-minChunk],
staticChunkSize: params.file.staticChunkSize(),
staticPieceSize: params.file.pieceSize,
staticChunkSize: params.file.ChunkSize(i),
staticPieceSize: params.file.PieceSize(),

// TODO: 25ms is just a guess for a good default. Really, we want to
// set the latency target such that slower workers will pick up the
@@ -434,8 +446,8 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
staticNeedsMemory: params.needsMemory,
staticPriority: params.priority,

physicalChunkData: make([][]byte, params.file.erasureCode.NumPieces()),
pieceUsage: make([]bool, params.file.erasureCode.NumPieces()),
physicalChunkData: make([][]byte, params.file.ErasureCode(i).NumPieces()),
pieceUsage: make([]bool, params.file.ErasureCode(i).NumPieces()),

download: d,
staticStreamCache: r.staticStreamCache,
@@ -444,16 +456,16 @@ func (r *Renter) managedNewDownload(params downloadParams) (*download, error) {
// Set the fetchOffset - the offset within the chunk that we start
// downloading from.
if i == minChunk {
udc.staticFetchOffset = params.offset % params.file.staticChunkSize()
udc.staticFetchOffset = minChunkOffset
} else {
udc.staticFetchOffset = 0
}
// Set the fetchLength - the number of bytes to fetch within the chunk
// that we start downloading from.
if i == maxChunk && (params.length+params.offset)%params.file.staticChunkSize() != 0 {
udc.staticFetchLength = ((params.length + params.offset) % params.file.staticChunkSize()) - udc.staticFetchOffset
if i == maxChunk && maxChunkOffset != 0 {
udc.staticFetchLength = maxChunkOffset - udc.staticFetchOffset
} else {
udc.staticFetchLength = params.file.staticChunkSize() - udc.staticFetchOffset
udc.staticFetchLength = params.file.ChunkSize(i) - udc.staticFetchOffset
}
// Set the writeOffset within the destination for where the data should
// be written.
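
The substance of the download.go change is that managedNewDownload no longer derives chunk boundaries from staticChunkSize arithmetic but asks the SiaFile for them via ChunkIndexByOffset, then trims maxChunk when the download ends exactly on a chunk boundary. The standalone sketch below illustrates that arithmetic under the assumption that ChunkIndexByOffset behaves like (offset / chunkSize, offset % chunkSize) for a fixed chunk size; chunkIndexByOffset, the 100-byte chunk size, and the driver in main are hypothetical stand-ins, not Sia APIs. It reproduces the worked example from the new comment: downloading 100 bytes from offset 0 with 100-byte chunks touches only chunk 0.

package main

import "fmt"

// chunkIndexByOffset mirrors the assumed behaviour of SiaFile.ChunkIndexByOffset
// for a fixed chunk size: it returns the chunk containing the byte at offset and
// that byte's position within the chunk.
func chunkIndexByOffset(offset, chunkSize uint64) (chunkIndex, chunkOffset uint64) {
	return offset / chunkSize, offset % chunkSize
}

func main() {
	const chunkSize = uint64(100) // hypothetical chunk size in bytes
	offset, length := uint64(0), uint64(100)

	minChunk, minChunkOffset := chunkIndexByOffset(offset, chunkSize)
	maxChunk, maxChunkOffset := chunkIndexByOffset(offset+length, chunkSize)

	// The edge case handled in the diff: a download that ends exactly on a
	// chunk boundary yields maxChunkOffset == 0, so maxChunk points one chunk
	// past the data that is actually needed and gets decremented.
	if maxChunk > 0 && maxChunkOffset == 0 {
		maxChunk--
	}

	// Per-chunk fetch window, following the staticFetchOffset/staticFetchLength
	// assignments in the diff.
	for i := minChunk; i <= maxChunk; i++ {
		fetchOffset := uint64(0)
		if i == minChunk {
			fetchOffset = minChunkOffset
		}
		fetchLength := chunkSize - fetchOffset
		if i == maxChunk && maxChunkOffset != 0 {
			fetchLength = maxChunkOffset - fetchOffset
		}
		fmt.Printf("chunk %d: fetchOffset=%d fetchLength=%d\n", i, fetchOffset, fetchLength)
	}
	// Output: chunk 0: fetchOffset=0 fetchLength=100
}
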
23 changes: 12 additions & 11 deletions modules/renter/downloadstreamer.go
@@ -7,14 +7,15 @@ import (
"math"
"time"

"github.com/NebulousLabs/Sia/modules/renter/siafile"
"github.com/NebulousLabs/errors"
)

type (
// streamer is a io.ReadSeeker that can be used to stream downloads from
// the sia network.
streamer struct {
file *file
file *siafile.SiaFile
offset int64
r *Renter
}
@@ -38,15 +39,15 @@ func (r *Renter) Streamer(siaPath string) (string, io.ReadSeeker, error) {
lockID := r.mu.RLock()
file, exists := r.files[siaPath]
r.mu.RUnlock(lockID)
if !exists || file.deleted {
if !exists || file.Deleted() {
return "", nil, fmt.Errorf("no file with that path: %s", siaPath)
}
// Create the streamer
s := &streamer{
file: file,
r: r,
}
return file.name, s, nil
return file.SiaPath(), s, nil
}

// Read implements the standard Read interface. It will download the requested
@@ -55,20 +56,22 @@ func (r *Renter) Streamer(siaPath string) (string, io.ReadSeeker, error) {
// only request a single chunk at once.
func (s *streamer) Read(p []byte) (n int, err error) {
// Get the file's size
s.file.mu.RLock()
fileSize := int64(s.file.size)
s.file.mu.RUnlock()
fileSize := int64(s.file.Size())

// Make sure we haven't reached the EOF yet.
if s.offset >= fileSize {
return 0, io.EOF
}

// Calculate how much we can download. We never download more than a single chunk.
chunkSize := s.file.staticChunkSize()
chunkIndex, chunkOffset := s.file.ChunkIndexByOffset(uint64(s.offset))
if chunkIndex == s.file.NumChunks() {
return 0, io.EOF
}
chunkSize := s.file.ChunkSize(chunkIndex)
remainingData := uint64(fileSize - s.offset)
requestedData := uint64(len(p))
remainingChunk := chunkSize - uint64(s.offset)%chunkSize
remainingChunk := chunkSize - chunkOffset
length := min(remainingData, requestedData, remainingChunk)

// Download data
Expand Down Expand Up @@ -127,9 +130,7 @@ func (s *streamer) Seek(offset int64, whence int) (int64, error) {
case io.SeekCurrent:
newOffset = s.offset
case io.SeekEnd:
s.file.mu.RLock()
newOffset = int64(s.file.size)
s.file.mu.RUnlock()
newOffset = int64(s.file.Size())
}
newOffset += offset

Expand Down