Skip to content
Permalink
Browse files
Streaming chunked data transfer support
  • Loading branch information
DImuthuUpe committed Apr 19, 2022
1 parent 8e3b03a commit e917df5eefa87618d0131605824ed8f0a614d326
Showing 6 changed files with 60 additions and 11 deletions.
@@ -107,6 +107,9 @@ public class MFTAgent implements CommandLineRunner {
@org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
private int chunkedSize;

@org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
private boolean doChunkStream;

private final Semaphore mainHold = new Semaphore(0);

private KVCache transferMessageCache;
@@ -139,7 +142,10 @@ public class MFTAgent implements CommandLineRunner {
public void init() {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
mediator = new TransportMediator(tempDataDir, concurrentTransfers, concurrentChunkedThreads, chunkedSize);
mediator = new TransportMediator(tempDataDir,
concurrentTransfers,
concurrentChunkedThreads,
chunkedSize, doChunkStream);
transferRequestExecutor = Executors.newFixedThreadPool(concurrentTransfers);
}

@@ -45,14 +45,20 @@ public class TransportMediator {

private String tempDataDir = "/tmp";
private final int chunkedSize;
private final boolean doChunkStreaming;

private final ExecutorService chunkedExecutorService;

public TransportMediator(String tempDataDir, int concurrentTransfers, int concurrentChunkedThreads, int chunkedSize) {
public TransportMediator(String tempDataDir,
int concurrentTransfers,
int concurrentChunkedThreads,
int chunkedSize,
boolean doChunkStreaming) {
this.tempDataDir = tempDataDir;
monitorPool = Executors.newFixedThreadPool(concurrentTransfers);
this.chunkedSize = chunkedSize;
chunkedExecutorService = Executors.newFixedThreadPool(concurrentChunkedThreads);
this.doChunkStreaming = doChunkStreaming;
}

public void transferSingleThread(String transferId,
@@ -115,8 +121,10 @@ public void transferSingleThread(String transferId,
endPos = fileLength;
}

String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));

completionService.submit(new ChunkMover(inConnector,
outConnector, uploadLength, endPos, chunkIdx,
transferId, doChunkStreaming));

uploadLength = endPos;
chunkIdx++;
@@ -221,30 +229,38 @@ public void destroy() {
monitorPool.shutdown();
}

private static class ChunkMover implements Callable<Integer> {
private class ChunkMover implements Callable<Integer> {

IncomingChunkedConnector downloader;
OutgoingChunkedConnector uploader;
long startPos;
long endPos;
int chunkIdx;
String tempFile;
String transferId;
boolean useStreaming;

public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long startPos,
long endPos, int chunkIdx, String tempFile) {
long endPos, int chunkIdx, String transferId, boolean useStreaming) {
this.downloader = downloader;
this.uploader = uploader;
this.startPos = startPos;
this.endPos = endPos;
this.chunkIdx = chunkIdx;
this.tempFile = tempFile;
this.transferId = transferId;
this.useStreaming = useStreaming;
}

@Override
public Integer call() throws Exception {
downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
new File(tempFile).delete();
if (useStreaming) {
InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
uploader.uploadChunk(chunkIdx, startPos, endPos,inputStream);
} else {
String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
new File(tempFile).delete();
}
return chunkIdx;
}
}
@@ -4,4 +4,5 @@

public interface IncomingChunkedConnector extends BasicConnector {
public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception;
}
@@ -4,4 +4,5 @@

public interface OutgoingChunkedConnector extends BasicConnector {
public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception;
}
@@ -93,6 +93,16 @@ public void downloadChunk(int chunkId, long startByte, long endByte, String down
logger.debug("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
}

@Override
public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception {
GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
resource.getFile().getResourcePath());
rangeObjectRequest.setRange(startByte, endByte - 1);
logger.debug("Fetching input stream for chunk {} in resource {}", chunkId, resource.getResourceId());
S3Object object = s3Client.getObject(rangeObjectRequest);
return object.getObjectContent();
}

@Override
public void complete() throws Exception {

@@ -96,6 +96,21 @@ public void uploadChunk(int chunkId, long startByte, long endByte, String upload
logger.debug("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
}

@Override
public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {
UploadPartRequest uploadRequest = new UploadPartRequest()
.withBucketName(resource.getS3Storage().getBucketName())
.withKey(resource.getFile().getResourcePath())
.withUploadId(initResponse.getUploadId())
.withPartNumber(chunkId + 1)
.withFileOffset(0)
.withInputStream(inputStream)
.withPartSize(endByte - startByte);

UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
this.partETags.add(uploadResult.getPartETag());
logger.debug("Uploaded S3 chunk {} for resource id {} using stream", chunkId, resource.getResourceId());
}

@Override
public void complete() throws Exception {

0 comments on commit e917df5

Please sign in to comment.