Skip to content

Commit

Permalink
[YUNIKORN-2515] Add property event.RESTResponseSize to the batch even…
Browse files Browse the repository at this point in the history
…t handler (#886)

Closes: #886

Signed-off-by: Peter Bacsko <pbacsko@cloudera.com>
  • Loading branch information
pbacsko committed Jun 12, 2024
1 parent 505a509 commit a786feb
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 3 deletions.
2 changes: 2 additions & 0 deletions pkg/common/configs/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
CMEventRingBufferCapacity = PrefixEvent + "ringBufferCapacity" // Ring Buffer Capacity
CMMaxEventStreams = PrefixEvent + "maxStreams"
CMMaxEventStreamsPerHost = PrefixEvent + "maxStreamsPerHost"
CMRESTResponseSize = PrefixEvent + "RESTResponseSize"

// defaults
DefaultHealthCheckInterval = 30 * time.Second
Expand All @@ -46,6 +47,7 @@ const (
DefaultEventRingBufferCapacity = 100000
DefaultMaxStreams = uint64(100)
DefaultMaxStreamsPerHost = uint64(15)
DefaultRESTResponseSize = uint64(10000)
)

var ConfigContext *SchedulerConfigContext
Expand Down
27 changes: 24 additions & 3 deletions pkg/webservice/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/julienschmidt/httprouter"
Expand Down Expand Up @@ -65,6 +66,7 @@ const (
var allowedActiveStatusMsg string
var allowedAppActiveStatuses map[string]bool
var streamingLimiter *StreamingLimiter
var maxRESTResponseSize atomic.Uint64

func init() {
allowedAppActiveStatuses = make(map[string]bool)
Expand All @@ -83,6 +85,22 @@ func init() {
allowedActiveStatusMsg = fmt.Sprintf("Only following active statuses are allowed: %s", strings.Join(activeStatuses, ","))

streamingLimiter = NewStreamingLimiter()

configs.AddConfigMapCallback("rest-response-size", func() {
newSize := common.GetConfigurationUint(configs.GetConfigMap(), configs.CMRESTResponseSize, configs.DefaultRESTResponseSize)
if newSize == 0 {
log.Log(log.REST).Warn("Illegal value `0` for config key, using default",
zap.String("key", configs.CMRESTResponseSize),
zap.Uint64("default", configs.DefaultRESTResponseSize))
newSize = configs.DefaultRESTResponseSize
}

log.Log(log.REST).Info("Reloading max REST event response size setting",
zap.Uint64("current", maxRESTResponseSize.Load()),
zap.Uint64("new", newSize))
maxRESTResponseSize.Store(newSize)
})
maxRESTResponseSize.Store(configs.DefaultRESTResponseSize)
}

func getStackInfo(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -1117,22 +1135,25 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
return
}

count := uint64(10000)
var start uint64

maxCount := maxRESTResponseSize.Load()
count := maxCount
if countStr := r.URL.Query().Get("count"); countStr != "" {
var err error
count, err = strconv.ParseUint(countStr, 10, 64)
if err != nil {
buildJSONErrorResponse(w, err.Error(), http.StatusBadRequest)
return
}
if count > maxCount {
count = maxCount
}
if count == 0 {
buildJSONErrorResponse(w, `0 is not a valid value for "count"`, http.StatusBadRequest)
return
}
}

var start uint64
if startStr := r.URL.Query().Get("start"); startStr != "" {
var err error
start, err = strconv.ParseUint(startStr, 10, 64)
Expand Down
33 changes: 33 additions & 0 deletions pkg/webservice/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2064,6 +2064,11 @@ func TestGetEvents(t *testing.T) {
checkIllegalBatchRequest(t, "count=0", `0 is not a valid value for "count`)
checkIllegalBatchRequest(t, "start=xyz", `strconv.ParseUint: parsing "xyz": invalid syntax`)
checkIllegalBatchRequest(t, "start=-100", `strconv.ParseUint: parsing "-100": invalid syntax`)

// "count" too high
maxRESTResponseSize.Store(1)
defer maxRESTResponseSize.Store(configs.DefaultRESTResponseSize)
checkSingleEvent(t, appEvent, "count=3")
}

func TestGetEventsWhenTrackingDisabled(t *testing.T) {
Expand Down Expand Up @@ -2681,6 +2686,34 @@ func TestGetPartitionRuleHandler(t *testing.T) {
assert.Equal(t, partitionRules[3].Name, types.Recovery)
}

func TestSetMaxRESTResponseSize(t *testing.T) {
current := configs.GetConfigMap()
defer configs.SetConfigMap(current)

configs.SetConfigMap(map[string]string{
configs.CMRESTResponseSize: "1234",
})
assert.Equal(t, uint64(1234), maxRESTResponseSize.Load())

configs.SetConfigMap(map[string]string{})
assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())

configs.SetConfigMap(map[string]string{
configs.CMRESTResponseSize: "xyz",
})
assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())

configs.SetConfigMap(map[string]string{
configs.CMRESTResponseSize: "0",
})
assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())

configs.SetConfigMap(map[string]string{
configs.CMRESTResponseSize: "-1",
})
assert.Equal(t, uint64(10000), maxRESTResponseSize.Load())
}

type ResponseRecorderWithDeadline struct {
*httptest.ResponseRecorder
setWriteFails bool
Expand Down

0 comments on commit a786feb

Please sign in to comment.