From 81e309e4a6577c44770c22b51c42013727bdc909 Mon Sep 17 00:00:00 2001
From: Mubarak Seyed
Date: Thu, 2 Aug 2012 06:33:11 +0000
Subject: [PATCH 1/9] FLUME-1408. Log uncaught Throwables thrown within
 Executors. (Brock Noland via Hari Shreedharan)

git-svn-id: https://svn.apache.org/repos/asf/flume/trunk@1368360 13f79535-47bb-0310-9956-ffa450edef68
---
 .../flume/instrumentation/GangliaServer.java  |  73 ++++-----
 .../flume/lifecycle/LifecycleSupervisor.java  | 148 +++++++++---------
 .../AbstractFileConfigurationProvider.java    |   3 +
 .../apache/flume/sink/hdfs/BucketWriter.java  |  10 +-
 .../apache/flume/sink/hdfs/HDFSEventSink.java |   1 -
 5 files changed, 123 insertions(+), 112 deletions(-)

diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
index 1104141318..d93cd33225 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
@@ -37,7 +37,6 @@
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectInstance;
-import javax.management.ObjectName;
 import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.HostInfo;
@@ -332,45 +331,49 @@ protected class GangliaCollector implements Runnable {
 
   @Override
   public void run() {
-    Set<ObjectInstance> queryMBeans = null;
     try {
-      queryMBeans = mbeanServer.queryMBeans(
-          null, null);
-    } catch (Exception ex) {
-      logger.error("Could not get Mbeans for monitoring", ex);
-      Throwables.propagate(ex);
-    }
-    for (ObjectInstance obj : queryMBeans) {
+      Set<ObjectInstance> queryMBeans = null;
       try {
-        if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
-          continue;
-        }
-        MBeanAttributeInfo[] attrs = mbeanServer.
-            getMBeanInfo(obj.getObjectName()).getAttributes();
-        String strAtts[] = new String[attrs.length];
-        for (int i = 0; i < strAtts.length; i++) {
-          strAtts[i] = attrs[i].getName();
-        }
-        AttributeList attrList = mbeanServer.getAttributes(
-            obj.getObjectName(), strAtts);
-        String component = obj.getObjectName().toString().substring(
-            obj.getObjectName().toString().indexOf('=') + 1);
-        for (Object attr : attrList) {
-          Attribute localAttr = (Attribute) attr;
-          if (isGanglia3) {
-            server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
-                + localAttr.getName(),
-                localAttr.getValue().toString());
-          } else {
-            server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
-                + localAttr.getName(),
-                localAttr.getValue().toString());
+        queryMBeans = mbeanServer.queryMBeans(
+            null, null);
+      } catch (Exception ex) {
+        logger.error("Could not get Mbeans for monitoring", ex);
+        Throwables.propagate(ex);
+      }
+      for (ObjectInstance obj : queryMBeans) {
+        try {
+          if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
+            continue;
+          }
+          MBeanAttributeInfo[] attrs = mbeanServer.
+              getMBeanInfo(obj.getObjectName()).getAttributes();
+          String strAtts[] = new String[attrs.length];
+          for (int i = 0; i < strAtts.length; i++) {
+            strAtts[i] = attrs[i].getName();
           }
+          AttributeList attrList = mbeanServer.getAttributes(
+              obj.getObjectName(), strAtts);
+          String component = obj.getObjectName().toString().substring(
+              obj.getObjectName().toString().indexOf('=') + 1);
+          for (Object attr : attrList) {
+            Attribute localAttr = (Attribute) attr;
+            if (isGanglia3) {
+              server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
+                  + localAttr.getName(),
+                  localAttr.getValue().toString());
+            } else {
+              server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
+                  + localAttr.getName(),
+                  localAttr.getValue().toString());
+            }
+            server.sendToGangliaNodes();
           }
-          server.sendToGangliaNodes();
+        } catch (Exception ex) {
+          logger.error("Error getting mbean attributes", ex);
         }
-      } catch (Exception ex) {
-        logger.error("Error getting mbean attributes", ex);
       }
+    } catch(Throwable t) {
+      logger.error("Unexpected error", t);
     }
   }
 }

diff --git a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
index 2ac94df1fc..78eda05903 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleSupervisor.java
@@ -201,94 +201,94 @@ public void run() {
 
       long now = System.currentTimeMillis();
 
-      if (supervisoree.status.firstSeen == null) {
-        logger.debug("first time seeing {}", lifecycleAware);
+      try {
+        if (supervisoree.status.firstSeen == null) {
+          logger.debug("first time seeing {}", lifecycleAware);
 
-        supervisoree.status.firstSeen = now;
-      }
+          supervisoree.status.firstSeen = now;
+        }
 
-      supervisoree.status.lastSeen = now;
-      synchronized (lifecycleAware) {
-        if (supervisoree.status.discard) {
-          // Unsupervise has already been called on this.
-          logger.info("Component has already been stopped {}", lifecycleAware);
-          return;
-        } else if(supervisoree.status.error) {
-          logger.info("Component {} is in error state, and Flume will not" +
-              "attempt to change its state", lifecycleAware);
-          return;
-        }
+        supervisoree.status.lastSeen = now;
+        synchronized (lifecycleAware) {
+          if (supervisoree.status.discard) {
+            // Unsupervise has already been called on this.
+            logger.info("Component has already been stopped {}", lifecycleAware);
+            return;
+          } else if (supervisoree.status.error) {
+            logger.info("Component {} is in error state, and Flume will not" +
+                "attempt to change its state", lifecycleAware);
+            return;
+          }
 
-        supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
+          supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
 
-        if (!lifecycleAware.getLifecycleState().equals(
-            supervisoree.status.desiredState)) {
+          if (!lifecycleAware.getLifecycleState().equals(
+              supervisoree.status.desiredState)) {
 
-          logger
-              .debug("Want to transition {} from {} to {} (failures:{})",
-                  new Object[] { lifecycleAware,
-                      supervisoree.status.lastSeenState,
-                      supervisoree.status.desiredState,
-                      supervisoree.status.failures });
+            logger.debug("Want to transition {} from {} to {} (failures:{})",
+                new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
+                    supervisoree.status.desiredState,
+                    supervisoree.status.failures });
 
-          switch (supervisoree.status.desiredState) {
-          case START:
-            try {
-              lifecycleAware.start();
-            } catch (Throwable e) {
-              logger.error("Unable to start " + lifecycleAware
-                  + " - Exception follows.", e);
-              if(e instanceof Error){
-                //This component can never recover, shut it down.
-                supervisoree.status.desiredState = LifecycleState.STOP;
-                try{
-                  lifecycleAware.stop();
-                  logger.warn("Component {} stopped, since it could not be" +
-                      "successfully started due to missing dependencies",
-                      lifecycleAware);
-                } catch (Throwable e1) {
-                  logger.error("Unsuccessful attempt to " +
-                      "shutdown component: {} due to missing dependencies."
-                      + " Please shutdown the agent" +
-                      "or disable this component, or the agent will be" +
-                      "in an undefined state.", e1);
-                  supervisoree.status.error = true;
-                  if(e1 instanceof Error){
-                    throw (Error)e1;
-                  }
-                  //Set the state to stop, so that the conf poller can
-                  //proceed.
-                }
-              }
-              supervisoree.status.failures++;
-            }
-            break;
-          case STOP:
-            try {
-              lifecycleAware.stop();
-            } catch (Throwable e) {
-              logger.error("Unable to stop " + lifecycleAware
-                  + " - Exception follows.", e);
-              if(e instanceof Error) {
-                throw (Error)e;
-              }
-              supervisoree.status.failures++;
-            }
-            break;
-          default:
-            logger.warn("I refuse to acknowledge {} as a desired state",
-                supervisoree.status.desiredState);
-          }
+            switch (supervisoree.status.desiredState) {
+              case START:
+                try {
+                  lifecycleAware.start();
+                } catch (Throwable e) {
+                  logger.error("Unable to start " + lifecycleAware
+                      + " - Exception follows.", e);
+                  if (e instanceof Error) {
+                    // This component can never recover, shut it down.
+                    supervisoree.status.desiredState = LifecycleState.STOP;
+                    try {
+                      lifecycleAware.stop();
+                      logger.warn("Component {} stopped, since it could not be" +
+                          "successfully started due to missing dependencies",
+                          lifecycleAware);
+                    } catch (Throwable e1) {
+                      logger.error("Unsuccessful attempt to " +
+                          "shutdown component: {} due to missing dependencies." +
+                          " Please shutdown the agent" +
+                          "or disable this component, or the agent will be" +
+                          "in an undefined state.", e1);
+                      supervisoree.status.error = true;
+                      if (e1 instanceof Error) {
+                        throw (Error) e1;
+                      }
+                      // Set the state to stop, so that the conf poller can
+                      // proceed.
+                    }
+                  }
+                  supervisoree.status.failures++;
+                }
+                break;
+              case STOP:
+                try {
+                  lifecycleAware.stop();
+                } catch (Throwable e) {
+                  logger.error("Unable to stop " + lifecycleAware
+                      + " - Exception follows.", e);
+                  if (e instanceof Error) {
+                    throw (Error) e;
+                  }
+                  supervisoree.status.failures++;
+                }
+                break;
+              default:
+                logger.warn("I refuse to acknowledge {} as a desired state",
+                    supervisoree.status.desiredState);
+            }
 
-          if (!supervisoree.policy.isValid(
-              lifecycleAware, supervisoree.status)) {
-            logger.error(
-                "Policy {} of {} has been violated - supervisor should exit!",
-                supervisoree.policy, lifecycleAware);
-          }
+            if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
+              logger.error(
+                  "Policy {} of {} has been violated - supervisor should exit!",
+                  supervisoree.policy, lifecycleAware);
+            }
+          }
         }
+      } catch(Throwable t) {
+        logger.error("Unexpected error", t);
       }
-      logger.debug("Status check complete");
     }
   }

diff --git a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
index 9f900d3b74..a2c882b2da 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
@@ -206,6 +206,9 @@ public void run() {
       } catch (NoClassDefFoundError e) {
         logger.error("Failed to start agent because dependencies were not "
             + "found in classpath. Error follows.", e);
+      } catch (Throwable t) {
+        // caught because the caller does not handle or log Throwables
+        logger.error("Unhandled error", t);
       }
     }
   }

diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 75ff0693b6..6408eb9cde 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -40,6 +40,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Throwables;
+
 /**
  * Internal API intended for HDFSSink use.
  * This class does file rolling and handles file formats and serialization.
@@ -199,7 +201,7 @@ private void doOpen() throws IOException {
       if (ex instanceof IOException) {
         throw (IOException) ex;
       } else {
-        throw new IOException(ex);
+        throw Throwables.propagate(ex);
       }
     }
   }
@@ -213,7 +215,11 @@ private void doOpen() throws IOException {
         public Void call() throws Exception {
           LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
               bucketPath + IN_USE_EXT, rollInterval);
-          close();
+          try {
+            close();
+          } catch(Throwable t) {
+            LOG.error("Unexpected error", t);
+          }
           return null;
         }
       };

diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index d65c5a8a1d..fcb96420f4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -460,7 +460,6 @@ public void stop() {
 
     for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
       LOG.info("Closing {}", entry.getKey());
-      final BucketWriter callableWriter = entry.getValue();
       try {
         close(entry.getValue());
       } catch (Exception ex) {
From 68ebd5fa1a8c9b5dc8efc9fc3e2bcd49fb32d1fe Mon Sep 17 00:00:00 2001
From: Mubarak Seyed
Date: Thu, 2 Aug 2012 06:33:15 +0000
Subject: [PATCH 2/9] FLUME-1391. Use sync() instead of syncFs() in HDFS Sink
 to be compatible with hadoop 0.20.2. (Yongkun Wang via Jarek Jarcec Cecho)

git-svn-id: https://svn.apache.org/repos/asf/flume/trunk@1368361 13f79535-47bb-0310-9956-ffa450edef68
---
 .../main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index bcc6f205ca..ac9104d7c1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -74,7 +74,7 @@ public void append(Event e, FlumeFormatter formatter) throws IOException {
 
   @Override
   public void sync() throws IOException {
-    writer.syncFs();
+    writer.sync();
   }
 
   @Override
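Context for FLUME-1391: SequenceFile.Writer.syncFs() only exists on Hadoop branches that carry the append/flush work, while sync() is also available on vanilla 0.20.2, so the portable call is used unconditionally. A code base that wanted the stronger call where available could probe for it reflectively; a minimal sketch, assuming only that syncFs() takes no arguments (this helper is hypothetical, not Flume API):

    import java.lang.reflect.Method;
    import org.apache.hadoop.io.SequenceFile;

    final class PortableSync {
      /** Call syncFs() when the running Hadoop version has it, else sync(). */
      static void sync(SequenceFile.Writer writer) throws Exception {
        try {
          Method syncFs = writer.getClass().getMethod("syncFs");
          syncFs.invoke(writer);
        } catch (NoSuchMethodException e) {
          writer.sync(); // Hadoop 0.20.2: no syncFs(); sync() is portable
        }
      }
    }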
From e9e7f65a79775c4a0b1887f576fbbefc2b32207e Mon Sep 17 00:00:00 2001
From: Mubarak Seyed
Date: Thu, 2 Aug 2012 06:33:18 +0000
Subject: [PATCH 3/9] FLUME-1389. Flume gives opaque error if interceptor type
 not specified. (Patrick Wendell via Hari Shreedharan)

git-svn-id: https://svn.apache.org/repos/asf/flume/trunk@1368362 13f79535-47bb-0310-9956-ffa450edef68
---
 .../main/java/org/apache/flume/channel/ChannelProcessor.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index 53bfac1c8e..1cce137f78 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -100,6 +100,11 @@ private void configureInterceptors(Context context) {
       Context interceptorContext = new Context(
           interceptorContexts.getSubProperties(interceptorName + "."));
       String type = interceptorContext.getString("type");
+      if (type == null) {
+        LOG.error("Type not specified for interceptor " + interceptorName);
+        throw new FlumeException("Interceptor.Type not specified for " +
+            interceptorName);
+      }
       try {
         Interceptor.Builder builder = factory.newInstance(type);
         builder.configure(interceptorContext);
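The misconfiguration this change guards against looks like the following (agent, source and interceptor names are illustrative):

    # 'i1' is declared but the mandatory .type key is missing; before this
    # patch the failure surfaced later as an opaque error, now startup fails
    # fast with "Interceptor.Type not specified for i1"
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.preserveExisting = true
    # correct form:
    # a1.sources.r1.interceptors.i1.type = timestamp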
version.revision() : "Unknown"; + if(version != null + && version.revision() != null + && !version.revision().isEmpty()){ + return version.revision(); + } + return "Unknown"; } /** @@ -110,7 +115,9 @@ public static String getBuildVersion(){ public static void main(String[] args) { System.out.println("Flume " + getVersion()); - System.out.println("Subversion " + getUrl() + " -r " + getRevision()); + System.out.println("Source code repository: " + + "https://git-wip-us.apache.org/repos/asf/flume.git"); + System.out.println("Revision: " + getRevision()); System.out.println("Compiled by " + getUser() + " on " + getDate()); System.out.println("From source with checksum " + getSrcChecksum()); } From f03a2406bf44a8300522c1941293e2d74df88d28 Mon Sep 17 00:00:00 2001 From: Kevin Conaway Date: Thu, 9 Jun 2016 15:50:13 -0400 Subject: [PATCH 7/9] FLUME-2922 Sync SequenceFile.Writer before calling hflush --- .../flume/sink/hdfs/HDFSSequenceFile.java | 1 + .../flume/sink/hdfs/TestHDFSEventSink.java | 58 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java index a261cce6b3..848654af81 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java @@ -110,6 +110,7 @@ public void append(Event e) throws IOException { @Override public void sync() throws IOException { + writer.sync(); hflushOrSync(outStream); } diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index 23862eb44e..ac5aa61ac1 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -1323,6 +1323,64 @@ public void testCloseOnIdle() throws IOException, EventDeliveryException, Interr fs.close(); } + @Test + public void testBlockCompressSequenceFileWriterSync() throws IOException, EventDeliveryException, InterruptedException { + String hdfsPath = testPath + "/sequenceFileWriterSync"; + String body = "test event-" + System.nanoTime(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + + // Since we are reading a partial file we don't want to use checksums + fs.setVerifyChecksum(false); + fs.setWriteChecksum(false); + + Path dirPath = new Path(hdfsPath); + fs.delete(dirPath, true); + fs.mkdirs(dirPath); + + Context context = new Context(); + context.put("hdfs.path", hdfsPath); + // Ensure the file isn't closed and rolled, we are only going to send one event + context.put("hdfs.rollCount", "10"); + context.put("hdfs.rollSize", "0"); + context.put("hdfs.rollInterval", "0"); + context.put("hdfs.batchSize", "1"); + context.put("hdfs.fileType", "SequenceFile"); + context.put("hdfs.codeC", "BZip2Codec"); // Compression codec that doesn't require native hadoop libraries + context.put("hdfs.writeFormat", "Writable"); + Configurables.configure(sink, context); + + Channel channel = new MemoryChannel(); + Configurables.configure(channel, context); + + sink.setChannel(channel); + sink.start(); + + Transaction txn = channel.getTransaction(); + txn.begin(); + Event event = new 
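The one-line change in HDFSSequenceFile matters because, for a block-compressed SequenceFile, appended records are buffered inside the writer until a sync point: hflush() alone only pushes bytes that have already reached the output stream. The resulting flush order is roughly the following (a sketch; writer.sync() and hflush() are real Hadoop calls, but the standalone helper shape is an assumption):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.io.SequenceFile;

    class FlushOrder {
      // 1. writer.sync()  -- flush the writer's buffered/compressed block
      //                      into the underlying output stream
      // 2. out.hflush()   -- make those stream bytes visible to HDFS readers
      //                      (hflushOrSync() in the diff falls back to the
      //                      older sync() call on pre-0.21 Hadoop)
      static void syncVisible(SequenceFile.Writer writer, FSDataOutputStream out)
          throws IOException {
        writer.sync();
        out.hflush();
      }
    }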
From 1dd320a8c293df9421a0bea24729146d560666ad Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Thu, 9 Jun 2016 15:52:22 -0400
Subject: [PATCH 8/9] FLUME-2922 Add doc for clarity

---
 .../test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index ac5aa61ac1..9baa228a8a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -1366,6 +1366,8 @@ public void testBlockCompressSequenceFileWriterSync() throws IOException, EventD
     txn.close();
 
     sink.process();
+    // Sink is _not_ closed. The file should remain open but
+    // the data written should be visible to readers via sync + hflush
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path[] paths = FileUtil.stat2Paths(dirStat);

From 87d629f76fd01e8818aa60f23a5024e449fee948 Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Mon, 18 Jul 2016 13:50:34 -0400
Subject: [PATCH 9/9] FLUME-2922 Code review updates

---
 .../flume/sink/hdfs/TestHDFSEventSink.java | 73 +++++++++++++------
 1 file changed, 52 insertions(+), 21 deletions(-)

diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index 9baa228a8a..e798861ae4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -23,8 +23,10 @@
 import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -1323,31 +1325,53 @@ public void testCloseOnIdle() throws IOException, EventDeliveryException, Interr
     fs.close();
   }
 
+  /**
+   * This test simulates what happens when a batch of events is written to a compressed sequence
+   * file (and thus hsync'd to hdfs) but the file is not yet closed.
+   *
+   * When this happens, the data that we wrote should still be readable.
+   */
   @Test
-  public void testBlockCompressSequenceFileWriterSync() throws IOException, EventDeliveryException, InterruptedException {
+  public void testBlockCompressSequenceFileWriterSync() throws IOException, EventDeliveryException {
     String hdfsPath = testPath + "/sequenceFileWriterSync";
-    String body = "test event-" + System.nanoTime();
-
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
-
+    FileSystem fs = FileSystem.get(new Configuration());
     // Since we are reading a partial file we don't want to use checksums
     fs.setVerifyChecksum(false);
     fs.setWriteChecksum(false);
 
+    String [] codecs = {"BZip2Codec", "DeflateCodec"}; // Compression codecs that don't require native hadoop libraries
+
+    for (String codec : codecs) {
+      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Collections.singletonList(
+          "single-event"
+      ));
+
+      sequenceFileWriteAndVerifyEvents(fs, hdfsPath, codec, Arrays.asList(
+          "multiple-events-1",
+          "multiple-events-2",
+          "multiple-events-3",
+          "multiple-events-4",
+          "multiple-events-5"
+      ));
+    }
+
+    fs.close();
+  }
+
+  private void sequenceFileWriteAndVerifyEvents(FileSystem fs, String hdfsPath, String codec, Collection<String> eventBodies) throws IOException, EventDeliveryException {
     Path dirPath = new Path(hdfsPath);
     fs.delete(dirPath, true);
     fs.mkdirs(dirPath);
 
     Context context = new Context();
     context.put("hdfs.path", hdfsPath);
-    // Ensure the file isn't closed and rolled, we are only going to send one event
-    context.put("hdfs.rollCount", "10");
+    // Ensure the file isn't closed and rolled
+    context.put("hdfs.rollCount", String.valueOf(eventBodies.size() + 1));
     context.put("hdfs.rollSize", "0");
     context.put("hdfs.rollInterval", "0");
     context.put("hdfs.batchSize", "1");
     context.put("hdfs.fileType", "SequenceFile");
-    context.put("hdfs.codeC", "BZip2Codec"); // Compression codec that doesn't require native hadoop libraries
+    context.put("hdfs.codeC", codec);
     context.put("hdfs.writeFormat", "Writable");
     Configurables.configure(sink, context);
 
@@ -1357,15 +1381,20 @@ public void testBlockCompressSequenceFileWriterSync() throws IOException, EventD
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, context);
 
     sink.setChannel(channel);
     sink.start();
 
-    Transaction txn = channel.getTransaction();
-    txn.begin();
-    Event event = new SimpleEvent();
-    event.setBody(body.getBytes());
-    channel.put(event);
-    txn.commit();
-    txn.close();
+    for (String eventBody : eventBodies) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+
+      Event event = new SimpleEvent();
+      event.setBody(eventBody.getBytes());
+      channel.put(event);
+
+      txn.commit();
+      txn.close();
+
+      sink.process();
+    }
 
-    sink.process();
     // Sink is _not_ closed. The file should remain open but
     // the data written should be visible to readers via sync + hflush
     FileStatus[] dirStat = fs.listStatus(dirPath);
     Path[] paths = FileUtil.stat2Paths(dirStat);
 
     Assert.assertEquals(1, paths.length);
 
-    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.stream(fs.open(paths[0])));
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.stream(fs.open(paths[0])));
     LongWritable key = new LongWritable();
     BytesWritable value = new BytesWritable();
 
-    Assert.assertTrue(reader.next(key, value));
-    Assert.assertArrayEquals(body.getBytes(), value.copyBytes());
+    for (String eventBody : eventBodies) {
+      Assert.assertTrue(reader.next(key, value));
+      Assert.assertArrayEquals(eventBody.getBytes(), value.copyBytes());
+    }
 
-    fs.close();
+    Assert.assertFalse(reader.next(key, value));
   }
 
   private Context getContextForRetryTests() {
     Context context = new Context();
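Outside the test harness, the sink setup these FLUME-2922 tests model corresponds to an agent configuration along these lines (agent and sink names are illustrative; the hdfs.* keys mirror the context.put() calls in the tests):

    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /flume/events
    a1.sinks.k1.hdfs.fileType = SequenceFile
    a1.sinks.k1.hdfs.writeFormat = Writable
    a1.sinks.k1.hdfs.codeC = BZip2Codec
    # keep files open across batches; data still becomes visible to
    # readers after each batch thanks to sync + hflush
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollSize = 0
    a1.sinks.k1.hdfs.rollInterval = 0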