Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
pushsync: move tag logic back to where it was
Browse files Browse the repository at this point in the history
  • Loading branch information
acud committed Oct 7, 2019
1 parent 49c0634 commit e04f33e
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
34 changes: 17 additions & 17 deletions pushsync/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ func (p *Pusher) sync() {
break
}

// increment synced count for the tag if exists
tag := item.tag
if tag != nil {
tag.Inc(chunk.StateSynced)
if tag.Done(chunk.StateSynced) {
p.logger.Debug("closing root span for tag", "taguid", tag.Uid, "tagname", tag.Name)
tag.FinishRootSpan()
}
// finish span for pushsync roundtrip, only have this span if we have a tag
item.span.Finish()
}

totalDuration := time.Since(item.sentAt)
metrics.GetOrRegisterResettingTimer("pusher.chunk.roundtrip", nil).Update(totalDuration)
metrics.GetOrRegisterCounter("pusher.receipts.synced", nil).Inc(1)
Expand All @@ -190,28 +202,16 @@ func (p *Pusher) sync() {
unsubscribe()
}

// delete from pushed items
for i := 0; i < len(syncedAddrs); i++ {
delete(p.pushed, syncedAddrs[i].Hex())
}

// set chunk status to synced, insert to db GC index
if err := p.store.Set(ctx, chunk.ModeSetSyncPush, syncedAddrs...); err != nil {
log.Error("pushsync: error setting chunks to synced", "err", err)
}

// delete from pushed items
for i := 0; i < len(syncedAddrs); i++ {
addr := syncedAddrs[i]
item := p.pushed[addr.Hex()]
// increment synced count for the tag if exists
tag := item.tag
if tag != nil {
if tag.Done(chunk.StateSynced) {
p.logger.Debug("closing root span for tag", "taguid", tag.Uid, "tagname", tag.Name)
tag.FinishRootSpan()
}
// finish span for pushsync roundtrip, only have this span if we have a tag
item.span.Finish()
}
delete(p.pushed, addr.Hex())
}

// reset synced list
syncedAddrs = nil

Expand Down
5 changes: 3 additions & 2 deletions storage/localstore/mode_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
// moveToGc toggles the deletion of the item from pushsync index
// it will be false in the case pull sync was called but push sync was meant on that chunk (!tag.Anonymous)
moveToGc := true

if db.tags != nil {
i, err = db.pushIndex.Get(item)
switch err {
Expand All @@ -230,6 +229,9 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
if tag.Anonymous {
// this will not get called twice because we remove the item once after the !moveToGc check
tag.Inc(chunk.StateSent)
// this is needed since pushsync is checking if `tag.Done(chunk.StateSynced)` and when overlapping
// chunks are synced by both push and pull sync we have a problem. as an interim solution we increment this too
tag.Inc(chunk.StateSynced)
} else {
moveToGc = false
}
Expand All @@ -244,7 +246,6 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
}
}
if !moveToGc {
//panic(1)
return 0, nil
}
db.pushIndex.DeleteInBatch(batch, item)
Expand Down

0 comments on commit e04f33e

Please sign in to comment.