Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move lz4 decompress to backend executor #1237

Merged
merged 7 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public void truncate() {

@Override
public void mutate(BackendMutation mutation) {
if (mutation.isEmpty()) {
return;
}
// Just add to local buffer
this.getOrNewBatch().add(mutation);
}
Expand Down Expand Up @@ -203,7 +206,7 @@ private Object submitAndWait(StoreCommand command) {
}

private Object queryByRaft(Object query, Function<Object, Object> func) {
if (!this.context.isSafeRead()) {
if (this.node().selfIsLeader() || !this.context.isSafeRead()) {
return func.apply(query);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public Status status() {
private RaftResult<T> get() {
try {
return this.future.get(WAIT_RAFT_LOG_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
} catch (ExecutionException e) {
throw new BackendException("ExecutionException", e);
} catch (InterruptedException e) {
throw new BackendException("InterruptedException", e);
} catch (TimeoutException e) {
throw new BackendException("Wait closure timeout");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private void submitCommand(StoreCommand command, StoreClosure closure) {
// compress return BytesBuffer
ByteBuffer buffer = LZ4Util.compress(command.data(),
RaftSharedContext.BLOCK_SIZE)
.forReadWritten()
.asByteBuffer();
LOG.debug("The bytes size of command(compressed) {} is {}",
command.action(), buffer.limit());
Expand Down Expand Up @@ -217,7 +218,6 @@ private void waitIfBusy() {
if (counter <= 0) {
return;
}
// TODO:should sleep or throw exception directly?
// It may lead many thread sleep, but this is exactly what I want
long time = counter * BUSY_SLEEP_FACTOR;
LOG.info("The node {} will sleep {} ms", this.node, time);
Expand All @@ -230,7 +230,8 @@ private void waitIfBusy() {
if (this.busyCounter.get() > 0) {
synchronized (this) {
if (this.busyCounter.get() > 0) {
this.busyCounter.decrementAndGet();
counter = this.busyCounter.decrementAndGet();
LOG.info("Decrease busy counter: [{}]", counter);
}
}
}
Expand Down Expand Up @@ -267,7 +268,7 @@ public void onError(PeerId peer, Status status) {
if (this.isWriteBufferOverflow(status)) {
// increment busy counter
int count = RaftNode.this.busyCounter.incrementAndGet();
LOG.info("Busy counter: [{}]", count);
LOG.info("Increase busy counter: [{}]", count);
}
}

Expand All @@ -278,6 +279,16 @@ private boolean isWriteBufferOverflow(Status status) {
status.getErrorMsg().contains(expectMsg);
}

/**
* Maybe useful in the future
*/
private boolean isRpcTimeout(Status status) {
String expectMsg = "Invoke timeout";
return RaftError.EINTERNAL == status.getRaftError() &&
status.getErrorMsg() != null &&
status.getErrorMsg().contains(expectMsg);
}

@Override
public void onDestroyed(PeerId peer) {
LOG.warn("Replicator {} prepare to offline", peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.baidu.hugegraph.HugeException;
Expand Down Expand Up @@ -266,6 +267,9 @@ public void notifyCache(HugeType type, Id id) {
eventHub = this.params.graphEventHub();
} else if (type.isSchema()) {
eventHub = this.params.schemaEventHub();
if (id.number() && id.asLong() < 0) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comment

}
} else {
return;
}
Expand Down Expand Up @@ -307,6 +311,11 @@ private HugeConfig config() {
}

private RpcServer initAndStartRpcServer() {
BoltRaftRpcFactory.CHANNEL_WRITE_BUF_LOW_WATER_MARK =
this.config().get(CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
BoltRaftRpcFactory.CHANNEL_WRITE_BUF_HIGH_WATER_MARK =
this.config().get(CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);

PeerId serverId = new PeerId();
serverId.parse(this.config().get(CoreOptions.RAFT_ENDPOINT));
RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(
Expand Down Expand Up @@ -344,7 +353,7 @@ private static ExecutorService newPool(int coreThreads, int maxThreads,
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(true)
.enableMetric(false)
.coreThreads(coreThreads)
.maximumThreads(maxThreads)
.keepAliveSeconds(300L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,31 +134,28 @@ public void onApply(Iterator iter) {
StoreClosure closure = null;
try {
while (iter.hasNext()) {
StoreType type;
StoreAction action;
BytesBuffer buffer;
closure = (StoreClosure) iter.done();
if (closure != null) {
// Leader just take it out from the closure
buffer = BytesBuffer.wrap(closure.command().data());
} else {
// Follower need readMutation data
buffer = LZ4Util.decompress(iter.getData().array(),
RaftSharedContext.BLOCK_SIZE);
}
// The first two bytes are StoreType and StoreAction
type = StoreType.valueOf(buffer.read());
action = StoreAction.valueOf(buffer.read());
if (closure != null) {
// Closure is null on follower node
BytesBuffer buffer = BytesBuffer.wrap(closure.command().data());
// The first two bytes are StoreType and StoreAction
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
// Let the producer thread to handle it
closure.complete(Status.OK(), () -> {
return this.applyCommand(type, action, buffer);
});
} else {
// Follower need readMutation data
byte[] bytes = iter.getData().array();
// Follower seems no way to wait future
// Let the backend thread do it directly
this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
try {
this.applyCommand(type, action, buffer);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,22 @@ public static synchronized CoreOptions instance() {
60000
);

public static final ConfigOption<Integer> RAFT_RPC_BUF_LOW_WATER_MARK =
new ConfigOption<>(
"raft.rpc_buf_low_water_mark",
"",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer write some description

positiveInt(),
10 * 1024 * 1024
);

public static final ConfigOption<Integer> RAFT_RPC_BUF_HIGH_WATER_MARK =
new ConfigOption<>(
"raft.rpc_buf_high_water_mark",
"",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

positiveInt(),
20 * 1024 * 1024
);

public static final ConfigOption<Integer> RATE_LIMIT_WRITE =
new ConfigOption<>(
"rate_limit.write",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static BytesBuffer compress(byte[] bytes, int blockSize,
} catch (IOException e) {
throw new BackendException("Failed to compress", e);
}
/*
* If need perform reading outside the method,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to perform

* remember to call forReadWritten()
*/
return buf;
}

Expand Down Expand Up @@ -82,6 +86,10 @@ public static BytesBuffer decompress(byte[] bytes, int blockSize,
} catch (IOException e) {
throw new BackendException("Failed to decompress", e);
}
/*
* If need perform reading outside the method,
* remember to call forReadWritten()
*/
return buf;
}
}