Skip to content
Permalink
Browse files
[ASTERIXDB-3025][HYR] Introduce ITupleProjector
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
The TupleProjector is a way to pushdown value accesses
for the internal datasets (similar to Parquet's).

The current default implementation is basically a no-op.
This patch introduces the mechanism for the columnar
format's tuple projector implementation.

Change-Id: I1fadcef70451a7616e021771a1110413de4fb711
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15465
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
  • Loading branch information
wailyk committed Mar 3, 2022
1 parent 51543d9 commit af65cd9aa3ac2ed11633973d1fa10c78b1d4c59e
Showing 14 changed files with 191 additions and 25 deletions.
@@ -66,6 +66,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;

/**
@@ -160,7 +161,8 @@ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext
jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive(), propagateFilter,
nonFilterWriterFactory, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory, outputLimit,
unnestMap.getGenerateCallBackProceedResultVar(),
isPrimaryIndexPointSearch(op, context.getPhysicalOptimizationConfig()));
isPrimaryIndexPointSearch(op, context.getPhysicalOptimizationConfig()),
DefaultTupleProjectorFactory.INSTANCE);
IOperatorDescriptor opDesc = btreeSearch.first;
opDesc.setSourceLocation(unnestMap.getSourceLocation());

@@ -53,6 +53,7 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;

public class DatasetDataSource extends DataSource {

@@ -135,7 +136,7 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceS
return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, context, true, false, null,
((DatasetDataSource) dataSource).getDataset(), primaryIndex.getIndexName(), null, null, true,
true, false, null, minFilterFieldIndexes, maxFilterFieldIndexes, tupleFilterFactory,
outputLimit, false, false);
outputLimit, false, false, DefaultTupleProjectorFactory.INSTANCE);
default:
throw new AlgebricksException("Unknown datasource type");
}
@@ -169,6 +169,7 @@
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;

public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {

@@ -542,7 +543,8 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntim
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch) throws AlgebricksException {
boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory)
throws AlgebricksException {
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
@@ -601,12 +603,13 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntim
? new LSMBTreeBatchPointSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, tupleFilterFactory, outputLimit)
maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory)
: new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
propagateFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
tupleProjectorFactory);
} else {
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
@@ -859,7 +862,7 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRunt
*
* @param dataset
* @return Number of elements that will be used to create a bloom filter per
* dataset per partition
* dataset per partition
* @throws AlgebricksException
*/
public long getCardinalityPerPartitionHint(Dataset dataset) throws AlgebricksException {
@@ -28,6 +28,8 @@ public class HyracksConstants {

public static final String INDEX_CURSOR_STATS = "INDEX_CURSOR_STATS";

public static final String TUPLE_PROJECTOR = "TUPLE_PROJECTOR";

private HyracksConstants() {
}
}
@@ -29,6 +29,8 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;

public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {

@@ -52,6 +54,7 @@ public class BTreeSearchOperatorDescriptor extends AbstractSingleActivityOperato
protected byte[] searchCallbackProceedResultTrueValue;
protected final ITupleFilterFactory tupleFilterFactory;
protected final long outputLimit;
protected final ITupleProjectorFactory tupleProjectorFactory;

public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
@@ -61,7 +64,8 @@ public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDes
IMissingWriterFactory nonFilterWriterFactory) {
this(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, null, -1, false, null, null);
maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, null, -1, false, null, null,
DefaultTupleProjectorFactory.INSTANCE);
}

public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
@@ -71,7 +75,7 @@ public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDes
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue) {
byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.retainInput = retainInput;
@@ -92,6 +96,7 @@ public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDes
this.appendOpCallbackProceedResult = appendOpCallbackProceedResult;
this.searchCallbackProceedResultFalseValue = searchCallbackProceedResultFalseValue;
this.searchCallbackProceedResultTrueValue = searchCallbackProceedResultTrueValue;
this.tupleProjectorFactory = tupleProjectorFactory;
}

@Override
@@ -102,7 +107,7 @@ public BTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskConte
lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue);
searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory);
}

