Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions controllers/solrcloud_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,25 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// Pick which pods should be deleted for an update.
// Don't exit on an error, which would only occur because of an HTTP Exception. Requeue later instead.
additionalPodsToUpdate, retryLater := util.DeterminePodsSafeToUpdate(ctx, instance, outOfDatePods, int(newStatus.ReadyReplicas), availableUpdatedPodCount, len(outOfDatePodsNotStarted), updateLogger)
podsToUpdate = append(podsToUpdate, additionalPodsToUpdate...)
additionalPodsToUpdate, podsHaveReplicas, retryLater := util.DeterminePodsSafeToUpdate(ctx, instance, outOfDatePods, int(newStatus.ReadyReplicas), availableUpdatedPodCount, len(outOfDatePodsNotStarted), updateLogger)
// Only actually delete a running pod if it has been evicted, or doesn't need eviction (persistent storage)
for _, pod := range additionalPodsToUpdate {
if podsHaveReplicas[pod.Name] {
// Only evict pods that contain replicas in the clusterState
if evictError, canDeletePod := util.EvictReplicasForPodIfNecessary(ctx, instance, &pod, updateLogger); evictError != nil {
err = evictError
updateLogger.Error(err, "Error while evicting replicas on pod", "pod", pod.Name)
} else if canDeletePod {
podsToUpdate = append(podsToUpdate, pod)
} else {
// Try again in 5 seconds if cannot delete a pod.
updateRequeueAfter(&requeueOrNot, time.Second*5)
}
} else {
// If a pod has no replicas, then update it when asked to
podsToUpdate = append(podsToUpdate, pod)
}
}

for _, pod := range podsToUpdate {
err = r.Delete(ctx, &pod, client.Preconditions{
Expand All @@ -433,6 +450,9 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil || retryLater {
updateRequeueAfter(&requeueOrNot, time.Second*15)
}
if err != nil {
return requeueOrNot, err
}
}

extAddressabilityOpts := instance.Spec.SolrAddressability.External
Expand Down Expand Up @@ -474,7 +494,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}

return requeueOrNot, nil
return requeueOrNot, err
}

func (r *SolrCloudReconciler) reconcileCloudStatus(ctx context.Context, solrCloud *solrv1beta1.SolrCloud, logger logr.Logger,
Expand Down
94 changes: 85 additions & 9 deletions controllers/util/solr_update_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ import (
solr "github.com/apache/solr-operator/api/v1beta1"
"github.com/apache/solr-operator/controllers/util/solr_api"
"github.com/go-logr/logr"
cron "github.com/robfig/cron/v3"
"github.com/robfig/cron/v3"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"net/url"
"sort"
"strings"
"time"
)

Expand Down Expand Up @@ -94,7 +95,7 @@ func scheduleNextRestartWithTime(restartSchedule string, podTemplateAnnotations
// TODO:
// - Think about caching this for ~250 ms? Not a huge need to send these requests milliseconds apart.
// - Might be too much complexity for very little gain.
func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, outOfDatePods []corev1.Pod, readyPods int, availableUpdatedPodCount int, outOfDatePodsNotStartedCount int, logger logr.Logger) (podsToUpdate []corev1.Pod, retryLater bool) {
func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, outOfDatePods []corev1.Pod, readyPods int, availableUpdatedPodCount int, outOfDatePodsNotStartedCount int, logger logr.Logger) (podsToUpdate []corev1.Pod, podsHaveReplicas map[string]bool, retryLater bool) {
// Before fetching the cluster state, be sure that there is room to update at least 1 pod
maxPodsUnavailable, unavailableUpdatedPodCount, maxPodsToUpdate := calculateMaxPodsToUpdate(cloud, len(outOfDatePods), outOfDatePodsNotStartedCount, availableUpdatedPodCount)
if maxPodsToUpdate <= 0 {
Expand All @@ -114,7 +115,7 @@ func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, outOf
} else {
queryParams.Set("action", "OVERSEERSTATUS")
err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, overseerResp)
if hasError, apiErr := solr_api.CheckForCollectionsApiError("OVERSEERSTATUS", clusterResp.ResponseHeader); hasError {
if hasError, apiErr = solr_api.CheckForCollectionsApiError("OVERSEERSTATUS", clusterResp.ResponseHeader); hasError {
err = apiErr
}
}
Expand All @@ -127,7 +128,7 @@ func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, outOf
// If the update logic already wants to retry later, then do not pick any pods
if !retryLater {
logger.Info("Pod update selection started.", "outOfDatePods", len(outOfDatePods), "maxPodsUnavailable", maxPodsUnavailable, "unavailableUpdatedPods", unavailableUpdatedPodCount, "outOfDatePodsNotStarted", outOfDatePodsNotStartedCount, "maxPodsToUpdate", maxPodsToUpdate)
podsToUpdate = pickPodsToUpdate(cloud, outOfDatePods, clusterResp.ClusterStatus, overseerResp.Leader, maxPodsToUpdate, logger)
podsToUpdate, podsHaveReplicas = pickPodsToUpdate(cloud, outOfDatePods, clusterResp.ClusterStatus, overseerResp.Leader, maxPodsToUpdate, logger)

// If there are no pods to upgrade, even though the maxPodsToUpdate is >0, then retry later because the issue stems from cluster state
// and clusterState changes will not call the reconciler.
Expand All @@ -136,7 +137,7 @@ func DeterminePodsSafeToUpdate(ctx context.Context, cloud *solr.SolrCloud, outOf
}
}
}
return podsToUpdate, retryLater
return podsToUpdate, podsHaveReplicas, retryLater
}

