DRILL-6454: Native MapR DB plugin support for Hive MapR-DB json table
closes #1314
vdiravka committed Jun 22, 2018
1 parent 67d992c commit b92f599
Showing 17 changed files with 378 additions and 172 deletions.
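Usage sketch (not part of this commit): with this change in place, Drill should be able to read a Hive table backed by a MapR-DB JSON table through its native reader once the corresponding session option is enabled. The Java/JDBC snippet below is a minimal sketch; the option string is an assumption derived from the ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER constant referenced in the new rule, and hive.`customers` is a hypothetical table. Verify the exact option name via sys.options on your Drill build.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveMapRDBJsonNativeReadSketch {
  public static void main(String[] args) throws Exception {
    // Requires the drill-jdbc driver on the classpath; connects to a locally running drillbit.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement()) {
      // Assumed option name (see ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
      // check `SELECT name FROM sys.options` on your build for the exact string.
      stmt.execute("ALTER SESSION SET `store.hive.maprdb_json.optimize_scan_with_native_reader` = true");
      // hive.`customers` is a hypothetical Hive table backed by a MapR-DB JSON table.
      try (ResultSet rs = stmt.executeQuery("SELECT * FROM hive.`customers` LIMIT 10")) {
        while (rs.next()) {
          System.out.println(rs.getString(1));
        }
      }
    }
  }
}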
@@ -31,6 +31,7 @@
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
@@ -45,20 +46,20 @@ public abstract class TableFormatPlugin implements FormatPlugin {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
       .getLogger(TableFormatPlugin.class);

-  private final FileSystemConfig storageConfig;
+  private final StoragePluginConfig storageConfig;
   private final TableFormatPluginConfig config;
   private final Configuration fsConf;
   private final DrillbitContext context;
   private final String name;

-  private volatile FileSystemPlugin storagePlugin;
+  private volatile AbstractStoragePlugin storagePlugin;
   private final MapRFileSystem maprfs;

   protected TableFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storageConfig, TableFormatPluginConfig formatConfig) {
     this.context = context;
     this.config = formatConfig;
-    this.storageConfig = (FileSystemConfig) storageConfig;
+    this.storageConfig = storageConfig;
     this.fsConf = fsConf;
     this.name = name == null ? "maprdb" : name;
     try {
@@ -119,10 +120,10 @@ public String getName() {
     return name;
   }

-  public synchronized FileSystemPlugin getStoragePlugin() {
+  public synchronized AbstractStoragePlugin getStoragePlugin() {
     if (this.storagePlugin == null) {
       try {
-        this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig));
+        this.storagePlugin = (AbstractStoragePlugin) context.getStorage().getPlugin(storageConfig);
       } catch (ExecutionSetupException e) {
         throw new RuntimeException(e);
       }
@@ -18,8 +18,8 @@
 package org.apache.drill.exec.store.mapr.db;

 import java.io.IOException;
-import java.util.List;

+import com.mapr.fs.MapRFileStatus;
 import com.mapr.fs.tables.TableProperties;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
@@ -31,7 +31,6 @@
 import org.apache.drill.exec.store.mapr.TableFormatMatcher;
 import org.apache.drill.exec.store.mapr.TableFormatPlugin;

-import com.mapr.fs.MapRFileStatus;
 import org.apache.drill.exec.store.mapr.db.binary.MapRDBBinaryTable;
 import org.apache.hadoop.fs.Path;

@@ -54,19 +53,16 @@ protected boolean isSupportedTable(MapRFileStatus status) throws IOException {
   public DrillTable isReadable(DrillFileSystem fs,
       FileSelection selection, FileSystemPlugin fsPlugin,
       String storageEngineName, SchemaConfig schemaConfig) throws IOException {
-
     if (isFileReadable(fs, selection.getFirstPath(fs))) {
-      List<String> files = selection.getFiles();
-      assert (files.size() == 1);
-      String tableName = files.get(0);
-      TableProperties props = getFormatPlugin().getMaprFS().getTableProperties(new Path(tableName));
-
+      MapRDBFormatPlugin mapRDBFormatPlugin = (MapRDBFormatPlugin) getFormatPlugin();
+      String tableName = mapRDBFormatPlugin.getTableName(selection);
+      TableProperties props = mapRDBFormatPlugin.getMaprFS().getTableProperties(new Path(tableName));
       if (props.getAttr().getJson()) {
         return new DynamicDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(),
-            new FormatSelection(getFormatPlugin().getConfig(), selection));
+            new FormatSelection(mapRDBFormatPlugin.getConfig(), selection));
       } else {
-        FormatSelection formatSelection = new FormatSelection(getFormatPlugin().getConfig(), selection);
-        return new MapRDBBinaryTable(storageEngineName, fsPlugin, (MapRDBFormatPlugin) getFormatPlugin(), formatSelection);
+        FormatSelection formatSelection = new FormatSelection(mapRDBFormatPlugin.getConfig(), selection);
+        return new MapRDBBinaryTable(storageEngineName, fsPlugin, mapRDBFormatPlugin, formatSelection);
       }
     }
     return null;
@@ -35,8 +35,8 @@
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;

 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,7 +49,7 @@
 public abstract class MapRDBGroupScan extends AbstractGroupScan {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);

-  protected FileSystemPlugin storagePlugin;
+  protected AbstractStoragePlugin storagePlugin;

   protected MapRDBFormatPlugin formatPlugin;

@@ -84,7 +84,7 @@ public MapRDBGroupScan(MapRDBGroupScan that) {
     this.filterPushedDown = that.filterPushedDown;
   }

-  public MapRDBGroupScan(FileSystemPlugin storagePlugin,
+  public MapRDBGroupScan(AbstractStoragePlugin storagePlugin,
       MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
     super(userName);
     this.storagePlugin = storagePlugin;
@@ -254,7 +254,7 @@ public FileSystemConfig getStorageConfig() {
   }

   @JsonIgnore
-  public FileSystemPlugin getStoragePlugin(){
+  public AbstractStoragePlugin getStoragePlugin(){
     return storagePlugin;
   }

@@ -144,7 +144,7 @@ protected void doPushFilterIntoJsonGroupScan(RelOptRuleCall call,
     final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());

     // Depending on whether is a project in the middle, assign either scan or copy of project to childRel.
-    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));;
+    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));

     if (jsonConditionBuilder.isAllExpressionsConverted()) {
       /*
@@ -30,9 +30,9 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
 import org.apache.drill.exec.store.hbase.HBaseScanSpec;
 import org.apache.drill.exec.store.hbase.HBaseUtils;
@@ -78,19 +78,19 @@ public BinaryTableGroupScan(@JsonProperty("userName") final String userName,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
     this (userName,
-          (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+          (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
           (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
           scanSpec, columns);
   }

-  public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+  public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
       MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
     super(storagePlugin, formatPlugin, columns, userName);
     this.hbaseScanSpec = scanSpec;
     init();
   }

-  public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+  public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
       MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec,
       List<SchemaPath> columns, MapRDBTableStats tableStats) {
     super(storagePlugin, formatPlugin, columns, userName);
@@ -30,9 +30,9 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
-import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
 import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
 import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
@@ -71,12 +71,12 @@ public JsonTableGroupScan(@JsonProperty("userName") final String userName,
       @JsonProperty("columns") List<SchemaPath> columns,
       @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
     this (userName,
-          (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
+          (AbstractStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
          (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
          scanSpec, columns);
   }

-  public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+  public JsonTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
       MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns) {
     super(storagePlugin, formatPlugin, columns, userName);
     this.scanSpec = scanSpec;
5 changes: 5 additions & 0 deletions contrib/storage-hive/core/pom.xml
@@ -142,6 +142,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.drill.contrib</groupId>
+      <artifactId>drill-format-mapr</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>

   <build>
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql.logical;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveMetadataProvider;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveScan;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
import org.ojai.DocumentConstants;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.drill.exec.store.hive.HiveUtilities.nativeReadersRuleMatches;

/**
* Convert Hive scan to use Drill's native MapR-DB reader instead of Hive's MapR-DB JSON Handler.
*/
public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePluginOptimizerRule {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.class);

public static final ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan INSTANCE =
new ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan();

/**
* The constants from org.apache.hadoop.hive.maprdb.json.conf.MapRDBConstants
*/
private static final String MAPRDB_PFX = "maprdb.";
private static final String MAPRDB_TABLE_NAME = MAPRDB_PFX + "table.name";
private static final String ID_KEY = DocumentConstants.ID_KEY;
private static final String MAPRDB_COLUMN_ID = MAPRDB_PFX + "column.id";

private ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan() {
super(RelOptHelper.any(DrillScanRel.class), "ConvertHiveScanToHiveDrillNativeScan:MapR-DB");
}

/**
* @see org.apache.drill.exec.store.hive.HiveUtilities#nativeReadersRuleMatches
*/
@Override
public boolean matches(RelOptRuleCall call) {
try {
return nativeReadersRuleMatches(call,
Class.forName("org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat"));
} catch (ClassNotFoundException e) {
throw UserException.resourceError(e)
.message("Current Drill build is not designed for working with Hive MapR-DB tables. " +
"Please disable \"%s\" option", ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER)
.build(logger);
}
}

@Override
public void onMatch(RelOptRuleCall call) {
try {
DrillScanRel hiveScanRel = call.rel(0);

HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry,
hiveScan.getStoragePlugin().getHiveConf());
if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) {
// table is empty, use original scan
return;
}

if (hiveScan.getHiveReadEntry().getTable().isSetPartitionKeys()) {
logger.warn("Hive MapR-DB JSON Handler doesn't support table partitioning. Consider recreating table without " +
"partitions");
}

DrillScanRel nativeScanRel = createNativeScanRel(hiveScanRel);
call.transformTo(nativeScanRel);

/*
Drill native scan should take precedence over the Hive scan since it is more efficient.
Hive does not always produce correct costing (e.g. for external tables Hive does not know the number of rows
and we estimate it approximately), whereas Drill calculates the number of rows exactly. As a result the
Hive scan may be chosen over the Drill native scan because its estimated cost appears lower.
To ensure the Drill MapR-DB JSON scan is chosen, reduce the Hive scan importance to 0.
*/
call.getPlanner().setImportance(hiveScanRel, 0.0);
} catch (final Exception e) {
logger.warn("Failed to convert HiveScan to JsonScanSpec", e);
}
}

/**
* Helper method which creates a {@link DrillScanRel} with the native MapR-DB JSON group scan
* replacing the Hive scan.
*/
private DrillScanRel createNativeScanRel(final DrillScanRel hiveScanRel) throws Exception {
RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
Map<String, String> parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters();

JsonScanSpec scanSpec = new JsonScanSpec(parameters.get(MAPRDB_TABLE_NAME), null);
MapRDBFormatPlugin mapRDBFormatPlugin = new MapRDBFormatPlugin(
"hive-maprdb",
hiveScan.getStoragePlugin().getContext(),
hiveScan.getHiveConf(),
hiveScan.getStoragePlugin().getConfig(),
new MapRDBFormatPluginConfig()
);
List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
.map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
.collect(Collectors.toList());
JsonTableGroupScan nativeMapRDBScan =
new JsonTableGroupScan(
hiveScan.getUserName(),
hiveScan.getStoragePlugin(),
mapRDBFormatPlugin,
scanSpec,
hiveScanCols
);

List<String> nativeScanColNames = hiveScanRel.getRowType().getFieldList().stream()
.map(field -> replaceOverriddenColumnId(parameters, field.getName()))
.collect(Collectors.toList());
List<RelDataType> nativeScanColTypes = hiveScanRel.getRowType().getFieldList().stream()
.map(RelDataTypeField::getType)
.collect(Collectors.toList());
RelDataType nativeScanRowType = typeFactory.createStructType(nativeScanColTypes, nativeScanColNames);

return new DrillScanRel(
hiveScanRel.getCluster(),
hiveScanRel.getTraitSet(),
hiveScanRel.getTable(),
nativeMapRDBScan,
nativeScanRowType,
hiveScanCols);
}

/**
* Hive maps the MapR-DB "_id" column id to a custom user column name. Replace it back with the
* original "_id" for the {@link DrillScanRel}.
*
* @param parameters Hive table properties
* @param colName Hive column name
* @return the original "_id" column name if colName is the overridden id column, otherwise colName
*/
private String replaceOverriddenColumnId(Map<String, String> parameters, String colName) {
return colName != null && colName.equals(parameters.get(MAPRDB_COLUMN_ID)) ? ID_KEY : colName;
}

/**
* The same as above, but for a {@link SchemaPath} object
*
* @param parameters Hive table properties
* @param colNameSchemaPath SchemaPath with Hive column name
* @return SchemaPath with original column name
*/
private SchemaPath replaceOverriddenSchemaPath(Map<String, String> parameters, SchemaPath colNameSchemaPath) {
String hiveColumnName = colNameSchemaPath.getRootSegmentPath();
return hiveColumnName != null && hiveColumnName.equals(parameters.get(MAPRDB_COLUMN_ID))
? SchemaPath.getSimplePath(ID_KEY) : colNameSchemaPath;
}
}
