Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pkg/deployment/reconcile/plan_builder_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ func createRotateServerStoragePlan(log zerolog.Logger, apiObject k8sutil.APIObje
}
groupSpec := spec.GetServerGroupSpec(group)
storageClassName := groupSpec.GetStorageClassName()
if storageClassName == "" {
// Using default storage class name
continue
}
// Load PVC
pvc, err := getPVC(m.PersistentVolumeClaimName)
if err != nil {
Expand All @@ -71,10 +67,17 @@ func createRotateServerStoragePlan(log zerolog.Logger, apiObject k8sutil.APIObje
continue
}
replacementNeeded := false
if util.StringOrDefault(pvc.Spec.StorageClassName) != storageClassName {
if util.StringOrDefault(pvc.Spec.StorageClassName) != storageClassName && storageClassName != "" {
// Storageclass has changed
log.Debug().Str("pod-name", m.PodName).
Str("pvc-storage-class", util.StringOrDefault(pvc.Spec.StorageClassName)).
Str("group-storage-class", storageClassName).Msg("Storage class has changed - pod needs replacement")
replacementNeeded = true
}
rotationNeeded := false
if k8sutil.IsPersistentVolumeClaimFileSystemResizePending(pvc) {
rotationNeeded = true
}
if replacementNeeded {
if group != api.ServerGroupAgents && group != api.ServerGroupDBServers {
// Only agents & dbservers are allowed to change their storage class.
Expand Down Expand Up @@ -107,6 +110,8 @@ func createRotateServerStoragePlan(log zerolog.Logger, apiObject k8sutil.APIObje
)
}
}
} else if rotationNeeded {
plan = createRotateMemberPlan(log, m, group, "Filesystem resize pending")
}
}
return nil
Expand Down
26 changes: 26 additions & 0 deletions pkg/deployment/resources/pvc_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
apiv1 "k8s.io/api/core/v1"
)

