Skip to content

Commit

Permalink
[surfacer.postgres] Use pgx interface directly.
Browse files Browse the repository at this point in the history
Instead of going through database/sql layer.
  • Loading branch information
manugarg committed May 10, 2024
1 parent 874e767 commit 8c038b6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 50 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ require (
github.com/itchyny/timefmt-go v0.1.4 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
Expand Down
86 changes: 37 additions & 49 deletions surfacers/internal/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,14 @@ package postgres

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/cloudprober/cloudprober/logger"
"github.com/cloudprober/cloudprober/metrics"
"github.com/cloudprober/cloudprober/surfacers/internal/common/options"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/stdlib"

configpb "github.com/cloudprober/cloudprober/surfacers/internal/postgres/proto"
)
Expand Down Expand Up @@ -168,46 +165,9 @@ func (s *Surfacer) emToPGMetrics(em *metrics.EventMetrics) []pgMetric {
return pgMerics
}

// Surfacer structures for writing to postgres.
type Surfacer struct {
// Configuration
c *configpb.SurfacerConf
opts *options.Options
columns []string

// Channel for incoming data.
writeChan chan *metrics.EventMetrics

// Cloud logger
l *logger.Logger

openDB func(connectionString string) (*sql.DB, error)
db *sql.DB
}

// New initializes a Postgres surfacer. Postgres surfacer inserts probe results
// into a postgres database.
func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Options, l *logger.Logger) (*Surfacer, error) {
s := &Surfacer{
c: config,
opts: opts,
l: l,
openDB: func(cs string) (*sql.DB, error) {
return sql.Open("pgx", cs)
},
}
return s, s.init(ctx)
}

// writeMetrics parses events metrics into postgres rows, starts a transaction
// and inserts all discreet metric rows represented by the EventMetrics
func (s *Surfacer) writeMetrics(ctx context.Context, ems []*metrics.EventMetrics) error {
conn, err := s.db.Conn(ctx)
if err != nil {
return fmt.Errorf("error acquiring conn from the DB pool: %v", err)
}
defer conn.Close()

var rows [][]any

for _, em := range ems {
Expand All @@ -229,21 +189,18 @@ func (s *Surfacer) writeMetrics(ctx context.Context, ems []*metrics.EventMetrics
}
}

return conn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn()
_, err := conn.CopyFrom(ctx, pgx.Identifier{s.c.GetMetricsTableName()}, s.columns, pgx.CopyFromRows(rows))
return err
})
_, err := s.dbconn.CopyFrom(ctx, pgx.Identifier{s.c.GetMetricsTableName()}, s.columns, pgx.CopyFromRows(rows))
return err
}

// init connects to postgres
func (s *Surfacer) init(ctx context.Context) error {
var err error

if s.db, err = s.openDB(s.c.GetConnectionString()); err != nil {
if s.dbconn, err = s.openDB(s.c.GetConnectionString()); err != nil {
return err
}
if err = s.db.Ping(); err != nil {
if err = s.dbconn.Ping(ctx); err != nil {
return err
}
s.writeChan = make(chan *metrics.EventMetrics, s.c.GetMetricsBufferSize())
Expand All @@ -255,7 +212,7 @@ func (s *Surfacer) init(ctx context.Context) error {
// Start a goroutine to run forever, polling on the writeChan. Allows
// for the surfacer to write asynchronously to the serial port.
go func() {
defer s.db.Close()
defer s.dbconn.Close(ctx)

metricsBatchSize, batchTimerSec := s.c.GetMetricsBatchSize(), s.c.GetBatchTimerSec()

Expand Down Expand Up @@ -314,7 +271,7 @@ func generateValues(labels map[string]string, ltc []*configpb.LabelToColumn) []a
if val, ok := labels[v.GetLabel()]; ok {
args = append(args, val)
} else {
args = append(args, sql.NullByte{})
args = append(args, nil)
}
}

Expand All @@ -335,3 +292,34 @@ func generateColumns(ltc []*configpb.LabelToColumn) []string {
}
return columns
}

// Surfacer structures for writing to postgres.
type Surfacer struct {
// Configuration
c *configpb.SurfacerConf
opts *options.Options
columns []string

// Channel for incoming data.
writeChan chan *metrics.EventMetrics

// Cloud logger
l *logger.Logger

openDB func(connectionString string) (*pgx.Conn, error)
dbconn *pgx.Conn
}

// New initializes a Postgres surfacer. Postgres surfacer inserts probe results
// into a postgres database.
func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Options, l *logger.Logger) (*Surfacer, error) {
s := &Surfacer{
c: config,
opts: opts,
l: l,
openDB: func(cs string) (*pgx.Conn, error) {
return pgx.Connect(ctx, cs)
},
}
return s, s.init(ctx)
}

0 comments on commit 8c038b6

Please sign in to comment.