diff --git a/config/samples/_v1alpha1_hotbackup_agent.yaml b/config/samples/_v1alpha1_hotbackup_agent.yaml index 65e1b57a7d2..824dfb843e9 100644 --- a/config/samples/_v1alpha1_hotbackup_agent.yaml +++ b/config/samples/_v1alpha1_hotbackup_agent.yaml @@ -4,11 +4,11 @@ metadata: name: hot-backup spec: hazelcastResourceName: hazelcast - bucketURI: "s3://operator-backup" + bucketURI: "s3://operator-e2e-external-backup" secret: "br-secret-s3" # bucketURI: "gs://operator-agent-backup" # secret: "br-secret-gcp" # bucketURI: "azblob://backup" -# secret: "br-secret-az" \ No newline at end of file +# secret: "br-secret-az" diff --git a/controllers/hazelcast/br_agent_rest_client.go b/controllers/hazelcast/br_agent_rest_client.go deleted file mode 100644 index 84b636ae992..00000000000 --- a/controllers/hazelcast/br_agent_rest_client.go +++ /dev/null @@ -1,96 +0,0 @@ -package hazelcast - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" - "time" - - "github.com/hazelcast/hazelcast-platform-operator/api/v1alpha1" - n "github.com/hazelcast/hazelcast-platform-operator/internal/naming" -) - -// Section contains the REST API endpoints. 
-const ( - uploadBackup = "/upload" -) - -type uploadRequest struct { - BucketURL string `json:"bucket_url"` - BackupFolderPath string `json:"backup_folder_path"` - HazelcastCRName string `json:"hz_cr_name"` - SecretName string `json:"secret_name"` - MemberUUID string `json:"member_uuid"` -} - -type AgentRestClient struct { - h *v1alpha1.Hazelcast - bucketURL string - backupFolderPath string - hazelcastCRName string - secretName string -} - -func NewAgentRestClient(h *v1alpha1.Hazelcast, hb *v1alpha1.HotBackup) *AgentRestClient { - return &AgentRestClient{ - h: h, - bucketURL: hb.Spec.BucketURI, - backupFolderPath: h.Spec.Persistence.BaseDir, - hazelcastCRName: hb.Spec.HazelcastResourceName, - secretName: hb.Spec.Secret, - } -} - -func (ac *AgentRestClient) UploadBackup(ctx context.Context) error { - ctxT, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - for _, member := range ac.h.Status.Members { - address := fmt.Sprintf("%s:%d", member.Ip, n.DefaultAgentPort) - upload := uploadRequest{ - BucketURL: ac.bucketURL, - BackupFolderPath: ac.backupFolderPath + "/hot-backup", - HazelcastCRName: ac.hazelcastCRName, - SecretName: ac.secretName, - MemberUUID: member.Uid, - } - reqBody, err := json.Marshal(upload) - if err != nil { - return err - } - req, err := postRequestWithBody(ctxT, reqBody, address, uploadBackup) - if err != nil { - return fmt.Errorf("request creation failed: %s, address --> %q , URL --> %q ", err, address, address+uploadBackup) - } - res, err := ac.executeRequest(req) - if err != nil { - return err - } - defer res.Body.Close() - } - return nil -} - -func (ac *AgentRestClient) executeRequest(req *http.Request) (*http.Response, error) { - res, err := http.DefaultClient.Do(req) - if err != nil { - return res, err - } - if res.StatusCode < http.StatusOK || res.StatusCode >= http.StatusBadRequest { - buf := new(strings.Builder) - _, _ = io.Copy(buf, res.Body) - return res, fmt.Errorf("unexpected HTTP error: %s, %s", res.Status, 
buf.String()) - } - return res, nil -} - -func postRequestWithBody(ctx context.Context, body []byte, address string, endpoint string) (*http.Request, error) { - req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s%s", address, endpoint), bytes.NewBuffer(body)) - if err != nil { - return nil, err - } - return req, nil -} diff --git a/controllers/hazelcast/hot_backup_controller.go b/controllers/hazelcast/hot_backup_controller.go index 166c01de17b..960aa63470d 100644 --- a/controllers/hazelcast/hot_backup_controller.go +++ b/controllers/hazelcast/hot_backup_controller.go @@ -6,14 +6,13 @@ import ( "errors" "fmt" "sync" - "time" - - "github.com/hazelcast/hazelcast-platform-operator/controllers/hazelcast/validation" "github.com/go-logr/logr" "github.com/robfig/cron/v3" + "golang.org/x/sync/errgroup" apiErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -21,6 +20,7 @@ import ( hazelcastv1alpha1 "github.com/hazelcast/hazelcast-platform-operator/api/v1alpha1" n "github.com/hazelcast/hazelcast-platform-operator/internal/naming" + "github.com/hazelcast/hazelcast-platform-operator/internal/upload" "github.com/hazelcast/hazelcast-platform-operator/internal/util" ) @@ -29,7 +29,6 @@ type HotBackupReconciler struct { Log logr.Logger scheduled sync.Map cron *cron.Cron - statuses sync.Map } func NewHotBackupReconciler(c client.Client, log logr.Logger) *HotBackupReconciler { @@ -49,196 +48,95 @@ func NewHotBackupReconciler(c client.Client, log logr.Logger) *HotBackupReconcil // ClusterRole related to Reconcile() //+kubebuilder:rbac:groups="rbac.authorization.k8s.io",resources=clusterroles;clusterrolebindings,verbs=get;list;watch;create;update;patch;delete -func (r *HotBackupReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, 
error) { +func (r *HotBackupReconciler) Reconcile(ctx context.Context, req reconcile.Request) (result reconcile.Result, err error) { logger := r.Log.WithValues("hazelcast-hot-backup", req.NamespacedName) hb := &hazelcastv1alpha1.HotBackup{} - err := r.Client.Get(ctx, req.NamespacedName, hb) + err = r.Client.Get(ctx, req.NamespacedName, hb) if err != nil { if apiErrors.IsNotFound(err) { logger.Info("HotBackup resource not found. Ignoring since object must be deleted") - return ctrl.Result{}, nil + return result, nil } logger.Error(err, "Failed to get HotBackup") - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(err)) + return r.updateStatus(ctx, req.NamespacedName, failedHbStatus(err)) } err = r.addFinalizer(ctx, hb, logger) if err != nil { - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(err)) + return r.updateStatus(ctx, req.NamespacedName, failedHbStatus(err)) } //Check if the HotBackup CR is marked to be deleted if hb.GetDeletionTimestamp() != nil { err = r.executeFinalizer(ctx, hb, logger) if err != nil { - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(err)) + return r.updateStatus(ctx, req.NamespacedName, failedHbStatus(err)) } logger.V(util.DebugLevel).Info("Finalizer's pre-delete function executed successfully and the finalizer removed from custom resource", "Name:", n.Finalizer) - return ctrl.Result{}, nil + return } if hb.Status.State.IsRunning() { logger.Info("HotBackup is already running.", "name", hb.Name, "namespace", hb.Namespace, "state", hb.Status.State) - return ctrl.Result{}, nil + return } hs, err := json.Marshal(hb.Spec) if err != nil { - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(fmt.Errorf("error marshaling Hot Backup as JSON: %w", err))) + return r.updateStatus(ctx, req.NamespacedName, failedHbStatus(fmt.Errorf("error marshaling Hot Backup as JSON: %w", err))) } if s, ok := hb.ObjectMeta.Annotations[n.LastSuccessfulSpecAnnotation]; ok && s == string(hs) { 
logger.Info("HotBackup was already applied.", "name", hb.Name, "namespace", hb.Namespace) - return reconcile.Result{}, nil + return } + hazelcastName := types.NamespacedName{Namespace: req.Namespace, Name: hb.Spec.HazelcastResourceName} + h := &hazelcastv1alpha1.Hazelcast{} - err = r.Client.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: hb.Spec.HazelcastResourceName}, h) + err = r.Client.Get(ctx, hazelcastName, h) if err != nil { - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(fmt.Errorf("could not trigger Hot Backup: Hazelcast resource not found: %w", err))) + return r.updateStatus(ctx, req.NamespacedName, failedHbStatus(fmt.Errorf("could not trigger Hot Backup: Hazelcast resource not found: %w", err))) } if h.Status.Phase != hazelcastv1alpha1.Running { - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(apiErrors.NewServiceUnavailable("Hazelcast CR is not ready"))) - } - - rest := NewRestClient(h) - - if hb.Spec.Schedule != "" { - entry, err := r.cron.AddFunc(hb.Spec.Schedule, func() { - logger.Info("Triggering scheduled HotBackup process.", "Schedule", hb.Spec.Schedule) - err := r.triggerHotBackup(ctx, req, rest, logger) - if err != nil { - logger.Error(err, "Hot Backups process failed") - } - r.reconcileHotBackupStatus(ctx, hb, h) - }) - if err != nil { - logger.Error(err, "Error creating new Schedule Hot Restart.") - } - logger.V(util.DebugLevel).Info("Adding cron Job.", "EntryId", entry) - oldV, loaded := r.scheduled.LoadOrStore(req.NamespacedName, entry) - if loaded { - r.cron.Remove(oldV.(cron.EntryID)) - r.scheduled.Store(req.NamespacedName, entry) - } - r.cron.Start() - } else { - r.removeSchedule(req.NamespacedName, logger) - err = r.triggerHotBackup(ctx, req, rest, logger) - if err != nil { - _ = r.Client.Get(ctx, req.NamespacedName, hb) - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(err)) - } - - r.reconcileHotBackupStatus(ctx, hb, h) - } - - if h.Spec.Persistence.IsExternal() { - if err := 
validation.ValidateHotBackupSpec(hb); err != nil { - return ctrl.Result{}, err - } - agentRest := NewAgentRestClient(h, hb) - err = r.triggerUploadBackup(ctx, hb, agentRest, logger) - if err != nil { - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(fmt.Errorf("error while uploading the backup: %w", err))) - } - if err = r.setHotBackupStatus(ctx, hb, hazelcastv1alpha1.HotBackupSuccess); err != nil { - return updateHotBackupStatus(ctx, r.Client, hb, failedHbStatus(fmt.Errorf("error while updating state: %w", err))) - } + return r.updateStatus(ctx, req.NamespacedName, failedHbStatus(apiErrors.NewServiceUnavailable("Hazelcast CR is not ready"))) } - err = r.updateLastSuccessfulConfiguration(ctx, hb, logger) + err = r.updateLastSuccessfulConfiguration(ctx, req.NamespacedName, logger) if err != nil { logger.Info("Could not save the current successful spec as annotation to the custom resource") - } - - return ctrl.Result{}, nil -} - -func (r *HotBackupReconciler) reconcileHotBackupStatus(ctx context.Context, hb *hazelcastv1alpha1.HotBackup, hz *hazelcastv1alpha1.Hazelcast) { - hzClient, ok := GetClient(types.NamespacedName{Name: hb.Spec.HazelcastResourceName, Namespace: hb.Namespace}) - if !ok { return } - t := &StatusTicker{ - ticker: time.NewTicker(2 * time.Second), - done: make(chan bool), - } - r.statuses.Store(types.NamespacedName{Namespace: hb.Namespace, Name: hb.Name}, t) - go func(ctx context.Context, s *StatusTicker) { - for { - select { - case <-s.done: - return - case <-s.ticker.C: - r.updateHotBackupStatus(hzClient, ctx, hb, hz) - } - } - }(ctx, t) -} -func (r *HotBackupReconciler) updateHotBackupStatus(hzClient *Client, ctx context.Context, h *hazelcastv1alpha1.HotBackup, hz *hazelcastv1alpha1.Hazelcast) { - currentState := hazelcastv1alpha1.HotBackupUnknown - for uuid := range hzClient.Status.MemberMap { - state := hzClient.getTimedMemberState(ctx, uuid) - if state == nil { - continue - } - r.Log.V(util.DebugLevel).Info("Received HotBackup 
state for member.", "HotRestartState", state) - currentState = hotBackupState(state.TimedMemberState.MemberState.HotRestartState, currentState, hz) + logger.Info("Ready to start backup") + if hb.Spec.Schedule != "" { + logger.Info("Adding backup to schedule") + r.scheduleBackup(context.Background(), hb.Spec.Schedule, req.NamespacedName, hazelcastName, logger) + } else { + r.removeSchedule(req.NamespacedName, logger) + go r.startBackup(context.Background(), req.NamespacedName, hazelcastName, logger) } - if err := r.setHotBackupStatus(ctx, h, currentState); err != nil { - return - } - if currentState.IsFinished() { - r.Log.Info("HotBackup task finished.", "state", currentState) - namespacedName := types.NamespacedName{Name: h.Name, Namespace: h.Namespace} - if s, ok := r.statuses.LoadAndDelete(namespacedName); ok { - s.(*StatusTicker).stop() - } - } + return } -func (r *HotBackupReconciler) setHotBackupStatus(ctx context.Context, h *hazelcastv1alpha1.HotBackup, state hazelcastv1alpha1.HotBackupState) error { - hb := &hazelcastv1alpha1.HotBackup{} - namespacedName := types.NamespacedName{Name: h.Name, Namespace: h.Namespace} - err := r.Client.Get(ctx, namespacedName, hb) - if err != nil { - if apiErrors.IsNotFound(err) { - r.Log.Info("HotBackup resource not found. 
Ignoring since object must be deleted") +// updateLastSuccessfulConfiguration records the JSON-encoded spec of the named HotBackup in its LastSuccessfulSpecAnnotation, re-reading the resource and retrying when the update hits a conflict. +func (r *HotBackupReconciler) updateLastSuccessfulConfiguration(ctx context.Context, name types.NamespacedName, logger logr.Logger) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Always fetch the new version of the resource + hb := &hazelcastv1alpha1.HotBackup{} + if err := r.Client.Get(ctx, name, hb); err != nil { return err } - r.Log.Error(err, "Failed to get HotBackup") - return err - } - _, err = updateHotBackupStatus(ctx, r.Client, hb, hbWithStatus(state)) - if err != nil { - r.Log.Error(err, "Could not update HotBackup status") - return err - } - return nil -} - -func (r *HotBackupReconciler) updateLastSuccessfulConfiguration(ctx context.Context, hb *hazelcastv1alpha1.HotBackup, logger logr.Logger) error { - hs, err := json.Marshal(hb.Spec) - if err != nil { - return err - } - - opResult, err := util.CreateOrUpdate(ctx, r.Client, hb, func() error { - if hb.ObjectMeta.Annotations == nil { - ans := map[string]string{} - hb.ObjectMeta.Annotations = ans + hs, err := json.Marshal(hb.Spec) + if err != nil { + return err } - hb.ObjectMeta.Annotations[n.LastSuccessfulSpecAnnotation] = string(hs) - return nil + // A resource created without annotations has a nil map; create it so the spec is actually recorded instead of being skipped. + if hb.ObjectMeta.Annotations == nil { + hb.ObjectMeta.Annotations = map[string]string{} + } + hb.ObjectMeta.Annotations[n.LastSuccessfulSpecAnnotation] = string(hs) + return r.Client.Update(ctx, hb) }) - if opResult != controllerutil.OperationResultNone { - logger.Info("Operation result", "Hazelcast Annotation", hb.Name, "result", opResult) - } - return err } func (r *HotBackupReconciler) addFinalizer(ctx context.Context, hb *hazelcastv1alpha1.HotBackup, logger logr.Logger) error { @@ -257,16 +155,11 @@ func (r *HotBackupReconciler) executeFinalizer(ctx context.Context, hb *hazelcas if !controllerutil.ContainsFinalizer(hb, n.Finalizer) { return nil } - key := types.NamespacedName{ Name: hb.Name, Namespace: hb.Namespace, } r.removeSchedule(key, logger) - if s, ok := r.statuses.LoadAndDelete(key); ok { - 
logger.V(util.DebugLevel).Info("Stopping status ticker for HotBackup.", "CR", key) - s.(*StatusTicker).stop() - } controllerutil.RemoveFinalizer(hb, n.Finalizer) err := r.Update(ctx, hb) if err != nil { @@ -282,60 +175,155 @@ func (r *HotBackupReconciler) removeSchedule(key types.NamespacedName, logger lo } } -func (r *HotBackupReconciler) triggerHotBackup(ctx context.Context, req reconcile.Request, rest *RestClient, logger logr.Logger) error { - hb := &hazelcastv1alpha1.HotBackup{} - err := r.Get(ctx, req.NamespacedName, hb) +func (r *HotBackupReconciler) updateStatus(ctx context.Context, name types.NamespacedName, options hotBackupOptionsBuilder) (ctrl.Result, error) { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Always fetch the new version of the resource + hb := &hazelcastv1alpha1.HotBackup{} + if err := r.Get(ctx, name, hb); err != nil { + return err + } + hb.Status.State = options.status + hb.Status.Message = options.message + return r.Status().Update(ctx, hb) + }) + + if options.status == hazelcastv1alpha1.HotBackupFailure { + return ctrl.Result{}, options.err + } + return ctrl.Result{}, err +} + +func (r *HotBackupReconciler) scheduleBackup(ctx context.Context, schedule string, backupName types.NamespacedName, hazelcastName types.NamespacedName, logger logr.Logger) { + entry, err := r.cron.AddFunc(schedule, func() { + r.startBackup(ctx, backupName, hazelcastName, logger) + }) if err != nil { - return err + logger.Error(err, "Error creating new Schedule Hot Restart.") } - if !hb.Status.State.IsRunning() { - _, _ = updateHotBackupStatus(ctx, r.Client, hb, pendingHbStatus()) + if old, loaded := r.scheduled.LoadOrStore(backupName, entry); loaded { + r.cron.Remove(old.(cron.EntryID)) + r.scheduled.Store(backupName, entry) } + r.cron.Start() +} + +var ( + errBackupClientNotFound = errors.New("Client not found for hot backup CR") + errBackupClientNoMembers = errors.New("Client couldnt connect to members") +) + +func (r 
*HotBackupReconciler) startBackup(ctx context.Context, backupName types.NamespacedName, hazelcastName types.NamespacedName, logger logr.Logger) (ctrl.Result, error) { + logger.Info("Starting backup") + defer logger.Info("Finished backup") - err = rest.ChangeState(ctx, Passive) + // Change state to In Progress + _, err := r.updateStatus(ctx, backupName, hbWithStatus(hazelcastv1alpha1.HotBackupInProgress)) if err != nil { - return fmt.Errorf("error creating HotBackup. Could not change the cluster state to PASSIVE: %w", err) + // setting status failed so this most likely will fail too + return r.updateStatus(ctx, backupName, failedHbStatus(err)) } - defer func(rest *RestClient) { - e := rest.ChangeState(ctx, Active) - if e != nil { - logger.Error(e, "Could not change the cluster state to ACTIVE") - } - }(rest) - err = rest.HotBackup(ctx) - if err != nil { - return fmt.Errorf("error creating HotBackup: %w", err) + + // Get latest version as this may be running in cron + hz := &hazelcastv1alpha1.Hazelcast{} + if err := r.Get(ctx, hazelcastName, hz); err != nil { + logger.Error(err, "Get latest hazelcast CR failed") + return r.updateStatus(ctx, backupName, failedHbStatus(err)) } - return nil -} -func (r *HotBackupReconciler) triggerUploadBackup(ctx context.Context, h *hazelcastv1alpha1.HotBackup, agentRest *AgentRestClient, logger logr.Logger) error { - for { - hb := &hazelcastv1alpha1.HotBackup{} - namespacedName := types.NamespacedName{Name: h.Name, Namespace: h.Namespace} - err := r.Client.Get(ctx, namespacedName, hb) - if err != nil { - if apiErrors.IsNotFound(err) { - logger.Info("HotBackup resource not found. 
Ignoring since object must be deleted") + logger.Info("Trigger cluster wide backup") + if err := r.hotBackup(ctx, hz); err != nil { + logger.Error(err, "Cluster backup failed") + return r.updateStatus(ctx, backupName, failedHbStatus(err)) + } + + c, ok := GetClient(hazelcastName) + if !ok { + return r.updateStatus(ctx, backupName, failedHbStatus(errBackupClientNotFound)) + } + c.updateMembers(context.TODO()) + + if c.Status == nil { + return r.updateStatus(ctx, backupName, failedHbStatus(errBackupClientNoMembers)) + } + + if len(c.Status.MemberMap) == 0 { + return r.updateStatus(ctx, backupName, failedHbStatus(errBackupClientNoMembers)) + } + + g, groupCtx := errgroup.WithContext(ctx) + + // for each member monitor and upload backup if needed + for uuid, member := range c.Status.MemberMap { + uuid, memberAddress := uuid, member.Address + g.Go(func() error { + logger := logger.WithValues("uuid", uuid) + + logger.Info("Member status monitor started") + defer logger.Info("Member status monitor finished") + + logger.Info("Wait for member backup to finish") + if err := waitUntilBackupSucceed(groupCtx, c, uuid); err != nil { return err } - return fmt.Errorf("failed to get HotBackup: %w", err) - } - if hb.Status.State.IsFinished() { - if hb.Status.State == hazelcastv1alpha1.HotBackupWaiting { - err := agentRest.UploadBackup(ctx) - if err != nil { - return fmt.Errorf("failed to upload backup folders to external storage: %w", err) - } + + // skip upload for local backup + if !hz.Spec.Persistence.IsExternal() { return nil - } else if hb.Status.State == hazelcastv1alpha1.HotBackupFailure { - return errors.New("HotBackup task failed") } - } else { - logger.Info("HotBackup task is not finished yet. 
Waiting...") - time.Sleep(1 * time.Second) - } + + hb := &hazelcastv1alpha1.HotBackup{} + if err := r.Get(groupCtx, backupName, hb); err != nil { + return err + } + + logger.Info("Start and wait for member backup upload") + u, err := upload.NewUpload(&upload.Config{ + MemberAddress: memberAddress, + BucketURI: hb.Spec.BucketURI, + BackupPath: hz.Spec.Persistence.BaseDir, + HazelcastName: hb.Spec.HazelcastResourceName, + SecretName: hb.Spec.Secret, + }) + if err != nil { + return err + } + + // now start and wait for upload + if err := u.Start(groupCtx); err != nil { + return err + } + + if err := u.Wait(groupCtx); err != nil { + if errors.Is(err, context.Canceled) { + // notify agent so we can cleanup if needed + logger.Info("Cancel upload") + return u.Cancel(groupCtx) + } + return err + } + + // member success + return nil + }) + } + + logger.Info("Waiting for members") + if err := g.Wait(); err != nil { + logger.Error(err, "One or more members failed, returning first error") + return r.updateStatus(ctx, backupName, failedHbStatus(err)) + } + + logger.Info("All members finished with no errors") + return r.updateStatus(ctx, backupName, hbWithStatus(hazelcastv1alpha1.HotBackupSuccess)) +} + +// hotBackup changes the cluster state to Passive, triggers the hot backup through the REST client and changes the state back to Active before returning. It returns the error of the Passive switch or of the backup call. +func (r *HotBackupReconciler) hotBackup(ctx context.Context, hz *hazelcastv1alpha1.Hazelcast) error { + rc := NewRestClient(hz) + if err := rc.ChangeState(ctx, Passive); err != nil { + return err } + defer func() { + // The backup result is what the caller acts on, so a failed switch back to Active is logged here instead of being silently dropped. + if e := rc.ChangeState(ctx, Active); e != nil { + r.Log.Error(e, "Could not change the cluster state to ACTIVE") + } + }() + return rc.HotBackup(ctx) } func (r *HotBackupReconciler) SetupWithManager(mgr ctrl.Manager) error { diff --git a/controllers/hazelcast/hot_backup_controller_test.go b/controllers/hazelcast/hot_backup_controller_test.go index a8c1dbda64f..db0cba6f52f 100644 --- a/controllers/hazelcast/hot_backup_controller_test.go +++ b/controllers/hazelcast/hot_backup_controller_test.go @@ -3,12 +3,11 @@ package hazelcast import ( "context" "net/http" - "sigs.k8s.io/controller-runtime/pkg/client" - "sync" - "sync/atomic" "testing" "time" + 
"sigs.k8s.io/controller-runtime/pkg/client" + "github.com/robfig/cron/v3" ctrl "sigs.k8s.io/controller-runtime" @@ -39,8 +38,9 @@ func TestHotBackupReconciler_shouldScheduleHotBackupExecution(t *testing.T) { } hb := &hazelcastv1alpha1.HotBackup{ ObjectMeta: metav1.ObjectMeta{ - Name: n.Name, - Namespace: n.Namespace, + Name: n.Name, + Namespace: n.Namespace, + Annotations: make(map[string]string), }, Spec: hazelcastv1alpha1.HotBackupSpec{ HazelcastResourceName: "hazelcast", @@ -99,119 +99,6 @@ func TestHotBackupReconciler_shouldRemoveScheduledBackup(t *testing.T) { }) } -func TestHotBackupReconciler_shouldSetStatusToFailedWhenHbCallFails(t *testing.T) { - RegisterFailHandler(fail(t)) - n := types.NamespacedName{ - Name: "hazelcast", - Namespace: "default", - } - h := &hazelcastv1alpha1.Hazelcast{ - ObjectMeta: metav1.ObjectMeta{ - Name: n.Name, - Namespace: n.Namespace, - }, - Status: hazelcastv1alpha1.HazelcastStatus{ - Phase: hazelcastv1alpha1.Running, - }, - } - hb := &hazelcastv1alpha1.HotBackup{ - ObjectMeta: metav1.ObjectMeta{ - Name: n.Name, - Namespace: n.Namespace, - }, - Spec: hazelcastv1alpha1.HotBackupSpec{ - HazelcastResourceName: n.Name, - }, - } - - ts, err := fakeHttpServer(hazelcastUrl(h), func(writer http.ResponseWriter, request *http.Request) { - if request.RequestURI == hotBackup { - writer.WriteHeader(500) - _, _ = writer.Write([]byte("{\"status\":\"failed\"}")) - } else { - writer.WriteHeader(200) - _, _ = writer.Write([]byte("{\"status\":\"success\"}")) - } - }) - if err != nil { - t.Errorf("Failed to start fake HTTP server: %v", err) - } - defer ts.Close() - - r := hotBackupReconcilerWithCRs(h, hb) - _, err = r.Reconcile(context.TODO(), reconcile.Request{NamespacedName: n}) - Expect(err).Should(Not(BeNil())) - - _ = r.Client.Get(context.TODO(), n, hb) - Expect(hb.Status.State).Should(Equal(hazelcastv1alpha1.HotBackupFailure)) -} - -func TestHotBackupReconciler_shouldNotTriggerHotBackupTwice(t *testing.T) { - RegisterFailHandler(fail(t)) - n 
:= types.NamespacedName{ - Name: "hazelcast", - Namespace: "default", - } - h := &hazelcastv1alpha1.Hazelcast{ - ObjectMeta: metav1.ObjectMeta{ - Name: n.Name, - Namespace: n.Namespace, - }, - Status: hazelcastv1alpha1.HazelcastStatus{ - Phase: hazelcastv1alpha1.Running, - }, - } - hb := &hazelcastv1alpha1.HotBackup{ - ObjectMeta: metav1.ObjectMeta{ - Name: n.Name, - Namespace: n.Namespace, - }, - Spec: hazelcastv1alpha1.HotBackupSpec{ - HazelcastResourceName: n.Name, - }, - } - - var restCallWg sync.WaitGroup - restCallWg.Add(1) - var hotBackupTriggers int32 - ts, err := fakeHttpServer(hazelcastUrl(h), func(writer http.ResponseWriter, request *http.Request) { - if request.RequestURI == hotBackup { - atomic.AddInt32(&hotBackupTriggers, 1) - restCallWg.Wait() - } - writer.WriteHeader(200) - _, _ = writer.Write([]byte("{\"status\":\"success\"}")) - }) - if err != nil { - t.Errorf("Failed to start fake HTTP server: %v", err) - } - defer ts.Close() - - r := hotBackupReconcilerWithCRs(h, hb) - - var reconcileWg sync.WaitGroup - reconcileWg.Add(1) - go func() { - defer reconcileWg.Done() - _, _ = r.Reconcile(context.TODO(), reconcile.Request{NamespacedName: n}) - }() - - Eventually(func() hazelcastv1alpha1.HotBackupState { - _ = r.Client.Get(context.TODO(), n, hb) - return hb.Status.State - }, 2*time.Second, 100*time.Millisecond).Should(Equal(hazelcastv1alpha1.HotBackupPending)) - - reconcileWg.Add(1) - go func() { - defer reconcileWg.Done() - _, _ = r.Reconcile(context.TODO(), reconcile.Request{NamespacedName: n}) - }() - restCallWg.Done() - reconcileWg.Wait() - - Expect(hotBackupTriggers).Should(Equal(int32(1))) -} - func TestHotBackupReconciler_shouldUpdateWhenScheduledBackupChangedToInstantBackup(t *testing.T) { RegisterFailHandler(fail(t)) n := types.NamespacedName{ diff --git a/controllers/hazelcast/hot_backup_status.go b/controllers/hazelcast/hot_backup_status.go index aeb71e74443..a1739eee651 100644 --- a/controllers/hazelcast/hot_backup_status.go +++ 
b/controllers/hazelcast/hot_backup_status.go @@ -2,10 +2,11 @@ package hazelcast import ( "context" + "errors" + "time" + hztypes "github.com/hazelcast/hazelcast-go-client/types" hazelcastv1alpha1 "github.com/hazelcast/hazelcast-platform-operator/api/v1alpha1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) type hotBackupOptionsBuilder struct { @@ -28,41 +29,36 @@ func failedHbStatus(err error) hotBackupOptionsBuilder { } } -func pendingHbStatus() hotBackupOptionsBuilder { - return hotBackupOptionsBuilder{ - status: hazelcastv1alpha1.HotBackupPending, - } -} +var ( + errBackupFailed = errors.New("Backup failed") + errBackupUnknownStatus = errors.New("Backup unknown status") + errBackupMemberStateFailed = errors.New("Backup member state update failed") +) -func updateHotBackupStatus(ctx context.Context, c client.Client, hb *hazelcastv1alpha1.HotBackup, options hotBackupOptionsBuilder) (ctrl.Result, error) { - hb.Status.State = options.status - hb.Status.Message = options.message - err := c.Status().Update(ctx, hb) - if options.status == hazelcastv1alpha1.HotBackupFailure { - return ctrl.Result{}, options.err - } - return ctrl.Result{}, err -} +func waitUntilBackupSucceed(ctx context.Context, client *Client, uuid hztypes.UUID) error { + for { + state := client.getTimedMemberState(ctx, uuid) + if state == nil { + return errBackupMemberStateFailed + } -func hotBackupState(hbs HotRestartState, currentState hazelcastv1alpha1.HotBackupState, hz *hazelcastv1alpha1.Hazelcast) hazelcastv1alpha1.HotBackupState { - switch hbs.BackupTaskState { - case "NOT_STARTED": - if currentState == hazelcastv1alpha1.HotBackupUnknown { - return hazelcastv1alpha1.HotBackupNotStarted + switch state.TimedMemberState.MemberState.HotRestartState.BackupTaskState { + case "FAILURE": + return errBackupFailed + case "SUCCESS": + return nil + case "IN_PROGRESS": + // expected, check status again (no return) + default: + return errBackupUnknownStatus } - case 
"IN_PROGRESS": - return hazelcastv1alpha1.HotBackupInProgress - case "FAILURE": - return hazelcastv1alpha1.HotBackupFailure - case "SUCCESS": - if currentState == hazelcastv1alpha1.HotBackupUnknown { - if hz.Spec.Persistence.IsExternal() { - return hazelcastv1alpha1.HotBackupWaiting - } - return hazelcastv1alpha1.HotBackupSuccess + + // wait for timer or context to cancel + select { + case <-time.After(1 * time.Second): + continue + case <-ctx.Done(): + return ctx.Err() } - default: - return hazelcastv1alpha1.HotBackupUnknown } - return currentState } diff --git a/go.mod b/go.mod index 325321b7bec..24e33765e5a 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ go 1.16 require ( cloud.google.com/go/bigquery v1.4.0 github.com/go-logr/logr v0.3.0 + github.com/google/uuid v1.1.2 github.com/hazelcast/hazelcast-go-client v1.2.0 github.com/onsi/ginkgo/v2 v2.1.3 github.com/onsi/gomega v1.18.1 github.com/robfig/cron/v3 v3.0.0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/tools v0.1.7 // indirect google.golang.org/api v0.20.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/internal/rest/client.go b/internal/rest/client.go new file mode 100644 index 00000000000..3d09bbdd6a3 --- /dev/null +++ b/internal/rest/client.go @@ -0,0 +1,114 @@ +package rest + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" +) + +type Client struct { + BaseURL *url.URL + client *http.Client +} + +func (c *Client) NewRequest(method, url string, body interface{}) (*http.Request, error) { + u, err := c.BaseURL.Parse(url) + if err != nil { + return nil, err + } + + header := make(http.Header) + var r io.Reader + + // serialize the request body + if body != nil { + switch v := body.(type) { + case nil: + // do nothing + case io.Reader: + r = v + case string: + r = strings.NewReader(v) + default: + buf := &bytes.Buffer{} + enc := json.NewEncoder(buf) + enc.SetEscapeHTML(false) + if err := enc.Encode(body); err != nil { + return nil, err + } + 
header.Set("Content-Type", "application/json") + r = buf + } + } + + // prepare new request + req, err := http.NewRequest(method, u.String(), r) + if err != nil { + return nil, err + } + if body != nil { + req.Header = header + } + + return req, nil +} + +func (c *Client) Do(ctx context.Context, req *http.Request, v interface{}) (*http.Response, error) { + // send the request + resp, err := c.client.Do(req) + if err != nil { + // If we got an error, and the context has been canceled, + // the context's error is probably more useful. + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + return nil, err + } + defer resp.Body.Close() + + // check for http status errors + err = checkResponse(resp) + if err != nil { + return resp, err + } + + // decode the body if needed + switch v := v.(type) { + case nil: + // do nothing + case io.Writer: + _, err = io.Copy(v, resp.Body) + default: + d := json.NewDecoder(resp.Body) + if err2 := d.Decode(v); err2 != nil { + // ignore EOF errors caused by empty response body + if err2 != io.EOF { + err = err2 + } + } + } + + return resp, err +} + +func checkResponse(r *http.Response) error { + if c := r.StatusCode; 200 <= c && c <= 299 { + return nil + } + return &ErrorResponse{Response: r} +} + +type ErrorResponse struct { + Response *http.Response +} + +func (r *ErrorResponse) Error() string { + return fmt.Sprintf("%v %v: %d", r.Response.Request.Method, r.Response.Request.URL, r.Response.StatusCode) +} diff --git a/internal/rest/upload_service.go b/internal/rest/upload_service.go new file mode 100644 index 00000000000..6c381ddcd9a --- /dev/null +++ b/internal/rest/upload_service.go @@ -0,0 +1,93 @@ +package rest + +import ( + "context" + "fmt" + "net/http" + "net/url" + + "github.com/google/uuid" +) + +type UploadService struct { + client *Client +} + +func NewUploadService(address string) (*UploadService, error) { + baseURL, err := url.Parse(address) + if err != nil { + return nil, err + } + return &UploadService{ 
+ client: &Client{ + BaseURL: baseURL, + client: &http.Client{}, + }, + }, nil +} + +type Upload struct { + ID uuid.UUID `json:"ID,omitempty"` +} + +type UploadOptions struct { + BucketURL string `json:"bucket_url"` + BackupFolderPath string `json:"backup_folder_path"` + HazelcastCRName string `json:"hz_cr_name"` + SecretName string `json:"secret_name"` + MemberUUID string `json:"member_uuid"` +} + +func (s *UploadService) Upload(ctx context.Context, opts *UploadOptions) (*Upload, *http.Response, error) { + u := "upload" + + req, err := s.client.NewRequest("POST", u, opts) + if err != nil { + return nil, nil, err + } + + upload := new(Upload) + resp, err := s.client.Do(ctx, req, upload) + if err != nil { + return nil, resp, err + } + + return upload, resp, nil +} + +type UploadStatus struct { + Status string `json:"status,omitempty"` +} + +func (s *UploadService) Status(ctx context.Context, uploadID uuid.UUID) (*UploadStatus, *http.Response, error) { + u := fmt.Sprintf("upload/%v", uploadID) + + req, err := s.client.NewRequest("GET", u, nil) + if err != nil { + return nil, nil, err + } + + status := new(UploadStatus) + resp, err := s.client.Do(ctx, req, status) + if err != nil { + return nil, resp, err + } + + return status, resp, nil +} + +func (s *UploadService) Delete(ctx context.Context, uploadID uuid.UUID) (*http.Response, error) { + u := fmt.Sprintf("upload/%v", uploadID) + + req, err := s.client.NewRequest("DELETE", u, nil) + if err != nil { + return nil, err + } + + resp, err := s.client.Do(ctx, req, nil) + if err != nil { + return resp, err + } + + return resp, nil +} diff --git a/internal/upload/upload.go b/internal/upload/upload.go new file mode 100644 index 00000000000..da5463767c7 --- /dev/null +++ b/internal/upload/upload.go @@ -0,0 +1,107 @@ +package upload + +import ( + "context" + "errors" + "net" + "time" + + "github.com/google/uuid" + "github.com/hazelcast/hazelcast-platform-operator/internal/rest" +) + +var ( + errUploadNotStarted = 
errors.New("Upload not started") + errUploadAlreadyStarted = errors.New("Upload already started") + errUploadCanceled = errors.New("Upload canceled") + errUploadFailed = errors.New("Upload failed") + errUploadUnknownStatus = errors.New("Upload unknown status") +) + +type Upload struct { + service *rest.UploadService + uploadID *uuid.UUID + config *Config +} + +type Config struct { + MemberAddress string + BucketURI string + BackupPath string + HazelcastName string + SecretName string +} + +func NewUpload(config *Config) (*Upload, error) { + host, _, err := net.SplitHostPort(config.MemberAddress) + if err != nil { + return nil, err + } + s, err := rest.NewUploadService("http://" + host + ":8080") + if err != nil { + return nil, err + } + return &Upload{ + service: s, + config: config, + }, nil +} + +func (u *Upload) Start(ctx context.Context) error { + if u.uploadID != nil { + return errUploadAlreadyStarted + } + upload, _, err := u.service.Upload(ctx, &rest.UploadOptions{ + BucketURL: u.config.BucketURI, + BackupFolderPath: u.config.BackupPath, + HazelcastCRName: u.config.HazelcastName, + SecretName: u.config.SecretName, + }) + if err != nil { + return err + } + + u.uploadID = &upload.ID + return nil +} + +func (u *Upload) Wait(ctx context.Context) error { + if u.uploadID == nil { + return errUploadNotStarted + } + for { + status, _, err := u.service.Status(ctx, *u.uploadID) + if err != nil { + return err + } + + switch status.Status { + case "CANCELED": + return errUploadCanceled + case "FAILURE": + return errUploadFailed + case "SUCCESS": + return nil + case "IN_PROGRESS": + // expected, check status again (no return) + default: + return errUploadUnknownStatus + } + + // wait for timer or context to cancel + select { + case <-time.After(1 * time.Second): + continue + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (u *Upload) Cancel(ctx context.Context) error { + if u.uploadID == nil { + return errUploadNotStarted + } + _, err := u.service.Delete(ctx, 
*u.uploadID) + return err +} diff --git a/test/e2e/config/hazelcast/config.go b/test/e2e/config/hazelcast/config.go index 6e99280cd89..704b33d133a 100644 --- a/test/e2e/config/hazelcast/config.go +++ b/test/e2e/config/hazelcast/config.go @@ -200,6 +200,10 @@ var ( Repository: repo(ee), Version: naming.HazelcastVersion, LicenseKeySecret: licenseKey(ee), + Agent: &hazelcastv1alpha1.AgentConfiguration{ + Repository: "docker.io/dzeromskhazelcast/platform-operator-agent", + Version: "latest", + }, Persistence: &hazelcastv1alpha1.HazelcastPersistenceConfiguration{ BaseDir: "/data/hot-restart", BackupType: "External", @@ -225,6 +229,10 @@ var ( Repository: repo(ee), Version: naming.HazelcastVersion, LicenseKeySecret: licenseKey(ee), + Agent: &hazelcastv1alpha1.AgentConfiguration{ + Repository: "docker.io/dzeromskhazelcast/platform-operator-agent", + Version: "latest", + }, Persistence: &hazelcastv1alpha1.HazelcastPersistenceConfiguration{ BaseDir: "/data/hot-restart", ClusterDataRecoveryPolicy: hazelcastv1alpha1.FullRecovery, diff --git a/test/e2e/hazelcast_persistence_test.go b/test/e2e/hazelcast_persistence_test.go index 2c1ed1423f6..0ed1ba51d7b 100644 --- a/test/e2e/hazelcast_persistence_test.go +++ b/test/e2e/hazelcast_persistence_test.go @@ -232,7 +232,7 @@ var _ = Describe("Hazelcast CR with Persistence feature enabled", Label("hz_pers Skip("This test will only run in EE configuration") } setLabelAndCRName("hp-5") - + By("Create cluster with external backup enabled") hazelcast := hazelcastconfig.ExternalBackup(hzLookupKey, true, labels) CreateHazelcastCR(hazelcast)