Skip to content

Commit

Permalink
fix(trace): use LastValue for open connections (#58)
Browse files Browse the repository at this point in the history
This commit replaces an erroneous Sum aggregation for a LastValue
aggregation for open connections.
  • Loading branch information
enocom committed Feb 3, 2022
1 parent 016a821 commit 4ee6bea
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 50 deletions.
21 changes: 10 additions & 11 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"cloud.google.com/go/cloudsqlconn/errtypes"
Expand Down Expand Up @@ -144,10 +145,6 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
}

if err := trace.InitMetrics(); err != nil {
// This error means the internal metric configuration is incorrect and
// should never be surfaced to callers, as there's nothing actionable
// for a caller to do. Ignoring the error seems worse and so we return
// it.
return nil, err
}
d := &Dialer{
Expand Down Expand Up @@ -219,11 +216,15 @@ func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption)
}
latency := time.Since(startTime).Milliseconds()
go func() {
n := atomic.AddUint64(&i.OpenConns, 1)
trace.RecordOpenConnections(ctx, int64(n), d.dialerID, i.String())
trace.RecordDialLatency(ctx, instance, d.dialerID, latency)
trace.RecordConnectionOpen(ctx, instance, d.dialerID)
}()

return newInstrumentedConn(tlsConn, instance, d.dialerID), nil
return newInstrumentedConn(tlsConn, func() {
n := atomic.AddUint64(&i.OpenConns, ^uint64(0))
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, i.String())
}), nil
}

// EngineVersion returns the engine type and version for the instance. The value will
Expand All @@ -243,12 +244,10 @@ func (d *Dialer) EngineVersion(ctx context.Context, instance string) (string, er

// newInstrumentedConn initializes an instrumentedConn that on closing will
// decrement the number of open connects and record the result.
func newInstrumentedConn(conn net.Conn, instance, dialerID string) *instrumentedConn {
func newInstrumentedConn(conn net.Conn, closeFunc func()) *instrumentedConn {
return &instrumentedConn{
Conn: conn,
closeFunc: func() {
trace.RecordConnectionClose(context.Background(), instance, dialerID)
},
Conn: conn,
closeFunc: closeFunc,
}
}

Expand Down
16 changes: 9 additions & 7 deletions dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,15 @@ func TestIAMAuthn(t *testing.T) {
}

for _, tc := range tcs {
d, err := NewDialer(context.Background(), tc.opts)
if err != nil {
t.Errorf("NewDialer failed with error = %v", err)
}
if gotTokenSource := d.iamTokenSource != nil; gotTokenSource != tc.wantTokenSource {
t.Errorf("%v, want = %v, got = %v", tc.desc, tc.wantTokenSource, gotTokenSource)
}
t.Run(tc.desc, func(t *testing.T) {
d, err := NewDialer(context.Background(), tc.opts)
if err != nil {
t.Fatalf("NewDialer failed with error = %v", err)
}
if gotTokenSource := d.iamTokenSource != nil; gotTokenSource != tc.wantTokenSource {
t.Fatalf("want = %v, got = %v", tc.wantTokenSource, gotTokenSource)
}
})
}
}

Expand Down
8 changes: 8 additions & 0 deletions internal/cloudsql/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type Instance struct {
// replacement to occur.
next *refreshResult

// OpenConns is the number of open connections to the instance.
OpenConns uint64

// ctx is the default ctx for refresh operations. Canceling it prevents new refresh
// operations from being triggered.
ctx context.Context
Expand Down Expand Up @@ -269,3 +272,8 @@ func (i *Instance) scheduleRefresh(d time.Duration) *refreshResult {
})
return res
}

// String returns the instance's connection name.
func (i *Instance) String() string {
return i.connName.String()
}
59 changes: 27 additions & 32 deletions internal/trace/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package trace
import (
"context"
"fmt"
"sync"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
Expand All @@ -26,14 +27,18 @@ import (
var (
keyInstance, _ = tag.NewKey("cloudsql_instance")
keyDialerID, _ = tag.NewKey("cloudsql_dialer_id")
)

var (
mLatencyMS = stats.Int64(
"/cloudsqlconn/latency",
"The latency in milliseconds per Dial",
stats.UnitMilliseconds,
)
mConnections = stats.Int64(
"/cloudsqlconn/connection",
"A connect or disconnect event to Cloud SQL",
stats.UnitDimensionless,
)

latencyView = &view.View{
Name: "/cloudsqlconn/dial_latency",
Measure: mLatencyMS,
Expand All @@ -42,23 +47,30 @@ var (
Aggregation: view.Distribution(0, 5, 25, 100, 250, 500, 1000, 2000, 5000, 30000),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}
)

var (
mConnections = stats.Int64(
"/cloudsqlconn/connection",
"A connect or disconnect event to Cloud SQL",
stats.UnitDimensionless,
)
connectionsView = &view.View{
Name: "/cloudsqlconn/open_connections",
Measure: mConnections,
Description: "The sum of Cloud SQL connections",
Aggregation: view.Sum(),
Description: "The current number of open Cloud SQL connections",
Aggregation: view.LastValue(),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}

registerOnce sync.Once
registerErr error
)

// InitMetrics registers all views once. Without registering views, metrics will
// not be reported. If any names of the registered views conflict, this function
// returns an error to indicate an internal configuration problem.
func InitMetrics() error {
registerOnce.Do(func() {
if rErr := view.Register(latencyView, connectionsView); rErr != nil {
registerErr = fmt.Errorf("failed to initialize metrics: %v", rErr)
}
})
return registerErr
}

// RecordDialLatency records a latency value for a call to dial.
func RecordDialLatency(ctx context.Context, instance, dialerID string, latency int64) {
// tag.New creates a new context and errors only if the new tag already
Expand All @@ -69,26 +81,9 @@ func RecordDialLatency(ctx context.Context, instance, dialerID string, latency i
stats.Record(ctx, mLatencyMS.M(latency))
}

// RecordConnectionOpen reports a connection event.
func RecordConnectionOpen(ctx context.Context, instance, dialerID string) {
// Why are we ignoring this error? See above under RecordDialLatency.
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
stats.Record(ctx, mConnections.M(1))
}

// RecordConnectionClose records a disconnect event.
func RecordConnectionClose(ctx context.Context, instance, dialerID string) {
// RecordOpenConnections records the number of open connections
func RecordOpenConnections(ctx context.Context, num int64, dialerID, instance string) {
// Why are we ignoring this error? See above under RecordDialLatency.
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
stats.Record(ctx, mConnections.M(-1))
}

// InitMetrics registers all views. Without registering views, metrics will not
// be reported. If any names of the registered views conflict, this function
// returns an error to indicate a configuration problem.
func InitMetrics() error {
if err := view.Register(latencyView, connectionsView); err != nil {
return fmt.Errorf("failed to initialize metrics: %v", err)
}
return nil
stats.Record(ctx, mConnections.M(num))
}

0 comments on commit 4ee6bea

Please sign in to comment.