chore: prevent bootstrapping a new cluster with a non-empty object store

The idea behind this patch is to prevent any cluster bootstrapping in case
the target ObjectStore is already used by another backup or is not empty
at all. This way we avoid overwriting existing cluster backups and avoid
confusion about the PostgreSQL timeline when trying to recover a pod.

Closes #61

Signed-off-by: Jonathan Gonzalez V <jonathan.gonzalez@enterprisedb.com>
sxd authored and mnencia committed May 10, 2022
1 parent f991808 commit be48d40
Showing 8 changed files with 162 additions and 87 deletions.
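
To make the intent concrete, here is a minimal sketch of the kind of pre-flight check this patch introduces. The wiring and the helper name ensureEmptyObjectStore are illustrative assumptions; only IsBarmanBackupConfigured and CheckWalArchiveDestination come from the diff below.

	// Illustrative sketch only, not the literal code path of this commit.
	func ensureEmptyObjectStore(ctx context.Context, cluster *apiv1.Cluster,
		walArchiver *archiver.WALArchiver, options []string) error {
		if !cluster.Spec.Backup.IsBarmanBackupConfigured() {
			// Nothing to verify when no Barman destination is configured.
			return nil
		}
		// barman-cloud-check-wal-archive fails on a destination that already
		// contains WAL files, so a failure here means bootstrapping must stop
		// instead of overwriting an existing backup.
		if err := walArchiver.CheckWalArchiveDestination(ctx, options); err != nil {
			return fmt.Errorf("target object store is not empty, refusing to bootstrap: %w", err)
		}
		return nil
	}
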
16 changes: 15 additions & 1 deletion api/v1/cluster_types.go
@@ -1608,6 +1608,15 @@ func (cluster *Cluster) IsPodMonitorEnabled() bool {
return false
}

// IsBarmanBackupConfigured returns true if one of the possible backup destinations
// is configured, false otherwise
func (backupConfiguration *BackupConfiguration) IsBarmanBackupConfigured() bool {
return backupConfiguration != nil && backupConfiguration.BarmanObjectStore != nil &&
(backupConfiguration.BarmanObjectStore.AzureCredentials != nil ||
backupConfiguration.BarmanObjectStore.S3Credentials != nil ||
backupConfiguration.BarmanObjectStore.GoogleCredentials != nil)
}

// IsBarmanEndpointCASet returns true if we have a CA bundle for the endpoint
// false otherwise
func (backupConfiguration *BackupConfiguration) IsBarmanEndpointCASet() bool {
@@ -1621,8 +1630,13 @@ func (backupConfiguration *BackupConfiguration) IsBarmanEndpointCASet() bool {
// BuildPostgresOptions creates the list of options that
// should be added to the PostgreSQL configuration to
// recover given a certain target
func (target RecoveryTarget) BuildPostgresOptions() string {
func (target *RecoveryTarget) BuildPostgresOptions() string {
result := ""

if target == nil {
return result
}

if target.TargetTLI != "" {
result += fmt.Sprintf(
"recovery_target_timeline = '%v'\n",
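
A brief usage sketch of the two helpers touched above; the call sites are invented for illustration and are not part of this diff.

	// Nil-safe: returns false when there is no backup section, or no Barman
	// object store with S3, Azure, or Google credentials.
	if cluster.Spec.Backup.IsBarmanBackupConfigured() {
		// a credentialed backup destination is configured
	}

	// BuildPostgresOptions now tolerates a nil receiver as well.
	var target *apiv1.RecoveryTarget
	_ = target.BuildPostgresOptions() // returns "" instead of panicking
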
6 changes: 3 additions & 3 deletions controllers/cluster_create.go
@@ -877,11 +877,11 @@ func (r *ClusterReconciler) createPrimaryInstance(

if cluster.Status.LatestGeneratedNode != 0 {
// Why are we creating a new blank primary when we had previously generated
// other nodes and we don't have any PVC to reuse?
// other nodes, and we don't have any PVC to reuse?
// This can happen when:
//
// 1 - the user deletes all the PVCs and all the Pods in a cluster
// (and why would an user do that?)
// (and why would a user do that?)
// 2 - the cache isn't ready for Pods but is ready for the Cluster,
// so we don't actually have the first pod in our managed list
// but it's still in the API Server
@@ -906,7 +906,7 @@ func (r *ClusterReconciler) createPrimaryInstance(
if err == specs.ErrorInvalidSize {
// This error should have been caught by the validating
// webhook, but since we are here the user must have disabled server-side
// validation and we must react.
// validation, and we must react.
contextLogger.Info("The size specified for the cluster is not valid",
"size",
cluster.Spec.StorageConfiguration.Size)
10 changes: 3 additions & 7 deletions internal/cmd/manager/instance/restore/cmd.go
@@ -32,7 +32,6 @@ func NewCmd() *cobra.Command {
var clusterName string
var namespace string
var pgData string
var recoveryTarget string

cmd := &cobra.Command{
Use: "restore [flags]",
@@ -41,10 +40,9 @@
ctx := context.Background()

info := postgres.InitInfo{
ClusterName: clusterName,
Namespace: namespace,
PgData: pgData,
RecoveryTarget: recoveryTarget,
ClusterName: clusterName,
Namespace: namespace,
PgData: pgData,
}

return restoreSubCommand(ctx, info)
@@ -56,8 +54,6 @@
cmd.Flags().StringVar(&namespace, "namespace", os.Getenv("NAMESPACE"), "The namespace of "+
"the cluster and the Pod in k8s")
cmd.Flags().StringVar(&pgData, "pg-data", os.Getenv("PGDATA"), "The PGDATA to be created")
cmd.Flags().StringVar(&recoveryTarget, "target", "", "The recovery target in the form of "+
"PostgreSQL options")

return cmd
}
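
With the --target flag removed, the recovery target presumably comes from the Cluster resource (e.g. cluster.Spec.Bootstrap.Recovery.RecoveryTarget) rather than from the command line; that field path is an assumption, not shown in this diff. A hedged sketch:

	// Safe even when the user specified no recovery target at all, thanks to
	// the nil-safe BuildPostgresOptions introduced in cluster_types.go above.
	func recoveryOptions(target *apiv1.RecoveryTarget) string {
		return target.BuildPostgresOptions() // "" when target is nil
	}
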
83 changes: 35 additions & 48 deletions internal/cmd/manager/walarchive/cmd.go
@@ -162,25 +162,8 @@ func run(ctx context.Context, podName string, args []string, client client.WithW
// Step 3: gather the WAL files names to archive
walFilesList := gatherWALFilesToArchive(ctx, walName, maxParallel)

checkWalOptions, err := barmanCloudCheckWalArchiveOptions(cluster, cluster.Name)
if err != nil {
log.Error(err, "while getting barman-cloud-wal-archive options")
if errCond := manager.UpdateCondition(ctx, client,
cluster, buildArchiveCondition(err)); errCond != nil {
log.Error(errCond, "Error status.UpdateCondition()")
}
return err
}

// Step 4: Check if the archive location is safe to perform archiving
// This will output no error if we're not in the timeline 1 and archiving the wal file 1
if err := walArchiver.CheckWalArchive(ctx, walFilesList, checkWalOptions); err != nil {
log.Error(err, "while barman-cloud-check-wal-archive")
// Update the condition if needed.
if errCond := manager.UpdateCondition(ctx, client,
cluster, buildArchiveCondition(err)); errCond != nil {
log.Error(errCond, "Error status.UpdateCondition()")
}
if err := checkWalArchive(ctx, cluster, walArchiver, client, walFilesList); err != nil {
return err
}

@@ -358,36 +341,6 @@ func barmanCloudWalArchiveOptions(
return options, nil
}

func barmanCloudCheckWalArchiveOptions(
cluster *apiv1.Cluster,
clusterName string,
) ([]string, error) {
configuration := cluster.Spec.Backup.BarmanObjectStore

var options []string
if len(configuration.EndpointURL) > 0 {
options = append(
options,
"--endpoint-url",
configuration.EndpointURL)
}

options, err := barman.AppendCloudProviderOptionsFromConfiguration(options, configuration)
if err != nil {
return nil, err
}

serverName := clusterName
if len(configuration.ServerName) != 0 {
serverName = configuration.ServerName
}
options = append(
options,
configuration.DestinationPath,
serverName)
return options, nil
}

func buildArchiveCondition(err error) *apiv1.ClusterCondition {
if err != nil {
return &apiv1.ClusterCondition{
@@ -404,3 +357,37 @@ func buildArchiveCondition(err error) *apiv1.ClusterCondition {
Message: "",
}
}

func checkWalArchive(ctx context.Context,
cluster *apiv1.Cluster,
walArchiver *archiver.WALArchiver,
client client.WithWatch,
walFilesList []string,
) error {
checkWalOptions, err := walArchiver.BarmanCloudCheckWalArchiveOptions(cluster, cluster.Name)
if err != nil {
log.Error(err, "while getting barman-cloud-wal-archive options")
if errCond := manager.UpdateCondition(ctx, client,
cluster, buildArchiveCondition(err)); errCond != nil {
log.Error(errCond, "Error status.UpdateCondition()")
}
return err
}

firstWalFile := walArchiver.CheckWalFiles(ctx, walFilesList)
if !firstWalFile {
return nil
}

if err := walArchiver.CheckWalArchiveDestination(ctx, checkWalOptions); err != nil {
log.Error(err, "while barman-cloud-check-wal-archive")
// Update the condition if needed.
if errCond := manager.UpdateCondition(ctx, client,
cluster, buildArchiveCondition(err)); errCond != nil {
log.Error(errCond, "Error status.UpdateCondition()")
}
return err
}

return nil
}
65 changes: 50 additions & 15 deletions pkg/management/barman/archiver/archiver.go
@@ -26,6 +26,8 @@ import (
"sync"
"time"

"github.com/cloudnative-pg/cloudnative-pg/pkg/management/barman"

apiv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1"
barmanCapabilities "github.com/cloudnative-pg/cloudnative-pg/pkg/management/barman/capabilities"
"github.com/cloudnative-pg/cloudnative-pg/pkg/management/barman/spool"
@@ -182,28 +184,31 @@ func (archiver *WALArchiver) Archive(walName string, baseOptions []string) error
return nil
}

// CheckWalArchive checks if the destinationObjectStore is ready perform archiving.
// Based on this ticket in Barman https://github.com/EnterpriseDB/barman/issues/432
// and its implementation https://github.com/EnterpriseDB/barman/pull/443
// The idea here is to check ONLY if we're archiving the wal files for the first time in the bucket
// since in this case the command barman-cloud-check-wal-archive will fail if the bucket exist and
// contain wal files inside
func (archiver *WALArchiver) CheckWalArchive(ctx context.Context, walFilesList, options []string) error {
// CheckWalFiles checks a list of WAL files looking for the first WAL file of the first timeline.
// It returns true if the first file in the list is that first WAL file.
func (archiver *WALArchiver) CheckWalFiles(ctx context.Context, walFilesList []string) bool {
contextLogger := log.FromContext(ctx)

// If walFileList is empty then, this is a no-op just like the method ArchiveList
if len(walFilesList) == 0 {
return nil
contextLogger.Debug("WAL file list is empty, skipping check")
return false
}

// Get the first wal file from the list
walName := path.Base(walFilesList[0])
// We check that we have the first wal file of the first timeline, otherwise, there's nothing to do here
if walName != "000000010000000000000001" {
return nil
}
return walName == "000000010000000000000001"
}

contextLogger.Info("barman-cloud-check-wal-archive checking the first wal", "walName", walName)
// CheckWalArchiveDestination checks if the destinationObjectStore is ready to perform archiving.
// Based on this ticket in Barman https://github.com/EnterpriseDB/barman/issues/432
// and its implementation https://github.com/EnterpriseDB/barman/pull/443
// The idea here is to run the check ONLY when we're archiving WAL files for the first time in the bucket,
// since in that case the barman-cloud-check-wal-archive command will fail if the bucket already exists
// and contains WAL files.
func (archiver *WALArchiver) CheckWalArchiveDestination(ctx context.Context, options []string) error {
contextLogger := log.FromContext(ctx)
contextLogger.Info("barman-cloud-check-wal-archive checking the first wal")

// Check barman compatibility
capabilities, err := barmanCapabilities.CurrentCapabilities()
@@ -218,7 +223,6 @@
}

contextLogger.Trace("Executing "+barmanCapabilities.BarmanCloudCheckWalArchive,
"walName", walName,
"currentPrimary", archiver.cluster.Status.CurrentPrimary,
"targetPrimary", archiver.cluster.Status.TargetPrimary,
"options", options,
@@ -230,7 +234,6 @@
err = execlog.RunStreaming(barmanCloudWalArchiveCmd, barmanCapabilities.BarmanCloudCheckWalArchive)
if err != nil {
contextLogger.Error(err, "Error invoking "+barmanCapabilities.BarmanCloudCheckWalArchive,
"walName", walName,
"currentPrimary", archiver.cluster.Status.CurrentPrimary,
"targetPrimary", archiver.cluster.Status.TargetPrimary,
"options", options,
Expand All @@ -243,3 +246,35 @@ func (archiver *WALArchiver) CheckWalArchive(ctx context.Context, walFilesList,

return nil
}

// BarmanCloudCheckWalArchiveOptions creates the options needed for the `barman-cloud-check-wal-archive`
// command.
func (archiver *WALArchiver) BarmanCloudCheckWalArchiveOptions(
cluster *apiv1.Cluster,
clusterName string,
) ([]string, error) {
configuration := cluster.Spec.Backup.BarmanObjectStore

var options []string
if len(configuration.EndpointURL) > 0 {
options = append(
options,
"--endpoint-url",
configuration.EndpointURL)
}

options, err := barman.AppendCloudProviderOptionsFromConfiguration(options, configuration)
if err != nil {
return nil, err
}

serverName := clusterName
if len(configuration.ServerName) != 0 {
serverName = configuration.ServerName
}
options = append(
options,
configuration.DestinationPath,
serverName)
return options, nil
}
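
As an illustration only (endpoint, bucket, and server names below are invented), for a configuration with an explicit endpoint URL and S3 credentials the slice built above would look roughly like this:

	// The exact --cloud-provider flag is appended by
	// AppendCloudProviderOptionsFromConfiguration and depends on the configured
	// credentials and the installed Barman version.
	options := []string{
		"--endpoint-url", "https://minio.example.local:9000",
		"--cloud-provider", "aws-s3",
		"s3://backups/",    // configuration.DestinationPath
		"cluster-example",  // ServerName, falling back to the cluster name
	}
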
4 changes: 0 additions & 4 deletions pkg/management/postgres/initdb.go
@@ -73,10 +73,6 @@ type InitInfo struct {
// database just after having configured a new instance
PostInitTemplateSQL []string

// The recovery target options, only applicable for the
// recovery bootstrap type
RecoveryTarget string

// Whether it is a temporary instance that will never contain real data.
Temporary bool
}
