FLUME-682: Improve SeqfileEventSink performance

commit 88df30b1bdb00ddd138a8cb1fed95d3d22f234e7 (1 parent: 89210af)
Authored by chetan, committed by jmhsieh
conf/flume-conf.xml (8 changed lines)
@@ -83,6 +83,14 @@ configuration values placed in flume-site.xml. -->
</description>
</property>
+ <property>
+ <name>flume.node.wal.output.buffered</name>
+ <value>true</value>
+ <description>When true, the WAL subsystem will buffer writes to disk.
+ Setting to false will improve durability at the cost of performance.
+ </description>
+ </property>
+
<!-- ================================================== -->
<!-- Agent ============================================ -->
<!-- ================================================== -->
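Deployments can override the new default in flume-site.xml. A minimal sketch of such an override, assuming an existing site file; setting false gives up buffering in exchange for durability:

<configuration>
  <property>
    <name>flume.node.wal.output.buffered</name>
    <value>false</value>
  </property>
</configuration>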
flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java (8 changed lines)
@@ -160,6 +160,7 @@ protected FlumeConfiguration(boolean loadDefaults) {
public static final String AGENT_MEMTHRESHOLD = "flume.agent.mem.threshold";
public static final String AGENT_MULTIMASTER_MAXRETRIES = "flume.agent.multimaster.maxretries";
public static final String AGENT_MULTIMASTER_RETRYBACKOFF = "flume.agent.multimaster.retrybackoff";
+ public static final String WAL_OUTPUT_BUFFER = "flume.node.wal.output.buffered";
// Flow options
public static final String DEFAULT_FLOW_NAME = "flume.flow.default.name";
@@ -1062,4 +1063,11 @@ public long getNodeCloseTimeout() {
return getLong(NODE_CLOSE_TIMEOUT, 30000);
}
+ /**
+ * Whether or not the WAL should buffer writes to disk. Defaults to true.
+ */
+ public boolean getWALOutputBuffering() {
+ return getBoolean(WAL_OUTPUT_BUFFER, true);
+ }
+
}
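The typed accessor keeps the key and its default in one place. A minimal sketch of the intended call site, matching the sink change below:

// Sketch: choose the WAL writer based on the new flag.
FlumeConfiguration conf = FlumeConfiguration.get();
if (conf.getWALOutputBuffering()) {
  // buffered path (default): faster, recent writes may sit in memory briefly
} else {
  // unbuffered path: flushing writer, more durable but slower
}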
flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSink.java (25 changed lines)
@@ -21,9 +21,12 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FlushingSequenceFileWriter;
+import org.apache.hadoop.io.RawSequenceFileWriter;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,10 +70,22 @@ public void open() throws IOException {
+ f.getAbsoluteFile().getParentFile() + " for writing");
}
- Configuration conf = FlumeConfiguration.get();
+ FlumeConfiguration conf = FlumeConfiguration.get();
+ FileSystem fs = FileSystem.getLocal(conf);
+
try {
- writer = FlushingSequenceFileWriter.createWriter(conf, f,
- WriteableEventKey.class, WriteableEvent.class);
+
+ if (conf.getWALOutputBuffering()) {
+ writer = RawSequenceFileWriter.createWriter(fs, conf,
+ new Path(f.getAbsolutePath()), WriteableEventKey.class,
+ WriteableEvent.class, CompressionType.NONE);
+
+ } else {
+ writer = FlushingSequenceFileWriter.createWriter(conf, f,
+ WriteableEventKey.class, WriteableEvent.class);
+
+ }
+
} catch (FileNotFoundException fnfe) {
LOG.error("Possible permissions problem when creating " + f, fnfe);
throw fnfe;
@@ -79,7 +94,7 @@ public void open() throws IOException {
/**
* @throws IOException
- *
+ *
*/
public void close() throws IOException {
LOG.debug("closing " + f);
flume-core/src/main/java/com/cloudera/flume/handlers/hdfs/SeqfileEventSource.java (3 changed lines)
@@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventSource;
@@ -82,7 +83,7 @@ public void close() throws IOException {
@Override
public void open() throws IOException {
LOG.debug("opening SeqfileEventSource " + fname);
- Configuration conf = new Configuration();
+ Configuration conf = FlumeConfiguration.get();
FileSystem fs = FileSystem.getLocal(conf);
reader = new SequenceFile.Reader(fs, new Path(fname), conf);
}
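A matching read-path sketch; the String-path constructor and next() are assumptions inferred from the open() logging above, not part of this diff:

// Hypothetical usage; constructor and next() signatures assumed.
SeqfileEventSource src = new SeqfileEventSource("/tmp/wal/seq-00000");
src.open();           // now resolves the local FileSystem via FlumeConfiguration
Event e = src.next(); // read one event back from the sequence file
src.close();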
flume-core/src/main/java/org/apache/hadoop/io/RawSequenceFileWriter.java (new file, 87 lines)
@@ -0,0 +1,87 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A {@link SequenceFile} {@link Writer} that doesn't create checksum files
+ */
+@SuppressWarnings("rawtypes")
+public class RawSequenceFileWriter extends SequenceFile.Writer {
+
+ public static RawSequenceFileWriter createWriter(FileSystem fs,
+ Configuration conf, Path name, Class keyClass, Class valClass,
+ CompressionType compressionType) throws IOException {
+
+ return new RawSequenceFileWriter(fs, conf, name, keyClass, valClass,
+ fs.getConf().getInt("io.file.buffer.size", 65536),
+ fs.getDefaultReplication(), fs.getDefaultBlockSize(), null,
+ new Metadata());
+ }
+
+ public RawSequenceFileWriter(FileSystem fs, Configuration conf, Path name,
+ Class keyClass, Class valClass, int bufferSize, short replication,
+ long blockSize, Progressable progress, Metadata metadata)
+ throws IOException {
+
+ Path parent = name.getParent();
+ if (parent != null && !fs.mkdirs(parent)) {
+ throw new IOException("Mkdirs failed to create " + parent);
+ }
+
+ FSDataOutputStream out = ((LocalFileSystem) fs).getRawFileSystem().create(
+ name, true, bufferSize, replication, blockSize, progress);
+
+ init(name, conf, out, keyClass, valClass, false, null, metadata);
+
+ initializeFileHeader();
+ writeFileHeader();
+ finalizeFileHeader();
+ }
+
+}
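Standalone usage mirrors the call in SeqfileEventSink above; only the path literal is illustrative:

// Create a local, checksum-free writer, as SeqfileEventSink.open() does.
FlumeConfiguration conf = FlumeConfiguration.get();
FileSystem fs = FileSystem.getLocal(conf);
SequenceFile.Writer writer = RawSequenceFileWriter.createWriter(fs, conf,
    new Path("/tmp/wal/seq-00000"), WriteableEventKey.class,
    WriteableEvent.class, CompressionType.NONE);

Because the stream is opened on the raw local file system, no .crc sidecar files are written, which is where the performance win over the checksummed path comes from.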