Skip to content

Commit

Permalink
gazctl: shard pruning should consider all hints
Browse files Browse the repository at this point in the history
If a shard is in a persistent state where it immediately becomes FAILED
after recovery, it's very possible for the oldest recovered backup hints
to actually be _newer_ than primary hints.

Then, if we prune without considering the primary hints, the shard is
unable to recover from its log.

Fortunately this is easily remedied -- delete the primary hints while
leaving backup recovery hints in place -- but it's better not to prune
fragments the primary hints still need in the first place.
  • Loading branch information
jgraettinger committed Sep 28, 2023
1 parent fb76383 commit 2fb3920
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 43 deletions.
73 changes: 31 additions & 42 deletions cmd/gazctl/gazctlcmd/shards_prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gazctlcmd

import (
"context"
"fmt"

log "github.com/sirupsen/logrus"
"go.gazette.dev/core/broker/client"
Expand Down Expand Up @@ -53,29 +52,40 @@ func (cmd *cmdShardsPrune) Execute([]string) error {

for _, shard := range listShards(rsc, cmd.Selector).Shards {
metrics.shardsTotal++
var lastHints = fetchOldestHints(ctx, rsc, shard.Spec.Id)

var allHints, err = consumer.FetchHints(ctx, rsc, &pc.GetHintsRequest{
Shard: shard.Spec.Id,
})
mbp.Must(err, "failed to fetch hints")

var recoveryLog = shard.Spec.RecoveryLog()

// We require that we see hints for _all_ shards before we may make _any_ deletions.
// This is because shards could technically include segments from any log,
// and without comprehensive hints which are proof-positive that _no_ shard
// references a given journal fragment, we cannot be sure it's safe to remove.
// For this reason, we must track the journals to be skipped, so we can be sure
// we don't prune journals that are used by a shard that hasn't persisted hints.
if lastHints != nil && len(lastHints.LiveNodes) > 0 {
foldHintsIntoSegments(*lastHints, logSegmentSets)
} else {
skipRecoveryLogs[recoveryLog] = true
metrics.skippedJournals++
var reason = "has not written backup hints required for pruning"
if lastHints != nil {
reason = "hints have no live files"
for _, curHints := range append(allHints.BackupHints, allHints.PrimaryHints) {
var hints = curHints.Hints

// We require that we see _all_ hints for a shard before we may make _any_ deletions.
// This is because shards could technically include segments from any log,
// and without comprehensive hints which are proof-positive that _no_ shard
// references a given journal fragment, we cannot be sure it's safe to remove.
// For this reason, we must track the journals to be skipped, so we can be sure
// we don't prune journals that are used by a shard that hasn't persisted hints.
if hints != nil && len(hints.LiveNodes) > 0 {
foldHintsIntoSegments(*hints, logSegmentSets)
} else {
skipRecoveryLogs[recoveryLog] = true
metrics.skippedJournals++
var reason = "has not written all hints required for pruning"
if hints != nil {
reason = "hints have no live files"
}
log.WithFields(log.Fields{
"shard": shard.Spec.Id,
"reason": reason,
"journal": recoveryLog,
}).Warn("will skip pruning recovery log journal")

break
}
log.WithFields(log.Fields{
"shard": shard.Spec.Id,
"reason": reason,
"journal": recoveryLog,
}).Warn("will skip pruning recovery log journal")
}
}

Expand Down Expand Up @@ -113,27 +123,6 @@ func (cmd *cmdShardsPrune) Execute([]string) error {
return nil
}

// fetchOldestHints fetches the hints of shard |id| via consumer.FetchHints and
// returns the last non-nil entry of the response's BackupHints, scanning from
// the end of the slice toward the front. It returns nil if no backup hints
// entry carries hints. Fetch failures and non-OK response statuses are passed
// to mbp.Must rather than being returned to the caller.
//
// NOTE(review): the name implies BackupHints is ordered newest-first, making
// the last non-nil entry the oldest -- confirm against the GetHints API.
func fetchOldestHints(ctx context.Context, shardClient pc.ShardClient, id pc.ShardID) *recoverylog.FSMHints {
	var req = &pc.GetHintsRequest{
		Shard: id,
	}

	var resp, err = consumer.FetchHints(ctx, shardClient, req)
	mbp.Must(err, "failed to fetch hints")
	if resp.Status != pc.Status_OK {
		// Use an explicit "%s" verb: the status text must never be interpreted
		// as a format string (any '%' in it would otherwise be mangled, and
		// `go vet` rejects non-constant format strings).
		err = fmt.Errorf("%s", resp.Status.String())
	}
	mbp.Must(err, "failed to fetch oldest hints")

	// Walk backwards so that the last populated backup hints entry wins.
	for i := len(resp.BackupHints) - 1; i >= 0; i-- {
		if hints := resp.BackupHints[i].Hints; hints != nil {
			return hints
		}
	}

	return nil
}

func fetchFragments(ctx context.Context, journalClient pb.RoutedJournalClient, journal pb.Journal) []pb.FragmentsResponse__Fragment {
var err error
var req = pb.FragmentsRequest{
Expand Down
2 changes: 2 additions & 0 deletions cmd/gazctl/gazctlcmd/shards_recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (cmd *cmdShardsRecover) Execute([]string) error {
})
mbp.Must(err, "failed to fetch hints for shard")

log.WithField("hints", hintResp).Debug("fetched shard recovery hints")

var recoveryLog = shardResp.Shards[0].Spec.RecoveryLog()
var hints = consumer.PickFirstHints(hintResp, recoveryLog)
var rjc = ShardsCfg.Broker.MustRoutedJournalClient(ctx)
Expand Down
5 changes: 4 additions & 1 deletion consumer/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ func completeRecovery(s *shard) (_ pc.Checkpoint, err error) {
return cp, errors.WithMessage(err, "store.RestoreCheckpoint")
}

// Store |recoveredHints| as a backup.
// Store |recoveredHints| as a backup. We do this _after_ restoring the
// checkpoint as a sanity check, so that any integrity issues encountered
// during checkpoint recovery are surfaced before we over-write backup hints.
//
// For some workflows, the recoveredHints.Log may not equal our own log,
// in which case we omit this step.
// For example, Flow's shard split workflow instruments GetHints to
Expand Down
11 changes: 11 additions & 0 deletions consumer/recoverylog/playback.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,17 @@ func playLog(ctx context.Context, hints FSMHints, dir string, ajc client.AsyncJo
readLog = segment.Log
offset = reader.seek(segment.Log, segment.FirstOffset)
readThrough = barriers[segment.Log].Response().Commit.End

log.WithFields(log.Fields{
"log": readLog,
"offset": offset,
"firstOffset": segment.FirstOffset,
"firstSeqNo": segment.FirstSeqNo,
"lastOffset": segment.LastOffset,
"lastSeqNo": segment.LastSeqNo,
"readThrough": readThrough,
}).Debug("seeking to next hinted segment")

} else if offset >= readThrough {
// We've read through |readThrough|, but still have not read all hinted log segments.
err = errors.Errorf("offset %v:%d >= readThrough %d, but FSM has unused hints; possible data loss",
Expand Down

0 comments on commit 2fb3920

Please sign in to comment.