Skip to content

Commit

Permalink
feat: add initial support for metrics (#40)
Browse files Browse the repository at this point in the history
This commit includes two metrics:

1. a metric that reports dial latency
2. a metric that reports open connections

Fixes #15.
  • Loading branch information
enocom committed Aug 25, 2021
1 parent f2de1e0 commit ee396ff
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 3 deletions.
55 changes: 53 additions & 2 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"cloud.google.com/go/cloudsqlconn/errtypes"
"cloud.google.com/go/cloudsqlconn/internal/cloudsql"
"cloud.google.com/go/cloudsqlconn/internal/trace"
"github.com/google/uuid"
"golang.org/x/net/proxy"
"google.golang.org/api/option"
sqladmin "google.golang.org/api/sqladmin/v1beta4"
Expand Down Expand Up @@ -73,6 +74,10 @@ type Dialer struct {
// defaultDialCfg holds the constructor level DialOptions, so that it can
// be copied and mutated by the Dial function.
defaultDialCfg dialCfg

// dialerID uniquely identifies a Dialer. Used for monitoring purposes,
// *only* when a client has configured OpenCensus exporters.
dialerID string
}

// NewDialer creates a new Dialer.
Expand Down Expand Up @@ -110,22 +115,33 @@ func NewDialer(ctx context.Context, opts ...DialerOption) (*Dialer, error) {
opt(&dialCfg)
}

if err := trace.InitMetrics(); err != nil {
// This error means the internal metric configuration is incorrect and
// should never be surfaced to callers, as there's nothing actionable
// for a caller to do. Ignoring the error seems worse and so we return
// it.
return nil, err
}
d := &Dialer{
instances: make(map[string]*cloudsql.Instance),
key: cfg.rsaKey,
refreshTimeout: cfg.refreshTimeout,
sqladmin: client,
defaultDialCfg: dialCfg,
dialerID: uuid.New().String(),
}
return d, nil
}

// Dial returns a net.Conn connected to the specified Cloud SQL instance. The instance argument must be the
// instance's connection name, which is in the format "project-name:region:instance-name".
func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption) (conn net.Conn, err error) {
startTime := time.Now()
var endDial trace.EndSpanFunc
ctx, endDial = trace.StartSpan(ctx, "cloud.google.com/go/cloudsqlconn.Dial",
trace.AddInstanceName(instance))
trace.AddInstanceName(instance),
trace.AddDialerID(d.dialerID),
)
defer func() { endDial(err) }()
cfg := d.defaultDialCfg
for _, opt := range opts {
Expand Down Expand Up @@ -171,7 +187,42 @@ func (d *Dialer) Dial(ctx context.Context, instance string, opts ...DialOption)
_ = tlsConn.Close() // best effort close attempt
return nil, errtypes.NewDialError("handshake failed", i.String(), err)
}
return tlsConn, nil
latency := time.Since(startTime).Milliseconds()
go func() {
trace.RecordDialLatency(ctx, instance, d.dialerID, latency)
trace.RecordConnectionOpen(ctx, instance, d.dialerID)
}()

return newInstrumentedConn(tlsConn, instance, d.dialerID), nil
}

// newInstrumentedConn initializes an instrumentedConn that on closing will
// decrement the number of open connects and record the result.
func newInstrumentedConn(conn net.Conn, instance, dialerID string) *instrumentedConn {
return &instrumentedConn{
Conn: conn,
closeFunc: func() {
trace.RecordConnectionClose(context.Background(), instance, dialerID)
},
}
}

// instrumentedConn wraps a net.Conn and invokes closeFunc when the connection
// is closed.
type instrumentedConn struct {
net.Conn
closeFunc func()
}

// Close delegates to the underylying net.Conn interface and reports the close
// to the provided closeFunc only when Close returns no error.
func (i *instrumentedConn) Close() error {
err := i.Conn.Close()
if err != nil {
return err
}
go i.closeFunc()
return nil
}

// Close closes the Dialer; it prevents the Dialer from refreshing the information
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.15

