From 6932366c854c62caf1be7c442bf438774c5933e1 Mon Sep 17 00:00:00 2001 From: Jaehwa Jung Date: Fri, 8 Aug 2014 01:28:59 +0900 Subject: [PATCH] TAJO-999: SequenceFile key class need to be compatible. --- .../sequencefile/SequenceFileAppender.java | 20 ++++++++++--------- .../sequencefile/SequenceFileScanner.java | 20 +++++++++++++------ .../tajo/storage/TestCompressionStorages.java | 12 +++++++++++ .../org/apache/tajo/storage/TestStorages.java | 13 ++++++++++++ 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index b150a9a340..86d902afbe 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -25,11 +25,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.*; import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.tajo.catalog.Schema; @@ -73,7 +70,8 @@ public class SequenceFileAppender extends FileAppender { long rowCount; private boolean isShuffle; - private static final BytesWritable EMPTY_KEY = new BytesWritable(); + + private Writable EMPTY_KEY; public SequenceFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { super(conf, schema, meta, path); @@ -128,20 +126,24 @@ public void init() throws IOException { throw new IOException(e); } - Class valueClass; + Class keyClass, valueClass; if (serde instanceof BinarySerializerDeserializer) { + keyClass = BytesWritable.class; + EMPTY_KEY = new BytesWritable(); valueClass = BytesWritable.class; } else { + keyClass = LongWritable.class; + EMPTY_KEY = new LongWritable(); valueClass = Text.class; } String type = this.meta.getOption(StorageConstants.COMPRESSION_TYPE, CompressionType.NONE.name()); if (type.equals(CompressionType.BLOCK.name())) { - writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.BLOCK, codec); + writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.BLOCK, codec); } else if (type.equals(CompressionType.RECORD.name())) { - writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.RECORD, codec); + writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.RECORD, codec); } else { - writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.NONE, codec); + writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec); } if (enabledStats) { diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 32d1d57688..3c3984110b 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -24,10 +24,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.*; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -37,6 +35,7 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.BytesUtils; +import org.apache.tajo.util.ReflectionUtil; import java.io.IOException; @@ -72,7 +71,7 @@ public class SequenceFileScanner extends FileScanner { private int elementOffset, elementSize; - private static final BytesWritable EMPTY_KEY = new BytesWritable(); + private Writable EMPTY_KEY; public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { super(conf, schema, meta, fragment); @@ -120,8 +119,13 @@ public void init() throws IOException { String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - if (serde instanceof BinarySerializerDeserializer) + if (serde instanceof BinarySerializerDeserializer) { hasBinarySerDe = true; + } + + Class keyClass = (Class)Class.forName(reader.getKeyClassName()); + EMPTY_KEY = keyClass.newInstance(); + } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -129,6 +133,10 @@ public void init() throws IOException { super.init(); } + public Writable getKey() { + return EMPTY_KEY; + } + private void prepareProjection(Column [] targets) { projectionMap = new int[targets.length]; diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index 9fe57213d9..61f4682b41 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -21,7 +21,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.io.compress.zlib.ZlibFactory; import org.apache.hadoop.util.NativeCodeLoader; @@ -34,6 +37,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.util.CommonTestingUtil; import org.junit.Test; import org.junit.runner.RunWith; @@ -44,6 +48,7 @@ import java.util.Collection; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestCompressionStorages { @@ -228,6 +233,13 @@ private void storageCompressionTest(StoreType storeType, Class