HADOOP-18543. AliyunOSSFileSystem#open(Path path, int bufferSize) use buffer size as its downloadPartSize #5172

Open: wants to merge 1 commit into base trunk.
AliyunOSSFileSystem.java
@@ -585,6 +585,21 @@ private void validatePath(Path path) throws IOException {
     } while (fPart != null);
   }

+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    final FileStatus fileStatus = getFileStatus(path);
+    if (fileStatus.isDirectory()) {
+      throw new FileNotFoundException("Can't open " + path +
+          " because it is a directory");
+    }
+
+    return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
+        new SemaphoredDelegatingExecutor(
+            boundedThreadPool, maxReadAheadPartNumber, true),
+        maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
+        statistics));
+  }
+
   @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     final FileStatus fileStatus = getFileStatus(path);
@@ -593,7 +608,7 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
           " because it is a directory");
     }

-    return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
+    return new FSDataInputStream(new AliyunOSSInputStream(bufferSize,
         new SemaphoredDelegatingExecutor(
             boundedThreadPool, maxReadAheadPartNumber, true),
         maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
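With this change the two open overloads diverge: open(Path) keeps deriving the download part size from configuration, while open(Path, int bufferSize) passes the caller-supplied buffer size through as the read-ahead part size. A minimal caller-side sketch (not part of the patch; the class name, bucket URI, and file path are placeholders):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OpenPartSizeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("oss://example-bucket/"), conf);
    Path file = new Path("/data/sample.bin");

    // open(Path): the download part size still comes from the
    // MULTIPART_DOWNLOAD_SIZE_KEY setting in the Configuration.
    try (FSDataInputStream in = fs.open(file)) {
      in.read(new byte[4096]);
    }

    // open(Path, int): after this patch the 8 MiB hint becomes the
    // download part size used by the stream's read-ahead.
    try (FSDataInputStream in = fs.open(file, 8 * 1024 * 1024)) {
      in.read(new byte[4096]);
    }
  }
}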
AliyunOSSInputStream.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.*;

 /**
@@ -57,18 +58,21 @@ public class AliyunOSSInputStream extends FSInputStream {
   private ExecutorService readAheadExecutorService;
   private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();

-  public AliyunOSSInputStream(Configuration conf,
-      ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
-      AliyunOSSFileSystemStore store, String key, Long contentLength,
-      Statistics statistics) throws IOException {
+  public AliyunOSSInputStream(
+      long downloadPartSize,
+      ExecutorService readAheadExecutorService,
+      int maxReadAheadPartNumber,
+      AliyunOSSFileSystemStore store,
+      String key,
+      Long contentLength,
+      Statistics statistics) throws IOException {
     this.readAheadExecutorService =
-        MoreExecutors.listeningDecorator(readAheadExecutorService);
+      MoreExecutors.listeningDecorator(readAheadExecutorService);
     this.store = store;
     this.key = key;
     this.statistics = statistics;
     this.contentLength = contentLength;
-    downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
-        MULTIPART_DOWNLOAD_SIZE_DEFAULT);
+    this.downloadPartSize = Math.max(downloadPartSize, IO_FILE_BUFFER_SIZE_DEFAULT);
     this.maxReadAheadPartNumber = maxReadAheadPartNumber;

     this.expectNextPos = 0;
@@ -77,6 +81,18 @@ public AliyunOSSInputStream(Configuration conf,
     closed = false;
   }

+  public AliyunOSSInputStream(Configuration conf,
+      ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
+      AliyunOSSFileSystemStore store, String key, Long contentLength,
+      Statistics statistics) throws IOException {
+    this(conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
+        readAheadExecutorService, maxReadAheadPartNumber, store, key, contentLength, statistics);
+  }
+
+  long getDownloadPartSize() {
+    return downloadPartSize;
+  }
+
   /**
    * Reopen the wrapped stream at give position, by seeking for
    * data of a part length from object content stream.
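The new primary constructor takes the part size directly and floors it at IO_FILE_BUFFER_SIZE_DEFAULT (4096 in CommonConfigurationKeysPublic), so a very small bufferSize cannot shrink the ranged reads below that; the Configuration-based constructor now simply delegates with the configured value. A standalone sketch of the clamp arithmetic (the class and helper names are invented for illustration):

public class PartSizeClampSketch {
  // Mirrors the assignment in the new constructor:
  // this.downloadPartSize = Math.max(downloadPartSize, IO_FILE_BUFFER_SIZE_DEFAULT);
  private static final long IO_FILE_BUFFER_SIZE_DEFAULT = 4096;

  static long effectivePartSize(long requested) {
    return Math.max(requested, IO_FILE_BUFFER_SIZE_DEFAULT);
  }

  public static void main(String[] args) {
    System.out.println(effectivePartSize(1024));             // 4096: small hints are floored
    System.out.println(effectivePartSize(8 * 1024 * 1024));  // 8388608: larger hints pass through
  }
}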
TestAliyunOSSInputStream.java
@@ -33,8 +33,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Random;

+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

@@ -108,6 +112,31 @@ public void testSeekFile() throws Exception {
     IOUtils.closeStream(instream);
   }

+  @Test
+  public void testConfiguration() throws IOException {
+    Path configurationFile = setPath("/test/configurationFile.txt");
+    long size = 5 * 1024 * 1024;
+
+    ContractTestUtils.generateTestFile(this.fs, configurationFile, size, 256, 255);
+    LOG.info("5MB file created: configurationFile.txt");
+
+    FSDataInputStream instream = this.fs.open(configurationFile);
+    assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
+    AliyunOSSInputStream wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
+    assertEquals(
+        fs.getConf().getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
+        wrappedStream.getDownloadPartSize());
+    IOUtils.closeStream(instream);
+
+    instream = this.fs.open(configurationFile, 1024);
+    assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
+    wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
+    assertEquals(
+        1024,
+        wrappedStream.getDownloadPartSize());
+    IOUtils.closeStream(instream);
+  }
+
   @Test
   public void testSequentialAndRandomRead() throws Exception {
     Path smallSeekFile = setPath("/test/smallSeekFile.txt");
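The first half of testConfiguration asserts against whatever MULTIPART_DOWNLOAD_SIZE_KEY resolves to in the test configuration. A hedged variation that pins the configured value up front, so the expected number is explicit (the class name, helper method, and 512 KiB figure are hypothetical; it would have to live in the org.apache.hadoop.fs.aliyun.oss package because getDownloadPartSize() is package-private):

package org.apache.hadoop.fs.aliyun.oss;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_KEY;
import static org.junit.Assert.assertEquals;

public class PinnedPartSizeSketch {
  static void checkConfiguredPartSize(URI ossBucket, Path file) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong(MULTIPART_DOWNLOAD_SIZE_KEY, 512 * 1024);  // pin the part size to 512 KiB
    try (FileSystem fs = FileSystem.newInstance(ossBucket, conf);
         FSDataInputStream in = fs.open(file)) {
      // open(Path) should pick up the pinned configuration value.
      AliyunOSSInputStream oss = (AliyunOSSInputStream) in.getWrappedStream();
      assertEquals(512 * 1024, oss.getDownloadPartSize());
    }
  }
}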