Skip to content

Commit

Permalink
Merge 03d6309 into 219b787
Browse files Browse the repository at this point in the history
  • Loading branch information
nvhoang committed Dec 3, 2021
2 parents 219b787 + 03d6309 commit 8d350b3
Showing 1 changed file with 12 additions and 6 deletions.
Expand Up @@ -127,6 +127,9 @@ public Optional<AgentFileResource> getResource(
final URI uri,
@Nullable final HttpRange range
) {
if (range == null) {
log.warn("Attempting to stream file with no range: {} of job: {}", relativePath, jobId);
}
log.debug("Attempting to stream file: {} of job: {}", relativePath, jobId);
final Optional<DirectoryManifest> optionalManifest = this.getManifest(jobId);

Expand Down Expand Up @@ -423,7 +426,8 @@ private synchronized void reapStalledTransfers() {
transfer.lastAckTimestamp.plus(this.properties.getStalledTransferTimeout());
if (now.isAfter(deadline)) {
stalledTrasfersCounter.incrementAndGet();
log.warn("Transfer {} is stalled, shutting it down", transferId);
log.warn("Transfer {} is stalled of job {}, shutting it down", transferId,
entry.getValue().jobId);
final TimeoutException exception = new TimeoutException("Transfer not making progress");
// Shut down stream, if one was associated to this transfer
final AgentFileChunkObserver observer = transfer.getAgentFileChunkObserver();
Expand Down Expand Up @@ -484,7 +488,7 @@ private synchronized FileTransfer startFileTransfer(
endOffset = 1 + range.getRangeEnd(fileSize);
}

log.debug("Transfer {} effective range {}-{}: ", fileTransferId, startOffset, endOffset);
log.debug("Transfer {} effective range {}-{}: of job: {} ", fileTransferId, startOffset, endOffset, jobId);

// Allocate and park the buffer that will store the data in transit.
final StreamBuffer buffer = new StreamBuffer(startOffset);
Expand All @@ -503,16 +507,16 @@ private synchronized FileTransfer startFileTransfer(
this.transferSizeDistribution.record(endOffset - startOffset);

if (endOffset - startOffset == 0) {
log.debug("Transfer {} is empty, completing", fileTransferId);
log.debug("Transfer {} is empty, completing of job: {}", fileTransferId, jobId);
// When requesting an empty file (or a range of 0 bytes), short-circuit and just return an empty
// buffer, without tracking it as active transfer.
buffer.closeForCompleted();
} else {
log.debug("Tracking new transfer {}", fileTransferId);
log.debug("Tracking new transfer {} of job: {}", fileTransferId, jobId);
// Expecting some data. Track this stream and its buffer so incoming chunks can be appended.
this.activeTransfers.put(fileTransferId, fileTransfer);

log.debug("Requesting start of transfer {}", fileTransferId);
log.debug("Requesting start of transfer {} of job: {}", fileTransferId, jobId);
// Request file over control channel
try {
this.controlStreamsManager.requestFile(
Expand Down Expand Up @@ -648,12 +652,13 @@ private void writeDataAndAck(final FileTransfer fileTransfer, final ByteString d
}
} catch (IllegalStateException e) {
// Eventually retries will stop because the transfer times out due to lack of progress
log.warn("Buffer of transfer {} is closed", fileTransferId);
log.warn("Buffer of transfer {} of job {} is closed", fileTransferId, fileTransfer.jobId);
}
}
}

private static final class FileTransfer {
private final String jobId;
@Getter
private final String transferId;
private final StreamBuffer buffer;
Expand All @@ -672,6 +677,7 @@ private FileTransfer(
final long fileSize,
final StreamBuffer buffer
) {
this.jobId = jobId;
this.transferId = transferId;
this.buffer = buffer;
this.lastAckTimestamp = Instant.now();
Expand Down

0 comments on commit 8d350b3

Please sign in to comment.