Skip to content

Commit

Permalink
Make rados connection short lived
Browse files Browse the repository at this point in the history
  • Loading branch information
yuezhu committed Oct 27, 2020
1 parent 3077483 commit 5571bf6
Show file tree
Hide file tree
Showing 16 changed files with 316 additions and 201 deletions.
45 changes: 27 additions & 18 deletions collectors/cluster_usage.go
Expand Up @@ -16,42 +16,46 @@ package collectors

import (
"encoding/json"
"log"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

const (
cephNamespace = "ceph"
)

// A ClusterUsageCollector is used to gather all the global stats about a given
// ceph cluster. It is sometimes essential to know how fast the cluster is growing
// or shrinking as a whole in order to zero in on the cause. The pool specific
// stats are provided separately.
// A ClusterUsageCollector is used to gather all the global stats about a
// given ceph cluster. It is sometimes essential to know how fast the cluster
// is growing or shrinking as a whole in order to zero in on the cause. The
// pool specific stats are provided separately.
type ClusterUsageCollector struct {
conn Conn
conn Conn
logger *logrus.Logger

// GlobalCapacity displays the total storage capacity of the cluster. This
// information is based on the actual no. of objects that are allocated. It
// does not take overcommitment into consideration.
// information is based on the actual no. of objects that are
// allocated. It does not take overcommitment into consideration.
GlobalCapacity prometheus.Gauge

// UsedCapacity shows the storage under use.
UsedCapacity prometheus.Gauge

// AvailableCapacity shows the remaining capacity of the cluster that is left unallocated.
// AvailableCapacity shows the remaining capacity of the cluster that is
// left unallocated.
AvailableCapacity prometheus.Gauge
}

// NewClusterUsageCollector creates and returns the reference to ClusterUsageCollector
// and internally defines each metric that display cluster stats.
func NewClusterUsageCollector(conn Conn, cluster string) *ClusterUsageCollector {
// NewClusterUsageCollector creates and returns the reference to
// ClusterUsageCollector and internally defines each metric that display
// cluster stats.
func NewClusterUsageCollector(conn Conn, cluster string, logger *logrus.Logger) *ClusterUsageCollector {
labels := make(prometheus.Labels)
labels["cluster"] = cluster

return &ClusterUsageCollector{
conn: conn,
conn: conn,
logger: logger,

GlobalCapacity: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: cephNamespace,
Expand Down Expand Up @@ -94,6 +98,10 @@ func (c *ClusterUsageCollector) collect() error {
cmd := c.cephUsageCommand()
buf, _, err := c.conn.MonCommand(cmd)
if err != nil {
c.logger.WithError(err).WithField(
"args", string(cmd),
).Error("error executing mon command")

return err
}

Expand All @@ -106,17 +114,17 @@ func (c *ClusterUsageCollector) collect() error {

totBytes, err = stats.Stats.TotalBytes.Float64()
if err != nil {
log.Println("[ERROR] cannot extract total bytes:", err)
c.logger.WithError(err).Error("error extracting total bytes")
}

usedBytes, err = stats.Stats.TotalUsedBytes.Float64()
if err != nil {
log.Println("[ERROR] cannot extract used bytes:", err)
c.logger.WithError(err).Error("error extracting used bytes")
}

availBytes, err = stats.Stats.TotalAvailBytes.Float64()
if err != nil {
log.Println("[ERROR] cannot extract available bytes:", err)
c.logger.WithError(err).Error("error extracting available bytes")
}

c.GlobalCapacity.Set(totBytes)
Expand All @@ -135,7 +143,7 @@ func (c *ClusterUsageCollector) cephUsageCommand() []byte {
if err != nil {
// panic! because ideally in no world this hard-coded input
// should fail.
panic(err)
c.logger.WithError(err).Panic("error marshalling ceph df detail")
}
return cmd
}
Expand All @@ -151,8 +159,9 @@ func (c *ClusterUsageCollector) Describe(ch chan<- *prometheus.Desc) {
// Collect sends the metric values for each metric pertaining to the global
// cluster usage over to the provided prometheus Metric channel.
func (c *ClusterUsageCollector) Collect(ch chan<- prometheus.Metric) {
c.logger.Debug("collecting cluster usage metrics")
if err := c.collect(); err != nil {
log.Println("[ERROR] failed collecting cluster usage metrics:", err)
c.logger.WithError(err).Error("error collecting cluster usage metrics")
return
}

Expand Down
6 changes: 2 additions & 4 deletions collectors/cluster_usage_test.go
Expand Up @@ -16,19 +16,17 @@ package collectors

import (
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"regexp"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)

func TestClusterUsage(t *testing.T) {
log.SetOutput(ioutil.Discard)

for _, tt := range []struct {
input string
reMatch, reUnmatch []*regexp.Regexp
Expand Down Expand Up @@ -128,7 +126,7 @@ func TestClusterUsage(t *testing.T) {
},
} {
func() {
collector := NewClusterUsageCollector(NewNoopConn(tt.input), "ceph")
collector := NewClusterUsageCollector(NewNoopConn(tt.input), "ceph", logrus.New())
if err := prometheus.Register(collector); err != nil {
t.Fatalf("collector failed to register: %s", err)
}
Expand Down
148 changes: 104 additions & 44 deletions collectors/conn.go
Expand Up @@ -22,62 +22,140 @@ import (
"time"

"github.com/ceph/go-ceph/rados"
"github.com/sirupsen/logrus"
)

// Conn interface implements only necessary methods that are used
// in this repository of *rados.Conn. This keeps rest of the implementation
// clean and *rados.Conn doesn't need to show up everywhere (it being
// more of an implementation detail in reality). Also it makes mocking
// easier for unit-testing the collectors.
// Conn interface implements only necessary methods that are used in this
// repository on top of *rados.Conn. This keeps rest of the implementation
// clean and *rados.Conn doesn't need to show up everywhere (it being more of
// an implementation detail in reality). Also it makes mocking easier for
// unit-testing the collectors.
type Conn interface {
ReadDefaultConfigFile() error
Connect() error
Shutdown()
MonCommand([]byte) ([]byte, string, error)
OpenIOContext(string) (*rados.IOContext, error)
GetPoolStats(string) (*rados.PoolStat, error)
}

// Verify that *rados.Conn implements Conn correctly.
var _ Conn = &rados.Conn{}
// RadosConn implements the Conn interface with the underlying *rados.Conn
// that talks to a real Ceph cluster.
type RadosConn struct {
user string
configFile string
timeout time.Duration
logger *logrus.Logger
}

// CreateRadosConn creates an established rados connection to the Ceph cluster
// *RadosConn must implement the Conn.
var _ Conn = &RadosConn{}

// NewRadosConn returns a new RadosConn. Unlike the native rados.Conn, there
// is no need to manage the connection before/after talking to the rados; it
// is the responsibility of this *RadosConn to manage the connection.
func NewRadosConn(user, configFile string, timeout time.Duration, logger *logrus.Logger) *RadosConn {
return &RadosConn{
user: user,
configFile: configFile,
timeout: timeout,
logger: logger,
}
}

// newRadosConn creates an established rados connection to the Ceph cluster
// using the provided Ceph user and configFile. Ceph parameters
// rados_osd_op_timeout and rados_mon_op_timeout are specified by the timeout
// value, where 0 means no limit.
func CreateRadosConn(user, configFile string, timeout time.Duration) (*rados.Conn, error) {
conn, err := rados.NewConnWithUser(user)
func (c *RadosConn) newRadosConn() (*rados.Conn, error) {
conn, err := rados.NewConnWithUser(c.user)
if err != nil {
return nil, fmt.Errorf("cannot create new ceph connection: %s", err)
return nil, fmt.Errorf("error creating rados connection: %s", err)
}

err = conn.ReadConfigFile(configFile)
err = conn.ReadConfigFile(c.configFile)
if err != nil {
return nil, fmt.Errorf("cannot read ceph config file: %s", err)
return nil, fmt.Errorf("error reading config file: %s", err)
}

tv := strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)
tv := strconv.FormatFloat(c.timeout.Seconds(), 'f', -1, 64)
// Set rados_osd_op_timeout and rados_mon_op_timeout to avoid Mon
// and PG command hang.
// See
// https://github.com/ceph/ceph/blob/d4872ce97a2825afcb58876559cc73aaa1862c0f/src/common/legacy_config_opts.h#L1258-L1259
err = conn.SetConfigOption("rados_osd_op_timeout", tv)
if err != nil {
return nil, fmt.Errorf("cannot set rados_osd_op_timeout for ceph cluster: %s", err)
return nil, fmt.Errorf("error setting rados_osd_op_timeout: %s", err)
}

err = conn.SetConfigOption("rados_mon_op_timeout", tv)
if err != nil {
return nil, fmt.Errorf("cannot set rados_mon_op_timeout for ceph cluster: %s", err)
return nil, fmt.Errorf("error setting rados_mon_op_timeout: %s", err)
}

err = conn.Connect()
if err != nil {
return nil, fmt.Errorf("cannot connect to ceph cluster: %s", err)
return nil, fmt.Errorf("error connecting to rados: %s", err)
}

return conn, nil
}

// MonCommand executes a monitor command to rados.
func (c *RadosConn) MonCommand(args []byte) (buffer []byte, info string, err error) {
ll := c.logger.WithField("args", string(args))

ll.Trace("creating rados connection to execute mon command")

conn, err := c.newRadosConn()
if err != nil {
return nil, "", err
}
defer conn.Shutdown()

ll = ll.WithField("conn", conn.GetInstanceID())

ll.Trace("start executing mon command")

buffer, info, err = conn.MonCommand(args)

ll.WithError(err).Trace("complete executing mon command")

return
}

// GetPoolStats returns a *rados.PoolStat for the given rados pool.
func (c *RadosConn) GetPoolStats(pool string) (stat *rados.PoolStat, err error) {
ll := c.logger.WithField("pool", pool)

ll.Trace("creating rados connection to get pool stats")

conn, err := c.newRadosConn()
if err != nil {
return nil, err
}
defer conn.Shutdown()

ll = ll.WithField("conn", conn.GetInstanceID())

ll.Trace("opening IOContext for pool")

ioCtx, err := conn.OpenIOContext(pool)
if err != nil {
return nil, err
}
defer ioCtx.Destroy()

ll.Trace("start getting pool stats")

st, err := ioCtx.GetPoolStats()
if err != nil {
stat = nil
} else {
stat = &st
}

ll.WithError(err).Trace("complete getting pool stats")

return
}

// NoopConn is the stub we use for mocking rados Conn. Unit testing
// each individual collectors becomes a lot easier after that.
// TODO: both output and cmdOut provide the command outputs for "go test", but
Expand Down Expand Up @@ -114,23 +192,8 @@ func (n *NoopConn) IncIteration() {
n.iteration++
}

// ReadDefaultConfigFile does not need to return an error. It satisfies
// rados.Conn's function with the same prototype.
func (n *NoopConn) ReadDefaultConfigFile() error {
return nil
}

// Connect does not need to return an error. It satisfies
// rados.Conn's function with the same prototype.
func (n *NoopConn) Connect() error {
return nil
}

// Shutdown satisfies rados.Conn's function prototype.
func (n *NoopConn) Shutdown() {}

// MonCommand returns the provided output string to NoopConn as is, making
// it seem like it actually ran something and produced that string as a result.
// MonCommand returns the provided output string to NoopConn as is, making it
// seem like it actually ran something and produced that string as a result.
func (n *NoopConn) MonCommand(args []byte) ([]byte, string, error) {
// Unmarshal the input command and see if we need to intercept
cmd := map[string]interface{}{}
Expand Down Expand Up @@ -265,10 +328,7 @@ func (n *NoopConn) MonCommand(args []byte) ([]byte, string, error) {
return []byte(n.output), "", nil
}

// OpenIOContext always returns a nil rados.IOContext, and "not implemented"
// error. The OpenIOContext method in the rados package returns a pointer of
// rados.IOContext that contains an actual C.rados_ioctx_t, which is not
// available in this NoopConn.
func (n *NoopConn) OpenIOContext(pool string) (*rados.IOContext, error) {
return nil, errors.New("not implemented")
// GetPoolStats always returns a nil and "not implmemented" error.
func (n *NoopConn) GetPoolStats(pool string) (*rados.PoolStat, error) {
return nil, fmt.Errorf("not implemented")
}

0 comments on commit 5571bf6

Please sign in to comment.