
HADOOP-13560 block output streams #130

Closed

Changes from all commits (16 commits)
f9ead51
HADOOP-13560: squashl merge of the block output code
steveloughran Sep 23, 2016
9b826e0
HADOOP-1356 address most yetus complaints from javac and checkstyle
steveloughran Sep 24, 2016
9fbc74b
Patch 005. Moved all the operations in the block output stream which …
steveloughran Sep 26, 2016
7fafc95
HADOOP-13560. Lots of work on improving many-GB test runs, including:…
steveloughran Sep 27, 2016
abf6929
HADOOP-13560 fix an NPE in a debug log statement for close-with-no-a…
steveloughran Sep 27, 2016
af31bd2
HADOOP-13560 address chris's initial comments
steveloughran Sep 28, 2016
19da69f
HADOOP-13560 - PUT wasn't setting block size; AWS SDK was (a) buffer…
steveloughran Sep 29, 2016
0b76140
HADOOP-13560: Address chris's comments
steveloughran Sep 30, 2016
bb7ef85
HADOOP-13560 adding active limit to output of each thread
steveloughran Oct 5, 2016
275cfe0
HADOOP-13560: use fs.s3a.fast.upload as the switch to enable the new …
steveloughran Oct 5, 2016
0c0b676
HADOOP-13560: chris's comments of October 5: minor code tweaks and im…
steveloughran Oct 6, 2016
678325c
HADOOP-13560: use <a name=""> to correctly tag anchors
steveloughran Oct 6, 2016
319ccd5
HADOOP-13560 tuning docs of setting huge filesize in tests
steveloughran Oct 7, 2016
0397517
Patch 011; address checkstyle warnings as well as can be done.
steveloughran Oct 10, 2016
a4264e7
HADOOP-13560 patch 14; address comments on the PR
steveloughran Oct 13, 2016
726876d
HADOOP-13560 patch 015; address thomas and pietr's comments
steveloughran Oct 14, 2016
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1003,8 +1003,8 @@
<property>
<name>fs.s3a.threads.max</name>
<value>10</value>
<description> Maximum number of concurrent active (part)uploads,
which each use a thread from the threadpool.</description>
<description>The total number of threads available in the filesystem for data
uploads *or any other queued filesystem operation*.</description>
</property>

<property>
@@ -1017,8 +1017,7 @@
<property>
<name>fs.s3a.max.total.tasks</name>
<value>5</value>
<description>Number of (part)uploads allowed to the queue before
blocking additional uploads.</description>
<description>The number of operations which can be queued for execution.</description>
</property>

<property>
@@ -1056,13 +1055,21 @@
<name>fs.s3a.multipart.purge</name>
<value>false</value>
<description>True if you want to purge existing multipart uploads that may not have been
completed/aborted correctly</description>
completed/aborted correctly. The corresponding purge age is defined in
fs.s3a.multipart.purge.age.
If set, when the filesystem is instantiated then all outstanding uploads
older than the purge age will be terminated, across the entire bucket.
This will impact multipart uploads by other applications and users, so it
should be used sparingly, with an age value chosen to stop failed uploads
without breaking ongoing operations.
</description>
</property>

<property>
<name>fs.s3a.multipart.purge.age</name>
<value>86400</value>
<description>Minimum age in seconds of multipart uploads to purge</description>
<description>Minimum age in seconds of multipart uploads to purge.
</description>
</property>

<property>
@@ -1095,10 +1102,50 @@
<property>
<name>fs.s3a.fast.upload</name>
<value>false</value>
<description>Upload directly from memory instead of buffering to
disk first. Memory usage and parallelism can be controlled as up to
fs.s3a.multipart.size memory is consumed for each (part)upload actively
uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
<description>
Use the incremental block-based fast upload mechanism with
the buffering mechanism set in fs.s3a.fast.upload.buffer.
</description>
</property>

<property>
<name>fs.s3a.fast.upload.buffer</name>
<value>disk</value>
<description>
The buffering mechanism to use when using S3A fast upload
(fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
This configuration option has no effect if fs.s3a.fast.upload is false.

"disk" will use the directories listed in fs.s3a.buffer.dir as
the location(s) to save data prior to being uploaded.

"array" uses arrays in the JVM heap

"bytebuffer" uses off-heap memory within the JVM.

Both "array" and "bytebuffer" will consume memory in a single stream up to the number
of blocks set by:

fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.

If using either of these mechanisms, keep this value low

The total number of threads performing work across all threads is set by
fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
work items.

Reviewer comment:

The total max block (memory/disk) consumption, across all streams, is bounded by
fs.s3a.multipart.size * (fs.s3a.fast.upload.active.blocks + fs.s3a.max.total.tasks + 1)
bytes for an instance of S3AFileSystem.

steveloughran (Contributor, Author):

You know, now that you can have a queue per stream, it could be set to something
bigger. This is something we could look at in the docs, leaving it out of the XML so
as to have a single topic. This phrase here describes the number of active threads,
which is different, and will be more so once there's other work (COPY, DELETE) going
on there.

So: won't change here.

Reviewer comment:

Completely agree. A bit further down I propose to add a single explanation in the
javadoc and link to there in the various other locations.
</description>
</property>

<property>
<name>fs.s3a.fast.upload.active.blocks</name>
<value>4</value>
<description>
Maximum number of blocks a single output stream can have
active (uploading, or queued to the central FileSystem
instance's pool of queued operations).

This stops a single stream overloading the shared thread pool.
</description>
</property>
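Taken together, fs.s3a.multipart.size and fs.s3a.fast.upload.active.blocks bound how much data a single stream may hold in block buffers. Below is a minimal sketch of wiring these settings up through Hadoop's Configuration API; the property keys are those defined above, while the sizes are illustrative values, not defaults:

    import org.apache.hadoop.conf.Configuration;

    public class FastUploadConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Switch on the block output stream and pick off-heap buffering.
        conf.setBoolean("fs.s3a.fast.upload", true);
        conf.set("fs.s3a.fast.upload.buffer", "bytebuffer");
        // Illustrative sizes: 64 MB parts, at most 4 blocks active per stream.
        conf.setLong("fs.s3a.multipart.size", 64L * 1024 * 1024);
        conf.setInt("fs.s3a.fast.upload.active.blocks", 4);
        // Worst case for one stream: 64 MB * 4 = 256 MB buffered at once.
        long bound = conf.getLong("fs.s3a.multipart.size", 0)
            * conf.getInt("fs.s3a.fast.upload.active.blocks", 0);
        System.out.println("per-stream buffer bound (bytes): " + bound);
      }
    }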

<property>
@@ -1109,13 +1156,6 @@
any call to setReadahead() is made to an open stream.</description>
</property>

<property>
<name>fs.s3a.fast.buffer.size</name>
<value>1048576</value>
<description>Size of initial memory buffer in bytes allocated for an
upload. No effect if fs.s3a.fast.upload is false.</description>
</property>

<property>
<name>fs.s3a.user.agent.prefix</name>
<value></value>
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -965,7 +965,7 @@ public static void bandwidth(NanoTimer timer, long bytes) {
* @return the number of megabytes/second of the recorded operation
*/
public static double bandwidthMBs(long bytes, long durationNS) {
return (bytes * 1000.0) / durationNS;
return bytes / (1024.0 * 1024) * 1.0e9 / durationNS;
}
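The rewritten conversion can be sanity-checked in isolation: the replaced expression, (bytes * 1000.0) / durationNS, did not work out to megabytes per second, while the new one converts bytes to megabytes and nanoseconds to seconds. A standalone check, illustrative and not part of the patch:

    public class BandwidthCheck {
      public static void main(String[] args) {
        long bytes = 1L << 30;            // 1 GiB transferred
        long durationNS = 8_000_000_000L; // in 8 seconds
        // bytes -> MB, nanoseconds -> seconds: 1024 MB / 8 s = 128 MB/s
        double mbPerSec = bytes / (1024.0 * 1024) * 1.0e9 / durationNS;
        System.out.println(mbPerSec); // prints 128.0
      }
    }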

/**
@@ -1415,17 +1415,27 @@ public long duration() {
return endTime - startTime;
}

/**
* Intermediate duration of the operation.
* @return how much time has passed since the start (in nanos).
*/
public long elapsedTime() {
return now() - startTime;
}

public double bandwidth(long bytes) {
return bandwidthMBs(bytes, duration());
}

/**
* Bandwidth as bytes per second.
* @param bytes bytes in
* @return the number of bytes per second this operation timed.
* @return the number of bytes per second of this operation;
* 0 if duration == 0.
*/
public double bandwidthBytes(long bytes) {
return (bytes * 1.0) / duration();
double duration = duration();
return duration > 0 ? bytes / duration : 0;
}
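A sketch of how these timer methods combine in a test run; NanoTimer's constructor and its end() helper are assumed from the surrounding ContractTestUtils class, since only the methods above appear in this diff:

    import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;

    public class TimerUsageSketch {
      public static void main(String[] args) throws Exception {
        NanoTimer timer = new NanoTimer();   // starts timing (assumed constructor)
        Thread.sleep(100);                   // stand-in for the operation under test
        long soFar = timer.elapsedTime();    // intermediate reading, in nanos
        long bytes = 10L * 1024 * 1024;      // pretend 10 MB were transferred
        long total = timer.end();            // assumed helper: records the end time
        System.out.println("elapsed mid-run (ns): " + soFar);
        System.out.println("total duration (ns): " + total);
        System.out.println("MB/s: " + timer.bandwidth(bytes));
      }
    }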

/**
58 changes: 56 additions & 2 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -35,6 +35,15 @@
<file.encoding>UTF-8</file.encoding>
<downloadSources>true</downloadSources>
<hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir>

<!-- are scale tests enabled ? -->
<fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled>
<!-- Size in MB of huge files. -->
<fs.s3a.scale.test.huge.filesize>unset</fs.s3a.scale.test.huge.filesize>
<!-- Size in MB of the partition size in huge file uploads. -->
<fs.s3a.scale.test.huge.partitionsize>unset</fs.s3a.scale.test.huge.partitionsize>
<!-- Timeout in seconds for scale tests.-->
<fs.s3a.scale.test.timeout>3600</fs.s3a.scale.test.timeout>
</properties>

<profiles>
@@ -115,6 +124,11 @@
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
</systemPropertyVariables>
</configuration>
</plugin>
@@ -132,7 +146,10 @@
<forkCount>${testsThreadCount}</forkCount>
<reuseForks>false</reuseForks>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
<forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
<systemPropertyVariables>
<!-- Tell tests that they are being executed in parallel -->
<test.parallel.execution>true</test.parallel.execution>
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
@@ -142,6 +159,11 @@
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
<!-- Propagate scale parameters -->
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
</systemPropertyVariables>
<!-- Some tests cannot run in parallel. Tests that cover -->
<!-- access to the root directory must run in isolation -->
@@ -160,10 +182,11 @@
<excludes>
<exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
<exclude>**/ITestS3ABlockingThreadPool.java</exclude>
<exclude>**/ITestS3AFastOutputStream.java</exclude>
<exclude>**/ITestS3AFileSystemContract.java</exclude>
<exclude>**/ITestS3AMiniYarnCluster.java</exclude>
<exclude>**/ITest*Root*.java</exclude>
<exclude>**/ITestS3AFileContextStatistics.java</exclude>
<exclude>**/ITestS3AHuge*.java</exclude>
</excludes>
</configuration>
</execution>
@@ -174,6 +197,16 @@
<goal>verify</goal>
</goals>
<configuration>
<forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
<systemPropertyVariables>
<!-- Tell tests that they are being executed sequentially -->
<test.parallel.execution>false</test.parallel.execution>
<!-- Propagate scale parameters -->
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
</systemPropertyVariables>
<!-- Do a sequential run for tests that cannot handle -->
<!-- parallel execution. -->
<includes>
@@ -183,6 +216,8 @@
<include>**/ITestS3AFileSystemContract.java</include>
<include>**/ITestS3AMiniYarnCluster.java</include>
<include>**/ITest*Root*.java</include>
<include>**/ITestS3AFileContextStatistics.java</include>
<include>**/ITestS3AHuge*.java</include>
</includes>
</configuration>
</execution>
@@ -210,14 +245,33 @@
<goal>verify</goal>
</goals>
<configuration>
<forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
<systemPropertyVariables>
<!-- Propagate scale parameters -->
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
</systemPropertyVariables>
<forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>

<!-- Turn on scale tests-->
<profile>
<id>scale</id>
<activation>
<property>
<name>scale</name>
</property>
</activation>
<properties>
<fs.s3a.scale.test.enabled>true</fs.s3a.scale.test.enabled>
</properties>
</profile>
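With the profile above, the scale tests can be enabled from the command line. An illustrative invocation follows; -Dscale activates the profile, and the other values are examples rather than recommended settings:

    mvn verify -Dscale \
        -Dfs.s3a.scale.test.huge.filesize=1024 \
        -Dfs.s3a.scale.test.timeout=7200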
</profiles>

<build>