require (
cloud.google.com/go v0.75.0 // indirect
github.com/google/uuid v1.3.0
github.com/jackc/pgx/v4 v4.10.1
github.com/pkg/errors v0.9.1 // indirect
go.opencensus.io v0.22.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down
3 changes: 2 additions & 1 deletion internal/trace/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package trace provides an interface for tracing internal operations.
// Package trace provides an interface for tracing internal operations and
// reporting various metrics. Metrics are recorded on a package-global basis.
package trace // import "cloud.google.com/go/cloudsqlconn/internal/trace"
80 changes: 80 additions & 0 deletions internal/trace/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package trace

import (
"context"
"fmt"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
keyInstance, _ = tag.NewKey("cloudsql_instance")
keyDialerID, _ = tag.NewKey("cloudsql_dialer_id")
)

var (
mLatencyMS = stats.Int64(
"/cloudsqlconn/latency",
"The latency in milliseconds per Dial",
stats.UnitMilliseconds,
)
latencyView = &view.View{
Name: "/cloudsqlconn/dial_latency",
Measure: mLatencyMS,
Description: "The distribution of dialer latencies (ms)",
// Latency in buckets, e.g., >=0ms, >=100ms, etc.
Aggregation: view.Distribution(0, 5, 25, 100, 250, 500, 1000, 2000, 5000, 30000),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}
)

var (
mConnections = stats.Int64(
"/cloudsqlconn/connection",
"A connect or disconnect event to Cloud SQL",
stats.UnitDimensionless,
)
connectionsView = &view.View{
Name: "/cloudsqlconn/open_connections",
Measure: mConnections,
Description: "The sum of Cloud SQL connections",
Aggregation: view.Sum(),
TagKeys: []tag.Key{keyInstance, keyDialerID},
}
)

// RecordDialLatency records a latency value for a call to dial.
func RecordDialLatency(ctx context.Context, instance, dialerID string, latency int64) {
// tag.New creates a new context and errors only if the new tag already
// exists in the provided context. Since we're adding tags within this
// package only, we can be confident that there were be no duplicate tags
// and so can ignore the error.
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
stats.Record(ctx, mLatencyMS.M(latency))
}

// RecordConnectionOpen reports a connection event.
func RecordConnectionOpen(ctx context.Context, instance, dialerID string) {
// Why are we ignoring this error? See above under RecordDialLatency.
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
stats.Record(ctx, mConnections.M(1))
}

// RecordConnectionClose records a disconnect event.
func RecordConnectionClose(ctx context.Context, instance, dialerID string) {
// Why are we ignoring this error? See above under RecordDialLatency.
ctx, _ = tag.New(ctx, tag.Upsert(keyInstance, instance), tag.Upsert(keyDialerID, dialerID))
stats.Record(ctx, mConnections.M(-1))
}

// InitMetrics registers all views. Without registering views, metrics will not
// be reported. If any names of the registered views conflict, this function
// returns an error to indicate a configuration problem.
func InitMetrics() error {
if err := view.Register(latencyView, connectionsView); err != nil {
return fmt.Errorf("failed to initialize metrics: %v", err)
}
return nil
}
13 changes: 13 additions & 0 deletions internal/trace/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package trace_test

import (
"testing"

"cloud.google.com/go/cloudsqlconn/internal/trace"
)

func TestMetricsInitializes(t *testing.T) {
if err := trace.InitMetrics(); err != nil {
t.Fatalf("want no error, got = %v", err)
}
}
5 changes: 5 additions & 0 deletions internal/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func AddInstanceName(name string) Attribute {
return Attribute{key: "/cloudsql/instance", value: name}
}

// AddDialerID creates an attribute to identify a particular dialer.
func AddDialerID(dialerID string) Attribute {
return Attribute{key: "/cloudsql/dialer_id", value: dialerID}
}

// StartSpan begins a span with the provided name and returns a context and a
// function to end the created span.
func StartSpan(ctx context.Context, name string, attrs ...Attribute) (context.Context, EndSpanFunc) {
Expand Down

0 comments on commit ee396ff

Please sign in to comment.