diff --git a/custom-scorecard-tests/config.yaml b/custom-scorecard-tests/config.yaml
index f09a07155..543869ec2 100644
--- a/custom-scorecard-tests/config.yaml
+++ b/custom-scorecard-tests/config.yaml
@@ -75,26 +75,6 @@ stages:
storage:
spec:
mountPath: {}
- - entrypoint:
- - volsync-custom-scorecard-tests
- - test_replication_sched_snap.yml
- image: quay.io/backube/volsync-custom-scorecard-tests:latest
- labels:
- suite: volsync-e2e
- test: test_replication_sched_snap.yml
- storage:
- spec:
- mountPath: {}
- - entrypoint:
- - volsync-custom-scorecard-tests
- - test_replication_sync_direct.yml
- image: quay.io/backube/volsync-custom-scorecard-tests:latest
- labels:
- suite: volsync-e2e
- test: test_replication_sync_direct.yml
- storage:
- spec:
- mountPath: {}
- entrypoint:
- volsync-custom-scorecard-tests
- test_restic_manual_normal.yml
@@ -257,6 +237,46 @@ stages:
storage:
spec:
mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sched_snap.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sched_snap.yml
+ storage:
+ spec:
+ mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sync_direct.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sync_direct.yml
+ storage:
+ spec:
+ mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sync_direct_tls_normal.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sync_direct_tls_normal.yml
+ storage:
+ spec:
+ mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sync_direct_tls_priv.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sync_direct_tls_priv.yml
+ storage:
+ spec:
+ mountPath: {}
- entrypoint:
- volsync-custom-scorecard-tests
- test_roles.yml
diff --git a/custom-scorecard-tests/generateE2ETestsConfig.sh b/custom-scorecard-tests/generateE2ETestsConfig.sh
index 58bf9d6ad..034dbd2e0 100755
--- a/custom-scorecard-tests/generateE2ETestsConfig.sh
+++ b/custom-scorecard-tests/generateE2ETestsConfig.sh
@@ -16,8 +16,10 @@ TESTS=$(echo "${TESTS_UNSORTED}" | LC_ALL=C sort)
# Group tests into 2 stages (each stage gets run sequentially but
# all tests in a stage can run in parallel)
-E2E_TESTS_GROUP1=$(echo "${TESTS}" | grep -v -e role -e syncthing)
-E2E_TESTS_GROUP2=$(echo "${TESTS}" | grep -e role -e syncthing)
+# Group1 contains most e2e tests
+# Group2 contains syncthing tests, role tests and replication tests
+E2E_TESTS_GROUP1=$(echo "${TESTS}" | grep -v -e role -e syncthing -e test_replication_ )
+E2E_TESTS_GROUP2=$(echo "${TESTS}" | grep -e role -e syncthing -e test_replication_ )
echo "####################"
echo "# E2E test list is: "
diff --git a/custom-scorecard-tests/scorecard/patches/e2e-tests-stage1.yaml b/custom-scorecard-tests/scorecard/patches/e2e-tests-stage1.yaml
index 0accbfbf8..ddfb15b97 100644
--- a/custom-scorecard-tests/scorecard/patches/e2e-tests-stage1.yaml
+++ b/custom-scorecard-tests/scorecard/patches/e2e-tests-stage1.yaml
@@ -61,26 +61,6 @@
storage:
spec:
mountPath: {}
- - entrypoint:
- - volsync-custom-scorecard-tests
- - test_replication_sched_snap.yml
- image: quay.io/backube/volsync-custom-scorecard-tests:latest
- labels:
- suite: volsync-e2e
- test: test_replication_sched_snap.yml
- storage:
- spec:
- mountPath: {}
- - entrypoint:
- - volsync-custom-scorecard-tests
- - test_replication_sync_direct.yml
- image: quay.io/backube/volsync-custom-scorecard-tests:latest
- labels:
- suite: volsync-e2e
- test: test_replication_sync_direct.yml
- storage:
- spec:
- mountPath: {}
- entrypoint:
- volsync-custom-scorecard-tests
- test_restic_manual_normal.yml
diff --git a/custom-scorecard-tests/scorecard/patches/e2e-tests-stage2.yaml b/custom-scorecard-tests/scorecard/patches/e2e-tests-stage2.yaml
index 005102d57..63f30206b 100644
--- a/custom-scorecard-tests/scorecard/patches/e2e-tests-stage2.yaml
+++ b/custom-scorecard-tests/scorecard/patches/e2e-tests-stage2.yaml
@@ -11,6 +11,46 @@
storage:
spec:
mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sched_snap.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sched_snap.yml
+ storage:
+ spec:
+ mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sync_direct.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sync_direct.yml
+ storage:
+ spec:
+ mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sync_direct_tls_normal.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sync_direct_tls_normal.yml
+ storage:
+ spec:
+ mountPath: {}
+ - entrypoint:
+ - volsync-custom-scorecard-tests
+ - test_replication_sync_direct_tls_priv.yml
+ image: quay.io/backube/volsync-custom-scorecard-tests:latest
+ labels:
+ suite: volsync-e2e
+ test: test_replication_sync_direct_tls_priv.yml
+ storage:
+ spec:
+ mountPath: {}
- entrypoint:
- volsync-custom-scorecard-tests
- test_roles.yml
diff --git a/kubectl-volsync/cmd/migration.go b/kubectl-volsync/cmd/migration.go
index bf2cf2681..2b75f1061 100644
--- a/kubectl-volsync/cmd/migration.go
+++ b/kubectl-volsync/cmd/migration.go
@@ -19,13 +19,11 @@ package cmd
import (
"context"
"fmt"
- "time"
"github.com/spf13/cobra"
"github.com/spf13/viper"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/klog/v2"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/component-base/logs"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -36,20 +34,43 @@ import (
// MigrationRelationship defines the "type" of migration Relationships
const MigrationRelationshipType RelationshipType = "migration"
+const (
+ defaultLocalStunnelPort = 9000
+ defaultDestinationStunnelPort = 8000
+)
+
// migrationRelationship holds the config state for migration-type
// relationships
type migrationRelationship struct {
Relationship
- data *migrationRelationshipData
+ data *migrationRelationshipDataV2
+ mh migrationHandler
}
-// migrationRelationshipData is the state that will be saved to the
-// relationship config file
+type migrationHandler interface {
+ EnsureReplicationDestination(ctx context.Context, c client.Client,
+ destConfig *migrationRelationshipDestinationV2) (*volsyncv1alpha1.ReplicationDestination, error)
+ WaitForRDStatus(ctx context.Context, c client.Client,
+ replicationDestination *volsyncv1alpha1.ReplicationDestination) (*volsyncv1alpha1.ReplicationDestination, error)
+ RunMigration(ctx context.Context, c client.Client, source string, destConfig *migrationRelationshipDestinationV2,
+ sTunnelLocalPort int32) error
+}
+
+// Old v1 version of the data
type migrationRelationshipData struct {
Version int
Destination *migrationRelationshipDestination
}
+// migrationRelationshipData is the state that will be saved to the
+// relationship config file
+type migrationRelationshipDataV2 struct {
+ Version int
+ // True if the ReplicationDestination should use RsyncTLS
+ IsRsyncTLS bool
+ Destination *migrationRelationshipDestinationV2
+}
+
type migrationRelationshipDestination struct {
// Cluster context name
Cluster string
@@ -65,20 +86,50 @@ type migrationRelationshipDestination struct {
Destination volsyncv1alpha1.ReplicationDestinationRsyncSpec
}
+type migrationRelationshipDestinationV2 struct {
+ // Cluster context name
+ Cluster string
+ // Namespace on destination cluster
+ Namespace string
+ // Name of PVC being replicated
+ PVCName string
+ // Name of the ReplicationDestination object
+ RDName string
+ // Name of Secret holding ssh or psk secret
+ //RsyncSecretName string //TODO: is this necessary? doesn't seem to get written to conf file in ~/.volsync
+ // Service Type for the ReplicationDestination
+ ServiceType *corev1.ServiceType
+ // Copy Method for the ReplicationDestination (will always be Direct for migration)
+ CopyMethod volsyncv1alpha1.CopyMethodType
+ // MoverSecurityContext allows specifying the PodSecurityContext that will
+ // be used by the data mover
+ MoverSecurityContext *corev1.PodSecurityContext
+}
+
func (mr *migrationRelationship) Save() error {
err := mr.SetData(mr.data)
if err != nil {
return err
}
- if mr.data.Destination != nil && mr.data.Destination.Destination.Capacity != nil {
- mr.Set("data.destination.destination.replicationdestinationvolumeoptions.capacity",
- mr.data.Destination.Destination.Capacity.String())
- }
-
return mr.Relationship.Save()
}
+func (mr *migrationRelationship) convertDataToV2(datav1 *migrationRelationshipData) {
+ mr.data = &migrationRelationshipDataV2{
+ Version: 2,
+ IsRsyncTLS: false, // Rsync TLS support wasn't there in v1
+ Destination: &migrationRelationshipDestinationV2{
+ RDName: datav1.Destination.RDName,
+ PVCName: datav1.Destination.PVCName,
+ Namespace: datav1.Destination.Namespace,
+ Cluster: datav1.Destination.Cluster,
+ ServiceType: datav1.Destination.Destination.ServiceType,
+ CopyMethod: volsyncv1alpha1.CopyMethodDirect, // Default, but wasn't specified in v1
+ },
+ }
+}
+
func newMigrationRelationship(cmd *cobra.Command) (*migrationRelationship, error) {
r, err := CreateRelationshipFromCommand(cmd, MigrationRelationshipType)
if err != nil {
@@ -87,8 +138,8 @@ func newMigrationRelationship(cmd *cobra.Command) (*migrationRelationship, error
return &migrationRelationship{
Relationship: *r,
- data: &migrationRelationshipData{
- Version: 1,
+ data: &migrationRelationshipDataV2{
+ Version: 2,
},
}, nil
}
@@ -108,6 +159,9 @@ var migrationCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(migrationCmd)
+ // Add logging flags to all sub-commands
+ logs.AddFlags(migrationCmd.PersistentFlags())
+
migrationCmd.PersistentFlags().StringP("relationship", "r", "", "relationship name")
cobra.CheckErr(migrationCmd.MarkPersistentFlagRequired("relationship"))
cobra.CheckErr(viper.BindPFlag("relationship", migrationCmd.PersistentFlags().Lookup("relationship")))
@@ -127,63 +181,25 @@ func loadMigrationRelationship(cmd *cobra.Command) (*migrationRelationship, erro
version := mr.GetInt("data.version")
switch version {
case 1:
+ // version2 is now the default, read in the v1 data and migrate to v2
+ datav1 := &migrationRelationshipData{}
+ if err := mr.GetData(&datav1); err != nil {
+ return nil, err
+ }
+ mr.convertDataToV2(datav1) // Convert from v1 to v2
+ case 2:
if err := mr.GetData(&mr.data); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported config file version %d", version)
}
- return mr, nil
-}
-
-func (mrd *migrationRelationshipDestination) waitForRDStatus(ctx context.Context, client client.Client) (
- *volsyncv1alpha1.ReplicationDestination, error) {
- // wait for migrationdestination to become ready
- var (
- rd *volsyncv1alpha1.ReplicationDestination
- err error
- )
- klog.Infof("waiting for keys & address of destination to be available")
- err = wait.PollUntilContextTimeout(ctx, 5*time.Second, defaultRsyncKeyTimeout, true, /*immediate*/
- func(ctx context.Context) (bool, error) {
- rd, err = mrd.getDestination(ctx, client)
- if err != nil {
- return false, err
- }
- if rd.Status == nil || rd.Status.Rsync == nil {
- return false, nil
- }
- if rd.Status.Rsync.Address == nil {
- klog.V(2).Infof("Waiting for MigrationDestination %s RSync address to populate", rd.Name)
- return false, nil
- }
-
- if rd.Status.Rsync.SSHKeys == nil {
- klog.V(2).Infof("Waiting for MigrationDestination %s RSync sshkeys to populate", rd.Name)
- return false, nil
- }
-
- klog.V(2).Infof("Found MigrationDestination RSync Address: %s", *rd.Status.Rsync.Address)
- return true, nil
- })
- if err != nil {
- return nil, fmt.Errorf("failed to fetch rd status: %w,", err)
- }
-
- return rd, nil
-}
-func (mrd *migrationRelationshipDestination) getDestination(ctx context.Context, client client.Client) (
- *volsyncv1alpha1.ReplicationDestination, error) {
- nsName := types.NamespacedName{
- Namespace: mrd.Namespace,
- Name: mrd.RDName,
- }
- rd := &volsyncv1alpha1.ReplicationDestination{}
- err := client.Get(ctx, nsName, rd)
- if err != nil {
- return nil, err
+ if mr.data.IsRsyncTLS {
+ mr.mh = &migrationHandlerRsyncTLS{}
+ } else {
+ mr.mh = &migrationHandlerRsync{}
}
- return rd, nil
+ return mr, nil
}
diff --git a/kubectl-volsync/cmd/migration_create.go b/kubectl-volsync/cmd/migration_create.go
index 4ac323348..8ffb6a78b 100644
--- a/kubectl-volsync/cmd/migration_create.go
+++ b/kubectl-volsync/cmd/migration_create.go
@@ -47,6 +47,8 @@ type migrationCreate struct {
DestinationPVC string
// Name of the ReplicationDestination object
RDName string
+ // True if the ReplicationDestination should use RsyncTLS
+ IsRsyncTLS bool
// copyMethod describes how a point-in-time (PiT) image of the destination
// volume should be created
CopyMethod volsyncv1alpha1.CopyMethodType
@@ -64,6 +66,9 @@ type migrationCreate struct {
client client.Client
// PVC object associated with pvcName used to create destination object
PVC *corev1.PersistentVolumeClaim
+ // MoverSecurity context to use for the ReplicationDestination
+ // Individual fields will come from cli parameters
+ MoverSecurityContext *corev1.PodSecurityContext
}
// migrationCreateCmd represents the create command
@@ -101,6 +106,11 @@ func initMigrationCreateCmd(migrationCreateCmd *cobra.Command) {
migrationCreateCmd.Flags().String("storageclass", "", "StorageClass name for the PVC")
migrationCreateCmd.Flags().String("servicetype", "LoadBalancer",
"Service Type or ingress methods for a service. viz: ClusterIP, LoadBalancer")
+ migrationCreateCmd.Flags().String("rdname", "", "name of the ReplicationDestination to create")
+ migrationCreateCmd.Flags().Bool("rsynctls", false, "if true, will use rsync-tls")
+
+ // MoverSecurityContext flags - will only apply if rsyncTLS is true
+ addCLIRsyncTLSMoverSecurityContextFlags(migrationCreateCmd, true)
}
func newMigrationCreate(cmd *cobra.Command) (*migrationCreate, error) {
@@ -116,6 +126,13 @@ func newMigrationCreate(cmd *cobra.Command) (*migrationCreate, error) {
return nil, err
}
+ mc.mr.data.IsRsyncTLS = mc.IsRsyncTLS
+ if mc.IsRsyncTLS {
+ mc.mr.mh = &migrationHandlerRsyncTLS{}
+ } else {
+ mc.mr.mh = &migrationHandlerRsync{}
+ }
+
return mc, nil
}
@@ -181,15 +198,38 @@ func (mc *migrationCreate) parseCLI(cmd *cobra.Command) error {
return fmt.Errorf("unsupported service type: %v", corev1.ServiceType(serviceType))
}
mc.ServiceType = (*corev1.ServiceType)(&serviceType)
- mc.RDName = mc.Namespace + "-" + mc.DestinationPVC + "-migration-dest"
+
+ rdName, err := cmd.Flags().GetString("rdname")
+ if err != nil {
+ return fmt.Errorf("failed to fetch rdname, %w", err)
+ }
+ if rdName == "" {
+ rdName = mc.DestinationPVC + "-mig-dst" // Generate default value
+ }
+ mc.RDName = rdName
+
+ isRsyncTLS, err := cmd.Flags().GetBool("rsynctls")
+ if err != nil {
+ return fmt.Errorf("failed to fetch rsynctls, %w", err)
+ }
+ mc.IsRsyncTLS = isRsyncTLS
+
+ if isRsyncTLS {
+ // Parse the moverSecurityContext flags (these flags will not apply to the
+ // rsync ssh case)
+ mc.MoverSecurityContext, err = parseCLIRsyncTLSMoverSecurityContextFlags(cmd)
+ if err != nil {
+ return err
+ }
+ }
return nil
}
//nolint:funlen
func (mc *migrationCreate) newMigrationRelationshipDestination() (
- *migrationRelationshipDestination, error) {
- mrd := &migrationRelationshipDestination{}
+ *migrationRelationshipDestinationV2, error) {
+ mrd := &migrationRelationshipDestinationV2{}
// Assign the values from migrationCreate built after parsing cmd args
mrd.RDName = mc.RDName
@@ -203,8 +243,12 @@ func (mc *migrationCreate) newMigrationRelationshipDestination() (
}
}
- mrd.Destination.DestinationPVC = &mc.DestinationPVC
- mrd.Destination.ServiceType = mc.ServiceType
+	// Some migration create CLI parameters such as capacity, storageclassname, accessmodes etc. are not
+ // saved to the migrationrelationship .yaml file. These are only used at create time when the
+ // destination PVC and ReplicationDestination will be created.
+
+ mrd.ServiceType = mc.ServiceType
+ mrd.CopyMethod = mc.CopyMethod
return mrd, nil
}
@@ -239,13 +283,14 @@ func (mc *migrationCreate) Run(ctx context.Context) error {
}
// Creates the RD if it doesn't exist
- _, err = mc.ensureReplicationDestination(ctx)
+ rd, err := mc.mr.mh.EnsureReplicationDestination(ctx, mc.client, mc.mr.data.Destination)
+ //_, err = mc.ensureReplicationDestination(ctx)
if err != nil {
return err
}
// Wait for ReplicationDestination to post address, sshkeys
- _, err = mc.mr.data.Destination.waitForRDStatus(ctx, mc.client)
+ _, err = mc.mr.mh.WaitForRDStatus(ctx, mc.client, rd)
if err != nil {
return err
}
@@ -333,29 +378,3 @@ func (mc *migrationCreate) getDestinationPVC(ctx context.Context) (*corev1.Persi
}
return destPVC, nil
}
-
-func (mc *migrationCreate) ensureReplicationDestination(ctx context.Context) (
- *volsyncv1alpha1.ReplicationDestination, error) {
- mrd := mc.mr.data.Destination
- rd := &volsyncv1alpha1.ReplicationDestination{
- ObjectMeta: metav1.ObjectMeta{
- Name: mrd.RDName,
- Namespace: mrd.Namespace,
- },
- Spec: volsyncv1alpha1.ReplicationDestinationSpec{
- Rsync: &volsyncv1alpha1.ReplicationDestinationRsyncSpec{
- ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
- DestinationPVC: mrd.Destination.DestinationPVC,
- },
- ServiceType: mrd.Destination.ServiceType,
- },
- },
- }
- if err := mc.client.Create(ctx, rd); err != nil {
- return nil, err
- }
- klog.Infof("Created ReplicationDestination: \"%s\" in Namespace: \"%s\"",
- rd.Name, rd.Namespace)
-
- return rd, nil
-}
diff --git a/kubectl-volsync/cmd/migration_create_test.go b/kubectl-volsync/cmd/migration_create_test.go
index a6aa5546c..d890eaa84 100644
--- a/kubectl-volsync/cmd/migration_create_test.go
+++ b/kubectl-volsync/cmd/migration_create_test.go
@@ -2,6 +2,7 @@ package cmd
import (
"context"
+ "fmt"
"os"
. "github.com/onsi/ginkgo/v2"
@@ -10,6 +11,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
krand "k8s.io/apimachinery/pkg/util/rand"
+ "k8s.io/utils/ptr"
volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
)
@@ -24,15 +26,17 @@ var _ = Describe("migration", func() {
)
BeforeEach(func() {
- ns = &corev1.Namespace{}
- cmd = &cobra.Command{}
- mc = &migrationCreate{}
- var err error
-
+ // Default values for tests
migrationCmdArgs = map[string]string{
"capacity": "2Gi",
"pvcname": "dest/volsync",
}
+ })
+
+ JustBeforeEach(func() {
+ ns = &corev1.Namespace{}
+ cmd = &cobra.Command{}
+ var err error
initMigrationCreateCmd(cmd)
cmd.Flags().String("relationship", "test", "")
@@ -41,16 +45,11 @@ var _ = Describe("migration", func() {
Expect(err).NotTo(HaveOccurred())
cmd.Flags().String("config-dir", dirname, "")
- mr, err := newMigrationRelationship(cmd)
- Expect(err).ToNot(HaveOccurred())
- Expect(mr).ToNot(BeNil())
- mc.mr = mr
-
err = migrationCmdArgsSet(cmd, migrationCmdArgs)
Expect(err).ToNot(HaveOccurred())
- err = mc.parseCLI(cmd)
- Expect(err).NotTo(HaveOccurred())
+ mc, err = newMigrationCreate(cmd)
+ Expect(err).ToNot(HaveOccurred())
mc.client = k8sClient
mc.Namespace = "foo-" + krand.String(5)
@@ -98,6 +97,71 @@ var _ = Describe("migration", func() {
Expect(err).To(HaveOccurred())
})
+ Describe("Rsync-TLS moverSecurityContext flags", func() {
+ moverSecCtxParams := map[string]string{
+ "runasuser": "5001",
+ "runasgroup": "201",
+ "fsgroup": "202",
+ "runasnonroot": "true",
+ "seccompprofiletype": "RuntimeDefault",
+ }
+
+ Context("When rsync-tls is not used", func() {
+ It("Should ignore the moverSecurityContext flags", func() {
+ Expect(mc.IsRsyncTLS).To(BeFalse())
+
+ for k, v := range moverSecCtxParams {
+ Expect(cmd.Flags().Set(k, v)).To(Succeed())
+ Expect(mc.parseCLI(cmd)).To(Succeed())
+
+ // Mover Security context should not be set (params ignored when rsynctls is not used)
+ Expect(mc.MoverSecurityContext).To(BeNil())
+ }
+ })
+ })
+		Context("When rsync-tls is used", func() {
+ BeforeEach(func() {
+ migrationCmdArgs["rsynctls"] = "True"
+ })
+
+ Context("Parsing flags when they are set individually", func() {
+ for k := range moverSecCtxParams {
+ Context(fmt.Sprintf("When only the %s moverSecurityContext flag is set", k), func() {
+ flagName := k
+ flagValue := moverSecCtxParams[k]
+ It("Should parse the flag correctly", func() {
+ Expect(mc.IsRsyncTLS).To(BeTrue())
+ Expect(cmd.Flags().Set(flagName, flagValue)).To(Succeed())
+ Expect(mc.parseCLI(cmd)).To(Succeed())
+ Expect(mc.MoverSecurityContext).NotTo(BeNil())
+ })
+ })
+ }
+ })
+
+ Context("When using moverSecurityContext flags", func() {
+ BeforeEach(func() {
+ for k, v := range moverSecCtxParams {
+ migrationCmdArgs[k] = v
+ }
+ })
+ It("Should configure the moverSecurityContext correctly", func() {
+ Expect(mc.IsRsyncTLS).To(BeTrue())
+ Expect(mc.MoverSecurityContext).NotTo(BeNil())
+ Expect(mc.MoverSecurityContext).To(Equal(&corev1.PodSecurityContext{
+ RunAsUser: ptr.To[int64](5001),
+ RunAsGroup: ptr.To[int64](201),
+ FSGroup: ptr.To[int64](202),
+ RunAsNonRoot: ptr.To[bool](true),
+ SeccompProfile: &corev1.SeccompProfile{
+ Type: corev1.SeccompProfileTypeRuntimeDefault,
+ },
+ }))
+ })
+ })
+ })
+ })
+
It("Ensure namespace creation", func() {
ns = &corev1.Namespace{}
Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: mc.Namespace}, ns)).To(Succeed())
@@ -133,50 +197,129 @@ var _ = Describe("migration", func() {
})
- It("Ensure replicationdestination creation", func() {
- // Create a PVC
- PVC, err := mc.ensureDestPVC(context.Background())
- Expect(err).NotTo(HaveOccurred())
- Expect(PVC).NotTo(BeNil())
- PVC = &corev1.PersistentVolumeClaim{}
- Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
- Name: mc.DestinationPVC}, PVC)).To(Succeed())
-
- mrd, err := mc.newMigrationRelationshipDestination()
- Expect(err).ToNot(HaveOccurred())
- Expect(mrd).ToNot(BeNil())
- mc.mr.data.Destination = mrd
+ Context("When using rsync (the default)", func() {
+ //nolint:dupl
+ It("Ensure replicationdestination creation", func() {
+ // Create a PVC
+ PVC, err := mc.ensureDestPVC(context.Background())
+ Expect(err).NotTo(HaveOccurred())
+ Expect(PVC).NotTo(BeNil())
+ PVC = &corev1.PersistentVolumeClaim{}
+ Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
+ Name: mc.DestinationPVC}, PVC)).To(Succeed())
+
+ mrd, err := mc.newMigrationRelationshipDestination()
+ Expect(err).ToNot(HaveOccurred())
+ Expect(mrd).ToNot(BeNil())
+ mc.mr.data.Destination = mrd
+
+ // data version should be 2
+ Expect(mc.mr.data.Version).To(Equal(2))
+
+ // Should use rsync SSH by default
+ Expect(mc.mr.data.IsRsyncTLS).To(BeFalse())
+ // Check the migrationHandler is RsyncTLS
+ _, ok := mc.mr.mh.(*migrationHandlerRsync)
+ Expect(ok).To(BeTrue())
+
+ // Create replicationdestination
+ rd, err := mc.mr.mh.EnsureReplicationDestination(context.Background(), mc.client, mc.mr.data.Destination)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(rd).ToNot(BeNil())
+ rd = &volsyncv1alpha1.ReplicationDestination{}
+ Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
+ Name: mc.RDName}, rd)).To(Succeed())
+
+ Expect(rd.Spec.Rsync).NotTo(BeNil()) // Rsync Spec should be used
+ Expect(rd.Spec.RsyncTLS).To(BeNil())
+
+ // Post status field in rd to mock controller
+ address := "Volsync-mock-address"
+ sshKey := "Volsync-mock-sshKey"
+ rd.Status = &volsyncv1alpha1.ReplicationDestinationStatus{
+ Rsync: &volsyncv1alpha1.ReplicationDestinationRsyncStatus{
+ Address: &address,
+ SSHKeys: &sshKey,
+ }}
+ Expect(k8sClient.Status().Update(context.Background(), rd)).To(Succeed())
+ // Wait for mock address and sshKey to pop up
+ _, err = mc.mr.mh.WaitForRDStatus(context.Background(), mc.client, rd)
+ Expect(err).ToNot(HaveOccurred())
+ // Retry creation of replicationdestination and it should fail as destination already exists
+ rd, err = mc.mr.mh.EnsureReplicationDestination(context.Background(), mc.client, mc.mr.data.Destination)
+ Expect(err).To(HaveOccurred())
+ Expect(rd).To(BeNil())
+ rd = &volsyncv1alpha1.ReplicationDestination{}
+ Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
+ Name: mc.RDName}, rd)).To(Succeed())
+ // Should return existing address and sshkey
+ _, err = mc.mr.mh.WaitForRDStatus(context.Background(), mc.client, rd)
+ Expect(err).ToNot(HaveOccurred())
+ })
- // Create replicationdestination
- rd, err := mc.ensureReplicationDestination(context.Background())
- Expect(err).ToNot(HaveOccurred())
- Expect(rd).ToNot(BeNil())
- rd = &volsyncv1alpha1.ReplicationDestination{}
- Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
- Name: mc.RDName}, rd)).To(Succeed())
-
- // Post status field in rd to mock controller
- address := "Volsync-mock-address"
- sshKey := "Volsync-mock-sshKey"
- rd.Status = &volsyncv1alpha1.ReplicationDestinationStatus{
- Rsync: &volsyncv1alpha1.ReplicationDestinationRsyncStatus{
- Address: &address,
- SSHKeys: &sshKey,
- }}
- Expect(k8sClient.Status().Update(context.Background(), rd)).To(Succeed())
- // Wait for mock address and sshKey to pop up
- _, err = mc.mr.data.Destination.waitForRDStatus(context.Background(), mc.client)
- Expect(err).ToNot(HaveOccurred())
- // Retry creation of replicationdestination and it should fail as destination already exists
- rd, err = mc.ensureReplicationDestination(context.Background())
- Expect(err).To(HaveOccurred())
- Expect(rd).To(BeNil())
- rd = &volsyncv1alpha1.ReplicationDestination{}
- Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
- Name: mc.RDName}, rd)).To(Succeed())
- // Should return existing address and sshkey
- _, err = mc.mr.data.Destination.waitForRDStatus(context.Background(), mc.client)
- Expect(err).ToNot(HaveOccurred())
+ })
+ Context("When using rsync-tls", func() {
+ BeforeEach(func() {
+ migrationCmdArgs["rsynctls"] = "true"
+ })
+ //nolint:dupl
+ It("Ensure replicationdestination (rsynctls) creation", func() {
+ // Create a PVC
+ PVC, err := mc.ensureDestPVC(context.Background())
+ Expect(err).NotTo(HaveOccurred())
+ Expect(PVC).NotTo(BeNil())
+ PVC = &corev1.PersistentVolumeClaim{}
+ Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
+ Name: mc.DestinationPVC}, PVC)).To(Succeed())
+
+ mrd, err := mc.newMigrationRelationshipDestination()
+ Expect(err).ToNot(HaveOccurred())
+ Expect(mrd).ToNot(BeNil())
+ mc.mr.data.Destination = mrd
+
+ // data version should be 2
+ Expect(mc.mr.data.Version).To(Equal(2))
+
+			// Should use rsync TLS since the rsynctls flag is set
+ Expect(mc.mr.data.IsRsyncTLS).To(BeTrue())
+ // Check the migrationHandler is RsyncTLS
+ _, ok := mc.mr.mh.(*migrationHandlerRsyncTLS)
+ Expect(ok).To(BeTrue())
+
+ // Create replicationdestination
+ rd, err := mc.mr.mh.EnsureReplicationDestination(context.Background(), mc.client, mc.mr.data.Destination)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(rd).ToNot(BeNil())
+ rd = &volsyncv1alpha1.ReplicationDestination{}
+ Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
+ Name: mc.RDName}, rd)).To(Succeed())
+
+ Expect(rd.Spec.RsyncTLS).NotTo(BeNil()) // RsyncTLS Spec should be used
+ Expect(rd.Spec.Rsync).To(BeNil())
+
+ // Post status field in rd to mock controller
+ address := "Volsync-mock-address"
+ keys := "Volsync-mock-ks"
+ rd.Status = &volsyncv1alpha1.ReplicationDestinationStatus{
+ RsyncTLS: &volsyncv1alpha1.ReplicationDestinationRsyncTLSStatus{
+ Address: &address,
+ KeySecret: &keys,
+ }}
+ Expect(k8sClient.Status().Update(context.Background(), rd)).To(Succeed())
+ // Wait for mock address and keySecret to pop up
+ _, err = mc.mr.mh.WaitForRDStatus(context.Background(), mc.client, rd)
+ Expect(err).ToNot(HaveOccurred())
+ // Retry creation of replicationdestination and it should fail as destination already exists
+ rd, err = mc.mr.mh.EnsureReplicationDestination(context.Background(), mc.client, mc.mr.data.Destination)
+ Expect(err).To(HaveOccurred())
+ Expect(rd).To(BeNil())
+ rd = &volsyncv1alpha1.ReplicationDestination{}
+ Expect(k8sClient.Get(context.Background(), types.NamespacedName{Namespace: mc.Namespace,
+ Name: mc.RDName}, rd)).To(Succeed())
+			// Should return existing address and keySecret
+ _, err = mc.mr.mh.WaitForRDStatus(context.Background(), mc.client, rd)
+ Expect(err).ToNot(HaveOccurred())
+ })
})
})
diff --git a/kubectl-volsync/cmd/migration_delete.go b/kubectl-volsync/cmd/migration_delete.go
index e76e36a4d..d417c4f75 100644
--- a/kubectl-volsync/cmd/migration_delete.go
+++ b/kubectl-volsync/cmd/migration_delete.go
@@ -18,12 +18,14 @@ package cmd
import (
"context"
- "fmt"
"github.com/spf13/cobra"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/i18n"
"sigs.k8s.io/controller-runtime/pkg/client"
+
+ volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
)
type migrationDelete struct {
@@ -82,14 +84,17 @@ func newMigrationDelete() *migrationDelete {
func (md *migrationDelete) deleteReplicationDestination(ctx context.Context) error {
mrd := md.mr.data.Destination
- rd, err := mrd.getDestination(ctx, md.client)
- if err != nil {
- return fmt.Errorf("get ReplicationDestination returned:%w", err)
+
+ rd := &volsyncv1alpha1.ReplicationDestination{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: mrd.RDName,
+ Namespace: mrd.Namespace,
+ },
}
- err = md.client.Delete(ctx, rd)
+ err := md.client.Delete(ctx, rd)
if err != nil {
- return err
+ return err // Note this will return error if the RD doesn't exist
}
klog.Infof("Deleted ReplicationDestination: \"%s\"", mrd.RDName)
diff --git a/kubectl-volsync/cmd/migration_delete_test.go b/kubectl-volsync/cmd/migration_delete_test.go
index ecde6c389..9bbd41b03 100644
--- a/kubectl-volsync/cmd/migration_delete_test.go
+++ b/kubectl-volsync/cmd/migration_delete_test.go
@@ -10,25 +10,32 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
krand "k8s.io/apimachinery/pkg/util/rand"
+
+ volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
)
var _ = Describe("migration delete", func() {
var (
- ns *corev1.Namespace
- cmd *cobra.Command
- dirname string
+ ns *corev1.Namespace
+ cmd *cobra.Command
+ dirname string
+ migrationRelationshipFile string
+ migrationCmdArgs = make(map[string]string)
)
BeforeEach(func() {
- ns = &corev1.Namespace{}
- cmd = &cobra.Command{}
- mc := &migrationCreate{}
- var err error
-
- migrationCmdArgs := map[string]string{
+ // Defaults for tests
+ migrationCmdArgs = map[string]string{
"capacity": "2Gi",
"pvcname": "dest/volsync",
}
+ })
+
+ JustBeforeEach(func() {
+ ns = &corev1.Namespace{}
+ cmd = &cobra.Command{}
+ var mc *migrationCreate
+ var err error
initMigrationCreateCmd(cmd)
cmd.Flags().String("relationship", "test", "")
@@ -37,15 +44,10 @@ var _ = Describe("migration delete", func() {
Expect(err).NotTo(HaveOccurred())
cmd.Flags().String("config-dir", dirname, "")
- mr, err := newMigrationRelationship(cmd)
- Expect(err).ToNot(HaveOccurred())
- Expect(mr).ToNot(BeNil())
- mc.mr = mr
-
err = migrationCmdArgsSet(cmd, migrationCmdArgs)
Expect(err).ToNot(HaveOccurred())
- err = mc.parseCLI(cmd)
+ mc, err = newMigrationCreate(cmd)
Expect(err).NotTo(HaveOccurred())
mc.client = k8sClient
@@ -68,12 +70,17 @@ var _ = Describe("migration delete", func() {
mc.mr.data.Destination = mrd
// Create replicationdestination
- rd, err := mc.ensureReplicationDestination(context.Background())
+ rd, err := mc.mr.mh.EnsureReplicationDestination(context.Background(), mc.client, mrd)
Expect(err).ToNot(HaveOccurred())
Expect(rd).ToNot(BeNil())
err = mc.mr.Save()
Expect(err).ToNot(HaveOccurred())
+
+ // Verify migration relationship file was created
+ migrationRelationshipFile = dirname + "/test.yaml"
+ _, err = os.Stat(migrationRelationshipFile)
+ Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
@@ -99,6 +106,10 @@ var _ = Describe("migration delete", func() {
err = md.mr.Delete()
Expect(err).ToNot(HaveOccurred())
+ _, err = os.Stat(migrationRelationshipFile)
+ Expect(err).To(HaveOccurred())
+ Expect(os.IsNotExist(err)).To(BeTrue())
+
// Verify delete replicationdestination that does not exist
err = md.deleteReplicationDestination(context.Background())
Expect(err).To(HaveOccurred())
@@ -108,4 +119,73 @@ var _ = Describe("migration delete", func() {
Expect(err).To(HaveOccurred())
})
+ Context("Loading older migrationship (data at v1)", func() {
+ const v1MigrationRelationshipFileContents string = `data:
+ destination:
+ cluster: "test-cluster"
+ destination:
+ replicationdestinationvolumeoptions:
+ accessmodes: []
+ copymethod: ""
+ destinationpvc: volsync
+ servicetype: LoadBalancer
+ namespace: dest
+ pvcname: volsync
+ rdname: dest-volsync-migration-dest
+ sshkeyname: ""
+ version: 1
+id: c22818bd-8716-436f-a48d-c2c8746afde6
+type: migration`
+
+ When("The migration relationship file is at v1", func() {
+ JustBeforeEach(func() {
+ // Parent JustBeforeEach sets up a v2 migration relationship file - overwrite it
+ // with a v1 file for this test
+ Expect(os.WriteFile(migrationRelationshipFile, []byte(v1MigrationRelationshipFileContents), 0600)).To(Succeed())
+ })
+
+ It("Should convert data to v2", func() {
+ mr, err := loadMigrationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(mr.data.Version).To(Equal(2))
+ Expect(mr.data.IsRsyncTLS).To(BeFalse())
+ Expect(mr.data.Destination.RDName).To(Equal("dest-volsync-migration-dest"))
+ Expect(mr.data.Destination.PVCName).To(Equal("volsync"))
+ Expect(mr.data.Destination.Namespace).To(Equal("dest"))
+ Expect(mr.data.Destination.Cluster).To(Equal("test-cluster"))
+ Expect(*mr.data.Destination.ServiceType).To(Equal(corev1.ServiceTypeLoadBalancer))
+ // Was not specified in v1 but should be filled out when we convert to v2
+ Expect(mr.data.Destination.CopyMethod).To(Equal(volsyncv1alpha1.CopyMethodDirect))
+ })
+ })
+ })
+
+ Context("Test loading migrationrelationship file instantiates correctly", func() {
+ Context("When the migrationRelationship uses rsync (the default)", func() {
+ It("Should load correctly", func() {
+ mr, err := loadMigrationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(mr.data.IsRsyncTLS).To(BeFalse())
+ // Check the migrationHandler is Rsync
+ _, ok := mr.mh.(*migrationHandlerRsync)
+ Expect(ok).To(BeTrue())
+ })
+ })
+ Context("When the migrationRelationship uses rsync-tls", func() {
+ BeforeEach(func() {
+ migrationCmdArgs["rsynctls"] = "true"
+ })
+
+ It("Should load correctly", func() {
+ mr, err := loadMigrationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(mr.data.IsRsyncTLS).To(BeTrue())
+ // Check the migrationHandler is RsyncTLS
+ _, ok := mr.mh.(*migrationHandlerRsyncTLS)
+ Expect(ok).To(BeTrue())
+ })
+ })
+ })
})
diff --git a/kubectl-volsync/cmd/migration_handler_rsync.go b/kubectl-volsync/cmd/migration_handler_rsync.go
new file mode 100644
index 000000000..919e30bd4
--- /dev/null
+++ b/kubectl-volsync/cmd/migration_handler_rsync.go
@@ -0,0 +1,216 @@
+/*
+Copyright © 2024 The VolSync authors
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
+)
+
+type migrationHandlerRsync struct{}
+
+var _ migrationHandler = &migrationHandlerRsync{}
+
+func (mhr *migrationHandlerRsync) EnsureReplicationDestination(ctx context.Context, c client.Client,
+ destConfig *migrationRelationshipDestinationV2) (*volsyncv1alpha1.ReplicationDestination, error) {
+ rd := &volsyncv1alpha1.ReplicationDestination{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: destConfig.RDName,
+ Namespace: destConfig.Namespace,
+ },
+ Spec: volsyncv1alpha1.ReplicationDestinationSpec{
+ Rsync: &volsyncv1alpha1.ReplicationDestinationRsyncSpec{
+ ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
+ DestinationPVC: &destConfig.PVCName,
+ CopyMethod: destConfig.CopyMethod,
+ },
+ ServiceType: destConfig.ServiceType,
+ },
+ },
+ }
+ if err := c.Create(ctx, rd); err != nil {
+ return nil, err
+ }
+ klog.Infof("Created ReplicationDestination: \"%s\" in Namespace: \"%s\"",
+ rd.Name, rd.Namespace)
+
+ return rd, nil
+}
+
+//nolint:dupl
+func (mhr *migrationHandlerRsync) WaitForRDStatus(ctx context.Context, c client.Client,
+ rd *volsyncv1alpha1.ReplicationDestination) (*volsyncv1alpha1.ReplicationDestination, error) {
+ // wait for migrationdestination to become ready
+ klog.Infof("waiting for keySecret & address of destination to be available")
+ err := wait.PollUntilContextTimeout(ctx, 5*time.Second, defaultRsyncKeyTimeout, true, /*immediate*/
+ func(ctx context.Context) (bool, error) {
+ err := c.Get(ctx, client.ObjectKeyFromObject(rd), rd)
+ if err != nil {
+ return false, err
+ }
+ if rd.Status == nil {
+ return false, nil
+ }
+
+ if rd.Status.Rsync == nil {
+ return false, nil
+ }
+
+ if rd.Status.Rsync.Address == nil {
+ klog.V(2).Infof("Waiting for MigrationDestination %s RSync address to populate", rd.GetName())
+ return false, nil
+ }
+
+ if rd.Status.Rsync.SSHKeys == nil {
+ klog.V(2).Infof("Waiting for MigrationDestination %s RSync sshkeys to populate", rd.GetName())
+ return false, nil
+ }
+
+ klog.V(2).Infof("Found MigrationDestination RSync Address: %s", *rd.Status.Rsync.Address)
+ return true, nil
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch rd status: %w,", err)
+ }
+
+ return rd, nil
+}
+
+func (mhr *migrationHandlerRsync) RunMigration(ctx context.Context, c client.Client,
+ source string, destConfig *migrationRelationshipDestinationV2,
+ _ int32 /*stunnellocalport, not needed for rsyncssh*/) error {
+ var sshKeyDir *string
+ var destAddr string
+ var err error
+ defer func() {
+ // Remove the directory containing secrets
+ if sshKeyDir != nil {
+ if err = os.RemoveAll(*sshKeyDir); err != nil {
+ klog.Infof("failed to remove temporary directory with ssh keys (%s): %v",
+ *sshKeyDir, err)
+ }
+ }
+ }()
+ // Retrieve Secrets/keys
+ sshKeyDir, destAddr, err = mhr.retrieveSecretsAndDestAddr(ctx, c, destConfig)
+ if err != nil {
+ return err
+ }
+
+ // Do rsync
+ err = mhr.runRsync(ctx, source, *sshKeyDir, destAddr, destConfig)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (mhr *migrationHandlerRsync) retrieveSecretsAndDestAddr(ctx context.Context, c client.Client,
+ destConfig *migrationRelationshipDestinationV2) (*string, string, error) {
+ klog.Infof("Extracting ReplicationDestination secrets")
+
+ rd := &volsyncv1alpha1.ReplicationDestination{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: destConfig.RDName,
+ Namespace: destConfig.Namespace,
+ },
+ }
+ _, err := mhr.WaitForRDStatus(ctx, c, rd)
+ if err != nil {
+ return nil, "", err
+ }
+
+ destAddr := *rd.Status.Rsync.Address
+ sshKeysSecret := rd.Status.Rsync.SSHKeys
+ sshSecret := &corev1.Secret{}
+ nsName := types.NamespacedName{
+ Namespace: destConfig.Namespace,
+ Name: *sshKeysSecret,
+ }
+ err = c.Get(ctx, nsName, sshSecret)
+ if err != nil {
+ return nil, "", fmt.Errorf("error retrieving destination sshSecret %s: %w", *sshKeysSecret, err)
+ }
+
+ sshKeydir, err := os.MkdirTemp("", "sshkeys")
+ if err != nil {
+ return nil, "", fmt.Errorf("unable to create temporary directory %w", err)
+ }
+
+ filename := filepath.Join(sshKeydir, "source")
+ err = os.WriteFile(filename, sshSecret.Data["source"], 0600)
+ if err != nil {
+ return &sshKeydir, "", fmt.Errorf("unable to write to the file, %w", err)
+ }
+
+ filename = filepath.Join(sshKeydir, "source.pub")
+ err = os.WriteFile(filename, sshSecret.Data["source.pub"], 0600)
+ if err != nil {
+ return &sshKeydir, "", fmt.Errorf("unable to write to the file, %w", err)
+ }
+
+ filename = filepath.Join(sshKeydir, "destination.pub")
+ destinationPub := fmt.Sprintf("%s %s", destAddr, sshSecret.Data["destination.pub"])
+ err = os.WriteFile(filename, []byte(destinationPub), 0600)
+ if err != nil {
+ return &sshKeydir, "", fmt.Errorf("unable to write to the file, %w", err)
+ }
+
+ return &sshKeydir, destAddr, nil
+}
+
+func (mhr *migrationHandlerRsync) runRsync(ctx context.Context, source string,
+ sshKeydir string, destAddr string, destConfig *migrationRelationshipDestinationV2) error {
+ sshKey := filepath.Join(sshKeydir, "source")
+ knownHostfile := filepath.Join(sshKeydir, "destination.pub")
+ ssh := fmt.Sprintf("ssh -i %s -o UserKnownHostsFile=%s -o StrictHostKeyChecking=yes",
+ sshKey, knownHostfile)
+ dest := fmt.Sprintf("root@%s:.", destAddr)
+
+ cmd := exec.CommandContext(ctx, "rsync", "-aAhHSxze", ssh, "--delete",
+ "--itemize-changes", "--info=stats2,misc2", source, dest)
+
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stdout
+ klog.Infof("Migrating Data from \"%s\" to \"%s\\%s\\%s\"", source, destConfig.Cluster,
+ destConfig.Namespace, destConfig.PVCName)
+ err := cmd.Start()
+ if err != nil {
+ return fmt.Errorf("failed to run =%w", err)
+ }
+ err = cmd.Wait()
+ if err != nil {
+ return fmt.Errorf("command finished with an error, %w", err)
+ }
+
+ return nil
+}
diff --git a/kubectl-volsync/cmd/migration_handler_rsynctls.go b/kubectl-volsync/cmd/migration_handler_rsynctls.go
new file mode 100644
index 000000000..4fe8bae40
--- /dev/null
+++ b/kubectl-volsync/cmd/migration_handler_rsynctls.go
@@ -0,0 +1,348 @@
+/*
+Copyright © 2024 The VolSync authors
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+package cmd
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "text/template"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
+)
+
+type migrationHandlerRsyncTLS struct{}
+
+var _ migrationHandler = &migrationHandlerRsyncTLS{}
+
+type stunnelConfParams struct {
+ StunnelConfFile string
+ StunnelPIDFile string
+ PSKFile string
+ LocalPort int32
+ DestinationPort int32
+ DestinationAddress string
+}
+
+func (mhrtls *migrationHandlerRsyncTLS) EnsureReplicationDestination(ctx context.Context, c client.Client,
+ destConfig *migrationRelationshipDestinationV2) (*volsyncv1alpha1.ReplicationDestination, error) {
+ rd := &volsyncv1alpha1.ReplicationDestination{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: destConfig.RDName,
+ Namespace: destConfig.Namespace,
+ },
+ Spec: volsyncv1alpha1.ReplicationDestinationSpec{
+ RsyncTLS: &volsyncv1alpha1.ReplicationDestinationRsyncTLSSpec{
+ ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
+ DestinationPVC: &destConfig.PVCName,
+ CopyMethod: destConfig.CopyMethod,
+ },
+ ServiceType: destConfig.ServiceType,
+ MoverConfig: volsyncv1alpha1.MoverConfig{
+ MoverSecurityContext: destConfig.MoverSecurityContext,
+ },
+ },
+ },
+ }
+ if err := c.Create(ctx, rd); err != nil {
+ return nil, err
+ }
+ klog.Infof("Created ReplicationDestination: \"%s\" in Namespace: \"%s\"",
+ rd.Name, rd.Namespace)
+
+ rd.Spec.RsyncTLS.MoverSecurityContext = destConfig.MoverSecurityContext
+
+ return rd, nil
+}
+
+//nolint:dupl
+func (mhrtls *migrationHandlerRsyncTLS) WaitForRDStatus(ctx context.Context, c client.Client,
+ rd *volsyncv1alpha1.ReplicationDestination) (*volsyncv1alpha1.ReplicationDestination, error) {
+ // wait for migrationdestination to become ready
+ klog.Infof("waiting for keySecret & address of destination to be available")
+ err := wait.PollUntilContextTimeout(ctx, 5*time.Second, defaultRsyncKeyTimeout, true, /*immediate*/
+ func(ctx context.Context) (bool, error) {
+ err := c.Get(ctx, client.ObjectKeyFromObject(rd), rd)
+ if err != nil {
+ return false, err
+ }
+ if rd.Status == nil {
+ return false, nil
+ }
+
+ if rd.Status.RsyncTLS == nil {
+ return false, nil
+ }
+
+ if rd.Status.RsyncTLS.Address == nil {
+ klog.V(2).Infof("Waiting for MigrationDestination %s RSyncTLS address to populate", rd.GetName())
+ return false, nil
+ }
+
+ if rd.Status.RsyncTLS.KeySecret == nil {
+ klog.V(2).Infof("Waiting for MigrationDestination %s RSyncTLS keySecret to populate", rd.GetName())
+ return false, nil
+ }
+
+ klog.V(2).Infof("Found MigrationDestination RSyncTLS Address: %s", *rd.Status.RsyncTLS.Address)
+ return true, nil
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch rd status: %w,", err)
+ }
+
+ return rd, nil
+}
+
+func (mhrtls *migrationHandlerRsyncTLS) RunMigration(ctx context.Context, c client.Client,
+ source string, destConfig *migrationRelationshipDestinationV2, sTunnelLocalPort int32) error {
+ var stunnelTempDir *string
+ var destAddr string
+ var err error
+ defer func() {
+ // Remove the directory containing secrets
+ if stunnelTempDir != nil {
+ if err = os.RemoveAll(*stunnelTempDir); err != nil {
+ klog.Infof("failed to remove temporary directory with stunnel conf (%s): %v",
+ *stunnelTempDir, err)
+ }
+ klog.Infof("removed temp dir: %s", *stunnelTempDir)
+ }
+ }()
+
+ tempDir, err := os.MkdirTemp("", "stunnelConf")
+ if err != nil {
+ return fmt.Errorf("unable to create temporary directory %w", err)
+ }
+ stunnelTempDir = &tempDir
+ klog.Infof("created temp dir: %s", *stunnelTempDir)
+
+ pskFile := filepath.Join(*stunnelTempDir, "psk.txt")
+ // Write pre-shared key to psk file and find dest addr
+ destAddr, err = mhrtls.retrieveSecretsAndDestAddr(ctx, c, pskFile, destConfig)
+ if err != nil {
+ return err
+ }
+
+ stunnelParams := stunnelConfParams{
+ StunnelConfFile: filepath.Join(*stunnelTempDir, "stunnel-client.conf"),
+ StunnelPIDFile: filepath.Join(*stunnelTempDir, "stunnel-client.pid"),
+ PSKFile: pskFile,
+ LocalPort: sTunnelLocalPort,
+ DestinationPort: defaultDestinationStunnelPort,
+ DestinationAddress: destAddr,
+ }
+
+ err = mhrtls.createStunnelConf(stunnelParams)
+ if err != nil {
+ return err
+ }
+
+ // Do rsync
+ err = mhrtls.runRsyncTLS(ctx, source, stunnelParams, destConfig)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (mhrtls *migrationHandlerRsyncTLS) retrieveSecretsAndDestAddr(ctx context.Context, c client.Client,
+ pskFile string, destConfig *migrationRelationshipDestinationV2) (string, error) {
+ klog.Infof("Extracting ReplicationDestination secrets")
+
+ rd := &volsyncv1alpha1.ReplicationDestination{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: destConfig.RDName,
+ Namespace: destConfig.Namespace,
+ },
+ }
+ _, err := mhrtls.WaitForRDStatus(ctx, c, rd)
+ if err != nil {
+ return "", err
+ }
+
+ destAddr := *rd.Status.RsyncTLS.Address
+ keySecretName := rd.Status.RsyncTLS.KeySecret
+ keySecret := &corev1.Secret{}
+ nsName := types.NamespacedName{
+ Namespace: destConfig.Namespace,
+ Name: *keySecretName,
+ }
+ err = c.Get(ctx, nsName, keySecret)
+ if err != nil {
+ return "", fmt.Errorf("error retrieving destination keySecret %s: %w", *keySecretName, err)
+ }
+
+ err = os.WriteFile(pskFile, keySecret.Data["psk.txt"], 0600)
+ if err != nil {
+ return "", fmt.Errorf("unable to write to the file, %w", err)
+ }
+
+ return destAddr, nil
+}
+
+// Writes out a stunnel conf file to the path in stunnelParams.StunnelConfFile
+func (mhrtls *migrationHandlerRsyncTLS) createStunnelConf(stunnelParams stunnelConfParams) error {
+ f, err := os.Create(stunnelParams.StunnelConfFile)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+
+ // Note this differs from mover-rsync-tls client.sh in that we run in the foreground
+ stunnelConfTemplate := `
+; Global options
+debug = debug
+foreground = yes
+output = /dev/stdout
+pid = {{ .StunnelPIDFile }}
+socket = l:SO_KEEPALIVE=1
+socket = l:TCP_KEEPIDLE=180
+socket = r:SO_KEEPALIVE=1
+socket = r:TCP_KEEPIDLE=180
+syslog = no
+
+[rsync]
+ciphers = PSK
+PSKsecrets = {{ .PSKFile }}
+; Port to listen for incoming connection from rsync
+accept = 127.0.0.1:{{ .LocalPort }}
+; We are the client
+client = yes
+connect = {{ .DestinationAddress }}:{{ .DestinationPort }}
+`
+
+ t, err := template.New("stunnelconf").Parse(stunnelConfTemplate)
+ if err != nil {
+ return err
+ }
+
+ err = t.Execute(f, stunnelParams)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+//nolint:funlen
+func (mhrtls *migrationHandlerRsyncTLS) runRsyncTLS(ctx context.Context, source string,
+ stunnelParams stunnelConfParams, destConfig *migrationRelationshipDestinationV2) error {
+ stunnelContext, stunnelCancel := context.WithCancel(ctx)
+ //nolint:gosec
+ stunnelCmd := exec.CommandContext(stunnelContext, "stunnel", stunnelParams.StunnelConfFile)
+ defer func() {
+ stunnelCancel()
+ _ = stunnelCmd.Wait() // Ignore errors, the tunnel will show error because we killed it via context
+ klog.Info("stunnel shutdown complete.")
+ }()
+
+ stunnelCmd.Stderr = os.Stderr
+ // stunnel in foreground mode will also log everything to stderr so don't bother with stdout
+ //stunnelCmd.Stdout = os.Stdout
+ klog.Infof("Starting local stunnel listening on port %d", stunnelParams.LocalPort)
+ err := stunnelCmd.Start()
+ if err != nil {
+ return fmt.Errorf("failed to run =%w", err)
+ }
+
+ // Make sure stunnel has started
+ const maxRetries = 20
+ pidFileExists := false
+ for i := 0; i < maxRetries; i++ {
+ time.Sleep(1 * time.Second)
+ _, err = os.Stat(stunnelParams.StunnelPIDFile)
+ if err == nil {
+ pidFileExists = true
+ break
+ }
+ }
+ if !pidFileExists {
+ return fmt.Errorf("stunnel failed to start - pid file %s not found", stunnelParams.StunnelPIDFile)
+ }
+
+ // Now run rsync
+ // 1st run preserves as much as possible, but excludes the root directory
+ localRsyncTunnelAddr := fmt.Sprintf("rsync://127.0.0.1:%d/data", stunnelParams.LocalPort)
+
+ cmdArgs := []string{
+ "-aAhHSxz",
+ "--exclude=lost+found",
+ "--itemize-changes",
+ "--info=stats2,misc2",
+ }
+
+ sourcePath := filepath.Clean(source) // Will remove any trailing slash
+ // filepath.Glob() includes hidden files (files that start with .) but does not include . or ..
+ sourceFiles, err := filepath.Glob(sourcePath + "/*")
+ if err != nil {
+ return err
+ }
+
+ cmdArgs = append(cmdArgs, sourceFiles...)
+ cmdArgs = append(cmdArgs, localRsyncTunnelAddr)
+
+ //nolint:gosec
+ cmd := exec.CommandContext(ctx, "rsync", cmdArgs...)
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stdout
+ klog.Infof("Migrating Data from \"%s\" to \"%s\\%s\\%s\"", source, destConfig.Cluster,
+ destConfig.Namespace, destConfig.PVCName)
+ err = cmd.Start()
+ if err != nil {
+ return fmt.Errorf("failed to run =%w", err)
+ }
+ err = cmd.Wait()
+ if err != nil {
+ return fmt.Errorf("command finished with an error, %w", err)
+ }
+
+ // rsync - 2nd run
+ // To delete extra files, must sync at the directory-level, but need to avoid
+ // trying to modify the directory itself. This pass will only delete files
+ // that exist on the destination but not on the source, not make updates.
+
+ //nolint:gosec
+ cmd2 := exec.CommandContext(ctx, "rsync", "-rx", "--exclude=lost+found",
+ "--ignore-existing", "--ignore-non-existing", "--delete",
+ "--itemize-changes", "--info=stats2,misc2", sourcePath+"/", localRsyncTunnelAddr)
+ cmd2.Stderr = os.Stderr
+ cmd2.Stdout = os.Stdout
+ klog.Infof("\n2nd rsync to clean up extra files at dest...")
+ err = cmd2.Start()
+ if err != nil {
+ return fmt.Errorf("failed to run =%w", err)
+ }
+ err = cmd2.Wait()
+ if err != nil {
+ return fmt.Errorf("command finished with an error, %w", err)
+ }
+
+ return nil
+}
diff --git a/kubectl-volsync/cmd/migration_rsync.go b/kubectl-volsync/cmd/migration_rsync.go
index 26c0fa7e5..69657ed6f 100644
--- a/kubectl-volsync/cmd/migration_rsync.go
+++ b/kubectl-volsync/cmd/migration_rsync.go
@@ -20,13 +20,8 @@ import (
"context"
"fmt"
"os"
- "os/exec"
- "path/filepath"
"github.com/spf13/cobra"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -34,15 +29,15 @@ import (
type migrationSync struct {
mr *migrationRelationship
- // Address is the remote address to connect to for replication.
- DestAddr string
// Source volume to be migrated
Source string
// client object to communicate with a cluster
client client.Client
+ // Local Port to use for stunnel (only applies for rsync-tls)
+ StunnelLocalPort int32
}
-// migrationCreateCmd represents the create command
+// migrationSyncCmd represents the create command
var migrationSyncCmd = &cobra.Command{
Use: "rsync",
Short: i18n.T("Rsync data from source to destination"),
@@ -70,11 +65,14 @@ func init() {
initmigrationSyncCmd(migrationSyncCmd)
}
-func initmigrationSyncCmd(migrationCreateCmd *cobra.Command) {
- migrationCmd.AddCommand(migrationCreateCmd)
+func initmigrationSyncCmd(migrationSyncCmd *cobra.Command) {
+ migrationCmd.AddCommand(migrationSyncCmd)
- migrationCreateCmd.Flags().String("source", "", "source volume to be migrated")
- cobra.CheckErr(migrationCreateCmd.MarkFlagRequired("source"))
+ migrationSyncCmd.Flags().String("source", "", "source volume to be migrated")
+ cobra.CheckErr(migrationSyncCmd.MarkFlagRequired("source"))
+
+ migrationSyncCmd.Flags().Int32("stunnellocalport", defaultLocalStunnelPort,
+ "if using rsyncl-tls, stunnel will need to run locally. Set this to override the default local port used")
}
func (ms *migrationSync) Run(ctx context.Context) error {
@@ -90,29 +88,7 @@ func (ms *migrationSync) Run(ctx context.Context) error {
return fmt.Errorf("failed to access the source volume, %w", err)
}
- var sshKeyDir *string
- defer func() {
- // Remove the directory containing secrets
- if sshKeyDir != nil {
- if err = os.RemoveAll(*sshKeyDir); err != nil {
- klog.Infof("failed to remove temporary directory with ssh keys (%s): %v",
- *sshKeyDir, err)
- }
- }
- }()
- // Retrieve Secrets/keys
- sshKeyDir, err = ms.retrieveSecrets(ctx)
- if err != nil {
- return err
- }
-
- // Do rysnc
- err = ms.runRsync(ctx, *sshKeyDir)
- if err != nil {
- return err
- }
-
- return nil
+ return ms.mr.mh.RunMigration(ctx, ms.client, ms.Source, ms.mr.data.Destination, ms.StunnelLocalPort)
}
func newMigrationSync(cmd *cobra.Command) (*migrationSync, error) {
@@ -123,79 +99,12 @@ func newMigrationSync(cmd *cobra.Command) (*migrationSync, error) {
}
ms.Source = source
- return ms, nil
-}
-
-//nolint:funlen
-func (ms *migrationSync) retrieveSecrets(ctx context.Context) (*string, error) {
- klog.Infof("Extracting ReplicationDestination secrets")
- mrd := ms.mr.data.Destination
- rd, err := mrd.waitForRDStatus(ctx, ms.client)
- if err != nil {
- return nil, err
- }
- ms.DestAddr = *rd.Status.Rsync.Address
- sshKeysSecret := rd.Status.Rsync.SSHKeys
- sshSecret := &corev1.Secret{}
- nsName := types.NamespacedName{
- Namespace: mrd.Namespace,
- Name: *sshKeysSecret,
- }
- err = ms.client.Get(ctx, nsName, sshSecret)
- if err != nil {
- return nil, fmt.Errorf("error retrieving destination sshSecret %s: %w", *sshKeysSecret, err)
- }
-
- sshKeydir, err := os.MkdirTemp("", "sshkeys")
- if err != nil {
- return nil, fmt.Errorf("unable to create temporary directory %w", err)
- }
-
- filename := filepath.Join(sshKeydir, "source")
- err = os.WriteFile(filename, sshSecret.Data["source"], 0600)
- if err != nil {
- return &sshKeydir, fmt.Errorf("unable to write to the file, %w", err)
- }
-
- filename = filepath.Join(sshKeydir, "source.pub")
- err = os.WriteFile(filename, sshSecret.Data["source.pub"], 0600)
- if err != nil {
- return &sshKeydir, fmt.Errorf("unable to write to the file, %w", err)
- }
-
- filename = filepath.Join(sshKeydir, "destination.pub")
- destinationPub := fmt.Sprintf("%s %s", ms.DestAddr,
- sshSecret.Data["destination.pub"])
- err = os.WriteFile(filename, []byte(destinationPub), 0600)
+ // Allow users to specify different local stunnel port
+ sTunnelLocalPort, err := cmd.Flags().GetInt32("stunnellocalport")
if err != nil {
- return &sshKeydir, fmt.Errorf("unable to write to the file, %w", err)
+ return nil, fmt.Errorf("failed to fetch stunnellocalport, %w", err)
}
+ ms.StunnelLocalPort = sTunnelLocalPort
- return &sshKeydir, nil
-}
-
-func (ms *migrationSync) runRsync(ctx context.Context, sshKeydir string) error {
- sshKey := filepath.Join(sshKeydir, "source")
- knownHostfile := filepath.Join(sshKeydir, "destination.pub")
- ssh := fmt.Sprintf("ssh -i %s -o UserKnownHostsFile=%s -o StrictHostKeyChecking=yes",
- sshKey, knownHostfile)
- dest := fmt.Sprintf("root@%s:.", ms.DestAddr)
-
- cmd := exec.CommandContext(ctx, "rsync", "-aAhHSxze", ssh, "--delete",
- "--itemize-changes", "--info=stats2,misc2", ms.Source, dest)
-
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stdout
- klog.Infof("Migrating Data from \"%s\" to \"%s\\%s\\%s\"", ms.Source, ms.mr.data.Destination.Cluster,
- ms.mr.data.Destination.Namespace, ms.mr.data.Destination.PVCName)
- err := cmd.Start()
- if err != nil {
- return fmt.Errorf("failed to run =%w", err)
- }
- err = cmd.Wait()
- if err != nil {
- return fmt.Errorf("command finished with an error, %w", err)
- }
-
- return nil
+ return ms, nil
}
diff --git a/kubectl-volsync/cmd/replication.go b/kubectl-volsync/cmd/replication.go
index fd2d6185f..c34f31170 100644
--- a/kubectl-volsync/cmd/replication.go
+++ b/kubectl-volsync/cmd/replication.go
@@ -19,15 +19,13 @@ package cmd
import (
"context"
"fmt"
- "time"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
errorsutil "k8s.io/apimachinery/pkg/util/errors"
- "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/component-base/logs"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
@@ -43,11 +41,20 @@ const ReplicationRelationshipType RelationshipType = "replication"
// relationships
type replicationRelationship struct {
Relationship
- data replicationRelationshipData
+ data replicationRelationshipDataV2
+ rh replicationHandler
}
-// replicationRelationshipData is the state that will be saved to the
-// relationship config file
+type replicationHandler interface {
+ ApplyDestination(ctx context.Context, c client.Client,
+ dstPVC *corev1.PersistentVolumeClaim, addIDLabel func(obj client.Object),
+ destConfig *replicationRelationshipDestinationV2) (*string, *corev1.Secret, error)
+ ApplySource(ctx context.Context, c client.Client,
+ address *string, dstKeys *corev1.Secret, addIDLabel func(obj client.Object),
+ sourceConfig *replicationRelationshipSourceV2) error
+}
+
+// Old v1 version of the data
type replicationRelationshipData struct {
// Config file/struct version used so we know how to decode when parsing
// from disk
@@ -58,6 +65,20 @@ type replicationRelationshipData struct {
Destination *replicationRelationshipDestination
}
+// replicationRelationshipData is the state that will be saved to the
+// relationship config file
+type replicationRelationshipDataV2 struct {
+ // Config file/struct version used so we know how to decode when parsing
+ // from disk
+ Version int
+ // True if the ReplicationDestination should use RsyncTLS
+ IsRsyncTLS bool
+ // Config info for the source side of the relationship
+ Source *replicationRelationshipSourceV2
+ // Config info for the destination side of the relationship
+ Destination *replicationRelationshipDestinationV2
+}
+
type replicationRelationshipSource struct {
// Cluster context name
Cluster string
@@ -73,6 +94,24 @@ type replicationRelationshipSource struct {
Trigger volsyncv1alpha1.ReplicationSourceTriggerSpec
}
+type replicationRelationshipSourceV2 struct {
+ // Cluster context name
+ Cluster string
+ // Namespace on source cluster
+ Namespace string
+ // Name of PVC being replicated
+ PVCName string
+ // Name of ReplicationSource object
+ RSName string
+ // Parameters for the ReplicationSource volume options
+ ReplicationSourceVolumeOptions volsyncv1alpha1.ReplicationSourceVolumeOptions
+ // Scheduling parameters
+ Trigger volsyncv1alpha1.ReplicationSourceTriggerSpec
+ // MoverSecurityContext allows specifying the PodSecurityContext that will
+ // be used by the data mover
+ MoverSecurityContext *corev1.PodSecurityContext
+}
+
type replicationRelationshipDestination struct {
// Cluster context name
Cluster string
@@ -84,6 +123,22 @@ type replicationRelationshipDestination struct {
Destination volsyncv1alpha1.ReplicationDestinationRsyncSpec
}
+type replicationRelationshipDestinationV2 struct {
+ // Cluster context name
+ Cluster string
+ // Namespace on destination cluster
+ Namespace string
+ // Name of the ReplicationDestination object
+ RDName string
+ // Parameters for the ReplicationDestination volume options
+ ReplicationDestinationVolumeOptions volsyncv1alpha1.ReplicationDestinationVolumeOptions
+ // Service Type for the ReplicationDestination
+ ServiceType *corev1.ServiceType
+ // MoverSecurityContext allows specifying the PodSecurityContext that will
+ // be used by the data mover
+ MoverSecurityContext *corev1.PodSecurityContext
+}
+
// replicationCmd represents the replication command
var replicationCmd = &cobra.Command{
Use: "replication",
@@ -101,24 +156,13 @@ var replicationCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(replicationCmd)
+ // Add logging flags to all sub-commands
+ logs.AddFlags(migrationCmd.PersistentFlags())
+
replicationCmd.PersistentFlags().StringP("relationship", "r", "", "relationship name")
cobra.CheckErr(replicationCmd.MarkPersistentFlagRequired("relationship"))
}
-func newReplicationRelationship(cmd *cobra.Command) (*replicationRelationship, error) {
- r, err := CreateRelationshipFromCommand(cmd, ReplicationRelationshipType)
- if err != nil {
- return nil, err
- }
-
- return &replicationRelationship{
- Relationship: *r,
- data: replicationRelationshipData{
- Version: 1,
- },
- }, nil
-}
-
func loadReplicationRelationship(cmd *cobra.Command) (*replicationRelationship, error) {
r, err := LoadRelationshipFromCommand(cmd, ReplicationRelationshipType)
if err != nil {
@@ -132,12 +176,26 @@ func loadReplicationRelationship(cmd *cobra.Command) (*replicationRelationship,
version := rr.GetInt("data.version")
switch version {
case 1:
+ // Old version of config, migrate to v2
+ datav1 := &replicationRelationshipData{}
+ if err := rr.GetData(datav1); err != nil {
+ return nil, err
+ }
+ rr.convertDataToV2(datav1)
+ case 2:
if err := rr.GetData(&rr.data); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported config file version %d", version)
}
+
+ if rr.data.IsRsyncTLS {
+ rr.rh = &replicationHandlerRsyncTLS{}
+ } else {
+ rr.rh = &replicationHandlerRsync{}
+ }
+
return rr, nil
}
@@ -146,17 +204,39 @@ func (rr *replicationRelationship) Save() error {
return err
}
// resource.Quantity doesn't properly encode, so we need to do it manually
- if rr.data.Source != nil && rr.data.Source.Source.Capacity != nil {
- rr.Set("data.source.source.replicationsourcevolumeoptions.capacity",
- rr.data.Source.Source.Capacity.String())
+ if rr.data.Source != nil && rr.data.Source.ReplicationSourceVolumeOptions.Capacity != nil {
+ rr.Set("data.source.replicationsourcevolumeoptions.capacity",
+ rr.data.Source.ReplicationSourceVolumeOptions.Capacity.String())
}
- if rr.data.Destination != nil && rr.data.Destination.Destination.Capacity != nil {
- rr.Set("data.destination.destination.replicationdestinationvolumeoptions.capacity",
- rr.data.Destination.Destination.Capacity.String())
+ if rr.data.Destination != nil && rr.data.Destination.ReplicationDestinationVolumeOptions.Capacity != nil {
+ rr.Set("data.destination.replicationdestinationvolumeoptions.capacity",
+ rr.data.Destination.ReplicationDestinationVolumeOptions.Capacity.String())
}
return rr.Relationship.Save()
}
+func (rr *replicationRelationship) convertDataToV2(datav1 *replicationRelationshipData) {
+ rr.data = replicationRelationshipDataV2{
+ Version: 2,
+ IsRsyncTLS: false, // Rsync-TLS support wasn't there in v1
+ Source: &replicationRelationshipSourceV2{
+ Cluster: datav1.Source.Cluster,
+ Namespace: datav1.Source.Namespace,
+ PVCName: datav1.Source.PVCName,
+ RSName: datav1.Source.RSName,
+ ReplicationSourceVolumeOptions: datav1.Source.Source.ReplicationSourceVolumeOptions,
+ Trigger: datav1.Source.Trigger,
+ },
+ Destination: &replicationRelationshipDestinationV2{
+ Cluster: datav1.Destination.Cluster,
+ Namespace: datav1.Destination.Namespace,
+ RDName: datav1.Destination.RDName,
+ ReplicationDestinationVolumeOptions: datav1.Destination.Destination.ReplicationDestinationVolumeOptions,
+ ServiceType: datav1.Destination.Destination.ServiceType,
+ },
+ }
+}
+
// GetClients returns clients to access the src & dst clusters (srcClient,
// dstClient, error)
func (rr *replicationRelationship) GetClients() (client.Client, client.Client, error) {
@@ -249,14 +329,14 @@ func (rr *replicationRelationship) Apply(ctx context.Context, srcClient client.C
}
var dstPVC *corev1.PersistentVolumeClaim
- if rr.data.Destination.Destination.CopyMethod == volsyncv1alpha1.CopyMethodSnapshot {
+ if rr.data.Destination.ReplicationDestinationVolumeOptions.CopyMethod == volsyncv1alpha1.CopyMethodSnapshot {
// We need to ensure the RD has defaults based on the source volume
- if rr.data.Destination.Destination.Capacity == nil {
+ if rr.data.Destination.ReplicationDestinationVolumeOptions.Capacity == nil {
capacity := srcPVC.Spec.Resources.Requests[corev1.ResourceStorage]
- rr.data.Destination.Destination.Capacity = &capacity
+ rr.data.Destination.ReplicationDestinationVolumeOptions.Capacity = &capacity
}
- if len(rr.data.Destination.Destination.AccessModes) == 0 {
- rr.data.Destination.Destination.AccessModes = srcPVC.Spec.AccessModes
+ if len(rr.data.Destination.ReplicationDestinationVolumeOptions.AccessModes) == 0 {
+ rr.data.Destination.ReplicationDestinationVolumeOptions.AccessModes = srcPVC.Spec.AccessModes
}
} else {
// Since we're not snapshotting on the dest, we need to ensure there's a
@@ -268,12 +348,12 @@ func (rr *replicationRelationship) Apply(ctx context.Context, srcClient client.C
}
}
- address, keys, err := rr.applyDestination(ctx, dstClient, dstPVC)
+ address, secret, err := rr.rh.ApplyDestination(ctx, dstClient, dstPVC, rr.AddIDLabel, rr.data.Destination)
if err != nil {
return err
}
- return rr.applySource(ctx, srcClient, address, keys)
+ return rr.rh.ApplySource(ctx, srcClient, address, secret, rr.AddIDLabel, rr.data.Source)
}
// Gets or creates the destination PVC
@@ -287,11 +367,11 @@ func (rr *replicationRelationship) ensureDestinationPVC(ctx context.Context, c c
accessModes = srcPVC.Spec.AccessModes
capacity = srcPVC.Spec.Resources.Requests[corev1.ResourceStorage]
}
- if len(rr.data.Destination.Destination.AccessModes) > 0 {
- accessModes = rr.data.Destination.Destination.AccessModes
+ if len(rr.data.Destination.ReplicationDestinationVolumeOptions.AccessModes) > 0 {
+ accessModes = rr.data.Destination.ReplicationDestinationVolumeOptions.AccessModes
}
- if rr.data.Destination.Destination.Capacity != nil {
- capacity = *rr.data.Destination.Destination.Capacity
+ if rr.data.Destination.ReplicationDestinationVolumeOptions.Capacity != nil {
+ capacity = *rr.data.Destination.ReplicationDestinationVolumeOptions.Capacity
}
pvc := &corev1.PersistentVolumeClaim{
@@ -318,7 +398,7 @@ func (rr *replicationRelationship) ensureDestinationPVC(ctx context.Context, c c
corev1.ResourceStorage: capacity,
},
},
- StorageClassName: rr.data.Destination.Destination.StorageClassName,
+ StorageClassName: rr.data.Destination.ReplicationDestinationVolumeOptions.StorageClassName,
}
return nil
})
@@ -328,125 +408,3 @@ func (rr *replicationRelationship) ensureDestinationPVC(ctx context.Context, c c
return pvc, nil
}
-
-func (rr *replicationRelationship) applyDestination(ctx context.Context,
- c client.Client, dstPVC *corev1.PersistentVolumeClaim) (*string, *corev1.Secret, error) {
- params := rr.data.Destination
-
- // Create destination
- rd := &volsyncv1alpha1.ReplicationDestination{
- ObjectMeta: metav1.ObjectMeta{
- Name: params.RDName,
- Namespace: params.Namespace,
- },
- }
- _, err := ctrlutil.CreateOrUpdate(ctx, c, rd, func() error {
- rr.AddIDLabel(rd)
- rd.Spec = volsyncv1alpha1.ReplicationDestinationSpec{
- Rsync: ¶ms.Destination,
- }
- if dstPVC != nil {
- rd.Spec.Rsync.DestinationPVC = &dstPVC.Name
- }
- return nil
- })
- if err != nil {
- klog.Errorf("unable to create ReplicationDestination: %v", err)
- return nil, nil, err
- }
-
- rd, err = rr.awaitDestAddrKeys(ctx, c, client.ObjectKeyFromObject(rd))
- if err != nil {
- klog.Errorf("error while waiting for destination keys and address: %v", err)
- return nil, nil, err
- }
-
- // Fetch the keys
- secret := &corev1.Secret{
- ObjectMeta: metav1.ObjectMeta{
- Name: *rd.Status.Rsync.SSHKeys,
- Namespace: params.Namespace,
- },
- }
- if err = c.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil {
- klog.Errorf("unable to retrieve ssh keys: %v", err)
- return nil, nil, err
- }
-
- return rd.Status.Rsync.Address, secret, nil
-}
-
-func (rr *replicationRelationship) awaitDestAddrKeys(ctx context.Context, c client.Client,
- rdName types.NamespacedName) (*volsyncv1alpha1.ReplicationDestination, error) {
- klog.Infof("waiting for keys & address of destination to be available")
- rd := volsyncv1alpha1.ReplicationDestination{}
- err := wait.PollUntilContextTimeout(ctx, 5*time.Second, defaultRsyncKeyTimeout, true, /*immediate*/
- func(ctx context.Context) (bool, error) {
- if err := c.Get(ctx, rdName, &rd); err != nil {
- return false, err
- }
- if rd.Status == nil || rd.Status.Rsync == nil {
- return false, nil
- }
- if rd.Status.Rsync.Address == nil {
- return false, nil
- }
- if rd.Status.Rsync.SSHKeys == nil {
- return false, nil
- }
- return true, nil
- })
- if err != nil {
- return nil, err
- }
- return &rd, nil
-}
-
-func (rr *replicationRelationship) applySource(ctx context.Context, c client.Client,
- address *string, dstKeys *corev1.Secret) error {
- klog.Infof("creating resources on Source")
- srcKeys, err := rr.applySourceKeys(ctx, c, dstKeys)
- if err != nil {
- klog.Errorf("unable to create source ssh keys: %v", err)
- return err
- }
-
- rs := &volsyncv1alpha1.ReplicationSource{
- ObjectMeta: metav1.ObjectMeta{
- Name: rr.data.Source.RSName,
- Namespace: rr.data.Source.Namespace,
- },
- }
- _, err = ctrlutil.CreateOrUpdate(ctx, c, rs, func() error {
- rr.AddIDLabel(rs)
- rs.Spec = volsyncv1alpha1.ReplicationSourceSpec{
- SourcePVC: rr.data.Source.PVCName,
- Trigger: &rr.data.Source.Trigger,
- Rsync: &rr.data.Source.Source,
- }
- rs.Spec.Rsync.Address = address
- rs.Spec.Rsync.SSHKeys = &srcKeys.Name
- return nil
- })
- return err
-}
-
-// Copies the ssh keys into the source cluster
-func (rr *replicationRelationship) applySourceKeys(ctx context.Context,
- c client.Client, dstKeys *corev1.Secret) (*corev1.Secret, error) {
- srcKeys := &corev1.Secret{
- ObjectMeta: metav1.ObjectMeta{
- Name: rr.data.Source.RSName,
- Namespace: rr.data.Source.Namespace,
- },
- }
- _, err := ctrlutil.CreateOrUpdate(ctx, c, srcKeys, func() error {
- rr.AddIDLabel(srcKeys)
- srcKeys.Data = dstKeys.Data
- return nil
- })
- if err != nil {
- return nil, err
- }
- return srcKeys, nil
-}
diff --git a/kubectl-volsync/cmd/replication_create.go b/kubectl-volsync/cmd/replication_create.go
index 9fc912ba7..d53490d69 100644
--- a/kubectl-volsync/cmd/replication_create.go
+++ b/kubectl-volsync/cmd/replication_create.go
@@ -47,7 +47,41 @@ var replicationCreateCmd = &cobra.Command{
}
func init() {
+ initReplicationCreateCmd(replicationCreateCmd)
+}
+
+func initReplicationCreateCmd(replicationCreateCmd *cobra.Command) {
replicationCmd.AddCommand(replicationCreateCmd)
+
+ replicationCreateCmd.Flags().Bool("rsynctls", false, "if true, will use rsync-tls")
+}
+
+func newReplicationRelationship(cmd *cobra.Command) (*replicationRelationship, error) {
+ r, err := CreateRelationshipFromCommand(cmd, ReplicationRelationshipType)
+ if err != nil {
+ return nil, err
+ }
+
+ isRsyncTLS, err := cmd.Flags().GetBool("rsynctls")
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch rsynctls, %w", err)
+ }
+
+ var rHandler replicationHandler
+ if isRsyncTLS {
+ rHandler = &replicationHandlerRsyncTLS{}
+ } else {
+ rHandler = &replicationHandlerRsync{}
+ }
+
+ return &replicationRelationship{
+ Relationship: *r,
+ data: replicationRelationshipDataV2{
+ Version: 2,
+ IsRsyncTLS: isRsyncTLS,
+ },
+ rh: rHandler,
+ }, nil
}
func (cmd *replicationCreate) Run() error {
diff --git a/kubectl-volsync/cmd/replication_handler_rsync.go b/kubectl-volsync/cmd/replication_handler_rsync.go
new file mode 100644
index 000000000..2d83cba06
--- /dev/null
+++ b/kubectl-volsync/cmd/replication_handler_rsync.go
@@ -0,0 +1,166 @@
+/*
+Copyright © 2024 The VolSync authors
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+//nolint:dupl
+package cmd
+
+import (
+ "context"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+ volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
+)
+
+type replicationHandlerRsync struct{}
+
+var _ replicationHandler = &replicationHandlerRsync{}
+
+func (rhr *replicationHandlerRsync) ApplyDestination(ctx context.Context,
+ c client.Client, dstPVC *corev1.PersistentVolumeClaim, addIDLabel func(obj client.Object),
+ destConfig *replicationRelationshipDestinationV2) (*string, *corev1.Secret, error) {
+ // Create destination
+ rd := &volsyncv1alpha1.ReplicationDestination{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: destConfig.RDName,
+ Namespace: destConfig.Namespace,
+ },
+ }
+ _, err := ctrlutil.CreateOrUpdate(ctx, c, rd, func() error {
+ addIDLabel(rd)
+ rd.Spec = volsyncv1alpha1.ReplicationDestinationSpec{
+ Rsync: &volsyncv1alpha1.ReplicationDestinationRsyncSpec{
+ ReplicationDestinationVolumeOptions: destConfig.ReplicationDestinationVolumeOptions,
+ ServiceType: destConfig.ServiceType,
+ },
+ }
+ if dstPVC != nil {
+ rd.Spec.Rsync.DestinationPVC = &dstPVC.Name
+ }
+ return nil
+ })
+ if err != nil {
+ klog.Errorf("unable to create ReplicationDestination: %v", err)
+ return nil, nil, err
+ }
+
+ rd, err = rhr.awaitDestAddrKeys(ctx, c, client.ObjectKeyFromObject(rd))
+ if err != nil {
+ klog.Errorf("error while waiting for destination keys and address: %v", err)
+ return nil, nil, err
+ }
+
+ // Fetch the keys
+ secret := &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: *rd.Status.Rsync.SSHKeys,
+ Namespace: destConfig.Namespace,
+ },
+ }
+ if err = c.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil {
+ klog.Errorf("unable to retrieve ssh keys: %v", err)
+ return nil, nil, err
+ }
+
+ return rd.Status.Rsync.Address, secret, nil
+}
+
+func (rhr *replicationHandlerRsync) awaitDestAddrKeys(ctx context.Context, c client.Client,
+ rdName types.NamespacedName) (*volsyncv1alpha1.ReplicationDestination, error) {
+ klog.Infof("waiting for keys & address of destination to be available")
+ rd := volsyncv1alpha1.ReplicationDestination{}
+ err := wait.PollUntilContextTimeout(ctx, 5*time.Second, defaultRsyncKeyTimeout, true, /*immediate*/
+ func(ctx context.Context) (bool, error) {
+ if err := c.Get(ctx, rdName, &rd); err != nil {
+ return false, err
+ }
+ if rd.Status == nil || rd.Status.Rsync == nil {
+ return false, nil
+ }
+ if rd.Status.Rsync.Address == nil {
+ return false, nil
+ }
+ if rd.Status.Rsync.SSHKeys == nil {
+ return false, nil
+ }
+ return true, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return &rd, nil
+}
+
+func (rhr *replicationHandlerRsync) ApplySource(ctx context.Context, c client.Client,
+ address *string, dstKeys *corev1.Secret, addIDLabel func(obj client.Object),
+ sourceConfig *replicationRelationshipSourceV2) error {
+ klog.Infof("creating resources on Source")
+ srcKeys, err := rhr.applySourceKeys(ctx, c, dstKeys, addIDLabel, sourceConfig)
+ if err != nil {
+ klog.Errorf("unable to create source ssh keys: %v", err)
+ return err
+ }
+
+ rs := &volsyncv1alpha1.ReplicationSource{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: sourceConfig.RSName,
+ Namespace: sourceConfig.Namespace,
+ },
+ }
+ _, err = ctrlutil.CreateOrUpdate(ctx, c, rs, func() error {
+ addIDLabel(rs)
+ rs.Spec = volsyncv1alpha1.ReplicationSourceSpec{
+ SourcePVC: sourceConfig.PVCName,
+ Trigger: &sourceConfig.Trigger,
+ Rsync: &volsyncv1alpha1.ReplicationSourceRsyncSpec{
+ ReplicationSourceVolumeOptions: sourceConfig.ReplicationSourceVolumeOptions,
+ },
+ }
+ rs.Spec.Rsync.Address = address
+ rs.Spec.Rsync.SSHKeys = &srcKeys.Name
+ return nil
+ })
+ return err
+}
+
+// Copies the ssh keys into the source cluster
+func (rhr *replicationHandlerRsync) applySourceKeys(ctx context.Context,
+ c client.Client, dstKeys *corev1.Secret, addIDLabel func(obj client.Object),
+ sourceConfig *replicationRelationshipSourceV2) (*corev1.Secret, error) {
+ srcKeys := &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: sourceConfig.RSName,
+ Namespace: sourceConfig.Namespace,
+ },
+ }
+ _, err := ctrlutil.CreateOrUpdate(ctx, c, srcKeys, func() error {
+ addIDLabel(srcKeys)
+ srcKeys.Data = dstKeys.Data
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return srcKeys, nil
+}
diff --git a/kubectl-volsync/cmd/replication_handler_rsynctls.go b/kubectl-volsync/cmd/replication_handler_rsynctls.go
new file mode 100644
index 000000000..34ee1e74e
--- /dev/null
+++ b/kubectl-volsync/cmd/replication_handler_rsynctls.go
@@ -0,0 +1,172 @@
+/*
+Copyright © 2024 The VolSync authors
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see .
+*/
+
+//nolint:dupl
+package cmd
+
+import (
+ "context"
+ "time"
+
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/klog/v2"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+ volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
+)
+
+type replicationHandlerRsyncTLS struct{}
+
+var _ replicationHandler = &replicationHandlerRsyncTLS{}
+
+func (rhrtls *replicationHandlerRsyncTLS) ApplyDestination(ctx context.Context,
+ c client.Client, dstPVC *corev1.PersistentVolumeClaim, addIDLabel func(obj client.Object),
+ destConfig *replicationRelationshipDestinationV2) (*string, *corev1.Secret, error) {
+ // Create destination
+ rd := &volsyncv1alpha1.ReplicationDestination{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: destConfig.RDName,
+ Namespace: destConfig.Namespace,
+ },
+ }
+ _, err := ctrlutil.CreateOrUpdate(ctx, c, rd, func() error {
+ addIDLabel(rd)
+ rd.Spec = volsyncv1alpha1.ReplicationDestinationSpec{
+ RsyncTLS: &volsyncv1alpha1.ReplicationDestinationRsyncTLSSpec{
+ ReplicationDestinationVolumeOptions: destConfig.ReplicationDestinationVolumeOptions,
+ ServiceType: destConfig.ServiceType,
+ MoverConfig: volsyncv1alpha1.MoverConfig{
+ MoverSecurityContext: destConfig.MoverSecurityContext,
+ },
+ },
+ }
+ if dstPVC != nil {
+ rd.Spec.RsyncTLS.DestinationPVC = &dstPVC.Name
+ }
+ return nil
+ })
+ if err != nil {
+ klog.Errorf("unable to create ReplicationDestination: %v", err)
+ return nil, nil, err
+ }
+
+ rd, err = rhrtls.awaitDestAddrKeys(ctx, c, client.ObjectKeyFromObject(rd))
+ if err != nil {
+ klog.Errorf("error while waiting for destination keys and address: %v", err)
+ return nil, nil, err
+ }
+
+ // Fetch the keys
+ secret := &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: *rd.Status.RsyncTLS.KeySecret,
+ Namespace: destConfig.Namespace,
+ },
+ }
+ if err = c.Get(ctx, client.ObjectKeyFromObject(secret), secret); err != nil {
+ klog.Errorf("unable to retrieve tls keySecret: %v", err)
+ return nil, nil, err
+ }
+
+ return rd.Status.RsyncTLS.Address, secret, nil
+}
+
+func (rhrtls *replicationHandlerRsyncTLS) awaitDestAddrKeys(ctx context.Context, c client.Client,
+ rdName types.NamespacedName) (*volsyncv1alpha1.ReplicationDestination, error) {
+ klog.Infof("waiting for keys & address of destination to be available")
+ rd := volsyncv1alpha1.ReplicationDestination{}
+ err := wait.PollUntilContextTimeout(ctx, 5*time.Second, defaultRsyncKeyTimeout, true, /*immediate*/
+ func(ctx context.Context) (bool, error) {
+ if err := c.Get(ctx, rdName, &rd); err != nil {
+ return false, err
+ }
+ if rd.Status == nil || rd.Status.RsyncTLS == nil {
+ return false, nil
+ }
+ if rd.Status.RsyncTLS.Address == nil {
+ return false, nil
+ }
+ if rd.Status.RsyncTLS.KeySecret == nil {
+ return false, nil
+ }
+ return true, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return &rd, nil
+}
+
+func (rhrtls *replicationHandlerRsyncTLS) ApplySource(ctx context.Context, c client.Client,
+ address *string, dstKeys *corev1.Secret, addIDLabel func(obj client.Object),
+ sourceConfig *replicationRelationshipSourceV2) error {
+ klog.Infof("creating resources on Source")
+ srcKeys, err := rhrtls.applySourceKeys(ctx, c, dstKeys, addIDLabel, sourceConfig)
+ if err != nil {
+ klog.Errorf("unable to create source ssh keys: %v", err)
+ return err
+ }
+
+ rs := &volsyncv1alpha1.ReplicationSource{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: sourceConfig.RSName,
+ Namespace: sourceConfig.Namespace,
+ },
+ }
+ _, err = ctrlutil.CreateOrUpdate(ctx, c, rs, func() error {
+ addIDLabel(rs)
+ rs.Spec = volsyncv1alpha1.ReplicationSourceSpec{
+ SourcePVC: sourceConfig.PVCName,
+ Trigger: &sourceConfig.Trigger,
+ RsyncTLS: &volsyncv1alpha1.ReplicationSourceRsyncTLSSpec{
+ ReplicationSourceVolumeOptions: sourceConfig.ReplicationSourceVolumeOptions,
+ MoverConfig: volsyncv1alpha1.MoverConfig{
+ MoverSecurityContext: sourceConfig.MoverSecurityContext,
+ },
+ },
+ }
+ rs.Spec.RsyncTLS.Address = address
+ rs.Spec.RsyncTLS.KeySecret = &srcKeys.Name
+ return nil
+ })
+ return err
+}
+
+// Copies the ssh key secret into the source cluster
+func (rhrtls *replicationHandlerRsyncTLS) applySourceKeys(ctx context.Context,
+ c client.Client, dstKeys *corev1.Secret, addIDLabel func(obj client.Object),
+ sourceConfig *replicationRelationshipSourceV2) (*corev1.Secret, error) {
+ srcKeys := &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: sourceConfig.RSName,
+ Namespace: sourceConfig.Namespace,
+ },
+ }
+ _, err := ctrlutil.CreateOrUpdate(ctx, c, srcKeys, func() error {
+ addIDLabel(srcKeys)
+ srcKeys.Data = dstKeys.Data
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return srcKeys, nil
+}
diff --git a/kubectl-volsync/cmd/replication_setDestination.go b/kubectl-volsync/cmd/replication_setDestination.go
index f9f9b352b..efce0cfff 100644
--- a/kubectl-volsync/cmd/replication_setDestination.go
+++ b/kubectl-volsync/cmd/replication_setDestination.go
@@ -39,6 +39,9 @@ type replicationSetDestination struct {
serviceType corev1.ServiceType
storageClassName *string
volumeSnapshotClassName *string
+ // MoverSecurity context to use for the ReplicationDestination
+ // Individual fields will come from cli parameters
+ MoverSecurityContext *corev1.PodSecurityContext
}
// replicationSetDestinationCmd represents the replicationSetDestination command
@@ -77,6 +80,9 @@ func init() {
"name of the StorageClass to use for the destination volume")
replicationSetDestinationCmd.Flags().String("volumesnapshotclass", "",
"name of the VolumeSnapshotClass to use for destination snapshots")
+
+ // MoverSecurityContext flags - will only apply if rsyncTLS is true
+ addCLIRsyncTLSMoverSecurityContextFlags(replicationSetDestinationCmd, true)
}
func newReplicationSetDestination(cmd *cobra.Command) (*replicationSetDestination, error) {
@@ -133,6 +139,12 @@ func newReplicationSetDestination(cmd *cobra.Command) (*replicationSetDestinatio
rsd.volumeSnapshotClassName = &vscName
}
+ // Parse moverSecurityContext flags into a desired MoverSecurityContext
+ rsd.MoverSecurityContext, err = parseCLIRsyncTLSMoverSecurityContextFlags(cmd)
+ if err != nil {
+ return nil, err
+ }
+
return rsd, nil
}
@@ -144,20 +156,27 @@ func (rsd *replicationSetDestination) Run(ctx context.Context) error {
_ = rsd.rel.DeleteSource(ctx, srcClient)
_ = rsd.rel.DeleteDestination(ctx, dstClient)
- rsd.rel.data.Destination = &replicationRelationshipDestination{
+ if !rsd.rel.data.IsRsyncTLS {
+ // the rsync (ssh) handler will ignore moversecuritycontext,
+ // but remove it here in case someone passes in these flags when
+ // using rsync ssh so it doesn't end up in the saved yaml
+ // and confuse anyone
+ rsd.MoverSecurityContext = nil
+ }
+
+ rsd.rel.data.Destination = &replicationRelationshipDestinationV2{
Cluster: rsd.destName.Cluster,
Namespace: rsd.destName.Namespace,
RDName: rsd.destName.Name,
- Destination: volsyncv1alpha1.ReplicationDestinationRsyncSpec{
- ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
- AccessModes: rsd.accessModes,
- CopyMethod: rsd.copyMethod,
- Capacity: rsd.capacity,
- StorageClassName: rsd.storageClassName,
- VolumeSnapshotClassName: rsd.volumeSnapshotClassName,
- },
- ServiceType: &rsd.serviceType,
+ ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
+ AccessModes: rsd.accessModes,
+ CopyMethod: rsd.copyMethod,
+ Capacity: rsd.capacity,
+ StorageClassName: rsd.storageClassName,
+ VolumeSnapshotClassName: rsd.volumeSnapshotClassName,
},
+ ServiceType: &rsd.serviceType,
+ MoverSecurityContext: rsd.MoverSecurityContext,
}
var err error
diff --git a/kubectl-volsync/cmd/replication_setSource.go b/kubectl-volsync/cmd/replication_setSource.go
index 5aad46455..473d146d8 100644
--- a/kubectl-volsync/cmd/replication_setSource.go
+++ b/kubectl-volsync/cmd/replication_setSource.go
@@ -37,6 +37,9 @@ type replicationSetSource struct {
pvcName XClusterName
storageClassName *string
volumeSnapshotClassName *string
+ // MoverSecurity context to use for the ReplicationDestination
+ // Individual fields will come from cli parameters
+ MoverSecurityContext *corev1.PodSecurityContext
}
// replicationSetSourceCmd represents the replicationSetSource command
@@ -71,6 +74,9 @@ func init() {
"name of the StorageClass to use for the cloned volume")
replicationSetSourceCmd.Flags().String("volumesnapshotclass", "",
"name of the VolumeSnapshotClass to use for volume snapshots")
+
+ // MoverSecurityContext flags - will only apply if rsyncTLS is true
+ addCLIRsyncTLSMoverSecurityContextFlags(replicationSetSourceCmd, false)
}
func newReplicationSetSource(cmd *cobra.Command) (*replicationSetSource, error) {
@@ -113,6 +119,12 @@ func newReplicationSetSource(cmd *cobra.Command) (*replicationSetSource, error)
rss.volumeSnapshotClassName = &vscName
}
+ // Parse moverSecurityContext flags into a desired MoverSecurityContext
+ rss.MoverSecurityContext, err = parseCLIRsyncTLSMoverSecurityContextFlags(cmd)
+ if err != nil {
+ return nil, err
+ }
+
return rss, nil
}
@@ -124,21 +136,28 @@ func (rss *replicationSetSource) Run(ctx context.Context) error {
_ = rss.rel.DeleteSource(ctx, srcClient)
_ = rss.rel.DeleteDestination(ctx, dstClient)
- rss.rel.data.Source = &replicationRelationshipSource{
+ if !rss.rel.data.IsRsyncTLS {
+ // the rsync (ssh) handler will ignore moversecuritycontext,
+ // but remove it here in case someone passes in these flags when
+ // using rsync ssh so it doesn't end up in the saved yaml
+ // and confuse anyone
+ rss.MoverSecurityContext = nil
+ }
+
+ rss.rel.data.Source = &replicationRelationshipSourceV2{
Cluster: rss.pvcName.Cluster,
Namespace: rss.pvcName.Namespace,
// The RS name needs to be unique since it's possible to have a single
// PVC be the source of multiple replications
RSName: rss.pvcName.Name + "-" + krand.String(5),
PVCName: rss.pvcName.Name,
- Source: volsyncv1alpha1.ReplicationSourceRsyncSpec{
- ReplicationSourceVolumeOptions: volsyncv1alpha1.ReplicationSourceVolumeOptions{
- AccessModes: rss.accessModes,
- CopyMethod: rss.copyMethod,
- StorageClassName: rss.storageClassName,
- VolumeSnapshotClassName: rss.volumeSnapshotClassName,
- },
+ ReplicationSourceVolumeOptions: volsyncv1alpha1.ReplicationSourceVolumeOptions{
+ AccessModes: rss.accessModes,
+ CopyMethod: rss.copyMethod,
+ StorageClassName: rss.storageClassName,
+ VolumeSnapshotClassName: rss.volumeSnapshotClassName,
},
+ MoverSecurityContext: rss.MoverSecurityContext,
}
var err error
diff --git a/kubectl-volsync/cmd/replication_test.go b/kubectl-volsync/cmd/replication_test.go
index 6e47525d6..40c2fb50a 100644
--- a/kubectl-volsync/cmd/replication_test.go
+++ b/kubectl-volsync/cmd/replication_test.go
@@ -21,7 +21,6 @@ import (
"os"
"reflect"
- volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/spf13/cobra"
@@ -30,11 +29,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
+
+ volsyncv1alpha1 "github.com/backube/volsync/api/v1alpha1"
)
var _ = Describe("Replication relationships can create/save/load", func() {
var dirname string
var cmd *cobra.Command
+ var replicationRelationshipFile string
+
BeforeEach(func() {
var err error
// Create temp directory for relationship files
@@ -42,62 +45,200 @@ var _ = Describe("Replication relationships can create/save/load", func() {
Expect(err).NotTo(HaveOccurred())
cmd = &cobra.Command{}
+ initReplicationCreateCmd(cmd) // Init createCmd as the replicationrelationship is created via the Create cmd
+
cmd.Flags().StringP("relationship", "r", "test-name", "")
cmd.Flags().String("config-dir", dirname, "")
+
+ replicationRelationshipFile = dirname + "/test-name.yaml"
})
AfterEach(func() {
os.RemoveAll(dirname)
})
+
It("can be round-triped", func() {
By("creating a new relationship")
rr, err := newReplicationRelationship(cmd)
Expect(err).NotTo(HaveOccurred())
- Expect(rr.data.Version).To(Equal(1))
+ Expect(rr.data.Version).To(Equal(2))
Expect(rr.data.Destination).To(BeNil())
Expect(rr.data.Source).To(BeNil())
By("saving the relationship")
// Assign some values to test round-trip
caps := resource.MustParse("1Gi")
- rr.data.Source = &replicationRelationshipSource{
+ rr.data.Source = &replicationRelationshipSourceV2{
Cluster: "cluster",
Namespace: "the-ns",
PVCName: "a-pvc",
RSName: "an-rs",
- Source: volsyncv1alpha1.ReplicationSourceRsyncSpec{
- ReplicationSourceVolumeOptions: volsyncv1alpha1.ReplicationSourceVolumeOptions{
- CopyMethod: volsyncv1alpha1.CopyMethodClone,
- Capacity: &caps,
- StorageClassName: ptr.To("scn"),
- AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
- VolumeSnapshotClassName: ptr.To("vscn"),
- },
+ ReplicationSourceVolumeOptions: volsyncv1alpha1.ReplicationSourceVolumeOptions{
+ CopyMethod: volsyncv1alpha1.CopyMethodClone,
+ Capacity: &caps,
+ StorageClassName: ptr.To("scn"),
+ AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
+ VolumeSnapshotClassName: ptr.To("vscn"),
},
}
capd := resource.MustParse("99Gi")
- rr.data.Destination = &replicationRelationshipDestination{
+ rr.data.Destination = &replicationRelationshipDestinationV2{
Cluster: "c2",
Namespace: "n2",
RDName: "rd2",
- Destination: volsyncv1alpha1.ReplicationDestinationRsyncSpec{
- ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
- CopyMethod: volsyncv1alpha1.CopyMethodSnapshot,
- Capacity: &capd,
- StorageClassName: ptr.To("scn2"),
- AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
- VolumeSnapshotClassName: ptr.To("vscn2"),
- DestinationPVC: ptr.To("dpvc"),
- },
- ServiceType: (*corev1.ServiceType)(ptr.To(string(corev1.ServiceTypeClusterIP))),
+ ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
+ CopyMethod: volsyncv1alpha1.CopyMethodSnapshot,
+ Capacity: &capd,
+ StorageClassName: ptr.To("scn2"),
+ AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+ VolumeSnapshotClassName: ptr.To("vscn2"),
+ DestinationPVC: ptr.To("dpvc"),
},
+ ServiceType: (*corev1.ServiceType)(ptr.To(string(corev1.ServiceTypeClusterIP))),
}
Expect(rr.Save()).To(Succeed())
+ // Verify ReplicationRelationship file was created
+ _, err = os.Stat(replicationRelationshipFile)
+ Expect(err).ToNot(HaveOccurred())
+
By("loading it back in, they should match")
rr2, err := loadReplicationRelationship(cmd)
Expect(err).NotTo(HaveOccurred())
Expect(reflect.DeepEqual(rr2.data, rr.data)).To(BeTrue())
})
+
+ It("Should be able to load a relationship file at v1 and convert to v2", func() {
+ const v1ReplicationRelationshipFileContents string = `data:
+ destination:
+ cluster: "cluster-a"
+ destination:
+ replicationdestinationvolumeoptions:
+ accessmodes: []
+ copymethod: Direct
+ destinationpvc: data-dest
+ servicetype: ClusterIP
+ namespace: test-75543-a
+ rdname: data-dest
+ source:
+ cluster: "cluster-b"
+ namespace: test-75543-b
+ pvcname: data-source
+ rsname: data-source-htfcq
+ source:
+ address: 10.96.42.19
+ replicationsourcevolumeoptions:
+ accessmodes: []
+ copymethod: Snapshot
+ sshkeys: data-source-htfcq
+ trigger:
+ manual: "2024-04-20T20:58:05-04:00"
+ version: 1
+id: 1e7a650f-043e-4fca-b2e2-b152655e11bd
+type: replication`
+
+ // Write out the v1 replication relationship file to the expected location
+ Expect(os.WriteFile(replicationRelationshipFile, []byte(v1ReplicationRelationshipFileContents), 0600)).To(Succeed())
+
+ // Now load the file and expect it gets converted to v2 correctly
+ rr, err := loadReplicationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(rr.data.Version).To(Equal(2))
+ Expect(rr.data.IsRsyncTLS).To(BeFalse())
+ Expect(rr.data.Source).NotTo(BeNil())
+ Expect(rr.data.Destination).NotTo(BeNil())
+
+ expectedDestPVC := "data-dest"
+ expectedServiceType := corev1.ServiceTypeClusterIP
+
+ dest := rr.data.Destination
+ Expect(dest.Cluster).To(Equal("cluster-a"))
+ Expect(dest.Namespace).To(Equal("test-75543-a"))
+ Expect(dest.RDName).To(Equal("data-dest"))
+ Expect(dest.ReplicationDestinationVolumeOptions).To(Equal(volsyncv1alpha1.ReplicationDestinationVolumeOptions{
+ AccessModes: []corev1.PersistentVolumeAccessMode{},
+ CopyMethod: volsyncv1alpha1.CopyMethodDirect,
+ DestinationPVC: &expectedDestPVC,
+ }))
+ Expect(dest.ServiceType).To(Equal(&expectedServiceType))
+
+ source := rr.data.Source
+ Expect(source.Cluster).To(Equal("cluster-b"))
+ Expect(source.Namespace).To(Equal("test-75543-b"))
+ Expect(source.PVCName).To(Equal("data-source"))
+ Expect(source.RSName).To(Equal("data-source-htfcq"))
+ Expect(source.ReplicationSourceVolumeOptions).To(Equal(volsyncv1alpha1.ReplicationSourceVolumeOptions{
+ AccessModes: []corev1.PersistentVolumeAccessMode{},
+ CopyMethod: volsyncv1alpha1.CopyMethodSnapshot,
+ }))
+ Expect(source.Trigger).To(Equal(volsyncv1alpha1.ReplicationSourceTriggerSpec{
+ Manual: "2024-04-20T20:58:05-04:00",
+ }))
+ })
+
+ Context("When using rsync (the default)", func() {
+ var rr *replicationRelationship
+ BeforeEach(func() {
+ var err error
+ rr, err = newReplicationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rr.data.Version).To(Equal(2))
+ Expect(rr.data.Destination).To(BeNil())
+ Expect(rr.data.Source).To(BeNil())
+
+ Expect(rr.data.IsRsyncTLS).To(BeFalse())
+ })
+ It("Should use rsync when a new replicationrelationship is created", func() {
+ // Check replicationHandler is the Rsync replicationHandler
+ _, ok := rr.rh.(*replicationHandlerRsync)
+ Expect(ok).To(BeTrue())
+ })
+ It("Should use rsync when a replicationrelationship is loaded that specifies rsync", func() {
+ // BeforeEach created a replicationRelationship with rsync - save it and then load it back in
+ Expect(rr.Save()).To(Succeed())
+
+ // load and then test for rsync
+ rrReloaded, err := loadReplicationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rrReloaded.data.IsRsyncTLS).To(BeFalse())
+
+ _, ok := rrReloaded.rh.(*replicationHandlerRsync)
+ Expect(ok).To(BeTrue())
+ })
+ })
+ Context("When using rsync-tls", func() {
+ var rr *replicationRelationship
+ BeforeEach(func() {
+ // Make sure rsynctls flag is set
+ Expect(cmd.Flags().Set("rsynctls", "true")).To(Succeed())
+
+ var err error
+ rr, err = newReplicationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rr.data.Version).To(Equal(2))
+ Expect(rr.data.Destination).To(BeNil())
+ Expect(rr.data.Source).To(BeNil())
+
+ Expect(rr.data.IsRsyncTLS).To(BeTrue())
+ })
+ It("Should use rsync-tls when a new replicationrelationship is created", func() {
+ // Check replicationHandler is the RsyncTLS replicationHandler
+ _, ok := rr.rh.(*replicationHandlerRsyncTLS)
+ Expect(ok).To(BeTrue())
+ })
+ It("Should use rsync-tls when a replicationrelationship is loaded that specifies rsync-tls", func() {
+ // BeforeEach created a replicationRelationship with rsync-tls - save it and then load it back in
+ Expect(rr.Save()).To(Succeed())
+
+ // load and then test for rsync-tls
+ rrReloaded, err := loadReplicationRelationship(cmd)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(rrReloaded.data.IsRsyncTLS).To(BeTrue())
+
+ _, ok := rrReloaded.rh.(*replicationHandlerRsyncTLS)
+ Expect(ok).To(BeTrue())
+ })
+ })
})
var _ = Describe("Replication relationships", func() {
@@ -115,8 +256,8 @@ var _ = Describe("Replication relationships", func() {
Expect(err).NotTo(HaveOccurred())
repRel = &replicationRelationship{
Relationship: *rel,
- data: replicationRelationshipData{
- Version: 1,
+ data: replicationRelationshipDataV2{
+ Version: 2,
Source: nil,
Destination: nil,
},
@@ -133,11 +274,11 @@ var _ = Describe("Replication relationships", func() {
Expect(repRel.DeleteDestination(ctx, k8sClient)).To(Succeed())
})
It("succeeds if the cluster is empty", func() {
- repRel.data.Source = &replicationRelationshipSource{
+ repRel.data.Source = &replicationRelationshipSourceV2{
RSName: "xxx",
Namespace: "zzz",
}
- repRel.data.Destination = &replicationRelationshipDestination{
+ repRel.data.Destination = &replicationRelationshipDestinationV2{
RDName: "xxx",
Namespace: "zzz",
}
@@ -192,11 +333,11 @@ var _ = Describe("Replication relationships", func() {
}
repRel.AddIDLabel(rd)
Expect(k8sClient.Create(ctx, rd)).To(Succeed())
- repRel.data.Source = &replicationRelationshipSource{
+ repRel.data.Source = &replicationRelationshipSourceV2{
RSName: rs.Name,
Namespace: srcNs.Name,
}
- repRel.data.Destination = &replicationRelationshipDestination{
+ repRel.data.Destination = &replicationRelationshipDestinationV2{
RDName: rd.Name,
Namespace: dstNs.Name,
}
@@ -254,11 +395,10 @@ var _ = Describe("Replication relationships", func() {
})
When("destination size and accessMode aren't specified", func() {
BeforeEach(func() {
- repRel.data.Destination = &replicationRelationshipDestination{
- Cluster: "",
- Namespace: destNS.Name,
- RDName: "test",
- Destination: volsyncv1alpha1.ReplicationDestinationRsyncSpec{},
+ repRel.data.Destination = &replicationRelationshipDestinationV2{
+ Cluster: "",
+ Namespace: destNS.Name,
+ RDName: "test",
}
})
It("uses the values from the Source PVC", func() {
@@ -273,15 +413,13 @@ var _ = Describe("Replication relationships", func() {
var newCap resource.Quantity
BeforeEach(func() {
newCap = resource.MustParse("99Gi")
- repRel.data.Destination = &replicationRelationshipDestination{
+ repRel.data.Destination = &replicationRelationshipDestinationV2{
Cluster: "",
Namespace: destNS.Name,
RDName: "test",
- Destination: volsyncv1alpha1.ReplicationDestinationRsyncSpec{
- ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
- AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
- Capacity: &newCap,
- },
+ ReplicationDestinationVolumeOptions: volsyncv1alpha1.ReplicationDestinationVolumeOptions{
+ AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany},
+ Capacity: &newCap,
},
}
})
@@ -314,11 +452,11 @@ var _ = Describe("Replication relationships", func() {
},
}
Expect(k8sClient.Create(ctx, existingPVC)).To(Succeed())
- repRel.data.Destination = &replicationRelationshipDestination{
- Cluster: "",
- Namespace: destNS.Name,
- RDName: "test",
- Destination: volsyncv1alpha1.ReplicationDestinationRsyncSpec{},
+ repRel.data.Destination = &replicationRelationshipDestinationV2{
+ Cluster: "",
+ Namespace: destNS.Name,
+ RDName: "test",
+ // Destination volume options intentionally left unset for this test
}
})
It("will not be modified", func() {
diff --git a/kubectl-volsync/cmd/tls_cli_flag_utils.go b/kubectl-volsync/cmd/tls_cli_flag_utils.go
new file mode 100644
index 000000000..157685687
--- /dev/null
+++ b/kubectl-volsync/cmd/tls_cli_flag_utils.go
@@ -0,0 +1,127 @@
+/*
+Copyright © 2024 The VolSync authors
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+package cmd
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/spf13/cobra"
+ corev1 "k8s.io/api/core/v1"
+)
+
+func addCLIRsyncTLSMoverSecurityContextFlags(cmdToUpdate *cobra.Command, isReplicationDestination bool) {
+ crName := "ReplicationSource"
+ if isReplicationDestination {
+ crName = "ReplicationDestination"
+ }
+ cmdToUpdate.Flags().String("runasgroup", "",
+ fmt.Sprintf("MoverSecurityContext runAsGroup to use in the %s (only if rsynctls=true)", crName))
+ cmdToUpdate.Flags().String("runasuser", "",
+ fmt.Sprintf("MoverSecurityContext runAsUser to use in the %s (only if rsynctls=true)", crName))
+ cmdToUpdate.Flags().String("fsgroup", "",
+ fmt.Sprintf("MoverSecurityContext fsGroup to use in the %s (only if rsynctls=true)", crName))
+ // set runAsNonRoot as a string value with "" as default, as we don't want to
+ // specify moverSecurityContext.runAsNonRoot unless the user sets this flag
+ cmdToUpdate.Flags().String("runasnonroot", "",
+ fmt.Sprintf("MoverSecurityContext runAsNonRoot (true/false) setting to use in the %s (only if rsynctls=true)",
+ crName))
+ cmdToUpdate.Flags().String("seccompprofiletype", "",
+ fmt.Sprintf("MoverSecurityContext SeccompProfile.Type to use in the %s (only if rsynctls=true)", crName))
+}
+
+//nolint:funlen
+func parseCLIRsyncTLSMoverSecurityContextFlags(cmd *cobra.Command) (*corev1.PodSecurityContext, error) {
+ moverSecurityContext := &corev1.PodSecurityContext{}
+ moverSecurityContextUpdated := false
+
+ runAsGroupStr, err := cmd.Flags().GetString("runasgroup")
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch runasgroup, %w", err)
+ }
+ if runAsGroupStr != "" {
+ runAsGroupInt64, err := strconv.ParseInt(runAsGroupStr, 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse runasgroup, %w", err)
+ }
+ moverSecurityContext.RunAsGroup = &runAsGroupInt64
+ moverSecurityContextUpdated = true
+ }
+
+ runAsUserStr, err := cmd.Flags().GetString("runasuser")
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch runasuser, %w", err)
+ }
+ if runAsUserStr != "" {
+ runAsUserInt64, err := strconv.ParseInt(runAsUserStr, 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse runasuser, %w", err)
+ }
+ moverSecurityContext.RunAsUser = &runAsUserInt64
+ moverSecurityContextUpdated = true
+ }
+
+ fsGroupStr, err := cmd.Flags().GetString("fsgroup")
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch fsgroup, %w", err)
+ }
+ if fsGroupStr != "" {
+ fsGroupInt64, err := strconv.ParseInt(fsGroupStr, 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse fsgroup, %w", err)
+ }
+ moverSecurityContext.FSGroup = &fsGroupInt64
+ moverSecurityContextUpdated = true
+ }
+
+ runAsNonRootStr, err := cmd.Flags().GetString("runasnonroot")
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch runasnonroot, %w", err)
+ }
+ if runAsNonRootStr != "" {
+ runAsNonRootBool, err := strconv.ParseBool(runAsNonRootStr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse runasnonroot, %w", err)
+ }
+ moverSecurityContext.RunAsNonRoot = &runAsNonRootBool
+ moverSecurityContextUpdated = true
+ }
+
+ secCompProfileTypeStr, err := cmd.Flags().GetString("seccompprofiletype")
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch seccompprofiletype, %w", err)
+ }
+ if secCompProfileTypeStr != "" {
+ if corev1.SeccompProfileType(secCompProfileTypeStr) != corev1.SeccompProfileTypeLocalhost &&
+ corev1.SeccompProfileType(secCompProfileTypeStr) != corev1.SeccompProfileTypeRuntimeDefault &&
+ corev1.SeccompProfileType(secCompProfileTypeStr) != corev1.SeccompProfileTypeUnconfined {
+ return nil, fmt.Errorf("unsupported seccompprofiletype: %v", secCompProfileTypeStr)
+ }
+ moverSecurityContext.SeccompProfile = &corev1.SeccompProfile{
+ Type: corev1.SeccompProfileType(secCompProfileTypeStr),
+ }
+ moverSecurityContextUpdated = true
+ }
+
+ if moverSecurityContextUpdated {
+ return moverSecurityContext, nil
+ }
+
+ // No need to set a moverSecurityContext
+ return nil, nil
+}
diff --git a/test-e2e/test_replication_sync_direct_tls_normal.yml b/test-e2e/test_replication_sync_direct_tls_normal.yml
new file mode 100644
index 000000000..e77f29aa4
--- /dev/null
+++ b/test-e2e/test_replication_sync_direct_tls_normal.yml
@@ -0,0 +1,194 @@
+---
+- hosts: localhost
+ tags:
+ - cli
+ - rsync_tls
+ - unprivileged
+ tasks:
+ - name: Create namespace
+ include_role:
+ name: create_namespace
+
+ - name: Probe cluster information
+ include_role:
+ name: gather_cluster_info
+
+ - name: Define podSecurityContext
+ ansible.builtin.set_fact:
+ podSecurityContext:
+ fsGroup: 5678
+ runAsGroup: 5678
+ runAsNonRoot: true
+ runAsUser: 1234
+ seccompProfile:
+ type: RuntimeDefault
+ when: not cluster_info.is_openshift
+
+ - name: Create source PVC
+ kubernetes.core.k8s:
+ state: present
+ definition:
+ kind: PersistentVolumeClaim
+ apiVersion: v1
+ metadata:
+ name: data-source
+ namespace: "{{ namespace }}"
+ spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 1Gi
+
+ - name: Create destination PVC
+ kubernetes.core.k8s:
+ state: present
+ definition:
+ kind: PersistentVolumeClaim
+ apiVersion: v1
+ metadata:
+ name: data-dest
+ namespace: "{{ namespace }}"
+ spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 1Gi
+
+ # Both PVCs are empty. We use this role because it mounts both PVCs on the
+ # same Pod, forcing them to be co-located.
+ - name: Ensure both PVCs are provisioned in same zone
+ include_role:
+ name: compare_pvc_data
+ vars:
+ pvc1_name: data-source
+ pvc2_name: data-dest
+
+ - name: Write data into the source PVC
+ include_role:
+ name: write_to_pvc
+ vars:
+ data: 'data'
+ path: '/datafile'
+ pvc_name: 'data-source'
+
+ - name: Create replication relationship with rsync-tls
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "create"
+ - "--rsynctls"
+ - "true"
+
+ - name: Set source of replication (w/ mSC)
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "set-source"
+ - "--pvcname"
+ - "{{namespace}}/data-source"
+ - "--copymethod"
+ - "Snapshot"
+ - "--fsgroup"
+ - "{{ podSecurityContext.fsGroup }}"
+ - "--runasgroup"
+ - "{{ podSecurityContext.runAsGroup }}"
+ - "--runasuser"
+ - "{{ podSecurityContext.runAsUser }}"
+ - "--runasnonroot"
+ - "{{ podSecurityContext.runAsNonRoot }}"
+ - "--seccompprofiletype"
+ - "{{ podSecurityContext.seccompProfile.type }}"
+ when: podSecurityContext is defined
+
+ - name: Set source of replication (w/o mSC)
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "set-source"
+ - "--pvcname"
+ - "{{namespace}}/data-source"
+ - "--copymethod"
+ - "Snapshot"
+ when: podSecurityContext is not defined
+
+ - name: Set destination of replication (w/ mSC)
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "set-destination"
+ - "--destination"
+ - "{{namespace}}/data-dest"
+ - "--copymethod"
+ - "Direct"
+ - "--fsgroup"
+ - "{{ podSecurityContext.fsGroup }}"
+ - "--runasgroup"
+ - "{{ podSecurityContext.runAsGroup }}"
+ - "--runasuser"
+ - "{{ podSecurityContext.runAsUser }}"
+ - "--runasnonroot"
+ - "{{ podSecurityContext.runAsNonRoot }}"
+ - "--seccompprofiletype"
+ - "{{ podSecurityContext.seccompProfile.type }}"
+ when: podSecurityContext is defined
+
+ - name: Set destination of replication (w/o mSC)
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "set-destination"
+ - "--destination"
+ - "{{namespace}}/data-dest"
+ - "--copymethod"
+ - "Direct"
+ when: podSecurityContext is not defined
+
+ - name: Trigger synchronization
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "sync"
+ timeout: 900 # Command doesn't return until sync completes
+
+ - name: Clean up replication resources
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "delete"
+
+ - name: Verify contents of PVC
+ include_role:
+ name: compare_pvc_data
+ vars:
+ pvc1_name: data-source
+ pvc2_name: data-dest
diff --git a/test-e2e/test_replication_sync_direct_tls_priv.yml b/test-e2e/test_replication_sync_direct_tls_priv.yml
new file mode 100644
index 000000000..7c4769075
--- /dev/null
+++ b/test-e2e/test_replication_sync_direct_tls_priv.yml
@@ -0,0 +1,136 @@
+---
+- hosts: localhost
+ tags:
+ - cli
+ - rsync_tls
+ - privileged
+ tasks:
+ - name: Create namespace
+ include_role:
+ name: create_namespace
+
+ - name: Probe cluster information
+ include_role:
+ name: gather_cluster_info
+
+ - name: Enable privileged movers
+ include_role:
+ name: enable_privileged_mover
+
+ - name: Create source PVC
+ kubernetes.core.k8s:
+ state: present
+ definition:
+ kind: PersistentVolumeClaim
+ apiVersion: v1
+ metadata:
+ name: data-source
+ namespace: "{{ namespace }}"
+ spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 1Gi
+
+ - name: Create destination PVC
+ kubernetes.core.k8s:
+ state: present
+ definition:
+ kind: PersistentVolumeClaim
+ apiVersion: v1
+ metadata:
+ name: data-dest
+ namespace: "{{ namespace }}"
+ spec:
+ accessModes:
+ - ReadWriteOnce
+ resources:
+ requests:
+ storage: 1Gi
+
+ # Both PVCs are empty. We use this role because it mounts both PVCs on the
+ # same Pod, forcing them to be co-located.
+ - name: Ensure both PVCs are provisioned in same zone
+ include_role:
+ name: compare_pvc_data
+ vars:
+ pvc1_name: data-source
+ pvc2_name: data-dest
+
+ - name: Write data into the source PVC
+ include_role:
+ name: write_to_pvc
+ vars:
+ data: 'data'
+ path: '/datafile'
+ pvc_name: 'data-source'
+
+ - name: Create replication relationship with rsync-tls
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "create"
+ - "--rsynctls"
+ - "true"
+
+ - name: Set source of replication
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "set-source"
+ - "--pvcname"
+ - "{{namespace}}/data-source"
+ - "--copymethod"
+ - "Snapshot"
+
+ - name: Set destination of replication
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "set-destination"
+ - "--destination"
+ - "{{namespace}}/data-dest"
+ - "--copymethod"
+ - "Direct"
+ # no podSecurityContext / moverSecurityContext flags in the privileged variant
+
+ - name: Trigger synchronization
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "sync"
+ timeout: 900 # Command doesn't return until sync completes
+
+ - name: Clean up replication resources
+ include_role:
+ name: cli
+ vars:
+ params:
+ - "replication"
+ - "-r"
+ - "replication"
+ - "delete"
+
+ - name: Verify contents of PVC
+ include_role:
+ name: compare_pvc_data
+ vars:
+ pvc1_name: data-source
+ pvc2_name: data-dest