Skip to content

Commit

Permalink
[ASTERIXDB-3144][CONF] Configurable storage partitions count
Browse files Browse the repository at this point in the history
- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Make number of storage partitions used for static partitioning
  configurable.

Change-Id: I1549b408562910c32bbbb6764f366c95af345da9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17519
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
  • Loading branch information
mhubail committed May 9, 2023
1 parent 4b8e5aa commit a69202d
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 15 deletions.
Expand Up @@ -55,6 +55,7 @@
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"storage.partitioning" : "dynamic",
"storage.partitions.count" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
Expand Down
Expand Up @@ -55,6 +55,7 @@
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"storage.partitioning" : "dynamic",
"storage.partitions.count" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
Expand Down
Expand Up @@ -55,6 +55,7 @@
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"storage.partitioning" : "dynamic",
"storage.partitions.count" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
Expand Down
Expand Up @@ -280,4 +280,10 @@ ClusterState waitForState(Predicate<ClusterState> condition, long timeout, TimeU
* @return true if any of the nodes is currently inactive, otherwise false
*/
boolean nodesFailed(Set<String> nodeIds);

/**
* Gets the count of storage partitions
* @return the count of storage partitions
*/
int getStoragePartitionsCount();
}
Expand Up @@ -24,11 +24,14 @@
import java.util.List;
import java.util.Map;

import org.apache.asterix.common.utils.StorageConstants;

