Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions cmd/event_handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ func main() {
switch *event {
case "repmgrd_failover_promote", "standby_promote":
// TODO - Need to figure out what to do when success == 0.
consul, err := state.NewConsulClient()

cs, err := state.NewClusterState()
if err != nil {
fmt.Printf("failed to initialize consul client: %s", err)
fmt.Printf("failed initialize cluster state store. %v", err)
}

node, err := consul.Node(int32(*nodeID))
member, err := cs.FindMember(int32(*nodeID))
if err != nil {
fmt.Printf("failed to find node: %s", err)
fmt.Printf("failed to find member %v: %s", *nodeID, err)
}

if err := consul.RegisterPrimary(string(node.Value)); err != nil {
if err := cs.AssignPrimary(member.ID); err != nil {
fmt.Printf("failed to register primary with consul: %s", err)
}

Expand All @@ -49,28 +50,32 @@ func main() {
}

fmt.Println("Reconfiguring pgbouncer primary")
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), string(node.Value), true); err != nil {
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
}
case "standby_follow":
consul, err := state.NewConsulClient()
cs, err := state.NewClusterState()
if err != nil {
fmt.Printf("failed to initialize consul client: %s", err)
fmt.Printf("failed initialize cluster state store. %v", err)
}
newNodeID, err := strconv.Atoi(*newPrimary)

newMemberID, err := strconv.Atoi(*newPrimary)
if err != nil {
fmt.Printf("failed to parse new node id: %s", err)
fmt.Printf("failed to parse new member id: %s", err)
}
node, err := consul.Node(int32(newNodeID))

member, err := cs.FindMember(int32(newMemberID))
if err != nil {
fmt.Printf("failed to find node in consul: %s", err)
fmt.Printf("failed to find member in consul: %s", err)
}

flypgNode, err := flypg.NewNode()
if err != nil {
fmt.Printf("failed to reference node: %s\n", err)
fmt.Printf("failed to reference member: %s\n", err)
}

fmt.Println("Reconfiguring pgbouncer primary")
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), string(node.Value), true); err != nil {
if err := flypgNode.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err)
}
default:
Expand Down
16 changes: 14 additions & 2 deletions cmd/standby_cleaner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package main
import (
"context"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/flypg"
"os"
"time"

"github.com/fly-apps/postgres-flex/pkg/flypg"
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
)

var Minute int64 = 60
Expand Down Expand Up @@ -47,12 +49,22 @@ func main() {
newConn, err := flypgNode.RepMgr.NewRemoteConnection(ctx, standby.Ip)
if err != nil {
if time.Now().Unix()-seenAt[standby.Id] >= 10*Minute {
err := flypgNode.RepMgr.UnregisterStandby(standby.Id)
cs, err := state.NewClusterState()
if err != nil {
fmt.Printf("failed initialize cluster state store. %v", err)
}

err = flypgNode.RepMgr.UnregisterStandby(standby.Id)
if err != nil {
fmt.Printf("Failed to unregister %d: %s", standby.Id, err)
continue
}
delete(seenAt, standby.Id)

// Remove from Consul
if err = cs.UnregisterMember(int32(standby.Id)); err != nil {
fmt.Printf("Failed to unregister %d from consul: %s", standby.Id, err)
}
}
} else {
seenAt[standby.Id] = time.Now().Unix()
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/handle_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package api
import (
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/fly-apps/postgres-flex/pkg/flypg"
"github.com/fly-apps/postgres-flex/pkg/flypg/admin"
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
"golang.org/x/exp/slices"
"net/http"
"strings"
)

func handleRole(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -118,7 +119,7 @@ func (s *Server) handleApplyConfig(w http.ResponseWriter, r *http.Request) {
}
defer close()

consul, err := state.NewConsulClient()
consul, err := state.NewStore()
if err != nil {
renderErr(w, err)
return
Expand Down
11 changes: 6 additions & 5 deletions pkg/flypg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"bufio"
"encoding/json"
"fmt"
"github.com/fly-apps/postgres-flex/pkg/flypg/state"
"os"
"strings"

"github.com/fly-apps/postgres-flex/pkg/flypg/state"
)

type ConfigMap map[string]interface{}
Expand All @@ -20,7 +21,7 @@ type Config interface {
ConsulKey() string
}

func WriteUserConfig(c Config, consul *state.ConsulClient) error {
func WriteUserConfig(c Config, consul *state.Store) error {
if c.UserConfig() != nil {
if err := pushToConsul(c, consul); err != nil {
return fmt.Errorf("failed to write to consul: %s", err)
Expand All @@ -34,7 +35,7 @@ func WriteUserConfig(c Config, consul *state.ConsulClient) error {
return nil
}

func SyncUserConfig(c Config, consul *state.ConsulClient) error {
func SyncUserConfig(c Config, consul *state.Store) error {
cfg, err := pullFromConsul(c, consul)
if err != nil {
return fmt.Errorf("failed to pull config from consul: %s", err)
Expand All @@ -51,7 +52,7 @@ func SyncUserConfig(c Config, consul *state.ConsulClient) error {
return nil
}

func pushToConsul(c Config, consul *state.ConsulClient) error {
func pushToConsul(c Config, consul *state.Store) error {
if c.UserConfig() == nil {
return nil
}
Expand All @@ -68,7 +69,7 @@ func pushToConsul(c Config, consul *state.ConsulClient) error {
return nil
}

func pullFromConsul(c Config, consul *state.ConsulClient) (ConfigMap, error) {
func pullFromConsul(c Config, consul *state.Store) (ConfigMap, error) {
configBytes, err := consul.PullUserConfig(c.ConsulKey())
if err != nil {
return nil, err
Expand Down
67 changes: 33 additions & 34 deletions pkg/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ func (n *Node) Init(ctx context.Context) error {
return err
}

consul, err := state.NewConsulClient()
cs, err := state.NewClusterState()
if err != nil {
return fmt.Errorf("failed to establish connection with consul: %s", err)
return fmt.Errorf("failed initialize cluster state store. %v", err)
}

primaryIP, err := consul.CurrentPrimary()
primary, err := cs.PrimaryMember()
if err != nil {
return fmt.Errorf("failed to query current primary: %s", err)
}
Expand All @@ -133,7 +133,7 @@ func (n *Node) Init(ctx context.Context) error {
fmt.Printf("Failed to initialize repmgr: %s\n", err.Error())
}

err = SyncUserConfig(&repmgr, consul)
err = SyncUserConfig(&repmgr, cs.Store)
if err != nil {
fmt.Printf("Failed to sync user config from consul for repmgr: %s\n", err.Error())
}
Expand All @@ -148,7 +148,7 @@ func (n *Node) Init(ctx context.Context) error {
return err
}

err = SyncUserConfig(&pgbouncer, consul)
err = SyncUserConfig(&pgbouncer, cs.Store)
if err != nil {
fmt.Printf("Failed to sync user config from consul for pgbouncer: %s\n", err.Error())
}
Expand All @@ -158,10 +158,8 @@ func (n *Node) Init(ctx context.Context) error {
fmt.Printf("Failed to write config files for pgbouncer: %s\n", err.Error())
}

switch primaryIP {
case n.PrivateIP:
// noop
case "":
switch {
case primary == nil:
// Initialize ourselves as the primary.
fmt.Println("Initializing postgres")
if err := n.initialize(); err != nil {
Expand All @@ -172,20 +170,22 @@ func (n *Node) Init(ctx context.Context) error {
if err := n.setDefaultHBA(); err != nil {
return fmt.Errorf("failed updating pg_hba.conf: %s", err)
}
case primary.Hostname == n.PrivateIP:
// noop
default:
// If we are here we are either a standby, new node or primary coming back from the dead.
clonePrimary := true
if n.isInitialized() {
// Attempt to resolve our role by querying the primary.
remoteConn, err := repmgr.NewRemoteConnection(ctx, primaryIP)
remoteConn, err := repmgr.NewRemoteConnection(ctx, primary.Hostname)
if err != nil {
return fmt.Errorf("failed to resolve my role according to the primary: %s", err)
}
defer remoteConn.Close(ctx)

role, err := repmgr.memberRoleByHostname(ctx, remoteConn, n.PrivateIP)
if err != nil {
return fmt.Errorf("failed to resolve role for %s: %s", primaryIP, err)
return fmt.Errorf("failed to resolve role for %s: %s", primary.Hostname, err)
}

fmt.Printf("My role is: %s\n", role)
Expand All @@ -196,7 +196,7 @@ func (n *Node) Init(ctx context.Context) error {

if clonePrimary {
fmt.Println("Cloning from primary")
if err := repmgr.clonePrimary(primaryIP); err != nil {
if err := repmgr.clonePrimary(primary.Hostname); err != nil {
return fmt.Errorf("failed to clone primary: %s", err)
}
}
Expand All @@ -205,7 +205,7 @@ func (n *Node) Init(ctx context.Context) error {
fmt.Println("Resolving PG configuration settings.")
PGConfig.Setup()

err = SyncUserConfig(PGConfig, consul)
err = SyncUserConfig(PGConfig, cs.Store)
if err != nil {
fmt.Printf("Failed to sync user config from consul for pgbouncer: %s\n", err.Error())
}
Expand All @@ -226,27 +226,21 @@ func (n *Node) PostInit(ctx context.Context) error {
}
defer conn.Close(ctx)

consul, err := state.NewConsulClient()
cs, err := state.NewClusterState()
if err != nil {
return fmt.Errorf("failed to establish connection with consul: %s", err)
return fmt.Errorf("failed initialize cluster state store. %v", err)
}

primaryIP, err := consul.CurrentPrimary()
primary, err := cs.PrimaryMember()
if err != nil {
return fmt.Errorf("failed to query current primary: %s", err)
}

repmgr := n.RepMgr
pgbouncer := n.PGBouncer

switch primaryIP {
case n.PrivateIP:
// Re-register the primary in order to pick up any changes made to the configuration file.
fmt.Println("Updating primary record")
if err := repmgr.registerPrimary(); err != nil {
fmt.Printf("failed to register primary with repmgr: %s", err)
}
case "":
switch {
case primary == nil:
// Check if we can be a primary
if !repmgr.eligiblePrimary() {
return fmt.Errorf("no primary to follow and can't configure self as primary because primary region is '%s' and we are in '%s'", os.Getenv("PRIMARY_REGION"), repmgr.Region)
Expand All @@ -258,17 +252,22 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// Setup repmgr database, extension, and register ourselves as the primary
fmt.Println("Perform Repmgr setup")
fmt.Println("Performing Repmgr setup")
if err := repmgr.setup(ctx, conn); err != nil {
fmt.Printf("failed to setup repmgr: %s\n", err)
}

if err := consul.RegisterPrimary(n.PrivateIP); err != nil {
return fmt.Errorf("failed to register primary with consul: %s", err)
// Register primary member with consul
fmt.Println("Registering member")
if err := cs.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, true); err != nil {
return fmt.Errorf("failed to register member with consul: %s", err)
}

if err := consul.RegisterNode(repmgr.ID, n.PrivateIP); err != nil {
return fmt.Errorf("failed to register member with consul: %s", err)
case primary.Hostname == n.PrivateIP:
// Re-register the primary in order to pick up any changes made to the configuration file.
fmt.Println("Updating primary record")
if err := repmgr.registerPrimary(); err != nil {
fmt.Printf("failed to register primary with repmgr: %s", err)
}
default:
// If we are here we are a new node, standby or a demoted primary who needs to be reconfigured as a standby.
Expand Down Expand Up @@ -301,19 +300,19 @@ func (n *Node) PostInit(ctx context.Context) error {
fmt.Printf("failed to register standby: %s\n", err)
}

fmt.Println("Registering Node with Consul")
if err := consul.RegisterNode(repmgr.ID, n.PrivateIP); err != nil {
// Register member with consul if it hasn't been already
if err := cs.RegisterMember(repmgr.ID, n.PrivateIP, repmgr.Region, false); err != nil {
return fmt.Errorf("failed to register member with consul: %s", err)
}
}

// Requery the primaryIP in case a new primary was assigned above.
primaryIP, err = consul.CurrentPrimary()
// Requery the primaryIP from consul in case the primary was assigned above.
primary, err = cs.PrimaryMember()
if err != nil {
return fmt.Errorf("failed to query current primary: %s", err)
}

if err := pgbouncer.ConfigurePrimary(ctx, primaryIP, true); err != nil {
if err := pgbouncer.ConfigurePrimary(ctx, primary.Hostname, true); err != nil {
return fmt.Errorf("failed to configure pgbouncer's primary: %s", err)
}

Expand Down
Loading