DRILL-6575: Add store.hive.conf.properties option to allow setting Hive properties at session level

closes #1365
arina-ielchiieva committed Jul 9, 2018
1 parent aee899c commit 139f7156bcb2f6fef5b36f116c9c1b6095fc4b9c
Showing 22 changed files with 457 additions and 247 deletions.
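
The change introduces a store.hive.conf.properties session option so that Hive configuration properties can be overridden per session instead of only in the storage plugin configuration. A hedged usage sketch (the property names are illustrative, and the newline-separated key=value format is an assumption based on the option's description):

  ALTER SESSION SET `store.hive.conf.properties` = 'hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true';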
@@ -88,7 +88,7 @@ public void onMatch(RelOptRuleCall call) {
HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry,
- hiveScan.getStoragePlugin().getHiveConf());
+ hiveScan.getHiveConf());
if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) {
// table is empty, use original scan
return;
@@ -134,7 +134,7 @@ private DrillScanRel createNativeScanRel(final DrillScanRel hiveScanRel) throws
List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
.map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
.collect(Collectors.toList());
- JsonTableGroupScan nariveMapRDBScan =
+ JsonTableGroupScan nativeMapRDBScan =
new JsonTableGroupScan(
hiveScan.getUserName(),
hiveScan.getStoragePlugin(),
@@ -155,7 +155,7 @@ private DrillScanRel createNativeScanRel(final DrillScanRel hiveScanRel) throws
hiveScanRel.getCluster(),
hiveScanRel.getTraitSet(),
hiveScanRel.getTable(),
- nariveMapRDBScan,
+ nativeMapRDBScan,
nativeScanRowType,
hiveScanCols);
}
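
The call sites above switch from hiveScan.getStoragePlugin().getHiveConf() to hiveScan.getHiveConf(), so the metadata provider sees the session-level overrides rather than only the plugin-level configuration. A minimal sketch of what such an accessor would look like on HiveScan, assuming the scan carries the session properties in a confProperties map as the rest of this diff suggests (a sketch, not the verbatim implementation):

  @JsonIgnore
  public HiveConf getHiveConf() {
    // Overlay session-supplied properties onto the plugin-level HiveConf.
    return HiveUtilities.generateHiveConf(getStoragePlugin().getHiveConf(), confProperties);
  }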
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.planner.sql.logical;

- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -43,6 +41,8 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;

+ import java.util.ArrayList;
+ import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -88,7 +88,7 @@ public void onMatch(RelOptRuleCall call) {
final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();

- final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf());
+ final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getHiveConf());
final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry);

if (logicalInputSplits.isEmpty()) {
@@ -123,7 +123,7 @@ Hive does not always give correct costing (i.e. for external tables Hive does no
* Create mapping of Hive partition column to directory column mapping.
*/
private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) {
- final Map<String, String> partitionColMapping = Maps.newHashMap();
+ final Map<String, String> partitionColMapping = new HashMap<>();
int i = 0;
for (FieldSchema col : hiveTable.getPartitionKeys()) {
partitionColMapping.put(col.getName(), partitionColumnLabel+i);
@@ -143,8 +143,8 @@ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColM
final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);

- final List<String> nativeScanColNames = Lists.newArrayList();
- final List<RelDataType> nativeScanColTypes = Lists.newArrayList();
+ final List<String> nativeScanColNames = new ArrayList<>();
+ final List<RelDataType> nativeScanColTypes = new ArrayList<>();
for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
final String dirColName = partitionColMapping.get(field.getName());
if (dirColName != null) { // partition column
@@ -161,8 +161,8 @@ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColM
// Create the list of projected columns set in HiveScan. The order of this list may not be same as the order of
// columns in HiveScan row type. Note: If the HiveScan.getColumn() contains a '*', we just need to add it as it is,
// unlike above where we expanded the '*'. HiveScan and related (subscan) can handle '*'.
- final List<SchemaPath> nativeScanCols = Lists.newArrayList();
- for(SchemaPath colName : hiveScanRel.getColumns()) {
+ final List<SchemaPath> nativeScanCols = new ArrayList<>();
+ for (SchemaPath colName : hiveScanRel.getColumns()) {
final String partitionCol = partitionColMapping.get(colName.getRootSegmentPath());
if (partitionCol != null) {
nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
@@ -177,7 +177,8 @@ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColM
hiveScan.getUserName(),
nativeScanCols,
hiveScan.getStoragePlugin(),
- logicalInputSplits);
+ logicalInputSplits,
+ hiveScan.getConfProperties());

return new DrillScanRel(
hiveScanRel.getCluster(),
@@ -194,7 +195,7 @@ private DrillScanRel createNativeScanRel(final Map<String, String> partitionColM
private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) {

- final List<RexNode> rexNodes = Lists.newArrayList();
+ final List<RexNode> rexNodes = new ArrayList<>();
final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
final RelDataType hiveScanRowType = hiveScanRel.getRowType();

@@ -38,13 +38,15 @@

import java.io.IOException;
import java.util.List;
+ import java.util.Map;

@JsonTypeName("hive-drill-native-parquet-row-group-scan")
public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupScan {

private final HiveStoragePlugin hiveStoragePlugin;
private final HiveStoragePluginConfig hiveStoragePluginConfig;
private final HivePartitionHolder hivePartitionHolder;
+ private final Map<String, String> confProperties;

@JsonCreator
public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -53,12 +55,14 @@ public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry r
@JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
@JsonProperty("confProperties") Map<String, String> confProperties,
@JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
this(userName,
(HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
rowGroupReadEntries,
columns,
hivePartitionHolder,
+ confProperties,
filter);
}

@@ -67,11 +71,13 @@ public HiveDrillNativeParquetRowGroupScan(String userName,
List<RowGroupReadEntry> rowGroupReadEntries,
List<SchemaPath> columns,
HivePartitionHolder hivePartitionHolder,
+ Map<String, String> confProperties,
LogicalExpression filter) {
super(userName, rowGroupReadEntries, columns, filter);
this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration");
this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
this.hivePartitionHolder = hivePartitionHolder;
+ this.confProperties = confProperties;
}

@JsonProperty
@@ -84,6 +90,11 @@ public HivePartitionHolder getHivePartitionHolder() {
return hivePartitionHolder;
}

+ @JsonProperty
+ public Map<String, String> getConfProperties() {
+ return confProperties;
+ }
+
@JsonIgnore
public HiveStoragePlugin getHiveStoragePlugin() {
return hiveStoragePlugin;
@@ -92,7 +103,7 @@ public HiveStoragePlugin getHiveStoragePlugin() {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
}

@Override
@@ -102,7 +113,7 @@ public int getOperatorType() {

@Override
public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
- return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter);
}

@Override
@@ -114,7 +125,7 @@ public boolean areCorruptDatesAutoCorrected() {
public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException {
Path path = new Path(rowGroupReadEntry.getPath()).getParent();
return new ProjectionPusher().pushProjectionsAndFilters(
- new JobConf(hiveStoragePlugin.getHiveConf()),
+ new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)),
path.getParent());
}

@@ -62,7 +62,8 @@
public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {

private final HiveStoragePlugin hiveStoragePlugin;
- private HivePartitionHolder hivePartitionHolder;
+ private final HivePartitionHolder hivePartitionHolder;
+ private final Map<String, String> confProperties;

@JsonCreator
public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -71,30 +72,35 @@ public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineReg
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("entries") List<ReadEntryWithPath> entries,
@JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
@JsonProperty("confProperties") Map<String, String> confProperties,
@JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter);
this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig);
this.hivePartitionHolder = hivePartitionHolder;
+ this.confProperties = confProperties;

init();
}

public HiveDrillNativeParquetScan(String userName,
List<SchemaPath> columns,
HiveStoragePlugin hiveStoragePlugin,
- List<LogicalInputSplit> logicalInputSplits) throws IOException {
- this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE);
+ List<LogicalInputSplit> logicalInputSplits,
+ Map<String, String> confProperties) throws IOException {
+ this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, ValueExpressions.BooleanExpression.TRUE);
}

public HiveDrillNativeParquetScan(String userName,
List<SchemaPath> columns,
HiveStoragePlugin hiveStoragePlugin,
List<LogicalInputSplit> logicalInputSplits,
+ Map<String, String> confProperties,
LogicalExpression filter) throws IOException {
super(userName, columns, new ArrayList<>(), filter);

this.hiveStoragePlugin = hiveStoragePlugin;
this.hivePartitionHolder = new HivePartitionHolder();
+ this.confProperties = confProperties;

for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator();
@@ -122,6 +128,7 @@ private HiveDrillNativeParquetScan(HiveDrillNativeParquetScan that) {
super(that);
this.hiveStoragePlugin = that.hiveStoragePlugin;
this.hivePartitionHolder = that.hivePartitionHolder;
+ this.confProperties = that.confProperties;
}

@JsonProperty
@@ -134,6 +141,11 @@ public HivePartitionHolder getHivePartitionHolder() {
return hivePartitionHolder;
}

+ @JsonProperty
+ public Map<String, String> getConfProperties() {
+ return confProperties;
+ }
+
@Override
public SubScan getSpecificScan(int minorFragmentId) {
List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
@@ -142,7 +154,7 @@ public SubScan getSpecificScan(int minorFragmentId) {
List<String> values = hivePartitionHolder.get(readEntry.getPath());
subPartitionHolder.add(readEntry.getPath(), values);
}
- return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
+ return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, filter);
}

@Override
