Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terminate pg_xlogdump once an iteration #41

Merged
merged 4 commits into from
Oct 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Agent struct {
shutdown func()
shutdownCtx context.Context

pgConnCtx context.Context
pgConnShutdown func()

metrics *cgm.CirconusMetrics

// pgStateLock protects the following values. lastWALLog and lastTimelineID
Expand Down Expand Up @@ -100,7 +103,7 @@ func New(cfg *config.Config) (a *Agent, err error) {
}

{
walCache, err := walcache.New(a.shutdownCtx, cfg, a.metrics, a.ioCache)
walCache, err := walcache.New(a.pgConnCtx, a.shutdownCtx, cfg, a.metrics, a.ioCache)
if err != nil {
return nil, errors.Wrap(err, "unable to initialize WAL cache")
}
Expand Down Expand Up @@ -150,8 +153,8 @@ func (a *Agent) Start() {
// the following six steps:
//
// 1) Shutdown if we've been told to shutdown.
// 2) Dump caches if a cache-invalidation event occurred.
// 3) Sleep if we've been told to sleep in the previous iteration.
// 2) Sleep if we've been told to sleep in the previous iteration.
// 3) Dump caches if a cache-invalidation event occurred.
// 4) Attempt to find WAL files.
// 4a) Attempt to query the DB to find the WAL files.
// 4b) Attempt to query the process args to find the WAL files.
Expand Down Expand Up @@ -193,20 +196,25 @@ RETRY:
break RETRY
}

// 2) Dump cache. Calling Purge() on the WALCache purges all downstream
// caches (i.e. ioCache and fhCache).
if purgeCache {
a.walCache.Purge()
purgeCache = false
}

// 3) Sleep
// 2) Sleep. Sleep before purging the WALCache in order to allow processes
// in flight to complete. If the sleep is not called before the purge,
// it's possible that an in-flight pg_xlogdump(1) would be cancelled
// before it completed a run. This means that during an unexpected
// shutdown, FDs won't be closed for up to config.KeyPGPollInterval.
if !sleepBetweenIterations {
d := viper.GetDuration(config.KeyPGPollInterval)
time.Sleep(d)
sleepBetweenIterations = false
}

// 3) Dump cache. Calling Purge() on the WALCache purges all downstream
// caches (i.e. ioCache and fhCache).
if purgeCache {
a.resetPGConnCtx()
a.walCache.Purge()
purgeCache = false
}

// 4) Get WAL files
var walFiles []pg.WALFilename
walFiles, err = a.getWALFiles()
Expand Down Expand Up @@ -358,6 +366,18 @@ func (a *Agent) prefaultWALFiles(walFiles pg.WALFiles) (moreWork bool, err error
return len(waitWALFiles) > pg.NumOldLSNs, nil
}

// resetPGConnCtx is a convenience function that:
//
// 1. Shuts down the current context
// 2. Creates a new pgConnCtx
// 3. Resets the pgConnCtx with a new context
func (a *Agent) resetPGConnCtx() {
a.pgConnShutdown()
a.pgConnCtx, a.pgConnShutdown = context.WithCancel(a.shutdownCtx)

a.walCache.ResetPGConnCtx(a.pgConnCtx)
}

// stopSignalHandler disables the signal handler
func (a *Agent) stopSignalHandler() {
signal.Stop(a.signalCh)
Expand Down
1 change: 1 addition & 0 deletions agent/signals_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (a *Agent) setupSignals() {
signal.Notify(a.signalCh, os.Interrupt, unix.SIGTERM, unix.SIGHUP, unix.SIGPIPE)

a.shutdownCtx, a.shutdown = context.WithCancel(context.Background())
a.pgConnCtx, a.pgConnShutdown = context.WithCancel(a.shutdownCtx)
}

// handleSignals runs the signal handler thread
Expand Down
1 change: 1 addition & 0 deletions agent/signals_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (a *Agent) setupSignals() {
signal.Notify(a.signalCh, os.Interrupt, unix.SIGTERM, unix.SIGHUP, unix.SIGPIPE, unix.SIGINFO)

a.shutdownCtx, a.shutdown = context.WithCancel(context.Background())
a.pgConnCtx, a.pgConnShutdown = context.WithCancel(a.shutdownCtx)
}

// handleSignals runs the signal handler thread
Expand Down
31 changes: 20 additions & 11 deletions agent/walcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ var xlogdumpRE = regexp.MustCompile(`s/d/r:([\d]+)/([\d]+)/([\d]+) (?:tid |blk/o
// c) deliberately intollerant of scans because we know the input is monotonic
// d) sized to include only the KeyWALReadahead
type WALCache struct {
ctx context.Context
wg sync.WaitGroup
cfg *config.WALCacheConfig
pgConnCtx context.Context
shutdownCtx context.Context
wg sync.WaitGroup
cfg *config.WALCacheConfig

purgeLock sync.Mutex
c gcache.Cache
Expand All @@ -83,13 +84,16 @@ var (
numConcurrentWALs int64
)

func New(ctx context.Context, cfg *config.Config, circMetrics *cgm.CirconusMetrics, ioCache *iocache.IOCache) (*WALCache, error) {
func New(pgConnCtx context.Context, shutdownCtx context.Context,
cfg *config.Config, circMetrics *cgm.CirconusMetrics,
ioCache *iocache.IOCache) (*WALCache, error) {
walWorkers := pg.NumOldLSNs * int(math.Ceil(float64(cfg.ReadaheadBytes)/float64(pg.WALSegmentSize)))

wc := &WALCache{
ctx: ctx,
metrics: circMetrics,
cfg: &cfg.WALCacheConfig,
pgConnCtx: pgConnCtx,
shutdownCtx: shutdownCtx,
metrics: circMetrics,
cfg: &cfg.WALCacheConfig,

inFlightWALFiles: make(map[pg.WALFilename]struct{}, walWorkers),
ioCache: ioCache,
Expand All @@ -115,7 +119,7 @@ func New(ctx context.Context, cfg *config.Config, circMetrics *cgm.CirconusMetri

for {
select {
case <-wc.ctx.Done():
case <-wc.shutdownCtx.Done():
return
case walFile, ok := <-walFilePrefaultWorkQueue:
if !ok {
Expand Down Expand Up @@ -168,15 +172,15 @@ func New(ctx context.Context, cfg *config.Config, circMetrics *cgm.CirconusMetri
walFilename := keyRaw.(pg.WALFilename)

select {
case <-wc.ctx.Done():
case <-wc.shutdownCtx.Done():
case walFilePrefaultWorkQueue <- walFilename:
}

return true, nil
}).
Build()

go lib.LogCacheStats(wc.ctx, wc.c, "walcache-stats")
go lib.LogCacheStats(wc.shutdownCtx, wc.c, "walcache-stats")

return wc, nil
}
Expand Down Expand Up @@ -253,6 +257,11 @@ func (wc *WALCache) ReadaheadBytes() units.Base2Bytes {
return wc.cfg.ReadaheadBytes
}

// ResetPGConnCtx resets a pgConnCtx
func (wc *WALCache) ResetPGConnCtx(pgConnCtx context.Context) {
wc.pgConnCtx = pgConnCtx
}

// Wait blocks until the WALCache finishes shutting down its workers (including
// the workers of its IOCache).
func (wc *WALCache) Wait() {
Expand All @@ -275,7 +284,7 @@ func (wc *WALCache) prefaultWALFile(walFile pg.WALFilename) (err error) {
return errors.Wrap(err, "WAL file does not exist")
}

cmd := exec.CommandContext(wc.ctx, wc.cfg.XLogDumpPath, "-f", walFileAbs)
cmd := exec.CommandContext(wc.pgConnCtx, wc.cfg.XLogDumpPath, "-f", walFileAbs)
var errbuf bytes.Buffer
cmd.Stderr = &errbuf

Expand Down