From b607f176a705326cdacd4b8969ac911957769e62 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Wed, 24 Jun 2015 18:46:40 +0900 Subject: [PATCH] Added skipping header/footer lines feature for text files --- .../apache/tajo/storage/StorageConstants.java | 4 ++ .../tajo/storage/text/DelimitedTextFile.java | 51 +++++++++++++++++-- .../tajo/storage/TestDelimitedTextFile.java | 28 ++++++++++ .../TestDelimitedTextFile/testNormal.json | 6 +++ 4 files changed, 84 insertions(+), 5 deletions(-) create mode 100644 tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testNormal.json diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index d2c6c1c254..d97a7fbbd3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -43,6 +43,10 @@ public class StorageConstants { public static final String TEXT_NULL = "text.null"; public static final String TEXT_SERDE_CLASS = "text.serde"; public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe"; + + public static final String TEXT_SKIP_HEADER_LINE = "text.skip.headerlines"; + public static final String TEXT_SKIP_FOOTER_LINE = "text.skip.footerlines"; + /** * It's the maximum number of parsing error torrence. * diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 55a2b96a81..693716c746 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -46,7 +46,8 @@ import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -285,6 +286,11 @@ public static class DelimitedTextFileScanner extends FileScanner implements Seek /** How many errors have occurred? */ private int errorNum; + private int headerLineNum = 0; + private int footerLineNum = 0; + + private List footerBuf = null; + public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) throws IOException { @@ -325,8 +331,35 @@ public void init() throws IOException { LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); } + // initialization for skipping header and footer(max 20) + headerLineNum = Math.min(Integer.parseInt(meta.getOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "0")), 20); + footerLineNum = Math.min(Integer.parseInt(meta.getOption(StorageConstants.TEXT_SKIP_FOOTER_LINE, "0")), 20); + + // skip first line if it reads from middle of file if (startOffset > 0) { - reader.readLine(); // skip first line; + reader.readLine(); + } + // skip header lines if it is defined + else if (headerLineNum > 0) { + LOG.info(String.format("Skip %d header lines", headerLineNum)); + for (int i=0; i 0) { + LOG.info(String.format("Prepare to skip %d footer lines", footerLineNum)); + footerBuf = new LinkedList(); + + for (int i=0; i= 0), diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java index 9726eccdf3..4ffe30949a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java @@ -176,4 +176,32 @@ public void testIgnoreTruncatedValueErrorTolerance() throws IOException { scanner.close(); } } + + @Test + public void testSkippingHeaderFooter() throws IOException { + TajoConf conf = new TajoConf(); + TableMeta meta = CatalogUtil.newTableMeta("JSON"); + meta.putOption(StorageConstants.TEXT_SKIP_HEADER_LINE, "2"); + meta.putOption(StorageConstants.TEXT_SKIP_FOOTER_LINE, "1"); + FileFragment fragment = getFileFragment("testNormal.json"); + Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + int lines = 0; + + try { + while (true) { + Tuple tuple = scanner.next(); + + if (tuple != null) { + assertEquals(19+lines, tuple.getInt2(2)); + lines++; + } + else break; + } + } finally { + assertEquals(3, lines); + scanner.close(); + } + } } diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testNormal.json b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testNormal.json new file mode 100644 index 0000000000..69fcc37638 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestDelimitedTextFile/testNormal.json @@ -0,0 +1,6 @@ +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 18, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 19, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 20, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 21, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} +{"col1": "true", "col2": "hyunsik", "col3": 22, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "aHl1bnNpaw==", "col10": "192.168.0.1"} \ No newline at end of file