Skip to content

Commit

Permalink
Ensure shell commands use a context
Browse files Browse the repository at this point in the history
  • Loading branch information
davissp14 committed Jul 8, 2024
1 parent eba2343 commit cd1bdb9
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 105 deletions.
2 changes: 1 addition & 1 deletion cmd/monitor/monitor_dead_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func deadMemberMonitorTick(ctx context.Context, node *flypg.Node, seenAt map[int
// TODO - Verify the exception that's getting thrown.
if time.Since(seenAt[voter.ID]) >= deadMemberRemovalThreshold {
log.Printf("Removing dead member: %s\n", voter.Hostname)
if err := node.RepMgr.UnregisterMember(voter); err != nil {
if err := node.RepMgr.UnregisterMember(ctx, voter); err != nil {
log.Printf("failed to unregister member %s: %v", voter.Hostname, err)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/pg_unregister/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func processUnregistration(ctx context.Context) error {
return fmt.Errorf("failed to resolve member: %s", err)
}

if err := node.RepMgr.UnregisterMember(*member); err != nil {
if err := node.RepMgr.UnregisterMember(ctx, *member); err != nil {
return fmt.Errorf("failed to unregister member: %v", err)
}

Expand Down
4 changes: 2 additions & 2 deletions internal/api/handle_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func handleReadonlyState(w http.ResponseWriter, _ *http.Request) {
renderJSON(w, res, http.StatusOK)
}

func handleHaproxyRestart(w http.ResponseWriter, _ *http.Request) {
if err := flypg.RestartHaproxy(); err != nil {
func handleHaproxyRestart(w http.ResponseWriter, r *http.Request) {
if err := flypg.RestartHaproxy(r.Context()); err != nil {
renderErr(w, err)
return
}
Expand Down
5 changes: 3 additions & 2 deletions internal/flypg/haproxy.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package flypg

import (
"context"
"fmt"

"github.com/fly-apps/postgres-flex/internal/utils"
)

func RestartHaproxy() error {
if _, err := utils.RunCommand("restart-haproxy", "root"); err != nil {
func RestartHaproxy(ctx context.Context) error {
if _, err := utils.RunCmd(ctx, "root", "restart-haproxy"); err != nil {
return fmt.Errorf("failed to restart haproxy: %s", err)
}

Expand Down
18 changes: 9 additions & 9 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to write pg password file: %s", err)
}

if err := n.PGConfig.initdb(); err != nil {
if err := n.PGConfig.initdb(ctx); err != nil {
return fmt.Errorf("failed to initialize postgres %s", err)
}
} else {
Expand All @@ -184,7 +184,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to resolve member over dns: %s", err)
}

if err := n.RepMgr.clonePrimary(cloneTarget.Hostname); err != nil {
if err := n.RepMgr.clonePrimary(ctx, cloneTarget.Hostname); err != nil {
// Clean-up the directory so it can be retried.
if rErr := os.Remove(n.DataDir); rErr != nil {
log.Printf("[ERROR] failed to cleanup postgresql dir after clone error: %s\n", rErr)
Expand All @@ -199,7 +199,7 @@ func (n *Node) Init(ctx context.Context) error {
return fmt.Errorf("failed to write pg password file: %s", err)
}

if err := n.PGConfig.initdb(); err != nil {
if err := n.PGConfig.initdb(ctx); err != nil {
return fmt.Errorf("failed to initialize postgres %s", err)
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// Re-register primary to apply any configuration changes.
if err := n.RepMgr.registerPrimary(daemonRestartRequired); err != nil {
if err := n.RepMgr.registerPrimary(ctx, daemonRestartRequired); err != nil {
return fmt.Errorf("failed to re-register existing primary: %s", err)
}

Expand All @@ -309,7 +309,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}
case StandbyRoleName:
// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
if err := n.RepMgr.registerStandby(ctx, daemonRestartRequired); err != nil {
return fmt.Errorf("failed to register existing standby: %s", err)
}
case WitnessRoleName:
Expand All @@ -319,7 +319,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// Register existing witness to apply any configuration changes.
if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(ctx, primary.Hostname); err != nil {
return fmt.Errorf("failed to register existing witness: %s", err)
}
default:
Expand Down Expand Up @@ -368,7 +368,7 @@ func (n *Node) PostInit(ctx context.Context) error {
}

// Register ourself as the primary
if err := n.RepMgr.registerPrimary(false); err != nil {
if err := n.RepMgr.registerPrimary(ctx, false); err != nil {
return fmt.Errorf("failed to register repmgr primary: %s", err)
}

Expand Down Expand Up @@ -401,12 +401,12 @@ func (n *Node) PostInit(ctx context.Context) error {
return fmt.Errorf("failed to resolve primary member: %s", err)
}

if err := n.RepMgr.registerWitness(primary.Hostname); err != nil {
if err := n.RepMgr.registerWitness(ctx, primary.Hostname); err != nil {
return fmt.Errorf("failed to register witness: %s", err)
}
} else {
log.Println("Registering standby")
if err := n.RepMgr.registerStandby(false); err != nil {
if err := n.RepMgr.registerStandby(ctx, false); err != nil {
return fmt.Errorf("failed to register new standby: %s", err)
}
}
Expand Down
22 changes: 19 additions & 3 deletions internal/flypg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,31 @@ func (c *PGConfig) RuntimeApply(ctx context.Context, conn *pgx.Conn) error {
return nil
}

func (c *PGConfig) initdb() error {
cmdStr := fmt.Sprintf("initdb --pgdata=%s --pwfile=%s", c.DataDir, c.passwordFilePath)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
// initdb runs the `initdb` binary to create the Postgres data directory,
// passing c.DataDir as --pgdata and c.passwordFilePath as --pwfile.
//
// The command is executed through utils.RunCmd with the supplied ctx; the
// "postgres" argument is presumably the OS user the command runs as (confirm
// against utils.RunCmd). Command output is discarded. A non-nil error from the
// command is returned with a "failed to init postgres" prefix.
func (c *PGConfig) initdb(ctx context.Context) error {
	_, err := utils.RunCmd(ctx, "postgres", "initdb",
		"--pgdata", c.DataDir,
		"--pwfile", c.passwordFilePath,
	)
	if err != nil {
		return fmt.Errorf("failed to init postgres: %s", err)
	}

	return nil
}

// stopInstance asks pg_ctl to stop the Postgres instance whose data directory
// is c.DataDir (equivalent to `pg_ctl -D <DataDir> stop`).
//
// The command is executed through utils.RunCmd with the supplied ctx; the
// "postgres" argument is presumably the OS user the command runs as (confirm
// against utils.RunCmd). Command output is discarded. A non-nil error from the
// command is returned with a "failed to stop postgres" prefix.
func (c *PGConfig) stopInstance(ctx context.Context) error {
	_, err := utils.RunCmd(ctx, "postgres", "pg_ctl", "-D", c.DataDir, "stop")
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to stop postgres: %s", err)
}

func (c *PGConfig) isInitialized() bool {
_, err := os.Stat(c.DataDir)
return !os.IsNotExist(err)
Expand Down
162 changes: 108 additions & 54 deletions internal/flypg/repmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/rand"
"fmt"
"log"
"math"
"math/big"
"net"
Expand Down Expand Up @@ -219,103 +218,158 @@ func (r *RepMgr) resolveNodeID() (string, error) {
return nodeID, nil
}

func (r *RepMgr) registerPrimary(restartDaemon bool) error {
cmdStr := fmt.Sprintf("repmgr primary register -f %s -F", r.ConfigPath)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
func (r *RepMgr) registerPrimary(ctx context.Context, restartDaemon bool) error {
args := []string{
"primary",
"register",
"-f", r.ConfigPath,
"-F",
}

if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to register primary: %s", err)
}

if restartDaemon {
if err := r.restartDaemon(); err != nil {
if err := r.restartDaemon(ctx); err != nil {
return fmt.Errorf("failed to restart repmgr daemon: %s", err)
}
}

return nil
}

func (r *RepMgr) registerStandby(restartDaemon bool) error {
cmdStr := fmt.Sprintf("repmgr standby register -f %s -F", r.ConfigPath)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
func (r *RepMgr) registerStandby(ctx context.Context, restartDaemon bool) error {
args := []string{
"standby",
"register",
"-f", r.ConfigPath,
"-F",
}

if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to register standby: %s", err)
}

if restartDaemon {
if err := r.restartDaemon(); err != nil {
if err := r.restartDaemon(ctx); err != nil {
return fmt.Errorf("failed to restart repmgr daemon: %s", err)
}
}

return nil
}

func (r *RepMgr) registerWitness(primaryHostname string) error {
cmdStr := fmt.Sprintf("repmgr witness register -f %s -h %s -F", r.ConfigPath, primaryHostname)
_, err := utils.RunCommand(cmdStr, "postgres")
func (r *RepMgr) registerWitness(ctx context.Context, primaryHostname string) error {
args := []string{
"witness",
"register",
"-f", r.ConfigPath,
"-h", primaryHostname,
"-F",
}

if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to register witness: %s", err)
}

return err
return nil
}

func (r *RepMgr) unregisterPrimary(id int) error {
cmdStr := fmt.Sprintf("repmgr primary unregister -f %s --node-id=%d", r.ConfigPath, id)
_, err := utils.RunCommand(cmdStr, "postgres")
func (r *RepMgr) unregisterPrimary(ctx context.Context, id int) error {
args := []string{
"primary",
"unregister",
"-f", r.ConfigPath,
"--node-id", fmt.Sprint(id),
}

if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to unregister primary: %s", err)
}

return err
return nil
}

func (r *RepMgr) unregisterStandby(id int) error {
cmdStr := fmt.Sprintf("repmgr standby unregister -f %s --node-id=%d", r.ConfigPath, id)
_, err := utils.RunCommand(cmdStr, "postgres")
func (r *RepMgr) unregisterStandby(ctx context.Context, id int) error {
args := []string{
"standby",
"unregister",
"-f", r.ConfigPath,
"--node-id", fmt.Sprint(id),
}

return err
if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to unregister standby: %s", err)
}

return nil
}

func (*RepMgr) restartDaemon() error {
_, err := utils.RunCommand("restart-repmgrd", "postgres")
return err
// restartDaemon invokes the `restart-repmgrd` command through utils.RunCmd
// using the supplied ctx. It reads no state from the receiver.
//
// The "postgres" argument is presumably the OS user the command runs as
// (confirm against utils.RunCmd). Command output is discarded. A non-nil error
// from the command is returned with a "failed to restart repmgr daemon" prefix.
func (*RepMgr) restartDaemon(ctx context.Context) error {
	_, err := utils.RunCmd(ctx, "postgres", "restart-repmgrd")
	if err != nil {
		return fmt.Errorf("failed to restart repmgr daemon: %s", err)
	}

	return nil
}

// daemonRestartRequired reports whether the Hostname recorded on m differs
// from this RepMgr's PrivateIP. It returns true when the two values are not
// equal and false when they match.
func (r *RepMgr) daemonRestartRequired(m *Member) bool {
	hostnameChanged := m.Hostname != r.PrivateIP
	return hostnameChanged
}

func (r *RepMgr) unregisterWitness(id int) error {
cmdStr := fmt.Sprintf("repmgr witness unregister -f %s --node-id=%d", r.ConfigPath, id)
_, err := utils.RunCommand(cmdStr, "postgres")
func (r *RepMgr) unregisterWitness(ctx context.Context, id int) error {
args := []string{
"witness",
"unregister",
"-f", r.ConfigPath,
"--node-id", fmt.Sprint(id),
}

return err
if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to unregister witness: %s", err)
}

return nil
}

func (r *RepMgr) rejoinCluster(hostname string) error {
cmdStr := fmt.Sprintf("repmgr -f %s node rejoin -h %s -p %d -U %s -d %s --force-rewind --no-wait",
r.ConfigPath,
hostname,
r.Port,
r.Credentials.Username,
r.DatabaseName,
)
func (r *RepMgr) rejoinCluster(ctx context.Context, hostname string) error {
args := []string{
"node",
"rejoin",
"-f", r.ConfigPath,
"-h", hostname,
"-U", r.Credentials.Username,
"-d", r.DatabaseName,
"--force-rewind",
"--no-wait",
}

log.Println(cmdStr)
_, err := utils.RunCommand(cmdStr, "postgres")
if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to rejoin cluster: %s", err)
}

return err
return nil
}

func (r *RepMgr) clonePrimary(ipStr string) error {
cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
func (r *RepMgr) clonePrimary(ctx context.Context, ipStr string) error {
if err := os.MkdirAll(r.DataDir, 0700); err != nil {
return fmt.Errorf("failed to create pg directory: %s", err)
}

cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F",
ipStr,
r.Port,
r.DatabaseName,
r.Credentials.Username,
r.ConfigPath)
args := []string{
"-h", ipStr,
"-p", fmt.Sprint(r.Port),
"-d", r.DatabaseName,
"-U", r.Credentials.Username,
"-f", r.ConfigPath,
"standby",
"clone",
"-c",
"-F",
}

log.Println(cmdStr)
if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil {
if _, err := utils.RunCmd(ctx, "postgres", "repmgr", args...); err != nil {
return fmt.Errorf("failed to clone primary: %s", err)
}

Expand Down Expand Up @@ -483,18 +537,18 @@ func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error
return false, nil
}

func (r *RepMgr) UnregisterMember(member Member) error {
func (r *RepMgr) UnregisterMember(ctx context.Context, member Member) error {
switch member.Role {
case PrimaryRoleName:
if err := r.unregisterPrimary(member.ID); err != nil {
if err := r.unregisterPrimary(ctx, member.ID); err != nil {
return fmt.Errorf("failed to unregister member %d: %s", member.ID, err)
}
case StandbyRoleName:
if err := r.unregisterStandby(member.ID); err != nil {
if err := r.unregisterStandby(ctx, member.ID); err != nil {
return fmt.Errorf("failed to unregister standby %d: %s", member.ID, err)
}
case WitnessRoleName:
if err := r.unregisterWitness(member.ID); err != nil {
if err := r.unregisterWitness(ctx, member.ID); err != nil {
return fmt.Errorf("failed to unregister witness %d: %s", member.ID, err)
}
}
Expand Down
Loading

0 comments on commit cd1bdb9

Please sign in to comment.