Skip to content

Commit

Permalink
Allow ByteStream write restart (#635)
Browse files Browse the repository at this point in the history
The ByteStreamServer write was amended to enable resuming of sessions.  However,
Reclient just starts from the beginning again.  This would be fine, except we
expect it to resume.  Add in a check to see if the first message is the beginning
(i.e. its write_offset is zero) and then force a new IdleStream to be created for
it.
  • Loading branch information
chrisstaite-menlo authored Jan 29, 2024
1 parent 3037a66 commit 3fabbaa
Show file tree
Hide file tree
Showing 2 changed files with 345 additions and 16 deletions.
73 changes: 57 additions & 16 deletions nativelink-service/src/bytestream_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Formatter};
Expand Down Expand Up @@ -196,14 +197,28 @@ impl ByteStreamServer {
store: Arc<dyn Store>,
digest: DigestInfo,
) -> Result<ActiveStreamGuard<'_>, Error> {
let mut active_uploads = self.active_uploads.lock();
if let Some(maybe_idle_stream) = active_uploads.get_mut(&uuid) {
if let Some(idle_stream) = maybe_idle_stream.1.take() {
info!("Joining existing stream {uuid}");
return Ok(idle_stream.into_active_stream(maybe_idle_stream.0.clone(), self));
let (uuid, bytes_received) = match self.active_uploads.lock().entry(uuid) {
Entry::Occupied(mut entry) => {
let maybe_idle_stream = entry.get_mut();
let Some(idle_stream) = maybe_idle_stream.1.take() else {
return Err(make_input_err!("Cannot upload same UUID simultaneously"));
};
let bytes_received = maybe_idle_stream.0.clone();
info!("Joining existing stream {}", entry.key());
return Ok(idle_stream.into_active_stream(bytes_received, self));
}
return Err(make_input_err!("Cannot upload same UUID simultaneously"));
}
Entry::Vacant(entry) => {
let bytes_received = Arc::new(AtomicU64::new(0));
let uuid = entry.key().clone();
// Our stream is "in use" if the key is in the map, but the value is None.
entry.insert((bytes_received.clone(), None));
(uuid, bytes_received)
}
};

// Important: Do not return an error from this point onwards without
// removing the entry from the map, otherwise that UUID becomes
// unusable.

let (tx, rx) = make_buf_channel_pair();
let store_update_fut = Box::pin(async move {
Expand All @@ -218,9 +233,6 @@ impl ByteStreamServer {
)
.await
});
let bytes_received = Arc::new(AtomicU64::new(0));
// Our stream is "in use" if the key is in the map, but the value is None.
active_uploads.insert(uuid.clone(), (bytes_received.clone(), None));
Ok(ActiveStreamGuard {
stream_state: Some(StreamState {
tx,
Expand Down Expand Up @@ -390,17 +402,46 @@ impl ByteStreamServer {
// Code path for received chunk of data.
Ok(Some(write_request)) => write_request,
};
if write_request.write_offset as u64 != tx.get_bytes_written() {

if write_request.write_offset < 0 {
return Err(make_input_err!(
"Received out of order data. Got {}, expected {}",
write_request.write_offset,
tx.get_bytes_written()
"Invalid negative write offset in write request: {}",
write_request.write_offset
));
}
let write_offset = write_request.write_offset as u64;

// If we get duplicate data because a client didn't know where
// it left off from, then we can simply skip it.
let data = if write_offset < tx.get_bytes_written() {
if (write_offset + write_request.data.len() as u64) < tx.get_bytes_written() {
if write_request.finish_write {
return Err(make_input_err!(
"Resumed stream finished at {} bytes when we already received {} bytes.",
write_offset + write_request.data.len() as u64,
tx.get_bytes_written()
));
}
continue;
}
write_request
.data
.slice((tx.get_bytes_written() - write_offset) as usize..)
} else {
if write_offset != tx.get_bytes_written() {
return Err(make_input_err!(
"Received out of order data. Got {}, expected {}",
write_offset,
tx.get_bytes_written()
));
}
write_request.data
};

// Do not process EOF or weird stuff will happen.
if !write_request.data.is_empty() {
if !data.is_empty() {
// We also need to process the possible EOF branch, so we can't early return.
if let Err(mut err) = tx.send(write_request.data).await {
if let Err(mut err) = tx.send(data).await {
err.code = Code::Internal;
return Err(err);
}
Expand Down
Loading

0 comments on commit 3fabbaa

Please sign in to comment.