FLUME-1631: Retire hdfs.txnEventMax in HDFS sink

(Mike Percy via Brock Noland)
commit d4dde03d1ff9e19796d9385e8f4a3e87311302f7 (parent: 0b59252)
Authored by Brock Noland
flume-ng-doc/sphinx/FlumeUserGuide.rst (11 lines changed)
@@ -984,23 +984,21 @@ Name Default Description
**channel** --
**type** -- The component type name, needs to be ``hdfs``
**hdfs.path** -- HDFS directory path (eg hdfs://namenode/flume/webdata/)
-hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory
hdfs.rollInterval 30 Number of seconds to wait before rolling current file
(0 = never roll based on time interval)
hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount 10 Number of events written to file before it rolled
(0 = never roll based on number of events)
-hdfs.batchSize 1 number of events written to file before it flushed to HDFS
-hdfs.txnEventMax 100
+hdfs.batchSize 100 number of events written to file before it is flushed to HDFS
hdfs.codeC -- Compression codec. one of following : gzip, bzip2, lzo, snappy
hdfs.fileType SequenceFile File format: currently ``SequenceFile``, ``DataStream`` or ``CompressedStream``
(1)DataStream will not compress output file and please don't set codeC
(2)CompressedStream requires set hdfs.codeC with an available codeC
-hdfs.maxOpenFiles 5000
+hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.writeFormat -- "Text" or "Writable"
-hdfs.appendTimeout 1000
-hdfs.callTimeout 10000
+hdfs.callTimeout 10000 Number of milliseconds allowed for HDFS operations, such as open, write, flush, close.
+ This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal -- Kerberos user principal for accessing secure HDFS
@@ -1008,6 +1006,7 @@ hdfs.kerberosKeytab -- Kerberos keytab for accessing secure HDFS
hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time.
hdfs.roundUnit second The unit of the round down value - ``second``, ``minute`` or ``hour``.
+hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
serializer ``TEXT`` Other possible options include ``AVRO_EVENT`` or the
fully-qualified class name of an implementation of the
``EventSerializer.Builder`` interface.
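
As a quick illustration of the documented parameters, here is a minimal configuration sketch for the HDFS sink after this change; the agent, sink, and channel names (a1, k1, c1) are hypothetical, and the values mirror the defaults and examples in the table above. Configurations that previously set hdfs.txnEventMax can simply drop it: the sink no longer reads that property, and the number of events taken per transaction is now governed by hdfs.batchSize.

  a1.channels = c1
  a1.sinks = k1
  a1.sinks.k1.type = hdfs
  a1.sinks.k1.channel = c1
  a1.sinks.k1.hdfs.path = hdfs://namenode/flume/webdata/
  a1.sinks.k1.hdfs.filePrefix = FlumeData
  a1.sinks.k1.hdfs.rollInterval = 30
  a1.sinks.k1.hdfs.rollCount = 10
  # hdfs.batchSize (default now 100) is the number of events written before a
  # flush to HDFS and also the maximum number of events taken from the channel
  # per transaction; hdfs.txnEventMax is retired and no longer read by the sink.
  a1.sinks.k1.hdfs.batchSize = 100
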
flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (33 lines changed)
@@ -69,7 +69,7 @@
private static final long defaultRollSize = 1024;
private static final long defaultRollCount = 10;
private static final String defaultFileName = "FlumeData";
- private static final long defaultBatchSize = 1;
+ private static final long defaultBatchSize = 100;
private static final long defaultTxnEventMax = 100;
private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
private static final int defaultMaxOpenFiles = 5000;
@@ -101,7 +101,6 @@
private long rollInterval;
private long rollSize;
private long rollCount;
- private long txnEventMax;
private long batchSize;
private int threadsPoolSize;
private int rollTimerPoolSize;
@@ -185,7 +184,6 @@ public void configure(Context context) {
rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
- txnEventMax = context.getLong("hdfs.txnEventMax", defaultTxnEventMax);
String codecName = context.getString("hdfs.codeC");
fileType = context.getString("hdfs.fileType", defaultFileType);
maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
@@ -201,8 +199,6 @@ public void configure(Context context) {
Preconditions.checkArgument(batchSize > 0,
"batchSize must be greater than 0");
- Preconditions.checkArgument(txnEventMax > 0,
- "txnEventMax must be greater than 0");
if (codecName == null) {
codeC = null;
compType = CompressionType.NONE;
@@ -368,11 +364,11 @@ private static CompressionCodec getCodec(String codecName) {
}
}
/**
- * Pull events out of channel and send it to HDFS - take at the most
- * txnEventMax, that's the maximum #events to hold in channel for a given
- * transaction - find the corresponding bucket for the event, ensure the file
- * is open - extract the pay-load and append to HDFS file <br />
- * WARNING: NOT THREAD SAFE
+ * Pull events out of channel and send it to HDFS. Take at most batchSize
+ * events per Transaction. Find the corresponding bucket for the event.
+ * Ensure the file is open. Serialize the data and write it to the file on
+ * HDFS. <br/>
+ * This method is not thread safe.
*/
@Override
public Status process() throws EventDeliveryException {
@@ -381,10 +377,9 @@ public Status process() throws EventDeliveryException {
List<BucketWriter> writers = Lists.newArrayList();
transaction.begin();
try {
- Event event = null;
int txnEventCount = 0;
- for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
- event = channel.take();
+ for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
+ Event event = channel.take();
if (event == null) {
break;
}
@@ -418,7 +413,7 @@ public Status process() throws EventDeliveryException {
if (txnEventCount == 0) {
sinkCounter.incrementBatchEmptyCount();
- } else if (txnEventCount == txnEventMax) {
+ } else if (txnEventCount == batchSize) {
sinkCounter.incrementBatchCompleteCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
@@ -431,14 +426,12 @@ public Status process() throws EventDeliveryException {
transaction.commit();
- if (txnEventCount > 0) {
- sinkCounter.addToEventDrainSuccessCount(txnEventCount);
- }
-
- if(event == null) {
+ if (txnEventCount < 1) {
return Status.BACKOFF;
+ } else {
+ sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+ return Status.READY;
}
- return Status.READY;
} catch (IOException eIO) {
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (71 lines changed)
@@ -75,7 +75,9 @@ private void dirCleanup() {
try {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(testPath);
- fs.delete(dirPath, true);
+ if (fs.exists(dirPath)) {
+ fs.delete(dirPath, true);
+ }
} catch (IOException eIO) {
LOG.warn("IO Error in test cleanup", eIO);
}
@@ -105,13 +107,11 @@ public void tearDown() {
dirCleanup();
}
-
@Test
public void testTextBatchAppend() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
LOG.debug("Starting...");
- final long txnMax = 2;
final long rollCount = 10;
final long batchSize = 2;
final String fileName = "FlumeData";
@@ -131,7 +131,6 @@ public void testTextBatchAppend() throws InterruptedException, LifecycleExceptio
// context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.rollInterval", "0");
context.put("hdfs.rollSize", "0");
@@ -151,10 +150,10 @@ public void testTextBatchAppend() throws InterruptedException, LifecycleExceptio
List<String> bodies = Lists.newArrayList();
// push the event batches into channel to roll twice
- for (i = 1; i <= rollCount*2/txnMax; i++) {
+ for (i = 1; i <= rollCount*2/batchSize; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -178,7 +177,10 @@ public void testTextBatchAppend() throws InterruptedException, LifecycleExceptio
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+ long expectedFiles = totalEvents / rollCount;
+ if (totalEvents % rollCount > 0) expectedFiles++;
+ Assert.assertEquals("num files wrong, found: " +
+ Lists.newArrayList(fList), expectedFiles, fList.length);
// check the contents of the all files
verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -222,7 +224,6 @@ public void testEmptyChannelResultsInStatusBackoff()
public void testKerbFileAccess() throws InterruptedException,
LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting testKerbFileAccess() ...");
- final long txnMax = 25;
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
@@ -239,7 +240,6 @@ public void testKerbFileAccess() throws InterruptedException,
Context context = new Context();
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.kerberosPrincipal", kerbConfPrincipal);
@@ -264,7 +264,6 @@ public void testTextAppend() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
LOG.debug("Starting...");
- final long txnMax = 25;
final long rollCount = 3;
final long batchSize = 2;
final String fileName = "FlumeData";
@@ -284,7 +283,6 @@ public void testTextAppend() throws InterruptedException, LifecycleException,
// context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.writeFormat", "Text");
@@ -305,7 +303,7 @@ public void testTextAppend() throws InterruptedException, LifecycleException,
for (i = 1; i < 4; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -332,7 +330,10 @@ public void testTextAppend() throws InterruptedException, LifecycleException,
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+ long expectedFiles = totalEvents / rollCount;
+ if (totalEvents % rollCount > 0) expectedFiles++;
+ Assert.assertEquals("num files wrong, found: " +
+ Lists.newArrayList(fList), expectedFiles, fList.length);
verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -341,7 +342,6 @@ public void testAvroAppend() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
LOG.debug("Starting...");
- final long txnMax = 25;
final long rollCount = 3;
final long batchSize = 2;
final String fileName = "FlumeData";
@@ -361,7 +361,6 @@ public void testAvroAppend() throws InterruptedException, LifecycleException,
// context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.writeFormat", "Text");
@@ -383,7 +382,7 @@ public void testAvroAppend() throws InterruptedException, LifecycleException,
for (i = 1; i < 4; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -410,7 +409,10 @@ public void testAvroAppend() throws InterruptedException, LifecycleException,
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+ long expectedFiles = totalEvents / rollCount;
+ if (totalEvents % rollCount > 0) expectedFiles++;
+ Assert.assertEquals("num files wrong, found: " +
+ Lists.newArrayList(fList), expectedFiles, fList.length);
verifyOutputAvroFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -419,7 +421,6 @@ public void testSimpleAppend() throws InterruptedException,
LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
- final long txnMax = 25;
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
@@ -439,7 +440,6 @@ public void testSimpleAppend() throws InterruptedException,
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
@@ -458,7 +458,7 @@ public void testSimpleAppend() throws InterruptedException,
for (i = 1; i < numBatches; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -485,7 +485,10 @@ public void testSimpleAppend() throws InterruptedException,
Path fList[] = FileUtil.stat2Paths(dirStat);
// check that the roll happened correctly for the given data
- Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+ long expectedFiles = totalEvents / rollCount;
+ if (totalEvents % rollCount > 0) expectedFiles++;
+ Assert.assertEquals("num files wrong, found: " +
+ Lists.newArrayList(fList), expectedFiles, fList.length);
verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}
@@ -494,7 +497,6 @@ public void testAppend() throws InterruptedException, LifecycleException,
EventDeliveryException, IOException {
LOG.debug("Starting...");
- final long txnMax = 25;
final long rollCount = 3;
final long batchSize = 2;
final String fileName = "FlumeData";
@@ -511,7 +513,6 @@ public void testAppend() throws InterruptedException, LifecycleException,
context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
context.put("hdfs.timeZone", "UTC");
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
@@ -529,7 +530,7 @@ public void testAppend() throws InterruptedException, LifecycleException,
for (int i = 1; i < 4; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (int j = 1; j <= txnMax; j++) {
+ for (int j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -558,7 +559,6 @@ public void testBadSimpleAppend() throws InterruptedException,
LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
- final long txnMax = 25;
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
@@ -581,7 +581,6 @@ public void testBadSimpleAppend() throws InterruptedException,
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -601,7 +600,7 @@ public void testBadSimpleAppend() throws InterruptedException,
for (i = 1; i < numBatches; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -741,7 +740,6 @@ public void testCloseReopen() throws InterruptedException,
LOG.debug("Starting...");
final int numBatches = 4;
- final long txnMax = 25;
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
@@ -762,7 +760,6 @@ public void testCloseReopen() throws InterruptedException,
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -781,7 +778,7 @@ public void testCloseReopen() throws InterruptedException,
for (i = 1; i < numBatches; i++) {
channel.getTransaction().begin();
try {
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -816,7 +813,6 @@ public void testSlowAppendFailure() throws InterruptedException,
LifecycleException, EventDeliveryException, IOException {
LOG.debug("Starting...");
- final long txnMax = 2;
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
@@ -838,7 +834,6 @@ public void testSlowAppendFailure() throws InterruptedException,
Context context = new Context();
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -857,7 +852,7 @@ public void testSlowAppendFailure() throws InterruptedException,
for (i = 0; i < numBatches; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -887,7 +882,6 @@ public void testSlowAppendFailure() throws InterruptedException,
*/
private void slowAppendTestHelper (long appendTimeout) throws InterruptedException, IOException,
LifecycleException, EventDeliveryException, IOException {
- final long txnMax = 2;
final String fileName = "FlumeData";
final long rollCount = 5;
final long batchSize = 2;
@@ -910,7 +904,6 @@ private void slowAppendTestHelper (long appendTimeout) throws InterruptedExcept
Context context = new Context();
context.put("hdfs.path", newPath);
context.put("hdfs.filePrefix", fileName);
- context.put("hdfs.txnEventMax", String.valueOf(txnMax));
context.put("hdfs.rollCount", String.valueOf(rollCount));
context.put("hdfs.batchSize", String.valueOf(batchSize));
context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -929,7 +922,7 @@ private void slowAppendTestHelper (long appendTimeout) throws InterruptedExcept
for (i = 0; i < numBatches; i++) {
Transaction txn = channel.getTransaction();
txn.begin();
- for (j = 1; j <= txnMax; j++) {
+ for (j = 1; j <= batchSize; j++) {
Event event = new SimpleEvent();
eventDate.clear();
eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -958,8 +951,10 @@ private void slowAppendTestHelper (long appendTimeout) throws InterruptedExcept
// check that the roll happened correctly for the given data
// Note that we'll end up with two files with only a head
- Assert.assertEquals((totalEvents / rollCount) +1 , fList.length);
-
+ long expectedFiles = totalEvents / rollCount;
+ if (totalEvents % rollCount > 0) expectedFiles++;
+ Assert.assertEquals("num files wrong, found: " +
+ Lists.newArrayList(fList), expectedFiles, fList.length);
verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
}