Skip to content

Commit

Permalink
worker: add missing interaction recorder (#795)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl committed Dec 6, 2023
1 parent ca2d363 commit 278b237
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
3 changes: 2 additions & 1 deletion worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,9 @@ func (w *worker) threadedUploadPackedSlabs(rs api.RedundancySettings, contractSe
}()

// keep uploading packed slabs until we're done
ctx := context.WithValue(w.shutdownCtx, keyInteractionRecorder, w)
for {
uploaded, err := w.uploadPackedSlabs(context.Background(), defaultPackedSlabsLockDuration, rs, contractSet, lockPriority)
uploaded, err := w.uploadPackedSlabs(ctx, defaultPackedSlabsLockDuration, rs, contractSet, lockPriority)
if err != nil {
w.logger.Errorf("couldn't upload packed slabs, err: %v", err)
return
Expand Down
11 changes: 9 additions & 2 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,10 @@ type worker struct {
contractSpendingRecorder *contractSpendingRecorder
contractLockingDuration time.Duration

transportPoolV3 *transportPoolV3
logger *zap.SugaredLogger
transportPoolV3 *transportPoolV3
logger *zap.SugaredLogger
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
}

func dial(ctx context.Context, hostIP string) (net.Conn, error) {
Expand Down Expand Up @@ -1393,6 +1395,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
return nil, errors.New("upload overdrive timeout must be positive")
}

ctx, cancel := context.WithCancel(context.Background())
w := &worker{
alerts: alerts.WithOrigin(b, fmt.Sprintf("worker.%s", id)),
allowPrivateIPs: allowPrivateIPs,
Expand All @@ -1404,6 +1407,8 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush
logger: l.Sugar().Named("worker").Named(id),
startTime: time.Now(),
uploadingPackedSlabs: make(map[string]bool),
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
}
w.initTransportPool()
w.initAccounts(b)
Expand Down Expand Up @@ -1457,6 +1462,8 @@ func (w *worker) Handler() http.Handler {

// Shutdown shuts down the worker.
func (w *worker) Shutdown(_ context.Context) error {
w.shutdownCtxCancel()

w.interactionsMu.Lock()
if w.interactionsFlushTimer != nil {
w.interactionsFlushTimer.Stop()
Expand Down

0 comments on commit 278b237

Please sign in to comment.