Partition Pruning - SQL Part [HZ-2515] #24813

Merged

68 commits
2f61e8b
draft changes
ivanthescientist May 25, 2023
0cd7090
Merge branch 'master' into partition_pruning_sql_draft
ivanthescientist Jun 12, 2023
3c180fd
Revert "Remove redundant distribution trait [HZ-1600] (#22509)"
ivanthescientist Jun 12, 2023
4f033aa
draft optimizer changes
ivanthescientist Jun 14, 2023
79da206
WIP : use RelMd queries to detect partition prunability
Fly-Style Jun 16, 2023
c9fc96b
Small fixes and optimizer test support for prunability
Fly-Style Jun 19, 2023
0513d25
draft connect jet to sql
ivanthescientist Jun 20, 2023
2e565c3
WIP
Fly-Style Jun 20, 2023
a1e0b9f
Handle Calc in HzRelMdPrunability
Fly-Style Jun 20, 2023
41e7a62
naive AND impl
ivanthescientist Jun 22, 2023
c3dc0d9
Revert "Revert "Remove redundant distribution trait [HZ-1600] (#2250…
ivanthescientist Jun 22, 2023
46e2ab5
checkstyle fix
ivanthescientist Jun 22, 2023
8987f45
Refactoring; enable prunability in Calcite SQL optimizer
Fly-Style Jun 23, 2023
dc29866
small fix
ivanthescientist Jun 23, 2023
20a8d03
Merge branch 'master' into partition_pruning_sql_draft
ivanthescientist Jun 26, 2023
598c974
Merge branch 'master' into partition_pruning_sql_draft
ivanthescientist Jun 27, 2023
3deebda
preliminary fix for nested filters case
ivanthescientist Jun 27, 2023
c18cd35
refactor Prunability return type
ivanthescientist Jul 4, 2023
dd44e11
refactor plan executor logic
ivanthescientist Jul 4, 2023
e48d4e7
Merge branch 'master' into partition_pruning_sql_draft
ivanthescientist Jul 5, 2023
f1faf66
Merge branch 'master' into partition_pruning_sql_draft
k-jamroz Jul 5, 2023
1c010be
Merge branch 'master' into partition_pruning_sql_draft
k-jamroz Jul 5, 2023
f6eebaa
Add basic tests for partition-pruned SQL queries
k-jamroz Jul 6, 2023
9730b96
Fix partition key for single-element AttributePartitioningStrategy
k-jamroz Jul 6, 2023
0f476e7
Workaround problems with broken assumptions that all partitions are p…
k-jamroz Jul 6, 2023
c3a6d94
Log job config for light job
k-jamroz Jul 6, 2023
8a9108d
Fix checkstyle
k-jamroz Jul 6, 2023
675cd63
Fix more checkstyle
k-jamroz Jul 6, 2023
55568c0
Fix even more checkstyle
k-jamroz Jul 6, 2023
15660ba
PartitionArrangement is expected to be sorted
k-jamroz Jul 6, 2023
41d4c5f
Do not add lite member coordinator to members executing the job
k-jamroz Jul 6, 2023
ffd8c16
Fix agg
Fly-Style Jul 10, 2023
4616e73
Remove comment
Fly-Style Jul 10, 2023
1779365
Merge branch 'partition_pruning_sql_draft' of https://github.com/ivan…
ivanthescientist Jul 10, 2023
561875e
small fix
ivanthescientist Jul 10, 2023
cc03884
Proper formatting and todos
Fly-Style Jul 10, 2023
f407aaa
additional simple tests
ivanthescientist Jul 10, 2023
4f4c71c
small review fix
ivanthescientist Jul 10, 2023
62ca453
renaming fix
ivanthescientist Jul 11, 2023
bf008ab
small change to the test
ivanthescientist Jul 11, 2023
6d77bee
Merge branch 'master' into partition_pruning_sql_draft
ivanthescientist Jul 12, 2023
c6a0f96
Fix JetJobPrunabilityTest
Fly-Style Jul 12, 2023
8ae8099
More fixes
Fly-Style Jul 12, 2023
413201a
disabled calc aggregations test
ivanthescientist Jul 12, 2023
2e94b1c
Always add coordinator with all partitions to the job
Fly-Style Jul 12, 2023
744153b
Checkstyle
Fly-Style Jul 13, 2023
e195227
Enhance RelPrunabilityTest
Fly-Style Jul 13, 2023
e0f2d20
Enhance RootResultSink required partition even better
Fly-Style Jul 13, 2023
e0d01c3
Improve readability
burakgok Jul 14, 2023
b1039ec
Do not mutate collection that has been passed to another object
k-jamroz Jul 14, 2023
cf6a736
Merge remote-tracking branch 'ivanthescientist/partition_pruning_sql_…
k-jamroz Jul 14, 2023
7f2eac5
Cleanup, use attribute partitioning strategy in RelPrunabilityTest
Fly-Style Jul 14, 2023
c763cfd
Simplify partitioningColumns determination in extractPrunability
k-jamroz Jul 14, 2023
d97301d
Default prunability forwarding is wrong
k-jamroz Jul 14, 2023
cb2c7f7
Fix prunability test
k-jamroz Jul 14, 2023
600772d
Disrupt prunability on any non-prunable input
Fly-Style Jul 17, 2023
dfc26a7
Regression test for default partitioning strategy
Fly-Style Jul 17, 2023
44ce799
change PruningIT type
ivanthescientist Jul 17, 2023
2f231ce
Explain why we translate column names
k-jamroz Jul 17, 2023
96b0110
Final review comments
Fly-Style Jul 17, 2023
eae05aa
Revert back sorted() in PartitionArrangement: HashMap does not guaran…
Fly-Style Jul 18, 2023
6029916
small review fixes
ivanthescientist Jul 19, 2023
c644930
test scenario for OBJECT key dynamic param comparisons pruning
ivanthescientist Jul 19, 2023
9128ef6
disabled member pruning for __key based filters
ivanthescientist Jul 19, 2023
3b523ab
Use O(N) allPartitions construction algorithm in default non-prunable…
Fly-Style Jul 19, 2023
92065d6
Use specific implementation for each available rel in HazelcastRelMdP…
Fly-Style Jul 19, 2023
765fc56
Document temporal solution when coordinator is always added to the jo…
Fly-Style Jul 19, 2023
6ef5f31
Refactor `allToOne` usages in CreateTopLevelDagVisitor to use compute…
Fly-Style Jul 19, 2023
Changes from all commits:
@@ -20,7 +20,7 @@
import com.hazelcast.core.HazelcastException;
import com.hazelcast.dataconnection.impl.InternalDataConnectionService;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.AlterJobPlan;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateJobPlan;
import com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateMappingPlan;
@@ -51,6 +51,7 @@
import com.hazelcast.jet.sql.impl.opt.logical.LogicalRel;
import com.hazelcast.jet.sql.impl.opt.logical.LogicalRules;
import com.hazelcast.jet.sql.impl.opt.logical.SelectByKeyMapLogicalRule;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMetadataQuery;
import com.hazelcast.jet.sql.impl.opt.physical.AssignDiscriminatorToScansRule;
import com.hazelcast.jet.sql.impl.opt.physical.CalcLimitTransposeRule;
import com.hazelcast.jet.sql.impl.opt.physical.CalcPhysicalRel;
@@ -68,6 +69,7 @@
import com.hazelcast.jet.sql.impl.opt.physical.SinkMapPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UpdateByKeyMapPhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.UpdatePhysicalRel;
import com.hazelcast.jet.sql.impl.opt.physical.visitor.RexToExpressionVisitor;
import com.hazelcast.jet.sql.impl.parse.QueryConvertResult;
import com.hazelcast.jet.sql.impl.parse.QueryParseResult;
import com.hazelcast.jet.sql.impl.parse.SqlAlterJob;
@@ -102,6 +104,8 @@
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.QueryUtils;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.extract.QueryPath;
import com.hazelcast.sql.impl.optimizer.OptimizationTask;
import com.hazelcast.sql.impl.optimizer.PlanKey;
import com.hazelcast.sql.impl.optimizer.PlanObjectKey;
@@ -110,8 +114,10 @@
import com.hazelcast.sql.impl.schema.IMapResolver;
import com.hazelcast.sql.impl.schema.Mapping;
import com.hazelcast.sql.impl.schema.MappingField;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.schema.TableResolver;
import com.hazelcast.sql.impl.schema.map.AbstractMapTable;
import com.hazelcast.sql.impl.schema.map.PartitionedMapTable;
import com.hazelcast.sql.impl.state.QueryResultRegistry;
import com.hazelcast.sql.impl.type.QueryDataType;
import org.apache.calcite.plan.Contexts;
@@ -130,6 +136,10 @@
import org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexDynamicParam;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.dialect.PostgresqlSqlDialect;
import org.apache.calcite.sql.util.SqlString;
@@ -139,16 +149,23 @@
import java.security.Permission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.hazelcast.internal.cluster.Versions.V5_3;
import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
import static com.hazelcast.jet.datamodel.Tuple3.tuple3;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateDataConnectionPlan;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.CreateIndexPlan;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.DropIndexPlan;
import static com.hazelcast.jet.sql.impl.SqlPlanImpl.ExplainStatementPlan;
import static com.hazelcast.jet.sql.impl.opt.OptUtils.schema;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;

/**
@@ -631,7 +648,10 @@ private SqlPlanImpl toPlan(
);
} else if (physicalRel instanceof UpdatePhysicalRel) {
checkDmlOperationWithView(physicalRel);
Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(physicalRel, parameterMetadata, context.getUsedViews());
Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
physicalRel,
parameterMetadata,
context.getUsedViews());
return new DmlPlan(
Operation.UPDATE,
planKey,
@@ -658,7 +678,10 @@
} else if (physicalRel instanceof TableModify) {
checkDmlOperationWithView(physicalRel);
Operation operation = ((TableModify) physicalRel).getOperation();
Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(physicalRel, parameterMetadata, context.getUsedViews());
Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
physicalRel,
parameterMetadata,
context.getUsedViews());
return new DmlPlan(
operation,
planKey,
@@ -672,7 +695,10 @@
);
} else if (physicalRel instanceof DeletePhysicalRel) {
checkDmlOperationWithView(physicalRel);
Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(physicalRel, parameterMetadata, context.getUsedViews());
Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
physicalRel,
parameterMetadata,
context.getUsedViews());
return new DmlPlan(
Operation.DELETE,
planKey,
@@ -685,8 +711,11 @@
permissions
);
} else {
Tuple2<DAG, Set<PlanObjectKey>> dagAndKeys = createDag(new RootRel(physicalRel), parameterMetadata,
Tuple3<DAG, Set<PlanObjectKey>, Integer> dagAndKeys = createDag(
new RootRel(physicalRel),
parameterMetadata,
context.getUsedViews());

SqlRowMetadata rowMetadata = createRowMetadata(
fieldNames,
physicalRel.schema(parameterMetadata).getTypes(),
Expand All @@ -701,7 +730,9 @@ private SqlPlanImpl toPlan(
OptUtils.isUnbounded(physicalRel),
rowMetadata,
planExecutor,
permissions
permissions,
partitionStrategyCandidates(physicalRel, parameterMetadata),
dagAndKeys.f2()
);
}
}
@@ -868,7 +899,7 @@ private SqlRowMetadata createRowMetadata(
return new SqlRowMetadata(columns);
}

private Tuple2<DAG, Set<PlanObjectKey>> createDag(
private Tuple3<DAG, Set<PlanObjectKey>, Integer> createDag(
PhysicalRel physicalRel,
QueryParameterMetadata parameterMetadata,
Set<PlanObjectKey> usedViews
@@ -885,7 +916,7 @@ private Tuple2<DAG, Set<PlanObjectKey>> createDag(
CreateTopLevelDagVisitor visitor = new CreateTopLevelDagVisitor(nodeEngine, parameterMetadata, wmKeysAssigner, usedViews);
physicalRel.accept(visitor);
visitor.optimizeFinishedDag();
return tuple2(visitor.getDag(), visitor.getObjectKeys());
return tuple3(visitor.getDag(), visitor.getObjectKeys(), visitor.requiredRootPartitionId());
}

private void checkDmlOperationWithView(PhysicalRel rel) {
@@ -895,6 +926,62 @@ private void checkDmlOperationWithView(PhysicalRel rel) {
}
}

private Map<String, List<Map<String, Expression<?>>>> partitionStrategyCandidates(
PhysicalRel root, QueryParameterMetadata parameterMetadata) {
HazelcastRelMetadataQuery query = OptUtils.metadataQuery(root);
final Map<String, List<Map<String, RexNode>>> prunabilityMap = query.extractPrunability(root);

RexBuilder b = HazelcastRexBuilder.INSTANCE;
RexToExpressionVisitor visitor = new RexToExpressionVisitor(schema(root.getRowType()), parameterMetadata);

final Map<String, Table> tableMap = tableResolvers().stream()
.map(TableResolver::getTables)
.flatMap(Collection::stream)
.filter(table -> table.getSchemaName().equals(QueryUtils.SCHEMA_NAME_PUBLIC))
.collect(Collectors.toMap(Table::getSqlName, Function.identity()));

final Map<String, List<Map<String, Expression<?>>>> result = new HashMap<>();
for (final String tableName : prunabilityMap.keySet()) {
assert tableMap.get(tableName) != null && tableMap.get(tableName) instanceof PartitionedMapTable;
var table = (PartitionedMapTable) tableMap.get(tableName);

var tableVariants = prunabilityMap.get(tableName);
final List<Map<String, Expression<?>>> convertedList = new ArrayList<>();

for (final Map<String, RexNode> variant : tableVariants) {
final Map<String, Expression<?>> convertedVariant = new HashMap<>();
for (final String columnName : variant.keySet()) {
final String fieldName = columnName.equals(QueryPath.KEY)
? QueryPath.KEY
: table.keyFields()
.filter(f -> f.getName().equals(columnName))
.findFirst()
.map(mapTableField -> mapTableField.getPath().getPath())
.orElseThrow(() -> QueryException.error(format("Cannot find column %s in table %s",
columnName, tableName)));

final RexNode rexNode = variant.get(columnName);
if (rexNode instanceof RexDynamicParam) {
convertedVariant.put(fieldName, visitor.visitDynamicParam((RexDynamicParam) rexNode));
}

if (rexNode instanceof RexLiteral) {
convertedVariant.put(fieldName, visitor.visitLiteral((RexLiteral) rexNode));
}
}
if (!convertedVariant.isEmpty()) {
convertedList.add(convertedVariant);
}
}

final String mapName = table.getMapName();
result.putIfAbsent(mapName, convertedList);
}

return result;
}


/**
* Tries to find {@link ShouldNotExecuteRel} or {@link MustNotExecutePhysicalRel}
* in optimizer relational tree to throw exception before DAG construction phase.
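For readers following the new `partitionStrategyCandidates` method above, here is a minimal sketch of the structure it returns: map name → list of candidate variants, each mapping a key path to the expression it must equal. The query, the map name, and the factory-method signatures (`ConstantExpression.create`, `ParameterExpression.create`) are illustrative assumptions, not taken from this PR.

import com.hazelcast.sql.impl.expression.ConstantExpression;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ParameterExpression;
import com.hazelcast.sql.impl.type.QueryDataType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonList;

final class CandidateShapeSketch {
    // Hypothetical result of partitionStrategyCandidates() for
    //   SELECT * FROM orders WHERE customerId = 10 AND region = ?
    // assuming "orders" uses AttributePartitioningStrategy("customerId", "region").
    static Map<String, List<Map<String, Expression<?>>>> example() {
        Map<String, Expression<?>> variant = new HashMap<>();
        // A RexLiteral in the filter is converted to a ConstantExpression.
        variant.put("customerId", ConstantExpression.create(10, QueryDataType.INT));
        // A RexDynamicParam ("?") becomes a ParameterExpression, evaluated
        // against the actual query arguments at execution time.
        variant.put("region", ParameterExpression.create(0, QueryDataType.VARCHAR));

        Map<String, List<Map<String, Expression<?>>>> candidates = new HashMap<>();
        candidates.put("orders", singletonList(variant)); // keyed by map name
        return candidates;
    }
}

Presumably an OR of two prunable equality filters yields two variants in the list; the executor later evaluates each variant to a partition id, as shown in the PlanExecutor changes below.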
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import com.hazelcast.jet.sql.impl.opt.cost.CostFactory;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdBoundedness;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdPrunability;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdRowCount;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdWatermarkedFields;
import com.hazelcast.jet.sql.impl.parse.QueryConvertResult;
@@ -73,6 +74,7 @@ public final class OptimizerContext {
HazelcastRelMdRowCount.SOURCE,
HazelcastRelMdBoundedness.SOURCE,
HazelcastRelMdWatermarkedFields.SOURCE,
HazelcastRelMdPrunability.SOURCE,
DefaultRelMetadataProvider.INSTANCE
));

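The new `HazelcastRelMdPrunability.SOURCE` is registered ahead of `DefaultRelMetadataProvider.INSTANCE`, and a chained metadata provider consults sources in order, so the Hazelcast handler answers prunability queries before Calcite's defaults. A hedged sketch of how the registered source is consumed, mirroring `partitionStrategyCandidates` in the first file (the wrapper class is illustrative, and it assumes `OptUtils.metadataQuery` accepts any RelNode):

import com.hazelcast.jet.sql.impl.opt.OptUtils;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMetadataQuery;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;

import java.util.List;
import java.util.Map;

final class PrunabilityQuerySketch {
    // Returns: table name -> candidate variants, each mapping a partitioning
    // column to the RexNode (literal or dynamic parameter) it is compared to.
    static Map<String, List<Map<String, RexNode>>> prunability(RelNode rel) {
        HazelcastRelMetadataQuery query = OptUtils.metadataQuery(rel);
        return query.extractPrunability(rel);
    }
}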
@@ -31,6 +31,7 @@
import com.hazelcast.jet.JobStateSnapshot;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.JobConfigArguments;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.AbstractJetInstance;
import com.hazelcast.jet.impl.JetServiceBackend;
@@ -73,6 +74,11 @@
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.serialization.ClassDefinition;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitioningStrategy;
import com.hazelcast.partition.strategy.AttributePartitioningStrategy;
import com.hazelcast.partition.strategy.DefaultPartitioningStrategy;
import com.hazelcast.internal.util.PartitioningStrategyUtil;
import com.hazelcast.query.impl.getters.Extractors;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
@@ -85,6 +91,7 @@
import com.hazelcast.sql.impl.QueryParameterMetadata;
import com.hazelcast.sql.impl.SqlErrorCode;
import com.hazelcast.sql.impl.UpdateSqlResultImpl;
import com.hazelcast.sql.impl.expression.Expression;
import com.hazelcast.sql.impl.expression.ExpressionEvalContext;
import com.hazelcast.sql.impl.expression.ExpressionEvalContextImpl;
import com.hazelcast.sql.impl.row.EmptyRow;
@@ -104,11 +111,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -496,12 +505,78 @@ SqlResult execute(ExplainStatementPlan plan) {

SqlResult execute(SelectPlan plan, QueryId queryId, List<Object> arguments, long timeout) {
List<Object> args = prepareArguments(plan.getParameterMetadata(), arguments);
InternalSerializationService serializationService = Util.getSerializationService(hazelcastInstance);
ExpressionEvalContext evalContext = new ExpressionEvalContextImpl(
args,
serializationService,
Util.getNodeEngine(hazelcastInstance));

JobConfig jobConfig = new JobConfig()
.setArgument(SQL_ARGUMENTS_KEY_NAME, args)
.setArgument(KEY_SQL_QUERY_TEXT, plan.getQuery())
.setArgument(KEY_SQL_UNBOUNDED, plan.isStreaming())
.setTimeoutMillis(timeout);

final Set<Integer> partitions = new HashSet<>();
boolean allVariantsValid = true;
for (final String mapName : plan.getPartitionStrategyCandidates().keySet()) {
var perMapCandidates = plan.getPartitionStrategyCandidates().get(mapName);
final PartitioningStrategy<?> strategy = ((MapProxyImpl) hazelcastInstance.getMap(mapName))
.getPartitionStrategy();

// We only support the Default and Attribute partitioning strategies; if even one of the maps uses
// another strategy, we abort the process and clear the list of already populated partitions so that
// partition pruning is not activated at all for this query.
if (strategy != null
&& !(strategy instanceof DefaultPartitioningStrategy)
&& !(strategy instanceof AttributePartitioningStrategy)) {
allVariantsValid = false;
break;
}

// Ordering of attributes matters for partitioning: (1,2) produces a different partition than (2,1).
final List<String> orderedKeyAttributes = new ArrayList<>();
if (strategy instanceof AttributePartitioningStrategy) {
final var attributeStrategy = (AttributePartitioningStrategy) strategy;
orderedKeyAttributes.addAll(asList(attributeStrategy.getPartitioningAttributes()));
} else {
orderedKeyAttributes.add(DEFAULT_UNIQUE_KEY);
}

for (final Map<String, Expression<?>> perMapCandidate : perMapCandidates) {
Object[] partitionKeyComponents = new Object[orderedKeyAttributes.size()];
for (int i = 0; i < orderedKeyAttributes.size(); i++) {
final String attribute = orderedKeyAttributes.get(i);
if (!perMapCandidate.containsKey(attribute)) {
// Shouldn't happen; this is a defensive check in case optimizer logic breaks and produces
// variants that do not contain all the required partitioning attributes.
throw new HazelcastException("Partition Pruning candidate"
+ " does not contain mandatory attribute: " + attribute);
}

partitionKeyComponents[i] = perMapCandidate.get(attribute).eval(null, evalContext);
}

final Partition partition = hazelcastInstance.getPartitionService().getPartition(
PartitioningStrategyUtil.constructAttributeBasedKey(partitionKeyComponents)
);
if (partition == null) {
// Can happen if the cluster is mid-repartitioning/migration; in this case we revert to
// non-pruning logic. An alternative scenario is that the produced partitioning key is somehow invalid.
allVariantsValid = false;
break;
}
partitions.add(partition.getPartitionId());
}
}

if (!partitions.isEmpty() && allVariantsValid) {
if (plan.requiredRootPartitionId() != null) {
partitions.add(plan.requiredRootPartitionId());
}
jobConfig.setArgument(JobConfigArguments.KEY_REQUIRED_PARTITIONS, partitions);
}

QueryResultProducerImpl queryResultProducer = new QueryResultProducerImpl(!plan.isStreaming());
AbstractJetInstance<?> jet = (AbstractJetInstance<?>) hazelcastInstance.getJet();
long jobId = jet.newJobId();
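To tie the pieces together, here is a condensed, hypothetical sketch of the pruning flow implemented above, for a single map and one candidate variant. The map name and literal are invented; the API calls (`constructAttributeBasedKey`, `getPartition`, `setArgument` with `KEY_REQUIRED_PARTITIONS`) are the ones used in the hunk above.

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.PartitioningStrategyUtil;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.JobConfigArguments;
import com.hazelcast.partition.Partition;

import java.util.HashSet;
import java.util.Set;

final class PruningFlowSketch {
    // For: SELECT * FROM orders WHERE customerId = 42, assuming "orders" is
    // partitioned by AttributePartitioningStrategy("customerId").
    static void applyPruning(HazelcastInstance hz, JobConfig jobConfig, Integer requiredRootPartitionId) {
        // Candidate expressions evaluated to values, in partitioning-attribute order.
        Object[] keyComponents = {42};
        Partition partition = hz.getPartitionService().getPartition(
                PartitioningStrategyUtil.constructAttributeBasedKey(keyComponents));
        if (partition == null) {
            return; // mid-migration or invalid key: fall back to non-pruned execution
        }
        Set<Integer> partitions = new HashSet<>();
        partitions.add(partition.getPartitionId());
        if (requiredRootPartitionId != null) {
            // The coordinator's root result sink also needs its partition
            // (see plan.requiredRootPartitionId() above).
            partitions.add(requiredRootPartitionId);
        }
        jobConfig.setArgument(JobConfigArguments.KEY_REQUIRED_PARTITIONS, partitions);
    }
}

The executor in the hunk performs this per map and per variant, and abandons pruning entirely (allVariantsValid = false) as soon as any map uses an unsupported strategy or any variant fails to resolve to a partition.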