@@ -148,6 +148,7 @@ public LogFetcher(
this.scannerMetricGroup = scannerMetricGroup;
this.remoteLogDownloader =
new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup);
remoteLogDownloader.start();
}
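The hunk above moves thread startup out of RemoteLogDownloader's constructor: LogFetcher now wires the downloader and then calls start() explicitly. A minimal sketch of the resulting lifecycle, assuming a caller-side try/finally that is not part of this diff (imports omitted):

```java
// Sketch only: the construct-then-start lifecycle implied by the hunk above.
// The try/finally shape is an assumption; the constructor, start() and close()
// signatures come from this diff.
void runDownloaderLifecycle(TablePath tablePath, Configuration conf,
        RemoteFileDownloader remoteFileDownloader, ScannerMetricGroup scannerMetricGroup)
        throws IOException {
    RemoteLogDownloader downloader =
            new RemoteLogDownloader(tablePath, conf, remoteFileDownloader, scannerMetricGroup);
    downloader.start();      // spawns the DownloadRemoteLogThread; no longer done in the constructor
    try {
        // schedule RemoteLogDownloadRequests and poll their futures here
    } finally {
        downloader.close();  // shuts the thread down and deletes the local tmp dir
    }
}
```

Splitting construction from start() also lets tests build a downloader without a running background thread and drive fetchOnce() by hand, as testParallelismDownloadLog below does.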

/**
@@ -29,7 +29,6 @@
import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.utils.CloseableRegistry;
import com.alibaba.fluss.utils.FlussPaths;
import com.alibaba.fluss.utils.MapUtils;
import com.alibaba.fluss.utils.concurrent.ShutdownableThread;

import org.slf4j.Logger;
@@ -43,18 +42,17 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.alibaba.fluss.utils.FileUtils.deleteFileOrDirectory;
import static com.alibaba.fluss.utils.FileUtils.deleteDirectoryQuietly;
import static com.alibaba.fluss.utils.FlussPaths.LOG_FILE_SUFFIX;
import static com.alibaba.fluss.utils.FlussPaths.filenamePrefixFromOffset;
import static com.alibaba.fluss.utils.FlussPaths.remoteLogSegmentDir;
import static com.alibaba.fluss.utils.FlussPaths.remoteLogSegmentFile;

@@ -64,17 +62,14 @@
public class RemoteLogDownloader implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloader.class);

private static final long POLL_TIMEOUT = 5000L;
private static final long POLL_SLEEP_TIMEOUT = 500L;

private final Path localLogDir;

private final BlockingQueue<RemoteLogDownloadRequest> segmentsToFetch;

private final BlockingQueue<RemoteLogSegment> segmentsToRecycle;

// <log_segment_id -> segment_uuid_path>
private final ConcurrentHashMap<String, Path> fetchedFiles;

private final Semaphore prefetchSemaphore;

private final DownloadRemoteLogThread downloadThread;
@@ -83,15 +78,15 @@ public class RemoteLogDownloader implements Closeable {

private final ScannerMetricGroup scannerMetricGroup;

private final long pollTimeout;
private final long pollSleepTimeout;

public RemoteLogDownloader(
TablePath tablePath,
Configuration conf,
RemoteFileDownloader remoteFileDownloader,
ScannerMetricGroup scannerMetricGroup) {
// by default, sleep 500ms between empty polls to avoid a busy loop
this(tablePath, conf, remoteFileDownloader, scannerMetricGroup, POLL_TIMEOUT);
this(tablePath, conf, remoteFileDownloader, scannerMetricGroup, POLL_SLEEP_TIMEOUT);
}

@VisibleForTesting
@@ -100,13 +95,12 @@ public RemoteLogDownloader(
Configuration conf,
RemoteFileDownloader remoteFileDownloader,
ScannerMetricGroup scannerMetricGroup,
long pollTimeout) {
long pollSleepTimeout) {
this.segmentsToFetch = new LinkedBlockingQueue<>();
this.segmentsToRecycle = new LinkedBlockingQueue<>();
this.fetchedFiles = MapUtils.newConcurrentHashMap();
this.remoteFileDownloader = remoteFileDownloader;
this.scannerMetricGroup = scannerMetricGroup;
this.pollTimeout = pollTimeout;
this.pollSleepTimeout = pollSleepTimeout;
this.prefetchSemaphore =
new Semaphore(conf.getInt(ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM));
// The local tmp dir to store the fetched log segment files,
@@ -116,6 +110,9 @@ public RemoteLogDownloader(
conf.get(ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR),
"remote-logs-" + UUID.randomUUID());
this.downloadThread = new DownloadRemoteLogThread(tablePath);
}

public void start() {
downloadThread.start();
}

@@ -140,44 +137,55 @@ void recycleRemoteLog(RemoteLogSegment segment) {
* to fetch.
*/
void fetchOnce() throws Exception {
// wait until there is a remote fetch request
RemoteLogDownloadRequest request = segmentsToFetch.poll(pollTimeout, TimeUnit.MILLISECONDS);
if (request == null) {
int availablePermits = prefetchSemaphore.availablePermits();
List<RemoteLogDownloadRequest> requests = new ArrayList<>(availablePermits);
segmentsToFetch.drainTo(requests, availablePermits);
if (requests.isEmpty()) {
Thread.sleep(pollSleepTimeout);
return;
}
// blocks until there is capacity (the fetched file is consumed)
prefetchSemaphore.acquire();

prefetchSemaphore.acquire(requests.size());
try {
// 1. cleanup the finished logs first to free up disk space
cleanupRemoteLogs();

// 2. do the actual download work
FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
Path segmentPath = localLogDir.resolve(request.segment.remoteLogSegmentId().toString());
scannerMetricGroup.remoteFetchRequestCount().inc();
List<FsPathAndFileName> fsPathAndFileNames = new ArrayList<>();
for (RemoteLogDownloadRequest request : requests) {
FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
fsPathAndFileNames.add(fsPathAndFileName);
scannerMetricGroup.remoteFetchRequestCount().inc();
}
// download the remote log files to the local directory
LOG.info(
"Start to download remote log segment file {} to local.",
fsPathAndFileName.getFileName());

long startTime = System.currentTimeMillis();
List<String> fileNames =
fsPathAndFileNames.stream()
.map(FsPathAndFileName::getFileName)
.collect(Collectors.toList());
LOG.info(
"Start to download {} remote log segment files {} to local.",
fileNames.size(),
fileNames);
remoteFileDownloader.transferAllToDirectory(
Collections.singletonList(fsPathAndFileName),
segmentPath,
new CloseableRegistry());
fsPathAndFileNames, localLogDir, new CloseableRegistry());
LOG.info(
"Download remote log segment file {} to local cost {} ms.",
fsPathAndFileName.getFileName(),
"Download {} remote log segment files {} to local cost {} ms.",
fileNames.size(),
fileNames,
System.currentTimeMillis() - startTime);
File localFile = new File(segmentPath.toFile(), fsPathAndFileName.getFileName());
scannerMetricGroup.remoteFetchBytes().inc(localFile.length());
String segmentId = request.segment.remoteLogSegmentId().toString();
fetchedFiles.put(segmentId, segmentPath);
request.future.complete(localFile);
for (int i = 0; i < fsPathAndFileNames.size(); i++) {
RemoteLogDownloadRequest request = requests.get(i);
FsPathAndFileName fsPathAndFileName = fsPathAndFileNames.get(i);
File localFile = new File(localLogDir.toFile(), fsPathAndFileName.getFileName());
scannerMetricGroup.remoteFetchBytes().inc(localFile.length());
request.future.complete(localFile);
}
} catch (Throwable t) {
prefetchSemaphore.release();
prefetchSemaphore.release(requests.size());
// add the requests back to the queue
segmentsToFetch.add(request);
scannerMetricGroup.remoteFetchErrorCount().inc();
segmentsToFetch.addAll(requests);
scannerMetricGroup.remoteFetchErrorCount().inc(requests.size());
// log the error and continue instead of shutdown the download thread
LOG.error("Failed to download remote log segment.", t);
}
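The reworked fetchOnce() is the core of this change: instead of polling one request at a time, it drains as many queued requests as the prefetch semaphore currently allows, downloads them with a single transferAllToDirectory call into localLogDir, and only gets permits back when the scanner recycles each consumed segment. A minimal sketch of that accounting, assuming the recycle path releases a single permit per segment (the release call sits outside the visible hunks):

```java
// Sketch of the prefetch accounting around fetchOnce(); imports omitted.
// The single-permit release in the recycle path is an assumption based on the
// surrounding code, and fetchRound/onSegmentConsumed are illustrative names.
class PrefetchAccountingSketch {
    // capacity mirrors ConfigOptions.CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM (4 in the tests)
    private final Semaphore prefetchSemaphore = new Semaphore(4);
    private final BlockingQueue<RemoteLogDownloadRequest> segmentsToFetch =
            new LinkedBlockingQueue<>();

    void fetchRound() throws Exception {
        int capacity = prefetchSemaphore.availablePermits();
        List<RemoteLogDownloadRequest> batch = new ArrayList<>(capacity);
        segmentsToFetch.drainTo(batch, capacity);   // never drain more than local capacity
        if (batch.isEmpty()) {
            Thread.sleep(500L);                     // pollSleepTimeout: avoid a busy loop
            return;
        }
        prefetchSemaphore.acquire(batch.size());    // reserve one slot per drained request
        // one transferAllToDirectory(...) call downloads the whole batch into localLogDir,
        // then each request.future is completed with its local file
    }

    void onSegmentConsumed() {
        prefetchSemaphore.release();                // recycle path frees one slot per segment
    }
}
```

On failure the whole batch's permits are released and the requests are re-queued (see the catch block above), so a transient download error does not leak prefetch capacity.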
@@ -191,24 +199,15 @@ private void cleanupRemoteLogs() {
}

private void cleanupFinishedRemoteLog(RemoteLogSegment segment) {
String segmentId = segment.remoteLogSegmentId().toString();
Path segmentPath = fetchedFiles.remove(segmentId);
if (segmentPath != null) {
try {
Path logFile =
segmentPath.resolve(
filenamePrefixFromOffset(segment.remoteLogStartOffset())
+ LOG_FILE_SUFFIX);
Files.deleteIfExists(logFile);
Files.deleteIfExists(segmentPath);
LOG.info(
"Consumed and deleted the fetched log segment file {}/{} for bucket {}.",
segmentPath.getFileName(),
logFile.getFileName(),
segment.tableBucket());
} catch (IOException e) {
LOG.warn("Failed to delete the fetch segment path {}.", segmentPath, e);
}
try {
Path logFile = localLogDir.resolve(getLocalFileNameOfRemoteSegment(segment));
Files.deleteIfExists(logFile);
LOG.info(
"Consumed and deleted the fetched log segment file {} for bucket {}.",
logFile.getFileName(),
segment.tableBucket());
} catch (IOException e) {
LOG.warn("Failed to delete the local fetch segment file {}.", localLogDir, e);
}
}

@@ -219,11 +218,8 @@ public void close() throws IOException {
} catch (InterruptedException e) {
// ignore
}
// cleanup all downloaded files
for (Path segmentPath : fetchedFiles.values()) {
deleteFileOrDirectory(segmentPath.toFile());
}
fetchedFiles.clear();

deleteDirectoryQuietly(localLogDir.toFile());
}

@VisibleForTesting
@@ -242,10 +238,23 @@ protected static FsPathAndFileName getFsPathAndFileName(
remoteLogSegmentFile(
remoteLogSegmentDir(remoteLogTabletDir, segment.remoteLogSegmentId()),
segment.remoteLogStartOffset());
return new FsPathAndFileName(
remotePath,
FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
+ LOG_FILE_SUFFIX);
return new FsPathAndFileName(remotePath, getLocalFileNameOfRemoteSegment(segment));
}

/**
* Get the local file name of the remote log segment.
*
* <p>The file name is in pattern:
*
* <pre>
* {$remote_segment_id}_{$offset_prefix}.log
* </pre>
*/
private static String getLocalFileNameOfRemoteSegment(RemoteLogSegment segment) {
return segment.remoteLogSegmentId()
+ "_"
+ FlussPaths.filenamePrefixFromOffset(segment.remoteLogStartOffset())
+ LOG_FILE_SUFFIX;
}

/**
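The new getLocalFileNameOfRemoteSegment helper is what enables the rest of the simplification above: because the segment UUID is embedded in the file name, every downloaded segment can live flat in localLogDir without colliding, so the per-segment directories and the fetchedFiles map are gone, cleanup only has to delete a single file, and close() can drop the whole directory with deleteDirectoryQuietly. A hypothetical example of the resulting name, assuming filenamePrefixFromOffset zero-pads the start offset to 20 digits (the exact padding is not shown in this diff):

```java
// Hypothetical example of the flat local file name; the UUID value and the
// 20-digit zero padding of filenamePrefixFromOffset are assumptions.
UUID remoteLogSegmentId = UUID.fromString("5f3a0c1e-7c2d-4e7b-9a10-2d4f6b8c0a12");
long remoteLogStartOffset = 1024L;
String localFileName =
        remoteLogSegmentId + "_" + String.format("%020d", remoteLogStartOffset) + ".log";
// -> "5f3a0c1e-7c2d-4e7b-9a10-2d4f6b8c0a12_00000000000000001024.log"
```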
@@ -23,9 +23,11 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.fs.FsPathAndFileName;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.utils.CloseableRegistry;
import com.alibaba.fluss.utils.FileUtils;
import com.alibaba.fluss.utils.IOUtils;

@@ -35,11 +37,13 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID;
@@ -72,12 +76,9 @@ void beforeEach() {
scannerMetricGroup = TestingScannerMetricGroup.newInstance();
remoteLogDownloader =
new RemoteLogDownloader(
DATA1_TABLE_PATH,
conf,
remoteFileDownloader,
scannerMetricGroup,
// use a short timeout for faster testing
10L);
DATA1_TABLE_PATH, conf, remoteFileDownloader, scannerMetricGroup, 10L);
// trigger auto download.
remoteLogDownloader.start();
}

@AfterEach
Expand All @@ -98,7 +99,7 @@ void testPrefetchNum() throws Exception {
buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 5, conf);
FsPath remoteLogTabletDir = remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb);
List<RemoteLogDownloadFuture> futures =
requestRemoteLogs(remoteLogTabletDir, remoteLogSegments);
requestRemoteLogs(remoteLogDownloader, remoteLogTabletDir, remoteLogSegments);

// the first 4 segments should succeed.
retry(
@@ -134,11 +135,84 @@

// test cleanup
remoteLogDownloader.close();
assertThat(FileUtils.listDirectory(localLogDir).length).isEqualTo(0);
assertThat(localLogDir.toFile().exists()).isFalse();
}

@Test
void testParallelismDownloadLog() throws Exception {
class TestRemoteFileDownloader extends RemoteFileDownloader {

private int expectedDownloadFileNum;

public TestRemoteFileDownloader(int threadNum) {
super(threadNum);
}

public void setExpectedDownloadFileNum(int expectedDownloadFileNum) {
this.expectedDownloadFileNum = expectedDownloadFileNum;
}

@Override
public void transferAllToDirectory(
List<FsPathAndFileName> fsPathAndFileNames,
Path targetDirectory,
CloseableRegistry closeableRegistry)
throws IOException {
assertThat(fsPathAndFileNames.size()).isEqualTo(expectedDownloadFileNum);
}
}

TestRemoteFileDownloader testRemoteFileDownloader = new TestRemoteFileDownloader(1);
RemoteLogDownloader remoteLogDownloader2 =
new RemoteLogDownloader(
DATA1_TABLE_PATH,
conf, // max 4 pre-fetch num
testRemoteFileDownloader,
scannerMetricGroup,
10L);

TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
List<RemoteLogSegment> remoteLogSegments =
buildRemoteLogSegmentList(tb, DATA1_PHYSICAL_TABLE_PATH, 10, conf);
FsPath remoteLogTabletDir = remoteLogTabletDir(remoteLogDir, DATA1_PHYSICAL_TABLE_PATH, tb);
List<RemoteLogDownloadFuture> futures =
requestRemoteLogs(remoteLogDownloader2, remoteLogTabletDir, remoteLogSegments);

// 4 to fetch.
Semaphore prefetchSemaphore = remoteLogDownloader2.getPrefetchSemaphore();
assertThat(prefetchSemaphore.availablePermits()).isEqualTo(4);
testRemoteFileDownloader.setExpectedDownloadFileNum(4);
remoteLogDownloader2.fetchOnce();
assertThat(prefetchSemaphore.availablePermits()).isEqualTo(0);

// finish 2 segments.
futures.get(0).getRecycleCallback().run();
futures.get(1).getRecycleCallback().run();

// 2 to fetch.
assertThat(prefetchSemaphore.availablePermits()).isEqualTo(2);
testRemoteFileDownloader.setExpectedDownloadFileNum(2);
remoteLogDownloader2.fetchOnce();
assertThat(prefetchSemaphore.availablePermits()).isEqualTo(0);

// finish 4 segments.
for (int i = 2; i < 6; i++) {
futures.get(i).getRecycleCallback().run();
}

// 4 to fetch.
assertThat(prefetchSemaphore.availablePermits()).isEqualTo(4);
testRemoteFileDownloader.setExpectedDownloadFileNum(4);
remoteLogDownloader2.fetchOnce();
assertThat(prefetchSemaphore.availablePermits()).isEqualTo(0);

remoteLogDownloader.close();
}
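The new testParallelismDownloadLog drives the downloader without calling start(), so no background thread races the assertions: each manual fetchOnce() call performs exactly one batched round, and the stubbed transferAllToDirectory only checks how many files that round requested. A minimal sketch of the same test pattern, using the @VisibleForTesting 5-argument constructor and getPrefetchSemaphore() accessor shown in this diff (the stub body and assertion style are illustrative, not a copy of the real test):

```java
// Sketch of the deterministic test pattern; no real I/O happens in the stub.
RemoteFileDownloader stub =
        new RemoteFileDownloader(1) {
            @Override
            public void transferAllToDirectory(
                    List<FsPathAndFileName> files, Path targetDir, CloseableRegistry registry) {
                // only the batch size matters to the test
            }
        };
RemoteLogDownloader downloader =
        new RemoteLogDownloader(DATA1_TABLE_PATH, conf, stub, scannerMetricGroup, 10L);
// start() is intentionally not called, so nothing downloads until fetchOnce() is invoked
downloader.fetchOnce();                                           // one batched round
int free = downloader.getPrefetchSemaphore().availablePermits();  // permits left after the round
```

Because there is no background thread, the permit counts in the test advance only when the recycle callbacks run, which is exactly what the three fetch rounds above verify.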

private List<RemoteLogDownloadFuture> requestRemoteLogs(
FsPath remoteLogTabletDir, List<RemoteLogSegment> remoteLogSegments) {
RemoteLogDownloader remoteLogDownloader,
FsPath remoteLogTabletDir,
List<RemoteLogSegment> remoteLogSegments) {
List<RemoteLogDownloadFuture> futures = new ArrayList<>();
for (RemoteLogSegment segment : remoteLogSegments) {
RemoteLogDownloadFuture future =