var (
Expand Down Expand Up @@ -57,6 +58,7 @@ func (r *Resources) InspectPVCs(ctx context.Context) (util.Interval, error) {

// Update member status from all pods found
status, _ := r.context.GetStatus()
spec := r.context.GetSpec()
for _, p := range pvcs {
// PVC belongs to this deployment, update metric
inspectedPVCsCounters.WithLabelValues(deploymentName).Inc()
Expand All @@ -79,6 +81,30 @@ func (r *Resources) InspectPVCs(ctx context.Context) (util.Interval, error) {
continue
}

// Resize inspector
groupSpec := spec.GetServerGroupSpec(group)
if requestedSize, ok := groupSpec.Resources.Requests[apiv1.ResourceStorage]; ok {
if volumeSize, ok := p.Spec.Resources.Requests[apiv1.ResourceStorage]; ok {
cmp := volumeSize.Cmp(requestedSize)
if cmp < 0 {
// Size of the volume is smaller than the requested size
// Update the pvc with the request size
p.Spec.Resources.Requests[apiv1.ResourceStorage] = requestedSize

log.Debug().Str("pvc-capacity", volumeSize.String()).Str("requested", requestedSize.String()).Msg("PVC capacity differs - updating")
kube := r.context.GetKubeCli()
if _, err := kube.CoreV1().PersistentVolumeClaims(r.context.GetNamespace()).Update(&p); err != nil {
log.Error().Err(err).Msg("Failed to update pvc")
}
r.context.CreateEvent(k8sutil.NewPVCResizedEvent(r.context.GetAPIObject(), p.Name))
} else if cmp > 0 {
log.Error().Str("server-group", group.AsRole()).Str("pvc-storage-size", volumeSize.String()).Str("requested-size", requestedSize.String()).
Msg("Volume size should not shrink")
r.context.CreateEvent(k8sutil.NewCannotShrinkVolumeEvent(r.context.GetAPIObject(), p.Name))
}
}
}

if k8sutil.IsPersistentVolumeClaimMarkedForDeletion(&p) {
// Process finalizers
if x, err := r.runPVCFinalizers(ctx, &p, group, memberStatus); err != nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/k8sutil/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,24 @@ func NewDowntimeNotAllowedEvent(apiObject APIObject, operation string) *Event {
return event
}

// NewPVCResizedEvent creates an event indicating that a PVC has been resized
func NewPVCResizedEvent(apiObject APIObject, pvcname string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "PVC Resized"
event.Message = fmt.Sprintf("The persistent volume claim %s has been resized", pvcname)
return event
}

// NewCannotShrinkVolumeEvent creates an event indicating that the user tried to shrink a PVC
func NewCannotShrinkVolumeEvent(apiObject APIObject, pvcname string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "PVC Shrinked"
event.Message = fmt.Sprintf("The persistent volume claim %s can not be shrinked", pvcname)
return event
}

// NewUpgradeNotAllowedEvent creates an event indicating that an upgrade (or downgrade) is not allowed.
func NewUpgradeNotAllowedEvent(apiObject APIObject,
fromVersion, toVersion driver.Version,
Expand Down
12 changes: 11 additions & 1 deletion pkg/util/k8sutil/pvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,21 @@ type PersistentVolumeClaimInterface interface {
Get(name string, options metav1.GetOptions) (*v1.PersistentVolumeClaim, error)
}

// IsPersistentVolumeClaimMarkedForDeletion returns true if the pod has been marked for deletion.
// IsPersistentVolumeClaimMarkedForDeletion returns true if the pvc has been marked for deletion.
func IsPersistentVolumeClaimMarkedForDeletion(pvc *v1.PersistentVolumeClaim) bool {
return pvc.DeletionTimestamp != nil
}

// IsPersistentVolumeClaimFileSystemResizePending returns true if the pvc has FileSystemResizePending set to true
func IsPersistentVolumeClaimFileSystemResizePending(pvc *v1.PersistentVolumeClaim) bool {
for _, c := range pvc.Status.Conditions {
if c.Type == v1.PersistentVolumeClaimFileSystemResizePending && c.Status == v1.ConditionTrue {
return true
}
}
return false
}

// CreatePersistentVolumeClaimName returns the name of the persistent volume claim for a member with
// a given id in a deployment with a given name.
func CreatePersistentVolumeClaimName(deploymentName, role, id string) string {
Expand Down
95 changes: 94 additions & 1 deletion tests/persistent_volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,24 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/arangodb/arangosync/pkg/retry"

"github.com/dchest/uniuri"
"github.com/stretchr/testify/assert"

api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
kubeArangoClient "github.com/arangodb/kube-arangodb/pkg/client"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
//"github.com/arangodb/kube-arangodb/pkg/util"
)

// TODO - add description
func TestPersistence(t *testing.T) {
func TestPVCExists(t *testing.T) {
longOrSkip(t)

k8sNameSpace := getNamespace(t)
Expand Down Expand Up @@ -78,3 +85,89 @@ func TestPersistence(t *testing.T) {
// Cleanup
removeDeployment(deploymentClient, deploymentTemplate.GetName(), k8sNameSpace)
}

func TestPVCResize(t *testing.T) {
longOrSkip(t)

k8sNameSpace := getNamespace(t)
k8sClient := mustNewKubeClient(t)

mode := api.DeploymentModeCluster
engine := api.StorageEngineRocksDB

size10GB, _ := resource.ParseQuantity("10Gi")
size08GB, _ := resource.ParseQuantity("8Gi")

deploymentClient := kubeArangoClient.MustNewInCluster()
deploymentTemplate := newDeployment(strings.Replace(fmt.Sprintf("trsz-%s-%s-%s", mode[:2], engine[:2], uniuri.NewLen(4)), ".", "", -1))
deploymentTemplate.Spec.Mode = api.NewMode(mode)
deploymentTemplate.Spec.StorageEngine = api.NewStorageEngine(engine)
deploymentTemplate.Spec.TLS = api.TLSSpec{}
deploymentTemplate.Spec.DBServers.Resources.Requests = corev1.ResourceList{corev1.ResourceStorage: size08GB}
deploymentTemplate.Spec.SetDefaults(deploymentTemplate.GetName()) // this must be last
assert.NoError(t, deploymentTemplate.Spec.Validate())

// Create deployment
_, err := deploymentClient.DatabaseV1alpha().ArangoDeployments(k8sNameSpace).Create(deploymentTemplate)
defer removeDeployment(deploymentClient, deploymentTemplate.GetName(), k8sNameSpace)
assert.NoError(t, err, "failed to create deplyment: %s", err)

depl, err := waitUntilDeployment(deploymentClient, deploymentTemplate.GetName(), k8sNameSpace, deploymentIsReady())
assert.NoError(t, err, fmt.Sprintf("Deployment not running in time: %s", err))

// Get list of all pvcs for dbservers
for _, m := range depl.Status.Members.DBServers {
pvc, err := k8sClient.CoreV1().PersistentVolumeClaims(k8sNameSpace).Get(m.PersistentVolumeClaimName, metav1.GetOptions{})
assert.NoError(t, err, "failed to get pvc: %s", err)
volumeSize, ok := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
assert.True(t, ok, "pvc does not have storage resource")
assert.True(t, volumeSize.Cmp(size08GB) == 0, "wrong volume size: expected: %s, found: %s", size08GB.String(), volumeSize.String())
}

// Update the deployment
// Try to change image version
depl, err = updateDeployment(deploymentClient, deploymentTemplate.GetName(), k8sNameSpace,
func(depl *api.DeploymentSpec) {
depl.DBServers.Resources.Requests[corev1.ResourceStorage] = size10GB
})
if err != nil {
t.Fatalf("Failed to update the deployment")
} else {
t.Log("Updated deployment")
}

if err := retry.Retry(func() error {
// Get list of all pvcs for dbservers and check for new size
for _, m := range depl.Status.Members.DBServers {
pvc, err := k8sClient.CoreV1().PersistentVolumeClaims(k8sNameSpace).Get(m.PersistentVolumeClaimName, metav1.GetOptions{})
if err != nil {
return err
}
volumeSize, ok := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
if !ok {
return fmt.Errorf("pvc does not have storage resource")
}
if volumeSize.Cmp(size10GB) != 0 {
return fmt.Errorf("wrong pvc size: expected: %s, found: %s", size10GB.String(), volumeSize.String())
}
volume, err := k8sClient.CoreV1().PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return err
}
volumeSize, ok = volume.Spec.Capacity[corev1.ResourceStorage]
if !ok {
return fmt.Errorf("pv does not have storage resource")
}
if volumeSize.Cmp(size10GB) != 0 {
return fmt.Errorf("wrong volume size: expected: %s, found: %s", size10GB.String(), volumeSize.String())
}
if k8sutil.IsPersistentVolumeClaimFileSystemResizePending(pvc) {
return fmt.Errorf("persistent volume claim file system resize pending")
}
}
return nil
}, 5*time.Minute); err != nil {
t.Fatalf("PVCs not resized: %s", err.Error())
}

}