Skip to content

Commit

Permalink
[surfacer.postgres] Switch to newer pgx postgres driver
Browse files Browse the repository at this point in the history
  • Loading branch information
manugarg committed May 10, 2024
1 parent 9e5800a commit 874e767
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 37 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.1
github.com/hoisie/redis v0.0.0-20160730154456-b5c6e81454e0
github.com/jackc/pgx/v5 v5.5.5
github.com/jhump/protoreflect v1.15.1
github.com/kylelemons/godebug v1.1.0
github.com/lib/pq v1.8.0
github.com/miekg/dns v1.1.33
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0
Expand Down Expand Up @@ -78,6 +78,9 @@ require (
github.com/huandu/xstrings v1.3.3 // indirect
github.com/imdario/mergo v0.3.11 // indirect
github.com/itchyny/timefmt-go v0.1.4 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
Expand Down
11 changes: 9 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ github.com/itchyny/gojq v0.12.9 h1:biKpbKwMxVYhCU1d6mR7qMr3f0Hn9F5k5YykCVb3gmM=
github.com/itchyny/gojq v0.12.9/go.mod h1:T4Ip7AETUXeGpD+436m+UEl3m3tokRgajd5pRfsR5oE=
github.com/itchyny/timefmt-go v0.1.4 h1:hFEfWVdwsEi+CY8xY2FtgWHGQaBaC3JeHd+cve0ynVM=
github.com/itchyny/timefmt-go v0.1.4/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
Expand Down Expand Up @@ -283,8 +291,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/miekg/dns v1.1.33 h1:8KUVEKrUw2dmu1Ys0aWnkEJgoRaLAzNysfCh2KSMWiI=
github.com/miekg/dns v1.1.33/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down Expand Up @@ -314,6 +320,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
57 changes: 23 additions & 34 deletions surfacers/internal/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/cloudprober/cloudprober/logger"
"github.com/cloudprober/cloudprober/metrics"
"github.com/cloudprober/cloudprober/surfacers/internal/common/options"
"github.com/lib/pq"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"

configpb "github.com/cloudprober/cloudprober/surfacers/internal/postgres/proto"
)
Expand Down Expand Up @@ -191,60 +193,47 @@ func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Optio
opts: opts,
l: l,
openDB: func(cs string) (*sql.DB, error) {
return sql.Open("postgres", cs)
return sql.Open("pgx", cs)
},
}
return s, s.init(ctx)
}

// writeMetrics parses events metrics into postgres rows, starts a transaction
// and inserts all discreet metric rows represented by the EventMetrics
func (s *Surfacer) writeMetrics(ems []*metrics.EventMetrics) error {
// Begin a transaction.
txn, err := s.db.Begin()
func (s *Surfacer) writeMetrics(ctx context.Context, ems []*metrics.EventMetrics) error {
conn, err := s.db.Conn(ctx)
if err != nil {
return err
return fmt.Errorf("error acquiring conn from the DB pool: %v", err)
}
defer conn.Close()

// Prepare a statement to COPY table from the STDIN.
stmt, err := txn.Prepare(pq.CopyIn(s.c.GetMetricsTableName(), s.columns...))
if err != nil {
return err
}
var rows [][]any

for _, em := range ems {
// Transaction for defined columns
if len(s.c.GetLabelToColumn()) > 0 {
for _, pgMetric := range s.emToPGMetrics(em) {
// args are the column values generated based on the chosen labels
args := []interface{}{pgMetric.time, pgMetric.metricName, pgMetric.value}
args = append(args, generateValues(pgMetric.labels, s.c.GetLabelToColumn())...)

if _, err = stmt.Exec(args...); err != nil {
return err
}
row := []any{pgMetric.time, pgMetric.metricName, pgMetric.value}
row = append(row, generateValues(pgMetric.labels, s.c.GetLabelToColumn())...)
rows = append(rows, row)
}
} else {
for _, pgMetric := range s.emToPGMetrics(em) {
var s string
if s, err = labelsJSON(pgMetric.labels); err != nil {
return err
}
if _, err = stmt.Exec(pgMetric.time, pgMetric.metricName, pgMetric.value, s); err != nil {
s, err := labelsJSON(pgMetric.labels)
if err != nil {
return err
}
rows = append(rows, []any{pgMetric.time, pgMetric.metricName, pgMetric.value, s})
}
}
}

if _, err = stmt.Exec(); err != nil {
return err
}
if err = stmt.Close(); err != nil {
return conn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn()
_, err := conn.CopyFrom(ctx, pgx.Identifier{s.c.GetMetricsTableName()}, s.columns, pgx.CopyFromRows(rows))
return err
}

return txn.Commit()
})
}

// init connects to postgres
Expand Down Expand Up @@ -287,15 +276,15 @@ func (s *Surfacer) init(ctx context.Context) error {
}
buffer = append(buffer, em)
if int32(len(buffer)) >= metricsBatchSize {
if err := s.writeMetrics(buffer); err != nil {
if err := s.writeMetrics(ctx, buffer); err != nil {
s.l.Warningf("Error while writing metrics: %v", err)
}
buffer = buffer[:0]
flushTicker.Reset(flushInterval)
}
case <-flushTicker.C:
if len(buffer) > 0 {
if err := s.writeMetrics(buffer); err != nil {
if err := s.writeMetrics(ctx, buffer); err != nil {
s.l.Warningf("Error while writing metrics: %v", err)
}
buffer = buffer[:0]
Expand All @@ -318,8 +307,8 @@ func (s *Surfacer) Write(ctx context.Context, em *metrics.EventMetrics) {

// generateValues generates column values or places NULL
// in the event label/value does not exist
func generateValues(labels map[string]string, ltc []*configpb.LabelToColumn) []interface{} {
var args []interface{}
func generateValues(labels map[string]string, ltc []*configpb.LabelToColumn) []any {
var args []any

for _, v := range ltc {
if val, ok := labels[v.GetLabel()]; ok {
Expand Down

0 comments on commit 874e767

Please sign in to comment.