Skip to content

Commit

Permalink
DRILL-5714: Fix NPE when mapr-db plugin is used in table function
Browse files Browse the repository at this point in the history
close #902
  • Loading branch information
arina-ielchiieva authored and jinfengni committed Aug 15, 2017
1 parent 8b56423 commit fd7fba6
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 75 deletions.
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -21,10 +21,10 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT)
@JsonTypeName("maprdb")
@JsonInclude(Include.NON_DEFAULT)
public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {

public boolean allTextMode = false;
Expand All @@ -35,17 +35,22 @@ public class MapRDBFormatPluginConfig extends TableFormatPluginConfig {

@Override
public int hashCode() {
return 53;
int result = (allTextMode ? 1231 : 1237);
result = 31 * result + (enablePushdown ? 1231 : 1237);
result = 31 * result + (ignoreSchemaChange ? 1231 : 1237);
result = 31 * result + (readAllNumbersAsDouble ? 1231 : 1237);
result = 31 * result + (disableCountOptimization ? 1231 : 1237);
return result;
}

@Override
protected boolean impEquals(Object obj) {
MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig)obj;
MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig) obj;
if (readAllNumbersAsDouble != other.readAllNumbersAsDouble) {
return false;
} else if (allTextMode != other.allTextMode) {
return false;
} else if (isIgnoreSchemaChange() != other.isIgnoreSchemaChange()) {
} else if (ignoreSchemaChange != other.ignoreSchemaChange) {
return false;
} else if (enablePushdown != other.enablePushdown) {
return false;
Expand All @@ -63,40 +68,16 @@ public boolean isAllTextMode() {
return allTextMode;
}

@JsonProperty("allTextMode")
public void setAllTextMode(boolean mode) {
allTextMode = mode;
}

@JsonProperty("disableCountOptimization")
public void setDisableCountOptimization(boolean mode) {
disableCountOptimization = mode;
}

public boolean shouldDisableCountOptimization() {
public boolean disableCountOptimization() {
return disableCountOptimization;
}

@JsonProperty("readAllNumbersAsDouble")
public void setReadAllNumbersAsDouble(boolean read) {
readAllNumbersAsDouble = read;
}

public boolean isEnablePushdown() {
return enablePushdown;
}

@JsonProperty("enablePushdown")
public void setEnablePushdown(boolean enablePushdown) {
this.enablePushdown = enablePushdown;
}

public boolean isIgnoreSchemaChange() {
return ignoreSchemaChange;
}

public void setIgnoreSchemaChange(boolean ignoreSchemaChange) {
this.ignoreSchemaChange = ignoreSchemaChange;
}

}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -34,7 +34,7 @@
import com.google.common.collect.Lists;

public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);

@Override
public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
Expand All @@ -43,13 +43,16 @@ public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<R
for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
try {
if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
readers.add(new HBaseRecordReader(subScan.getFormatPlugin().getConnection(),
getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
readers.add(new HBaseRecordReader(
subScan.getFormatPlugin().getConnection(),
getHBaseSubScanSpec(scanSpec),
subScan.getColumns(),
context));
} else {
readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
}
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
} catch (Exception e) {
throw new ExecutionSetupException(e);
}
}
return new ScanBatch(subScan, context, readers.iterator());
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
Expand All @@ -28,11 +29,9 @@
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
Expand All @@ -41,40 +40,32 @@
// Class containing information for reading a single HBase region
@JsonTypeName("maprdb-sub-scan")
public class MapRDBSubScan extends AbstractBase implements SubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);

@JsonProperty
public final StoragePluginConfig storageConfig;
@JsonIgnore
private final MapRDBFormatPluginConfig formatPluginConfig;
private final FileSystemPlugin storagePlugin;
private final MapRDBFormatPlugin formatPlugin;
private final List<MapRDBSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
private final String tableType;

private final MapRDBFormatPlugin formatPlugin;

@JsonCreator
public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
public MapRDBSubScan(@JacksonInject StoragePluginRegistry engineRegistry,
@JsonProperty("userName") String userName,
@JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
@JsonProperty("storageConfig") StoragePluginConfig storage,
@JsonProperty("storageConfig") StoragePluginConfig storageConfig,
@JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("tableType") String tableType) throws ExecutionSetupException {
this(userName, formatPluginConfig,
(FileSystemPlugin) registry.getPlugin(storage),
storage, regionScanSpecList, columns, tableType);
this(userName,
(MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
regionScanSpecList,
columns,
tableType);
}

public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig storageConfig,
public MapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin,
List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
super(userName);
this.storageConfig = storageConfig;
this.storagePlugin = storagePlugin;
this.formatPluginConfig = formatPluginConfig;
this.formatPlugin = (MapRDBFormatPlugin) storagePlugin.getFormatPlugin(formatPluginConfig);

this.formatPlugin = formatPlugin;
this.regionScanSpecList = maprSubScanSpecs;
this.columns = columns;
this.tableType = tableType;
Expand All @@ -101,7 +92,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new MapRDBSubScan(getUserName(), formatPluginConfig, storagePlugin, storageConfig, regionScanSpecList, columns, tableType);
return new MapRDBSubScan(getUserName(), formatPlugin, regionScanSpecList, columns, tableType);
}

@Override
Expand All @@ -118,13 +109,14 @@ public String getTableType() {
return tableType;
}

public MapRDBFormatPluginConfig getFormatPluginConfig() {
return formatPluginConfig;
}

@JsonIgnore
public MapRDBFormatPlugin getFormatPlugin() {
return formatPlugin;
}

@JsonIgnore
public MapRDBFormatPluginConfig getFormatPluginConfig() {
return (MapRDBFormatPluginConfig) formatPlugin.getConfig();
}

}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -129,7 +129,7 @@ private void init() {
tableStats = new MapRDBTableStats(getHBaseConf(), hbaseScanSpec.getTableName());
}
boolean foundStartRegion = false;
regionsToScan = new TreeMap<TabletFragmentInfo, String>();
regionsToScan = new TreeMap<>();
List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
for (HRegionLocation regionLocation : regionLocations) {
HRegionInfo regionInfo = regionLocation.getRegionInfo();
Expand Down Expand Up @@ -178,8 +178,7 @@ public MapRDBSubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
}

@Override
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -37,7 +37,6 @@
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jackson.annotate.JsonCreator;
Expand Down Expand Up @@ -180,8 +179,7 @@ public MapRDBSubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
return new MapRDBSubScan(getUserName(), formatPlugin, endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
}

@Override
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -38,7 +38,6 @@
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
Expand Down Expand Up @@ -112,7 +111,7 @@ public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
condition = com.mapr.db.impl.ConditionImpl.parseFrom(ByteBufs.wrap(serializedFilter));
}

disableCountOptimization = formatPluginConfig.shouldDisableCountOptimization();
disableCountOptimization = formatPluginConfig.disableCountOptimization();
setColumns(projectedColumns);
unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
Expand Down

0 comments on commit fd7fba6

Please sign in to comment.