[ASTERIXDB-3144][RT] Make hash exchanges consider partitions map
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Some operators, such as INSERT and UPSERT, require their input to be
hash-partitioned according to a partitions map. This patch makes the
hash exchanges satisfy that requirement: a hash exchange now takes an
optional partitions map and uses it when hash partitioning.

- Make sure the partitions map is considered when comparing
  partitioning properties.

Change-Id: I71457603048e9be9467943918e21ce5ede658c19
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17489
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
AliSolaiman committed Apr 15, 2023
1 parent 736dd13 commit 5bbe412
Showing 43 changed files with 337 additions and 130 deletions.
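
To make the Details above concrete: the sketch below is not part of this change, and every class, method, and variable name in it is invented. It only illustrates one way a hash exchange could route a tuple through a partitions map — hash to a logical (storage) partition first, then let the map pick the owning physical partition; with no map, it falls back to plain modulo hashing over all consumers.

import java.util.Arrays;

// Hypothetical sketch only; names are invented for illustration.
final class PartitionsMapHashSketch {

    // hash -> logical partition -> owning physical partition (via the map),
    // or plain modulo over all consumers when no map is supplied.
    static int assign(int hash, int[][] partitionsMap, int consumerCount) {
        int h = hash & Integer.MAX_VALUE; // keep the hash non-negative
        if (partitionsMap == null) {
            return h % consumerCount;
        }
        int logicalPartition = h % partitionsMap.length;
        int[] owners = partitionsMap[logicalPartition];
        return owners[h % owners.length];
    }

    public static void main(String[] args) {
        // four logical partitions, owned pairwise by two physical partitions
        int[][] map = { { 0 }, { 0 }, { 1 }, { 1 } };
        System.out.println(Arrays.toString(
                new int[] { assign("a".hashCode(), map, 2), assign("b".hashCode(), map, 2) }));
    }
}
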
@@ -216,7 +216,7 @@ private boolean useBatchPointSearch(ILogicalOperator op, PhysicalOptimizationCon
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         if (requiresBroadcast) {
             // For primary indexes optimizing an equality condition we can reduce the broadcast requirement to hash partitioning.
             if (isPrimaryIndex && isEqCondition) {
@@ -239,7 +239,11 @@ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op
                     orderColumns.add(new OrderColumn(orderVar, OrderKind.ASC));
                 }
                 propsLocal.add(new LocalOrderProperty(orderColumns));
-                pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, domain),
+                MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+                Dataset dataset = mp.findDataset(searchIndex.getDataverseName(), searchIndex.getDatasetName());
+                int[][] partitionsMap = mp.getPartitionsMap(dataset);
+                pv[0] = new StructuralPropertiesVector(
+                        UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, domain, partitionsMap),
                         propsLocal);
                 return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
             }
@@ -112,7 +112,7 @@ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext
                 dataset.getDatasetDetails(), context.getComputationNodeDomain());
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;
-        deliveredProperties = dspp.computeDeliveredProperties(as.getVariables());
+        deliveredProperties = dspp.computeDeliveredProperties(as.getVariables(), context);
     }
 
     @Override
@@ -156,7 +156,7 @@ protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         if (requiresBroadcast) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
@@ -23,6 +23,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -65,11 +66,12 @@ public boolean isMicroOperator() {
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+            throws AlgebricksException {
         IDataSource<?> ds = idx.getDataSource();
         IDataSourcePropertiesProvider dspp = ds.getPropertiesProvider();
         AbstractScanOperator as = (AbstractScanOperator) op;
-        deliveredProperties = dspp.computeDeliveredProperties(as.getScanVariables());
+        deliveredProperties = dspp.computeDeliveredProperties(as.getScanVariables(), context);
     }
 
     protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[] inputSchemas) {
@@ -85,7 +87,7 @@ protected int[] getKeyIndexes(List<LogicalVariable> keyVarList, IOperatorSchema[
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) throws AlgebricksException {
         if (requiresBroadcast) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(domain), null);
@@ -121,9 +121,9 @@ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op
         keysLeftBranchTileId.add(keysLeftBranch.get(0));
         List<LogicalVariable> keysRightBranchTileId = new ArrayList<>();
         keysRightBranchTileId.add(keysRightBranch.get(0));
-        IPartitioningProperty pp1 = new UnorderedPartitionedProperty(new ListSet<>(keysLeftBranchTileId),
+        IPartitioningProperty pp1 = UnorderedPartitionedProperty.of(new ListSet<>(keysLeftBranchTileId),
                 context.getComputationNodeDomain());
-        IPartitioningProperty pp2 = new UnorderedPartitionedProperty(new ListSet<>(keysRightBranchTileId),
+        IPartitioningProperty pp2 = UnorderedPartitionedProperty.of(new ListSet<>(keysRightBranchTileId),
                 context.getComputationNodeDomain());
 
         List<ILocalStructuralProperty> localProperties1 = new ArrayList<>();
@@ -340,7 +340,7 @@ private IntersectOperator createIntersectWithFilter(List<LogicalVariable> output
             inputCompareVars.add(new ArrayList<>(intersect.getInputCompareVariables(i)));
         }
         IntersectOperator intersectWithFilter = new IntersectOperator(intersect.getOutputCompareVariables(),
-                outputFilterVars, inputCompareVars, filterVars);
+                outputFilterVars, inputCompareVars, filterVars, intersect.getPartitionsMap());
         intersectWithFilter.setSourceLocation(intersect.getSourceLocation());
         intersectWithFilter.getInputs().addAll(intersect.getInputs());
         return intersectWithFilter;
@@ -284,7 +284,8 @@ private boolean intersectAllSecondaryIndexes(List<Pair<IAccessMethod, Index>> ch
             subRoots.add(subRoot);
         }
         // Connect each secondary index utilization plan to a common intersect operator.
-        ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context);
+        Index idx = chosenIndexes.get(0).getSecond();
+        ILogicalOperator primaryUnnestOp = connectAll2ndarySearchPlanWithIntersect(subRoots, context, idx);
 
         subTree.getDataSourceRef().setValue(primaryUnnestOp);
         return primaryUnnestOp != null;
@@ -312,7 +313,7 @@ private Pair<IAccessMethod, Index> fetchPrimaryIndexAmongChosenIndexes(
      * Connect each secondary index utilization plan to a common INTERSECT operator.
      */
    private ILogicalOperator connectAll2ndarySearchPlanWithIntersect(List<ILogicalOperator> subRoots,
-            IOptimizationContext context) throws AlgebricksException {
+            IOptimizationContext context, Index idx) throws AlgebricksException {
         ILogicalOperator lop = subRoots.get(0);
         List<List<LogicalVariable>> inputVars = new ArrayList<>(subRoots.size());
         for (int i = 0; i < subRoots.size(); i++) {
@@ -360,7 +361,8 @@ private ILogicalOperator connectAll2ndarySearchPlanWithIntersect(List<ILogicalOp
             VariableUtilities.substituteVariables(lop, inputVar, outputVar, context);
         }
 
-        IntersectOperator intersect = new IntersectOperator(outputVars, inputVars);
+        int[][] partitionsMap = metadataProvider.getPartitionsMap(idx);
+        IntersectOperator intersect = new IntersectOperator(outputVars, inputVars, partitionsMap);
         intersect.setSourceLocation(lop.getSourceLocation());
         for (ILogicalOperator secondarySearch : subRoots) {
             intersect.getInputs().add(secondarySearch.getInputs().get(0));
@@ -33,6 +33,7 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
@@ -114,12 +115,14 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceS
     public IDataSourcePropertiesProvider getPropertiesProvider() {
         return new IDataSourcePropertiesProvider() {
             @Override
-            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
                 return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
             }
 
             @Override
-            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
+            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+                    IOptimizationContext ctx) {
                 List<ILocalStructuralProperty> propsLocal = new ArrayList<>(1);
                 //TODO(ali): consider primary keys?
                 List<OrderColumn> secKeys = new ArrayList<>(numSecKeys);
@@ -193,7 +193,7 @@ public void testJoinGroupby() throws AlgebricksException {
 
         ExchangeOperator exchangeOperator1 = new ExchangeOperator();
         exchangeOperator1.setExecutionMode(PARTITIONED);
-        exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator1.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         exchangeOperator1.getInputs().add(new MutableObject<>(firstJoin));
 
         EmptyTupleSourceOperator ets3 = new EmptyTupleSourceOperator();
@@ -207,7 +207,7 @@ public void testJoinGroupby() throws AlgebricksException {
 
         ExchangeOperator exchangeOperator2 = new ExchangeOperator();
         exchangeOperator2.setExecutionMode(PARTITIONED);
-        exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchangeOperator2.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         exchangeOperator2.getInputs().add(new MutableObject<>(groupByOperator));
 
         LeftOuterJoinOperator secondJoin = new LeftOuterJoinOperator(new MutableObject<>(ConstantExpression.TRUE),
@@ -51,7 +51,7 @@ public void testParallelGroupBy() throws AlgebricksException {
         // Constructs a parallel group-by query plan.
         GroupByOperator globalGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
         ExchangeOperator exchange = new ExchangeOperator();
-        exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        exchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         GroupByOperator localGby = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
         globalGby.getInputs().add(new MutableObject<>(exchange));
         exchange.getInputs().add(new MutableObject<>(localGby));
@@ -94,7 +94,7 @@ public void testParallelJoin() throws AlgebricksException {
         // Left child plan of the join.
         ExchangeOperator leftChildExchange = new ExchangeOperator();
         leftChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
-        leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        leftChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         InnerJoinOperator leftChild = makeJoinOperator(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
         join.getInputs().add(new MutableObject<>(leftChildExchange));
         leftChildExchange.getInputs().add(new MutableObject<>(leftChild));
@@ -106,7 +106,7 @@ public void testParallelJoin() throws AlgebricksException {
         // Right child plan of the join.
         ExchangeOperator rightChildExchange = new ExchangeOperator();
         rightChildExchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
-        rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null));
+        rightChildExchange.setPhysicalOperator(new HashPartitionExchangePOperator(Collections.emptyList(), null, null));
         GroupByOperator rightChild = makeGroupByOperator(AbstractLogicalOperator.ExecutionMode.LOCAL);
         join.getInputs().add(new MutableObject<>(rightChildExchange));
         rightChildExchange.getInputs().add(new MutableObject<>(rightChild));
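
The test updates above reflect the interface change noted in the commit header: HashPartitionExchangePOperator now takes a third constructor argument for the optional partitions map, and the tests pass null to keep the previous behaviour. A map-aware call site would presumably look like the single hypothetical line below, where hashVars, domain, and partitionsMap are placeholders rather than names from this patch:

new HashPartitionExchangePOperator(hashVars, domain, partitionsMap);
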
@@ -22,7 +22,10 @@
 import java.util.List;
 import java.util.Set;
 
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
@@ -47,7 +50,8 @@ public DataSourcePartitioningProvider(DataSource dataSource, INodeDomain domain)
     }
 
     @Override
-    public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables) {
+    public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
+            IOptimizationContext ctx) throws AlgebricksException {
         IPhysicalPropertiesVector propsVector;
         IPartitioningProperty pp;
         List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
@@ -58,12 +62,23 @@ public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable>
                 ds.computeLocalStructuralProperties(propsLocal, scanVariables);
                 break;
             case DataSource.Type.FEED:
-                pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+                String dsName = ((FeedDataSource) ds).getTargetDataset();
+                Dataset feedDs = ((MetadataProvider) ctx.getMetadataProvider())
+                        .findDataset(ds.getId().getDataverseName(), dsName);
+                int[][] partitionsMap1 = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(feedDs);
+                pp = getFeedDatasetPartitioningProperty(ds, domain, scanVariables, partitionsMap1);
                 break;
             case DataSource.Type.INTERNAL_DATASET:
             case DataSource.Type.SAMPLE:
                 Set<LogicalVariable> pvars = new ListSet<>();
-                pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars);
+                Dataset dataset;
+                if (ds.getDatasourceType() == DataSource.Type.INTERNAL_DATASET) {
+                    dataset = ((DatasetDataSource) ds).getDataset();
+                } else {
+                    dataset = ((SampleDataSource) ds).getDataset();
+                }
+                int[][] partitionsMap = ((MetadataProvider) ctx.getMetadataProvider()).getPartitionsMap(dataset);
+                pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars, partitionsMap);
                 propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars)));
                 break;
             default:
@@ -74,14 +89,22 @@ public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable>
     }
 
     @Override
-    public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables) {
-        if (ds.getDatasourceType() == DataSource.Type.INTERNAL_DATASET) {
-            IPartitioningProperty pp = new RandomPartitioningProperty(domain);
-            List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
-            ds.computeLocalStructuralProperties(propsLocal, scanVariables);
-            return new StructuralPropertiesVector(pp, propsLocal);
+    public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
+            IOptimizationContext ctx) throws AlgebricksException {
+        switch (ds.getDatasourceType()) {
+            case DataSource.Type.INTERNAL_DATASET: {
+                IPartitioningProperty pp = new RandomPartitioningProperty(domain);
+                List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
+                ds.computeLocalStructuralProperties(propsLocal, scanVariables);
+                return new StructuralPropertiesVector(pp, propsLocal);
+            }
+            case DataSource.Type.FEED: {
+                IPartitioningProperty pp = getFeedPartitioningProperty(ds, domain, scanVariables);
+                return new StructuralPropertiesVector(pp, new ArrayList<>());
+            }
+            default:
+                return computeRequiredProperties(scanVariables, ctx);
         }
-        return computeRequiredProperties(scanVariables);
     }
 
     private static List<OrderColumn> getOrderColumns(Set<LogicalVariable> pvars) {
@@ -93,13 +116,26 @@ private static List<OrderColumn> getOrderColumns(Set<LogicalVariable> pvars) {
     }
 
     private static IPartitioningProperty getInternalDatasetPartitioningProperty(DataSource ds, INodeDomain domain,
-            List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars) {
+            List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars, int[][] partitionsMap) {
         IPartitioningProperty pp;
         if (scanVariables.size() < 2) {
             pp = new RandomPartitioningProperty(domain);
         } else {
             pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
-            pp = new UnorderedPartitionedProperty(pvars, domain);
+            pp = UnorderedPartitionedProperty.ofPartitionsMap(pvars, domain, partitionsMap);
         }
         return pp;
     }
+
+    public static IPartitioningProperty getFeedDatasetPartitioningProperty(DataSource ds, INodeDomain domain,
+            List<LogicalVariable> scanVariables, int[][] partitionsMap) {
+        IPartitioningProperty pp;
+        if (scanVariables.size() < 2) {
+            pp = new RandomPartitioningProperty(domain);
+        } else {
+            Set<LogicalVariable> pvars = new ListSet<>();
+            pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
+            pp = UnorderedPartitionedProperty.ofPartitionsMap(pvars, domain, partitionsMap);
+        }
+        return pp;
+    }
@@ -112,7 +148,7 @@ public static IPartitioningProperty getFeedPartitioningProperty(DataSource ds, I
         } else {
             Set<LogicalVariable> pvars = new ListSet<>();
             pvars.addAll(ds.getPrimaryKeyVariables(scanVariables));
-            pp = new UnorderedPartitionedProperty(pvars, domain);
+            pp = UnorderedPartitionedProperty.of(pvars, domain);
         }
         return pp;
     }
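
A note on the last bullet of the commit message ("Make sure the partitions map is considered when comparing partitioning properties"): the snippet below is not from the patch. It is a small, self-contained illustration, under the assumption that the map is carried as an int[][], of why such a comparison needs a deep, element-wise check rather than reference equality.

import java.util.Arrays;

// Illustration only; not AsterixDB code. Assumes the partitions map is an int[][].
final class PartitionsMapComparisonSketch {

    static boolean samePartitionsMap(int[][] required, int[][] delivered) {
        // deepEquals compares the nested int[] contents, not array identity
        return Arrays.deepEquals(required, delivered);
    }

    public static void main(String[] args) {
        int[][] a = { { 0, 1 }, { 2, 3 } };
        int[][] b = { { 0, 1 }, { 2, 3 } };
        System.out.println(a.equals(b));             // false: reference equality only
        System.out.println(samePartitionsMap(a, b)); // true: structural match
    }
}
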
