Skip to content

Commit

Permalink
IGNITE-22128 Balancing partitions across stripes (#3690)
Browse files Browse the repository at this point in the history
  • Loading branch information
vldpyatkov authored and AMashenkov committed May 14, 2024
1 parent 74aea83 commit 523b2eb
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class StripeAwareLogManager extends LogManagerImpl {
private LogStorage logStorage;

/** Stripe, that corresponds to the current log storage instance. */
private final Stripe stripe;
private Stripe stripe;

/** Size threshold of log entries list, that will trigger the flush upon the excess. */
private int maxAppendBufferSize;
Expand All @@ -56,15 +56,6 @@ public class StripeAwareLogManager extends LogManagerImpl {
*/
private boolean sharedLogStorage;

/**
* Constructor.
*
* @param stripe Stripe that corresponds to a worker thread in {@link LogManagerOptions#getLogManagerDisruptor()}.
*/
public StripeAwareLogManager(Stripe stripe) {
this.stripe = stripe;
}

@Override
public boolean init(LogManagerOptions opts) {
LogStorage logStorage = opts.getLogStorage();
Expand All @@ -73,7 +64,15 @@ public boolean init(LogManagerOptions opts) {
this.logStorage = logStorage;
this.maxAppendBufferSize = opts.getRaftOptions().getMaxAppendBufferSize();

return super.init(opts);
boolean isInitSuccessfully = super.init(opts);

int stripe = opts.getLogManagerDisruptor().getStripe(opts.getNode().getNodeId());

assert stripe != -1;

this.stripe = opts.getLogStripes().get(stripe);

return isInitSuccessfully;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,9 @@ private boolean initSnapshotStorage() {
private boolean initLogStorage() {
Requires.requireNonNull(this.fsmCaller, "Null fsm caller");
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
int stripe = options.getLogManagerDisruptor().getStripe(getNodeId());
this.logManager = new StripeAwareLogManager(options.getLogStripes().get(stripe));
final LogManagerOptions opts = new LogManagerOptions();
this.logManager = new StripeAwareLogManager();

LogManagerOptions opts = new LogManagerOptions();
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
opts.setLogStorage(this.logStorage);
opts.setConfigurationManager(this.configManager);
Expand All @@ -622,6 +622,7 @@ private boolean initLogStorage() {
opts.setNodeMetrics(this.metrics);
opts.setRaftOptions(this.raftOptions);
opts.setLogManagerDisruptor(options.getLogManagerDisruptor());
opts.setLogStripes(options.getLogStripes());

return this.logManager.init(opts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand All @@ -48,6 +49,9 @@ public class StripedDisruptor<T extends NodeIdAware> {
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(StripedDisruptor.class);

/** The counter is used to generate the next stripe to subscribe to in order to be a round-robin hash. */
private final AtomicInteger incrementalCounter = new AtomicInteger();

/** Array of disruptors. Each Disruptor in the appropriate stripe. */
private final Disruptor<T>[] disruptors;

Expand Down Expand Up @@ -152,7 +156,7 @@ public StripedDisruptor(
.setWaitStrategy(useYieldStrategy ? new YieldingWaitStrategy() : new BlockingWaitStrategy())
.build();

eventHandlers.add(new StripeEntryHandler());
eventHandlers.add(new StripeEntryHandler(i));
exceptionHandlers.add(new StripeExceptionHandler(name));

disruptor.handleEventsWith(eventHandlers.get(i));
Expand Down Expand Up @@ -202,12 +206,16 @@ public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler) {
* @return Disruptor queue appropriate to the group.
*/
public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, BiConsumer<T, Throwable> exceptionHandler) {
eventHandlers.get(getStripe(nodeId)).subscribe(nodeId, handler);
assert getStripe(nodeId) == -1 : "The double subscriber for the one replication group [nodeId=" + nodeId + "].";

int stripeId = nextStripeToSubscribe();

eventHandlers.get(stripeId).subscribe(nodeId, handler);

if (exceptionHandler != null)
exceptionHandlers.get(getStripe(nodeId)).subscribe(nodeId, exceptionHandler);
exceptionHandlers.get(stripeId).subscribe(nodeId, exceptionHandler);

return queues[getStripe(nodeId)];
return queues[stripeId];
}

/**
Expand All @@ -216,18 +224,37 @@ public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, BiConsume
* @param nodeId Node id.
*/
public void unsubscribe(NodeId nodeId) {
eventHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
exceptionHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
int stripeId = getStripe(nodeId);

assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";

eventHandlers.get(stripeId).unsubscribe(nodeId);
exceptionHandlers.get(stripeId).unsubscribe(nodeId);
}

/**
* Determines a stripe by a node id and returns a stripe number.
* If the replication group is already subscribed, this method determines a stripe by a node id and returns a stripe number.
* If the replication group did not subscribed yet, this method returns {@code -1};
*
* @param nodeId Node id.
* @return Stripe of the Striped disruptor.
*/
public int getStripe(NodeId nodeId) {
return Math.abs(nodeId.hashCode() % stripes);
for (StripeEntryHandler handler : eventHandlers) {
if (handler.isSubscribed(nodeId)) {
return handler.stripeId;
}
}

return -1;
}

/**
* Generates the next stripe number in a round-robin manner.
* @return The stripe number.
*/
private int nextStripeToSubscribe() {
return Math.abs(incrementalCounter.getAndIncrement() % stripes);
}

/**
Expand All @@ -237,24 +264,40 @@ public int getStripe(NodeId nodeId) {
* @return Disruptor queue appropriate to the group.
*/
public RingBuffer<T> queue(NodeId nodeId) {
return queues[getStripe(nodeId)];
int stripeId = getStripe(nodeId);

assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";

return queues[stripeId];
}

/**
* Event handler for stripe of the Striped disruptor.
* It routs an event to the event handler for a group.
*/
private class StripeEntryHandler implements EventHandler<T> {
private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers;
private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers = new ConcurrentHashMap<>();

/** Size of the batch that is currently being handled. */
private int currentBatchSize = 0;

/** Stripe id. */
private final int stripeId;

/**
* The constructor.
*/
StripeEntryHandler() {
subscribers = new ConcurrentHashMap<>();
StripeEntryHandler(int stripeId) {
this.stripeId = stripeId;
}

/**
* Checks the replication group is subscribed to this stripe or not.
* @param nodeId Replication group node id.
* @return True if the group is subscribed, false otherwise.
*/
public boolean isSubscribed(NodeId nodeId) {
return subscribers.containsKey(nodeId);
}

/**
Expand Down Expand Up @@ -283,7 +326,7 @@ void unsubscribe(NodeId nodeId) {
// TODO: IGNITE-20536 Need to add assert that handler is not null and to implement a no-op handler.
if (handler != null) {
if (metrics != null && metrics.enabled()) {
metrics.hitToStripe(getStripe(event.nodeId()));
metrics.hitToStripe(stripeId);

if (endOfBatch) {
metrics.addBatchSize(currentBatchSize + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.option;

import java.util.List;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
Expand All @@ -38,6 +40,15 @@ public class LogManagerOptions {
private NodeMetrics nodeMetrics;
private LogEntryCodecFactory logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
private List<Stripe> logStripes;

public void setLogStripes(List<Stripe> logStripes) {
this.logStripes = logStripes;
}

public List<Stripe> getLogStripes() {
return this.logStripes;
}

public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
return logManagerDisruptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.ignite.disruptor;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
Expand Down Expand Up @@ -124,6 +127,53 @@ public void testDisruptorSimple() throws Exception {
disruptor.shutdown();
}

/**
* Checks the distribution of subscribed handlers across stripes.
* The distribution algorithm has to distribute handlers as evenly as possible using the round-robin algorithm.
*/
@Test
public void tesDistributionHandlers() {
Random random = new Random();

int stripes = random.nextInt(20);

StripedDisruptor<NodeIdAwareTestObj> disruptor = new StripedDisruptor<>("test", "test-disruptor",
16384,
NodeIdAwareTestObj::new,
stripes,
false,
false,
null);

int handlers = random.nextInt(100);

log.info("Handlers will be distributed across stripes [handlers={}, stripes={}]", handlers, stripes);

int[] distribution = new int[stripes];

for (int i = 0; i < handlers; i++) {
GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();

var nodeId = new NodeId("grp", new PeerId(UUID.randomUUID().toString()));

disruptor.subscribe(nodeId, handler);

int stripe = disruptor.getStripe(nodeId);

assertNotEquals(-1, stripe);

distribution[stripe]++;
}

log.info("Result distribution [distribution={}]", distribution);

int reference = distribution[0];

for (int i = 1; i < stripes; i++) {
assertTrue(distribution[i] == reference || distribution[i] + 1 == reference || distribution[i] - 1 == reference);
}
}

/** Group event handler. */
private static class GroupAwareTestObjHandler implements EventHandler<NodeIdAwareTestObj> {
/** This is a container for the batch events. */
Expand Down

0 comments on commit 523b2eb

Please sign in to comment.