From 0b9e8c7b9519927e79df43b0032992b542a2222f Mon Sep 17 00:00:00 2001 From: Arina Ielchiieva Date: Thu, 1 Feb 2018 17:44:43 +0000 Subject: [PATCH] DRILL-6130: Fix NPE during physical plan submission for various storage plugins 1. Fixed ser / de issues for Hive, Kafka, Hbase plugins. 2. Added physical plan submission unit test for all storage plugins in contrib module. 3. Refactoring. --- .../exec/store/hbase/HBaseGroupScan.java | 5 +- .../drill/exec/store/hbase/HBaseSubScan.java | 38 +++-- .../apache/drill/hbase/TestHBaseQueries.java | 6 + .../planner/sql/HivePartitionDescriptor.java | 14 +- ...vertHiveParquetScanToDrillParquetScan.java | 14 +- .../hive/HiveDrillNativeParquetScan.java | 22 +-- .../hive/HiveDrillNativeParquetSubScan.java | 4 +- .../drill/exec/store/hive/HiveScan.java | 68 +++++---- .../drill/exec/store/hive/HiveSubScan.java | 130 ++++++++++-------- .../drill/exec/hive/TestHiveStorage.java | 7 +- .../store/jdbc/TestJdbcPluginWithMySQLIT.java | 5 + .../exec/store/kafka/KafkaGroupScan.java | 40 +++--- .../drill/exec/store/kafka/KafkaSubScan.java | 56 ++++---- .../exec/store/kafka/KafkaQueriesTest.java | 21 +-- .../exec/store/kafka/MessageIteratorTest.java | 2 +- .../drill/exec/store/kafka/TestKafkaSuit.java | 6 +- ...Constants.java => TestQueryConstants.java} | 26 ++-- .../kafka/cluster/EmbeddedKafkaCluster.java | 4 +- .../drill/exec/store/kudu/KuduGroupScan.java | 39 +++--- .../drill/exec/store/kudu/KuduSubScan.java | 39 ++---- .../drill/store/kudu/TestKuduPlugin.java | 8 +- .../exec/store/mongo/TestMongoQueries.java | 7 + .../store/openTSDB/TestOpenTSDBPlugin.java | 7 +- .../drill/exec/proto/UserBitShared.java | 21 ++- .../exec/proto/beans/CoreOperatorType.java | 4 +- .../src/main/protobuf/UserBitShared.proto | 1 + 26 files changed, 326 insertions(+), 268 deletions(-) rename contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/{QueryConstants.java => TestQueryConstants.java} (52%) diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 69c2725dd54..11782987ac4 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -160,7 +160,7 @@ private void init() { statsCalculator = new TableStatsCalculator(conn, hbaseScanSpec, storagePlugin.getContext().getConfig(), storagePluginConfig); boolean foundStartRegion = false; - regionsToScan = new TreeMap(); + regionsToScan = new TreeMap<>(); for (HRegionLocation regionLocation : regionLocations) { HRegionInfo regionInfo = regionLocation.getRegionInfo(); if (!foundStartRegion && hbaseScanSpec.getStartRow() != null && hbaseScanSpec.getStartRow().length != 0 && !regionInfo.containsRow(hbaseScanSpec.getStartRow())) { @@ -338,8 +338,7 @@ public HBaseSubScan getSpecificScan(int minorFragmentId) { assert minorFragmentId < endpointFragmentMapping.size() : String.format( "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(), minorFragmentId); - return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig, - endpointFragmentMapping.get(minorFragmentId), columns); + return new HBaseSubScan(getUserName(), storagePlugin, endpointFragmentMapping.get(minorFragmentId), columns); } @Override diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java index 0527391a24b..bd179fbac35 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java @@ -24,7 +24,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -49,9 +48,6 @@ public class HBaseSubScan extends AbstractBase implements SubScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseSubScan.class); - @JsonProperty - public final HBaseStoragePluginConfig storage; - @JsonIgnore private final HBaseStoragePlugin hbaseStoragePlugin; private final List regionScanSpecList; private final List columns; @@ -59,34 +55,36 @@ public class HBaseSubScan extends AbstractBase implements SubScan { @JsonCreator public HBaseSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, - @JsonProperty("storage") StoragePluginConfig storage, + @JsonProperty("hbaseStoragePluginConfig") HBaseStoragePluginConfig hbaseStoragePluginConfig, @JsonProperty("regionScanSpecList") LinkedList regionScanSpecList, @JsonProperty("columns") List columns) throws ExecutionSetupException { - super(userName); - hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage); - this.regionScanSpecList = regionScanSpecList; - this.storage = (HBaseStoragePluginConfig) storage; - this.columns = columns; + this(userName, + (HBaseStoragePlugin) registry.getPlugin(hbaseStoragePluginConfig), + regionScanSpecList, + columns); } - public HBaseSubScan(String userName, HBaseStoragePlugin plugin, HBaseStoragePluginConfig config, - List regionInfoList, List columns) { + public HBaseSubScan(String userName, + HBaseStoragePlugin hbaseStoragePlugin, + List regionInfoList, + List columns) { super(userName); - hbaseStoragePlugin = plugin; - storage = config; + this.hbaseStoragePlugin = hbaseStoragePlugin; this.regionScanSpecList = regionInfoList; this.columns = columns; } - public List getRegionScanSpecList() { - return regionScanSpecList; + @JsonProperty + public HBaseStoragePluginConfig getHbaseStoragePluginConfig() { + return hbaseStoragePlugin.getConfig(); } - @JsonIgnore - public HBaseStoragePluginConfig getStorageConfig() { - return storage; + @JsonProperty + public List getRegionScanSpecList() { + return regionScanSpecList; } + @JsonProperty public List getColumns() { return columns; } @@ -109,7 +107,7 @@ public T accept(PhysicalVisitor physicalVis @Override public PhysicalOperator getNewWithChildren(List children) { Preconditions.checkArgument(children.isEmpty()); - return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, regionScanSpecList, columns); + return new HBaseSubScan(getUserName(), hbaseStoragePlugin, regionScanSpecList, columns); } @Override diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java index ee839c5c3d0..0d5349943d7 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseQueries.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; +import org.apache.drill.PlanTestBase; import org.apache.drill.categories.HbaseStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.rpc.user.QueryDataBatch; @@ -109,4 +110,9 @@ public void testSelectFromSchema() throws Exception { runHBaseSQLVerifyCount("SELECT row_key\n" + " FROM hbase.TestTableNullStr t WHERE row_key='a1'", 1); } + + @Test + public void testPhysicalPlanSubmission() throws Exception { + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hbase.TestTableNullStr"); + } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java index b22c14da80a..2d2bb6c6984 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -64,7 +64,7 @@ public HivePartitionDescriptor(@SuppressWarnings("unused") final PlannerSettings this.scanRel = scanRel; this.managedBuffer = managedBuffer.reallocIfNeeded(256); this.defaultPartitionValue = defaultPartitionValue; - for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).hiveReadEntry.table.partitionKeys) { + for (HiveTableWrapper.FieldSchemaWrapper wrapper : ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry().table.partitionKeys) { partitionMap.put(wrapper.name, i); i++; } @@ -88,7 +88,7 @@ public int getMaxHierarchyLevel() { @Override public String getBaseTableLocation() { - HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry; + HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry(); return origEntry.table.getTable().getSd().getLocation(); } @@ -97,7 +97,7 @@ public void populatePartitionVectors(ValueVector[] vectors, List fieldNameMap) { int record = 0; final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); - final Map partitionNameTypeMap = hiveScan.hiveReadEntry.table.getPartitionNameTypeMap(); + final Map partitionNameTypeMap = hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap(); for(PartitionLocation partitionLocation: partitions) { for(int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)){ final String hiveType = partitionNameTypeMap.get(fieldNameMap.get(partitionColumnIndex)); @@ -126,7 +126,7 @@ int record = 0; public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); String partitionName = column.getAsNamePart().getName(); - Map partitionNameTypeMap = hiveScan.hiveReadEntry.table.getPartitionNameTypeMap(); + Map partitionNameTypeMap = hiveScan.getHiveReadEntry().table.getPartitionNameTypeMap(); String hiveType = partitionNameTypeMap.get(partitionName); PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hiveType); @@ -143,7 +143,7 @@ public Integer getIdIfValid(String name) { @Override protected void createPartitionSublists() { List locations = new LinkedList<>(); - HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).hiveReadEntry; + HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry(); for (Partition partition: origEntry.getPartitions()) { locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation())); } @@ -165,7 +165,7 @@ public TableScan createTableScan(List newPartitions, boolean private GroupScan createNewGroupScan(List newPartitionLocations) throws ExecutionSetupException { HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); - HiveReadEntry origReadEntry = hiveScan.hiveReadEntry; + HiveReadEntry origReadEntry = hiveScan.getHiveReadEntry(); List oldPartitions = origReadEntry.partitions; List newPartitions = Lists.newLinkedList(); diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java index b1b966a4672..a7322454bc1 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java @@ -97,16 +97,16 @@ public boolean matches(RelOptRuleCall call) { final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); final HiveConf hiveConf = hiveScan.getHiveConf(); - final HiveTableWithColumnCache hiveTable = hiveScan.hiveReadEntry.getTable(); + final HiveTableWithColumnCache hiveTable = hiveScan.getHiveReadEntry().getTable(); final Class> tableInputFormat = - getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd(), + getInputFormatFromSD(HiveUtilities.getTableMetadata(hiveTable), hiveScan.getHiveReadEntry(), hiveTable.getSd(), hiveConf); if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) { return false; } - final List partitions = hiveScan.hiveReadEntry.getHivePartitionWrappers(); + final List partitions = hiveScan.getHiveReadEntry().getHivePartitionWrappers(); if (partitions == null) { return true; } @@ -116,7 +116,7 @@ public boolean matches(RelOptRuleCall call) { for (HivePartitionWrapper partition : partitions) { final StorageDescriptor partitionSD = partition.getPartition().getSd(); Class> inputFormat = getInputFormatFromSD( - HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.hiveReadEntry, partitionSD, + HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.getHiveReadEntry(), partitionSD, hiveConf); if (inputFormat == null || !inputFormat.equals(tableInputFormat)) { return false; @@ -172,7 +172,7 @@ public void onMatch(RelOptRuleCall call) { final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); final String partitionColumnLabel = settings.getFsPartitionColumnLabel(); - final Table hiveTable = hiveScan.hiveReadEntry.getTable(); + final Table hiveTable = hiveScan.getHiveReadEntry().getTable(); checkForUnsupportedDataTypes(hiveTable); final Map partitionColMapping = @@ -245,8 +245,8 @@ private DrillScanRel createNativeScanRel(final Map partitionColM final HiveDrillNativeParquetScan nativeHiveScan = new HiveDrillNativeParquetScan( hiveScan.getUserName(), - hiveScan.hiveReadEntry, - hiveScan.storagePlugin, + hiveScan.getHiveReadEntry(), + hiveScan.getStoragePlugin(), nativeScanCols, null); diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java index ccec61a73cb..202bd435e1c 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -41,16 +41,16 @@ public class HiveDrillNativeParquetScan extends HiveScan { @JsonCreator public HiveDrillNativeParquetScan(@JsonProperty("userName") String userName, - @JsonProperty("hive-table") HiveReadEntry hiveReadEntry, - @JsonProperty("storage-plugin") String storagePluginName, + @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry, + @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig, @JsonProperty("columns") List columns, @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { - super(userName, hiveReadEntry, storagePluginName, columns, pluginRegistry); + super(userName, hiveReadEntry, hiveStoragePluginConfig, columns, pluginRegistry); } - public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, + public HiveDrillNativeParquetScan(String userName, HiveReadEntry hiveReadEntry, HiveStoragePlugin hiveStoragePlugin, List columns, HiveMetadataProvider metadataProvider) throws ExecutionSetupException { - super(userName, hiveReadEntry, storagePlugin, columns, metadataProvider); + super(userName, hiveReadEntry, hiveStoragePlugin, columns, metadataProvider); } public HiveDrillNativeParquetScan(final HiveScan hiveScan) { @@ -91,7 +91,7 @@ public PhysicalOperator getNewWithChildren(List children) thro @Override public HiveScan clone(HiveReadEntry hiveReadEntry) throws ExecutionSetupException { - return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider); + return new HiveDrillNativeParquetScan(getUserName(), hiveReadEntry, getStoragePlugin(), getColumns(), getMetadataProvider()); } @Override @@ -103,12 +103,12 @@ public GroupScan clone(List columns) { @Override public String toString() { - final List partitions = hiveReadEntry.getHivePartitionWrappers(); + final List partitions = getHiveReadEntry().getHivePartitionWrappers(); int numPartitions = partitions == null ? 0 : partitions.size(); - return "HiveDrillNativeParquetScan [table=" + hiveReadEntry.getHiveTableWrapper() - + ", columns=" + columns + return "HiveDrillNativeParquetScan [table=" + getHiveReadEntry().getHiveTableWrapper() + + ", columns=" + getColumns() + ", numPartitions=" + numPartitions + ", partitions= " + partitions - + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry) + "]"; + + ", inputDirectories=" + getMetadataProvider().getInputDirectories(getHiveReadEntry()) + "]"; } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java index 43cf98e166c..2129ed45423 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetSubScan.java @@ -42,9 +42,9 @@ public HiveDrillNativeParquetSubScan(@JacksonInject StoragePluginRegistry regist @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry, @JsonProperty("splitClasses") List splitClasses, @JsonProperty("columns") List columns, - @JsonProperty("storagePluginName") String pluginName) + @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig) throws IOException, ExecutionSetupException, ReflectiveOperationException { - super(registry, userName, splits, hiveReadEntry, splitClasses, columns, pluginName); + super(registry, userName, splits, hiveReadEntry, splitClasses, columns, hiveStoragePluginConfig); } public HiveDrillNativeParquetSubScan(final HiveSubScan subScan) diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index cf8a67191a0..11d47f304cb 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -61,41 +61,36 @@ public class HiveScan extends AbstractGroupScan { private static int HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN = 20; - @JsonProperty("hive-table") - public HiveReadEntry hiveReadEntry; + private final HiveStoragePlugin hiveStoragePlugin; + private final HiveReadEntry hiveReadEntry; + private final HiveMetadataProvider metadataProvider; - @JsonIgnore - public HiveStoragePlugin storagePlugin; - - @JsonProperty("columns") - public List columns; - - @JsonIgnore - protected final HiveMetadataProvider metadataProvider; - - @JsonIgnore private List> mappings; + private List inputSplits; - @JsonIgnore - protected List inputSplits; + protected List columns; @JsonCreator public HiveScan(@JsonProperty("userName") final String userName, - @JsonProperty("hive-table") final HiveReadEntry hiveReadEntry, - @JsonProperty("storage-plugin") final String storagePluginName, + @JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry, + @JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig, @JsonProperty("columns") final List columns, @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { - this(userName, hiveReadEntry, (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName), columns, null); + this(userName, + hiveReadEntry, + (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig), + columns, + null); } - public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, + public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin, final List columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException { super(userName); this.hiveReadEntry = hiveReadEntry; this.columns = columns; - this.storagePlugin = storagePlugin; + this.hiveStoragePlugin = hiveStoragePlugin; if (metadataProvider == null) { - this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, storagePlugin.getHiveConf()); + this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, hiveStoragePlugin.getHiveConf()); } else { this.metadataProvider = metadataProvider; } @@ -105,19 +100,39 @@ public HiveScan(final HiveScan that) { super(that); this.columns = that.columns; this.hiveReadEntry = that.hiveReadEntry; - this.storagePlugin = that.storagePlugin; + this.hiveStoragePlugin = that.hiveStoragePlugin; this.metadataProvider = that.metadataProvider; } public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException { - return new HiveScan(getUserName(), hiveReadEntry, storagePlugin, columns, metadataProvider); + return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider); + } + + @JsonProperty + public HiveReadEntry getHiveReadEntry() { + return hiveReadEntry; } + @JsonProperty + public HiveStoragePluginConfig getHiveStoragePluginConfig() { + return hiveStoragePlugin.getConfig(); + } + + @JsonProperty public List getColumns() { return columns; } - protected List getInputSplits() { + @JsonIgnore + public HiveStoragePlugin getStoragePlugin() { + return hiveStoragePlugin; + } + + protected HiveMetadataProvider getMetadataProvider() { + return metadataProvider; + } + + private List getInputSplits() { if (inputSplits == null) { inputSplits = metadataProvider.getInputSplits(hiveReadEntry); } @@ -125,6 +140,7 @@ protected List getInputSplits() { return inputSplits; } + @Override public void applyAssignments(final List endpoints) { mappings = new ArrayList<>(); @@ -160,7 +176,7 @@ public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupE } final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts); - return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, storagePlugin); + return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin); } catch (IOException | ReflectiveOperationException e) { throw new ExecutionSetupException(e); } @@ -174,7 +190,7 @@ public int getMaxParallelizationWidth() { @Override public List getOperatorAffinity() { final Map endpointMap = new HashMap<>(); - for (final DrillbitEndpoint endpoint : storagePlugin.getContext().getBits()) { + for (final DrillbitEndpoint endpoint : hiveStoragePlugin.getContext().getBits()) { endpointMap.put(endpoint.getAddress(), endpoint); logger.debug("endpoing address: {}", endpoint.getAddress()); } @@ -285,7 +301,7 @@ public boolean supportsPartitionFilterPushdown() { @JsonIgnore public HiveConf getHiveConf() { - return storagePlugin.getHiveConf(); + return hiveStoragePlugin.getHiveConf(); } @JsonIgnore diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java index a1990a07893..8ca8647cb1d 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java @@ -24,6 +24,7 @@ import java.util.List; import com.fasterxml.jackson.annotation.JacksonInject; +import com.google.common.collect.ImmutableSet; import org.apache.commons.codec.binary.Base64; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -40,26 +41,20 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.collect.Iterators; import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteStreams; @JsonTypeName("hive-sub-scan") public class HiveSubScan extends AbstractBase implements SubScan { - protected HiveReadEntry hiveReadEntry; - @JsonIgnore - protected List> inputSplits = new ArrayList<>(); - @JsonIgnore - protected HiveTableWithColumnCache table; - @JsonIgnore - protected List partitions; - @JsonIgnore - protected HiveStoragePlugin storagePlugin; - - private List> splits; - private List splitClasses; - protected List columns; + private final HiveReadEntry hiveReadEntry; + private final List> inputSplits = new ArrayList<>(); + private final HiveStoragePlugin hiveStoragePlugin; + private final List> splits; + private final List splitClasses; + private final HiveTableWithColumnCache table; + private final List partitions; + private final List columns; @JsonCreator public HiveSubScan(@JacksonInject StoragePluginRegistry registry, @@ -68,13 +63,22 @@ public HiveSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry, @JsonProperty("splitClasses") List splitClasses, @JsonProperty("columns") List columns, - @JsonProperty("storagePluginName") String pluginName) + @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig) throws IOException, ExecutionSetupException, ReflectiveOperationException { - this(userName, splits, hiveReadEntry, splitClasses, columns, (HiveStoragePlugin)registry.getPlugin(pluginName)); - } - - public HiveSubScan(final String userName, final List> splits, final HiveReadEntry hiveReadEntry, - final List splitClasses, final List columns, final HiveStoragePlugin plugin) + this(userName, + splits, + hiveReadEntry, + splitClasses, + columns, + (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig)); + } + + public HiveSubScan(final String userName, + final List> splits, + final HiveReadEntry hiveReadEntry, + final List splitClasses, + final List columns, + final HiveStoragePlugin hiveStoragePlugin) throws IOException, ReflectiveOperationException { super(userName); this.hiveReadEntry = hiveReadEntry; @@ -83,66 +87,61 @@ public HiveSubScan(final String userName, final List> splits, final this.splits = splits; this.splitClasses = splitClasses; this.columns = columns; - this.storagePlugin = plugin; + this.hiveStoragePlugin = hiveStoragePlugin; for (int i = 0; i < splits.size(); i++) { inputSplits.add(deserializeInputSplit(splits.get(i), splitClasses.get(i))); } } - @JsonProperty("storagePluginName") - @SuppressWarnings("unused") - public String getStoragePluginName() { - return storagePlugin.getName(); - } - - @JsonIgnore - public HiveStoragePlugin getStoragePlugin() { - return storagePlugin; - } - + @JsonProperty public List> getSplits() { return splits; } - public HiveTableWithColumnCache getTable() { - return table; - } - - public List getPartitions() { - return partitions; + @JsonProperty + public HiveReadEntry getHiveReadEntry() { + return hiveReadEntry; } + @JsonProperty public List getSplitClasses() { return splitClasses; } + @JsonProperty public List getColumns() { return columns; } + @JsonProperty + public HiveStoragePluginConfig getHiveStoragePluginConfig() { + return hiveStoragePlugin.getConfig(); + } + + @JsonIgnore + public HiveTableWithColumnCache getTable() { + return table; + } + + @JsonIgnore + public List getPartitions() { + return partitions; + } + + @JsonIgnore public List> getInputSplits() { return inputSplits; } - public HiveReadEntry getHiveReadEntry() { - return hiveReadEntry; + @JsonIgnore + public HiveStoragePlugin getStoragePlugin() { + return hiveStoragePlugin; } - public static List deserializeInputSplit(List base64, String className) throws IOException, ReflectiveOperationException{ - Constructor constructor = Class.forName(className).getDeclaredConstructor(); - if (constructor == null) { - throw new ReflectiveOperationException("Class " + className + " does not implement a default constructor."); - } - constructor.setAccessible(true); - List splits = new ArrayList<>(); - for (String str : base64) { - InputSplit split = (InputSplit) constructor.newInstance(); - ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(str)); - split.readFields(byteArrayDataInput); - splits.add(split); - } - return splits; + @JsonIgnore + public HiveConf getHiveConf() { + return hiveStoragePlugin.getHiveConf(); } @Override @@ -153,7 +152,7 @@ public T accept(PhysicalVisitor physicalVis @Override public PhysicalOperator getNewWithChildren(List children) throws ExecutionSetupException { try { - return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, storagePlugin); + return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin); } catch (IOException | ReflectiveOperationException e) { throw new ExecutionSetupException(e); } @@ -161,7 +160,7 @@ public PhysicalOperator getNewWithChildren(List children) thro @Override public Iterator iterator() { - return Iterators.emptyIterator(); + return ImmutableSet.of().iterator(); } @Override @@ -169,8 +168,21 @@ public int getOperatorType() { return CoreOperatorType.HIVE_SUB_SCAN_VALUE; } - @JsonIgnore - public HiveConf getHiveConf() { - return storagePlugin.getHiveConf(); + private static List deserializeInputSplit(List base64, String className) + throws IOException, ReflectiveOperationException{ + Constructor constructor = Class.forName(className).getDeclaredConstructor(); + if (constructor == null) { + throw new ReflectiveOperationException("Class " + className + " does not implement a default constructor."); + } + constructor.setAccessible(true); + List splits = new ArrayList<>(); + for (String str : base64) { + InputSplit split = (InputSplit) constructor.newInstance(); + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(str)); + split.readFields(byteArrayDataInput); + splits.add(split); + } + return splits; } + } diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java index 64f3b5bf7aa..c2412ad7e7f 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import org.apache.drill.PlanTestBase; import org.apache.drill.categories.HiveStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.common.exceptions.UserRemoteException; @@ -100,7 +101,6 @@ public void convertFromOnHiveBinaryType() throws Exception { /** * Test to ensure Drill reads the all supported types correctly both normal fields (converted to Nullable types) and * partition fields (converted to Required types). - * @throws Exception */ @Test public void readAllSupportedHiveDataTypes() throws Exception { @@ -558,6 +558,11 @@ public void testNonAsciiStringLiterals() throws Exception { .go(); } + @Test + public void testPhysicalPlanSubmission() throws Exception { + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv"); + } + private void verifyColumnsMetadata(List columnsList, Map expectedResult) { for (UserProtos.ResultColumnMetadata columnMetadata : columnsList) { assertTrue("Column should be present in result set", expectedResult.containsKey(columnMetadata.getColumnName())); diff --git a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java index 00db46b9abb..7b8c21acc0d 100644 --- a/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java +++ b/contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithMySQLIT.java @@ -127,4 +127,9 @@ public void pushdownJoinAndFilterPushDown() throws Exception { testPlanMatchingPatterns(query, new String[] {}, new String[] { "Join", "Filter" }); } + @Test + public void testPhysicalPlanSubmission() throws Exception { + testPhysicalPlanExecutionBasedOnQuery("select * from mysql.`drill_mysql_test`.person"); + } + } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java index e08c7d78e5f..9cf575b3451 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java @@ -69,47 +69,49 @@ public class KafkaGroupScan extends AbstractGroupScan { private static final long MSG_SIZE = 1024; private final KafkaStoragePlugin kafkaStoragePlugin; - private final KafkaStoragePluginConfig kafkaStoragePluginConfig; - private List columns; private final KafkaScanSpec kafkaScanSpec; + private List columns; private List partitionWorkList; private ListMultimap assignments; private List affinities; @JsonCreator public KafkaGroupScan(@JsonProperty("userName") String userName, - @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, - @JsonProperty("columns") List columns, @JsonProperty("scanSpec") KafkaScanSpec scanSpec, - @JacksonInject StoragePluginRegistry pluginRegistry) { - this(userName, kafkaStoragePluginConfig, columns, scanSpec, (KafkaStoragePlugin) pluginRegistry); + @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, + @JsonProperty("columns") List columns, + @JsonProperty("kafkaScanSpec") KafkaScanSpec scanSpec, + @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { + this(userName, + (KafkaStoragePlugin) pluginRegistry.getPlugin(kafkaStoragePluginConfig), + columns, + scanSpec); } public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List columns) { super(StringUtils.EMPTY); this.kafkaStoragePlugin = kafkaStoragePlugin; - this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) kafkaStoragePlugin.getConfig(); this.columns = columns; this.kafkaScanSpec = kafkaScanSpec; init(); } - public KafkaGroupScan(String userName, KafkaStoragePluginConfig kafkaStoragePluginConfig, List columns, - KafkaScanSpec kafkaScanSpec, KafkaStoragePlugin pluginRegistry) { + public KafkaGroupScan(String userName, + KafkaStoragePlugin kafkaStoragePlugin, + List columns, + KafkaScanSpec kafkaScanSpec) { super(userName); - this.kafkaStoragePluginConfig = kafkaStoragePluginConfig; + this.kafkaStoragePlugin = kafkaStoragePlugin; this.columns = columns; this.kafkaScanSpec = kafkaScanSpec; - this.kafkaStoragePlugin = pluginRegistry; init(); } public KafkaGroupScan(KafkaGroupScan that) { super(that); - this.kafkaStoragePluginConfig = that.kafkaStoragePluginConfig; + this.kafkaStoragePlugin = that.kafkaStoragePlugin; this.columns = that.columns; this.kafkaScanSpec = that.kafkaScanSpec; - this.kafkaStoragePlugin = that.kafkaStoragePlugin; this.partitionWorkList = that.partitionWorkList; this.assignments = that.assignments; } @@ -242,7 +244,7 @@ public KafkaSubScan getSpecificScan(int minorFragmentId) { work.getBeginOffset(), work.getLatestOffset())); } - return new KafkaSubScan(getUserName(), kafkaStoragePlugin, kafkaStoragePluginConfig, columns, scanSpecList); + return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList); } @Override @@ -291,9 +293,9 @@ public GroupScan clone(List columns) { return clone; } - @JsonProperty("kafkaStoragePluginConfig") - public KafkaStoragePluginConfig getStorageConfig() { - return this.kafkaStoragePluginConfig; + @JsonProperty + public KafkaStoragePluginConfig getKafkaStoragePluginConfig() { + return kafkaStoragePlugin.getConfig(); } @JsonProperty @@ -301,8 +303,8 @@ public List getColumns() { return columns; } - @JsonProperty("kafkaScanSpec") - public KafkaScanSpec getScanSpec() { + @JsonProperty + public KafkaScanSpec getKafkaScanSpec() { return kafkaScanSpec; } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java index fc110b53e3c..468f766a968 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java @@ -41,34 +41,31 @@ @JsonTypeName("kafka-partition-scan") public class KafkaSubScan extends AbstractBase implements SubScan { - @JsonProperty - private final KafkaStoragePluginConfig KafkaStoragePluginConfig; - - @JsonIgnore private final KafkaStoragePlugin kafkaStoragePlugin; private final List columns; - private final List partitions; + private final List partitionSubScanSpecList; @JsonCreator - public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, - @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, - @JsonProperty("columns") List columns, - @JsonProperty("partitionSubScanSpecList") LinkedList partitions) + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("userName") String userName, + @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, + @JsonProperty("columns") List columns, + @JsonProperty("partitionSubScanSpecList") LinkedList partitionSubScanSpecList) throws ExecutionSetupException { - super(userName); - this.KafkaStoragePluginConfig = kafkaStoragePluginConfig; - this.columns = columns; - this.partitions = partitions; - this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig); + this(userName, + (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig), + columns, + partitionSubScanSpecList); } - public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig, - List columns, List partitionSubScanSpecList) { + public KafkaSubScan(String userName, + KafkaStoragePlugin kafkaStoragePlugin, + List columns, + List partitionSubScanSpecList) { super(userName); + this.kafkaStoragePlugin = kafkaStoragePlugin; this.columns = columns; - this.KafkaStoragePluginConfig = kafkStoragePluginConfig; - this.kafkaStoragePlugin = plugin; - this.partitions = partitionSubScanSpecList; + this.partitionSubScanSpecList = partitionSubScanSpecList; } @Override @@ -79,8 +76,7 @@ public T accept(PhysicalVisitor physicalVis @Override public PhysicalOperator getNewWithChildren(List children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); - return new KafkaSubScan(getUserName(), kafkaStoragePlugin, KafkaStoragePluginConfig, columns, - partitions); + return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, partitionSubScanSpecList); } @Override @@ -88,22 +84,24 @@ public Iterator iterator() { return Collections.emptyIterator(); } - @JsonIgnore + @JsonProperty public KafkaStoragePluginConfig getKafkaStoragePluginConfig() { - return KafkaStoragePluginConfig; - } - - @JsonIgnore - public KafkaStoragePlugin getKafkaStoragePlugin() { - return kafkaStoragePlugin; + return kafkaStoragePlugin.getConfig(); } + @JsonProperty public List getColumns() { return columns; } + @JsonProperty public List getPartitionSubScanSpecList() { - return partitions; + return partitionSubScanSpecList; + } + + @JsonIgnore + public KafkaStoragePlugin getKafkaStoragePlugin() { + return kafkaStoragePlugin; } @Override diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java index 1f5a1c0403d..ce9eb9984e3 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.kafka; -import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Set; @@ -39,7 +38,7 @@ public class KafkaQueriesTest extends KafkaTestBase { @Test public void testSqlQueryOnInvalidTopic() throws Exception { - String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.INVALID_TOPIC); + String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.INVALID_TOPIC); try { testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.> emptyList()) .build().run(); @@ -51,7 +50,7 @@ public void testSqlQueryOnInvalidTopic() throws Exception { @Test public void testResultCount() throws Exception { - String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.JSON_TOPIC); + String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC); runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG); } @@ -60,9 +59,9 @@ public void testPartitionMinOffset() throws Exception { // following kafka.tools.GetOffsetShell for earliest as -2 Map startOffsetsMap = fetchOffsets(-2); - String queryString = String.format(QueryConstants.MIN_OFFSET_QUERY, QueryConstants.JSON_TOPIC); + String queryString = String.format(TestQueryConstants.MIN_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC); testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset") - .baselineValues(startOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))).go(); + .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))).go(); } @Test @@ -70,9 +69,9 @@ public void testPartitionMaxOffset() throws Exception { // following kafka.tools.GetOffsetShell for latest as -1 Map endOffsetsMap = fetchOffsets(-1); - String queryString = String.format(QueryConstants.MAX_OFFSET_QUERY, QueryConstants.JSON_TOPIC); + String queryString = String.format(TestQueryConstants.MAX_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC); testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset") - .baselineValues(endOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))-1).go(); + .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go(); } private Map fetchOffsets(int flag) { @@ -80,7 +79,7 @@ private Map fetchOffsets(int flag) { new ByteArrayDeserializer(), new ByteArrayDeserializer()); Map offsetsMap = Maps.newHashMap(); - kafkaConsumer.subscribe(Arrays.asList(QueryConstants.JSON_TOPIC)); + kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC)); // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions // evaluates lazily, seeking to the // first/last offset in all partitions only when poll(long) or @@ -110,4 +109,10 @@ private Map fetchOffsets(int flag) { return offsetsMap; } + @Test + public void testPhysicalPlanSubmission() throws Exception { + String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC); + testPhysicalPlanExecutionBasedOnQuery(query); + } + } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java index 0b315629564..4a155963fa3 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java @@ -49,7 +49,7 @@ public void setUp() { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4"); kafkaConsumer = new KafkaConsumer<>(consumerProps); - subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG); + subScanSpec = new KafkaSubScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG); } @After diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java index 34677cbf468..ed0174755ad 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java @@ -72,15 +72,15 @@ public static void initKafka() throws Exception { Properties topicProps = new Properties(); zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$); ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false); - AdminUtils.createTopic(zkUtils, QueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkUtils, TestQueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$); org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils - .fetchTopicMetadataFromZk(QueryConstants.JSON_TOPIC, zkUtils); + .fetchTopicMetadataFromZk(TestQueryConstants.JSON_TOPIC, zkUtils); logger.info("Topic Metadata: " + fetchTopicMetadataFromZk); KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); - generator.populateJsonMsgIntoKafka(QueryConstants.JSON_TOPIC, NUM_JSON_MSG); + generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG); } initCount.incrementAndGet(); runningSuite = true; diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java similarity index 52% rename from contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java rename to contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java index ff58f7ea948..057af7eb9bc 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java @@ -17,24 +17,24 @@ */ package org.apache.drill.exec.store.kafka; -public interface QueryConstants { +public interface TestQueryConstants { // Kafka Server Prop Constants - public static final String BROKER_DELIM = ","; - public final String LOCAL_HOST = "127.0.0.1"; + String BROKER_DELIM = ","; + String LOCAL_HOST = "127.0.0.1"; // ZK - public final static String ZK_TMP = "zk_tmp"; - public final static int TICK_TIME = 500; - public final static int MAX_CLIENT_CONNECTIONS = 100; + String ZK_TMP = "zk_tmp"; + int TICK_TIME = 500; + int MAX_CLIENT_CONNECTIONS = 100; - public static final String JSON_TOPIC = "drill-json-topic"; - public static final String AVRO_TOPIC = "drill-avro-topic"; - public static final String INVALID_TOPIC = "invalid-topic"; + String JSON_TOPIC = "drill-json-topic"; + String AVRO_TOPIC = "drill-avro-topic"; + String INVALID_TOPIC = "invalid-topic"; // Queries - public static final String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`"; - public static final String MSG_SELECT_QUERY = "select * from kafka.`%s`"; - public static final String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`"; - public static final String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`"; + String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`"; + String MSG_SELECT_QUERY = "select * from kafka.`%s`"; + String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`"; + String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`"; } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java index 319c66c11f4..663e0e47a86 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java @@ -26,7 +26,7 @@ import org.apache.drill.exec.ZookeeperHelper; import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig; -import org.apache.drill.exec.store.kafka.QueryConstants; +import org.apache.drill.exec.store.kafka.TestQueryConstants; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.slf4j.Logger; @@ -35,7 +35,7 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; -public class EmbeddedKafkaCluster implements QueryConstants { +public class EmbeddedKafkaCluster implements TestQueryConstants { private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private List brokers; private final ZookeeperHelper zkHelper; diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java index dfc3c4470a6..7bddf18617e 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduGroupScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -45,7 +44,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import org.apache.drill.exec.store.schedule.AffinityCreator; import org.apache.drill.exec.store.schedule.AssignmentCreator; import org.apache.drill.exec.store.schedule.CompleteWork; @@ -59,10 +57,10 @@ public class KuduGroupScan extends AbstractGroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduGroupScan.class); private static final long DEFAULT_TABLET_SIZE = 1000; - private KuduStoragePluginConfig storagePluginConfig; + private KuduStoragePlugin kuduStoragePlugin; private List columns; private KuduScanSpec kuduScanSpec; - private KuduStoragePlugin storagePlugin; + private boolean filterPushedDown = false; private List kuduWorkList = Lists.newArrayList(); private ListMultimap assignments; @@ -71,31 +69,31 @@ public class KuduGroupScan extends AbstractGroupScan { @JsonCreator public KuduGroupScan(@JsonProperty("kuduScanSpec") KuduScanSpec kuduScanSpec, - @JsonProperty("storage") KuduStoragePluginConfig storagePluginConfig, + @JsonProperty("kuduStoragePluginConfig") KuduStoragePluginConfig kuduStoragePluginConfig, @JsonProperty("columns") List columns, @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException { - this((KuduStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), kuduScanSpec, columns); + this((KuduStoragePlugin) pluginRegistry.getPlugin(kuduStoragePluginConfig), kuduScanSpec, columns); } - public KuduGroupScan(KuduStoragePlugin storagePlugin, KuduScanSpec scanSpec, - List columns) { + public KuduGroupScan(KuduStoragePlugin kuduStoragePlugin, + KuduScanSpec kuduScanSpec, + List columns) { super((String) null); - this.storagePlugin = storagePlugin; - this.storagePluginConfig = storagePlugin.getConfig(); - this.kuduScanSpec = scanSpec; + this.kuduStoragePlugin = kuduStoragePlugin; + this.kuduScanSpec = kuduScanSpec; this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns; init(); } private void init() { String tableName = kuduScanSpec.getTableName(); - Collection endpoints = storagePlugin.getContext().getBits(); + Collection endpoints = kuduStoragePlugin.getContext().getBits(); Map endpointMap = Maps.newHashMap(); for (DrillbitEndpoint endpoint : endpoints) { endpointMap.put(endpoint.getAddress(), endpoint); } try { - List locations = storagePlugin.getClient().openTable(tableName).getTabletsLocations(10000); + List locations = kuduStoragePlugin.getClient().openTable(tableName).getTabletsLocations(10000); for (LocatedTablet tablet : locations) { KuduWork work = new KuduWork(tablet.getPartition().getPartitionKeyStart(), tablet.getPartition().getPartitionKeyEnd()); for (Replica replica : tablet.getReplicas()) { @@ -153,10 +151,9 @@ public int compareTo(CompleteWork o) { */ private KuduGroupScan(KuduGroupScan that) { super(that); + this.kuduStoragePlugin = that.kuduStoragePlugin; this.columns = that.columns; this.kuduScanSpec = that.kuduScanSpec; - this.storagePlugin = that.storagePlugin; - this.storagePluginConfig = that.storagePluginConfig; this.filterPushedDown = that.filterPushedDown; this.kuduWorkList = that.kuduWorkList; this.assignments = that.assignments; @@ -204,7 +201,7 @@ public KuduSubScan getSpecificScan(int minorFragmentId) { scanSpecList.add(new KuduSubScanSpec(getTableName(), work.getPartitionKeyStart(), work.getPartitionKeyEnd())); } - return new KuduSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns); + return new KuduSubScan(kuduStoragePlugin, scanSpecList, this.columns); } // KuduStoragePlugin plugin, KuduStoragePluginConfig config, @@ -224,7 +221,7 @@ public PhysicalOperator getNewWithChildren(List children) { @JsonIgnore public KuduStoragePlugin getStoragePlugin() { - return storagePlugin; + return kuduStoragePlugin; } @JsonIgnore @@ -244,9 +241,9 @@ public String toString() { + columns + "]"; } - @JsonProperty("storage") - public KuduStoragePluginConfig getStorageConfig() { - return this.storagePluginConfig; + @JsonProperty + public KuduStoragePluginConfig getKuduStoragePluginConfig() { + return kuduStoragePlugin.getConfig(); } @JsonProperty diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java index 9025db78add..ca577e753f2 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduSubScan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,9 +21,9 @@ import java.util.LinkedList; import java.util.List; +import com.google.common.collect.ImmutableSet; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -37,49 +37,40 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; // Class containing information for reading a single Kudu tablet -@JsonTypeName("kudu-tablet-scan") +@JsonTypeName("kudu-sub-scan") public class KuduSubScan extends AbstractBase implements SubScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KuduSubScan.class); - @JsonProperty - public final KuduStoragePluginConfig storage; - - private final KuduStoragePlugin kuduStoragePlugin; private final List tabletScanSpecList; private final List columns; @JsonCreator public KuduSubScan(@JacksonInject StoragePluginRegistry registry, - @JsonProperty("storage") StoragePluginConfig storage, - @JsonProperty("tabletScanSpecList") LinkedList tabletScanSpecList, + @JsonProperty("kuduStoragePluginConfig") KuduStoragePluginConfig kuduStoragePluginConfig, + @JsonProperty("tabletScanSpecList") LinkedList tabletScanSpecList, @JsonProperty("columns") List columns) throws ExecutionSetupException { super((String) null); - kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(storage); + kuduStoragePlugin = (KuduStoragePlugin) registry.getPlugin(kuduStoragePluginConfig); this.tabletScanSpecList = tabletScanSpecList; - this.storage = (KuduStoragePluginConfig) storage; this.columns = columns; } - public KuduSubScan(KuduStoragePlugin plugin, KuduStoragePluginConfig config, - List tabletInfoList, List columns) { + public KuduSubScan(KuduStoragePlugin plugin, List tabletInfoList, List columns) { super((String) null); - kuduStoragePlugin = plugin; - storage = config; + this.kuduStoragePlugin = plugin; this.tabletScanSpecList = tabletInfoList; this.columns = columns; } - public List getTabletScanSpecList() { - return tabletScanSpecList; + public KuduStoragePluginConfig getKuduStoragePluginConfig() { + return kuduStoragePlugin.getConfig(); } - @JsonIgnore - public KuduStoragePluginConfig getStorageConfig() { - return storage; + public List getTabletScanSpecList() { + return tabletScanSpecList; } public List getColumns() { @@ -104,12 +95,12 @@ public T accept(PhysicalVisitor physicalVis @Override public PhysicalOperator getNewWithChildren(List children) { Preconditions.checkArgument(children.isEmpty()); - return new KuduSubScan(kuduStoragePlugin, storage, tabletScanSpecList, columns); + return new KuduSubScan(kuduStoragePlugin, tabletScanSpecList, columns); } @Override public Iterator iterator() { - return Iterators.emptyIterator(); + return ImmutableSet.of().iterator(); } public static class KuduSubScanSpec { @@ -143,7 +134,7 @@ public byte[] getEndKey() { @Override public int getOperatorType() { - return CoreOperatorType.HBASE_SUB_SCAN_VALUE; + return CoreOperatorType.KUDU_SUB_SCAN_VALUE; } } diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java index b94033a2dc9..4e1c7fd5f74 100644 --- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduPlugin.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,7 @@ */ package org.apache.drill.store.kudu; +import org.apache.drill.PlanTestBase; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.KuduStorageTest; import org.junit.Ignore; @@ -44,6 +45,11 @@ public void testCreate() throws Exception { test("create table kudu.regions as select 1, * from sys.options limit 1"); test("select * from kudu.regions"); test("drop table kudu.regions"); + } + @Test + public void testPhysicalPlanSubmission() throws Exception { + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from kudu.demo"); } + } diff --git a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java index d8043fdd1c6..8d0064f2f94 100644 --- a/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java +++ b/contrib/storage-mongo/src/test/java/org/apache/drill/exec/store/mongo/TestMongoQueries.java @@ -82,4 +82,11 @@ public void testUnShardedDBInShardedClusterWithGroupByProjectionAndFilter() thro DONUTS_DB, DONUTS_COLLECTION); runMongoSQLVerifyCount(queryString, 5); } + + @Test + public void testPhysicalPlanSubmission() throws Exception { + String query = String.format(TEST_BOOLEAN_FILTER_QUERY_TEMPLATE1, + EMPLOYEE_DB, EMPINFO_COLLECTION); + testPhysicalPlanExecutionBasedOnQuery(query); + } } diff --git a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java index 2d6c5063631..27ca09c61ae 100644 --- a/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java +++ b/contrib/storage-opentsdb/src/test/java/org/apache/drill/store/openTSDB/TestOpenTSDBPlugin.java @@ -174,10 +174,9 @@ public void testBasicQueryWithNonExistentTableName() throws Exception { } @Test - public void testPhysicalPlanExecutionBasedOnQuery() throws Exception { - String query = "EXPLAIN PLAN for select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`"; - String plan = getPlanInString(query, JSON_FORMAT); - Assert.assertEquals(18, testPhysical(plan)); + public void testPhysicalPlanSubmission() throws Exception { + String query = "select * from openTSDB.`(metric=warp.speed.test, start=47y-ago, aggregator=sum)`"; + testPhysicalPlanExecutionBasedOnQuery(query); } @Test diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 9ef1f8d5341..6ae9deb4cc8 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -521,6 +521,10 @@ public enum CoreOperatorType * KAFKA_SUB_SCAN = 38; */ KAFKA_SUB_SCAN(38, 38), + /** + * KUDU_SUB_SCAN = 39; + */ + KUDU_SUB_SCAN(39, 39), ; /** @@ -679,6 +683,10 @@ public enum CoreOperatorType * KAFKA_SUB_SCAN = 38; */ public static final int KAFKA_SUB_SCAN_VALUE = 38; + /** + * KUDU_SUB_SCAN = 39; + */ + public static final int KUDU_SUB_SCAN_VALUE = 39; public final int getNumber() { return value; } @@ -724,6 +732,7 @@ public static CoreOperatorType valueOf(int value) { case 36: return AVRO_SUB_SCAN; case 37: return PCAP_SUB_SCAN; case 38: return KAFKA_SUB_SCAN; + case 39: return KUDU_SUB_SCAN; default: return null; } } @@ -24104,7 +24113,7 @@ public Builder clearStatus() { "agmentState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALL" + "OCATION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\t" + "CANCELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_" + - "REQUESTED\020\006*\204\006\n\020CoreOperatorType\022\021\n\rSING" + + "REQUESTED\020\006*\227\006\n\020CoreOperatorType\022\021\n\rSING" + "LE_SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FIL" + "TER\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004" + "\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDE" + @@ -24123,11 +24132,11 @@ public Builder clearStatus() { "X_TO_JSON\020\037\022\025\n\021PRODUCER_CONSUMER\020 \022\022\n\016HB", "ASE_SUB_SCAN\020!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOO" + "P_JOIN\020#\022\021\n\rAVRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_" + - "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&*g\n\nSaslStatus" + - "\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SA" + - "SL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SA" + - "SL_FAILED\020\004B.\n\033org.apache.drill.exec.pro" + - "toB\rUserBitSharedH\001" + "SCAN\020%\022\022\n\016KAFKA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_S" + + "CAN\020\'*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n" + + "\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014S" + + "ASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.ap" + + "ache.drill.exec.protoB\rUserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index 8ad38a588c2..71595f72e99 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -60,7 +60,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite