Prepare downscale webhook #47
Signed-off-by: JordanRushing <rushing.jordan@gmail.com>
…oroutines where the POST errors Signed-off-by: JordanRushing <rushing.jordan@gmail.com>
Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
👋 Thank you for taking this task. Did a quick first review, and have some feedback:
pkg/admission/prep_downscale_test.go
Outdated
	DownScalePort string
}

func testPrepDownscaleWebhook(t *testing.T, oldReplicas, newReplicas, httpStatusCode int, allowed bool, podsPrepared bool) {
Can we write an integration test please?
As this PR is getting bigger and bigger I propose to do this in a separate PR.
pkg/admission/prep_downscale.go
Outdated
// Since it's a downscale, check if the resource has the label that indicates it's ready to be prepared to be downscaled.
// Create a slice of endpoint addresses for pods to send HTTP post requests to and to fail if any don't return 200
if lbls[PrepDownscaleLabelKey] == PrepDownscaleLabelValue {
We need to make sure it's not a dry run.
This has been added.
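For readers following along, a minimal sketch of what such a dry-run guard could look like. It assumes the request carries DryRun as a *bool, as k8s.io/api/admission/v1.AdmissionRequest does; the admissionRequest type and the isDryRun name are stand-ins for illustration, not code from this PR.

```go
package main

import "fmt"

// admissionRequest is a hypothetical stand-in for the relevant part of
// k8s.io/api/admission/v1.AdmissionRequest, where DryRun is a *bool.
type admissionRequest struct {
	DryRun *bool
}

// isDryRun reports whether the request is a dry run.
// A nil pointer is treated as "not a dry run".
func isDryRun(req admissionRequest) bool {
	return req.DryRun != nil && *req.DryRun
}

func main() {
	yes := true
	fmt.Println(isDryRun(admissionRequest{}))             // DryRun unset
	fmt.Println(isDryRun(admissionRequest{DryRun: &yes})) // DryRun set to true
}
```

With a guard like this, the webhook can skip the POST requests to the pods when the downscale is only being simulated.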
pkg/admission/annotations.go
Outdated
client := api.AppsV1().StatefulSets(namespace)
sts, err := client.Get(ctx, stsName, v1.GetOptions{})
if err != nil {
	return err
}
annotations := sts.GetAnnotations()
if annotations == nil {
	annotations = map[string]string{}
}
annotations[LastDownscaleAnnotationKey] = time.Now().UTC().Format(time.RFC3339)
sts.SetAnnotations(annotations)

_, err = client.Update(ctx, sts, v1.UpdateOptions{})
I wonder if this can trigger a race condition (something updating sts in between our read & update). Can we do a Patch() operation here instead?
Patch does work better here.
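A rough sketch of the patch-based approach, assuming a JSON merge patch is used: the body names only the annotation being set, so there is no read-modify-write window. The annotationPatch helper and the example.com/last-downscale key are placeholders for illustration; the PR's own key is LastDownscaleAnnotationKey.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// annotationPatch builds a JSON merge patch body that sets a single
// annotation and leaves every other field of the object untouched.
func annotationPatch(key, value string) ([]byte, error) {
	patch := map[string]any{
		"metadata": map[string]any{
			"annotations": map[string]string{key: value},
		},
	}
	return json.Marshal(patch)
}

func main() {
	// Placeholder key and a fixed timestamp, for illustration only.
	ts := time.Date(2023, 5, 1, 12, 0, 0, 0, time.UTC).Format(time.RFC3339)
	body, err := annotationPatch("example.com/last-downscale", ts)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	// With client-go, the body would then be sent with something like:
	//   client.Patch(ctx, stsName, types.MergePatchType, body, metav1.PatchOptions{})
}
```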
This is looking very good! Thank you for addressing all my feedback. Left one last comment about the update operation, but otherwise LGTM!
looking good, just a few questions/suggestions
pkg/admission/prep_downscale.go
Outdated
err := errors.New("HTTP post request returned non-2xx status code")
body, readError := io.ReadAll(resp.Body)
defer resp.Body.Close()
level.Error(logger).Log("msg", err, "status", resp.StatusCode, "response_body", body)
How large can the response_body be? Depending on the size, we might prefer not to log it.
For Loki and Mimir at the moment it will be empty. They will log the error in more detail so you have to look there.
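If the body ever stops being empty, one option is to cap how much of it is read before logging. A minimal sketch using io.LimitReader follows; the 1024-byte cap and the readBodyForLog and sampleBody names are assumptions for illustration, not part of this PR.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// maxLoggedBody is an assumed cap, not a value taken from the PR.
const maxLoggedBody = 1024

// readBodyForLog reads at most limit bytes from r, so an unexpectedly
// large response cannot bloat the log line.
func readBodyForLog(r io.Reader, limit int64) (string, error) {
	b, err := io.ReadAll(io.LimitReader(r, limit))
	return string(b), err
}

// sampleBody returns a reader over n bytes, standing in for resp.Body.
func sampleBody(n int) io.Reader {
	return strings.NewReader(strings.Repeat("x", n))
}

func main() {
	body, err := readBodyForLog(sampleBody(5000), maxLoggedBody)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(body)) // capped at maxLoggedBody
}
```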
pkg/admission/prep_downscale.go
Outdated
for i := 0; i < int(diff); i++ {
	index := int(*oldReplicas) - i - 1 // nr in statefulset
	eps[i].url = fmt.Sprintf("%v-%v.%v.%v.svc.cluster.local:%s/ingester/%s",
		ar.Request.Name, // pod name
		index,
		ar.Request.Name, // svc name
		ar.Request.Namespace,
		port,
		path,
	)
	eps[i].index = index
}
WDYT?
Suggested change for the loop above:
for i := int(*newReplicas); i < int(*oldReplicas); i++ {
	eps[i].url = fmt.Sprintf("%v-%v.%v.%v.svc.cluster.local:%s/ingester/%s",
		ar.Request.Name, // pod name
		index,
		ar.Request.Name, // svc name
		ar.Request.Namespace,
		port,
		path,
	)
	eps[i].index = index
}
That doesn't define a field called index. Do you mean using i instead of index?
Yep, mostly because iterating from 0 to diff and evaluating index as oldReplicas - i - 1 is a little confusing; just iterate from newReplicas to oldReplicas.
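A sketch of how that loop could look with i as the pod ordinal. It assumes eps holds exactly one entry per removed pod, so the slice position is i-newReplicas rather than i. The endpoint struct and the endpointsToPrepare helper are illustrative stand-ins, and the pods come out in ascending rather than descending order.

```go
package main

import "fmt"

// endpoint mirrors the url/index pair used in the snippet under review;
// the exact struct in the PR may differ.
type endpoint struct {
	url   string
	index int
}

// endpointsToPrepare returns one endpoint per pod that a downscale from
// oldReplicas to newReplicas would remove. The loop variable i is the pod
// ordinal; i-newReplicas is the position in the result slice.
func endpointsToPrepare(name, namespace, port, path string, oldReplicas, newReplicas int) []endpoint {
	if newReplicas >= oldReplicas {
		return nil // not a downscale
	}
	eps := make([]endpoint, oldReplicas-newReplicas)
	for i := newReplicas; i < oldReplicas; i++ {
		eps[i-newReplicas] = endpoint{
			url: fmt.Sprintf("%v-%v.%v.%v.svc.cluster.local:%s/ingester/%s",
				name, // pod name prefix
				i,    // pod ordinal
				name, // svc name
				namespace,
				port,
				path,
			),
			index: i,
		}
	}
	return eps
}

func main() {
	// Names, port and path below are made-up example values.
	for _, ep := range endpointsToPrepare("ingester", "loki", "8080", "prepare-shutdown", 5, 3) {
		fmt.Println(ep.index, ep.url)
	}
}
```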
pkg/admission/prep_downscale.go
Outdated
err := errors.New("HTTP post request returned non-2xx status code")
body, readError := io.ReadAll(resp.Body)
defer resp.Body.Close()
level.Error(logger).Log("msg", err, "status", resp.StatusCode, "response_body", body)
WDYT of having a dedicated "err" field for the err? Like:
("msg", "non-2xx received", "err", err, ...)
Motivation: it would let us search for errors precisely by doing {mystream} |= "err="
Yes, that makes it consistent with the other error log messages as well.
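To illustrate why the dedicated field helps, here is a small sketch with a hand-rolled key/value formatter. The logfmt function is a stand-in written for this example so it runs without dependencies; it is not the go-kit logger the PR uses, and its exact quoting may differ from real logfmt output.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// logfmt is a tiny stand-in for a key/value logger; it renders each
// key/value pair as key="value", separated by spaces.
func logfmt(keyvals ...any) string {
	var sb strings.Builder
	for i := 0; i+1 < len(keyvals); i += 2 {
		if i > 0 {
			sb.WriteByte(' ')
		}
		fmt.Fprintf(&sb, "%v=%q", keyvals[i], fmt.Sprint(keyvals[i+1]))
	}
	return sb.String()
}

func main() {
	err := errors.New("HTTP post request returned non-2xx status code")
	// The error lives in its own "err" field, so a filter on `err=`
	// matches this line without depending on the wording of "msg".
	fmt.Println(logfmt("msg", "non-2xx received", "err", err, "status", 503))
}
```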
Looks like this is getting close to being merged. Remember to add an entry to the CHANGELOG (and sorry for the previous dependency update conflict!)
Co-authored-by: Dylan Guedes <djmgguedes@gmail.com>
The CHANGELOG has been updated, as well as the README.md. The e2e tests were fixed. However, in CI the last test fails occasionally. I'm not sure how to fix it at the moment.
The |
pkg/admission/prep_downscale.go
Outdated
reviewResponse := v1.AdmissionResponse{
	Allowed: false,
	Result: &metav1.Status{
		Message: fmt.Sprintf("downscale of %s/%s in %s from %d to %d replicas is not allowed because the %v label is not set or empty.", ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas, PrepDownscalePortKey),
	},
}
level.Warn(logger).Log("msg", fmt.Sprintf("downscale not allowed because the %v label is not set or empty", PrepDownscalePortKey))
return &reviewResponse
Nit: during this last review, I found it hard to follow the code with all the noise of these denied responses. I would consider not introducing a var for them, and defining a method that would format the Allowed: false response, like this:
level.Warn(logger).Log("msg", fmt.Sprintf("downscale not allowed because the %v label is not set or empty", PrepDownscalePortKey))
return deny("downscale of %s/%s in %s from %d to %d replicas is not allowed because the %v label is not set or empty.", ar.Request.Resource.Resource, ar.Request.Name, ar.Request.Namespace, *oldReplicas, *newReplicas, PrepDownscalePortKey)
// ...
// deny returns a *v1.AdmissionResponse with Allowed: false and the provided message formatted as in fmt.Sprintf.
func deny(msg string, args ...any) *v1.AdmissionResponse {
	return &v1.AdmissionResponse{
		Allowed: false,
		Result: &metav1.Status{
			Message: fmt.Sprintf(msg, args...),
		},
	}
}
IMO this would make this entire method shorter and easier to follow.
Good idea. The function was added manually.
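For reference, a self-contained sketch of such a helper. Go needs an explicit type on the format parameter (msg string, args ...any) and a pointer in the return statement for it to compile. The status and admissionResponse types are reduced stand-ins for metav1.Status and the admission v1.AdmissionResponse, and the example values are made up.

```go
package main

import "fmt"

// status and admissionResponse are hypothetical stand-ins for
// metav1.Status and admission v1.AdmissionResponse, reduced to the
// fields used here.
type status struct {
	Message string
}

type admissionResponse struct {
	Allowed bool
	Result  *status
}

// deny returns a response with Allowed: false and a message formatted
// as in fmt.Sprintf.
func deny(msg string, args ...any) *admissionResponse {
	return &admissionResponse{
		Allowed: false,
		Result:  &status{Message: fmt.Sprintf(msg, args...)},
	}
}

func main() {
	resp := deny("downscale of %s/%s in %s from %d to %d replicas is not allowed",
		"statefulsets", "ingester", "loki", 5, 3)
	fmt.Println(resp.Allowed, resp.Result.Message)
}
```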
pkg/admission/prep_downscale.go
Outdated
if foundSts != nil {
	reviewResponse := v1.AdmissionResponse{
Should we fail if this sts was downscaled a few moments ago? Can we downscale the same statefulset again? I'm trying to think of corner cases that would bite us here.
For Loki we set stabilizationWindowSeconds in the KEDA ScaledObject (which is passed to the HPA). This makes sure there is a certain time between downscales. If it is set high enough, the other statefulsets should have time to finish their downscales, at which point another scale down should be possible.
I was thinking about a human operator having to do something manually. However, a human can just delete the annotation as a break-glass mechanism so it's ok.
Co-authored-by: Oleg Zaytsev <mail@olegzaytsev.com>
Thank you!
This PR follows #25 and implements a prep_downscale mutating admission webhook that, after receiving a request that scales down a resource, sends an HTTP POST to pods in a StatefulSet/Deployment/ReplicaSet that are labeled with grafana.com/prep-downscale: true.
The endpoint targets for the requests are crafted by extracting and combining the grafana.com/prep-downscale-http-path and grafana.com/prep-downscale-http-port labels along with the diff in the number of replicas.
An example scenario for when this is useful:
- Pods expose a prep_shutdown HTTP endpoint that tells them to expect to be terminated on the next SIGTERM and manage their state appropriately
- The webhook calls the prep_shutdown HTTP endpoints on relevant pods and allows the admission request, thus resulting in the pods being deleted gracefully