Split write request at field boundary #8167
Conversation
…ger than max allowed size
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
Mostly nitpicks. The only thing I'm not sure about is copying the buffers: I feel like that would eat up most of the benefit we get from the custom binary decoding.
	return marshalWriteRequestsToRecords(tenantID, subrequests)
}

func marshalWriteRequestsToRecords(tenantID string, reqs [][]byte) ([]*kgo.Record, int, error) {
Nitpick on naming: this doesn't marshal the requests, it just creates records from slices of bytes.
var (
	remaining  = atomic.NewInt64(int64(len(records)))
	done       = make(chan struct{})
	firstErrMx sync.Mutex
Nit: you may be able to use atomic.Error here instead, or a channel of errors.
// into subrequests with given max size.
//
// This function partially parses WriteRequest and splits the request at field boundaries.
// Some fields (source, skipLabelNameValidation) are copied into each returned subrequest.
I wonder if we need some protection against adding new fields to WriteRequest which won't be copied over. Something like fuzzing the WriteRequest and checking that the split request contains everything but the Timeseries and Metadata. Or maybe just a test which fails if there are new fields in WriteRequest; that would force whoever adds the fields to also look at this function. WDYT?
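A minimal sketch of the guard-test idea, using a local stand-in struct with illustrative field names (not the real mimirpb.WriteRequest): reflection over the struct's fields fails loudly when a field unknown to the splitting code appears.

```go
package main

import (
	"fmt"
	"reflect"
)

// writeRequest is a stand-in for mimirpb.WriteRequest; the field set here is
// illustrative only.
type writeRequest struct {
	Timeseries              []string
	Source                  int32
	Metadata                []string
	SkipLabelNameValidation bool
}

// knownFields lists every field the splitting code knows how to handle.
// If writeRequest grows a new field, checkNoUnknownFields returns an error,
// forcing whoever added the field to revisit the split function.
var knownFields = map[string]bool{
	"Timeseries":              true,
	"Source":                  true,
	"Metadata":                true,
	"SkipLabelNameValidation": true,
}

func checkNoUnknownFields() error {
	t := reflect.TypeOf(writeRequest{})
	for i := 0; i < t.NumField(); i++ {
		if !knownFields[t.Field(i).Name] {
			return fmt.Errorf("field %q is not handled by request splitting", t.Field(i).Name)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkNoUnknownFields())
}
```

In a real test this would live next to the split function and call t.Fatal on a non-nil error.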
func putUvarintWithExpectedLength(buf []byte, val uint64, expLength int) int {
	n := binary.PutUvarint(buf, val)
	if n != expLength {
		panic(fmt.Sprintf("expected to write %d bytes, got %d", expLength, n))
The Sprintf makes the function just complex enough that it's not inlined. I'm not sure if it's worth simplifying at this point.
> the Sprintf makes the function just complex enough so it's not inlined.

Curious, how did you find that?
I have this external tool in GoLand:

[Screenshot: GoLand external tools configuration running go-escape-analysis.sh]
The source of go-escape-analysis.sh is this:
#!/usr/bin/env bash
# Usage: go-escape-analysis.sh <package path> <file name>
PKG_PATH=$1
FILE_NAME=$2
# The sort key starts right after the "path/file:" prefix of each diagnostic line.
FILEPATH_WIDTH="$( echo -n "$PKG_PATH/$FILE_NAME" | wc -c )"
KEY_START=$(( FILEPATH_WIDTH + 2 ))
go build -gcflags='-m=2' "./$PKG_PATH" 2>&1 \
  | grep "$PKG_PATH/$FILE_NAME" \
  | sort -n -k1.$KEY_START -s
The output is the compiler's escape-analysis and inlining diagnostics for that file, sorted by line number. Credit to @colega for sharing this.
nice :) Thanks for sharing!
pkg/mimirpb/split.go
if decodedLength <= 0 || decodedLength > math.MaxInt32 {
	return nil, fmt.Errorf("invalid decoded length: %d", decodedLength)
}
if len(writeRequest) < int(decodedLength) {
Here, do we want to check if the rest of the buffer is smaller than decodedLength? The existing if includes the tag size and the length size.
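To illustrate the issue being pointed out, a self-contained sketch (hypothetical `readLengthDelimited` helper, not the PR's code): the decoded length must be checked against the bytes remaining *after* the length prefix, not against the whole buffer, which still contains the tag and length bytes.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readLengthDelimited parses one length-delimited field body from buf,
// where buf starts at the varint length prefix.
func readLengthDelimited(buf []byte) ([]byte, error) {
	length, n := binary.Uvarint(buf)
	if n <= 0 {
		return nil, errors.New("invalid varint length")
	}
	rest := buf[n:] // remaining bytes after the length prefix
	// Checking len(buf) here instead of len(rest) would wrongly accept a
	// buffer that is up to n bytes too short.
	if uint64(len(rest)) < length {
		return nil, fmt.Errorf("buffer truncated: need %d bytes, have %d", length, len(rest))
	}
	return rest[:length], nil
}

func main() {
	body, err := readLengthDelimited([]byte{3, 'a', 'b', 'c'})
	fmt.Println(string(body), err)
	_, err = readLengthDelimited([]byte{3, 'a'})
	fmt.Println(err)
}
```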
Thanks for catching this, added a test.
if extraBytes > 0 {
	for ix := range subrequests {
		// Clone subrequest before appending bytes to it. Here we could use a buffer pool.
I'm not super sure about this. It will double allocations right when we're receiving large requests in the first place. Instead, when we are marshalling the original request, can we provide a larger slice via req.MarshalToSizedBuffer()? Then at every new subrequest we'd leave enough capacity in the slice to fit one source and one skipLabelName.... When we're done splitting the requests, we go back over the slice of subrequests and append the extra bytes if they are necessary. This way we still do the 2x copying, but at least save on the allocations. Am I overcomplicating it?
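The reserve-capacity idea can be sketched like this (hypothetical `newSubrequest` helper; `extraCap` stands for the space needed by the trailing source and skipLabelNameValidation fields): allocating each subrequest with spare capacity up front means the later append never reallocates.

```go
package main

import "fmt"

// newSubrequest copies payload into a slice that reserves extraCap spare
// bytes, so appending the per-request trailer fields later stays in place.
func newSubrequest(payload []byte, extraCap int) []byte {
	sub := make([]byte, len(payload), len(payload)+extraCap)
	copy(sub, payload)
	return sub
}

func main() {
	sub := newSubrequest([]byte{1, 2, 3}, 4)
	before := &sub[0]
	sub = append(sub, 9, 9) // fits in the reserved capacity: no reallocation
	fmt.Println(before == &sub[0], len(sub), cap(sub))
}
```

Since the append stays within capacity, the backing array is unchanged, which is exactly the allocation saving described above.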
To put it another way: how much cheaper is it to do these binary hacks + double allocation + copying vs just creating multiple WriteRequests and calling Marshal() on each of them?
Copied from Slack: note that remote_write clients only ever write field=1 or 3. source is our internal field, and we only set it to non-default values from rulers (the default value, API=0, is NOT serialized). Similarly, skipLabelNameValidation is our internal field, set by our enterprise proxies. That means that in the vast majority of cases, this extra copying will never happen.
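A small sketch of why the copy is rare (the field number 2 below is illustrative, not necessarily mimirpb's actual tag for source): in protobuf encoding a field key is fieldNumber<<3 | wireType, and a proto3 scalar at its zero value is simply not emitted, so a request with source=API=0 carries no source bytes to copy.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// key computes a single-byte protobuf field key: fieldNumber<<3 | wireType.
func key(fieldNumber, wireType int) byte {
	return byte(fieldNumber<<3 | wireType)
}

// encodeVarintField emits a varint field, skipping it entirely when the
// value is the proto3 zero value.
func encodeVarintField(dst []byte, fieldNumber int, val uint64) []byte {
	if val == 0 {
		return dst // zero value: field omitted entirely
	}
	dst = append(dst, key(fieldNumber, 0)) // wire type 0 = varint
	return binary.AppendUvarint(dst, val)
}

func main() {
	fmt.Println(len(encodeVarintField(nil, 2, 0))) // nothing emitted
	fmt.Printf("%x\n", encodeVarintField(nil, 2, 1))
}
```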
What this PR does
This PR implements splitting of a WriteRequest by parsing the marshalled WriteRequest and splitting it at field boundaries.
This is an alternative to #8077, implementing the idea from comment #8077 (review).
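The core idea can be sketched as follows, assuming only length-delimited top-level fields and omitting the copying of source/skipLabelNameValidation that the real code performs (`splitAtFieldBoundary` is a made-up name, not the PR's API): walk the marshalled request field by field and pack whole fields into chunks no larger than maxSize.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitAtFieldBoundary packs whole top-level protobuf fields into chunks of
// at most maxSize bytes (a single oversized field still becomes its own chunk).
func splitAtFieldBoundary(req []byte, maxSize int) ([][]byte, error) {
	var chunks [][]byte
	var current []byte
	for len(req) > 0 {
		tag, n := binary.Uvarint(req) // field key: number<<3 | wire type
		if n <= 0 {
			return nil, errors.New("invalid field tag")
		}
		if tag&7 != 2 {
			return nil, fmt.Errorf("unsupported wire type %d", tag&7)
		}
		length, m := binary.Uvarint(req[n:]) // varint length prefix
		if m <= 0 {
			return nil, errors.New("invalid field length")
		}
		end := n + m + int(length)
		if end > len(req) {
			return nil, errors.New("truncated field")
		}
		field := req[:end] // tag + length + body: a complete field
		if len(current) > 0 && len(current)+len(field) > maxSize {
			chunks = append(chunks, current)
			current = nil
		}
		current = append(current, field...)
		req = req[end:]
	}
	if len(current) > 0 {
		chunks = append(chunks, current)
	}
	return chunks, nil
}

func main() {
	// Two length-delimited fields (tag 0x0A = field 1, wire type 2), 4 bytes each.
	req := []byte{0x0A, 2, 'a', 'b', 0x0A, 2, 'c', 'd'}
	chunks, err := splitAtFieldBoundary(req, 5)
	fmt.Println(len(chunks), err)
}
```

With maxSize=5, the two 4-byte fields cannot share a chunk, so the request splits into two subrequests, each still a valid marshalled message.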
Checklist
- CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
- about-versioning.md updated with experimental features.