Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: updating stan eventbus sts instead of recreating when there is change #2054

Merged
merged 3 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 47 additions & 46 deletions controllers/eventbus/installer/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,16 @@ func (i *natsInstaller) uninstallPVCs(ctx context.Context) error {
log := i.logger
pvcs, err := i.getPVCs(ctx, i.labels)
if err != nil {
log.Errorw("failed to get PVCs created by nats streaming statefulset when uninstalling", zap.Error(err))
log.Errorw("Failed to get PVCs created by nats streaming statefulset when uninstalling", zap.Error(err))
return err
}
for _, pvc := range pvcs {
err = i.client.Delete(ctx, &pvc)
if err != nil {
log.Errorw("failed to delete pvc when uninstalling", zap.Any("pvcName", pvc.Name), zap.Error(err))
log.Errorw("Failed to delete pvc when uninstalling", zap.Any("pvcName", pvc.Name), zap.Error(err))
return err
}
log.Infow("pvc deleted", "pvcName", pvc.Name)
log.Infow("Pvc deleted", "pvcName", pvc.Name)
}
return nil
}
Expand All @@ -143,13 +143,13 @@ func (i *natsInstaller) createStanService(ctx context.Context) (*corev1.Service,
svc, err := i.getStanService(ctx)
if err != nil && !apierrors.IsNotFound(err) {
i.eventBus.Status.MarkDeployFailed("GetServiceFailed", "Get existing service failed")
log.Errorw("error getting existing service", zap.Error(err))
log.Errorw("Error getting existing service", zap.Error(err))
return nil, err
}
expectedSvc, err := i.buildStanService()
if err != nil {
i.eventBus.Status.MarkDeployFailed("BuildServiceFailed", "Failed to build a service spec")
log.Errorw("error building service spec", zap.Error(err))
log.Errorw("Error building service spec", zap.Error(err))
return nil, err
}
if svc != nil {
Expand All @@ -162,20 +162,20 @@ func (i *natsInstaller) createStanService(ctx context.Context) (*corev1.Service,
err = i.client.Update(ctx, svc)
if err != nil {
i.eventBus.Status.MarkDeployFailed("UpdateServiceFailed", "Failed to update existing service")
log.Errorw("error updating existing service", zap.Error(err))
log.Errorw("Error updating existing service", zap.Error(err))
return nil, err
}
log.Infow("service is updated", "serviceName", svc.Name)
log.Infow("Service is updated", "serviceName", svc.Name)
}
return svc, nil
}
err = i.client.Create(ctx, expectedSvc)
if err != nil {
i.eventBus.Status.MarkDeployFailed("CreateServiceFailed", "Failed to create a service")
log.Errorw("error creating a service", zap.Error(err))
log.Errorw("Error creating a service", zap.Error(err))
return nil, err
}
log.Infow("service is created", "serviceName", expectedSvc.Name)
log.Infow("Service is created", "serviceName", expectedSvc.Name)
return expectedSvc, nil
}

Expand All @@ -185,13 +185,13 @@ func (i *natsInstaller) createConfigMap(ctx context.Context) (*corev1.ConfigMap,
cm, err := i.getConfigMap(ctx)
if err != nil && !apierrors.IsNotFound(err) {
i.eventBus.Status.MarkDeployFailed("GetConfigMapFailed", "Failed to get existing configmap")
log.Errorw("error getting existing configmap", zap.Error(err))
log.Errorw("Error getting existing configmap", zap.Error(err))
return nil, err
}
expectedCm, err := i.buildConfigMap()
if err != nil {
i.eventBus.Status.MarkDeployFailed("BuildConfigMapFailed", "Failed to build a configmap spec")
log.Errorw("error building configmap spec", zap.Error(err))
log.Errorw("Error building configmap spec", zap.Error(err))
return nil, err
}
if cm != nil {
Expand All @@ -203,20 +203,20 @@ func (i *natsInstaller) createConfigMap(ctx context.Context) (*corev1.ConfigMap,
err := i.client.Update(ctx, cm)
if err != nil {
i.eventBus.Status.MarkDeployFailed("UpdateConfigMapFailed", "Failed to update existing configmap")
log.Errorw("error updating configmap", zap.Error(err))
log.Errorw("Error updating configmap", zap.Error(err))
return nil, err
}
log.Infow("updated configmap", "configmapName", cm.Name)
log.Infow("Updated configmap", "configmapName", cm.Name)
}
return cm, nil
}
err = i.client.Create(ctx, expectedCm)
if err != nil {
i.eventBus.Status.MarkDeployFailed("CreateConfigMapFailed", "Failed to create configmap")
log.Errorw("error creating a configmap", zap.Error(err))
log.Errorw("Error creating a configmap", zap.Error(err))
return nil, err
}
log.Infow("created configmap", "configmapName", expectedCm.Name)
log.Infow("Created configmap", "configmapName", expectedCm.Name)
return expectedCm, nil
}