public class StorageComputePartitionsMap {

private final Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>();
private final int storagePartitionsCount;

public StorageComputePartitionsMap(int storagePartitionsCount) {
this.storagePartitionsCount = storagePartitionsCount;
}

public void addStoragePartition(int stoPart, ComputePartition compute) {
stoToComputeLocation.put(stoPart, compute);
Expand All @@ -41,7 +44,7 @@ public int[][] getComputeToStorageMap(boolean metadataDataset) {
computeToStoragePartitions.put(computePartitionIdForMetadata,
Collections.singletonList(computePartitionIdForMetadata));
} else {
for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
for (int i = 0; i < storagePartitionsCount; i++) {
ComputePartition computePartition = getComputePartition(i);
int computeId = computePartition.getId();
List<Integer> storagePartitions =
Expand All @@ -64,13 +67,14 @@ public ComputePartition getComputePartition(int storagePartition) {
public static StorageComputePartitionsMap computePartitionsMap(IClusterStateManager clusterStateManager) {
ClusterPartition metadataPartition = clusterStateManager.getMetadataPartition();
Map<Integer, ClusterPartition> clusterPartitions = clusterStateManager.getClusterPartitions();
StorageComputePartitionsMap newMap = new StorageComputePartitionsMap();
final int storagePartitionsCount = clusterStateManager.getStoragePartitionsCount();
StorageComputePartitionsMap newMap = new StorageComputePartitionsMap(storagePartitionsCount);
newMap.addStoragePartition(metadataPartition.getPartitionId(),
new ComputePartition(metadataPartition.getPartitionId(), metadataPartition.getActiveNodeId()));
int storagePartitionsPerComputePartition = StorageConstants.NUM_STORAGE_PARTITIONS / clusterPartitions.size();
int storagePartitionsPerComputePartition = storagePartitionsCount / clusterPartitions.size();
int storagePartitionId = 0;
int lastComputePartition = 1;
int remainingStoragePartition = StorageConstants.NUM_STORAGE_PARTITIONS % clusterPartitions.size();
int remainingStoragePartition = storagePartitionsCount % clusterPartitions.size();
for (Map.Entry<Integer, ClusterPartition> cp : clusterPartitions.entrySet()) {
ClusterPartition clusterPartition = cp.getValue();
for (int i = 0; i < storagePartitionsPerComputePartition; i++) {
Expand Down
Expand Up @@ -20,6 +20,7 @@

import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
Expand Down Expand Up @@ -66,7 +67,8 @@ public enum Option implements IOption {
STORAGE_COLUMN_MAX_TUPLE_COUNT(NONNEGATIVE_INTEGER, 15000),
STORAGE_COLUMN_FREE_SPACE_TOLERANCE(DOUBLE, 0.15),
STORAGE_FORMAT(STRING, "row"),
STORAGE_PARTITIONING(STRING, "dynamic");
STORAGE_PARTITIONING(STRING, "dynamic"),
STORAGE_PARTITIONS_COUNT(INTEGER, 8);

private final IOptionType interpreter;
private final Object defaultValue;
Expand All @@ -84,6 +86,7 @@ public Section section() {
case STORAGE_GLOBAL_CLEANUP:
case STORAGE_GLOBAL_CLEANUP_TIMEOUT:
case STORAGE_PARTITIONING:
case STORAGE_PARTITIONS_COUNT:
return Section.COMMON;
default:
return Section.NC;
Expand Down Expand Up @@ -143,8 +146,11 @@ public String description() {
case STORAGE_FORMAT:
return "The default storage format (either row or column)";
case STORAGE_PARTITIONING:
return "The storage partitioning scheme (either dynamic or static). This value should not be changed"
+ " after any dataset have been created";
return "The storage partitioning scheme (either dynamic or static). This value should not be"
+ " changed after any dataset has been created";
case STORAGE_PARTITIONS_COUNT:
return "The number of storage partitions to use for static partitioning. This value should not be"
+ " changed after any dataset has been created";
default:
throw new IllegalStateException("NYI: " + this);
}
Expand Down Expand Up @@ -297,4 +303,8 @@ public String getStorageFormat() {
public PartitioningScheme getPartitioningScheme() {
return PartitioningScheme.fromName(accessor.getString(Option.STORAGE_PARTITIONING));
}

public int getStoragePartitionsCount() {
return accessor.getInt(Option.STORAGE_PARTITIONS_COUNT);
}
}
Expand Up @@ -48,7 +48,6 @@ public class StorageConstants {
public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME = "correlated-prefix";
public static final Map<String, String> DEFAULT_COMPACTION_POLICY_PROPERTIES;
public static final int METADATA_PARTITION = -1;
public static final int NUM_STORAGE_PARTITIONS = 8;

/**
* The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.metadata.MetadataTransactionContext;
Expand All @@ -49,10 +48,12 @@ public abstract class DataPartitioningProvider implements IDataPartitioningProvi

protected final ICcApplicationContext appCtx;
protected final ClusterStateManager clusterStateManager;
protected final int storagePartitionsCounts;

DataPartitioningProvider(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
this.clusterStateManager = (ClusterStateManager) appCtx.getClusterStateManager();
this.storagePartitionsCounts = clusterStateManager.getStoragePartitionsCount();
}

public static DataPartitioningProvider create(ICcApplicationContext appCtx) {
Expand Down Expand Up @@ -86,10 +87,9 @@ public PartitioningProperties getPartitioningProperties(Feed feed) throws Asteri
return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
}

protected static int getNumberOfPartitions(Dataset ds) {
protected int getNumberOfPartitions(Dataset ds) {
return MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId())
? MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS
: StorageConstants.NUM_STORAGE_PARTITIONS;
? MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS : storagePartitionsCounts;
}

protected static int getLocationsCount(AlgebricksPartitionConstraint constraint) {
Expand Down
Expand Up @@ -73,7 +73,7 @@ private SplitComputeLocations getDataverseSplits(DataverseName dataverseName) {
List<String> locations = new ArrayList<>();
Set<Integer> uniqueLocations = new HashSet<>();
StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
for (int i = 0; i < storagePartitionsCounts; i++) {
File f = new File(StoragePathUtil.prepareStoragePartitionPath(i),
StoragePathUtil.prepareDataverseName(dataverseName));
ComputePartition computePartition = partitionMap.getComputePartition(i);
Expand Down
Expand Up @@ -503,6 +503,11 @@ public synchronized boolean nodesFailed(Set<String> nodeIds) {
return nodeIds.stream().anyMatch(failedNodes::contains);
}

@Override
public int getStoragePartitionsCount() {
return appCtx.getStorageProperties().getStoragePartitionsCount();
}

public synchronized StorageComputePartitionsMap getStorageComputeMap() {
return storageComputePartitionsMap;
}
Expand Down

0 comments on commit a69202d

Please sign in to comment.