Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22128 Balancing partitions across stripes #3690

Merged
merged 3 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class StripeAwareLogManager extends LogManagerImpl {
private LogStorage logStorage;

/** Stripe, that corresponds to the current log storage instance. */
private final Stripe stripe;
private Stripe stripe;

/** Size threshold of log entries list, that will trigger the flush upon the excess. */
private int maxAppendBufferSize;
Expand All @@ -56,15 +56,6 @@ public class StripeAwareLogManager extends LogManagerImpl {
*/
private boolean sharedLogStorage;

/**
* Constructor.
*
* @param stripe Stripe that corresponds to a worker thread in {@link LogManagerOptions#getLogManagerDisruptor()}.
*/
public StripeAwareLogManager(Stripe stripe) {
this.stripe = stripe;
}

@Override
public boolean init(LogManagerOptions opts) {
LogStorage logStorage = opts.getLogStorage();
Expand All @@ -73,7 +64,15 @@ public boolean init(LogManagerOptions opts) {
this.logStorage = logStorage;
this.maxAppendBufferSize = opts.getRaftOptions().getMaxAppendBufferSize();

return super.init(opts);
boolean isInitSuccessfully = super.init(opts);

int stripe = opts.getLogManagerDisruptor().getStripe(opts.getNode().getNodeId());

assert stripe != -1;

this.stripe = opts.getLogStripes().get(stripe);

return isInitSuccessfully;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,9 @@ private boolean initSnapshotStorage() {
private boolean initLogStorage() {
Requires.requireNonNull(this.fsmCaller, "Null fsm caller");
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
int stripe = options.getLogManagerDisruptor().getStripe(getNodeId());
this.logManager = new StripeAwareLogManager(options.getLogStripes().get(stripe));
final LogManagerOptions opts = new LogManagerOptions();
this.logManager = new StripeAwareLogManager();

LogManagerOptions opts = new LogManagerOptions();
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
opts.setLogStorage(this.logStorage);
opts.setConfigurationManager(this.configManager);
Expand All @@ -622,6 +622,7 @@ private boolean initLogStorage() {
opts.setNodeMetrics(this.metrics);
opts.setRaftOptions(this.raftOptions);
opts.setLogManagerDisruptor(options.getLogManagerDisruptor());
opts.setLogStripes(options.getLogStripes());

return this.logManager.init(opts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand All @@ -48,6 +49,9 @@ public class StripedDisruptor<T extends NodeIdAware> {
/** The logger. */
private static final IgniteLogger LOG = Loggers.forClass(StripedDisruptor.class);

/** Counter used to choose the stripe for the next subscriber, so that subscribers are spread across stripes in a round-robin manner. */
private final AtomicInteger incrementalCounter = new AtomicInteger();

/** Array of disruptors. Each Disruptor in the appropriate stripe. */
private final Disruptor<T>[] disruptors;

Expand Down Expand Up @@ -152,7 +156,7 @@ public StripedDisruptor(
.setWaitStrategy(useYieldStrategy ? new YieldingWaitStrategy() : new BlockingWaitStrategy())
.build();

eventHandlers.add(new StripeEntryHandler());
eventHandlers.add(new StripeEntryHandler(i));
exceptionHandlers.add(new StripeExceptionHandler(name));

disruptor.handleEventsWith(eventHandlers.get(i));
Expand Down Expand Up @@ -202,12 +206,16 @@ public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler) {
* @return Disruptor queue appropriate to the group.
*/
public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, BiConsumer<T, Throwable> exceptionHandler) {
eventHandlers.get(getStripe(nodeId)).subscribe(nodeId, handler);
assert getStripe(nodeId) == -1 : "The double subscriber for the one replication group [nodeId=" + nodeId + "].";

int stripeId = nextStripeToSubscribe();

eventHandlers.get(stripeId).subscribe(nodeId, handler);

if (exceptionHandler != null)
exceptionHandlers.get(getStripe(nodeId)).subscribe(nodeId, exceptionHandler);
exceptionHandlers.get(stripeId).subscribe(nodeId, exceptionHandler);

return queues[getStripe(nodeId)];
return queues[stripeId];
}

/**
Expand All @@ -216,18 +224,37 @@ public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, BiConsume
* @param nodeId Node id.
*/
public void unsubscribe(NodeId nodeId) {
eventHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
exceptionHandlers.get(getStripe(nodeId)).unsubscribe(nodeId);
int stripeId = getStripe(nodeId);

assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";

eventHandlers.get(stripeId).unsubscribe(nodeId);
exceptionHandlers.get(stripeId).unsubscribe(nodeId);
}

/**
* Determines a stripe by a node id and returns a stripe number.
* If the replication group is already subscribed, this method determines a stripe by a node id and returns a stripe number.
* If the replication group has not subscribed yet, this method returns {@code -1}.
*
* @param nodeId Node id.
* @return Stripe of the Striped disruptor.
*/
public int getStripe(NodeId nodeId) {
return Math.abs(nodeId.hashCode() % stripes);
for (StripeEntryHandler handler : eventHandlers) {
if (handler.isSubscribed(nodeId)) {
return handler.stripeId;
}
}

return -1;
}

/**
 * Picks the stripe for the next subscriber by advancing the counter, so that consecutive calls cycle through the stripes.
 *
 * @return The stripe number.
 */
private int nextStripeToSubscribe() {
    int ticket = incrementalCounter.getAndIncrement();

    // The remainder may be negative once the counter wraps around, hence the absolute value.
    return Math.abs(ticket % stripes);
}

/**
Expand All @@ -237,24 +264,40 @@ public int getStripe(NodeId nodeId) {
* @return Disruptor queue appropriate to the group.
*/
public RingBuffer<T> queue(NodeId nodeId) {
return queues[getStripe(nodeId)];
int stripeId = getStripe(nodeId);

assert stripeId != -1 : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";

return queues[stripeId];
}

/**
* Event handler for stripe of the Striped disruptor.
* It routes an event to the event handler for a group.
*/
private class StripeEntryHandler implements EventHandler<T> {
private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers;
private final ConcurrentHashMap<NodeId, EventHandler<T>> subscribers = new ConcurrentHashMap<>();

/** Size of the batch that is currently being handled. */
private int currentBatchSize = 0;

/** Stripe id. */
private final int stripeId;

/**
* The constructor.
*/
StripeEntryHandler() {
subscribers = new ConcurrentHashMap<>();
StripeEntryHandler(int stripeId) {
this.stripeId = stripeId;
}

/**
 * Tells whether the given replication group is present among the subscribers of this stripe.
 *
 * @param nodeId Replication group node id.
 * @return {@code true} if the group is subscribed to this stripe, {@code false} otherwise.
 */
public boolean isSubscribed(NodeId nodeId) {
    // ConcurrentHashMap never stores null values, so a non-null lookup result means the key is present.
    return subscribers.get(nodeId) != null;
}

/**
Expand Down Expand Up @@ -283,7 +326,7 @@ void unsubscribe(NodeId nodeId) {
// TODO: IGNITE-20536 Need to add assert that handler is not null and to implement a no-op handler.
if (handler != null) {
if (metrics != null && metrics.enabled()) {
metrics.hitToStripe(getStripe(event.nodeId()));
metrics.hitToStripe(stripeId);

if (endOfBatch) {
metrics.addBatchSize(currentBatchSize + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.option;

import java.util.List;
import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
Expand All @@ -38,6 +40,15 @@ public class LogManagerOptions {
private NodeMetrics nodeMetrics;
private LogEntryCodecFactory logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
private List<Stripe> logStripes;

/**
* Sets the list of log stripes.
*
* @param logStripes Stripes; {@code StripeAwareLogManager#init} picks its own stripe from this list by the stripe number
*     reported by the log manager disruptor.
*/
public void setLogStripes(List<Stripe> logStripes) {
this.logStripes = logStripes;
}

/**
 * Returns the list of log stripes that was set through {@link #setLogStripes(List)}.
 *
 * @return Log stripes, or {@code null} if none have been set.
 */
public List<Stripe> getLogStripes() {
    return logStripes;
}

public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
return logManagerDisruptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package org.apache.ignite.disruptor;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
Expand Down Expand Up @@ -124,6 +127,53 @@ public void testDisruptorSimple() throws Exception {
disruptor.shutdown();
}

/**
 * Checks the distribution of subscribed handlers across stripes.
 * The distribution algorithm has to distribute handlers as evenly as possible using the round-robin algorithm:
 * the number of handlers in any stripe may differ from the number in the first stripe by at most one.
 */
@Test
public void tesDistributionHandlers() {
    Random random = new Random();

    // At least one stripe is required: with zero stripes the distribution array would be empty
    // and reading distribution[0] below would fail with ArrayIndexOutOfBoundsException.
    int stripes = 1 + random.nextInt(19);

    StripedDisruptor<NodeIdAwareTestObj> disruptor = new StripedDisruptor<>("test", "test-disruptor",
            16384,
            NodeIdAwareTestObj::new,
            stripes,
            false,
            false,
            null);

    try {
        int handlers = random.nextInt(100);

        log.info("Handlers will be distributed across stripes [handlers={}, stripes={}]", handlers, stripes);

        // distribution[s] counts the handlers that ended up in stripe s.
        int[] distribution = new int[stripes];

        for (int i = 0; i < handlers; i++) {
            GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();

            // A unique peer id per iteration makes every subscriber a distinct node.
            var nodeId = new NodeId("grp", new PeerId(UUID.randomUUID().toString()));

            disruptor.subscribe(nodeId, handler);

            int stripe = disruptor.getStripe(nodeId);

            // -1 would mean the subscription was not registered in any stripe.
            assertNotEquals(-1, stripe);

            distribution[stripe]++;
        }

        log.info("Result distribution [distribution={}]", distribution);

        int reference = distribution[0];

        for (int i = 1; i < stripes; i++) {
            assertTrue(Math.abs(distribution[i] - reference) <= 1);
        }
    } finally {
        // Stop the disruptor so that the test does not leave it running, mirroring testDisruptorSimple.
        disruptor.shutdown();
    }
}

/** Group event handler. */
private static class GroupAwareTestObjHandler implements EventHandler<NodeIdAwareTestObj> {
/** This is a container for the batch events. */
Expand Down