[ASTERIXDB-3144][RT] Introduce DataPartitioningProvider
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add a storage partitioning scheme config (dynamic or static) and
  default it to dynamic.
- Introduce DataPartitioningProvider, which encapsulates the logic
  for dataset partitioning based on the partitioning scheme (a usage
  sketch of the resulting PartitioningProperties follows below).
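
Illustrative sketch (not part of the commit): across the touched job builders, the old Pair<IFileSplitProvider, AlgebricksPartitionConstraint> plus a separately derived partitions map is replaced by a single PartitioningProperties value. The Java sketch below shows that pattern under the assumption that the helper class and method names (PartitioningUsageSketch, wirePartitionedIndexAccess, indexAccessOp) are hypothetical; the accessors and operator constructors are the ones appearing in this diff.

import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;

// Hypothetical helper illustrating the pattern used throughout this change.
final class PartitioningUsageSketch {

    private PartitioningUsageSketch() {
    }

    static void wirePartitionedIndexAccess(MetadataProvider metadataProvider, Dataset dataset, String indexName,
            JobSpecification spec, IOperatorDescriptor indexAccessOp) throws Exception {
        // One call now yields everything the old splits/constraint pair and the
        // separately computed partitions map used to provide.
        PartitioningProperties partitioningProperties =
                metadataProvider.getPartitioningProperties(dataset, indexName);
        // 1) the file splits locate the on-disk index partitions for the dataflow helper,
        //    which would then be handed to the search/insert operator descriptor as in the hunks below
        IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
                metadataProvider.getStorageComponentProvider().getStorageManager(),
                partitioningProperties.getSplitsProvider());
        // 2) the Algebricks constraint pins the operator to the owning partitions
        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexAccessOp,
                partitioningProperties.getConstraints());
        // 3) the compute-to-storage map tells each compute partition which storage
        //    partitions it serves (the test utilities in this change build an identity map)
        int[][] computeStorageMap = partitioningProperties.getComputeStorageMap();
        assert computeStorageMap.length == partitioningProperties.getNumberOfPartitions();
    }
}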

Change-Id: Ia2bbc716fb4c2e9abca06e8f8629b15bd48bc7f3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17503
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Murtadha Hubail <mhubail@apache.org>
mhubail committed May 2, 2023
1 parent 066fd56 commit 09f4cdc
Showing 24 changed files with 483 additions and 265 deletions.
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -241,10 +242,9 @@ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op
propsLocal.add(new LocalOrderProperty(orderColumns));
MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
Dataset dataset = mp.findDataset(searchIndex.getDataverseName(), searchIndex.getDatasetName());
int[][] partitionsMap = mp.getPartitionsMap(dataset);
pv[0] = new StructuralPropertiesVector(
UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars, domain, partitionsMap),
propsLocal);
PartitioningProperties partitioningProperties = mp.getPartitioningProperties(dataset);
pv[0] = new StructuralPropertiesVector(UnorderedPartitionedProperty.ofPartitionsMap(searchKeyVars,
domain, partitioningProperties.getComputeStorageMap()), propsLocal);
return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
}
@@ -20,6 +20,7 @@

import static org.apache.asterix.common.utils.IdentifierUtil.dataset;

import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.declared.DataSourceId;
@@ -58,7 +59,6 @@
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -171,8 +171,7 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInvertedInd
}
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset, indexName);
// TODO: Here we assume there is only one search key field.
int queryField = keyFields[0];
// Get tokenizer and search modifier factories.
@@ -183,20 +182,18 @@ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInvertedInd
IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory =
FullTextUtil.fetchFilterAndCreateConfigEvaluator(metadataProvider, secondaryIndex.getDataverseName(),
((Index.TextIndexDetails) secondaryIndex.getIndexDetails()).getFullTextConfigName());
IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), secondarySplitsAndConstraint.first);

int numPartitions = MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second);
int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);

IIndexDataflowHelperFactory dataflowHelperFactory =
new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
partitioningProperties.getSplitsProvider());
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
new LSMInvertedIndexSearchOperatorDescriptor(jobSpec, outputRecDesc, queryField, dataflowHelperFactory,
queryTokenizerFactory, fullTextConfigEvaluatorFactory, searchModifierFactory, retainInput,
retainMissing, nonMatchWriterFactory,
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex,
IndexOperation.SEARCH, null),
minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys,
propagateIndexFilter, nonFilterWriterFactory, frameLimit, partitionsMap);
return new Pair<>(invIndexSearchOp, secondarySplitsAndConstraint.second);
propagateIndexFilter, nonFilterWriterFactory, frameLimit,
partitioningProperties.getComputeStorageMap());
return new Pair<>(invIndexSearchOp, partitioningProperties.getConstraints());
}
}
@@ -26,6 +26,7 @@
import java.util.TreeMap;

import org.apache.asterix.algebra.operators.CommitOperator;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -361,9 +362,9 @@ private ILogicalOperator connectAll2ndarySearchPlanWithIntersect(List<ILogicalOp
outputVars.add(outputVar);
VariableUtilities.substituteVariables(lop, inputVar, outputVar, context);
}

int[][] partitionsMap = metadataProvider.getPartitionsMap(idx);
IntersectOperator intersect = new IntersectOperator(outputVars, inputVars, partitionsMap);
PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(idx);
IntersectOperator intersect =
new IntersectOperator(outputVars, inputVars, partitioningProperties.getComputeStorageMap());
intersect.setSourceLocation(lop.getSourceLocation());
for (ILogicalOperator secondarySearch : subRoots) {
intersect.getInputs().add(secondarySearch.getInputs().get(0));
@@ -47,13 +47,15 @@
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
import org.apache.asterix.common.external.IAdapterFactoryService;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.metadata.IMetadataLockUtil;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.metadata.utils.DataPartitioningProvider;
import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.asterix.runtime.job.listener.NodeJobTracker;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
@@ -112,6 +114,7 @@ public class CcApplicationContext implements ICcApplicationContext {
private final IConfigValidator configValidator;
private final IAdapterFactoryService adapterFactoryService;
private final ReentrantReadWriteLock compilationLock = new ReentrantReadWriteLock(true);
private final IDataPartitioningProvider dataPartitioningProvider;

public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
@@ -154,6 +157,7 @@ public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hc
requestTracker = new RequestTracker(this);
configValidator = configValidatorFactory.create();
this.adapterFactoryService = adapterFactoryService;
dataPartitioningProvider = new DataPartitioningProvider(this);
}

@Override
@@ -357,4 +361,9 @@ public IAdapterFactoryService getAdapterFactoryService() {
public ReentrantReadWriteLock getCompilationLock() {
return compilationLock;
}

@Override
public IDataPartitioningProvider getDataPartitioningProvider() {
return dataPartitioningProvider;
}
}
@@ -18,15 +18,13 @@
*/
package org.apache.asterix.utils;

import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;

public class DataverseUtil {

@@ -35,10 +33,11 @@ private DataverseUtil() {

public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, MetadataProvider metadata) {
JobSpecification jobSpec = RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
metadata.splitAndConstraints(dataverse.getDataverseName());
FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first, false);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
PartitioningProperties partitioningProperties = metadata.splitAndConstraints(dataverse.getDataverseName());
FileRemoveOperatorDescriptor frod =
new FileRemoveOperatorDescriptor(jobSpec, partitioningProperties.getSplitsProvider(), false);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod,
partitioningProperties.getConstraints());
jobSpec.addRoot(frod);
return jobSpec;
}
@@ -19,6 +19,7 @@

package org.apache.asterix.utils;

import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.TxnId;
@@ -29,7 +30,6 @@
import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
@@ -38,7 +38,6 @@
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;

public class FlushDatasetUtil {
private FlushDatasetUtil() {
@@ -66,9 +65,9 @@ public static void flushDataset(IHyracksClientConnection hcc, MetadataProvider m

spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);

Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset, dataset.getDatasetName());
AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
PartitioningProperties partitioningProperties =
metadataProvider.getPartitioningProperties(dataset, dataset.getDatasetName());
AlgebricksPartitionConstraint primaryPartitionConstraint = partitioningProperties.getConstraints();

AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,
primaryPartitionConstraint);
@@ -214,7 +214,7 @@ public Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> getBulkLoa
fieldPermutation[i] = i;
}
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -263,7 +263,7 @@ public Pair<LSMPrimaryInsertOperatorNodePushable, IPushRuntime> getInsertPipelin
}

int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -372,7 +372,7 @@ public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getDeletePipeline
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -838,7 +838,7 @@ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeli
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
int[][] partitionsMap = getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
ITuplePartitionerFactory tuplePartitionerFactory =
new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
@@ -912,4 +912,12 @@ private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, D
}
return new RecordDescriptor(outputSerDes, outputTypeTraits);
}

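/**
 * Builds an identity compute-to-storage partitions map for these test pipelines:
 * compute partition i is assigned exactly storage partition i.
 */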
private static int[][] getPartitionsMap(int numPartitions) {
int[][] map = new int[numPartitions][1];
for (int i = 0; i < numPartitions; i++) {
map[i] = new int[] { i };
}
return map;
}
}
@@ -54,6 +54,7 @@
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"storage.partitioning" : "dynamic",
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
@@ -54,6 +54,7 @@
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"storage.partitioning" : "dynamic",
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
@@ -54,6 +54,7 @@
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"storage.partitioning" : "dynamic",
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.common.cluster;

import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;

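/**
 * Bundles what a job builder needs to lay out work over storage partitions: the file
 * splits of the target dataset/index, the Algebricks partition constraint for the
 * operators accessing them, and the compute-to-storage partitions map.
 */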
public class PartitioningProperties {
private final IFileSplitProvider splitsProvider;
private final AlgebricksPartitionConstraint constraints;
private final int[][] computeStorageMap;

private PartitioningProperties(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints,
int[][] computeStorageMap) {
this.splitsProvider = splitsProvider;
this.constraints = constraints;
this.computeStorageMap = computeStorageMap;
}

public static PartitioningProperties of(IFileSplitProvider splitsProvider,
AlgebricksPartitionConstraint constraints, int[][] computeStorageMap) {
return new PartitioningProperties(splitsProvider, constraints, computeStorageMap);
}

public IFileSplitProvider getSplitsProvider() {
return splitsProvider;
}

public AlgebricksPartitionConstraint getConstraints() {
return constraints;
}

public int[][] getComputeStorageMap() {
return computeStorageMap;
}

public int getNumberOfPartitions() {
return splitsProvider.getFileSplits().length;
}
}
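
As the getter name suggests, the computeStorageMap is a plain int[][] keyed by compute partition: row i holds the storage partitions that compute partition i is responsible for, and the test helper above builds the identity mapping. A small standalone sketch follows; the class name and the sample values are made up for illustration and are not taken from the commit.

import java.util.Arrays;

public final class ComputeStorageMapSketch {

    public static void main(String[] args) {
        // Identity map, as produced by the test helper in this change:
        // one storage partition per compute partition.
        int[][] identity = { { 0 }, { 1 }, { 2 }, { 3 } };
        // Hypothetical map where each compute partition serves two storage partitions.
        int[][] fanOut = { { 0, 1 }, { 2, 3 } };
        describe("identity", identity);
        describe("fan-out", fanOut);
    }

    private static void describe(String name, int[][] computeStorageMap) {
        for (int compute = 0; compute < computeStorageMap.length; compute++) {
            System.out.printf("%s: compute partition %d -> storage partitions %s%n",
                    name, compute, Arrays.toString(computeStorageMap[compute]));
        }
    }
}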