// calculateMaxPodsToUpdate determines the maximum number of additional pods that can be updated.
Expand All @@ -154,7 +155,8 @@ func calculateMaxPodsToUpdate(cloud *solr.SolrCloud, outOfDatePodCount int, outO
}

func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods []corev1.Pod, clusterStatus solr_api.SolrClusterStatus,
overseer string, maxPodsToUpdate int, logger logr.Logger) (podsToUpdate []corev1.Pod) {
overseer string, maxPodsToUpdate int, logger logr.Logger) (podsToUpdate []corev1.Pod, podsHaveReplicas map[string]bool) {
podsHaveReplicas = make(map[string]bool, maxPodsToUpdate)
nodeContents, totalShardReplicas, shardReplicasNotActive, allManagedPodsLive := findSolrNodeContents(clusterStatus, overseer, GetAllManagedSolrNodeNames(cloud))
sortNodePodsBySafety(outOfDatePods, nodeContents, cloud)

Expand Down Expand Up @@ -237,6 +239,7 @@ func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods []corev1.Pod, cluster
}
logger.Info("Pod killed for update.", "pod", pod.Name, "reason", reason)
podsToUpdate = append(podsToUpdate, pod)
podsHaveReplicas[pod.Name] = isInClusterState && nodeContent.replicas > 0

// Stop after the maxBatchNodeUpdate count, if one is provided.
if maxPodsToUpdate >= 1 && len(podsToUpdate) >= maxPodsToUpdate {
Expand All @@ -247,7 +250,7 @@ func pickPodsToUpdate(cloud *solr.SolrCloud, outOfDatePods []corev1.Pod, cluster
logger.Info("Pod not able to be killed for update.", "pod", pod.Name, "reason", reason)
}
}
return podsToUpdate
return podsToUpdate, podsHaveReplicas
}

