Skip to content

Commit

Permalink
Use SDAM for MongoBetween( v2 ) (#55)
Browse files Browse the repository at this point in the history
* Use SDAM for MongoBetween

* Lint and Salus

* Improvements to logging

* Updates to make this flag-based instead of environment based, also moved logic to config section

* More style fixes

* Style fix

* Unit test fix
  • Loading branch information
d2army committed Feb 14, 2023
1 parent fe46b56 commit ca3d8d7
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 85 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ Usage: mongobetween [OPTIONS] address1=uri1 [address2=uri2] ...
MongoDB username
-dynamic string
File or URL to query for dynamic configuration
-enable-sdam-metrics
Enable SDAM(Server Discovery And Monitoring) metrics
-enable-sdam-logging
Enable SDAM(Server Discovery And Monitoring) logging
```

TCP socket example:
Expand Down
223 changes: 199 additions & 24 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"encoding/json"
"errors"
"flag"
"fmt"
Expand All @@ -9,29 +10,35 @@ import (
"strings"

"github.com/DataDog/datadog-go/statsd"
"go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/coinbase/mongobetween/mongo"
"github.com/coinbase/mongobetween/proxy"
"github.com/coinbase/mongobetween/util"
)

const usernamePlaceholder = "_"
const defaultStatsdAddress = "localhost:8125"

var validNetworks = []string{"tcp", "tcp4", "tcp6", "unix", "unixpacket"}

var newStatsdClientInit = newStatsdClient

type Config struct {
network string
unlink bool
ping bool
pretty bool
clients []client
statsd string
level zapcore.Level
dynamic string
network string
unlink bool
ping bool
pretty bool
clients []client
level zapcore.Level
dynamic string
statsdaddr string
logger *zap.Logger
statsd *statsd.Client
}

type client struct {
Expand All @@ -58,20 +65,23 @@ func (c *Config) Pretty() bool {
return c.pretty
}

func (c *Config) Proxies(log *zap.Logger) (proxies []*proxy.Proxy, err error) {
sd, err := statsd.New(c.statsd, statsd.WithNamespace("mongobetween"))
if err != nil {
return nil, err
}
func (c *Config) Logger() *zap.Logger {
return c.logger
}

func (c *Config) Statsd() *statsd.Client {
return c.statsd
}

func (c *Config) Proxies(log *zap.Logger) (proxies []*proxy.Proxy, err error) {
d, err := proxy.NewDynamic(c.dynamic, log)
if err != nil {
return nil, err
}

mongos := make(map[string]*mongo.Mongo)
for _, client := range c.clients {
m, err := mongo.Connect(log, sd, client.opts, c.ping)
m, err := mongo.Connect(log, c.statsd, client.opts, c.ping)
if err != nil {
return nil, err
}
Expand All @@ -82,7 +92,7 @@ func (c *Config) Proxies(log *zap.Logger) (proxies []*proxy.Proxy, err error) {
}

for _, client := range c.clients {
p, err := proxy.NewProxy(log, sd, client.label, c.network, client.address, c.unlink, mongoLookup, d)
p, err := proxy.NewProxy(log, c.statsd, client.label, c.network, client.address, c.unlink, mongoLookup, d)
if err != nil {
return nil, err
}
Expand All @@ -106,7 +116,7 @@ func parseFlags() (*Config, error) {
flag.PrintDefaults()
}

var unlink, ping, pretty bool
var unlink, ping, pretty, enableSdamMetrics, enableSdamLogging bool
var network, username, password, stats, loglevel, dynamic string
flag.StringVar(&network, "network", "tcp4", "One of: tcp, tcp4, tcp6, unix or unixpacket")
flag.StringVar(&username, "username", "", "MongoDB username")
Expand All @@ -117,6 +127,8 @@ func parseFlags() (*Config, error) {
flag.BoolVar(&pretty, "pretty", false, "Pretty print logging")
flag.StringVar(&loglevel, "loglevel", "info", "One of: debug, info, warn, error, dpanic, panic, fatal")
flag.StringVar(&dynamic, "dynamic", "", "File or URL to query for dynamic configuration")
flag.BoolVar(&enableSdamMetrics, "enable-sdam-metrics", false, "Enable SDAM(Server Discovery And Monitoring) metrics")
flag.BoolVar(&enableSdamLogging, "enable-sdam-logging", false, "Enable SDAM(Server Discovery And Monitoring) logging")

flag.Parse()

Expand Down Expand Up @@ -161,12 +173,19 @@ func parseFlags() (*Config, error) {
return nil, errors.New("missing address=uri(s)")
}

loggerClient := newLogger(level, pretty)
statsdClient, err := newStatsdClientInit(stats)
if err != nil {
return nil, err
}

var clients []client
for address, uri := range addressMap {
label, opts, err := clientOptions(uri, username, password)
if err != nil {
return nil, err
}
initMonitoring(opts, statsdClient, loggerClient, enableSdamMetrics, enableSdamLogging)
clients = append(clients, client{
address: address,
label: label,
Expand All @@ -175,14 +194,16 @@ func parseFlags() (*Config, error) {
}

return &Config{
network: network,
unlink: unlink,
ping: ping,
pretty: pretty,
clients: clients,
statsd: stats,
level: level,
dynamic: dynamic,
network: network,
unlink: unlink,
ping: ping,
pretty: pretty,
statsdaddr: stats,
clients: clients,
level: level,
dynamic: dynamic,
logger: loggerClient,
statsd: statsdClient,
}, nil
}

Expand Down Expand Up @@ -232,6 +253,13 @@ func clientOptions(uri, username, password string) (string, *options.ClientOptio
return label, opts, nil
}

func initMonitoring(opts *options.ClientOptions, statsd *statsd.Client, logger *zap.Logger, enableSdamMetrics bool, enableSdamLogging bool) *options.ClientOptions {
// set up monitors for Pool and Server(SDAM)
opts = opts.SetPoolMonitor(poolMonitor(statsd))
opts = opts.SetServerMonitor(serverMonitoring(logger, statsd, enableSdamMetrics, enableSdamLogging))
return opts
}

func uriWorkaround(uri, username string) string {
// Workaround for a feature in the Mongo driver URI parsing where you can't set a URI
// without setting the username ("error parsing uri: authsource without username is
Expand All @@ -248,3 +276,150 @@ func uriWorkaround(uri, username string) string {
}
return uri
}

func newStatsdClient(statsAddress string) (*statsd.Client, error) {
return statsd.New(statsAddress, statsd.WithNamespace("mongobetween"))
}

func newLogger(level zapcore.Level, pretty bool) *zap.Logger {
var c zap.Config
if pretty {
c = zap.NewDevelopmentConfig()
c.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
} else {
c = zap.NewProductionConfig()
}

c.EncoderConfig.MessageKey = "message"
c.Level.SetLevel(level)

log, err := c.Build(zap.AddStacktrace(zap.FatalLevel))
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to initialize logger: %v\n", err)
os.Exit(1)
}

return log
}

func poolMonitor(sd *statsd.Client) *event.PoolMonitor {
checkedOut, checkedIn := util.StatsdBackgroundGauge(sd, "pool.checked_out_connections", []string{})
opened, closed := util.StatsdBackgroundGauge(sd, "pool.open_connections", []string{})

return &event.PoolMonitor{
Event: func(e *event.PoolEvent) {
snake := strings.ToLower(regexp.MustCompile("([a-z0-9])([A-Z])").ReplaceAllString(e.Type, "${1}_${2}"))
name := fmt.Sprintf("pool_event.%s", snake)
tags := []string{
fmt.Sprintf("address:%s", e.Address),
fmt.Sprintf("reason:%s", e.Reason),
}
switch e.Type {
case event.ConnectionCreated:
opened(name, tags)
case event.ConnectionClosed:
closed(name, tags)
case event.GetSucceeded:
checkedOut(name, tags)
case event.ConnectionReturned:
checkedIn(name, tags)
default:
_ = sd.Incr(name, tags, 1)
}
},
}
}

func serverMonitoring(log *zap.Logger, statsdClient *statsd.Client, enableSdamMetrics bool, enableSdamLogging bool) *event.ServerMonitor {

return &event.ServerMonitor{
ServerOpening: func(e *event.ServerOpeningEvent) {
if enableSdamMetrics {
_ = statsdClient.Incr("server_opening_event",
[]string{
fmt.Sprintf("address:%s", e.Address),
fmt.Sprintf("topology_id:%s", e.TopologyID.Hex()),
}, 0)
}
},

ServerClosed: func(e *event.ServerClosedEvent) {
if enableSdamMetrics {
_ = statsdClient.Incr("server_closed_event",
[]string{
fmt.Sprintf("address:%s", e.Address),
fmt.Sprintf("topology_id:%s", e.TopologyID.Hex()),
}, 0)
}
},

ServerDescriptionChanged: func(e *event.ServerDescriptionChangedEvent) {
if enableSdamMetrics {
_ = statsdClient.Incr("server_description_changed_event",
[]string{
fmt.Sprintf("address:%s", e.Address),
fmt.Sprintf("topology_id:%s", e.TopologyID.Hex()),
}, 0)
}

if enableSdamLogging {
var prevDMap map[string]interface{}
var newDMap map[string]interface{}

prevDescription, _ := json.Marshal(&e.PreviousDescription)
_ = json.Unmarshal(prevDescription, &prevDMap)
newDescription, _ := json.Marshal(e.NewDescription)
_ = json.Unmarshal(newDescription, &newDMap)

log.Info("ServerDescriptionChangedEvent detected. ",
zap.Any("address", e.Address),
zap.String("topologyId", e.TopologyID.Hex()),
zap.Any("prevDescription", prevDMap),
zap.Any("newDescription", newDMap),
)
}
},

TopologyDescriptionChanged: func(e *event.TopologyDescriptionChangedEvent) {
if enableSdamMetrics {
_ = statsdClient.Incr("topology_description_changed_event",
[]string{
fmt.Sprintf("topology_id:%s", e.TopologyID.Hex()),
}, 0)
}
if enableSdamLogging {
var prevDMap map[string]interface{}
var newDMap map[string]interface{}

prevDescription, _ := json.Marshal(&e.PreviousDescription)
_ = json.Unmarshal(prevDescription, &prevDMap)
newDescription, _ := json.Marshal(e.NewDescription)
_ = json.Unmarshal(newDescription, &newDMap)

log.Info("TopologyDescriptionChangedEvent detected. ",
zap.String("topologyId", e.TopologyID.Hex()),
zap.Any("prevDescription", prevDMap),
zap.Any("newDescription", newDMap),
)
}
},

TopologyOpening: func(e *event.TopologyOpeningEvent) {
if enableSdamMetrics {
_ = statsdClient.Incr("topology_opening_event",
[]string{
fmt.Sprintf("topology_id:%s", e.TopologyID.Hex()),
}, 0)
}
},

TopologyClosed: func(e *event.TopologyClosedEvent) {
if enableSdamMetrics {
_ = statsdClient.Incr("topology_closed_event",
[]string{
fmt.Sprintf("topology_id:%s", e.TopologyID.Hex()),
}, 0)
}
},
}
}
27 changes: 26 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,28 @@ package config

import (
"flag"
"github.com/DataDog/datadog-go/statsd"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zapcore"
"os"
"testing"
"time"
)

type statsdWriterWrapper struct{}

func (statsdWriterWrapper) SetWriteTimeout(time.Duration) error {
return nil
}

func (statsdWriterWrapper) Close() error {
return nil
}

func (statsdWriterWrapper) Write(p []byte) (n int, err error) {
return 0, nil
}

func resetFlags() {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}
Expand All @@ -29,12 +45,21 @@ func TestParseFlags(t *testing.T) {
"/tmp/mongo2.sock=mongodb://localhost:27128/database?maxpoolsize=10&label=cluster2",
}

//mock out newStatsdClient
originalFunc := newStatsdClientInit
newStatsdClientInit = func(stats string) (*statsd.Client, error) {
return statsd.NewWithWriter(statsdWriterWrapper{})
}
defer func() {
newStatsdClientInit = originalFunc
}()

resetFlags()
c, err := parseFlags()
assert.Nil(t, err)

assert.True(t, c.pretty)
assert.Equal(t, "statsd:1234", c.statsd)
assert.Equal(t, "statsd:1234", c.statsdaddr)
assert.Equal(t, zapcore.DebugLevel, c.LogLevel())
assert.Equal(t, "unix", c.network)
assert.True(t, c.ping)
Expand Down

0 comments on commit ca3d8d7

Please sign in to comment.