[ASTERIXDB-3144][HYR][RT] Make sampling job support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: no

Details:
This patch changes the sampling job to support
operating on multiple partitions. This is a step towards
achieving compute/storage separation.
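
For illustration only (not part of this commit): the "multiple partitions" support hinges on a partitions map, where entry i lists the storage partitions a single compute partition i operates on. The sketch below is a hypothetical standalone class; its identityMap mirrors the getPartitionsMap test helper that this patch relocates into org.apache.hyracks.util.TestUtil, while groupedMap shows the kind of many-storage-partitions-per-compute-partition mapping that compute/storage separation aims for.

// Hypothetical sketch, not from this commit. map[i] = storage partitions
// assigned to compute partition i.
public final class PartitionsMapSketch {

    private PartitionsMapSketch() {
    }

    // One storage partition per compute partition: [[0], [1], ..., [n-1]].
    // Mirrors the getPartitionsMap helper visible in the diff below.
    public static int[][] identityMap(int numPartitions) {
        int[][] map = new int[numPartitions][];
        for (int i = 0; i < numPartitions; i++) {
            map[i] = new int[] { i };
        }
        return map;
    }

    // Each compute partition owns a contiguous group of storage partitions,
    // e.g. groupedMap(4, 2) -> [[0, 1], [2, 3]].
    public static int[][] groupedMap(int numStoragePartitions, int numComputePartitions) {
        int[][] map = new int[numComputePartitions][];
        int perCompute = (int) Math.ceil((double) numStoragePartitions / numComputePartitions);
        for (int c = 0; c < numComputePartitions; c++) {
            int from = c * perCompute;
            int to = Math.min(from + perCompute, numStoragePartitions);
            map[c] = new int[to - from];
            for (int s = from; s < to; s++) {
                map[c][s - from] = s;
            }
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.deepToString(identityMap(4)));   // [[0], [1], [2], [3]]
        System.out.println(java.util.Arrays.deepToString(groupedMap(4, 2))); // [[0, 1], [2, 3]]
    }
}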

Change-Id: If9abc68402adfe47ddeb5f1b1499e3414369f506
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17511
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
AliSolaiman committed May 5, 2023
1 parent de51cc4 commit afb22b3
Showing 24 changed files with 153 additions and 145 deletions.
6 changes: 6 additions & 0 deletions asterixdb/asterix-app/pom.xml
@@ -657,6 +657,12 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-util</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-util</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
@@ -4459,7 +4459,7 @@ protected void handleCompactStatement(MetadataProvider metadataProvider, Stateme

if (ds.getDatasetType() == DatasetType.INTERNAL) {
for (Index index : indexes) {
if (index.isSecondaryIndex()) {
if (index.isSecondaryIndex() && !index.isSampleIndex()) {
jobsToExecute.add(
IndexUtil.buildSecondaryIndexCompactJobSpec(ds, index, metadataProvider, sourceLoc));
}
@@ -116,6 +116,7 @@
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.TestUtil;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -214,7 +215,7 @@ public Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> getBulkLoa
fieldPermutation[i] = i;
}
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = getPartitionsMap(numPartitions);
int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -263,7 +264,7 @@ public Pair<LSMPrimaryInsertOperatorNodePushable, IPushRuntime> getInsertPipelin
}

int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = getPartitionsMap(numPartitions);
int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -372,7 +373,7 @@ public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getDeletePipeline
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = getPartitionsMap(numPartitions);
int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -838,7 +839,7 @@ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeli
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = getPartitionsMap(numPartitions);
int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory =
new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -912,12 +913,4 @@ private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, D
}
return new RecordDescriptor(outputSerDes, outputTypeTraits);
}

private static int[][] getPartitionsMap(int numPartitions) {
int[][] map = new int[numPartitions][1];
for (int i = 0; i < numPartitions; i++) {
map[i] = new int[] { i };
}
return map;
}
}
@@ -57,7 +57,7 @@
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -176,7 +176,7 @@ private void dropInUse(IHyracksTaskContext ctx, IndexDataflowHelperFactory helpe
dataflowHelper.open();
// try to drop in-use index (should fail)
IndexDropOperatorNodePushable dropInUseOp = new IndexDropOperatorNodePushable(helperFactory,
EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
EnumSet.noneOf(DropOption.class), ctx, 0, TestUtil.getPartitionsMap(1));
try {
dropInUseOp.initialize();
} catch (HyracksDataException e) {
@@ -192,7 +192,7 @@ private void dropInUseWithWait(IHyracksTaskContext ctx, IndexDataflowHelperFacto
dropFailed.set(false);
// drop with option wait for in-use should be successful once the index is closed
final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory,
EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestUtils.getPartitionsMap(1));
EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0, TestUtil.getPartitionsMap(1));
Thread dropThread = new Thread(() -> {
try {
dropWithWaitOp.initialize();
@@ -216,7 +216,7 @@ private void dropNonExisting(IHyracksTaskContext ctx, IndexDataflowHelperFactory
dropFailed.set(false);
// Dropping non-existing index
IndexDropOperatorNodePushable dropNonExistingOp = new IndexDropOperatorNodePushable(helperFactory,
EnumSet.noneOf(DropOption.class), ctx, 0, TestUtils.getPartitionsMap(1));
EnumSet.noneOf(DropOption.class), ctx, 0, TestUtil.getPartitionsMap(1));
try {
dropNonExistingOp.initialize();
} catch (HyracksDataException e) {
@@ -232,7 +232,7 @@ private void dropNonExistingWithIfExists(IHyracksTaskContext ctx, IndexDataflowH
// Dropping non-existing index with if exists option should be successful
dropFailed.set(false);
IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp = new IndexDropOperatorNodePushable(helperFactory,
EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestUtils.getPartitionsMap(1));
EnumSet.of(DropOption.IF_EXISTS), ctx, 0, TestUtil.getPartitionsMap(1));
try {
dropNonExistingWithIfExistsOp.initialize();
} catch (HyracksDataException e) {
@@ -26,7 +26,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,7 +89,6 @@
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionExtensionManager;
import org.apache.asterix.om.functions.IFunctionManager;
@@ -154,7 +152,6 @@
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -1796,22 +1793,9 @@ public PartitioningProperties getPartitioningProperties(Feed feed) throws Algebr
return dataPartitioningProvider.getPartitioningProperties(feed);
}

public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
.filter(idx -> idx.getIndexType() != IndexType.SAMPLE && idx.isSecondaryIndex())
.collect(Collectors.toList());
if (dsIndexes.isEmpty()) {
return Collections.emptyList();
}
List<String> datasetNodes = findNodes(ds.getNodeGroupName());
List<Pair<IFileSplitProvider, String>> indexesSplits =
dsIndexes.stream()
.map(idx -> new Pair<>(
StoragePathUtil.splitProvider(SplitsAndConstraintsUtil.getIndexSplits(
appCtx.getClusterStateManager(), ds, idx.getIndexName(), datasetNodes)),
idx.getIndexName()))
.collect(Collectors.toList());
return indexesSplits;
public List<Index> getSecondaryIndexes(Dataset ds) throws AlgebricksException {
return getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
.filter(idx -> idx.isSecondaryIndex() && !idx.isSampleIndex()).collect(Collectors.toList());
}

public LockList getLocks() {
@@ -102,7 +102,7 @@ public class SampleOperationsHelper implements ISecondaryIndexOperationsHelper {

private final MetadataProvider metadataProvider;
private final Dataset dataset;
private final Index index;
private final Index sampleIdx;
private final SourceLocation sourceLoc;

private ARecordType itemType;
@@ -115,11 +115,12 @@
private Map<String, String> mergePolicyProperties;
private int groupbyNumFrames;
private int[][] computeStorageMap;
private int numPartitions;

protected SampleOperationsHelper(Dataset dataset, Index index, MetadataProvider metadataProvider,
protected SampleOperationsHelper(Dataset dataset, Index sampleIdx, MetadataProvider metadataProvider,
SourceLocation sourceLoc) {
this.dataset = dataset;
this.index = index;
this.sampleIdx = sampleIdx;
this.metadataProvider = metadataProvider;
this.sourceLoc = sourceLoc;
}
@@ -135,11 +136,16 @@ public void init() throws AlgebricksException {
comparatorFactories = dataset.getPrimaryComparatorFactories(metadataProvider, itemType, metaType);
groupbyNumFrames = getGroupByNumFrames(metadataProvider, sourceLoc);

PartitioningProperties partitioningProperties =
metadataProvider.getPartitioningProperties(dataset, index.getIndexName());
fileSplitProvider = partitioningProperties.getSpiltsProvider();
partitionConstraint = partitioningProperties.getConstraints();
computeStorageMap = partitioningProperties.getComputeStorageMap();
// make sure to always use the dataset + index to get the partitioning properties
// this is because in some situations the nodegroup of the passed dataset is different from the index
// this can happen during a rebalance for example where the dataset represents the new target dataset while
// the index object information is fetched from the old source dataset
PartitioningProperties samplePartitioningProperties =
metadataProvider.getPartitioningProperties(dataset, sampleIdx.getIndexName());
fileSplitProvider = samplePartitioningProperties.getSpiltsProvider();
partitionConstraint = samplePartitioningProperties.getConstraints();
computeStorageMap = samplePartitioningProperties.getComputeStorageMap();
numPartitions = samplePartitioningProperties.getNumberOfPartitions();
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
mergePolicyFactory = compactionInfo.first;
@@ -150,7 +156,7 @@
public JobSpecification buildCreationJobSpec() throws AlgebricksException {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaType,
IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, sampleIdx, itemType, metaType,
mergePolicyFactory, mergePolicyProperties);
IIndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
fileSplitProvider, resourceFactory, true);
@@ -165,7 +171,7 @@ public JobSpecification buildCreationJobSpec() throws AlgebricksException {

@Override
public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
Index.SampleIndexDetails indexDetails = (Index.SampleIndexDetails) index.getIndexDetails();
Index.SampleIndexDetails indexDetails = (Index.SampleIndexDetails) sampleIdx.getIndexDetails();
int sampleCardinalityTarget = indexDetails.getSampleCardinalityTarget();
long sampleSeed = indexDetails.getSampleSeed();
IDataFormat format = metadataProvider.getDataFormat();
@@ -189,16 +195,18 @@ public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
sourceOp = targetOp;

// primary index scan ----> stream stats op
List<Pair<IFileSplitProvider, String>> indexesInfo = metadataProvider.getSplitProviderOfAllIndexes(dataset);
IndexDataflowHelperFactory[] indexes = new IndexDataflowHelperFactory[indexesInfo.size()];
String[] names = new String[indexesInfo.size()];
List<Index> dsIndexes = metadataProvider.getSecondaryIndexes(dataset);
IndexDataflowHelperFactory[] indexes = new IndexDataflowHelperFactory[dsIndexes.size()];
String[] names = new String[dsIndexes.size()];
for (int i = 0; i < indexes.length; i++) {
Pair<IFileSplitProvider, String> indexInfo = indexesInfo.get(i);
indexes[i] = new IndexDataflowHelperFactory(storageMgr, indexInfo.first);
names[i] = indexInfo.second;
Index idx = dsIndexes.get(i);
PartitioningProperties idxPartitioningProps =
metadataProvider.getPartitioningProperties(dataset, idx.getIndexName());
indexes[i] = new IndexDataflowHelperFactory(storageMgr, idxPartitioningProps.getSpiltsProvider());
names[i] = idx.getIndexName();
}
targetOp =
new DatasetStreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME, indexes, names);
targetOp = new DatasetStreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME, indexes,
names, computeStorageMap);
spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0);
sourceOp = targetOp;

@@ -318,18 +326,17 @@ protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecif
protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor,
long numElementHint) throws AlgebricksException {
PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset);
int[] pkFields = new int[dataset.getPrimaryKeys().size()];
for (int i = 0; i < pkFields.length; i++) {
pkFields[i] = fieldPermutation[i];
}
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
partitioningProperties.getNumberOfPartitions());
ITuplePartitionerFactory partitionerFactory =
new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
LSMIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new LSMIndexBulkLoadOperatorDescriptor(spec,
recordDesc, fieldPermutation, fillFactor, false, numElementHint, true, dataflowHelperFactory, null,
LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
partitioningProperties.getComputeStorageMap());
computeStorageMap);
treeIndexBulkLoadOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
partitionConstraint);
@@ -339,7 +346,7 @@ protected LSMIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecif
@Override
public JobSpecification buildDropJobSpec(Set<IndexDropOperatorDescriptor.DropOption> options)
throws AlgebricksException {
return SecondaryTreeIndexOperationsHelper.buildDropJobSpecImpl(dataset, index, options, metadataProvider,
return SecondaryTreeIndexOperationsHelper.buildDropJobSpecImpl(dataset, sampleIdx, options, metadataProvider,
sourceLoc);
}

