diff --git a/go.mod b/go.mod index fc81ab537f..505f26b6e8 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,6 @@ require ( github.com/itchyny/timefmt-go v0.1.4 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect diff --git a/surfacers/internal/postgres/postgres.go b/surfacers/internal/postgres/postgres.go index c011fdd64f..840bd83ac2 100644 --- a/surfacers/internal/postgres/postgres.go +++ b/surfacers/internal/postgres/postgres.go @@ -31,9 +31,7 @@ package postgres import ( "context" - "database/sql" "encoding/json" - "fmt" "strconv" "time" @@ -41,7 +39,6 @@ import ( "github.com/cloudprober/cloudprober/metrics" "github.com/cloudprober/cloudprober/surfacers/internal/common/options" "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/stdlib" configpb "github.com/cloudprober/cloudprober/surfacers/internal/postgres/proto" ) @@ -168,46 +165,9 @@ func (s *Surfacer) emToPGMetrics(em *metrics.EventMetrics) []pgMetric { return pgMerics } -// Surfacer structures for writing to postgres. -type Surfacer struct { - // Configuration - c *configpb.SurfacerConf - opts *options.Options - columns []string - - // Channel for incoming data. - writeChan chan *metrics.EventMetrics - - // Cloud logger - l *logger.Logger - - openDB func(connectionString string) (*sql.DB, error) - db *sql.DB -} - -// New initializes a Postgres surfacer. Postgres surfacer inserts probe results -// into a postgres database. -func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Options, l *logger.Logger) (*Surfacer, error) { - s := &Surfacer{ - c: config, - opts: opts, - l: l, - openDB: func(cs string) (*sql.DB, error) { - return sql.Open("pgx", cs) - }, - } - return s, s.init(ctx) -} - // writeMetrics parses events metrics into postgres rows, starts a transaction // and inserts all discreet metric rows represented by the EventMetrics func (s *Surfacer) writeMetrics(ctx context.Context, ems []*metrics.EventMetrics) error { - conn, err := s.db.Conn(ctx) - if err != nil { - return fmt.Errorf("error acquiring conn from the DB pool: %v", err) - } - defer conn.Close() - var rows [][]any for _, em := range ems { @@ -229,21 +189,18 @@ func (s *Surfacer) writeMetrics(ctx context.Context, ems []*metrics.EventMetrics } } - return conn.Raw(func(driverConn any) error { - conn := driverConn.(*stdlib.Conn).Conn() - _, err := conn.CopyFrom(ctx, pgx.Identifier{s.c.GetMetricsTableName()}, s.columns, pgx.CopyFromRows(rows)) - return err - }) + _, err := s.dbconn.CopyFrom(ctx, pgx.Identifier{s.c.GetMetricsTableName()}, s.columns, pgx.CopyFromRows(rows)) + return err } // init connects to postgres func (s *Surfacer) init(ctx context.Context) error { var err error - if s.db, err = s.openDB(s.c.GetConnectionString()); err != nil { + if s.dbconn, err = s.openDB(s.c.GetConnectionString()); err != nil { return err } - if err = s.db.Ping(); err != nil { + if err = s.dbconn.Ping(ctx); err != nil { return err } s.writeChan = make(chan *metrics.EventMetrics, s.c.GetMetricsBufferSize()) @@ -255,7 +212,7 @@ func (s *Surfacer) init(ctx context.Context) error { // Start a goroutine to run forever, polling on the writeChan. Allows // for the surfacer to write asynchronously to the serial port. go func() { - defer s.db.Close() + defer s.dbconn.Close(ctx) metricsBatchSize, batchTimerSec := s.c.GetMetricsBatchSize(), s.c.GetBatchTimerSec() @@ -314,7 +271,7 @@ func generateValues(labels map[string]string, ltc []*configpb.LabelToColumn) []a if val, ok := labels[v.GetLabel()]; ok { args = append(args, val) } else { - args = append(args, sql.NullByte{}) + args = append(args, nil) } } @@ -335,3 +292,34 @@ func generateColumns(ltc []*configpb.LabelToColumn) []string { } return columns } + +// Surfacer structures for writing to postgres. +type Surfacer struct { + // Configuration + c *configpb.SurfacerConf + opts *options.Options + columns []string + + // Channel for incoming data. + writeChan chan *metrics.EventMetrics + + // Cloud logger + l *logger.Logger + + openDB func(connectionString string) (*pgx.Conn, error) + dbconn *pgx.Conn +} + +// New initializes a Postgres surfacer. Postgres surfacer inserts probe results +// into a postgres database. +func New(ctx context.Context, config *configpb.SurfacerConf, opts *options.Options, l *logger.Logger) (*Surfacer, error) { + s := &Surfacer{ + c: config, + opts: opts, + l: l, + openDB: func(cs string) (*pgx.Conn, error) { + return pgx.Connect(ctx, cs) + }, + } + return s, s.init(ctx) +}