From ded3576f29a0e7f1fc67de056c27b063821dc12b Mon Sep 17 00:00:00 2001 From: robin m Date: Wed, 24 Feb 2016 23:40:24 +0530 Subject: [PATCH] Spooling Directory Source support for gzip files --- .../avro/ReliableSpoolingFileEventReader.java | 98 ++++++++++++++++--- .../TestReliableSpoolingFileEventReader.java | 32 ++++++ 2 files changed, 119 insertions(+), 11 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index d54f415d29..22881c4e03 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -41,11 +41,14 @@ import java.io.File; import java.io.FileFilter; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.Charset; import java.util.*; import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; import java.util.ArrayList; /** @@ -80,7 +83,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { .getLogger(ReliableSpoolingFileEventReader.class); static final String metaFileName = ".flumespool-main.meta"; - + private final String GZIP_FILE_EXTENSION = ".gz"; private final File spoolDirectory; private final String completedSuffix; private final String deserializerType; @@ -325,17 +328,9 @@ private void retireCurrentFile() throws IOException { String message = "File has changed size since being read: " + fileToRoll; throw new IllegalStateException(message); } - - if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) { - rollCurrentFile(fileToRoll); - } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) { - deleteCurrentFile(fileToRoll); - } else { - // TODO: implement delay in the future - throw new IllegalArgumentException("Unsupported delete policy: " + - deletePolicy); - } + applyRetentionPolicy(fileToRoll); } + /** * Rename the given spooled file @@ -457,6 +452,16 @@ public boolean accept(File candidate) { } File selectedFile = candidateFileIter.next(); + if(selectedFile.getName().endsWith(GZIP_FILE_EXTENSION)){ + unzipfile(selectedFile); + + try { + // apply it for the .gz file the files after unzip will be handled by retireCurrentFile() flow + applyRetentionPolicy(selectedFile); + } catch (IOException e) { + e.printStackTrace(); + } + } if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random. return openFile(selectedFile); } else if (consumeOrder == ConsumeOrder.YOUNGEST) { @@ -483,6 +488,77 @@ public boolean accept(File candidate) { return openFile(selectedFile); } + + private void applyRetentionPolicy(File fileToRoll) throws IOException { + if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) { + rollCurrentFile(fileToRoll); + } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) { + deleteCurrentFile(fileToRoll); + } else { + // TODO: implement delay in the future + throw new IllegalArgumentException("Unsupported delete policy: " + + deletePolicy); + } +} + +private String getUnzippedFileName(String zippedFileName) { + String unzippedFileName = zippedFileName; + if (!StringUtils.isBlank(zippedFileName)) { + String fileName = StringUtils.substringBeforeLast(zippedFileName, + GZIP_FILE_EXTENSION); + unzippedFileName = fileName; + } + + return unzippedFileName; + } + + private void unzipfile(File zippedFile) { + String unzippedFileDirPath = zippedFile.getParent(); + byte[] buffer = new byte[1024]; + GZIPInputStream gzis = null; + FileOutputStream out = null; + + if (zippedFile.exists()) { + try { + String unzippedFileName = getUnzippedFileName(zippedFile.getName()); + gzis = new GZIPInputStream(new FileInputStream(zippedFile)); + out = new FileOutputStream(unzippedFileDirPath + "/"+unzippedFileName); + logger.debug("Started unzip process "); + int length; + length = gzis.read(buffer); + while (length > 0) { + out.write(buffer, 0, length); + length = gzis.read(buffer); + } + gzis.close(); + out.close(); + logger.debug("Completed unzip process "); + logger.debug(" After unzipping the file is {}", unzippedFileName); + + + } catch (IOException ex) { + logger.error(" Error in unzip file {} ", ex.getMessage()); + } finally { + if (gzis != null) { + try { + gzis.close(); + } catch (IOException e) { + logger.error("Error in closing resource {} ", + e.getMessage()); + } + } + if (out != null) { + try { + out.close(); + } catch (IOException e) { + logger.error("Error in closing resource {} ", + e.getMessage()); + } + } + } + + } + } private File smallerLexicographical(File f1, File f2) { if (f1.getName().compareTo(f2.getName()) < 0) { diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java index 4e90054889..64f5e8d52a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java @@ -36,10 +36,12 @@ import java.io.File; import java.io.FileFilter; +import java.io.FileOutputStream; import java.io.IOException; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.*; +import java.util.zip.GZIPOutputStream; public class TestReliableSpoolingFileEventReader { @@ -359,6 +361,23 @@ public void testConsumeFileOldestWithLexicographicalComparision() expected.add("New file3 created."); Assert.assertEquals(expected, actual); } + + @Test + public void testConsumeGZipFile() + throws IOException, InterruptedException { + ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder() + .spoolDirectory(WORK_DIR).build(); + String zipFileContents = "A Gzip file"; + File file1 = new File(WORK_DIR,"new-file1.gz"); + createGZipFile(file1,zipFileContents); + Set actual = Sets.newHashSet(); + readEventsForFilesInDir(WORK_DIR, reader, actual); + Set expected = Sets.newHashSet(); + createExpectedFromFilesInSetup(expected); + expected.add(""); + expected.add(zipFileContents); + Assert.assertEquals(expected, actual); + } @Test public void testConsumeFileYoungestWithLexicographicalComparision() @@ -542,4 +561,17 @@ public boolean accept(File pathname) { })); return files; } + + private void createGZipFile(File file,String zipFileContent) { + try { + FileOutputStream fos = new FileOutputStream(file); + GZIPOutputStream gzos = new GZIPOutputStream(fos); + gzos.write((zipFileContent).getBytes()); + gzos.finish(); + gzos.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }