Skip to content

Commit

Permalink
Add partition group configuration.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 6, 2018
1 parent 2630440 commit 2997930
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 45 deletions.
Expand Up @@ -85,11 +85,11 @@ default Partition getPartition(String key) {
/**
* Partition group builder.
*/
abstract class Builder implements io.atomix.utils.Builder<ManagedPartitionGroup> {
protected final String name;
abstract class Builder<C extends PartitionGroupConfig<C>> implements io.atomix.utils.Builder<ManagedPartitionGroup> {
protected final C config;

protected Builder(String name) {
this.name = name;
protected Builder(C config) {
this.config = config;
}
}
}
@@ -0,0 +1,45 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.primitive.partition;

import io.atomix.utils.Config;

/**
* Partition group configuration.
*/
public class PartitionGroupConfig<C extends PartitionGroupConfig<C>> implements Config {
private String name;

/**
* Returns the partition group name.
*
* @return the partition group name
*/
public String getName() {
return name;
}

/**
* Sets the partition group name.
*
* @param name the partition group name
*/
@SuppressWarnings("unchecked")
public C setName(String name) {
this.name = name;
return (C) this;
}
}
Expand Up @@ -45,7 +45,6 @@
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
Expand All @@ -60,7 +59,15 @@ public class PrimaryBackupPartitionGroup implements ManagedPartitionGroup {
* @return a new partition group builder
*/
public static Builder builder(String name) {
return new Builder(name);
return new Builder(new PrimaryBackupPartitionGroupConfig().setName(name));
}

private static Collection<PrimaryBackupPartition> buildPartitions(PrimaryBackupPartitionGroupConfig config) {
List<PrimaryBackupPartition> partitions = new ArrayList<>(config.getNumPartitions());
for (int i = 0; i < config.getNumPartitions(); i++) {
partitions.add(new PrimaryBackupPartition(PartitionId.from(config.getName(), i + 1), config.getMemberGroupProvider()));
}
return partitions;
}

private static final Logger LOGGER = LoggerFactory.getLogger(PrimaryBackupPartitionGroup.class);
Expand All @@ -70,7 +77,11 @@ public static Builder builder(String name) {
private final List<PartitionId> sortedPartitionIds = Lists.newCopyOnWriteArrayList();
private ThreadContextFactory threadFactory;

public PrimaryBackupPartitionGroup(String name, Collection<PrimaryBackupPartition> partitions) {
public PrimaryBackupPartitionGroup(PrimaryBackupPartitionGroupConfig config) {
this(config.getName(), buildPartitions(config));
}

private PrimaryBackupPartitionGroup(String name, Collection<PrimaryBackupPartition> partitions) {
this.name = checkNotNull(name);
partitions.forEach(p -> {
this.partitions.put(p.id(), p);
Expand Down Expand Up @@ -148,12 +159,9 @@ public String toString() {
/**
* Primary-backup partition group builder.
*/
public static class Builder extends PartitionGroup.Builder {
private int numPartitions;
private MemberGroupProvider memberGroupProvider = MemberGroupStrategy.NODE_AWARE;

protected Builder(String name) {
super(name);
public static class Builder extends PartitionGroup.Builder<PrimaryBackupPartitionGroupConfig> {
protected Builder(PrimaryBackupPartitionGroupConfig config) {
super(config);
}

/**
Expand All @@ -164,8 +172,7 @@ protected Builder(String name) {
* @throws IllegalArgumentException if the number of partitions is not positive
*/
public Builder withNumPartitions(int numPartitions) {
checkArgument(numPartitions > 0, "numPartitions must be positive");
this.numPartitions = numPartitions;
config.setNumPartitions(numPartitions);
return this;
}

Expand All @@ -176,7 +183,7 @@ public Builder withNumPartitions(int numPartitions) {
* @return the partition group builder
*/
public Builder withMemberGroupProvider(MemberGroupProvider memberGroupProvider) {
this.memberGroupProvider = checkNotNull(memberGroupProvider, "memberGroupProvider cannot be null");
config.setMemberGroupProvider(memberGroupProvider);
return this;
}

Expand Down Expand Up @@ -212,11 +219,7 @@ public Builder withMemberGroups(Collection<MemberGroup> memberGroups) {

@Override
public ManagedPartitionGroup build() {
List<PrimaryBackupPartition> partitions = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
partitions.add(new PrimaryBackupPartition(PartitionId.from(name, i + 1), memberGroupProvider));
}
return new PrimaryBackupPartitionGroup(name, partitions);
return new PrimaryBackupPartitionGroup(config);
}
}
}
@@ -0,0 +1,78 @@
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.backup.partition;

import io.atomix.primitive.partition.MemberGroupProvider;
import io.atomix.primitive.partition.MemberGroupStrategy;
import io.atomix.primitive.partition.PartitionGroupConfig;

/**
* Primary-backup partition group configuration.
*/
public class PrimaryBackupPartitionGroupConfig extends PartitionGroupConfig<PrimaryBackupPartitionGroupConfig> {
private int numPartitions;
private MemberGroupProvider memberGroupProvider = MemberGroupStrategy.NODE_AWARE;

/**
* Returns the number of partitions in the group.
*
* @return the number of partitions in the group.
*/
public int getNumPartitions() {
return numPartitions;
}

/**
* Sets the number of partitions in the group.
*
* @param numPartitions the number of partitions in the group
* @return the partition group configuration
*/
public PrimaryBackupPartitionGroupConfig setNumPartitions(int numPartitions) {
this.numPartitions = numPartitions;
return this;
}

/**
* Returns the member group provider.
*
* @return the member group provider
*/
public MemberGroupProvider getMemberGroupProvider() {
return memberGroupProvider;
}

/**
* Sets the member group provider.
*
* @param memberGroupProvider the member group provider
* @return the partition group configuration
*/
public PrimaryBackupPartitionGroupConfig setMemberGroupProvider(MemberGroupProvider memberGroupProvider) {
this.memberGroupProvider = memberGroupProvider;
return this;
}

/**
* Sets the member group strategy.
*
* @param memberGroupStrategy the member group strategy
* @return the partition group configuration
*/
public PrimaryBackupPartitionGroupConfig setMemberGroupStrategy(MemberGroupStrategy memberGroupStrategy) {
return setMemberGroupProvider(memberGroupStrategy);
}
}
Expand Up @@ -50,8 +50,6 @@
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Raft partition group.
Expand All @@ -65,11 +63,20 @@ public class RaftPartitionGroup implements ManagedPartitionGroup {
* @return a new partition group builder
*/
public static Builder builder(String name) {
return new Builder(name);
return new Builder(new RaftPartitionGroupConfig().setName(name));
}

private static final Logger LOGGER = LoggerFactory.getLogger(RaftPartitionGroup.class);

private static Collection<RaftPartition> buildPartitions(RaftPartitionGroupConfig config) {
File partitionsDir = new File(config.getDataDirectory(), "partitions");
List<RaftPartition> partitions = new ArrayList<>(config.getNumPartitions());
for (int i = 0; i < config.getNumPartitions(); i++) {
partitions.add(new RaftPartition(PartitionId.from(config.getName(), i + 1), config.getStorageLevel(), new File(partitionsDir, String.valueOf(i + 1))));
}
return partitions;
}

private final String name;
private final int partitionSize;
private final Map<PartitionId, RaftPartition> partitions = Maps.newConcurrentMap();
Expand All @@ -79,7 +86,11 @@ public static Builder builder(String name) {
private Collection<PartitionMetadata> metadata;
private CompletableFuture<Void> metadataChangeFuture = CompletableFuture.completedFuture(null);

public RaftPartitionGroup(String name, Collection<RaftPartition> partitions, int partitionSize) {
public RaftPartitionGroup(RaftPartitionGroupConfig config) {
this(config.getName(), buildPartitions(config), config.getPartitionSize());
}

private RaftPartitionGroup(String name, Collection<RaftPartition> partitions, int partitionSize) {
this.name = name;
this.partitionSize = partitionSize;
partitions.forEach(p -> {
Expand Down Expand Up @@ -204,14 +215,9 @@ public String toString() {
/**
* Raft partition group builder.
*/
public static class Builder extends PartitionGroup.Builder {
private int numPartitions;
private int partitionSize;
private StorageLevel storageLevel = StorageLevel.MAPPED;
private File dataDirectory = new File(System.getProperty("user.dir"), "data");

protected Builder(String name) {
super(name);
public static class Builder extends PartitionGroup.Builder<RaftPartitionGroupConfig> {
protected Builder(RaftPartitionGroupConfig config) {
super(config);
}

/**
Expand All @@ -222,8 +228,7 @@ protected Builder(String name) {
* @throws IllegalArgumentException if the number of partitions is not positive
*/
public Builder withNumPartitions(int numPartitions) {
checkArgument(numPartitions > 0, "numPartitions must be positive");
this.numPartitions = numPartitions;
config.setNumPartitions(numPartitions);
return this;
}

Expand All @@ -235,8 +240,7 @@ public Builder withNumPartitions(int numPartitions) {
* @throws IllegalArgumentException if the partition size is not positive
*/
public Builder withPartitionSize(int partitionSize) {
checkArgument(partitionSize > 0, "partitionSize must be positive");
this.partitionSize = partitionSize;
config.setPartitionSize(partitionSize);
return this;
}

Expand All @@ -247,7 +251,7 @@ public Builder withPartitionSize(int partitionSize) {
* @return the Raft partition group builder
*/
public Builder withStorageLevel(StorageLevel storageLevel) {
this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
config.setStorageLevel(storageLevel);
return this;
}

Expand All @@ -258,18 +262,13 @@ public Builder withStorageLevel(StorageLevel storageLevel) {
* @return the replica builder
*/
public Builder withDataDirectory(File dataDir) {
this.dataDirectory = checkNotNull(dataDir, "dataDir cannot be null");
config.setDataDirectory(dataDir);
return this;
}

@Override
public ManagedPartitionGroup build() {
File partitionsDir = new File(dataDirectory, "partitions");
List<RaftPartition> partitions = new ArrayList<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
partitions.add(new RaftPartition(PartitionId.from(name, i + 1), storageLevel, new File(partitionsDir, String.valueOf(i + 1))));
}
return new RaftPartitionGroup(name, partitions, partitionSize);
return new RaftPartitionGroup(config);
}
}
}

0 comments on commit 2997930

Please sign in to comment.