MD-726: Add support for read_numbers_as_double and all_text_mode options

+ MD-773: Add support for push-down of filters on Date, Time, and Timestamp
+ Fix the offsets in returned Date and Time values
+ Update support for the DATE, TIME, and TIMESTAMP types
+ Modify the unit tests to check for operation push-down
Smidth Panchamia authored and adityakishore committed Sep 9, 2016
1 parent b1218f3 commit c327f11
Showing 14 changed files with 297 additions and 77 deletions.
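
For context (not part of the diff below): the two new format options mirror Drill's existing JSON reader options store.json.all_text_mode and store.json.read_numbers_as_double. The sketch that follows is purely illustrative, with hypothetical names, of the decoding behavior each flag implies; the actual reader changes live in MaprDBJsonRecordReader, whose hunk is not included in the excerpt shown here.

// Illustrative only -- hypothetical names, not code from this commit.
// allTextMode            -> every scalar is read as VARCHAR text
// readAllNumbersAsDouble -> every numeric scalar is widened to DOUBLE
// Both trade type fidelity for schema stability across heterogeneous documents.
public class ReadModeSemanticsDemo {

  static String decode(Object scalar, boolean allTextMode, boolean readAllNumbersAsDouble) {
    if (allTextMode) {
      return "VARCHAR(" + scalar + ")";
    }
    if (readAllNumbersAsDouble && scalar instanceof Number) {
      return "DOUBLE(" + ((Number) scalar).doubleValue() + ")";
    }
    return scalar.getClass().getSimpleName() + "(" + scalar + ")";
  }

  public static void main(String[] args) {
    System.out.println(decode(42, false, false)); // Integer(42)  -- types preserved
    System.out.println(decode(42, false, true));  // DOUBLE(42.0) -- 42 and 42.5 share one DOUBLE column
    System.out.println(decode(42, true, false));  // VARCHAR(42)  -- everything becomes text
  }
}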
@@ -17,20 +17,66 @@
*/
package org.apache.drill.exec.store.maprdb;

import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.logical.FormatPluginConfig;

@JsonTypeName("maprdb")
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT)
public class MapRDBFormatPluginConfig implements FormatPluginConfig {

private boolean allTextMode = false;
private boolean readAllNumbersAsDouble = false;

@Override
public int hashCode() {
return 53;
}

@Override
public boolean equals(Object obj) {
return obj instanceof MapRDBFormatPluginConfig;
if (this == obj) {
return true;
}

if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig)obj;

if (readAllNumbersAsDouble != other.readAllNumbersAsDouble) {
return false;
}

if (allTextMode != other.allTextMode) {
return false;
}

return true;
}

public boolean isReadAllNumbersAsDouble() {
return readAllNumbersAsDouble;
}

public boolean isAllTextMode() {
return allTextMode;
}

@JsonProperty("allTextMode")
public void setAllTextMode(boolean mode) {
allTextMode = mode;
}

@JsonProperty("readAllNumbersAsDouble")
public void setReadAllNumbersAsDouble(boolean read) {
readAllNumbersAsDouble = read;
}
}
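
Not part of the commit, and using hypothetical class names: a self-contained sketch of what @JsonInclude(Include.NON_DEFAULT) buys on the config class above, assuming Jackson 2.x on the classpath. Options left at their defaults are simply omitted when the format plugin configuration is serialized, so stored plugin configs stay compact.

// Illustrative only -- a minimal mirror of the config class, not Drill code.
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FormatConfigJsonDemo {

  @JsonInclude(Include.NON_DEFAULT)
  static class DemoConfig {
    public boolean allTextMode = false;
    public boolean readAllNumbersAsDouble = false;
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Both options at their defaults: nothing is written -> {}
    System.out.println(mapper.writeValueAsString(new DemoConfig()));

    // Only the non-default option appears -> {"readAllNumbersAsDouble":true}
    DemoConfig custom = new DemoConfig();
    custom.readAllNumbersAsDouble = true;
    System.out.println(mapper.writeValueAsString(custom));
  }
}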
@@ -53,6 +53,8 @@ public abstract class MapRDBGroupScan extends AbstractGroupScan {

private MapRDBFormatPlugin formatPlugin;

protected MapRDBFormatPluginConfig formatPluginConfig;

protected List<SchemaPath> columns;

protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping;
@@ -76,6 +78,7 @@ public MapRDBGroupScan(MapRDBGroupScan that) {
super(that);
this.columns = that.columns;
this.formatPlugin = that.formatPlugin;
this.formatPluginConfig = that.formatPluginConfig;
this.storagePlugin = that.storagePlugin;
this.regionsToScan = that.regionsToScan;
this.filterPushedDown = that.filterPushedDown;
@@ -86,6 +89,7 @@ public MapRDBGroupScan(FileSystemPlugin storagePlugin,
super(userName);
this.storagePlugin = storagePlugin;
this.formatPlugin = formatPlugin;
this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig();
this.columns = columns;
}

@@ -17,6 +17,11 @@
*/
package org.apache.drill.exec.store.maprdb;

import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
@@ -32,12 +37,6 @@
import org.apache.drill.exec.store.maprdb.json.JsonConditionBuilder;
import org.apache.drill.exec.store.maprdb.json.JsonScanSpec;
import org.apache.drill.exec.store.maprdb.json.JsonTableGroupScan;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexNode;
import org.ojai.store.QueryCondition;

import com.google.common.collect.ImmutableList;

@@ -48,7 +48,7 @@ public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<R
if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) {
readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context));
} else {
readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getColumns(), context));
readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context));
}
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
@@ -46,6 +46,7 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
@JsonProperty
public final StoragePluginConfig storage;
@JsonIgnore
private final MapRDBFormatPluginConfig fsFormatPluginConfig;
private final FileSystemPlugin fsStoragePlugin;
private final List<MapRDBSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
@@ -54,21 +55,24 @@ public class MapRDBSubScan extends AbstractBase implements SubScan {
@JsonCreator
public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig,
@JsonProperty("storage") StoragePluginConfig storage,
@JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("tableType") String tableType) throws ExecutionSetupException {
super(userName);
this.fsFormatPluginConfig = formatPluginConfig;
this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage);
this.regionScanSpecList = regionScanSpecList;
this.storage = storage;
this.columns = columns;
this.tableType = tableType;
}

public MapRDBSubScan(String userName, FileSystemPlugin storagePlugin, StoragePluginConfig config,
public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config,
List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
super(userName);
fsFormatPluginConfig = formatPluginConfig;
fsStoragePlugin = storagePlugin;
storage = config;
this.regionScanSpecList = maprSubScanSpecs;
@@ -97,7 +101,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new MapRDBSubScan(getUserName(), fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType);
}

@Override
@@ -114,4 +118,7 @@ public String getTableType() {
return tableType;
}

public MapRDBFormatPluginConfig getFormatPluginConfig() {
return fsFormatPluginConfig;
}
}
@@ -171,7 +171,7 @@ public MapRDBSubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
return new MapRDBSubScan(getUserName(), getStoragePlugin(), getStoragePlugin().getConfig(),
return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
}

@@ -34,7 +34,6 @@
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.maprdb.json;

import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
@@ -31,8 +30,13 @@
import org.apache.drill.common.expression.ValueExpressions.LongExpression;
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.joda.time.LocalTime;
import org.ojai.Value;
import org.ojai.types.ODate;
import org.ojai.types.OTime;
import org.ojai.types.OTimestamp;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -59,7 +63,7 @@ public static boolean isCompareFunction(String functionName) {
public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
return false;
}

public static CompareFunctionsProcessor process(FunctionCall call) {
String functionName = call.getName();
LogicalExpression nameArg = call.args.get(0);
@@ -151,15 +155,20 @@ public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) thro
this.path = path;
return true;
}
/*

if (valueArg instanceof DateExpression) {
this.value = KeyValueBuilder.initFrom(new ODate(((DateExpression)valueArg).getDate()));
long d = ((DateExpression)valueArg).getDate();
final long MILLISECONDS_IN_A_DAY = (long)1000 * 60 * 60 * 24;
int daysSinceEpoch = (int)(d / MILLISECONDS_IN_A_DAY);
this.value = KeyValueBuilder.initFrom(ODate.fromDaysSinceEpoch(daysSinceEpoch));
this.path = path;
return true;
}

if (valueArg instanceof TimeExpression) {
this.value = KeyValueBuilder.initFrom(new OTime(((TimeExpression)valueArg).getTime()));
int t = ((TimeExpression)valueArg).getTime();
LocalTime lT = LocalTime.fromMillisOfDay(t);
this.value = KeyValueBuilder.initFrom(new OTime(lT.getHourOfDay(), lT.getMinuteOfHour(), lT.getSecondOfMinute(), lT.getMillisOfSecond()));
this.path = path;
return true;
}
@@ -169,7 +178,7 @@ public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) thro
this.path = path;
return true;
}
*/

return false;
}
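
A unit mismatch underlies the conversions above: per the visitor code, DateExpression.getDate() carries milliseconds since the Unix epoch while ODate.fromDaysSinceEpoch() expects whole days, and TimeExpression.getTime() carries milliseconds of the day while the OTime constructor expects hour/minute/second/millisecond components. A standalone sketch of the same arithmetic (illustrative, not part of the diff; only Joda-Time is assumed on the classpath):

// Illustrative only -- mirrors the conversions the new visitor code performs
// before handing values to OJAI's ODate/OTime.
import org.joda.time.LocalTime;

public class DateTimeOffsetDemo {

  // Same constant the diff introduces for the date conversion.
  static final long MILLISECONDS_IN_A_DAY = 1000L * 60 * 60 * 24;

  public static void main(String[] args) {
    // A DateExpression carries midnight UTC as millis since the epoch;
    // 1473379200000L is 2016-09-09T00:00:00Z.
    long dateMillis = 1473379200000L;
    int daysSinceEpoch = (int) (dateMillis / MILLISECONDS_IN_A_DAY);
    System.out.println("days since epoch: " + daysSinceEpoch); // 17053, fed to ODate.fromDaysSinceEpoch

    // A TimeExpression carries millis of day; OTime wants the split components.
    int millisOfDay = (10 * 3600 + 45 * 60 + 30) * 1000 + 250; // 10:45:30.250
    LocalTime t = LocalTime.fromMillisOfDay(millisOfDay);
    System.out.printf("%02d:%02d:%02d.%03d%n",
        t.getHourOfDay(), t.getMinuteOfHour(), t.getSecondOfMinute(), t.getMillisOfSecond());
  }
}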

@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.store.maprdb.json;

import static org.ojai.DocumentConstants.ID_KEY;

import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
@@ -164,8 +162,6 @@ private JsonScanSpec createJsonScanSpec(FunctionCall call,
SchemaPath field = processor.getPath();
Value fieldValue = processor.getValue();

boolean isRowKey = field.getAsUnescapedPath().equals(ID_KEY);

QueryCondition cond = null;
switch (functionName) {
case "equal":
@@ -231,7 +227,7 @@ private JsonScanSpec createJsonScanSpec(FunctionCall call,
case "like":
cond = MapRDB.newCondition().like(field.getAsUnescapedPath(), fieldValue.getString()).build();
break;

default:
}

@@ -18,10 +18,10 @@
package org.apache.drill.exec.store.maprdb.json;

import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.drill.exec.store.maprdb.MapRDBSubScanSpec;
import org.apache.hadoop.hbase.HConstants;
import org.bouncycastle.util.Arrays;
import org.ojai.DocumentConstants;
import org.ojai.Value;
import org.ojai.store.QueryCondition;
@@ -45,17 +45,17 @@ public JsonSubScanSpec(@JsonProperty("tableName") String tableName,
@JsonProperty("stopRow") byte[] stopRow,
@JsonProperty("cond") QueryCondition cond) {
super(tableName, regionServer, null, null, null, null);

this.condition = MapRDB.newCondition().and();

if (cond != null) {
this.condition.condition(cond);
}

if (startRow != null &&
Arrays.areEqual(startRow, HConstants.EMPTY_START_ROW) == false) {
Arrays.equals(startRow, HConstants.EMPTY_START_ROW) == false) {
Value startVal = IdCodec.decode(startRow);

switch(startVal.getType()) {
case BINARY:
this.condition.is(DocumentConstants.ID_FIELD, Op.GREATER_OR_EQUAL, startVal.getBinary());
@@ -68,11 +68,11 @@ public JsonSubScanSpec(@JsonProperty("tableName") String tableName,
+ " for _id");
}
}

if (stopRow != null &&
Arrays.areEqual(stopRow, HConstants.EMPTY_END_ROW) == false) {
Arrays.equals(stopRow, HConstants.EMPTY_END_ROW) == false) {
Value stopVal = IdCodec.decode(stopRow);

switch(stopVal.getType()) {
case BINARY:
this.condition.is(DocumentConstants.ID_FIELD, Op.LESS, stopVal.getBinary());
@@ -85,7 +85,7 @@ public JsonSubScanSpec(@JsonProperty("tableName") String tableName,
+ " for _id");
}
}

this.condition.close().build();
}

@@ -98,6 +98,7 @@ public QueryCondition getCondition() {
return this.condition;
}

@Override
public byte[] getSerializedFilter() {
if (this.condition != null) {
ByteBuffer bbuf = ((ConditionImpl)this.condition).getDescriptor().getSerialized();
@@ -146,7 +146,7 @@ public MapRDBSubScan getSpecificScan(int minorFragmentId) {
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
return new MapRDBSubScan(getUserName(), getStoragePlugin(), getStoragePlugin().getConfig(),
return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
endpointFragmentMapping.get(minorFragmentId), columns, TABLE_JSON);
}

