Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): retry errors from last recv on uploads #9616

Merged
merged 4 commits into from Mar 21, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 14 additions & 8 deletions storage/grpc_client.go
Expand Up @@ -1872,6 +1872,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st

// Send a request with as many bytes as possible.
// Loop until all bytes are sent.
sendBytes:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is cool but I've literally never seen this in Go code before. Can you add a clarifying comment? It's just pretty rare syntax.

for {
bytesNotYetSent := recvd - sent
remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize
Expand Down Expand Up @@ -1949,10 +1950,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// we retry.
w.stream = nil

// Drop the stream reference as a new one will need to be created if
// we can retry the upload
w.stream = nil

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received.
Expand All @@ -1966,7 +1963,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st

// Continue sending requests, opening a new stream and resending
// any bytes not yet persisted as per QueryWriteStatus
continue
continue sendBytes
}
}
if err != nil {
Expand All @@ -1981,7 +1978,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Not done sending data, do not attempt to commit it yet, loop around
// and send more data.
if recvd-sent > 0 {
continue
continue sendBytes
}

// The buffer has been uploaded and there is still more data to be
Expand Down Expand Up @@ -2012,7 +2009,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Drop the stream reference as a new one will need to be created.
w.stream = nil

continue
continue sendBytes
}
if err != nil {
return nil, 0, err
Expand All @@ -2022,7 +2019,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Retry if not all bytes were persisted.
writeOffset = resp.GetPersistedSize()
sent = int(writeOffset) - int(start)
continue
continue sendBytes
}
} else {
// If the object is done uploading, close the send stream to signal
Expand All @@ -2042,6 +2039,15 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if shouldRetry(err) {
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)
w.stream = nil
continue sendBytes
}
if err != nil {
return nil, 0, err
}
Expand Down