Expand All @@ -226,13 +226,13 @@ func (i *natsInstaller) createAuthSecrets(ctx context.Context, strategy v1alpha1
sSecret, err := i.getServerAuthSecret(ctx)
if err != nil && !apierrors.IsNotFound(err) {
i.eventBus.Status.MarkDeployFailed("GetServerAuthSecretFailed", "Failed to get existing server auth secret")
log.Errorw("error getting existing server auth secret", zap.Error(err))
log.Errorw("Error getting existing server auth secret", zap.Error(err))
return nil, nil, err
}
cSecret, err := i.getClientAuthSecret(ctx)
if err != nil && !apierrors.IsNotFound(err) {
i.eventBus.Status.MarkDeployFailed("GetClientAuthSecretFailed", "Failed to get existing client auth secret")
log.Errorw("error getting existing client auth secret", zap.Error(err))
log.Errorw("Error getting existing client auth secret", zap.Error(err))
return nil, nil, err
}
if strategy != v1alpha1.AuthStrategyNone { // Do not checkout AuthStrategyNone because it only has server auth secret
Expand All @@ -251,10 +251,10 @@ func (i *natsInstaller) createAuthSecrets(ctx context.Context, strategy v1alpha1
err = i.client.Delete(ctx, cSecret)
if err != nil {
i.eventBus.Status.MarkDeployFailed("DeleteClientAuthSecretFailed", "Failed to delete the client auth secret")
log.Errorw("error deleting client auth secret", zap.Error(err))
log.Errorw("Error deleting client auth secret", zap.Error(err))
return nil, nil, err
}
log.Info("deleted server auth secret")
log.Info("Deleted server auth secret")
}
if sSecret != nil && sSecret.Annotations != nil && sSecret.Annotations[authStrategyAnnoKey] == string(strategy) && len(sSecret.Data[serverAuthSecretKey]) == 0 {
// If the server auth secret is already existing, strategy didn't change, and the secret is empty string, reuse it without updating.
Expand All @@ -264,7 +264,7 @@ func (i *natsInstaller) createAuthSecrets(ctx context.Context, strategy v1alpha1
expectedSSecret, err := i.buildServerAuthSecret(strategy, "")
if err != nil {
i.eventBus.Status.MarkDeployFailed("BuildServerAuthSecretFailed", "Failed to build a server auth secret spec")
log.Errorw("error building server auth secret spec", zap.Error(err))
log.Errorw("Error building server auth secret spec", zap.Error(err))
return nil, nil, err
}
if sSecret != nil {
Expand All @@ -274,19 +274,19 @@ func (i *natsInstaller) createAuthSecrets(ctx context.Context, strategy v1alpha1
err = i.client.Update(ctx, sSecret)
if err != nil {
i.eventBus.Status.MarkDeployFailed("UpdateServerAuthSecretFailed", "Failed to update the server auth secret")
log.Errorw("error updating server auth secret", zap.Error(err))
log.Errorw("Error updating server auth secret", zap.Error(err))
return nil, nil, err
}
log.Infow("updated server auth secret", "serverAuthSecretName", sSecret.Name)
log.Infow("Updated server auth secret", "serverAuthSecretName", sSecret.Name)
return sSecret, nil, nil
}
err = i.client.Create(ctx, expectedSSecret)
if err != nil {
i.eventBus.Status.MarkDeployFailed("CreateServerAuthSecretFailed", "Failed to create a server auth secret")
log.Errorw("error creating server auth secret", zap.Error(err))
log.Errorw("Error creating server auth secret", zap.Error(err))
return nil, nil, err
}
log.Infow("created server auth secret", "serverAuthSecretName", expectedSSecret.Name)
log.Infow("Created server auth secret", "serverAuthSecretName", expectedSSecret.Name)
return expectedSSecret, nil, nil
case v1alpha1.AuthStrategyToken:
token := common.RandomString(64)
Expand All @@ -298,58 +298,58 @@ func (i *natsInstaller) createAuthSecrets(ctx context.Context, strategy v1alpha1
expectedSSecret, err := i.buildServerAuthSecret(strategy, serverAuthText)
if err != nil {
i.eventBus.Status.MarkDeployFailed("BuildServerAuthSecretFailed", "Failed to build a server auth secret spec")
log.Errorw("error building server auth secret spec", zap.Error(err))
log.Errorw("Error building server auth secret spec", zap.Error(err))
return nil, nil, err
}
returnedSSecret := expectedSSecret
if sSecret == nil {
err = i.client.Create(ctx, expectedSSecret)
if err != nil {
i.eventBus.Status.MarkDeployFailed("CreateServerAuthSecretFailed", "Failed to create a server auth secret")
log.Errorw("error creating server auth secret", zap.Error(err))
log.Errorw("Error creating server auth secret", zap.Error(err))
return nil, nil, err
}
log.Infow("created server auth secret", "serverAuthSecretName", expectedSSecret.Name)
log.Infow("Created server auth secret", "serverAuthSecretName", expectedSSecret.Name)
} else {
sSecret.Data = expectedSSecret.Data
sSecret.SetLabels(expectedSSecret.Labels)
sSecret.SetAnnotations(expectedSSecret.Annotations)
err = i.client.Update(ctx, sSecret)
if err != nil {
i.eventBus.Status.MarkDeployFailed("UpdateServerAuthSecretFailed", "Failed to update the server auth secret")
log.Errorw("error updating server auth secret", zap.Error(err))
log.Errorw("Error updating server auth secret", zap.Error(err))
return nil, nil, err
}
log.Infow("updated server auth secret", "serverAuthSecretName", sSecret.Name)
log.Infow("Updated server auth secret", "serverAuthSecretName", sSecret.Name)
returnedSSecret = sSecret
}
// create client auth secret
expectedCSecret, err := i.buildClientAuthSecret(strategy, clientAuthText)
if err != nil {
i.eventBus.Status.MarkDeployFailed("BuildClientAuthSecretFailed", "Failed to build a client auth secret spec")
log.Errorw("error building client auth secret spec", zap.Error(err))
log.Errorw("Error building client auth secret spec", zap.Error(err))
return nil, nil, err
}
returnedCSecret := expectedCSecret
if cSecret == nil {
err = i.client.Create(ctx, expectedCSecret)
if err != nil {
i.eventBus.Status.MarkDeployFailed("CreateClientAuthSecretFailed", "Failed to create a client auth secret")
log.Errorw("error creating client auth secret", zap.Error(err))
log.Errorw("Error creating client auth secret", zap.Error(err))
return nil, nil, err
}
log.Infow("created client auth secret", "clientAuthSecretName", expectedCSecret.Name)
log.Infow("Created client auth secret", "clientAuthSecretName", expectedCSecret.Name)
} else {
cSecret.Data = expectedCSecret.Data
cSecret.SetLabels(expectedCSecret.Labels)
cSecret.SetAnnotations(expectedCSecret.Annotations)
err = i.client.Update(ctx, cSecret)
if err != nil {
i.eventBus.Status.MarkDeployFailed("UpdateClientAuthSecretFailed", "Failed to update the client auth secret")
log.Errorw("error updating client auth secret", zap.Error(err))
log.Errorw("Error updating client auth secret", zap.Error(err))
return nil, nil, err
}
log.Infow("updated client auth secret", "clientAuthSecretName", cSecret.Name)
log.Infow("Updated client auth secret", "clientAuthSecretName", cSecret.Name)
returnedCSecret = cSecret
}
return returnedSSecret, returnedCSecret, nil
Expand All @@ -365,35 +365,36 @@ func (i *natsInstaller) createStatefulSet(ctx context.Context, serviceName, conf
ss, err := i.getStatefulSet(ctx)
if err != nil && !apierrors.IsNotFound(err) {
i.eventBus.Status.MarkDeployFailed("GetStatefulSetFailed", "Failed to get existing statefulset")
log.Errorw("error getting existing statefulset", zap.Error(err))
log.Errorw("Error getting existing statefulset", zap.Error(err))
return err
}
expectedSs, err := i.buildStatefulSet(serviceName, configmapName, authSecretName)
if err != nil {
i.eventBus.Status.MarkDeployFailed("BuildStatefulSetFailed", "Failed to build a statefulset spec")
log.Errorw("error building statefulset spec", zap.Error(err))
log.Errorw("Error building statefulset spec", zap.Error(err))
return err
}
if ss != nil {
if ss.Annotations != nil && ss.Annotations[common.AnnotationResourceSpecHash] == expectedSs.Annotations[common.AnnotationResourceSpecHash] {
return nil
}
// Delete the existing one to recreate it
err := i.client.Delete(ctx, ss)
if err != nil {
i.eventBus.Status.MarkDeployFailed("DeleteOldStatefulSetFailed", "Failed to delete a statefulset")
log.Errorw("error deleting a statefulset", zap.Error(err))
ss.SetLabels(expectedSs.Labels)
ss.Annotations[common.AnnotationResourceSpecHash] = expectedSs.Annotations[common.AnnotationResourceSpecHash]
ss.Spec = expectedSs.Spec
if err := i.client.Update(ctx, ss); err != nil {
i.eventBus.Status.MarkDeployFailed("UpdateStatefulSetFailed", "Failed to update a statefulset")
log.Errorw("Error updating a statefulset", zap.Error(err))
return err
}
log.Infow("old statefulset is deleted", "statefulsetName", ss.Name)
log.Infow("Statefulset is updated", "statefulsetName", ss.Name)
return nil
}
err = i.client.Create(ctx, expectedSs)
if err != nil {
if err := i.client.Create(ctx, expectedSs); err != nil {
i.eventBus.Status.MarkDeployFailed("CreateStatefulSetFailed", "Failed to create a statefulset")
log.Errorw("error creating a statefulset", zap.Error(err))
log.Errorw("Error creating a statefulset", zap.Error(err))
return err
}
log.Infow("statefulset is created", "statefulsetName", expectedSs.Name)
log.Infow("Statefulset is created", "statefulsetName", expectedSs.Name)
return nil
}

Expand Down
1 change: 0 additions & 1 deletion test/e2e/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build functional
// +build functional

package e2e

Expand Down