Skip to content

Commit

Permalink
Purge all caches in the event that the prefaulter fails to query PG.
Browse files Browse the repository at this point in the history
Fixes: #13
  • Loading branch information
sean- committed Sep 12, 2017
1 parent 7c87bad commit d1813ad
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 32 deletions.
12 changes: 10 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,20 @@ RETRY:
// that is shared between a primary and follower interface.
switch state := dbState; state {
case _DBStatePrimary:
loopImmediately = a.runPrimary()
loopImmediately, err = a.runPrimary()
log.Error().Err(err).Msg("unable to run primary")
case _DBStateFollower:
loopImmediately = a.runFollower()
loopImmediately, err = a.runFollower()
log.Error().Err(err).Msg("unable to run follower")
default:
panic(fmt.Sprintf("unknown state: %+v", state))
}

// Purge all the things if we can't talk to PG. Calling Purge() on the
// WALCache purges all downstream caches (i.e. iocache and fhcache).
if err != nil {
a.walCache.Purge()
}
}
}

Expand Down
33 changes: 16 additions & 17 deletions agent/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,50 @@ package agent

import (
"github.com/bluele/gcache"
"github.com/joyent/pg_prefaulter/lsn"
log "github.com/rs/zerolog/log"
"github.com/joyent/pg_prefaulter/pg"
"github.com/pkg/errors"
)

// runFollower is excuted when talking to a readonly follower. When returning
// true, we're requesting an immediately loop without any pause between
// iterations.
func (a *Agent) runFollower() (loopImmediately bool) {
func (a *Agent) runFollower() (loopImmediately bool, err error) {
visibilityLagBytes, err := a.queryLag(_QueryLagFollower)
if err != nil {
log.Error().Err(err).Msg("unable to query follower lag")
return false
return false, errors.Wrap(err, "unable to query follower lag")
}

var timelineID lsn.TimelineID
var timelineID pg.TimelineID
timelineID, err = a.queryLastLog()
if err != nil {
log.Error().Err(err).Msg("unable to query last WAL lag")
return false
return false, errors.Wrap(err, "unable to query last WAL lag")
}

replayLSN, err := a.queryLSN(LastXLogReplayLocation)
if err != nil {
log.Error().Err(err).Msg("unable to query LSN")
return false
return false, errors.Wrap(err, "unable to query LSN")
}

// Precalculate the WAL files we need to proactively fault in based on the
// timeline and LSN. Don't read into the future.
maxBytes := uint64(a.walReadAhead * uint32(lsn.WALFileSize))
maxBytes := uint64(a.walCache.ReadAhead() * uint32(pg.WALFileSize))
if maxBytes > visibilityLagBytes {
maxBytes = visibilityLagBytes
}
walFiles := make([]string, 0, a.walReadAhead)
for i := uint32(0); i < a.walReadAhead; i++ {
walFiles := make([]string, 0, a.walCache.ReadAhead())
for i := uint32(0); i < a.walCache.ReadAhead(); i++ {
segNo := replayLSN.ID()
off := replayLSN.ByteOffset()
if uint64(off)+maxBytes <= uint64(off)+(uint64(i+1)*uint64(lsn.WALFileSize))-1 {
if uint64(off)+maxBytes <= uint64(off)+(uint64(i+1)*uint64(pg.WALFileSize))-1 {
// log.Debug().Int("segno", int(segNo)).Int("off", int(off)).Int("max bytes", int(maxBytes)).Int("vis lag", int(visibilityLagBytes)).Msg("run follower wal loop break")
break
}
l := lsn.New(segNo, lsn.Offset(uint32(off)+(i*uint32(lsn.WALFileSize))))
l := pg.New(segNo, pg.Offset(uint32(off)+(i*uint32(pg.WALFileSize))))
walFiles = append(walFiles, l.WALFileName(timelineID))
}

a.metrics.SetGauge(metricsWALFileCandidate, len(walFiles))

// 1) Read through the cache to prefault a given WAL file. The cache lies to
// us and begins faulting the WAL file as soon as we request it. Requests
// are deduped and the cache is in place in order to prevent a WAL file
Expand All @@ -85,12 +84,12 @@ func (a *Agent) runFollower() (loopImmediately bool) {
if loopImmediately {
for _, walFile := range walFiles {
if _, err := a.walCache.Get(walFile); err != nil {
log.Error().Err(err).Msg("unable to perform synchronous Get operation on WAL file cache")
return false, errors.Wrap(err, "unable to perform synchronous Get operation on WAL file cache")
}
}
}

// log.Debug().Bool("loop", loopImmediately).Str("current wal-file", replayLSN.WALFileName(timelineID)).Strs("wal files", walFiles).Msg("")

return loopImmediately
return loopImmediately, nil
}
20 changes: 7 additions & 13 deletions agent/primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,20 @@

package agent

import (
log "github.com/rs/zerolog/log"
)
import "github.com/pkg/errors"

// runPrimary is executed when talking to a writable database.
func (a *Agent) runPrimary() (loopImmediately bool) {
func (a *Agent) runPrimary() (loopImmediately bool, err error) {
// Connect to the primary and see what the lag is in bytes between the primary
// and its connected followers. Report out a histogram of lag.

_, err := a.queryLag(_QueryLagPrimary)
if err != nil {
log.Error().Err(err).Msg("unable to query primary lag")
return false
if _, err = a.queryLag(_QueryLagPrimary); err != nil {
return false, errors.Wrap(err, "unable to query primary lag")
}

_, err = a.queryLastLog()
if err != nil {
log.Error().Err(err).Msg("unable to query last WAL lag")
return false
if _, err = a.queryLastLog(); err != nil {
return false, errors.Wrap(err, "unable to query last WAL lag")
}

return false
return false, nil
}

0 comments on commit d1813ad

Please sign in to comment.