From 392d1f7e9398fb1bee8f67b0d49a82436c3145fb Mon Sep 17 00:00:00 2001 From: Jacques Nadeau Date: Thu, 19 Nov 2015 18:20:03 -0800 Subject: [PATCH] DRILL-4241: Add Single Tablet Writer - Also move to a test bootstrap - Update to the latest kudu and Drill - Add plugin to Drill distribution - Checkstyle and directory cleanup This closes #314. --- contrib/pom.xml | 1 + contrib/storage-kudu/pom.xml | 114 ++++++++++-- .../storage-kudu/src/main/codegen/config.fmpp | 23 +++ .../codegen/templates/KuduRecordWriter.java | 175 ++++++++++++++++++ .../exec/store/kudu/KuduRecordReader.java | 119 ++++++------ .../exec/store/kudu/KuduRecordWriterImpl.java | 174 +++++++++++++++++ .../exec/store/kudu/KuduSchemaFactory.java | 21 +++ .../exec/store/kudu/KuduStoragePlugin.java | 13 +- .../drill/exec/store/kudu/KuduWriter.java | 79 ++++++++ .../store/kudu/KuduWriterBatchCreator.java | 43 +++++ .../resources/bootstrap-storage-plugins.json | 4 +- .../src/main/resources/checkstyle-config.xml | 42 ----- .../resources/checkstyle-suppressions.xml | 19 -- .../drill/store/kudu/TestKuduConnect.java | 9 +- .../drill/store/kudu/TestKuduPlugin.java | 10 + .../resources/bootstrap-storage-plugins.json | 9 + distribution/pom.xml | 5 + distribution/src/assemble/bin.xml | 1 + 18 files changed, 706 insertions(+), 155 deletions(-) create mode 100644 contrib/storage-kudu/src/main/codegen/config.fmpp create mode 100644 contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java create mode 100644 contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java create mode 100644 contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java create mode 100644 contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java delete mode 100644 contrib/storage-kudu/src/main/resources/checkstyle-config.xml delete mode 100644 contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml create mode 100644 contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json diff --git a/contrib/pom.xml b/contrib/pom.xml index bc9591039bd..87b84c12493 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -36,6 +36,7 @@ storage-hive storage-mongo storage-jdbc + storage-kudu sqlline data gis diff --git a/contrib/storage-kudu/pom.xml b/contrib/storage-kudu/pom.xml index 7e57ca8dbc5..cca319f05af 100644 --- a/contrib/storage-kudu/pom.xml +++ b/contrib/storage-kudu/pom.xml @@ -16,23 +16,19 @@ drill-contrib-parent org.apache.drill.contrib - 1.3.0 + 1.5.0-SNAPSHOT - drill-storage-kudu - 1.3.0-SNAPSHOT + drill-kudu-storage contrib/kudu-storage-plugin - - 1.3.0 - org.apache.drill.exec drill-java-exec - ${drill.version} + ${project.version} @@ -40,7 +36,7 @@ org.apache.drill.exec drill-java-exec tests - ${drill.version} + ${project.version} test @@ -48,26 +44,19 @@ org.apache.drill drill-common tests - ${drill.version} + ${project.version} test org.kududb kudu-client - 0.5.0 + 0.6.0 - - drill-1016 - https://repository.apache.org/content/repositories/orgapachedrill-1016/ - - false - - cdh.repo Cloudera Repositories @@ -78,9 +67,98 @@ + + + apache-snapshots + https://repository.apache.org/content/groups/snapshots/ + + true + + + false + + + + - + + maven-resources-plugin + + + copy-fmpp-resources + initialize + + copy-resources + + + ${project.build.directory}/codegen + + + src/main/codegen + false + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.8 + + + + unpack-vector-types + initialize + + unpack + + + + + org.apache.drill.exec + vector + ${project.version} + jar + true + ${project.build.directory}/ + codegen/data/ValueVectorTypes.tdd + + + + + + + + org.apache.drill.tools + drill-fmpp-maven-plugin + ${project.version} + + + org.freemarker + freemarker + 2.3.19 + + + + + generate-fmpp + generate-sources + + generate + + + ${project.build.directory}/codegen/config.fmpp + ${project.build.directory}/generated-sources + ${project.build.directory}/codegen/templates + + + + diff --git a/contrib/storage-kudu/src/main/codegen/config.fmpp b/contrib/storage-kudu/src/main/codegen/config.fmpp new file mode 100644 index 00000000000..40a29b4d7f8 --- /dev/null +++ b/contrib/storage-kudu/src/main/codegen/config.fmpp @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http:# www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +data: { + vv: tdd(../data/ValueVectorTypes.tdd), + +} +freemarkerLinks: { + includes: includes/ +} \ No newline at end of file diff --git a/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java b/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java new file mode 100644 index 00000000000..01c7c289849 --- /dev/null +++ b/contrib/storage-kudu/src/main/codegen/templates/KuduRecordWriter.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="org/apache/drill/exec/store/kudu/KuduRecordWriter.java" /> +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.kudu; + +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.fn.JsonOutput; +import java.io.IOException; +import java.lang.UnsupportedOperationException; +import java.util.List; +import com.google.common.collect.Lists; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.util.DecimalUtility; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.io.api.Binary; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.common.types.TypeProtos; +import org.joda.time.DateTimeUtils; +import java.io.IOException; +import java.lang.UnsupportedOperationException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import com.google.common.collect.Lists; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.util.DecimalUtility; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.io.api.Binary; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.common.types.TypeProtos; +import org.joda.time.DateTimeUtils; +import java.io.IOException; +import java.lang.UnsupportedOperationException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.kududb.client.*; +import org.apache.drill.exec.store.*; + +public abstract class KuduRecordWriter extends AbstractRecordWriter implements RecordWriter { + + private PartialRow row; + + public void setUp(PartialRow row) { + this.row = row; + } + + <#list vv.types as type> + <#list type.minor as minor> + <#list vv.modes as mode> + + <#if mode.prefix == "Repeated" || + minor.class == "TinyInt" || + minor.class == "UInt1" || + minor.class == "UInt2" || + minor.class == "SmallInt" || + minor.class == "Time" || + minor.class == "Decimal9" || + minor.class == "Decimal18" || + minor.class == "Date" || + minor.class == "UInt4" || + minor.class == "Decimal28Sparse" || + minor.class == "Decimal38Sparse" || + minor.class?contains("Interval") + > + + <#else> + @Override + public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) { + return new ${mode.prefix}${minor.class}KuduConverter(fieldId, fieldName, reader); + } + + public class ${mode.prefix}${minor.class}KuduConverter extends FieldConverter { + private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder(); + + public ${mode.prefix}${minor.class}KuduConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + } + + @Override + public void writeField() throws IOException { + + <#if mode.prefix == "Nullable" > + if (!reader.isSet()) { + return; + } + + + reader.read(holder); + + <#if minor.class == "Float4"> + row.addFloat(fieldId, holder.value); + <#elseif minor.class == "TimeStamp"> + row.addLong(fieldId, holder.value*1000); + <#elseif minor.class == "Int"> + row.addInt(fieldId, holder.value); + <#elseif minor.class == "BigInt"> + row.addLong(fieldId, holder.value); + <#elseif minor.class == "Float8"> + row.addDouble(fieldId, holder.value); + <#elseif minor.class == "Bit"> + row.addBoolean(fieldId, holder.value == 1); + <#elseif minor.class == "VarChar" > + byte[] bytes = new byte[holder.end - holder.start]; + holder.buffer.getBytes(holder.start, bytes); + row.addStringUtf8(fieldId, bytes); + <#elseif minor.class == "VarBinary"> + byte[] bytes = new byte[holder.end - holder.start]; + holder.buffer.getBytes(holder.start, bytes); + row.addBinary(fieldId, bytes); + reader.read(holder); + <#else> + throw new UnsupportedOperationException(); + + } + } + + + + + } diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java index a97df77cb47..abd2ab7f4da 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java @@ -75,7 +75,7 @@ public class KuduRecordReader extends AbstractRecordReader { private final KuduSubScanSpec scanSpec; private KuduScanner scanner; private RowResultIterator iterator; - + private OutputMutator output; private OperatorContext context; @@ -84,8 +84,8 @@ private static class ProjectedColumnInfo { ValueVector vv; ColumnSchema kuduColumn; } - private ImmutableList projectedCols; + private ImmutableList projectedCols; public KuduRecordReader(KuduClient client, KuduSubScan.KuduSubScanSpec subScanSpec, List projectedColumns, FragmentContext context) { @@ -101,7 +101,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio this.context = context; try { KuduTable table = client.openTable(scanSpec.getTableName()); - + KuduScannerBuilder builder = client.newScannerBuilder(table); if (!isStarQuery()) { List colNames = Lists.newArrayList(); @@ -114,9 +114,9 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio context.getStats().startWait(); try { scanner = builder - .lowerBoundPartitionKeyRaw(scanSpec.getStartKey()) - .exclusiveUpperBoundPartitionKeyRaw(scanSpec.getEndKey()) - .build(); + .lowerBoundPartitionKeyRaw(scanSpec.getStartKey()) + .exclusiveUpperBoundPartitionKeyRaw(scanSpec.getEndKey()) + .build(); } finally { context.getStats().stopWait(); } @@ -125,7 +125,7 @@ public void setup(OperatorContext context, OutputMutator output) throws Executio } } - static final Map TYPES; + static final Map TYPES; static { TYPES = ImmutableMap. builder() @@ -169,14 +169,14 @@ public int next() { } return rowCount; } - + @SuppressWarnings("unchecked") private void initCols(Schema schema) throws SchemaChangeException { ImmutableList.Builder pciBuilder = ImmutableList.builder(); - + for (int i = 0; i < schema.getColumnCount(); i++) { - ColumnSchema col = schema.getColumnByIndex(i); - + ColumnSchema col = schema.getColumnByIndex(i); + final String name = col.getName(); final Type kuduType = col.getType(); MinorType minorType = TYPES.get(kuduType); @@ -184,7 +184,7 @@ private void initCols(Schema schema) throws SchemaChangeException { logger.warn("Ignoring column that is unsupported.", UserException .unsupportedError() .message( - "A column you queried has a data type that is not currently supported by the JDBC storage plugin. " + "A column you queried has a data type that is not currently supported by the Kudu storage plugin. " + "The column's name was %s and its Kudu data type was %s. ", name, kuduType.toString()) .addContext("column Name", name) @@ -204,7 +204,7 @@ private void initCols(Schema schema) throws SchemaChangeException { minorType, majorType.getMode()); ValueVector vector = output.addField(field, clazz); vector.allocateNew(); - + ProjectedColumnInfo pci = new ProjectedColumnInfo(); pci.vv = vector; pci.kuduColumn = col; @@ -214,111 +214,109 @@ private void initCols(Schema schema) throws SchemaChangeException { projectedCols = pciBuilder.build(); } - + private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeException { if (projectedCols == null) { initCols(result.getColumnProjection()); } - + for (ProjectedColumnInfo pci : projectedCols) { if (result.isNull(pci.index)) { continue; } switch (pci.kuduColumn.getType()) { - case BINARY: - { + case BINARY: { ByteBuffer value = result.getBinary(pci.index); if (pci.kuduColumn.isNullable()) { - ((NullableVarBinaryVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, value, 0, value.remaining()); + ((NullableVarBinaryVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, value, 0, value.remaining()); } else { - ((VarBinaryVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, value, 0, value.remaining()); + ((VarBinaryVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, value, 0, value.remaining()); } break; } - case STRING: - { + case STRING: { ByteBuffer value = result.getBinary(pci.index); if (pci.kuduColumn.isNullable()) { - ((NullableVarCharVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, value, 0, value.remaining()); + ((NullableVarCharVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, value, 0, value.remaining()); } else { - ((VarCharVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, value, 0, value.remaining()); + ((VarCharVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, value, 0, value.remaining()); } break; } case BOOL: if (pci.kuduColumn.isNullable()) { - ((NullableBitVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0); + ((NullableBitVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0); } else { - ((BitVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0); + ((BitVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getBoolean(pci.index) ? 1 : 0); } break; case DOUBLE: if (pci.kuduColumn.isNullable()) { - ((NullableFloat8Vector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getDouble(pci.index)); + ((NullableFloat8Vector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getDouble(pci.index)); } else { - ((Float8Vector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getDouble(pci.index)); + ((Float8Vector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getDouble(pci.index)); } break; case FLOAT: if (pci.kuduColumn.isNullable()) { - ((NullableFloat4Vector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getFloat(pci.index)); + ((NullableFloat4Vector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getFloat(pci.index)); } else { - ((Float4Vector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getFloat(pci.index)); + ((Float4Vector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getFloat(pci.index)); } break; case INT16: if (pci.kuduColumn.isNullable()) { - ((NullableIntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getShort(pci.index)); + ((NullableIntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getShort(pci.index)); } else { - ((IntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getShort(pci.index)); + ((IntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getShort(pci.index)); } break; case INT32: if (pci.kuduColumn.isNullable()) { - ((NullableIntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getInt(pci.index)); + ((NullableIntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getInt(pci.index)); } else { - ((IntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getInt(pci.index)); + ((IntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getInt(pci.index)); } break; case INT8: if (pci.kuduColumn.isNullable()) { - ((NullableIntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getByte(pci.index)); + ((NullableIntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getByte(pci.index)); } else { - ((IntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getByte(pci.index)); + ((IntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getByte(pci.index)); } break; case INT64: if (pci.kuduColumn.isNullable()) { - ((NullableBigIntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getLong(pci.index)); + ((NullableBigIntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getLong(pci.index)); } else { - ((BigIntVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getLong(pci.index)); + ((BigIntVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getLong(pci.index)); } break; case TIMESTAMP: if (pci.kuduColumn.isNullable()) { - ((NullableTimeStampVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getLong(pci.index) / 1000); + ((NullableTimeStampVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getLong(pci.index) / 1000); } else { - ((TimeStampVector.Mutator)pci.vv.getMutator()) - .setSafe(rowIndex, result.getLong(pci.index) / 1000); + ((TimeStampVector.Mutator) pci.vv.getMutator()) + .setSafe(rowIndex, result.getLong(pci.index) / 1000); } break; default: @@ -327,9 +325,8 @@ private void addRowResult(RowResult result, int rowIndex) throws SchemaChangeExc } } - @Override public void close() { } - + } diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java new file mode 100644 index 00000000000..6b39cc585aa --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordWriterImpl.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorAccessible; +import org.kududb.ColumnSchema; +import org.kududb.Schema; +import org.kududb.Type; +import org.kududb.client.Insert; +import org.kududb.client.KuduClient; +import org.kududb.client.KuduSession; +import org.kududb.client.KuduTable; +import org.kududb.client.OperationResponse; +import org.kududb.client.SessionConfiguration.FlushMode; + +public class KuduRecordWriterImpl extends KuduRecordWriter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduRecordWriterImpl.class); + + private static final int FLUSH_FREQUENCY = 100; + + private final KuduClient client; + private final String name; + private final OperatorContext context; + private KuduTable table; + private KuduSession session; + + private Insert insert; + private int recordsSinceFlush; + + public KuduRecordWriterImpl(OperatorContext context, KuduClient client, String name) { + this.client = client; + this.name = name; + this.context = context; + session = client.newSession(); + session.setFlushMode(FlushMode.MANUAL_FLUSH); + } + + @Override + public void init(Map writerOptions) throws IOException { + + } + + @Override + public void updateSchema(VectorAccessible batch) throws IOException { + BatchSchema schema = batch.getSchema(); + int i = 0; + + try { + if (!checkForTable(name)) { + List columns = new ArrayList<>(); + for (MaterializedField f : schema) { + columns.add(new ColumnSchema.ColumnSchemaBuilder(f.getLastName(), getType(f.getType())) + .nullable(f.getType().getMode() == DataMode.OPTIONAL) + .key(i == 0).build()); + i++; + } + Schema kuduSchema = new Schema(columns); + table = client.createTable(name, kuduSchema); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + private boolean checkForTable(String name) throws Exception { + return !client.getTablesList(name).getTablesList().isEmpty(); + } + + private Type getType(MajorType t) { + + if(t.getMode() == DataMode.REPEATED){ + throw UserException + .dataWriteError() + .message("Kudu does not support array types.") + .build(logger); + } + + switch (t.getMinorType()) { + case BIGINT: + return Type.INT64; + case BIT: + return Type.BOOL; + case FLOAT4: + return Type.FLOAT; + case FLOAT8: + return Type.DOUBLE; + case INT: + return Type.INT32; + case TIMESTAMP: + return Type.TIMESTAMP; + case VARBINARY: + return Type.BINARY; + case VARCHAR: + return Type.STRING; + default: + throw UserException + .dataWriteError() + .message("Data type: '%s' not supported in Kudu.", t.getMinorType().name()) + .build(logger); + } + } + + @Override + public void startRecord() throws IOException { + insert = table.newInsert(); + setUp(insert.getRow()); + } + + @Override + public void endRecord() throws IOException { + try { + session.apply(insert); + recordsSinceFlush++; + if (recordsSinceFlush == FLUSH_FREQUENCY) { + flush(); + recordsSinceFlush = 0; + } + insert = null; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void abort() throws IOException { + } + + private void flush() throws IOException { + try { + // context.getStats().startWait(); + List responses = session.flush(); + for (OperationResponse response : responses) { + if (response.hasRowError()) { + throw new IOException(response.getRowError().toString()); + } + } + } catch (Exception e) { + throw new IOException(e); + } finally { + // context.getStats().stopWait(); + } + } + + @Override + public void cleanup() throws IOException { + flush(); + } +} diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java index 7ea4f2f1b41..af2775d7e4a 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSchemaFactory.java @@ -19,11 +19,15 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Set; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Writer; +import org.apache.drill.exec.planner.logical.CreateTableEntry; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.SchemaFactory; @@ -96,6 +100,23 @@ public Set getTableNames() { } } + @Override + public CreateTableEntry createNewTable(final String tableName, List partitionColumns) { + return new CreateTableEntry(){ + + @Override + public Writer getWriter(PhysicalOperator child) throws IOException { + return new KuduWriter(child, tableName, plugin); + } + + @Override + public List getPartitionColumns() { + return Collections.emptyList(); + } + + }; + } + @Override public void dropTable(String tableName) { try { diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java index 5e981b84612..15aa469ca9a 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduStoragePlugin.java @@ -18,16 +18,12 @@ package org.apache.drill.exec.store.kudu; import java.io.IOException; -import java.util.Collections; -import java.util.Set; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.JSONOptions; -import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; -import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.kududb.client.KuduClient; import com.fasterxml.jackson.core.type.TypeReference; @@ -82,6 +78,11 @@ public KuduGroupScan getPhysicalScan(String userName, JSONOptions selection) thr return new KuduGroupScan(this, scanSpec, null); } + @Override + public boolean supportsWrite() { + return true; + } + @Override public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { schemaFactory.registerSchemas(schemaConfig, parent); @@ -92,8 +93,4 @@ public KuduStoragePluginConfig getConfig() { return engineConfig; } - @Override - public Set getOptimizerRules(OptimizerRulesContext optimizerRulesContext) { - return Collections.EMPTY_SET; - } } \ No newline at end of file diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java new file mode 100644 index 00000000000..03e29d3268c --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import java.io.IOException; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class KuduWriter extends AbstractWriter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriter.class); + + private final KuduStoragePlugin plugin; + private final String name; + + @JsonCreator + public KuduWriter( + @JsonProperty("child") PhysicalOperator child, + @JsonProperty("name") String name, + @JsonProperty("storage") StoragePluginConfig storageConfig, + @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException { + super(child); + this.plugin = (KuduStoragePlugin) engineRegistry.getPlugin(storageConfig); + this.name = name; + } + + + KuduWriter(PhysicalOperator child, String name, KuduStoragePlugin plugin) { + super(child); + this.name = name; + this.plugin = plugin; + } + + @Override + public int getOperatorType() { + return 3001; + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new KuduWriter(child, name, plugin); + } + + public String getName() { + return name; + } + + public StoragePluginConfig getStorage() { + return plugin.getConfig(); + } + + @JsonIgnore + public KuduStoragePlugin getPlugin() { + return plugin; + } +} diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java new file mode 100644 index 00000000000..c200c17bb9e --- /dev/null +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriterBatchCreator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kudu; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.WriterRecordBatch; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; + +public class KuduWriterBatchCreator implements BatchCreator { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduWriterBatchCreator.class); + + @Override + public CloseableRecordBatch getBatch(FragmentContext context, KuduWriter config, List children) + throws ExecutionSetupException { + assert children != null && children.size() == 1; + + return new WriterRecordBatch(config, children.iterator().next(), context, new KuduRecordWriterImpl( + null, + config.getPlugin().getClient(), + config.getName())); + } +} + diff --git a/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json index 3ba12c02f35..52884a6d3ba 100644 --- a/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-kudu/src/main/resources/bootstrap-storage-plugins.json @@ -2,8 +2,8 @@ "storage":{ kudu : { type:"kudu", - masterAddresses: "172.31.1.99", - enabled: true + masterAddresses: "1.2.3.4", + enabled: false } } } diff --git a/contrib/storage-kudu/src/main/resources/checkstyle-config.xml b/contrib/storage-kudu/src/main/resources/checkstyle-config.xml deleted file mode 100644 index 74cc856972a..00000000000 --- a/contrib/storage-kudu/src/main/resources/checkstyle-config.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml b/contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml deleted file mode 100644 index 9d4682b9598..00000000000 --- a/contrib/storage-kudu/src/main/resources/checkstyle-suppressions.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - - - diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java index 5f36f805740..0ee01349b02 100644 --- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java @@ -20,11 +20,12 @@ import java.util.ArrayList; import java.util.List; +import org.junit.Ignore; import org.junit.Test; import org.kududb.ColumnSchema; import org.kududb.Schema; import org.kududb.Type; -import org.kududb.client.CreateTableBuilder; +import org.kududb.client.CreateTableOptions; import org.kududb.client.Insert; import org.kududb.client.KuduClient; import org.kududb.client.KuduScanner; @@ -36,9 +37,7 @@ import org.kududb.client.RowResultIterator; import org.kududb.client.SessionConfiguration; -import static org.kududb.Type.STRING; - - +@Ignore("requires remote kudu server") public class TestKuduConnect { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestKuduConnect.class); @@ -62,7 +61,7 @@ public static void createKuduTable(String tableName, int tablets, int replicas, Schema schema = new Schema(columns); - CreateTableBuilder builder = new CreateTableBuilder(); + CreateTableOptions builder = new CreateTableOptions(); builder.setNumReplicas(replicas); for (int i = 1; i < tablets; i++) { PartialRow splitRow = schema.newPartialRow(); diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java index d5e138ff5f5..450a1ad63a5 100644 --- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java @@ -18,8 +18,10 @@ package org.apache.drill.store.kudu; import org.apache.drill.BaseTestQuery; +import org.junit.Ignore; import org.junit.Test; +@Ignore("requires a remote kudu server to run.") public class TestKuduPlugin extends BaseTestQuery { @Test @@ -33,4 +35,12 @@ public void testDescribe() throws Exception { test("show tables;"); test("describe demo"); } + + @Test + public void testCreate() throws Exception { + test("create table kudu.regions as select 1, * from sys.options limit 1"); + test("select * from kudu.regions"); + test("drop table kudu.regions"); + + } } diff --git a/contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json new file mode 100644 index 00000000000..3ba12c02f35 --- /dev/null +++ b/contrib/storage-kudu/src/test/resources/bootstrap-storage-plugins.json @@ -0,0 +1,9 @@ +{ + "storage":{ + kudu : { + type:"kudu", + masterAddresses: "172.31.1.99", + enabled: true + } + } +} diff --git a/distribution/pom.xml b/distribution/pom.xml index 5aaf09d4bb3..135c974dfe8 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -166,6 +166,11 @@ tpch-sample-data ${project.version} + + org.apache.drill.contrib + drill-kudu-storage + ${project.version} + org.apache.drill.contrib drill-mongo-storage diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml index 449ac6c2947..12682e289f8 100644 --- a/distribution/src/assemble/bin.xml +++ b/distribution/src/assemble/bin.xml @@ -97,6 +97,7 @@ org.apache.drill.contrib:drill-mongo-storage org.apache.drill.contrib:drill-storage-hbase org.apache.drill.contrib:drill-jdbc-storage + org.apache.drill.contrib:drill-kudu-storage org.apache.drill.contrib:drill-gis