Skip to content

Commit

Permalink
fix(storage): retry errors from last recv on uploads (#9616)
Browse files Browse the repository at this point in the history
* fix(storage): retry errors from last recv on uploads

* comment
  • Loading branch information
BrennaEpp committed Mar 21, 2024
1 parent 26c7ce1 commit b6574aa
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions storage/grpc_client.go
Expand Up @@ -1872,6 +1872,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st

// Send a request with as many bytes as possible.
// Loop until all bytes are sent.
sendBytes: // label this loop so that we can use a continue statement from a nested block
for {
bytesNotYetSent := recvd - sent
remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize
Expand Down Expand Up @@ -1949,10 +1950,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// we retry.
w.stream = nil

// Drop the stream reference as a new one will need to be created if
// we can retry the upload
w.stream = nil

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received.
Expand All @@ -1966,7 +1963,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st

// Continue sending requests, opening a new stream and resending
// any bytes not yet persisted as per QueryWriteStatus
continue
continue sendBytes
}
}
if err != nil {
Expand All @@ -1981,7 +1978,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Not done sending data, do not attempt to commit it yet, loop around
// and send more data.
if recvd-sent > 0 {
continue
continue sendBytes
}

// The buffer has been uploaded and there is still more data to be
Expand Down Expand Up @@ -2012,7 +2009,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Drop the stream reference as a new one will need to be created.
w.stream = nil

continue
continue sendBytes
}
if err != nil {
return nil, 0, err
Expand All @@ -2022,7 +2019,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Retry if not all bytes were persisted.
writeOffset = resp.GetPersistedSize()
sent = int(writeOffset) - int(start)
continue
continue sendBytes
}
} else {
// If the object is done uploading, close the send stream to signal
Expand All @@ -2042,6 +2039,15 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if shouldRetry(err) {
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)
w.stream = nil
continue sendBytes
}
if err != nil {
return nil, 0, err
}
Expand Down

0 comments on commit b6574aa

Please sign in to comment.