From 58bc69d37ba24b365f43a87b0a509163d584a7fc Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 18 Oct 2014 23:28:54 -0700 Subject: [PATCH] TAJO-1124: RowWriter::putText and RowWriter::putBlob should take offset and length params. --- .../org/apache/tajo/jdbc/MetaDataTuple.java | 3 +- .../java/org/apache/tajo/storage/Tuple.java | 3 +- .../tajo/tuple/offheap/OffHeapRowWriter.java | 27 +++-- .../apache/tajo/tuple/offheap/RowWriter.java | 4 + .../tuple/offheap/TestOffHeapRowBlock.java | 105 ++++++++++++------ .../java/org/apache/tajo/storage/Tuple.java | 1 + 6 files changed, 93 insertions(+), 50 deletions(-) diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java index 6c8ef5d4e2..bbad9d739c 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java @@ -1,4 +1,4 @@ -package org.apache.tajo.jdbc; /** +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.tajo.jdbc; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.IntervalDatum; diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java index 53e68c7420..7c28b2cb62 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage; import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; public interface Tuple extends Cloneable { @@ -69,7 +70,7 @@ public interface Tuple extends Cloneable { public Datum getProtobufDatum(int fieldId); - public Datum getInterval(int fieldId); + public IntervalDatum getInterval(int fieldId); public char [] getUnicodeChars(int fieldId); diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java index 22253cb22f..3f98f8b4ff 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java @@ -18,6 +18,7 @@ package org.apache.tajo.tuple.offheap; +import com.google.common.base.Preconditions; import org.apache.tajo.annotation.UsedByJIT; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.IntervalDatum; @@ -206,17 +207,12 @@ public void putText(String val) { @UsedByJIT public void putText(byte[] val) { - int bytesLen = val.length; - - ensureSize(SizeOf.SIZE_OF_INT + bytesLen); - forwardField(); - - OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); - curOffset += SizeOf.SIZE_OF_INT; + putText(val, 0, val.length); + } - OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null, - recordStartAddr() + curOffset, bytesLen); - curOffset += bytesLen; + @UsedByJIT + public void putText(byte[] val, int offset, int length) { + putBlob(val, offset, length); } @UsedByJIT @@ -237,7 +233,14 @@ public void copyTextFrom(UnSafeTuple tuple, int fieldIdx) { @UsedByJIT public void putBlob(byte[] val) { - int bytesLen = val.length; + putBlob(val, 0, val.length); + } + + @UsedByJIT + public void putBlob(byte[] val, int offset, int length) { + Preconditions.checkArgument(offset >= 0 && offset <= val.length); + + int bytesLen = length; ensureSize(SizeOf.SIZE_OF_INT + bytesLen); forwardField(); @@ -245,7 +248,7 @@ public void putBlob(byte[] val) { OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); curOffset += SizeOf.SIZE_OF_INT; - OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null, + OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET + offset, null, recordStartAddr() + curOffset, bytesLen); curOffset += bytesLen; } diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java index f6d5ba2fb1..cfa955ca0c 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java +++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java @@ -61,8 +61,12 @@ public interface RowWriter { public void putText(byte [] val); + public void putText(byte [] val, int offset, int length); + public void putBlob(byte[] val); + public void putBlob(byte[] val, int offset, int length); + public void putTimestamp(long val); public void putTime(long val); diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java index c43ba38930..de35e12941 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java +++ b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java @@ -46,6 +46,7 @@ public class TestOffHeapRowBlock { private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class); public static String UNICODE_FIELD_PREFIX = "abc_가나다_"; + public static String TEXTS_PARTS = "abcdefghijk"; public static Schema schema; static { @@ -57,12 +58,13 @@ public class TestOffHeapRowBlock { schema.addColumn("col4", Type.FLOAT4); schema.addColumn("col5", Type.FLOAT8); schema.addColumn("col6", Type.TEXT); - schema.addColumn("col7", Type.TIMESTAMP); - schema.addColumn("col8", Type.DATE); - schema.addColumn("col9", Type.TIME); - schema.addColumn("col10", Type.INTERVAL); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", + schema.addColumn("col7", Type.TEXT); + schema.addColumn("col8", Type.TIMESTAMP); + schema.addColumn("col9", Type.DATE); + schema.addColumn("col10", Type.TIME); + schema.addColumn("col11", Type.INTERVAL); + schema.addColumn("col12", Type.INET4); + schema.addColumn("col13", CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); } @@ -357,12 +359,15 @@ public static void fillRow(int i, RowWriter builder) { builder.putFloat4(i); // 4 builder.putFloat8(i); // 5 builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 - builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 - builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 - builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 - builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 - builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 - builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + int offset = i % (TEXTS_PARTS.length() - 1); // 0 ~ (length -1) + int length = TEXTS_PARTS.length() - offset; + builder.putText(TEXTS_PARTS.getBytes(), offset, length); // 7 + builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 8 + builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 9 + builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 10 + builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 11 + builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 12 + builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 13 builder.endRow(); } @@ -413,37 +418,45 @@ public static void fillRowBlockWithNull(int i, RowWriter writer) { if (i % 7 == 0) { writer.skipField(); } else { - writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 + int offset = i % (TEXTS_PARTS.length() - 1); // 0 ~ (length -1) + int length = TEXTS_PARTS.length() - offset; + writer.putText(TEXTS_PARTS.getBytes(), offset, length); // 7 } if (i % 8 == 0) { writer.skipField(); } else { - writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 + writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 8 } if (i % 9 == 0) { writer.skipField(); } else { - writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 + writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 9 } if (i % 10 == 0) { writer.skipField(); } else { - writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 + writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 10 } if (i % 11 == 0) { writer.skipField(); } else { - writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 + writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 11 } if (i % 12 == 0) { writer.skipField(); } else { - writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 + writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 12 + } + + if (i % 13 == 0) { + writer.skipField(); + } else { + writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 13 } writer.endRow(); @@ -457,12 +470,17 @@ public static void fillVTuple(int i, VTuple tuple) { tuple.put(4, DatumFactory.createFloat4(i)); tuple.put(5, DatumFactory.createFloat8(i)); tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes())); - tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 - tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 - tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 - tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 - tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 - tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; + int offset = i % (TEXTS_PARTS.length() - 1); // 0 ~ (length -1) + int length = TEXTS_PARTS.length() - offset; + byte [] textPart = new byte[length]; + System.arraycopy(TEXTS_PARTS.getBytes(), offset, textPart, 0, length); + tuple.put(7, DatumFactory.createText(textPart)); + tuple.put(8, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 + tuple.put(9, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 + tuple.put(10, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 + tuple.put(11, DatumFactory.createInterval((i + 1) + " hours")); // 10 + tuple.put(12, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 + tuple.put(13, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; } public static void validateResults(OffHeapRowBlock rowBlock) { @@ -487,12 +505,17 @@ public static void validateTupleResult(int j, Tuple t) { assertTrue(j == t.getFloat4(4)); assertTrue(j == t.getFloat8(5)); assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); - assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); + int offset = j % (TEXTS_PARTS.length() - 1); // 0 ~ (length -1) + int length = (TEXTS_PARTS.length() - offset); + byte [] textPart = new byte[length]; + System.arraycopy(TEXTS_PARTS.getBytes(), offset, textPart, 0, length); + assertEquals(new String(textPart), t.getText(7)); + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(8)); + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(9)); + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(10)); + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(11)); + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(12)); + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(13)); } public static void validateNullity(int j, Tuple tuple) { @@ -541,37 +564,47 @@ public static void validateNullity(int j, Tuple tuple) { if (j % 7 == 0) { tuple.isNull(7); } else { - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); + int offset = j % (TEXTS_PARTS.length() - 1); // 0 ~ (length -1) + int length = TEXTS_PARTS.length() - offset; + byte [] textPart = new byte[length]; + System.arraycopy(TEXTS_PARTS.getBytes(), offset, textPart, 0, length); + assertEquals(new String(textPart), tuple.getText(7)); } if (j % 8 == 0) { tuple.isNull(8); } else { - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); + assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(8)); } if (j % 9 == 0) { tuple.isNull(9); } else { - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); + assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(9)); } if (j % 10 == 0) { tuple.isNull(10); } else { - assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10)); + assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(10)); } if (j % 11 == 0) { tuple.isNull(11); } else { - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); + assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(11)); } if (j % 12 == 0) { tuple.isNull(12); } else { - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); + assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(12)); + } + + if (j % 13 == 0) { + tuple.isNull(13); + } else { + assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(13)); } } } \ No newline at end of file diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java index f2baa24532..2cc8d90cc5 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java @@ -77,6 +77,7 @@ public interface Tuple extends Cloneable { @SuppressWarnings("unused") public String getText(int fieldId); + @SuppressWarnings("unused") public IntervalDatum getInterval(int fieldId); @SuppressWarnings("unused")