Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

cleanup GC related code and comments #1166

Merged
merged 1 commit into from Dec 6, 2018
26 changes: 12 additions & 14 deletions mdata/aggmetric.go
@@ -41,8 +41,8 @@ type AggMetric struct {
ttl uint32
lastSaveStart uint32 // last chunk T0 that was added to the write Queue.
lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
lastWrite uint32
firstTs uint32
lastWrite uint32 // wall clock time of when last point was successfully added (possibly to the ROB)
firstTs uint32 // timestamp of first point seen
}

// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
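The two comments added here pin down semantics that are easy to confuse: lastWrite is the wall-clock time of the last successful point add (which may have gone into the reorder buffer), while firstTs is the data timestamp of the first point seen. A rough, hypothetical sketch of how fields with these semantics would be maintained (not the actual AggMetric code):

```go
// Hypothetical, stripped-down sketch; everything except the lastWrite/firstTs
// semantics described in the diff is an assumption, not real metrictank code.
package main

import (
	"fmt"
	"time"
)

type metric struct {
	lastWrite uint32 // wall-clock time of the last successful point add
	firstTs   uint32 // data timestamp of the first point seen (0 = uninitialized)
}

func (m *metric) add(ts uint32, val float64) {
	if m.firstTs == 0 {
		m.firstTs = ts // only set once; 0 means "not initialized"
	}
	m.lastWrite = uint32(time.Now().Unix()) // wall clock, not the point's own ts
	_ = val
}

func main() {
	m := &metric{}
	m.add(1540000000, 1.5)
	fmt.Println(m.firstTs, m.lastWrite)
}
```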
@@ -331,7 +331,7 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) {
return result, nil
}

// this function must only be called while holding the lock
// caller must hold lock
func (a *AggMetric) addAggregators(ts uint32, val float64) {
for _, agg := range a.aggregators {
log.Debugf("AM: %s pushing %d,%f to aggregator %d", a.Key, ts, val, agg.span)
@@ -340,7 +340,7 @@ func (a *AggMetric) addAggregators(ts uint32, val float64) {
}

// pushToCache adds the chunk into the cache if it is hot
// assumes lock held by caller
// caller must hold lock
func (a *AggMetric) pushToCache(c *chunk.Chunk) {
if a.cachePusher == nil {
return
@@ -355,9 +355,10 @@ func (a *AggMetric) pushToCache(c *chunk.Chunk) {
go a.cachePusher.AddIfHot(a.Key, 0, itergen)
}

// write a chunk to persistent storage. This should only be called while holding a.Lock()
// write a chunk to persistent storage.
// never persist a chunk that may receive further updates!
// (because the stores will read out chunk data on the unlocked chunk)
// caller must hold lock.
func (a *AggMetric) persist(pos int) {
chunk := a.Chunks[pos]
pre := time.Now()
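The reworded comment keeps the key invariant: persist must only ever see chunks that cannot receive further updates, because the store reads the chunk's data outside the AggMetric lock. A self-contained toy illustration of that ordering (hypothetical names, not metrictank code):

```go
// Toy illustration of the invariant above: seal the current chunk and swap in
// a fresh one before handing it to the store, so the store can read it without
// the owner's lock and without racing with new adds.
package main

import (
	"fmt"
	"sync"
)

type toyChunk struct {
	points []float64
}

type owner struct {
	mu  sync.Mutex
	cur *toyChunk
	wg  sync.WaitGroup
}

// persist hands the sealed chunk to a "store" goroutine that reads it unlocked.
func (o *owner) persist(c *toyChunk) {
	o.wg.Add(1)
	go func() {
		defer o.wg.Done()
		fmt.Println("store wrote", len(c.points), "points") // no lock held here
	}()
}

func (o *owner) add(v float64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.cur.points = append(o.cur.points, v)
}

func (o *owner) flush() {
	o.mu.Lock()
	defer o.mu.Unlock()
	sealed := o.cur
	o.cur = &toyChunk{} // future adds go to the new chunk; sealed is now immutable
	o.persist(sealed)
}

func main() {
	o := &owner{cur: &toyChunk{}}
	o.add(1)
	o.add(2)
	o.flush()
	o.wg.Wait()
}
```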
@@ -443,7 +444,7 @@ func (a *AggMetric) Add(ts uint32, val float64) {
}

// don't ever call with a ts of 0, cause we use 0 to mean not initialized!
// assumes a write lock is held by the call-site
// caller must hold write lock
func (a *AggMetric) add(ts uint32, val float64) {
t0 := ts - (ts % a.ChunkSpan)
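t0 snaps the incoming timestamp down to the start of the chunk it belongs to. A quick worked example with made-up numbers: with ChunkSpan = 600 (10-minute chunks), a point at ts = 1540000123 lands in the chunk whose T0 is 1539999600.

```go
// Worked example of the t0 quantization above; ChunkSpan and ts are made up.
package main

import "fmt"

func main() {
	var chunkSpan uint32 = 600 // 10-minute chunks
	var ts uint32 = 1540000123 // incoming point timestamp
	t0 := ts - (ts % chunkSpan)
	fmt.Println(t0) // 1539999600: start of the chunk this point belongs to
}
```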

@@ -546,22 +547,18 @@ func (a *AggMetric) add(ts uint32, val float64) {
// any reasonable realtime stream (e.g. up to 15 min behind wall-clock)
// could add points to the chunk
//
// caller must hold AggMetric lock
// caller must hold lock
func (a *AggMetric) collectable(now, chunkMinTs uint32) bool {

var currentChunk *chunk.Chunk
if len(a.Chunks) != 0 {
currentChunk = a.getChunk(a.CurrentChunkPos)
}

// no chunks at all means "possibly collectable"
// the caller (AggMetric.GC()) still has its own checks to
// handle the "no chunks" correctly later.
// also: we want AggMetric.GC() to go ahead with flushing the ROB in this case
if currentChunk == nil {
if len(a.Chunks) == 0 {
return a.lastWrite < chunkMinTs
}

currentChunk := a.getChunk(a.CurrentChunkPos)
return a.lastWrite < chunkMinTs && currentChunk.Series.T0+a.ChunkSpan+15*60 < now
}
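Reading the final condition with made-up numbers: a chunk with T0 = 1540000800 and ChunkSpan = 1800 could still receive late realtime points until T0 + ChunkSpan + 15*60 = 1540003500, so it only becomes collectable after that, and only if lastWrite is older than chunkMinTs. A small sketch of just that check:

```go
// Hedged sketch of the collectable rule above, with made-up numbers.
package main

import "fmt"

func collectable(lastWrite, chunkMinTs, t0, chunkSpan, now uint32) bool {
	return lastWrite < chunkMinTs && t0+chunkSpan+15*60 < now
}

func main() {
	var (
		t0         uint32 = 1540000800 // current chunk start
		chunkSpan  uint32 = 1800       // 30-minute chunks
		lastWrite  uint32 = 1540001000 // wall-clock time of last successful add
		chunkMinTs uint32 = 1540005000
	)
	// the chunk spans up to t0+chunkSpan = 1540002600; adding the 15-minute
	// grace period for late realtime points gives 1540003500.
	fmt.Println(collectable(lastWrite, chunkMinTs, t0, chunkSpan, 1540003000)) // false
	fmt.Println(collectable(lastWrite, chunkMinTs, t0, chunkSpan, 1540004000)) // true
}
```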

@@ -572,7 +569,7 @@ func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) bool {
a.Lock()
defer a.Unlock()

// abort unless it looks like the AggMetric is collectable
// unless it looks like the AggMetric is collectable, abort and mark as not stale
if !a.collectable(now, chunkMinTs) {
return false
}
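GC reports false (not stale) as soon as the metric fails the collectable check, and true once everything could be dropped. A hypothetical illustration of how a periodic GC pass could consume that contract (the real metrictank GC loop lives elsewhere and differs in detail):

```go
// Hypothetical caller of a "GC returns true when stale" contract like the one
// above; names and structure are assumptions, not the real metrictank GC loop.
package main

import "fmt"

type collectible interface {
	GC(now, chunkMinTs, metricMinTs uint32) bool // true = stale, safe to drop
}

func gcPass(metrics map[string]collectible, now, chunkMinTs, metricMinTs uint32) {
	for key, m := range metrics {
		if m.GC(now, chunkMinTs, metricMinTs) {
			delete(metrics, key) // the metric reported itself stale
		}
	}
}

type fakeMetric struct{ stale bool }

func (f fakeMetric) GC(now, chunkMinTs, metricMinTs uint32) bool { return f.stale }

func main() {
	metrics := map[string]collectible{
		"a": fakeMetric{stale: true},
		"b": fakeMetric{stale: false},
	}
	gcPass(metrics, 1540000000, 1539990000, 1539980000)
	fmt.Println(len(metrics)) // 1: only the non-stale metric remains
}
```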
@@ -627,6 +624,7 @@ func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) bool {
return false
}

// gcAggregators returns whether all aggregators are stale and can be removed
func (a *AggMetric) gcAggregators(now, chunkMinTs, metricMinTs uint32) bool {
ret := true
for _, agg := range a.aggregators {
1 change: 1 addition & 0 deletions mdata/aggregator.go
@@ -116,6 +116,7 @@ func (agg *Aggregator) Add(ts uint32, val float64) {
}
}

// GC returns whether all of the associated series are stale and can be removed
func (agg *Aggregator) GC(now, chunkMinTs, metricMinTs, lastWriteTime uint32) bool {
ret := true

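The new doc comments on gcAggregators and Aggregator.GC both describe the same pattern: GC every child, and report stale only if all of them are stale. A simplified, hypothetical sketch of that fold (not the real code):

```go
// Simplified sketch of the "all children must be stale" fold described above.
// Every child is GC'd even after one reports non-stale, so partial cleanup
// still happens; only if all of them are stale does the parent report stale.
package main

import "fmt"

type series struct{ stale bool }

// GC reports whether this (fake) series is stale.
func (s series) GC(now, chunkMinTs, metricMinTs uint32) bool { return s.stale }

func allStale(children []series, now, chunkMinTs, metricMinTs uint32) bool {
	ret := true
	for _, c := range children {
		if !c.GC(now, chunkMinTs, metricMinTs) {
			ret = false // keep iterating: every child still gets its own GC pass
		}
	}
	return ret
}

func main() {
	fmt.Println(allStale([]series{{true}, {false}}, 0, 0, 0)) // false
	fmt.Println(allStale([]series{{true}, {true}}, 0, 0, 0))  // true
}
```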