Partition Pruning - SQL Part [HZ-2515] (#24813)
Co-authored-by: Sasha Syrotenko <oleksandr.syrotenko@hazelcast.com>
Co-authored-by: Krzysztof Jamróz <krzysztof.jamroz@hazelcast.com>
3 people committed Jul 21, 2023
1 parent fefdfe6 commit 2aea4a1
Showing 29 changed files with 1,521 additions and 100 deletions.
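For context, partition pruning lets the SQL engine route a bounded IMap query only to the members that own the relevant partitions instead of broadcasting it to the whole cluster. The following is a minimal sketch, not taken from the diff, of the query shape this commit targets; the map name "orders", the attribute "customerId", and the pre-existing SQL mapping are illustrative assumptions, with attribute-based partitioning configured as introduced in 5.3.

import com.hazelcast.config.Config;
import com.hazelcast.config.PartitioningAttributeConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.sql.SqlResult;

import static java.util.Collections.singletonList;

public class PartitionPruningSketch {
    public static void main(String[] args) {
        Config config = new Config();
        // Partition "orders" entries by the customerId key attribute; this installs
        // AttributePartitioningStrategy on the map.
        config.getMapConfig("orders").setPartitioningAttributeConfigs(
                singletonList(new PartitioningAttributeConfig("customerId")));
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);

        // Assuming a SQL mapping for "orders" exists, an equality filter on the
        // partitioning attribute is a pruning candidate: the engine can compute the
        // single owning partition from the parameter and scan only its owner.
        try (SqlResult result = hz.getSql().execute(
                "SELECT * FROM orders WHERE customerId = ?", 42)) {
            result.forEach(row -> System.out.println((Object) row.getObject(0)));
        }
    }
}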
@@ -20,7 +20,7 @@
import com.hazelcast.core.HazelcastException;
import com.hazelcast.dataconnection.impl.InternalDataConnectionService;
import com.hazelcast.jet.core.DAG;
-import com.hazelcast.jet.datamodel.Tuple2;
+import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.AlterJobPlan;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateJobPlan;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateMappingPlan;
@@ -51,6 +51,7 @@
import com.hazelcast.jet.sql.impl.opt.logical.LogicalRel;
import com.hazelcast.jet.sql.impl.opt.logical.LogicalRules;
import com.hazelcast.jet.sql.impl.opt.logical.SelectByKeyMapLogicalRule;
+import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMetadataQuery;
import com.hazelcast.jet.sql.impl.opt.physical.AssignDiscriminatorToScansRule;
import com.hazelcast.jet.sql.impl.opt.physical.CalcLimitTransposeRule;
import com.hazelcast.jet.sql.impl.opt.physical.CalcPhysicalRel;
@@ -68,6 +69,7 @@
import com.hazelcast.jet.sql.impl.opt.physical.SinkMapPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UpdateByKeyMapPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UpdatePhysicalRel;
+import com.hazelcast.jet.sql.impl.opt.physical.visitor.RexToExpressionVisitor;
import com.hazelcast.jet.sql.impl.parse.QueryConvertResult;
import com.hazelcast.jet.sql.impl.parse.QueryParseResult;
import com.hazelcast.jet.sql.impl.parse.SqlAlterJob;
@@ -102,6 +104,8 @@
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.QueryUtils;
+import com.hazelcast.sql.impl.expression.Expression;
+import com.hazelcast.sql.impl.extract.QueryPath;
import com.hazelcast.sql.impl.optimizer.OptimizationTask;
import com.hazelcast.sql.impl.optimizer.PlanKey;
import com.hazelcast.sql.impl.optimizer.PlanObjectKey;
@@ -110,8 +114,10 @@
import com.hazelcast.sql.impl.schema.IMapResolver;
import com.hazelcast.sql.impl.schema.Mapping;
import com.hazelcast.sql.impl.schema.MappingField;
+import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.schema.TableResolver;
import com.hazelcast.sql.impl.schema.map.AbstractMapTable;
+import com.hazelcast.sql.impl.schema.map.PartitionedMapTable;
import com.hazelcast.sql.impl.state.QueryResultRegistry;
import com.hazelcast.sql.impl.type.QueryDataType;
import org.apache.calcite.plan.Contexts;
@@ -130,6 +136,10 @@
import org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
import org.apache.calcite.sql.util.SqlString;
@@ -139,16 +149,23 @@
import java.security.Permission;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;

import static com.hazelcast.internal.cluster.Versions.V5_3;
import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
+import static com.hazelcast.jet.datamodel.Tuple3.tuple3;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateDataConnectionPlan;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateIndexPlan;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.DropIndexPlan;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.ExplainStatementPlan;
+import static com.hazelcast.jet.sql.impl.opt.OptUtils.schema;
+import static java.lang.String.format;
import static java.util.stream.Collectors.toList;

/**
@@ -631,7 +648,10 @@ private SqlPlanImpl toPlan(
);
} else if (physicalRel instanceof UpdatePhysicalRel) {
checkDmlOperationWithView(physicalRel);
-Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(physicalRel, parameterMetadata, context.getUsedViews());
+Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
+        physicalRel,
+        parameterMetadata,
+        context.getUsedViews());
return new DmlPlan(
Operation.UPDATE,
planKey,
@@ -658,7 +678,10 @@
} else if (physicalRel instanceof TableModify) {
checkDmlOperationWithView(physicalRel);
Operation operation = ((TableModify) physicalRel).getOperation();
-Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(physicalRel, parameterMetadata, context.getUsedViews());
+Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
+        physicalRel,
+        parameterMetadata,
+        context.getUsedViews());
return new DmlPlan(
operation,
planKey,
@@ -672,7 +695,10 @@
);
} else if (physicalRel instanceof DeletePhysicalRel) {
checkDmlOperationWithView(physicalRel);
-Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(physicalRel, parameterMetadata, context.getUsedViews());
+Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
+        physicalRel,
+        parameterMetadata,
+        context.getUsedViews());
return new DmlPlan(
Operation.DELETE,
planKey,
@@ -685,8 +711,11 @@
permissions
);
} else {
-Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(new RootRel(physicalRel), parameterMetadata,
+Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
+        new RootRel(physicalRel),
+        parameterMetadata,
context.getUsedViews());
+
SqlRowMetadata rowMetadata = createRowMetadata(
fieldNames,
physicalRel.schema(parameterMetadata).getTypes(),
@@ -701,7 +730,9 @@
OptUtils.isUnbounded(physicalRel),
rowMetadata,
planExecutor,
-permissions
+permissions,
+partitionStrategyCandidates(physicalRel, parameterMetadata),
+dagAndKeys.f2()
);
}
}
@@ -868,7 +899,7 @@ private SqlRowMetadata createRowMetadata(
return new SqlRowMetadata(columns);
}

-private Tuple2<DAG, Set<PlanObjectKey>> createDag(
+private Tuple3<DAG, Set<PlanObjectKey>, Integer> createDag(
PhysicalRel physicalRel,
QueryParameterMetadata parameterMetadata,
Set<PlanObjectKey> usedViews
@@ -885,7 +916,7 @@ private Tuple2<DAG, Set<PlanObjectKey>> createDag(
CreateTopLevelDagVisitor visitor = new CreateTopLevelDagVisitor(nodeEngine, parameterMetadata, wmKeysAssigner, usedViews);
physicalRel.accept(visitor);
visitor.optimizeFinishedDag();
-return tuple2(visitor.getDag(), visitor.getObjectKeys());
+return tuple3(visitor.getDag(), visitor.getObjectKeys(), visitor.requiredRootPartitionId());
}

private void checkDmlOperationWithView(PhysicalRel rel) {
@@ -895,6 +926,62 @@ private void checkDmlOperationWithView(PhysicalRel rel) {
}
}

+    private Map<String, List<Map<String, Expression<?>>>> partitionStrategyCandidates(
+            PhysicalRel root, QueryParameterMetadata parameterMetadata) {
+        HazelcastRelMetadataQuery query = OptUtils.metadataQuery(root);
+        final Map<String, List<Map<String, RexNode>>> prunabilityMap = query.extractPrunability(root);
+
+        RexBuilder b = HazelcastRexBuilder.INSTANCE;
+        RexToExpressionVisitor visitor = new RexToExpressionVisitor(schema(root.getRowType()), parameterMetadata);
+
+        final Map<String, Table> tableMap = tableResolvers().stream()
+                .map(TableResolver::getTables)
+                .flatMap(Collection::stream)
+                .filter(table -> table.getSchemaName().equals(QueryUtils.SCHEMA_NAME_PUBLIC))
+                .collect(Collectors.toMap(Table::getSqlName, Function.identity()));
+
+        final Map<String, List<Map<String, Expression<?>>>> result = new HashMap<>();
+        for (final String tableName : prunabilityMap.keySet()) {
+            assert tableMap.get(tableName) != null && tableMap.get(tableName) instanceof PartitionedMapTable;
+            var table = (PartitionedMapTable) tableMap.get(tableName);
+
+            var tableVariants = prunabilityMap.get(tableName);
+            final List<Map<String, Expression<?>>> convertedList = new ArrayList<>();
+
+            for (final Map<String, RexNode> variant : tableVariants) {
+                final Map<String, Expression<?>> convertedVariant = new HashMap<>();
+                for (final String columnName : variant.keySet()) {
+                    final String fieldName = columnName.equals(QueryPath.KEY)
+                            ? QueryPath.KEY
+                            : table.keyFields()
+                                    .filter(f -> f.getName().equals(columnName))
+                                    .findFirst()
+                                    .map(mapTableField -> mapTableField.getPath().getPath())
+                                    .orElseThrow(() -> QueryException.error(format("Cannot find column %s in table %s",
+                                            columnName, tableName)));
+
+                    final RexNode rexNode = variant.get(columnName);
+                    if (rexNode instanceof RexDynamicParam) {
+                        convertedVariant.put(fieldName, visitor.visitDynamicParam((RexDynamicParam) rexNode));
+                    }
+
+                    if (rexNode instanceof RexLiteral) {
+                        convertedVariant.put(fieldName, visitor.visitLiteral((RexLiteral) rexNode));
+                    }
+                }
+                if (!convertedVariant.isEmpty()) {
+                    convertedList.add(convertedVariant);
+                }
+            }
+
+            final String mapName = table.getMapName();
+            result.putIfAbsent(mapName, convertedList);
+        }
+
+        return result;
+    }
+
+
/**
* Tries to find {@link ShouldNotExecuteRel} or {@link MustNotExecutePhysicalRel}
* in the optimizer's relational tree to throw an exception before the DAG construction phase.
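The new partitionStrategyCandidates method above converts the optimizer's prunability metadata (per-column RexLiteral and RexDynamicParam filters) into execution-time Expressions, keyed by map name rather than SQL table name. A sketch of the resulting shape for a query such as SELECT * FROM orders WHERE customerId = 42, reusing the illustrative names from the first example; ConstantExpression.create is assumed to be the impl-package factory the visitor uses for literals.

import com.hazelcast.sql.impl.expression.ConstantExpression;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.type.QueryDataType;

import java.util.List;
import java.util.Map;

final class CandidateShapeSketch {
    // One entry per prunable map; each inner map is one candidate variant that binds
    // every required partitioning attribute to the expression producing its value.
    static final Map<String, List<Map<String, Expression<?>>>> CANDIDATES = Map.of(
            "orders", List.of(Map.of(
                    "customerId", (Expression<?>) ConstantExpression.create(42, QueryDataType.INT))));
}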
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import com.hazelcast.jet.sql.impl.opt.cost.CostFactory;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdBoundedness;
+import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdPrunability;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdRowCount;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdWatermarkedFields;
import com.hazelcast.jet.sql.impl.parse.QueryConvertResult;
@@ -73,6 +74,7 @@ public final class OptimizerContext {
HazelcastRelMdRowCount.SOURCE,
HazelcastRelMdBoundedness.SOURCE,
HazelcastRelMdWatermarkedFields.SOURCE,
+HazelcastRelMdPrunability.SOURCE,
DefaultRelMetadataProvider.INSTANCE
));

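Registering HazelcastRelMdPrunability.SOURCE ahead of DefaultRelMetadataProvider means Calcite's chained metadata lookup consults the prunability handler first and falls back to the defaults for every other metadata kind. This is the provider behind the extractPrunability call shown earlier; a minimal consumer-side sketch, with package locations following the imports in this commit and otherwise assumed:

import com.hazelcast.jet.sql.impl.opt.OptUtils;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMetadataQuery;
import com.hazelcast.jet.sql.impl.opt.physical.PhysicalRel;
import org.apache.calcite.rex.RexNode;

import java.util.List;
import java.util.Map;

final class PrunabilitySketch {
    // Per-table pruning candidates: table name -> variants of (column name -> filter RexNode).
    static Map<String, List<Map<String, RexNode>>> prunabilityOf(PhysicalRel root) {
        HazelcastRelMetadataQuery query = OptUtils.metadataQuery(root);
        return query.extractPrunability(root);
    }
}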
@@ -31,6 +31,7 @@
import com.hazelcast.jet.JobStateSnapshot;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.config.JobConfig;
+import com.hazelcast.jet.config.JobConfigArguments;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.JetServiceBackend;
@@ -73,6 +74,11 @@
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.serialization.ClassDefinition;
+import com.hazelcast.partition.Partition;
+import com.hazelcast.partition.PartitioningStrategy;
+import com.hazelcast.partition.strategy.AttributePartitioningStrategy;
+import com.hazelcast.partition.strategy.DefaultPartitioningStrategy;
+import com.hazelcast.internal.util.PartitioningStrategyUtil;
import com.hazelcast.query.impl.getters.Extractors;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -85,6 +91,7 @@
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.SqlErrorCode;
import com.hazelcast.sql.impl.UpdateSqlResultImpl;
+import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.expression.ExpressionEvalContextImpl;
import com.hazelcast.sql.impl.row.EmptyRow;
@@ -104,11 +111,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -496,12 +505,78 @@ SqlResult execute(ExplainStatementPlan plan) {

SqlResult execute(SelectPlan plan, QueryId queryId, List<Object> arguments, long timeout) {
List<Object> args = prepareArguments(plan.getParameterMetadata(), arguments);
+InternalSerializationService serializationService = Util.getSerializationService(hazelcastInstance);
+ExpressionEvalContext evalContext = new ExpressionEvalContextImpl(
+        args,
+        serializationService,
+        Util.getNodeEngine(hazelcastInstance));
+
JobConfig jobConfig = new JobConfig()
.setArgument(SQL_ARGUMENTS_KEY_NAME, args)
.setArgument(KEY_SQL_QUERY_TEXT, plan.getQuery())
.setArgument(KEY_SQL_UNBOUNDED, plan.isStreaming())
.setTimeoutMillis(timeout);

+        final Set<Integer> partitions = new HashSet<>();
+        boolean allVariantsValid = true;
+        for (final String mapName : plan.getPartitionStrategyCandidates().keySet()) {
+            var perMapCandidates = plan.getPartitionStrategyCandidates().get(mapName);
+            final PartitioningStrategy<?> strategy = ((MapProxyImpl) hazelcastInstance.getMap(mapName))
+                    .getPartitionStrategy();
+
+            // We only support the Default and Attribute strategies. If any map uses another
+            // strategy, we abort the process and discard the already populated partitions,
+            // so that partition pruning is not activated at all for this query.
+            if (strategy != null
+                    && !(strategy instanceof DefaultPartitioningStrategy)
+                    && !(strategy instanceof AttributePartitioningStrategy)) {
+                allVariantsValid = false;
+                break;
+            }
+
+            // Ordering of attributes matters for partitioning: (1,2) produces a different partition than (2,1).
+            final List<String> orderedKeyAttributes = new ArrayList<>();
+            if (strategy instanceof AttributePartitioningStrategy) {
+                final var attributeStrategy = (AttributePartitioningStrategy) strategy;
+                orderedKeyAttributes.addAll(asList(attributeStrategy.getPartitioningAttributes()));
+            } else {
+                orderedKeyAttributes.add(DEFAULT_UNIQUE_KEY);
+            }
+
+            for (final Map<String, Expression<?>> perMapCandidate : perMapCandidates) {
+                Object[] partitionKeyComponents = new Object[orderedKeyAttributes.size()];
+                for (int i = 0; i < orderedKeyAttributes.size(); i++) {
+                    final String attribute = orderedKeyAttributes.get(i);
+                    if (!perMapCandidate.containsKey(attribute)) {
+                        // Shouldn't happen; defensive check in case the optimizer logic breaks and
+                        // produces variants that do not contain all required partitioning attributes.
+                        throw new HazelcastException("Partition Pruning candidate"
+                                + " does not contain mandatory attribute: " + attribute);
+                    }
+
+                    partitionKeyComponents[i] = perMapCandidate.get(attribute).eval(null, evalContext);
+                }
+
+                final Partition partition = hazelcastInstance.getPartitionService().getPartition(
+                        PartitioningStrategyUtil.constructAttributeBasedKey(partitionKeyComponents)
+                );
+                if (partition == null) {
+                    // Can happen if the cluster is mid-repartitioning/migration; in this case we revert
+                    // to non-pruning logic. It can also happen if the produced partitioning key is somehow invalid.
+                    allVariantsValid = false;
+                    break;
+                }
+                partitions.add(partition.getPartitionId());
+            }
+        }
+
+        if (!partitions.isEmpty() && allVariantsValid) {
+            if (plan.requiredRootPartitionId() != null) {
+                partitions.add(plan.requiredRootPartitionId());
+            }
+            jobConfig.setArgument(JobConfigArguments.KEY_REQUIRED_PARTITIONS, partitions);
+        }
+
QueryResultProducerImpl queryResultProducer = new QueryResultProducerImpl(!plan.isStreaming());
AbstractJetInstance<?> jet = (AbstractJetInstance<?>) hazelcastInstance.getJet();
long jobId = jet.newJobId();
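The pruning loop above reduces each candidate variant to a concrete partition id: evaluate every key component, assemble the components in the strategy's declared attribute order, and ask the partition service which partition owns the composite key. A condensed sketch of that arithmetic, where the caller is expected to fall back to non-pruned execution on a null partition, exactly as the code above does:

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.PartitioningStrategyUtil;
import com.hazelcast.partition.Partition;

final class PartitionIdSketch {
    // components must follow the strategy's declared attribute order: (a, b) and
    // (b, a) generally hash to different partitions.
    static Integer partitionIdFor(HazelcastInstance hz, Object[] components) {
        Partition partition = hz.getPartitionService()
                .getPartition(PartitioningStrategyUtil.constructAttributeBasedKey(components));
        // null means the lookup cannot be served right now (e.g. mid-migration);
        // the collected ids otherwise feed the KEY_REQUIRED_PARTITIONS job argument.
        return partition == null ? null : partition.getPartitionId();
    }
}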
