Skip to content

Commit

Permalink
fix: ensure HA slots don't hold xmin (#4811)
Browse files Browse the repository at this point in the history
When `hot_standby_feedback` is enabled, an inactive replication slot
with a non-NULL `xmin` or `catalog_xmin` can prevent the
autovacuum on the primary from cleaning up old tuples.
This issue may occur when a former primary rejoins the cluster after
a switchover.

This patch resolves the issue by recreating any HA replication slot on
a standby instance if it has `xmin` or `catalog_xmin` set.

Closes #4802

Signed-off-by: Marco Nenciarini <marco.nenciarini@enterprisedb.com>
Signed-off-by: Jaime Silvela <jaime.silvela@enterprisedb.com>
Signed-off-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
Co-authored-by: Jaime Silvela <jaime.silvela@enterprisedb.com>
Co-authored-by: Armando Ruocco <armando.ruocco@enterprisedb.com>
(cherry picked from commit d932383)
  • Loading branch information
mnencia committed Jun 11, 2024
1 parent a0a97d6 commit 50aa7b0
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func (sm PostgresManager) List(

rows, err := db.QueryContext(
ctx,
`SELECT slot_name, slot_type, active, coalesce(restart_lsn::TEXT, '') AS restart_lsn FROM pg_replication_slots
`SELECT slot_name, slot_type, active, coalesce(restart_lsn::TEXT, '') AS restart_lsn,
xmin IS NOT NULL OR catalog_xmin IS NOT NULL AS holds_xmin
FROM pg_replication_slots
WHERE NOT temporary AND slot_name ^@ $1 AND slot_type = 'physical'`,
config.HighAvailability.GetSlotPrefix(),
)
Expand All @@ -71,6 +73,7 @@ func (sm PostgresManager) List(
&slot.Type,
&slot.Active,
&slot.RestartLSN,
&slot.HoldsXmin,
)
if err != nil {
return ReplicationSlotList{}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ var _ = Describe("PostgresManager", func() {
})

It("should successfully list replication slots", func() {
rows := sqlmock.NewRows([]string{"slot_name", "slot_type", "active", "restart_lsn"}).
AddRow("slot1", string(SlotTypePhysical), true, "lsn1").
AddRow("slot2", string(SlotTypePhysical), true, "lsn2")
rows := sqlmock.NewRows([]string{"slot_name", "slot_type", "active", "restart_lsn", "holds_xmin"}).
AddRow("slot1", string(SlotTypePhysical), true, "lsn1", false).
AddRow("slot2", string(SlotTypePhysical), true, "lsn2", false)

mock.ExpectQuery("^SELECT (.+) FROM pg_replication_slots").
WithArgs(config.HighAvailability.SlotPrefix).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ReplicationSlot struct {
Type SlotType `json:"type,omitempty"`
Active bool `json:"active"`
RestartLSN string `json:"restartLSN,omitempty"`
HoldsXmin bool `json:"holdsXmin,omitempty"`
}

// ReplicationSlotList contains a list of replication slots
Expand Down
10 changes: 7 additions & 3 deletions internal/management/controller/slots/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,13 @@ func synchronizeReplicationSlots(
}
}
for _, slot := range slotsInLocal.Items {
if !slotsInPrimary.Has(slot.SlotName) || slot.SlotName == mySlotName {
err := localSlotManager.Delete(ctx, slot)
if err != nil {
// Delete slots on standby with wrong state:
// * slots not present on the primary
// * the slot used by this node
// * slots holding xmin (this can happen on a former primary, and will prevent VACUUM from
// removing tuples deleted by any later transaction.)
if !slotsInPrimary.Has(slot.SlotName) || slot.SlotName == mySlotName || slot.HoldsXmin {
if err := localSlotManager.Delete(ctx, slot); err != nil {
return err
}
}
Expand Down
33 changes: 26 additions & 7 deletions internal/management/controller/slots/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
// fakeSlot is a minimal in-memory stand-in for a replication slot, used by
// the fake slot manager in these tests. Its fields mirror the subset of
// infrastructure.ReplicationSlot that the tests exercise (see the fake
// List implementation, which copies them into SlotName/RestartLSN/HoldsXmin).
type fakeSlot struct {
// name is the replication slot name (maps to ReplicationSlot.SlotName).
name string
// restartLSN is the slot's restart LSN, rendered as a string.
restartLSN string
// holdsXmin reports whether the slot has xmin or catalog_xmin set,
// i.e. whether it would block VACUUM on a real primary.
holdsXmin bool
}

type fakeSlotManager struct {
Expand All @@ -52,6 +53,7 @@ func (sm *fakeSlotManager) List(
RestartLSN: slot.restartLSN,
Type: infrastructure.SlotTypePhysical,
Active: false,
HoldsXmin: slot.holdsXmin,
})
}
return slotList, nil
Expand Down Expand Up @@ -88,7 +90,6 @@ func (sm *fakeSlotManager) Delete(_ context.Context, slot infrastructure.Replica
}

var _ = Describe("Slot synchronization", func() {
ctx := context.TODO()
localPodName := "cluster-2"
localSlotName := "_cnpg_cluster_2"
slot3 := "cluster-3"
Expand All @@ -111,12 +112,12 @@ var _ = Describe("Slot synchronization", func() {
},
}

It("can create slots in local from those on primary", func() {
It("can create slots in local from those on primary", func(ctx SpecContext) {
localSlotsBefore, err := local.List(ctx, &config)
Expect(err).ShouldNot(HaveOccurred())
Expect(localSlotsBefore.Items).Should(BeEmpty())

err = synchronizeReplicationSlots(context.TODO(), primary, local, localPodName, &config)
err = synchronizeReplicationSlots(ctx, primary, local, localPodName, &config)
Expect(err).ShouldNot(HaveOccurred())

localSlotsAfter, err := local.List(ctx, &config)
Expand All @@ -126,13 +127,13 @@ var _ = Describe("Slot synchronization", func() {
Expect(localSlotsAfter.Has(slot4)).To(BeTrue())
Expect(local.slotsCreated).To(Equal(2))
})
It("can update slots in local when ReplayLSN in primary advanced", func() {
It("can update slots in local when ReplayLSN in primary advanced", func(ctx SpecContext) {
// advance slot3 in primary
newLSN := "0/308C4D8"
err := primary.Update(ctx, infrastructure.ReplicationSlot{SlotName: slot3, RestartLSN: newLSN})
Expect(err).ShouldNot(HaveOccurred())

err = synchronizeReplicationSlots(context.TODO(), primary, local, localPodName, &config)
err = synchronizeReplicationSlots(ctx, primary, local, localPodName, &config)
Expect(err).ShouldNot(HaveOccurred())

localSlotsAfter, err := local.List(ctx, &config)
Expand All @@ -143,11 +144,11 @@ var _ = Describe("Slot synchronization", func() {
Expect(slot.RestartLSN).To(Equal(newLSN))
Expect(local.slotsUpdated).To(Equal(1))
})
It("can drop slots in local when they are no longer in primary", func() {
It("can drop slots in local when they are no longer in primary", func(ctx SpecContext) {
err := primary.Delete(ctx, infrastructure.ReplicationSlot{SlotName: slot4})
Expect(err).ShouldNot(HaveOccurred())

err = synchronizeReplicationSlots(context.TODO(), primary, local, localPodName, &config)
err = synchronizeReplicationSlots(ctx, primary, local, localPodName, &config)
Expect(err).ShouldNot(HaveOccurred())

localSlotsAfter, err := local.List(ctx, &config)
Expand All @@ -156,4 +157,22 @@ var _ = Describe("Slot synchronization", func() {
Expect(localSlotsAfter.Has(slot3)).To(BeTrue())
Expect(local.slotsDeleted).To(Equal(1))
})
It("can drop slots in local that hold xmin", func(ctx SpecContext) {
slotWithXmin := "_cnpg_xmin"
err := primary.Create(ctx, infrastructure.ReplicationSlot{SlotName: slotWithXmin})
Expect(err).ShouldNot(HaveOccurred())
local.slots[slotWithXmin] = fakeSlot{name: slotWithXmin, holdsXmin: true}
localSlotsBefore, err := local.List(ctx, &config)
Expect(err).ShouldNot(HaveOccurred())
Expect(localSlotsBefore.Has(slotWithXmin)).To(BeTrue())

err = synchronizeReplicationSlots(ctx, primary, local, localPodName, &config)
Expect(err).ShouldNot(HaveOccurred())

localSlotsAfter, err := local.List(ctx, &config)
Expect(err).ShouldNot(HaveOccurred())
Expect(localSlotsAfter.Has(slotWithXmin)).To(BeFalse())
Expect(localSlotsAfter.Items).Should(HaveLen(1))
Expect(local.slotsDeleted).To(Equal(2))
})
})

0 comments on commit 50aa7b0

Please sign in to comment.