-
Notifications
You must be signed in to change notification settings - Fork 201
/
postgres.go
248 lines (200 loc) · 7.58 KB
/
postgres.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package postgres
import (
"context"
"fmt"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/exaring/otelpgx"
"golang.org/x/exp/slog"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/Masterminds/squirrel"
)
// Postgres - Structure for Postresql instance
type Postgres struct {
ReadPool *pgxpool.Pool
WritePool *pgxpool.Pool
Builder squirrel.StatementBuilderType
// options
maxDataPerWrite int
maxRetries int
watchBufferSize int
maxConnectionLifeTime time.Duration
maxConnectionIdleTime time.Duration
maxOpenConnections int
maxIdleConnections int
}
// New -
func New(uri string, opts ...Option) (*Postgres, error) {
return newDB(uri, uri, opts...)
}
// NewWithSeparateURIs -
func NewWithSeparateURIs(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
return newDB(writerUri, readerUri, opts...)
}
// new - Creates new postgresql db instance
func newDB(writerUri, readerUri string, opts ...Option) (*Postgres, error) {
pg := &Postgres{
maxOpenConnections: _defaultMaxOpenConnections,
maxIdleConnections: _defaultMaxIdleConnections,
maxDataPerWrite: _defaultMaxDataPerWrite,
maxRetries: _defaultMaxRetries,
watchBufferSize: _defaultWatchBufferSize,
}
// Custom options
for _, opt := range opts {
opt(pg)
}
pg.Builder = squirrel.StatementBuilder.PlaceholderFormat(squirrel.Dollar)
writeConfig, err := pgxpool.ParseConfig(writerUri)
if err != nil {
return nil, err
}
readConfig, err := pgxpool.ParseConfig(readerUri)
if err != nil {
return nil, err
}
// Set the default execution mode for queries using the write and read configurations.
setDefaultQueryExecMode(writeConfig.ConnConfig)
setDefaultQueryExecMode(readConfig.ConnConfig)
// Set the plan cache mode for both write and read configurations to optimize query planning.
setPlanCacheMode(writeConfig.ConnConfig)
setPlanCacheMode(readConfig.ConnConfig)
// Set the minimum number of idle connections in the pool for both write and read configurations.
writeConfig.MinConns = int32(pg.maxIdleConnections)
readConfig.MinConns = int32(pg.maxIdleConnections)
// Set the maximum number of active connections in the pool for both write and read configurations.
writeConfig.MaxConns = int32(pg.maxOpenConnections)
readConfig.MaxConns = int32(pg.maxOpenConnections)
// Set the maximum amount of time a connection may be idle before being closed for both configurations.
writeConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
readConfig.MaxConnIdleTime = pg.maxConnectionIdleTime
// Set the maximum lifetime of a connection in the pool for both configurations.
writeConfig.MaxConnLifetime = pg.maxConnectionLifeTime
readConfig.MaxConnLifetime = pg.maxConnectionLifeTime
// Set a jitter to the maximum connection lifetime to prevent all connections from expiring at the same time.
writeConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
readConfig.MaxConnLifetimeJitter = time.Duration(0.2 * float64(pg.maxConnectionLifeTime))
writeConfig.ConnConfig.Tracer = otelpgx.NewTracer()
readConfig.ConnConfig.Tracer = otelpgx.NewTracer()
// Create connection pools for both writing and reading operations using the configured settings.
pg.WritePool, pg.ReadPool, err = createPools(
context.Background(), // Context used to control the lifecycle of the pools.
writeConfig, // Configuration settings for the write pool.
readConfig, // Configuration settings for the read pool.
)
// Handle errors during the creation of the connection pools.
if err != nil {
return nil, err
}
return pg, nil
}
func (p *Postgres) GetMaxDataPerWrite() int {
return p.maxDataPerWrite
}
func (p *Postgres) GetMaxRetries() int {
return p.maxRetries
}
func (p *Postgres) GetWatchBufferSize() int {
return p.watchBufferSize
}
// GetEngineType - Get the engine type which is postgresql in string
func (p *Postgres) GetEngineType() string {
return "postgres"
}
// Close - Close postgresql instance
func (p *Postgres) Close() error {
p.ReadPool.Close()
p.WritePool.Close()
return nil
}
// IsReady - Check if database is ready
func (p *Postgres) IsReady(ctx context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
if err := p.ReadPool.Ping(ctx); err != nil {
return false, err
}
return true, nil
}
var queryExecModes = map[string]pgx.QueryExecMode{
"cache_statement": pgx.QueryExecModeCacheStatement,
"cache_describe": pgx.QueryExecModeCacheDescribe,
"describe_exec": pgx.QueryExecModeDescribeExec,
"mode_exec": pgx.QueryExecModeExec,
"simple_protocol": pgx.QueryExecModeSimpleProtocol,
}
func setDefaultQueryExecMode(config *pgx.ConnConfig) {
// Default mode if no specific mode is found in the connection string
defaultMode := "cache_statement"
// Iterate through the map keys to check if any are mentioned in the connection string
for key := range queryExecModes {
if strings.Contains(config.ConnString(), "default_query_exec_mode="+key) {
config.DefaultQueryExecMode = queryExecModes[key]
slog.Info("setDefaultQueryExecMode", slog.String("mode", key))
return
}
}
// Set to default mode if no matching mode is found
config.DefaultQueryExecMode = queryExecModes[defaultMode]
slog.Warn("setDefaultQueryExecMode", slog.String("mode", defaultMode))
}
var planCacheModes = map[string]string{
"auto": "auto",
"force_custom_plan": "force_custom_plan",
}
func setPlanCacheMode(config *pgx.ConnConfig) {
// Default mode if no specific mode is found in the connection string
defaultMode := "auto"
// Check if a plan cache mode is mentioned in the connection string and set it
for key := range planCacheModes {
if strings.Contains(config.ConnString(), "plan_cache_mode="+key) {
config.Config.RuntimeParams["plan_cache_mode"] = planCacheModes[key]
slog.Info("setPlanCacheMode", slog.String("mode", key))
return
}
}
// Set to default mode if no matching mode is found
config.Config.RuntimeParams["plan_cache_mode"] = planCacheModes[defaultMode]
slog.Warn("setPlanCacheMode", slog.String("mode", defaultMode))
}
// createPools initializes read and write connection pools with appropriate configurations and error handling.
func createPools(ctx context.Context, wConfig, rConfig *pgxpool.Config) (*pgxpool.Pool, *pgxpool.Pool, error) {
// Context with timeout for creating the pools
initCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
// Create write pool
writePool, err := pgxpool.NewWithConfig(initCtx, wConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create write pool: %w", err)
}
// Create read pool using the same configuration
readPool, err := pgxpool.NewWithConfig(initCtx, rConfig)
if err != nil {
writePool.Close() // Ensure write pool is closed on failure
return nil, nil, fmt.Errorf("failed to create read pool: %w", err)
}
// Set up retry policy for pinging pools
retryPolicy := backoff.NewExponentialBackOff()
retryPolicy.MaxElapsedTime = 1 * time.Minute
// Attempt to ping both pools to confirm connectivity
err = backoff.Retry(func() error {
pingCtx, pingCancel := context.WithTimeout(context.Background(), 2*time.Second)
defer pingCancel()
if err := writePool.Ping(pingCtx); err != nil {
return fmt.Errorf("write pool ping failed: %w", err)
}
if err := readPool.Ping(pingCtx); err != nil {
return fmt.Errorf("read pool ping failed: %w", err)
}
return nil
}, retryPolicy)
// Handle errors from pinging
if err != nil {
writePool.Close()
readPool.Close()
return nil, nil, fmt.Errorf("pinging pools failed: %w", err)
}
return writePool, readPool, nil
}