Skip to content

Commit

Permalink
add replication id
Browse files Browse the repository at this point in the history
Signed-off-by: oriyarde <oriyarde@ibm.com>
  • Loading branch information
oriyarde authored and mergify[bot] committed Sep 20, 2021
1 parent 597f3f8 commit 678adaa
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 100 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ VolumeReplicationClass corresponding to the driver providing replication.
+ `kind` is the kind of resource being replicated. For eg. `PersistentVolumeClaim`
+ `name` is the name of the resource

`replicationHandle` (optional) is an existing (but new) replication id

```yaml
apiVersion: replication.storage.openshift.io/v1alpha1
kind: VolumeReplication
Expand All @@ -64,6 +66,7 @@ metadata:
spec:
volumeReplicationClass: volumereplicationclass-sample
replicationState: primary
replicationHandle: replicationHandle # optional
dataSource:
kind: PersistentVolumeClaim
name: myPersistentVolumeClaim # should be in same namespace as VolumeReplication
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/volumereplication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type VolumeReplicationSpec struct {
// DataSource represents the object associated with the volume
// +kubebuilder:validation:Required
DataSource corev1.TypedLocalObjectReference `json:"dataSource"`

// replicationHandle represents an existing (but new) replication id
// +kubebuilder:validation:Optional
ReplicationHandle string `json:"replicationHandle"`
}

// VolumeReplicationStatus defines the observed state of VolumeReplication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ spec:
- secondary
- resync
type: string
replicationHandle:
description: replicationHandle represents an existing (but new) replication id
type: string
volumeReplicationClass:
description: VolumeReplicationClass is the VolumeReplicationClass
name for this VolumeReplication resource
Expand Down
14 changes: 10 additions & 4 deletions controllers/replication/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ type ReplicationResponse struct {

// CommonRequestParameters holds the common parameters across replication operations.
type CommonRequestParameters struct {
VolumeID string
Parameters map[string]string
Secrets map[string]string
Replication client.VolumeReplication
VolumeID string
ReplicationID string
Parameters map[string]string
Secrets map[string]string
Replication client.VolumeReplication
}

func (r *Replication) Enable() *ReplicationResponse {
resp, err := r.Params.Replication.EnableVolumeReplication(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Params.Secrets,
r.Params.Parameters,
)
Expand All @@ -56,6 +58,7 @@ func (r *Replication) Enable() *ReplicationResponse {
func (r *Replication) Disable() *ReplicationResponse {
resp, err := r.Params.Replication.DisableVolumeReplication(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Params.Secrets,
r.Params.Parameters,
)
Expand All @@ -66,6 +69,7 @@ func (r *Replication) Disable() *ReplicationResponse {
func (r *Replication) Promote() *ReplicationResponse {
resp, err := r.Params.Replication.PromoteVolume(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Force,
r.Params.Secrets,
r.Params.Parameters,
Expand All @@ -77,6 +81,7 @@ func (r *Replication) Promote() *ReplicationResponse {
func (r *Replication) Demote() *ReplicationResponse {
resp, err := r.Params.Replication.DemoteVolume(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Params.Secrets,
r.Params.Parameters,
)
Expand All @@ -87,6 +92,7 @@ func (r *Replication) Demote() *ReplicationResponse {
func (r *Replication) Resync() *ReplicationResponse {
resp, err := r.Params.Replication.ResyncVolume(
r.Params.VolumeID,
r.Params.ReplicationID,
r.Params.Secrets,
r.Params.Parameters,
)
Expand Down
82 changes: 50 additions & 32 deletions controllers/volumereplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
pvErr error
)

replicationHandle := instance.Spec.ReplicationHandle

nameSpacedName := types.NamespacedName{Name: instance.Spec.DataSource.Name, Namespace: req.Namespace}
switch instance.Spec.DataSource.Kind {
case pvcDataSource:
Expand All @@ -154,6 +156,9 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

logger.Info("volume handle", "VolumeHandleName", volumeHandle)
if replicationHandle != "" {
logger.Info("Replication handle", "ReplicationHandleName", replicationHandle)
}

// check if the object is being deleted
if instance.GetDeletionTimestamp().IsZero() {
Expand All @@ -167,7 +172,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}
} else {
if contains(instance.GetFinalizers(), volumeReplicationFinalizer) {
err := r.disableVolumeReplication(logger, volumeHandle, parameters, secret)
err := r.disableVolumeReplication(logger, volumeHandle, replicationHandle, parameters, secret)
if err != nil {
logger.Error(err, "failed to disable replication")
return ctrl.Result{}, err
Expand Down Expand Up @@ -195,7 +200,7 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}

// enable replication on every reconcile
if err = r.enableReplication(logger, volumeHandle, parameters, secret); err != nil {
if err = r.enableReplication(logger, volumeHandle, replicationHandle, parameters, secret); err != nil {
logger.Error(err, "failed to enable replication")
setFailureCondition(instance)
_ = r.updateReplicationStatus(instance, logger, getCurrentReplicationState(instance), err.Error())
Expand All @@ -207,14 +212,14 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re

switch instance.Spec.ReplicationState {
case replicationv1alpha1.Primary:
replicationErr = r.markVolumeAsPrimary(instance, logger, volumeHandle, parameters, secret)
replicationErr = r.markVolumeAsPrimary(instance, logger, volumeHandle, replicationHandle, parameters, secret)

case replicationv1alpha1.Secondary:
// For the first time, mark the volume as secondary and requeue the
// request. For some storage providers it takes some time to determine
// whether the volume need correction example:- correcting split brain.
if instance.Status.State != replicationv1alpha1.SecondaryState {
replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, parameters, secret)
replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, replicationHandle, parameters, secret)
if replicationErr == nil {
logger.Info("volume is not ready to use")
// set the status.State to secondary as the
Expand All @@ -230,15 +235,15 @@ func (r *VolumeReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Re
}, nil
}
} else {
replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, parameters, secret)
replicationErr = r.markVolumeAsSecondary(instance, logger, volumeHandle, replicationHandle, parameters, secret)
// resync volume if successfully marked Secondary
if replicationErr == nil {
requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, parameters, secret)
requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, replicationHandle, parameters, secret)
}
}

case replicationv1alpha1.Resync:
requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, parameters, secret)
requeueForResync, replicationErr = r.resyncVolume(instance, logger, volumeHandle, replicationHandle, parameters, secret)

default:
replicationErr = fmt.Errorf("unsupported volume state")
Expand Down Expand Up @@ -360,12 +365,15 @@ func (r *VolumeReplicationReconciler) waitForVolumeReplicationResource(logger lo
}

// markVolumeAsPrimary defines and runs a set of tasks required to mark a volume as primary
func (r *VolumeReplicationReconciler) markVolumeAsPrimary(volumeReplicationObject *replicationv1alpha1.VolumeReplication, logger logr.Logger, volumeID string, parameters, secrets map[string]string) error {
func (r *VolumeReplicationReconciler) markVolumeAsPrimary(volumeReplicationObject *replicationv1alpha1.VolumeReplication,
logger logr.Logger, volumeID, replicationID string, parameters, secrets map[string]string) error {

c := replication.CommonRequestParameters{
VolumeID: volumeID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
VolumeID: volumeID,
ReplicationID: replicationID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
}

volumeReplication := replication.Replication{
Expand Down Expand Up @@ -401,12 +409,14 @@ func (r *VolumeReplicationReconciler) markVolumeAsPrimary(volumeReplicationObjec

// markVolumeAsSecondary defines and runs a set of tasks required to mark a volume as secondary
func (r *VolumeReplicationReconciler) markVolumeAsSecondary(volumeReplicationObject *replicationv1alpha1.VolumeReplication,
logger logr.Logger, volumeID string, parameters, secrets map[string]string) error {
logger logr.Logger, volumeID, replicationID string, parameters, secrets map[string]string) error {

c := replication.CommonRequestParameters{
VolumeID: volumeID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
VolumeID: volumeID,
ReplicationID: replicationID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
}

volumeReplication := replication.Replication{
Expand All @@ -427,12 +437,14 @@ func (r *VolumeReplicationReconciler) markVolumeAsSecondary(volumeReplicationObj

// resyncVolume defines and runs a set of tasks required to resync the volume
func (r *VolumeReplicationReconciler) resyncVolume(volumeReplicationObject *replicationv1alpha1.VolumeReplication,
logger logr.Logger, volumeID string, parameters, secrets map[string]string) (bool, error) {
logger logr.Logger, volumeID, replicationID string, parameters, secrets map[string]string) (bool, error) {

c := replication.CommonRequestParameters{
VolumeID: volumeID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
VolumeID: volumeID,
ReplicationID: replicationID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
}

volumeReplication := replication.Replication{
Expand Down Expand Up @@ -467,12 +479,15 @@ func (r *VolumeReplicationReconciler) resyncVolume(volumeReplicationObject *repl
}

// disableVolumeReplication defines and runs a set of tasks required to disable volume replication
func (r *VolumeReplicationReconciler) disableVolumeReplication(logger logr.Logger, volumeID string, parameters, secrets map[string]string) error {
func (r *VolumeReplicationReconciler) disableVolumeReplication(logger logr.Logger, volumeID, replicationID string,
parameters, secrets map[string]string) error {

c := replication.CommonRequestParameters{
VolumeID: volumeID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
VolumeID: volumeID,
ReplicationID: replicationID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
}

volumeReplication := replication.Replication{
Expand All @@ -494,12 +509,15 @@ func (r *VolumeReplicationReconciler) disableVolumeReplication(logger logr.Logge
}

// enableReplication enable volume replication on the first reconcile
func (r *VolumeReplicationReconciler) enableReplication(logger logr.Logger, volumeID string, parameters, secrets map[string]string) error {
func (r *VolumeReplicationReconciler) enableReplication(logger logr.Logger, volumeID, replicationID string,
parameters, secrets map[string]string) error {

c := replication.CommonRequestParameters{
VolumeID: volumeID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
VolumeID: volumeID,
ReplicationID: replicationID,
Parameters: parameters,
Secrets: secrets,
Replication: r.Replication,
}

volumeReplication := replication.Replication{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/csi-addons/volume-replication-operator
go 1.15

require (
github.com/csi-addons/spec v0.1.0
github.com/csi-addons/spec v0.1.1
github.com/go-logr/logr v0.4.0
github.com/kubernetes-csi/csi-lib-utils v0.9.1
github.com/onsi/ginkgo v1.16.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/csi-addons/spec v0.1.0 h1:y3TOd7qtnwBQPikGa1VvaL7ObyddAZehYW8DNGBlOyc=
github.com/csi-addons/spec v0.1.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.1.1 h1:Bm9ZVCQ+nYMs7Y5PK+izkzCeer262W4rjCyGpuqu9C4=
github.com/csi-addons/spec v0.1.1/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
30 changes: 15 additions & 15 deletions pkg/client/fake/replication-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,38 @@ import (
// ReplicationClient to fake replication operations.
type ReplicationClient struct {
// EnableVolumeReplicationMock mocks EnableVolumeReplication RPC call.
EnableVolumeReplicationMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error)
EnableVolumeReplicationMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error)
// DisableVolumeReplicationMock mocks DisableVolumeReplication RPC call.
DisableVolumeReplicationMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error)
DisableVolumeReplicationMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error)
// PromoteVolumeMock mocks PromoteVolume RPC call.
PromoteVolumeMock func(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error)
PromoteVolumeMock func(volumeID, replicationID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error)
// DemoteVolumeMock mocks DemoteVolume RPC call.
DemoteVolumeMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error)
DemoteVolumeMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error)
// ResyncVolumeMock mocks ResyncVolume RPC call.
ResyncVolumeMock func(volumeID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error)
ResyncVolumeMock func(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error)
}

// EnableVolumeReplication calls EnableVolumeReplicationMock mock function.
func (rc *ReplicationClient) EnableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) {
return rc.EnableVolumeReplicationMock(volumeID, secrets, parameters)
func (rc *ReplicationClient) EnableVolumeReplication(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.EnableVolumeReplicationResponse, error) {
return rc.EnableVolumeReplicationMock(volumeID, replicationID, secrets, parameters)
}

// DisableVolumeReplication calls DisableVolumeReplicationMock mock function.
func (rc *ReplicationClient) DisableVolumeReplication(volumeID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) {
return rc.DisableVolumeReplicationMock(volumeID, secrets, parameters)
func (rc *ReplicationClient) DisableVolumeReplication(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DisableVolumeReplicationResponse, error) {
return rc.DisableVolumeReplicationMock(volumeID, replicationID, secrets, parameters)
}

// PromoteVolume calls PromoteVolumeMock mock function.
func (rc *ReplicationClient) PromoteVolume(volumeID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) {
return rc.PromoteVolumeMock(volumeID, force, secrets, parameters)
func (rc *ReplicationClient) PromoteVolume(volumeID, replicationID string, force bool, secrets, parameters map[string]string) (*replicationlib.PromoteVolumeResponse, error) {
return rc.PromoteVolumeMock(volumeID, replicationID, force, secrets, parameters)
}

// DemoteVolume calls DemoteVolumeMock mock function.
func (rc *ReplicationClient) DemoteVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) {
return rc.DemoteVolumeMock(volumeID, secrets, parameters)
func (rc *ReplicationClient) DemoteVolume(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.DemoteVolumeResponse, error) {
return rc.DemoteVolumeMock(volumeID, replicationID, secrets, parameters)
}

// ResyncVolume calls ResyncVolumeMock function.
func (rc *ReplicationClient) ResyncVolume(volumeID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) {
return rc.ResyncVolumeMock(volumeID, secrets, parameters)
func (rc *ReplicationClient) ResyncVolume(volumeID, replicationID string, secrets, parameters map[string]string) (*replicationlib.ResyncVolumeResponse, error) {
return rc.ResyncVolumeMock(volumeID, replicationID, secrets, parameters)
}

0 comments on commit 678adaa

Please sign in to comment.