From 087e4a02e560695c8f204d69e56b5c0b1017aa19 Mon Sep 17 00:00:00 2001 From: eskabetxe Date: Sat, 6 May 2017 13:41:36 +0200 Subject: [PATCH 1/4] update kudu-client to 1.3.0 --- contrib/storage-kudu/pom.xml | 7 ++-- .../codegen/templates/KuduRecordWriter.java | 4 +- .../drill/exec/store/kudu/DrillKuduTable.java | 14 +++---- .../drill/exec/store/kudu/KuduGroupScan.java | 4 +- .../exec/store/kudu/KuduRecordReader.java | 34 ++++++++--------- .../exec/store/kudu/KuduRecordWriterImpl.java | 37 ++++++++++--------- .../exec/store/kudu/KuduSchemaFactory.java | 6 +-- .../exec/store/kudu/KuduStoragePlugin.java | 2 +- .../drill/store/kudu/TestKuduConnect.java | 28 +++++++------- 9 files changed, 68 insertions(+), 68 deletions(-) diff --git a/contrib/storage-kudu/pom.xml b/contrib/storage-kudu/pom.xml index 2ba9cacd67a..1405676c76e 100644 --- a/contrib/storage-kudu/pom.xml +++ b/contrib/storage-kudu/pom.xml @@ -14,11 +14,10 @@ drill-contrib-parent org.apache.drill.contrib - 1.11.0-SNAPSHOT + 1.10.0 drill-kudu-storage - contrib/kudu-storage-plugin @@ -47,9 +46,9 @@ - org.kududb + org.apache.kudu kudu-client - 0.6.0 + 1.3.0 diff --git a/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java b/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java index 01c7c289849..2b76cac60da 100644 --- a/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java +++ b/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java @@ -88,7 +88,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import org.kududb.client.*; +import org.apache.kudu.client.*; import org.apache.drill.exec.store.*; public abstract class KuduRecordWriter extends AbstractRecordWriter implements RecordWriter { @@ -157,7 +157,7 @@ public void writeField() throws IOException { <#elseif minor.class == "VarChar" > byte[] bytes = new byte[holder.end - holder.start]; holder.buffer.getBytes(holder.start, bytes); - row.addStringUtf8(fieldId, bytes); + row.addString(fieldId, new String(bytes)); <#elseif minor.class == "VarBinary"> byte[] bytes = new byte[holder.end - holder.start]; holder.buffer.getBytes(holder.start, bytes); diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java index 3fc69c645ff..8404aac140d 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/DrillKuduTable.java @@ -23,9 +23,9 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.drill.exec.planner.logical.DynamicDrillTable; -import org.kududb.ColumnSchema; -import org.kududb.Schema; -import org.kududb.Type; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; import com.google.common.collect.Lists; @@ -56,8 +56,6 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { private RelDataType getSqlTypeFromKuduType(RelDataTypeFactory typeFactory, Type type) { switch (type) { - case BINARY: - return typeFactory.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE); case BOOL: return typeFactory.createSqlType(SqlTypeName.BOOLEAN); case DOUBLE: @@ -70,9 +68,11 @@ private RelDataType getSqlTypeFromKuduType(RelDataTypeFactory typeFactory, Type case INT8: return typeFactory.createSqlType(SqlTypeName.INTEGER); case STRING: - return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE); - case TIMESTAMP: + return typeFactory.createSqlType(SqlTypeName.VARCHAR); + case UNIXTIME_MICROS: return typeFactory.createSqlType(SqlTypeName.TIMESTAMP); + case BINARY: + return typeFactory.createSqlType(SqlTypeName.VARBINARY, Integer.MAX_VALUE); default: throw new UnsupportedOperationException("Unsupported type."); } diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java index 873f21638e8..dfc3c4470a6 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java @@ -51,8 +51,8 @@ import org.apache.drill.exec.store.schedule.CompleteWork; import org.apache.drill.exec.store.schedule.EndpointByteMap; import org.apache.drill.exec.store.schedule.EndpointByteMapImpl; -import org.kududb.client.LocatedTablet; -import org.kududb.client.LocatedTablet.Replica; +import org.apache.kudu.client.LocatedTablet; +import org.apache.kudu.client.LocatedTablet.Replica; @JsonTypeName("kudu-scan") public class KuduGroupScan extends AbstractGroupScan { diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java index 541daa44f5d..0f1d1e13fb0 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java @@ -52,16 +52,16 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; -import org.kududb.ColumnSchema; -import org.kududb.Schema; -import org.kududb.Type; -import org.kududb.client.KuduClient; -import org.kududb.client.KuduScanner; -import org.kududb.client.KuduScanner.KuduScannerBuilder; -import org.kududb.client.KuduTable; -import org.kududb.client.RowResult; -import org.kududb.client.RowResultIterator; -import org.kududb.client.shaded.com.google.common.collect.ImmutableMap; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduScanner.KuduScannerBuilder; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; +import org.apache.kudu.client.shaded.com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -114,8 +114,8 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio context.getStats().startWait(); try { scanner = builder - .lowerBoundPartitionKeyRaw(scanSpec.getStartKey()) - .exclusiveUpperBoundPartitionKeyRaw(scanSpec.getEndKey()) + .lowerBoundRaw(scanSpec.getStartKey()) + .exclusiveUpperBoundRaw(scanSpec.getEndKey()) .build(); } finally { context.getStats().stopWait(); @@ -138,7 +138,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio .put(Type.INT32, MinorType.INT) .put(Type.INT64, MinorType.BIGINT) .put(Type.STRING, MinorType.VARCHAR) - .put(Type.TIMESTAMP, MinorType.TIMESTAMP) + .put(Type.UNIXTIME_MICROS, MinorType.TIMESTAMP) .build(); } @@ -236,13 +236,13 @@ private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeExc break; } case STRING: { - ByteBuffer value = result.getBinary(pci.index); + String value = result.getString(pci.index); if (pci.kuduColumn.isNullable()) { ((NullableVarCharVector.Mutator) pci.vv.getMutator()) - .setSafe(rowIndex, value, 0, value.remaining()); + .setSafe(rowIndex, value.getBytes(), 0, value.length()); } else { ((VarCharVector.Mutator) pci.vv.getMutator()) - .setSafe(rowIndex, value, 0, value.remaining()); + .setSafe(rowIndex, value.getBytes(), 0, value.length()); } break; } @@ -309,7 +309,7 @@ private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeExc .setSafe(rowIndex, result.getLong(pci.index)); } break; - case TIMESTAMP: + case UNIXTIME_MICROS: if (pci.kuduColumn.isNullable()) { ((NullableTimeStampVector.Mutator) pci.vv.getMutator()) .setSafe(rowIndex, result.getLong(pci.index) / 1000); diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java index 6b39cc585aa..2e40acf6fc8 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java @@ -17,11 +17,6 @@ */ package org.apache.drill.exec.store.kudu; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; @@ -29,15 +24,21 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorAccessible; -import org.kududb.ColumnSchema; -import org.kududb.Schema; -import org.kududb.Type; -import org.kududb.client.Insert; -import org.kududb.client.KuduClient; -import org.kududb.client.KuduSession; -import org.kududb.client.KuduTable; -import org.kududb.client.OperationResponse; -import org.kududb.client.SessionConfiguration.FlushMode; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.OperationResponse; +import org.apache.kudu.client.SessionConfiguration.FlushMode; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; public class KuduRecordWriterImpl extends KuduRecordWriter { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordWriterImpl.class); @@ -81,7 +82,7 @@ public void updateSchema(VectorAccessible batch) throws IOException { i++; } Schema kuduSchema = new Schema(columns); - table = client.createTable(name, kuduSchema); + table = client.createTable(name, kuduSchema, new CreateTableOptions()); } } catch (Exception e) { throw new IOException(e); @@ -113,11 +114,11 @@ private Type getType(MajorType t) { case INT: return Type.INT32; case TIMESTAMP: - return Type.TIMESTAMP; - case VARBINARY: - return Type.BINARY; + return Type.UNIXTIME_MICROS; case VARCHAR: return Type.STRING; + case VARBINARY: + return Type.BINARY; default: throw UserException .dataWriteError() diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java index 34e5b2a4245..4d9caf39ae5 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java @@ -31,9 +31,9 @@ import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.SchemaFactory; -import org.kududb.Schema; -import org.kududb.client.KuduTable; -import org.kududb.client.ListTablesResponse; +import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.ListTablesResponse; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java index 15aa469ca9a..0d987556c8c 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java @@ -24,7 +24,7 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; -import org.kududb.client.KuduClient; +import org.apache.kudu.client.KuduClient; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java index 0ee01349b02..3155bf92530 100644 --- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java @@ -22,20 +22,20 @@ import org.junit.Ignore; import org.junit.Test; -import org.kududb.ColumnSchema; -import org.kududb.Schema; -import org.kududb.Type; -import org.kududb.client.CreateTableOptions; -import org.kududb.client.Insert; -import org.kududb.client.KuduClient; -import org.kududb.client.KuduScanner; -import org.kududb.client.KuduSession; -import org.kududb.client.KuduTable; -import org.kududb.client.ListTablesResponse; -import org.kududb.client.PartialRow; -import org.kududb.client.RowResult; -import org.kududb.client.RowResultIterator; -import org.kududb.client.SessionConfiguration; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.ListTablesResponse; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; +import org.apache.kudu.client.SessionConfiguration; @Ignore("requires remote kudu server") public class TestKuduConnect { From 335a2089ca376967ee8510ab0715803c6447c2c5 Mon Sep 17 00:00:00 2001 From: eskabetxe Date: Sat, 6 May 2017 13:43:42 +0200 Subject: [PATCH 2/4] setting correctly version --- contrib/storage-kudu/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/storage-kudu/pom.xml b/contrib/storage-kudu/pom.xml index 1405676c76e..74e6eb8c73d 100644 --- a/contrib/storage-kudu/pom.xml +++ b/contrib/storage-kudu/pom.xml @@ -14,7 +14,7 @@ drill-contrib-parent org.apache.drill.contrib - 1.10.0 + 1.11.0-SNAPSHOT drill-kudu-storage From 9dbfa843682c5ac5c2d42dc2f80baff788e7355f Mon Sep 17 00:00:00 2001 From: eskabetxe Date: Sun, 14 May 2017 21:14:08 +0200 Subject: [PATCH 3/4] resolve string cut --- .../org/apache/drill/exec/store/kudu/KuduRecordReader.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java index 0f1d1e13fb0..ef7efcfb4d1 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java @@ -236,13 +236,13 @@ private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeExc break; } case STRING: { - String value = result.getString(pci.index); + ByteBuffer value = ByteBuffer.wrap(result.getString(pci.index).getBytes()); if (pci.kuduColumn.isNullable()) { ((NullableVarCharVector.Mutator) pci.vv.getMutator()) - .setSafe(rowIndex, value.getBytes(), 0, value.length()); + .setSafe(rowIndex, value, 0, value.remaining()); } else { ((VarCharVector.Mutator) pci.vv.getMutator()) - .setSafe(rowIndex, value.getBytes(), 0, value.length()); + .setSafe(rowIndex, value, 0, value.remaining()); } break; } From 58c0498c5667de40d756329a6388fad7029c48ea Mon Sep 17 00:00:00 2001 From: eskabetxe Date: Mon, 22 May 2017 21:03:07 +0200 Subject: [PATCH 4/4] testing kudu connector --- .../test/java/org/apache/drill/store/kudu/TestKuduConnect.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java index 3155bf92530..2391fc9c0b4 100644 --- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java @@ -18,6 +18,7 @@ package org.apache.drill.store.kudu; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.junit.Ignore; @@ -63,6 +64,7 @@ public static void createKuduTable(String tableName, int tablets, int replicas, CreateTableOptions builder = new CreateTableOptions(); builder.setNumReplicas(replicas); + builder.setRangePartitionColumns(Arrays.asList("key")); for (int i = 1; i < tablets; i++) { PartialRow splitRow = schema.newPartialRow(); splitRow.addInt("key", i*1000);