@Override
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
@@ -30,9 +31,11 @@
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;

public class BTreeSearchOperatorNodePushable extends IndexSearchOperatorNodePushable {
protected final boolean lowKeyInclusive;
@@ -52,7 +55,7 @@ public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, R
this(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory, retainInput, retainMissing,
nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter, nonFilterWriterFactory, null, -1,
false, null, null);
false, null, null, DefaultTupleProjectorFactory.INSTANCE);
}

public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, RecordDescriptor inputRecDesc,
@@ -62,11 +65,12 @@ public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int partition, R
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory)
throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue);
searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, projectorFactory);
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
if (lowKeyFields != null && lowKeyFields.length > 0) {
@@ -111,7 +115,8 @@ protected int getFieldCount() {

@Override
protected void addAdditionalIndexAccessorParams(IIndexAccessParameters iap) throws HyracksDataException {
// No additional parameters are required for the B+Tree search case
//Set tuple projector to get the information about the pushed down value accesses (if supported by the index)
iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR, tupleProjector);
}

}
@@ -44,6 +44,7 @@
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
@@ -54,6 +55,8 @@
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.projection.ITupleProjector;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
import org.apache.hyracks.util.IThreadStatsCollector;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -107,6 +110,7 @@ public abstract class IndexSearchOperatorNodePushable extends AbstractUnaryInput
protected final long outputLimit;
protected long outputCount = 0;
protected boolean finished;
protected final ITupleProjector tupleProjector;

// no filter and limit pushdown
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
@@ -116,7 +120,7 @@ public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor
IMissingWriterFactory nonFilterWriterFactory) throws HyracksDataException {
this(ctx, inputRecDesc, partition, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, nonMatchWriterFactory, searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, null, -1, false, null, null);
nonFilterWriterFactory, null, -1, false, null, null, DefaultTupleProjectorFactory.INSTANCE);
}

public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition,
@@ -125,7 +129,8 @@ public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor
ISearchOperationCallbackFactory searchCallbackFactory, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFactoryFactory, long outputLimit,
boolean appendSearchCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue) throws HyracksDataException {
byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory projectorFactory)
throws HyracksDataException {
this.ctx = ctx;
this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
this.retainInput = retainInput;
@@ -162,6 +167,8 @@ public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor
if (this.tupleFilterFactory != null && this.retainMissing) {
throw new IllegalStateException("RetainMissing with tuple filter is not supported");
}

tupleProjector = projectorFactory.createTupleProjector(ctx);
}

protected abstract ISearchPredicate createSearchPredicate();
@@ -349,10 +356,7 @@ private void subscribeForStats(IIndex index) {

protected void writeTupleToOutput(ITupleReference tuple) throws IOException {
try {
for (int i = 0; i < tuple.getFieldCount(); i++) {
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
tb.addFieldEndOffset();
}
tupleProjector.project(tuple, dos, tb);
} catch (Exception e) {
throw e;
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.am.common.impls;

import java.io.DataOutput;
import java.io.IOException;

import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.common.projection.ITupleProjector;

class DefaultTupleProjector implements ITupleProjector {
public static final ITupleProjector INSTANCE = new DefaultTupleProjector();

private DefaultTupleProjector() {
}

@Override
public void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
for (int i = 0; i < tuple.getFieldCount(); i++) {
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
tb.addFieldEndOffset();
}
}
}
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.am.common.impls;

import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.projection.ITupleProjector;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;

public class DefaultTupleProjectorFactory implements ITupleProjectorFactory {
private static final long serialVersionUID = -4525893018744087821L;
public static final DefaultTupleProjectorFactory INSTANCE = new DefaultTupleProjectorFactory();

private DefaultTupleProjectorFactory() {
}

@Override
public ITupleProjector createTupleProjector(IHyracksTaskContext context) throws HyracksDataException {
return DefaultTupleProjector.INSTANCE;
}
}

0 comments on commit af65cd9

Please sign in to comment.