DRILL-2514: Add support for impersonation in FileSystem storage plugin.
vkorukanti committed Apr 21, 2015
1 parent 117b749 commit 40c9040
Showing 71 changed files with 1,097 additions and 259 deletions.
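The hunks below all follow one pattern: each group scan and sub scan gains a userName parameter, passes it to super(userName) (or super(that) in copy constructors), and forwards getUserName() when building sub scans, so the identity of the querying user travels with the physical plan. A minimal sketch of that pattern, assuming a base class along the lines of Drill's AbstractGroupScan; the class and field names below are illustrative, not the actual classes:

// --- illustrative sketch, not part of the commit ---
public abstract class UserAwareScanSketch {
  private final String userName;   // user the query executes as

  protected UserAwareScanSketch(String userName) {
    this.userName = userName;
  }

  // Copy constructor, mirroring the super(that) calls added to the
  // clone-style constructors in the hunks below.
  protected UserAwareScanSketch(UserAwareScanSketch that) {
    this.userName = that.userName;
  }

  public String getUserName() {
    return userName;
  }
}
// A concrete scan then calls super(userName) and later hands getUserName()
// to the sub scan it creates, as the HBase, Hive, and Mongo hunks do.
// --- end sketch ---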
@@ -101,14 +101,17 @@ public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
private long scanSizeInBytes = 0;

@JsonCreator
public HBaseGroupScan(@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
public HBaseGroupScan(@JsonProperty("userName") String userName,
@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
@JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
this ((HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
this (userName, (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), hbaseScanSpec, columns);
}

public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
List<SchemaPath> columns) {
super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.hbaseScanSpec = scanSpec;
@@ -121,6 +124,7 @@ public HBaseGroupScan(HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
* @param that The HBaseGroupScan to clone
*/
private HBaseGroupScan(HBaseGroupScan that) {
super(that);
this.columns = that.columns;
this.hbaseScanSpec = that.hbaseScanSpec;
this.endpointFragmentMapping = that.endpointFragmentMapping;
@@ -342,7 +346,8 @@ public HBaseSubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
return new HBaseSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns);
return new HBaseSubScan(getUserName(), storagePlugin, storagePluginConfig,
endpointFragmentMapping.get(minorFragmentId), columns);
}

@Override
@@ -427,7 +432,9 @@ public boolean isFilterPushedDown() {
* Empty constructor, do not use, only for testing.
*/
@VisibleForTesting
public HBaseGroupScan() { }
public HBaseGroupScan() {
super((String)null);
}

/**
* Do not use, only for testing.
@@ -62,7 +62,8 @@ public void onMatch(RelOptRuleCall call) {
return; //no filter pushdown ==> No transformation.
}

final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
newScanSpec, groupScan.getColumns());
newGroupsScan.setFilterPushedDown(true);

final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
@@ -25,8 +25,8 @@
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.Table;

import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -46,7 +46,7 @@ public HBaseSchemaFactory(HBaseStoragePlugin plugin, String name) throws IOExcep
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
HBaseSchema schema = new HBaseSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
@@ -23,9 +23,9 @@
import net.hydromatic.optiq.SchemaPlus;

import org.apache.drill.common.JSONOptions;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;

import com.fasterxml.jackson.core.type.TypeReference;
@@ -60,14 +60,14 @@ public boolean supportsRead() {
}

@Override
public HBaseGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
public HBaseGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
HBaseScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<HBaseScanSpec>() {});
return new HBaseGroupScan(this, scanSpec, null);
return new HBaseGroupScan(userName, this, scanSpec, null);
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(schemaConfig, parent);
}

@Override
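The registerSchemas hunks above and below swap UserSession for a narrower SchemaConfig: schema registration no longer sees the whole session, only what it needs to act on behalf of the querying user. A hedged stand-in for such an object follows; the field names are assumptions for illustration, not necessarily Drill's actual SchemaConfig API:

// --- illustrative sketch, not part of the commit ---
public final class SchemaConfigSketch {
  private final String userName;          // user the schemas are registered for
  private final boolean ignoreAuthErrors; // tolerate per-schema access failures

  public SchemaConfigSketch(String userName, boolean ignoreAuthErrors) {
    this.userName = userName;
    this.ignoreAuthErrors = ignoreAuthErrors;
  }

  public String getUserName() { return userName; }
  public boolean getIgnoreAuthErrors() { return ignoreAuthErrors; }
}
// --- end sketch ---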
@@ -55,17 +55,20 @@ public class HBaseSubScan extends AbstractBase implements SubScan {

@JsonCreator
public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("storage") StoragePluginConfig storage,
@JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
super(userName);
hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
this.regionScanSpecList = regionScanSpecList;
this.storage = (HBaseStoragePluginConfig) storage;
this.columns = columns;
}

public HBaseSubScan(HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
public HBaseSubScan(String userName, HBaseStoragePlugin plugin, HBaseStoragePluginConfig config,
List<HBaseSubScanSpec> regionInfoList, List<SchemaPath> columns) {
super(userName);
hbaseStoragePlugin = plugin;
storage = config;
this.regionScanSpecList = regionInfoList;
@@ -103,7 +106,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new HBaseSubScan(hbaseStoragePlugin, storage, regionScanSpecList, columns);
return new HBaseSubScan(getUserName(), hbaseStoragePlugin, storage, regionScanSpecList, columns);
}

@Override
@@ -132,7 +132,7 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro

try {
HiveScan oldScan = (HiveScan) scanRel.getGroupScan();
HiveScan hiveScan = new HiveScan(newReadEntry, oldScan.storagePlugin, oldScan.columns);
HiveScan hiveScan = new HiveScan(oldScan.getUserName(), newReadEntry, oldScan.storagePlugin, oldScan.columns);
PartitionPruningUtil.rewritePlan(call, filterRel, projectRel, scanRel, hiveScan, builder);
} catch (ExecutionSetupException e) {
throw new DrillRuntimeException(e);
@@ -94,10 +94,12 @@ public class HiveScan extends AbstractGroupScan {
private long rowCount = 0;

@JsonCreator
public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
public HiveScan(@JsonProperty("userName") final String userName,
@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
@JsonProperty("storage-plugin") final String storagePluginName,
@JsonProperty("columns") final List<SchemaPath> columns,
@JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
super(userName);
this.hiveReadEntry = hiveReadEntry;
this.storagePluginName = storagePluginName;
this.storagePlugin = (HiveStoragePlugin) pluginRegistry.getPlugin(storagePluginName);
@@ -106,7 +108,8 @@ public HiveScan(@JsonProperty("hive-table") final HiveReadEntry hiveReadEntry,
endpoints = storagePlugin.getContext().getBits();
}

public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin storagePlugin, final List<SchemaPath> columns) throws ExecutionSetupException {
super(userName);
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.storagePlugin = storagePlugin;
@@ -116,6 +119,7 @@ public HiveScan(final HiveReadEntry hiveReadEntry, final HiveStoragePlugin stora
}

private HiveScan(final HiveScan that) {
super(that);
this.columns = that.columns;
this.endpoints = that.endpoints;
this.hiveReadEntry = that.hiveReadEntry;
@@ -226,8 +230,9 @@ public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupE
if (parts.contains(null)) {
parts = null;
}

final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.table, parts, hiveReadEntry.hiveConfigOverride);
return new HiveSubScan(encodedInputSplits, subEntry, splitTypes, columns);
return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
@@ -29,9 +29,9 @@
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;

@@ -67,23 +67,23 @@ public DrillbitContext getContext() {
}

@Override
public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException {
HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
try {
if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
throw new UnsupportedOperationException(
"Querying views created in Hive from Drill is not supported in current version.");
}

return new HiveScan(hiveReadEntry, this, columns);
return new HiveScan(userName, hiveReadEntry, this, columns);
} catch (ExecutionSetupException e) {
throw new IOException(e);
}
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(schemaConfig, parent);
}
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of(HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_PROJECT, HivePushPartitionFilterIntoScan.HIVE_FILTER_ON_SCAN);
@@ -62,10 +62,12 @@ public class HiveSubScan extends AbstractBase implements SubScan {
private List<Partition> partitions;

@JsonCreator
public HiveSubScan(@JsonProperty("splits") List<String> splits,
public HiveSubScan(@JsonProperty("userName") String userName,
@JsonProperty("splits") List<String> splits,
@JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry,
@JsonProperty("splitClasses") List<String> splitClasses,
@JsonProperty("columns") List<SchemaPath> columns) throws IOException, ReflectiveOperationException {
super(userName);
this.hiveReadEntry = hiveReadEntry;
this.table = hiveReadEntry.getTable();
this.partitions = hiveReadEntry.getPartitions();
@@ -126,7 +128,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
try {
return new HiveSubScan(splits, hiveReadEntry, splitClasses, columns);
return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
@@ -30,8 +30,8 @@

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveStoragePlugin;
@@ -187,7 +187,7 @@ public HiveReadEntry load(String key) throws Exception {
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
HiveSchema schema = new HiveSchema(schemaName);
SchemaPlus hPlus = parent.add(schemaName, schema);
schema.setHolder(hPlus);
@@ -115,17 +115,20 @@ public int compare(List<MongoSubScanSpec> list1,
private boolean filterPushedDown = false;

@JsonCreator
public MongoGroupScan(@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
public MongoGroupScan(
@JsonProperty("userName") String userName,
@JsonProperty("mongoScanSpec") MongoScanSpec scanSpec,
@JsonProperty("storage") MongoStoragePluginConfig storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException,
ExecutionSetupException {
this((MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
this(userName, (MongoStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig),
scanSpec, columns);
}

public MongoGroupScan(MongoStoragePlugin storagePlugin,
public MongoGroupScan(String userName, MongoStoragePlugin storagePlugin,
MongoScanSpec scanSpec, List<SchemaPath> columns) throws IOException {
super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.scanSpec = scanSpec;
@@ -140,6 +143,7 @@ public MongoGroupScan(MongoStoragePlugin storagePlugin,
* The MongoGroupScan to clone
*/
private MongoGroupScan(MongoGroupScan that) {
super(that);
this.scanSpec = that.scanSpec;
this.columns = that.columns;
this.storagePlugin = that.storagePlugin;
@@ -446,7 +450,7 @@ private MongoSubScanSpec buildSubScanSpecAndGet(ChunkInfo chunkInfo) {
@Override
public MongoSubScan getSpecificScan(int minorFragmentId)
throws ExecutionSetupException {
return new MongoSubScan(storagePlugin, storagePluginConfig,
return new MongoSubScan(getUserName(), storagePlugin, storagePluginConfig,
endpointFragmentMapping.get(minorFragmentId), columns);
}

@@ -554,6 +558,7 @@ public String toString() {

@VisibleForTesting
MongoGroupScan() {
super((String)null);
}

@JsonIgnore
@@ -68,7 +68,7 @@ public void onMatch(RelOptRuleCall call) {

MongoGroupScan newGroupsScan = null;
try {
newGroupsScan = new MongoGroupScan(groupScan.getStoragePlugin(),
newGroupsScan = new MongoGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
newScanSpec, groupScan.getColumns());
} catch (IOException e) {
logger.error(e.getMessage(), e);
@@ -25,9 +25,9 @@
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.mongo.schema.MongoSchemaFactory;
import org.slf4j.Logger;
@@ -63,8 +63,8 @@ public MongoStoragePluginConfig getConfig() {
}

@Override
public void registerSchemas(UserSession session, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(session, parent);
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
schemaFactory.registerSchemas(schemaConfig, parent);
}

@Override
@@ -73,12 +73,9 @@ public boolean supportsRead() {
}

@Override
public AbstractGroupScan getPhysicalScan(JSONOptions selection)
throws IOException {
MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(),
new TypeReference<MongoScanSpec>() {
});
return new MongoGroupScan(this, mongoScanSpec, null);
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
MongoScanSpec mongoScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<MongoScanSpec>() {});
return new MongoGroupScan(userName, this, mongoScanSpec, null);
}

public Set<StoragePluginOptimizerRule> getOptimizerRules() {
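The hunks shown here are the plumbing; the FileSystem-plugin half of the commit (in changed files not expanded above) is where the carried user name is actually used to open HDFS connections as that user. A hedged sketch of that idea using Hadoop's standard proxy-user API; the class below is hypothetical and not Drill's real code path:

// --- illustrative sketch, not part of the commit ---
import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public final class ImpersonatedFileSystemSketch {
  private ImpersonatedFileSystemSketch() {}

  // Open a FileSystem as the query user instead of the Drillbit process user.
  public static FileSystem openAs(final String userName, final Configuration conf)
      throws IOException, InterruptedException {
    UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(
        userName, UserGroupInformation.getLoginUser());
    return proxyUgi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      @Override
      public FileSystem run() throws IOException {
        return FileSystem.get(conf);
      }
    });
  }
}
// --- end sketch ---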
