Upload Overdrive #797

Merged: 26 commits, Dec 12, 2023

Changes from 15 commits

Commits (26)
f4c3cc2  worker: use sector (peterjan, Dec 6, 2023)
2ba2184  worker: cleanup (peterjan, Dec 6, 2023)
cdc9060  worker: fix NDF (peterjan, Dec 6, 2023)
0d1301a  worker: update log (peterjan, Dec 6, 2023)
15bf037  worker: add isUploaded (peterjan, Dec 6, 2023)
2e8fa39  worker: lock sector when grabbing overdrives (peterjan, Dec 6, 2023)
8283666  worker: add isUsed (peterjan, Dec 6, 2023)
b413cd3  testing: add logging (peterjan, Dec 6, 2023)
e65202f  worker: get rid of types.FileContractID in used/allowed fields (peterjan, Dec 7, 2023)
b393915  worker: refactor out back refs (peterjan, Dec 7, 2023)
29e455f  worker: no changes, fixes diff (peterjan, Dec 7, 2023)
adba742  worker: only sort candidates when creating an upload (peterjan, Dec 7, 2023)
ab85af9  testing: rework TestUploadDownloadSameHost (peterjan, Dec 7, 2023)
a09453b  worker: cleanup upload refactor (peterjan, Dec 7, 2023)
685a701  autopilot: fix log (peterjan, Dec 7, 2023)
91e708e  Merge branch 'master' of https://github.com/SiaFoundation/renterd int… (peterjan, Dec 8, 2023)
1f337ff  worker: update memory manager init (peterjan, Dec 8, 2023)
cfd8374  worker: update refresh uploaders (peterjan, Dec 8, 2023)
316160f  worker: refactor upload (peterjan, Dec 8, 2023)
7bb7e72  worker: cleanup upload types more (peterjan, Dec 8, 2023)
89370ce  worker: clean up the upload code (peterjan, Dec 8, 2023)
f311a25  worker: rename signalWork (peterjan, Dec 8, 2023)
bedd98b  worker: rework candidate (peterjan, Dec 8, 2023)
291f220  worker: update ongoing/interrupt (peterjan, Dec 8, 2023)
ea408a7  worker: add buffer, fix duplicate uploader (peterjan, Dec 11, 2023)
da1bf91  worker: track stats (peterjan, Dec 12, 2023)
autopilot/autopilot.go (2 changes: 1 addition, 1 deletion)

@@ -242,7 +242,7 @@ func (ap *Autopilot) Run() error {
 	// Trace/Log worker id chosen for this maintenance iteration.
 	workerID, err := w.ID(ctx)
 	if err != nil {
-		ap.logger.Errorf("failed to fetch worker id - abort maintenance", err)
+		ap.logger.Errorf("aborting maintenance, failed to fetch worker id, err: %v", err)
 		return
 	}
 	span.SetAttributes(attribute.String("worker", workerID))
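The change above fixes a Printf-style logging bug: zap's SugaredLogger.Errorf treats its first argument as a format template, so the old call passed err without a matching verb and the error was appended as an %!(EXTRA ...) value instead of being formatted into the message. A small standalone sketch of the difference (illustrative only, not code from this PR):

package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	sugar := logger.Sugar()

	err := errors.New("connection refused") // stand-in error

	// Old form: no %v verb, so err shows up as "%!(EXTRA *errors.errorString=...)".
	sugar.Errorf("failed to fetch worker id - abort maintenance", err)

	// New form: err is formatted into the message.
	sugar.Errorf("aborting maintenance, failed to fetch worker id, err: %v", err)
}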
internal/testing/cluster_test.go (79 changes: 23 additions, 56 deletions)

@@ -876,7 +876,7 @@
 	})

 	// run uploads again
 	uploadDownload()

[GitHub Actions check annotation on line 879, test (macos-latest, 1.20): Test go.sia.tech/renterd/internal/testing/TestUploadDownloadSpending failed in 27.49s: cluster_test.go:879: couldn't upload object: no candidate uploader found]

 	// check that the spending was recorded
 	tt.Retry(100, testBusFlushInterval, func() error {
@@ -1250,74 +1250,41 @@

 	// create a test cluster
 	cluster := newTestCluster(t, testClusterOptions{
-		hosts: 1,
+		hosts: testRedundancySettings.TotalShards,
 	})
 	defer cluster.Shutdown()
 	tt := cluster.tt
+	b := cluster.Bus
+	w := cluster.Worker

-	// shut down the autopilot to prevent it from doing contract maintenance if any kind
-	cluster.ShutdownAutopilot(context.Background())

-	// get wallet address
-	wallet, err := cluster.Bus.Wallet(context.Background())
-	tt.OK(err)

-	ac, err := cluster.Worker.Contracts(context.Background(), time.Minute)
-	tt.OK(err)

-	contracts := ac.Contracts
-	if len(contracts) != 1 {
-		t.Fatal("expected 1 contract", len(contracts))
-	}
-	c := contracts[0]

-	// form 2 more contracts with the same host
-	rev2, _, err := cluster.Worker.RHPForm(context.Background(), c.WindowStart, c.HostKey, c.HostIP, wallet.Address, c.RenterFunds(), c.Revision.ValidHostPayout())
-	tt.OK(err)
-	c2, err := cluster.Bus.AddContract(context.Background(), rev2, types.ZeroCurrency, c.TotalCost, c.StartHeight, api.ContractStatePending)
-	tt.OK(err)
-	rev3, _, err := cluster.Worker.RHPForm(context.Background(), c.WindowStart, c.HostKey, c.HostIP, wallet.Address, c.RenterFunds(), c.Revision.ValidHostPayout())
-	tt.OK(err)
-	c3, err := cluster.Bus.AddContract(context.Background(), rev3, types.ZeroCurrency, c.TotalCost, c.StartHeight, api.ContractStatePending)
-	tt.OK(err)
+	// upload 3 objects so every host has 3 sectors
+	var err error
+	var res api.ObjectsResponse
+	shards := make(map[types.PublicKey][]object.Sector)
+	for i := 0; i < 3; i++ {
+		// upload object
+		tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(frand.Bytes(rhpv2.SectorSize)), api.DefaultBucketName, fmt.Sprintf("foo_%d", i), api.UploadObjectOptions{}))

-	// create a contract set with all 3 contracts
-	err = cluster.Bus.SetContractSet(context.Background(), testAutopilotConfig.Contracts.Set, []types.FileContractID{c.ID, c2.ID, c3.ID})
-	tt.OK(err)
+		// download object from bus and keep track of its shards
+		res, err = b.Object(context.Background(), api.DefaultBucketName, fmt.Sprintf("foo_%d", i), api.GetObjectOptions{})
+		tt.OK(err)
+		for _, shard := range res.Object.Slabs[0].Shards {
+			shards[shard.LatestHost] = append(shards[shard.LatestHost], shard)
+		}

-	// check the bus returns the desired contracts
-	up, err := cluster.Bus.UploadParams(context.Background())
-	tt.OK(err)
-	csc, err := cluster.Bus.ContractSetContracts(context.Background(), up.ContractSet)
-	tt.OK(err)
-	if len(csc) != 3 {
-		t.Fatal("expected 3 contracts", len(csc))
+		// delete the object
+		tt.OK(b.DeleteObject(context.Background(), api.DefaultBucketName, fmt.Sprintf("foo_%d", i), api.DeleteObjectOptions{}))
 	}

-	// upload a file
-	data := frand.Bytes(5*rhpv2.SectorSize + 1)
-	tt.OKAll(cluster.Worker.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, "foo", api.UploadObjectOptions{}))
+	// build a frankenstein object constructed with all sectors on the same host
+	res.Object.Slabs[0].Shards = shards[res.Object.Slabs[0].Shards[0].LatestHost]
+	tt.OK(b.AddObject(context.Background(), api.DefaultBucketName, "frankenstein", testContractSet, res.Object.Object, api.AddObjectOptions{}))

-	// Download the file multiple times.
-	var wg sync.WaitGroup
-	for tt := 0; tt < 3; tt++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			for i := 0; i < 5; i++ {
-				buf := &bytes.Buffer{}
-				if err := cluster.Worker.DownloadObject(context.Background(), buf, api.DefaultBucketName, "foo", api.DownloadObjectOptions{}); err != nil {
-					t.Error(err)
-					break
-				}
-				if !bytes.Equal(buf.Bytes(), data) {
-					t.Error("data mismatch")
-					break
-				}
-			}
-		}()
-	}
-	wg.Wait()
+	// assert we can download this object
+	tt.OK(w.DownloadObject(context.Background(), io.Discard, api.DefaultBucketName, "frankenstein", api.DownloadObjectOptions{}))
 }

 func TestContractArchival(t *testing.T) {
worker/download.go (8 changes: 4 additions, 4 deletions)

@@ -40,7 +40,7 @@ type (
 	id [8]byte

 	downloadManager struct {
-		mm *memoryManager
+		mm memoryManager
 		hp hostProvider
 		pss partialSlabStore
 		slm sectorLostMarker

@@ -159,15 +159,15 @@ type (
 	}
 )

-func (w *worker) initDownloadManager(mm *memoryManager, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) {
+func (w *worker) initDownloadManager(mm memoryManager, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) {
 	if w.downloadManager != nil {
 		panic("download manager already initialized") // developer error
 	}

 	w.downloadManager = newDownloadManager(w, w, mm, w.bus, maxOverdrive, overdriveTimeout, logger)
 }

-func newDownloadManager(hp hostProvider, pss partialSlabStore, mm *memoryManager, slm sectorLostMarker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager {
+func newDownloadManager(hp hostProvider, pss partialSlabStore, mm memoryManager, slm sectorLostMarker, maxOverdrive uint64, overdriveTimeout time.Duration, logger *zap.SugaredLogger) *downloadManager {
 	return &downloadManager{
 		hp: hp,
 		mm: mm,

@@ -1122,7 +1122,7 @@ func (s *slabDownload) finish() ([][]byte, bool, error) {
 		}
 	}

-	return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d, inflight=%d, launched=%d, relaunched=%d, overpaid=%d, downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), unused, len(s.errs), s.errs)
+	return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), unused, len(s.errs), s.errs)
 	}
 	return s.sectors, s.numOverpaid > 0, nil
 }
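With newMemoryManager now returning the memoryManager interface (see worker/memory.go below) and initDownloadManager accepting it, wiring stays a one-liner. A hedged sketch of a caller inside the worker package; initManagers is a hypothetical helper, and the memory budget and overdrive settings are illustrative values, not defaults from this PR:

func (w *worker) initManagers(logger *zap.SugaredLogger) error {
	// Hypothetical helper: the 1 GiB budget below is an illustrative value.
	mm, err := newMemoryManager(logger, 1<<30)
	if err != nil {
		return err
	}
	// The download manager only sees the memoryManager interface, so any
	// implementation (including a test double) can be passed here.
	w.initDownloadManager(mm, 3, 500*time.Millisecond, logger)
	return nil
}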
worker/memory.go (19 changes: 13 additions, 6 deletions)

@@ -12,7 +12,12 @@ import (
 type (
 	// memoryManager helps regulate processes that use a lot of memory. Such as
 	// uploads and downloads.
-	memoryManager struct {
+	memoryManager interface {
+		Status() api.MemoryStatus
+		AcquireMemory(ctx context.Context, amt uint64) *acquiredMemory
+	}
+
+	manager struct {
 		totalAvailable uint64
 		logger *zap.SugaredLogger

@@ -22,17 +27,19 @@
 	}

 	acquiredMemory struct {
-		mm *memoryManager
+		mm *manager

 		remaining uint64
 	}
 )

-func newMemoryManager(logger *zap.SugaredLogger, maxMemory uint64) (*memoryManager, error) {
+var _ memoryManager = (*manager)(nil)
+
+func newMemoryManager(logger *zap.SugaredLogger, maxMemory uint64) (memoryManager, error) {
 	if maxMemory == 0 {
 		return nil, fmt.Errorf("maxMemory cannot be 0")
 	}
-	mm := &memoryManager{
+	mm := &manager{
 		logger: logger,
 		totalAvailable: maxMemory,
 	}

@@ -41,7 +48,7 @@ func newMemoryManager(logger *zap.SugaredLogger, maxMemory uint64) (*memoryManager, error) {
 	return mm, nil
 }

-func (mm *memoryManager) Status() api.MemoryStatus {
+func (mm *manager) Status() api.MemoryStatus {
 	mm.mu.Lock()
 	defer mm.mu.Unlock()
 	return api.MemoryStatus{

@@ -50,7 +57,7 @@ func (mm *memoryManager) Status() api.MemoryStatus {
 	}
 }

-func (mm *memoryManager) AcquireMemory(ctx context.Context, amt uint64) *acquiredMemory {
+func (mm *manager) AcquireMemory(ctx context.Context, amt uint64) *acquiredMemory {
 	if amt == 0 {
 		mm.logger.Fatal("cannot acquire 0 memory")
 	} else if mm.totalAvailable < amt {
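Because memoryManager is now an interface, with the concrete type checked at compile time via var _ memoryManager = (*manager)(nil), the upload and download managers can be constructed with a test double. A minimal sketch of such a double, assuming it lives in the worker package; the mock type is hypothetical and not part of this PR:

type mockMemoryManager struct{}

// Same compile-time assertion the PR uses for the real manager.
var _ memoryManager = (*mockMemoryManager)(nil)

func (mockMemoryManager) Status() api.MemoryStatus {
	// The zero value is enough for tests that never inspect memory stats.
	return api.MemoryStatus{}
}

func (mockMemoryManager) AcquireMemory(context.Context, uint64) *acquiredMemory {
	// Always grant the request; suitable only for tests that never release
	// memory back, since the returned value has no backing manager.
	return &acquiredMemory{}
}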