diff --git a/README.md b/README.md index 75a1da1d..ec6541a9 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,8 @@ VolumeReplicationClass corresponding to the driver providing replication. + `kind` is the kind of resource being replicated. For eg. `PersistentVolumeClaim` + `name` is the name of the resource +`replicationHandle` (optional) is an existing (but new) replication id + ```yaml apiVersion: replication.storage.openshift.io/v1alpha1 kind: VolumeReplication @@ -64,6 +66,7 @@ metadata: spec: volumeReplicationClass: volumereplicationclass-sample replicationState: primary + replicationHandle: replicationHandle # optional dataSource: kind: PersistentVolumeClaim name: myPersistentVolumeClaim # should be in same namespace as VolumeReplication diff --git a/api/v1alpha1/volumereplication_types.go b/api/v1alpha1/volumereplication_types.go index aa9ad489..3c16db2e 100644 --- a/api/v1alpha1/volumereplication_types.go +++ b/api/v1alpha1/volumereplication_types.go @@ -64,6 +64,10 @@ type VolumeReplicationSpec struct { // DataSource represents the object associated with the volume // +kubebuilder:validation:Required DataSource corev1.TypedLocalObjectReference `json:"dataSource"` + + // replicationHandle represents an existing (but new) replication id + // +kubebuilder:validation:Optional + ReplicationHandle string `json:"replicationHandle"` } // VolumeReplicationStatus defines the observed state of VolumeReplication diff --git a/config/crd/bases/replication.storage.openshift.io_volumereplications.yaml b/config/crd/bases/replication.storage.openshift.io_volumereplications.yaml index b8fe8f29..38de773e 100644 --- a/config/crd/bases/replication.storage.openshift.io_volumereplications.yaml +++ b/config/crd/bases/replication.storage.openshift.io_volumereplications.yaml @@ -83,6 +83,9 @@ spec: - secondary - resync type: string + replicationHandle: + description: replicationHandle represents an existing (but new) replication id + type: string volumeReplicationClass: description: VolumeReplicationClass is the VolumeReplicationClass name for this VolumeReplication resource diff --git a/controllers/replication/replication.go b/controllers/replication/replication.go index 397adee4..9ae67407 100644 --- a/controllers/replication/replication.go +++ b/controllers/replication/replication.go @@ -37,15 +37,17 @@ type ReplicationResponse struct { // CommonRequestParameters holds the common parameters across replication operations. type CommonRequestParameters struct { - VolumeID string - Parameters map[string]string - Secrets map[string]string - Replication client.VolumeReplication + VolumeID string + ReplicationID string + Parameters map[string]string + Secrets map[string]string + Replication client.VolumeReplication } func (r *Replication) Enable() *ReplicationResponse { resp, err := r.Params.Replication.EnableVolumeReplication( r.Params.VolumeID, + r.Params.ReplicationID, r.Params.Secrets, r.Params.Parameters, ) @@ -56,6 +58,7 @@ func (r *Replication) Enable() *ReplicationResponse { func (r *Replication) Disable() *ReplicationResponse { resp, err := r.Params.Replication.DisableVolumeReplication( r.Params.VolumeID, + r.Params.ReplicationID, r.Params.Secrets, r.Params.Parameters, ) @@ -66,6 +69,7 @@ func (r *Replication) Disable() *ReplicationResponse { func (r *Replication) Promote() *ReplicationResponse { resp, err := r.Params.Replication.PromoteVolume( r.Params.VolumeID, + r.Params.ReplicationID, r.Force, r.Params.Secrets, r.Params.Parameters, @@ -77,6 +81,7 @@ func (r *Replication) Promote() *ReplicationResponse { func (r *Replication) Demote() *ReplicationResponse { resp, err := r.Params.Replication.DemoteVolume( r.Params.VolumeID, + r.Params.ReplicationID, r.Params.Secrets, r.Params.Parameters, ) @@ -87,6 +92,7 @@ func (r *Replication) Demote() *ReplicationResponse { func (r *Replication) Resync() *ReplicationResponse { resp, err := r.Params.Replication.ResyncVolume( r.Params.VolumeID, + r.Params.ReplicationID, r.Params.Secrets, r.Params.Parameters, ) diff --git a/controllers/volumereplication_controller.go b/controllers/volumereplication_controller.go index 61ff6db9..d97a9d5d 100644 --- a/controllers/volumereplication_controller.go +++ b/controllers/volumereplication_controller.go @@ -133,6 +133,8 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re pvErr error ) + replicationHandle := instance.Spec.ReplicationHandle + nameSpacedName := types.NamespacedName{Name: instance.Spec.DataSource.Name, Namespace: req.Namespace} switch instance.Spec.DataSource.Kind { case pvcDataSource: @@ -154,6 +156,9 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } logger.Info("volume handle", "VolumeHandleName", volumeHandle) + if replicationHandle != "" { + logger.Info("Replication handle", "ReplicationHandleName", replicationHandle) + } // check if the object is being deleted if instance.GetDeletionTimestamp().IsZero() { @@ -167,7 +172,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } } else { if contains(instance.GetFinalizers(), volumeReplicationFinalizer) { - err := r.disableVolumeReplication(logger, volumeHandle, parameters, secret) + err := r.disableVolumeReplication(logger, volumeHandle, replicationHandle, parameters, secret) if err != nil { logger.Error(err, "failed to disable replication") return ctrl.Result{}, err @@ -195,7 +200,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re } // enable replication on every reconcile - if err = r.enableReplication(logger, volumeHandle, parameters, secret); err != nil { + if err = r.enableReplication(logger, volumeHandle, replicationHandle, parameters, secret); err != nil { logger.Error(err, "failed to enable replication") setFailureCondition(instance) _ = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error()) @@ -207,14 +212,14 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re switch instance.Spec.ReplicationState { case replicationv1alpha1.Primary: - replicationErr = r.markVolumeAsPrimary(instance, logger, volumeHandle, parameters, secret) + replicationErr = r.markVolumeAsPrimary(instance, logger, volumeHandle, replicationHandle, parameters, secret) case replicationv1alpha1.Secondary: // For the first time, mark the volume as secondary and requeue the // request. For some storage providers it takes some time to determine // whether the volume need correction example:- correcting split brain. if instance.Status.State != replicationv1alpha1.SecondaryState { - replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, parameters, secret) + replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, replicationHandle, parameters, secret) if replicationErr == nil { logger.Info("volume is not ready to use") // set the status.State to secondary as the @@ -230,15 +235,15 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re }, nil } } else { - replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, parameters, secret) + replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, replicationHandle, parameters, secret) // resync volume if successfully marked Secondary if replicationErr == nil { - requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, parameters, secret) + requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, replicationHandle, parameters, secret) } } case replicationv1alpha1.Resync: - requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, parameters, secret) + requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, replicationHandle, parameters, secret) default: replicationErr = fmt.Errorf("unsupported volume state") @@ -360,12 +365,15 @@ func (r *VolumeReplicationReconciler) waitForVolumeReplicationResource(logger lo } // markVolumeAsPrimary defines and runs a set of tasks required to mark a volume as primary -func (r *VolumeReplicationReconciler) markVolumeAsPrimary(volumeReplicationObject *replicationv1alpha1.VolumeReplication, logger logr.Logger, volumeID string, parameters, secrets map[string]string) error { +func (r *VolumeReplicationReconciler) markVolumeAsPrimary(volumeReplicationObject *replicationv1alpha1.VolumeReplication, + logger logr.Logger, volumeID, replicationID string, parameters, secrets map[string]string) error { + c := replication.CommonRequestParameters{ - VolumeID: volumeID, - Parameters: parameters, - Secrets: secrets, - Replication: r.Replication, + VolumeID: volumeID, + ReplicationID: replicationID, + Parameters: parameters, + Secrets: secrets, + Replication: r.Replication, } volumeReplication := replication.Replication{ @@ -401,12 +409,14 @@ func (r *VolumeReplicationReconciler) markVolumeAsPrimary(volumeReplicationObjec // markVolumeAsSecondary defines and runs a set of tasks required to mark a volume as secondary func (r *VolumeReplicationReconciler) markVolumeAsSecondary(volumeReplicationObject *replicationv1alpha1.VolumeReplication, - logger logr.Logger, volumeID string, parameters, secrets map[string]string) error { + logger logr.Logger, volumeID, replicationID string, parameters, secrets map[string]string) error { + c := replication.CommonRequestParameters{ - VolumeID: volumeID, - Parameters: parameters, - Secrets: secrets, - Replication: r.Replication, + VolumeID: volumeID, + ReplicationID: replicationID, + Parameters: parameters, + Secrets: secrets, + Replication: r.Replication, } volumeReplication := replication.Replication{ @@ -427,12 +437,14 @@ func (r *VolumeReplicationReconciler) markVolumeAsSecondary(volumeReplicationObj // resyncVolume defines and runs a set of tasks required to resync the volume func (r *VolumeReplicationReconciler) resyncVolume(volumeReplicationObject *replicationv1alpha1.VolumeReplication, - logger logr.Logger, volumeID string, parameters, secrets map[string]string) (bool, error) { + logger logr.Logger, volumeID, replicationID string, parameters, secrets map[string]string) (bool, error) { + c := replication.CommonRequestParameters{ - VolumeID: volumeID, - Parameters: parameters, - Secrets: secrets, - Replication: r.Replication, + VolumeID: volumeID, + ReplicationID: replicationID, + Parameters: parameters, + Secrets: secrets, + Replication: r.Replication, } volumeReplication := replication.Replication{ @@ -467,12 +479,15 @@ func (r *VolumeReplicationReconciler) resyncVolume(volumeReplicationObject *repl } // disableVolumeReplication defines and runs a set of tasks required to disable volume replication -func (r *VolumeReplicationReconciler) disableVolumeReplication(logger logr.Logger, volumeID string, parameters, secrets map[string]string) error { +func (r *VolumeReplicationReconciler) disableVolumeReplication(logger logr.Logger, volumeID, replicationID string, + parameters, secrets map[string]string) error { + c := replication.CommonRequestParameters{ - VolumeID: volumeID, - Parameters: parameters, - Secrets: secrets, - Replication: r.Replication, + VolumeID: volumeID, + ReplicationID: replicationID, + Parameters: parameters, + Secrets: secrets, + Replication: r.Replication, } volumeReplication := replication.Replication{ @@ -494,12 +509,15 @@ func (r *VolumeReplicationReconciler) disableVolumeReplication(logger logr.Logge } // enableReplication enable volume replication on the first reconcile -func (r *VolumeReplicationReconciler) enableReplication(logger logr.Logger, volumeID string, parameters, secrets map[string]string) error { +func (r *VolumeReplicationReconciler) enableReplication(logger logr.Logger, volumeID, replicationID string, + parameters, secrets map[string]string) error { + c := replication.CommonRequestParameters{ - VolumeID: volumeID, - Parameters: parameters, - Secrets: secrets, - Replication: r.Replication, + VolumeID: volumeID, + ReplicationID: replicationID, + Parameters: parameters, + Secrets: secrets, + Replication: r.Replication, } volumeReplication := replication.Replication{ diff --git a/go.mod b/go.mod index 3c83c912..c7816671 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/csi-addons/volume-replication-operator go 1.15 require ( - github.com/csi-addons/spec v0.1.0 + github.com/csi-addons/spec v0.1.1 github.com/go-logr/logr v0.4.0 github.com/kubernetes-csi/csi-lib-utils v0.9.1 github.com/onsi/ginkgo v1.16.4 diff --git a/go.sum b/go.sum index 1ec16071..3ac0143e 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/csi-addons/spec v0.1.0 h1:y3TOd7qtnwBQPikGa1VvaL7ObyddAZehYW8DNGBlOyc= -github.com/csi-addons/spec v0.1.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI= +github.com/csi-addons/spec v0.1.1 h1:Bm9ZVCQ+nYMs7Y5PK+izkzCeer262W4rjCyGpuqu9C4= +github.com/csi-addons/spec v0.1.1/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/client/fake/replication-client.go b/pkg/client/fake/replication-client.go index 09825564..2cdf9873 100644 --- a/pkg/client/fake/replication-client.go +++ b/pkg/client/fake/replication-client.go @@ -23,38 +23,38 @@ import ( // ReplicationClient to fake replication operations. type ReplicationClient struct { // EnableVolumeReplicationMock mocks EnableVolumeReplication RPC call. - EnableVolumeReplicationMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) + EnableVolumeReplicationMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) // DisableVolumeReplicationMock mocks DisableVolumeReplication RPC call. - DisableVolumeReplicationMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) + DisableVolumeReplicationMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) // PromoteVolumeMock mocks PromoteVolume RPC call. - PromoteVolumeMock func(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) + PromoteVolumeMock func(volumeID, replicationID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) // DemoteVolumeMock mocks DemoteVolume RPC call. - DemoteVolumeMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) + DemoteVolumeMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) // ResyncVolumeMock mocks ResyncVolume RPC call. - ResyncVolumeMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) + ResyncVolumeMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) } // EnableVolumeReplication calls EnableVolumeReplicationMock mock function. -func (rc *ReplicationClient) EnableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { - return rc.EnableVolumeReplicationMock(volumeID, secrets, parameters) +func (rc *ReplicationClient) EnableVolumeReplication(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { + return rc.EnableVolumeReplicationMock(volumeID, replicationID, secrets, parameters) } // DisableVolumeReplication calls DisableVolumeReplicationMock mock function. -func (rc *ReplicationClient) DisableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { - return rc.DisableVolumeReplicationMock(volumeID, secrets, parameters) +func (rc *ReplicationClient) DisableVolumeReplication(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { + return rc.DisableVolumeReplicationMock(volumeID, replicationID, secrets, parameters) } // PromoteVolume calls PromoteVolumeMock mock function. -func (rc *ReplicationClient) PromoteVolume(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { - return rc.PromoteVolumeMock(volumeID, force, secrets, parameters) +func (rc *ReplicationClient) PromoteVolume(volumeID, replicationID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { + return rc.PromoteVolumeMock(volumeID, replicationID, force, secrets, parameters) } // DemoteVolume calls DemoteVolumeMock mock function. -func (rc *ReplicationClient) DemoteVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { - return rc.DemoteVolumeMock(volumeID, secrets, parameters) +func (rc *ReplicationClient) DemoteVolume(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { + return rc.DemoteVolumeMock(volumeID, replicationID, secrets, parameters) } // ResyncVolume calls ResyncVolumeMock function. -func (rc *ReplicationClient) ResyncVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { - return rc.ResyncVolumeMock(volumeID, secrets, parameters) +func (rc *ReplicationClient) ResyncVolume(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { + return rc.ResyncVolumeMock(volumeID, replicationID, secrets, parameters) } diff --git a/pkg/client/replication-client.go b/pkg/client/replication-client.go index 6bd7b952..66eeb27a 100644 --- a/pkg/client/replication-client.go +++ b/pkg/client/replication-client.go @@ -32,17 +32,17 @@ type replicationClient struct { // VolumeReplication holds the methods required for volume replication. type VolumeReplication interface { // EnableVolumeReplication RPC call to enable the volume replication. - EnableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) + EnableVolumeReplication(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) // DisableVolumeReplication RPC call to disable the volume replication. - DisableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) + DisableVolumeReplication(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) // PromoteVolume RPC call to promote the volume. - PromoteVolume(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib. + PromoteVolume(volumeID, replicationID string, force bool, secrets, parameters map[string]string) (*replicationlib. PromoteVolumeResponse, error) // DemoteVolume RPC call to demote the volume. - DemoteVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib. + DemoteVolume(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib. DemoteVolumeResponse, error) // ResyncVolume RPC call to resync the volume. - ResyncVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib. + ResyncVolume(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib. ResyncVolumeResponse, error) } @@ -53,11 +53,14 @@ func NewReplicationClient(cc *grpc.ClientConn, timeout time.Duration) VolumeRepl } // EnableVolumeReplication RPC call to enable the volume replication. -func (rc *replicationClient) EnableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { +func (rc *replicationClient) EnableVolumeReplication(volumeID, replicationID string, + secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { + req := &replicationlib.EnableVolumeReplicationRequest{ - VolumeId: volumeID, - Parameters: parameters, - Secrets: secrets, + VolumeId: volumeID, + ReplicationId: replicationID, + Parameters: parameters, + Secrets: secrets, } createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout) @@ -67,11 +70,14 @@ func (rc *replicationClient) EnableVolumeReplication(volumeID string, secrets, p } // DisableVolumeReplication RPC call to disable the volume replication. -func (rc *replicationClient) DisableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { +func (rc *replicationClient) DisableVolumeReplication(volumeID, replicationID string, + secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { + req := &replicationlib.DisableVolumeReplicationRequest{ - VolumeId: volumeID, - Parameters: parameters, - Secrets: secrets, + VolumeId: volumeID, + ReplicationId: replicationID, + Parameters: parameters, + Secrets: secrets, } createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout) @@ -81,12 +87,15 @@ func (rc *replicationClient) DisableVolumeReplication(volumeID string, secrets, } // PromoteVolume RPC call to promote the volume. -func (rc *replicationClient) PromoteVolume(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { +func (rc *replicationClient) PromoteVolume(volumeID, replicationID string, + force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { + req := &replicationlib.PromoteVolumeRequest{ - VolumeId: volumeID, - Force: force, - Parameters: parameters, - Secrets: secrets, + VolumeId: volumeID, + ReplicationId: replicationID, + Force: force, + Parameters: parameters, + Secrets: secrets, } createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout) @@ -96,11 +105,14 @@ func (rc *replicationClient) PromoteVolume(volumeID string, force bool, secrets, } // DemoteVolume RPC call to demote the volume. -func (rc *replicationClient) DemoteVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { +func (rc *replicationClient) DemoteVolume(volumeID, replicationID string, + secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { + req := &replicationlib.DemoteVolumeRequest{ - VolumeId: volumeID, - Parameters: parameters, - Secrets: secrets, + VolumeId: volumeID, + ReplicationId: replicationID, + Parameters: parameters, + Secrets: secrets, } createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout) defer cancel() @@ -109,11 +121,14 @@ func (rc *replicationClient) DemoteVolume(volumeID string, secrets, parameters m } // ResyncVolume RPC call to resync the volume. -func (rc *replicationClient) ResyncVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { +func (rc *replicationClient) ResyncVolume(volumeID, replicationID string, + secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { + req := &replicationlib.ResyncVolumeRequest{ - VolumeId: volumeID, - Parameters: parameters, - Secrets: secrets, + VolumeId: volumeID, + ReplicationId: replicationID, + Parameters: parameters, + Secrets: secrets, } createCtx, cancel := context.WithTimeout(context.Background(), rc.timeout) diff --git a/pkg/client/replication-client_test.go b/pkg/client/replication-client_test.go index 53b8894f..c66baea1 100644 --- a/pkg/client/replication-client_test.go +++ b/pkg/client/replication-client_test.go @@ -32,23 +32,23 @@ func TestEnableVolumeReplication(t *testing.T) { var client = NewReplicationClient(&grpc.ClientConn{}, time.Minute) // return success response mockedEnableReplication := &fake.ReplicationClient{ - EnableVolumeReplicationMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { + EnableVolumeReplicationMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { return &replicationlib.EnableVolumeReplicationResponse{}, nil }, } client = mockedEnableReplication - resp, err := client.EnableVolumeReplication("", nil, nil) + resp, err := client.EnableVolumeReplication("", "", nil, nil) assert.Equal(t, &replicationlib.EnableVolumeReplicationResponse{}, resp) assert.Nil(t, err) // return error mockedEnableReplication = &fake.ReplicationClient{ - EnableVolumeReplicationMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { + EnableVolumeReplicationMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) { return nil, errors.New("failed to enable mirroring") }, } client = mockedEnableReplication - resp, err = client.EnableVolumeReplication("", nil, nil) + resp, err = client.EnableVolumeReplication("", "", nil, nil) assert.Nil(t, resp) assert.NotNil(t, err) } @@ -57,23 +57,23 @@ func TestDisableVolumeReplication(t *testing.T) { var client = NewReplicationClient(&grpc.ClientConn{}, time.Minute) // return success response mockedDisableReplication := &fake.ReplicationClient{ - DisableVolumeReplicationMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { + DisableVolumeReplicationMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { return &replicationlib.DisableVolumeReplicationResponse{}, nil }, } client = mockedDisableReplication - resp, err := client.DisableVolumeReplication("", nil, nil) + resp, err := client.DisableVolumeReplication("", "", nil, nil) assert.Equal(t, &replicationlib.DisableVolumeReplicationResponse{}, resp) assert.Nil(t, err) // return error mockedDisableReplication = &fake.ReplicationClient{ - DisableVolumeReplicationMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { + DisableVolumeReplicationMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) { return nil, errors.New("failed to disable mirroring") }, } client = mockedDisableReplication - resp, err = client.DisableVolumeReplication("", nil, nil) + resp, err = client.DisableVolumeReplication("", "", nil, nil) assert.Nil(t, resp) assert.NotNil(t, err) } @@ -82,24 +82,24 @@ func TestPromoteVolume(t *testing.T) { var client = NewReplicationClient(&grpc.ClientConn{}, time.Minute) // return success response mockedPromoteVolume := &fake.ReplicationClient{ - PromoteVolumeMock: func(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { + PromoteVolumeMock: func(volumeID, replicationID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { return &replicationlib.PromoteVolumeResponse{}, nil }, } force := false client = mockedPromoteVolume - resp, err := client.PromoteVolume("", force, nil, nil) + resp, err := client.PromoteVolume("", "", force, nil, nil) assert.Equal(t, &replicationlib.PromoteVolumeResponse{}, resp) assert.Nil(t, err) // return error mockedPromoteVolume = &fake.ReplicationClient{ - PromoteVolumeMock: func(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { + PromoteVolumeMock: func(volumeID, replicationID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) { return nil, errors.New("failed to promote volume") }, } client = mockedPromoteVolume - resp, err = client.PromoteVolume("", force, nil, nil) + resp, err = client.PromoteVolume("", "", force, nil, nil) assert.Nil(t, resp) assert.NotNil(t, err) } @@ -108,23 +108,23 @@ func TestDemoteVolume(t *testing.T) { var client = NewReplicationClient(&grpc.ClientConn{}, time.Minute) // return success response mockedDemoteVolume := &fake.ReplicationClient{ - DemoteVolumeMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { + DemoteVolumeMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { return &replicationlib.DemoteVolumeResponse{}, nil }, } client = mockedDemoteVolume - resp, err := client.DemoteVolume("", nil, nil) + resp, err := client.DemoteVolume("", "", nil, nil) assert.Equal(t, &replicationlib.DemoteVolumeResponse{}, resp) assert.Nil(t, err) // return error mockedDemoteVolume = &fake.ReplicationClient{ - DemoteVolumeMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { + DemoteVolumeMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) { return nil, errors.New("failed to demote volume") }, } client = mockedDemoteVolume - resp, err = client.DemoteVolume("", nil, nil) + resp, err = client.DemoteVolume("", "", nil, nil) assert.Nil(t, resp) assert.NotNil(t, err) } @@ -133,23 +133,23 @@ func TestResyncVolume(t *testing.T) { var client = NewReplicationClient(&grpc.ClientConn{}, time.Minute) // return success response mockedResyncVolume := &fake.ReplicationClient{ - ResyncVolumeMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { + ResyncVolumeMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { return &replicationlib.ResyncVolumeResponse{}, nil }, } client = mockedResyncVolume - resp, err := client.ResyncVolume("", nil, nil) + resp, err := client.ResyncVolume("", "", nil, nil) assert.Equal(t, &replicationlib.ResyncVolumeResponse{}, resp) assert.Nil(t, err) // return error mockedResyncVolume = &fake.ReplicationClient{ - ResyncVolumeMock: func(volumeID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { + ResyncVolumeMock: func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) { return nil, errors.New("failed to resync volume") }, } client = mockedResyncVolume - resp, err = client.ResyncVolume("", nil, nil) + resp, err = client.ResyncVolume("", "", nil, nil) assert.Nil(t, resp) assert.NotNil(t, err) }