Merge pull request #241 from digitalocean/oldest-inactive-internal-poll
osd: Internally poll PG dump for oldest inactive PG tracking
baergj committed May 9, 2023
2 parents e6a0a46 + 5b487fa commit 76ec6f8
Showing 1 changed file with 54 additions and 47 deletions.
101 changes: 54 additions & 47 deletions ceph/osd.go
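
Editor's note: this change stops caching the PG dump on the collector struct. collectOSDScrubState now fetches its own dump during the scrape, and the oldest-inactive-PG tracking moves out of Collect entirely, into a goroutine that re-polls the dump every oldestInactivePGUpdatePeriod (10s) and updates the gauge itself. A minimal, self-contained sketch of that pattern follows; the names poller, fetch, and run are illustrative and not part of this diff.

package pgpoll

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// poller sketches the pattern adopted by this change: the expensive query
// runs on its own schedule, and a scrape only reads the last gauge value.
type poller struct {
	oldestInactive prometheus.Gauge         // e.g. the oldest-inactive-PG gauge
	fetch          func() (time.Time, bool) // stand-in for the PG dump query
}

// run updates the gauge every period; Gauge.Set is safe to call
// concurrently with scrapes, so no extra locking is needed.
func (p *poller) run(period time.Duration) {
	for {
		if oldest, ok := p.fetch(); ok {
			p.oldestInactive.Set(time.Since(oldest).Seconds())
		}
		time.Sleep(period)
	}
}
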
@@ -29,12 +29,12 @@ import (

const (
osdLabelFormat = "osd.%v"
)

const (
scrubStateIdle = 0
scrubStateScrubbing = 1
scrubStateDeepScrubbing = 2

oldestInactivePGUpdatePeriod = 10 * time.Second
)

// OSDCollector displays statistics about OSD in the Ceph cluster.
@@ -54,9 +54,6 @@ type OSDCollector struct {
// a PG to not have an active state in it.
oldestInactivePGMap map[string]time.Time

// pgDumpBrief holds the content of PG dump brief
pgDumpBrief cephPGDumpBrief

// CrushWeight is a persistent setting, and it affects how CRUSH assigns data to OSDs.
// It displays the CRUSH weight for the OSD
CrushWeight *prometheus.GaugeVec
@@ -159,7 +156,7 @@ func NewOSDCollector(exporter *Exporter) *OSDCollector {
labels["cluster"] = exporter.Cluster
osdLabels := []string{"osd", "device_class", "host", "rack", "root"}

return &OSDCollector{
o := &OSDCollector{
conn: exporter.Conn,
logger: exporter.Logger,

@@ -438,6 +435,9 @@ func NewOSDCollector(exporter *Exporter) *OSDCollector {
},
),
}

go o.oldestInactivePGLoop()
return o
}
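
Editor's note: the polling goroutine is started from the constructor and runs for the lifetime of the process; the diff adds no shutdown path. Purely as a sketch of an alternative (this method does not exist in the exporter), a stoppable variant could select on a stop channel:

// Sketch only: a stop channel would let tests or a graceful shutdown end the
// poller; the exporter itself simply lets the goroutine run until process exit.
func (o *OSDCollector) oldestInactivePGLoopWithStop(stop <-chan struct{}) {
	ticker := time.NewTicker(oldestInactivePGUpdatePeriod)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			// run one iteration of the same body as oldestInactivePGLoop
		}
	}
}
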

func (o *OSDCollector) collectorList() []prometheus.Collector {
@@ -951,26 +951,31 @@ func (o *OSDCollector) collectOSDDump() error {

}

func (o *OSDCollector) performPGDumpBrief() error {
func (o *OSDCollector) performPGDumpBrief() (*cephPGDumpBrief, error) {
args := o.cephPGDumpCommand()
buf, _, err := o.conn.MgrCommand(args)
if err != nil {
o.logger.WithError(err).WithField(
"args", string(bytes.Join(args, []byte(","))),
).Error("error executing mgr command")

return err
return nil, err
}

o.pgDumpBrief = cephPGDumpBrief{}
if err := json.Unmarshal(buf, &o.pgDumpBrief); err != nil {
return err
pgDumpBrief := cephPGDumpBrief{}
if err := json.Unmarshal(buf, &pgDumpBrief); err != nil {
return nil, err
}

return nil
return &pgDumpBrief, nil
}

func (o *OSDCollector) collectOSDScrubState(ch chan<- prometheus.Metric) error {
pgDumpBrief, err := o.performPGDumpBrief()
if err != nil {
return err
}

// need to reset the PG scrub state since the scrub might have ended within
// the last prom scrape interval.
// This forces us to report scrub state on all previously discovered OSDs We
@@ -980,7 +985,7 @@ func (o *OSDCollector) collectOSDScrubState(ch chan<- prometheus.Metric) error {
o.osdScrubCache[i] = scrubStateIdle
}

for _, pg := range o.pgDumpBrief.PGStats {
for _, pg := range pgDumpBrief.PGStats {
if strings.Contains(pg.State, "scrubbing") {
scrubState := scrubStateScrubbing
if strings.Contains(pg.State, "deep") {
@@ -1070,36 +1075,46 @@ func (o *OSDCollector) cephPGDumpCommand() [][]byte {
return [][]byte{cmd}
}

func (o *OSDCollector) collectPGStates(ch chan<- prometheus.Metric) error {
// - See if there are PGs that we're tracking that are now active
// - See if there are new ones to add
// - Find the oldest one
now := time.Now()
oldestTime := now

for _, pg := range o.pgDumpBrief.PGStats {
// If we were tracking it, and it's now active, remove it
active := strings.Contains(pg.State, "active")
if active {
delete(o.oldestInactivePGMap, pg.PGID)
func (o *OSDCollector) oldestInactivePGLoop() {
for {
pgDumpBrief, err := o.performPGDumpBrief()
if err != nil {
o.logger.WithError(err).Warning("failed to get latest PG dump for oldest inactive PG update")
time.Sleep(oldestInactivePGUpdatePeriod)
continue
}

// Now see if it's not here, we'll need to track it now
pgTime, ok := o.oldestInactivePGMap[pg.PGID]
if !ok {
pgTime = now
o.oldestInactivePGMap[pg.PGID] = now
}
// - See if there are PGs that we're tracking that are now active
// - See if there are new ones to add
// - Find the oldest one
now := time.Now()
oldestTime := now

for _, pg := range pgDumpBrief.PGStats {
// If we were tracking it, and it's now active, remove it
active := strings.Contains(pg.State, "active")
if active {
delete(o.oldestInactivePGMap, pg.PGID)
continue
}

// And finally, track our oldest time
if pgTime.Before(oldestTime) {
oldestTime = pgTime
// Now see if it's not here, we'll need to track it now
pgTime, ok := o.oldestInactivePGMap[pg.PGID]
if !ok {
pgTime = now
o.oldestInactivePGMap[pg.PGID] = now
}

// And finally, track our oldest time
if pgTime.Before(oldestTime) {
oldestTime = pgTime
}
}
}

o.OldestInactivePG.Set(float64(now.Unix() - oldestTime.Unix()))
return nil
o.OldestInactivePG.Set(float64(now.Unix() - oldestTime.Unix()))

time.Sleep(oldestInactivePGUpdatePeriod)
}
}
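
Editor's note: the gauge value is now.Unix() - oldestTime.Unix(), i.e. the age in seconds of the PG that has been inactive the longest, or 0 when every PG is active (the map is empty and oldestTime equals now). On a failed PG dump the loop logs a warning, keeps the previous value, and retries after the period, so the reading can be up to one poll period old, longer after an mgr error. A small, self-contained illustration of the value semantics (not exporter code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// A PG first seen inactive 90s ago and still inactive at poll time yields
	// a gauge value of roughly 90; with no inactive PGs the value is 0.
	firstSeenInactive := time.Now().Add(-90 * time.Second)
	now := time.Now()
	fmt.Println(float64(now.Unix() - firstSeenInactive.Unix())) // ~90
}
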

// Describe sends the descriptors of each OSDCollector related metrics we have
@@ -1169,21 +1184,13 @@ func (o *OSDCollector) Collect(ch chan<- prometheus.Metric, version *Version) {
localWg.Add(1)
go func() {
defer localWg.Done()
if err := o.performPGDumpBrief(); err != nil {
o.logger.WithError(err).Error("error collecting PG dump metrics")
if err := o.collectOSDScrubState(ch); err != nil {
o.logger.WithError(err).Error("error collecting OSD scrub metrics")
}
}()

localWg.Wait()

// These don't run any mon/mgr commands, and are dependent on the goroutines completing
if err := o.collectOSDScrubState(ch); err != nil {
o.logger.WithError(err).Error("error collecting OSD scrub metrics")
}
if err := o.collectPGStates(ch); err != nil {
o.logger.WithError(err).Error("error collecting PG state metrics")
}

for _, metric := range o.collectorList() {
metric.Collect(ch)
}