func sortNodePodsBySafety(outOfDatePods []corev1.Pod, nodeMap map[string]*SolrNodeContents, solrCloud *solr.SolrCloud) {
Expand Down Expand Up @@ -296,7 +299,7 @@ func ResolveMaxPodsUnavailable(maxPodsUnavailable *intstr.IntOrString, desiredPo
if maxPodsUnavailable != nil && maxPodsUnavailable.Type == intstr.Int && maxPodsUnavailable.IntVal <= int32(0) {
return desiredPods, nil
}
podsUnavailable, err := intstr.GetValueFromIntOrPercent(intstr.ValueOrDefault(maxPodsUnavailable, intstr.FromString(DefaultMaxPodsUnavailable)), desiredPods, false)
podsUnavailable, err := intstr.GetScaledValueFromIntOrPercent(intstr.ValueOrDefault(maxPodsUnavailable, intstr.FromString(DefaultMaxPodsUnavailable)), desiredPods, false)
if err != nil {
return 1, err
}
Expand All @@ -314,7 +317,7 @@ func ResolveMaxShardReplicasUnavailable(maxShardReplicasUnavailable *intstr.IntO
maxUnavailable, isCached := cache[shard]
var err error
if !isCached {
maxUnavailable, err = intstr.GetValueFromIntOrPercent(intstr.ValueOrDefault(maxShardReplicasUnavailable, intstr.FromInt(DefaultMaxShardReplicasUnavailable)), totalShardReplicas[shard], false)
maxUnavailable, err = intstr.GetScaledValueFromIntOrPercent(intstr.ValueOrDefault(maxShardReplicasUnavailable, intstr.FromInt(DefaultMaxShardReplicasUnavailable)), totalShardReplicas[shard], false)
if err != nil {
maxUnavailable = 1
}
Expand Down Expand Up @@ -473,3 +476,76 @@ func GetAllManagedSolrNodeNames(solrCloud *solr.SolrCloud) map[string]bool {
}
return allNodeNames
}

// EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod, if the Pod is using ephemeral storage.
// If the pod is using persistent storage, this function is a no-op.
// This function MUST be idempotent and return the same list of pods given the same kubernetes/solr state.
//
// Returns:
//   - err: any error hit while checking, submitting, or cleaning up the async Solr Collections API request.
//   - canDeletePod: true when the pod is safe to delete (persistent storage, no managed data
//     volume at all, or the replica migration off the ephemeral pod has completed).
func EvictReplicasForPodIfNecessary(ctx context.Context, solrCloud *solr.SolrCloud, pod *corev1.Pod, logger logr.Logger) (err error, canDeletePod bool) {
	// Find the managed data volume. Take the address of the slice element rather than
	// the range variable: &volume aliases the loop variable (a latent bug before
	// Go 1.22 loop-variable semantics, and flagged by linters either way).
	var solrDataVolume *corev1.Volume
	for i := range pod.Spec.Volumes {
		if pod.Spec.Volumes[i].Name == SolrDataVolumeName {
			solrDataVolume = &pod.Spec.Volumes[i]
			break
		}
	}

	// If the data volume is persistent, the pod's data survives deletion, so no eviction
	// is needed. The nil guard also prevents a nil-pointer panic for pods that have no
	// volume named SolrDataVolumeName; such pods hold no managed Solr data to migrate.
	if solrDataVolume == nil || solrDataVolume.VolumeSource.PersistentVolumeClaim != nil {
		return nil, true
	}

	// The data volume is ephemeral, so replicas must be dealt with before pod deletion.
	// If the Cloud has 1 or zero pods, and this is the "-0" pod, then delete the data since we can't move it anywhere else
	// Otherwise, move the replicas to other pods
	if (solrCloud.Spec.Replicas == nil || *solrCloud.Spec.Replicas < 2) && strings.HasSuffix(pod.Name, "-0") {
		queryParams := url.Values{}
		queryParams.Add("action", "DELETENODE")
		queryParams.Add("node", SolrNodeName(solrCloud, pod.Name))
		// TODO: Figure out a way to do this, since DeleteNode will not delete the last replica of every type...
		// NOTE(review): the DELETENODE request built above is never sent (see TODO), so the
		// pod is currently approved for deletion without removing its replicas first.
		return err, true
	}

	// Multi-pod cloud: migrate replicas off this node via an async REPLACENODE request,
	// tracked by a deterministic request id so the check is idempotent across reconciles.
	requestId := "move-replicas-" + pod.Name

	// First check to see if the Async Replace request has started
	asyncState, message, asyncErr := solr_api.CheckAsyncRequest(ctx, solrCloud, requestId)
	switch {
	case asyncErr != nil:
		err = asyncErr
	case asyncState == "notfound":
		// Submit new Replace Node request
		replaceResponse := &solr_api.SolrAsyncResponse{}
		queryParams := url.Values{}
		queryParams.Add("action", "REPLACENODE")
		queryParams.Add("parallel", "true")
		queryParams.Add("sourceNode", SolrNodeName(solrCloud, pod.Name))
		queryParams.Add("async", requestId)
		err = solr_api.CallCollectionsApi(ctx, solrCloud, queryParams, replaceResponse)
		if hasError, apiErr := solr_api.CheckForCollectionsApiError("REPLACENODE", replaceResponse.ResponseHeader); hasError {
			err = apiErr
		}
		if err == nil {
			logger.Info("Migrating all replicas off of pod before deletion.", "requestId", requestId, "pod", pod.Name)
		} else {
			logger.Error(err, "Could not migrate all replicas off of pod before deletion. Will try again later.", "requestId", requestId, "message", message)
		}
	default:
		logger.Info("Found async status", "requestId", requestId, "state", asyncState)
		// Only continue to delete the pod if the ReplaceNode request is complete and successful
		if asyncState == "completed" {
			canDeletePod = true
			logger.Info("Migration of all replicas off of pod before deletion complete. Pod can now be deleted.", "pod", pod.Name)
		} else if asyncState == "failed" {
			logger.Info("Migration of all replicas off of pod before deletion failed. Will try again.", "pod", pod.Name, "message", message)
		}

		// Delete the async request Id if the async request is successful or failed.
		// If the request failed, this will cause a retry since the next reconcile won't find the async requestId in Solr.
		if asyncState == "completed" || asyncState == "failed" {
			if message, err = solr_api.DeleteAsyncRequest(ctx, solrCloud, requestId); err != nil {
				logger.Error(err, "Could not delete Async request status.", "requestId", requestId, "message", message)
			} else {
				// NOTE(review): this clears canDeletePod even after a successful "completed"
				// cleanup; the pod then gets deleted on a later reconcile once the cluster
				// state shows it holds no replicas — confirm this delay is intended.
				canDeletePod = false
			}
		}
	}
	return err, canDeletePod
}
14 changes: 10 additions & 4 deletions controllers/util/solr_update_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,23 @@ func TestPickPodsToUpgrade(t *testing.T) {

// Normal inputs
maxshardReplicasUnavailable = intstr.FromInt(1)
podsToUpgrade := getPodNames(pickPodsToUpdate(solrCloud, allPods, testDownClusterStatus, overseerLeader, 6, log))
podsToUpgradeDetailed, podsHaveReplicas := pickPodsToUpdate(solrCloud, allPods, testDownClusterStatus, overseerLeader, 6, log)
assert.Equal(t, podsHaveReplicas, map[string]bool{"foo-solrcloud-2": true, "foo-solrcloud-6": false})
podsToUpgrade := getPodNames(podsToUpgradeDetailed)
assert.ElementsMatch(t, []string{"foo-solrcloud-2", "foo-solrcloud-6"}, podsToUpgrade, "Incorrect set of next pods to upgrade. Do to the down/non-live replicas, only the node without replicas and one more can be upgraded.")

// Test the maxBatchNodeUpgradeSpec
maxshardReplicasUnavailable = intstr.FromInt(1)
podsToUpgrade = getPodNames(pickPodsToUpdate(solrCloud, allPods, testDownClusterStatus, overseerLeader, 1, log))
podsToUpgradeDetailed, podsHaveReplicas = pickPodsToUpdate(solrCloud, allPods, testDownClusterStatus, overseerLeader, 1, log)
assert.Equal(t, podsHaveReplicas, map[string]bool{"foo-solrcloud-6": false})
podsToUpgrade = getPodNames(podsToUpgradeDetailed)
assert.ElementsMatch(t, []string{"foo-solrcloud-6"}, podsToUpgrade, "Incorrect set of next pods to upgrade. Only 1 node should be upgraded when maxBatchNodeUpgradeSpec=1")

// Test the maxShardReplicasDownSpec
maxshardReplicasUnavailable = intstr.FromInt(2)
podsToUpgrade = getPodNames(pickPodsToUpdate(solrCloud, allPods, testDownClusterStatus, overseerLeader, 6, log))
podsToUpgradeDetailed, podsHaveReplicas = pickPodsToUpdate(solrCloud, allPods, testDownClusterStatus, overseerLeader, 6, log)
assert.Equal(t, podsHaveReplicas, map[string]bool{"foo-solrcloud-2": true, "foo-solrcloud-3": true, "foo-solrcloud-4": true, "foo-solrcloud-6": false})
podsToUpgrade = getPodNames(podsToUpgradeDetailed)
assert.ElementsMatch(t, []string{"foo-solrcloud-2", "foo-solrcloud-3", "foo-solrcloud-4", "foo-solrcloud-6"}, podsToUpgrade, "Incorrect set of next pods to upgrade.")

/*
Expand Down Expand Up @@ -974,7 +980,7 @@ var (
}
)

func getPodNames(pods []corev1.Pod) []string {
func getPodNames(pods []corev1.Pod, ignored ...map[string]bool) []string {
names := make([]string, len(pods))
for i, pod := range pods {
names[i] = pod.Name
Expand Down
10 changes: 5 additions & 5 deletions controllers/util/solr_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
SolrCloudPVCTechnology = "solr-cloud"
SolrPVCStorageLabel = "solr.apache.org/storage"
SolrCloudPVCDataStorage = "data"
SolrDataVolumeName = "data"
SolrPVCInstanceLabel = "solr.apache.org/instance"
SolrXmlMd5Annotation = "solr.apache.org/solrXmlMd5"
SolrXmlFile = "solr.xml"
Expand Down Expand Up @@ -134,16 +135,15 @@ func GenerateStatefulSet(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCl
},
}

solrDataVolumeName := "data"
volumeMounts := []corev1.VolumeMount{{Name: solrDataVolumeName, MountPath: "/var/solr/data"}}
volumeMounts := []corev1.VolumeMount{{Name: SolrDataVolumeName, MountPath: "/var/solr/data"}}

var pvcs []corev1.PersistentVolumeClaim
if solrCloud.UsesPersistentStorage() {
pvc := solrCloud.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.DeepCopy()

// Set the default name of the pvc
if pvc.ObjectMeta.Name == "" {
pvc.ObjectMeta.Name = solrDataVolumeName
pvc.ObjectMeta.Name = SolrDataVolumeName
}

// Set some defaults in the PVC Spec
Expand Down Expand Up @@ -177,7 +177,7 @@ func GenerateStatefulSet(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCl
}
} else {
ephemeralVolume := corev1.Volume{
Name: solrDataVolumeName,
Name: SolrDataVolumeName,
VolumeSource: corev1.VolumeSource{},
}
if solrCloud.Spec.StorageOptions.EphemeralStorage != nil {
Expand Down Expand Up @@ -380,7 +380,7 @@ func GenerateStatefulSet(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCl
Value: strings.Join(allSolrOpts, " "),
})

initContainers := generateSolrSetupInitContainers(solrCloud, solrCloudStatus, solrDataVolumeName, security)
initContainers := generateSolrSetupInitContainers(solrCloud, solrCloudStatus, SolrDataVolumeName, security)

// Add user defined additional init containers
if customPodOptions != nil && len(customPodOptions.InitContainers) > 0 {
Expand Down
32 changes: 25 additions & 7 deletions hack/release/smoke_test/test_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ if [[ -z "${KUBERNETES_VERSION:-}" ]]; then
KUBERNETES_VERSION="v1.21.2"
fi
if [[ -z "${SOLR_VERSION:-}" ]]; then
SOLR_VERSION="8.10.0"
SOLR_VERSION="8.11.1"
fi

# If LOCATION is not a URL, then get the absolute path
Expand Down Expand Up @@ -141,12 +141,12 @@ printf "\nInstall a test Solr Cluster\n"
helm install --kube-context "${KUBE_CONTEXT}" --verify example "${SOLR_HELM_CHART}" \
--set replicas=3 \
--set image.tag=${SOLR_VERSION} \
--set solrJavaMem="-Xms1g -Xmx3g" \
--set customSolrKubeOptions.podOptions.resources.limits.memory="1G" \
--set customSolrKubeOptions.podOptions.resources.requests.cpu="300m" \
--set customSolrKubeOptions.podOptions.resources.requests.memory="512Mi" \
--set zookeeperRef.provided.persistence.spec.resources.requests.storage="5Gi" \
--set zookeeperRef.provided.replicas=1 \
--set solrOptions.javaMemory="-Xms1g -Xmx3g" \
--set podOptions.resources.limits.memory="1G" \
--set podOptions.resources.requests.cpu="300m" \
--set podOptions.resources.requests.memory="512Mi" \
--set zk.provided.persistence.spec.resources.requests.storage="5Gi" \
--set zk.provided.replicas=1 \
--set "backupRepositories[0].name=local" \
--set "backupRepositories[0].volume.source.hostPath.path=/tmp/backup"

Expand Down Expand Up @@ -259,6 +259,24 @@ if (( "${FOUND_BACKUP_ID}" != "${LAST_BACKUP_ID}" )); then
exit 1
fi


printf "\nDo a rolling restart and make sure the cluster is healthy afterwards\n"

helm upgrade --kube-context "${KUBE_CONTEXT}" --verify example "${SOLR_HELM_CHART}" --reuse-values \
--set-string podOptions.annotations.restart="true"
printf '\nWait for the rolling restart to begin.\n\n'
grep -q "3 3 3 0" <(exec kubectl get solrcloud example -w); kill $!

printf '\nWait for all 3 Solr nodes to become ready.\n\n'
grep -q "3 3 3 3" <(exec kubectl get solrcloud example -w); kill $!

# Need a new port-forward, since the last one will have broken due to all pods restarting
kubectl port-forward service/example-solrcloud-common 28983:80 || true &
sleep 2

printf "\nQuery the test collection, test for 0 docs\n\n"
curl --silent "http://localhost:28983/solr/smoke-test/select" | grep '\"numFound\":0' > /dev/null

echo "Delete test Kind Kubernetes cluster."
kind delete clusters "${CLUSTER_NAME}"

Expand Down