Skip to content

Commit

Permalink
[surfacer.postgres] Switch to the newer pgx postgres driver (#305)
Browse files Browse the repository at this point in the history
* Old driver is in the maintenance mode and has not been seeing updates.
* pgx has some performance improvements as well.
  • Loading branch information
manugarg committed May 13, 2024
1 parent 9e5800a commit d4bfee7
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 82 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.1
github.com/hoisie/redis v0.0.0-20160730154456-b5c6e81454e0
github.com/jackc/pgx/v5 v5.5.5
github.com/jhump/protoreflect v1.15.1
github.com/kylelemons/godebug v1.1.0
github.com/lib/pq v1.8.0
github.com/miekg/dns v1.1.33
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0
Expand Down Expand Up @@ -78,6 +78,8 @@ require (
github.com/huandu/xstrings v1.3.3 // indirect
github.com/imdario/mergo v0.3.11 // indirect
github.com/itchyny/timefmt-go v0.1.4 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
Expand Down
11 changes: 9 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ github.com/itchyny/gojq v0.12.9 h1:biKpbKwMxVYhCU1d6mR7qMr3f0Hn9F5k5YykCVb3gmM=
github.com/itchyny/gojq v0.12.9/go.mod h1:T4Ip7AETUXeGpD+436m+UEl3m3tokRgajd5pRfsR5oE=
github.com/itchyny/timefmt-go v0.1.4 h1:hFEfWVdwsEi+CY8xY2FtgWHGQaBaC3JeHd+cve0ynVM=
github.com/itchyny/timefmt-go v0.1.4/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
Expand Down Expand Up @@ -283,8 +291,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/miekg/dns v1.1.33 h1:8KUVEKrUw2dmu1Ys0aWnkEJgoRaLAzNysfCh2KSMWiI=
github.com/miekg/dns v1.1.33/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down Expand Up @@ -314,6 +320,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
138 changes: 63 additions & 75 deletions surfacers/internal/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@ package postgres

import (
"context"
"database/sql"
"encoding/json"
"strconv"
"time"

"github.com/cloudprober/cloudprober/logger"
"github.com/cloudprober/cloudprober/metrics"
"github.com/cloudprober/cloudprober/surfacers/internal/common/options"
"github.com/lib/pq"
"github.com/jackc/pgx/v5"

configpb "github.com/cloudprober/cloudprober/surfacers/internal/postgres/proto"
)
Expand Down Expand Up @@ -166,107 +165,65 @@ func (s *Surfacer) emToPGMetrics(em *metrics.EventMetrics) []pgMetric {
return pgMerics
}

// Surfacer structures for writing to postgres.
type Surfacer struct {
// Configuration
c *configpb.SurfacerConf
opts *options.Options
columns []string

// Channel for incoming data.
writeChan chan *metrics.EventMetrics

// Cloud logger
l *logger.Logger

openDB func(connectionString string) (*sql.DB, error)
db *sql.DB
}

// New initializes a Postgres surfacer. Postgres surfacer inserts probe results
// into a postgres database.
func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Options, l *logger.Logger) (*Surfacer, error) {
s := &Surfacer{
c: config,
opts: opts,
l: l,
openDB: func(cs string) (*sql.DB, error) {
return sql.Open("postgres", cs)
},
}
return s, s.init(ctx)
}

// writeMetrics parses events metrics into postgres rows, starts a transaction
// and inserts all discreet metric rows represented by the EventMetrics
func (s *Surfacer) writeMetrics(ems []*metrics.EventMetrics) error {
// Begin a transaction.
txn, err := s.db.Begin()
if err != nil {
return err
}

// Prepare a statement to COPY table from the STDIN.
stmt, err := txn.Prepare(pq.CopyIn(s.c.GetMetricsTableName(), s.columns...))
if err != nil {
return err
}
func (s *Surfacer) dbRows(ems []*metrics.EventMetrics) ([][]any, error) {
var rows [][]any

for _, em := range ems {
// Transaction for defined columns
if len(s.c.GetLabelToColumn()) > 0 {
for _, pgMetric := range s.emToPGMetrics(em) {
// args are the column values generated based on the chosen labels
args := []interface{}{pgMetric.time, pgMetric.metricName, pgMetric.value}
args = append(args, generateValues(pgMetric.labels, s.c.GetLabelToColumn())...)

if _, err = stmt.Exec(args...); err != nil {
return err
}
row := []any{pgMetric.time, pgMetric.metricName, pgMetric.value}
row = append(row, generateValues(pgMetric.labels, s.c.GetLabelToColumn())...)
rows = append(rows, row)
}
} else {
for _, pgMetric := range s.emToPGMetrics(em) {
var s string
if s, err = labelsJSON(pgMetric.labels); err != nil {
return err
}
if _, err = stmt.Exec(pgMetric.time, pgMetric.metricName, pgMetric.value, s); err != nil {
return err
s, err := labelsJSON(pgMetric.labels)
if err != nil {
return nil, err
}
rows = append(rows, []any{pgMetric.time, pgMetric.metricName, pgMetric.value, s})
}
}
}

if _, err = stmt.Exec(); err != nil {
return err
}
if err = stmt.Close(); err != nil {
return rows, nil
}

// writeMetrics parses events metrics into postgres rows, starts a transaction
// and inserts all discreet metric rows represented by the EventMetrics
func (s *Surfacer) writeMetrics(ctx context.Context, ems []*metrics.EventMetrics) error {
rows, err := s.dbRows(ems)
if err != nil {
return err
}

return txn.Commit()
_, err = s.dbconn.CopyFrom(ctx, pgx.Identifier{s.c.GetMetricsTableName()}, s.columns, pgx.CopyFromRows(rows))
return err
}

// init connects to postgres
func (s *Surfacer) init(ctx context.Context) error {
var err error

if s.db, err = s.openDB(s.c.GetConnectionString()); err != nil {
if s.dbconn, err = s.openDB(s.c.GetConnectionString()); err != nil {
return err
}
if err = s.db.Ping(); err != nil {
if err = s.dbconn.Ping(ctx); err != nil {
return err
}
s.writeChan = make(chan *metrics.EventMetrics, s.c.GetMetricsBufferSize())

// Generate the desired columns either with 'labels' by default
// or select 'labels' based on the label_to_column fields
s.columns = generateColumns(s.c.GetLabelToColumn())
s.columns = colName(s.c.GetLabelToColumn())

// Start a goroutine to run forever, polling on the writeChan. Allows
// for the surfacer to write asynchronously to the serial port.
go func() {
defer s.db.Close()
defer s.dbconn.Close(ctx)

metricsBatchSize, batchTimerSec := s.c.GetMetricsBatchSize(), s.c.GetBatchTimerSec()

Expand All @@ -287,15 +244,15 @@ func (s *Surfacer) init(ctx context.Context) error {
}
buffer = append(buffer, em)
if int32(len(buffer)) >= metricsBatchSize {
if err := s.writeMetrics(buffer); err != nil {
if err := s.writeMetrics(ctx, buffer); err != nil {
s.l.Warningf("Error while writing metrics: %v", err)
}
buffer = buffer[:0]
flushTicker.Reset(flushInterval)
}
case <-flushTicker.C:
if len(buffer) > 0 {
if err := s.writeMetrics(buffer); err != nil {
if err := s.writeMetrics(ctx, buffer); err != nil {
s.l.Warningf("Error while writing metrics: %v", err)
}
buffer = buffer[:0]
Expand All @@ -318,23 +275,23 @@ func (s *Surfacer) Write(ctx context.Context, em *metrics.EventMetrics) {

// generateValues generates column values or places NULL
// in the event label/value does not exist
func generateValues(labels map[string]string, ltc []*configpb.LabelToColumn) []interface{} {
var args []interface{}
func generateValues(labels map[string]string, ltc []*configpb.LabelToColumn) []any {
var args []any

for _, v := range ltc {
if val, ok := labels[v.GetLabel()]; ok {
args = append(args, val)
} else {
args = append(args, sql.NullByte{})
args = append(args, nil)
}
}

return args
}

// generateValues generates column values or places NULL
// in the event label/value does not exist
func generateColumns(ltc []*configpb.LabelToColumn) []string {
// colName figures out postgres table column names, based on the
// label_to_column configuration.
func colName(ltc []*configpb.LabelToColumn) []string {
var columns []string
if len(ltc) > 0 {
columns = append([]string{"time", "metric_name", "value"}, make([]string, len(ltc))...)
Expand All @@ -346,3 +303,34 @@ func generateColumns(ltc []*configpb.LabelToColumn) []string {
}
return columns
}

// Surfacer structures for writing to postgres.
type Surfacer struct {
// Configuration
c *configpb.SurfacerConf
opts *options.Options
columns []string

// Channel for incoming data.
writeChan chan *metrics.EventMetrics

// Cloud logger
l *logger.Logger

openDB func(connectionString string) (*pgx.Conn, error)
dbconn *pgx.Conn
}

// New initializes a Postgres surfacer. Postgres surfacer inserts probe results
// into a postgres database.
func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Options, l *logger.Logger) (*Surfacer, error) {
s := &Surfacer{
c: config,
opts: opts,
l: l,
openDB: func(cs string) (*pgx.Conn, error) {
return pgx.Connect(ctx, cs)
},
}
return s, s.init(ctx)
}
64 changes: 60 additions & 4 deletions surfacers/internal/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"time"

"github.com/cloudprober/cloudprober/metrics"

configpb "github.com/cloudprober/cloudprober/surfacers/internal/postgres/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)

func TestEMToPGMetricsNoDistribution(t *testing.T) {
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestGenerateValues(t *testing.T) {
}
}

func TestGenerateColumns(t *testing.T) {
func TestColColumns(t *testing.T) {
label1 := "dst"
label2 := "code"
column1 := "dst"
Expand Down Expand Up @@ -179,9 +180,64 @@ func TestGenerateColumns(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := generateColumns(tt.args.ltc); !reflect.DeepEqual(got, tt.want) {
t.Errorf("generateColumns() = %v, want %v", got, tt.want)
if got := colName(tt.args.ltc); !reflect.DeepEqual(got, tt.want) {
t.Errorf("colNames() = %v, want %v", got, tt.want)
}
})
}
}

func TestDBRows(t *testing.T) {
ts := time.Now()
ems := []*metrics.EventMetrics{
metrics.NewEventMetrics(ts).AddMetric("sent", metrics.NewInt(32)).AddMetric("rcvd", metrics.NewInt(22)).AddLabel("dst", "dst1"),
metrics.NewEventMetrics(ts).AddMetric("sent", metrics.NewInt(33)).AddMetric("rcvd", metrics.NewInt(32)).AddLabel("dst", "dst2"),
}
tests := []struct {
name string
columns []string
labelToColumn []*configpb.LabelToColumn
want [][]any
wantErr bool
}{
{
name: "test-1",
columns: []string{"time", "metric_name", "value", "dst"},
labelToColumn: []*configpb.LabelToColumn{{
Label: proto.String("dst"),
Column: proto.String("dst"),
}},
want: [][]any{
{ts, "sent", "32", "dst1"},
{ts, "rcvd", "22", "dst1"},
{ts, "sent", "33", "dst2"},
{ts, "rcvd", "32", "dst2"},
},
},
{
name: "test-2",
columns: []string{"time", "metric_name", "value", "labels"},
labelToColumn: nil,
want: [][]any{
{ts, "sent", "32", "{\"dst\":\"dst1\"}"},
{ts, "rcvd", "22", "{\"dst\":\"dst1\"}"},
{ts, "sent", "33", "{\"dst\":\"dst2\"}"},
{ts, "rcvd", "32", "{\"dst\":\"dst2\"}"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Surfacer{
columns: tt.columns,
c: &configpb.SurfacerConf{LabelToColumn: tt.labelToColumn},
}
got, err := s.dbRows(ems)
if (err != nil) != tt.wantErr {
t.Errorf("Surfacer.dbRows() error = %v, wantErr %v", err, tt.wantErr)
return
}
assert.Equal(t, tt.want, got, "dbRows() = %v, want %v", got, tt.want)
})
}
}

0 comments on commit d4bfee7

Please sign in to comment.