Skip to content

Commit

Permalink
1.process GroupCommitEventLoop lost signal when error occur
Browse files Browse the repository at this point in the history
2.improve message body size limit and reserve log entry capacity
3.update default max message size limit to 4M
4.improve the prompt information
5.update version to 1.2.5
  • Loading branch information
zhenghao committed Jul 11, 2019
1 parent 52e5276 commit 8e1b0c1
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 294 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.menina</groupId>
<artifactId>raft</artifactId>
<version>1.2.4</version>
<version>1.2.5</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
6 changes: 3 additions & 3 deletions raft.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ ioThreadsNum: 20
backgroundThreadsNum: 1
# if true, flush dish after each batch write
logFlushEnable: false
# index file size (4k)
# index file size (4MB)
maxIndexSize: 4194304
# max message size (8k)
maxMessageSize: 8192
# max message size (4MB)
maxMessageSize: 4194304
# max segment size (100MB)
maxSegmentSize: 104857600
# max segment alive time (7 Day)
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/menina/raft/Raft.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public long propose(byte[] data, long timeout, TimeUnit timeUnit) throws RaftExc

@Override
public long propose(byte[] data, long timeout, TimeUnit timeUnit, Map<String, String> attachments) throws RaftException {
Preconditions.checkArgument(data.length <= raftNode.config().getMaxMessageSize(),
"request message body size " + data.length + " exceed limit: " + raftNode.config().getMaxMessageSize());
checkAvailable();
long expectOffset;
appendLock.lock();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/menina/raft/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public enum StorageType {

public static final int DEFAULT_RECOVER_BATCH_SIZE = DEFAULT_BATCH_SIZE << 4;

public static final int MAX_ENTRY_SIZE = 0x2000;
public static final int MAX_ENTRY_SIZE = 0x400000;

public static final long SEGMENT_INDEX_SIZE = 0x400000L;

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/menina/raft/common/task/SentinelTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.base.Preconditions;
import org.menina.raft.api.Node;
import org.menina.raft.api.State;
import org.menina.raft.transport.RpcTransporter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -39,6 +40,13 @@ public void run() {
if (TimeUnit.MILLISECONDS.toSeconds((now - raftNode.nodeInfo().getGroupCommitTick()) * raftNode.config().getClockAccuracyMills()) > maxCommitMills) {
log.warn("group commit thread blocked more than {} seconds, please check network-io, disk-io, load, " +
"current node processing capacity is close to the upper limit", maxCommitMills);
log.info("--------------node state-------------");
log.info(raftNode.nodeInfo().toString());
if (RpcTransporter.class.isAssignableFrom(raftNode.transporter().getClass())) {
log.info(((RpcTransporter) (raftNode.transporter())).getRequestChannel().toString());
}

log.info("-------------node state end-------------");
raftNode.nodeInfo().setGroupCommitTick(raftNode.clock().now());
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/org/menina/raft/core/RaftNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.menina.rail.config.ServerOptions;
import org.menina.rail.server.ExporterServer;
import lombok.extern.slf4j.Slf4j;
import org.menina.raft.api.State;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -51,7 +50,7 @@ private void backend() {
this.snapshotter.recover();
RaftProto.Snapshot snapshot = this.snapshotter.loadNewest();
this.nextOffsetMetaData = this.wal.recoverWal();
this.nodeInfo().setReplayState(State.ReplayState.REPLAYING);
this.nodeInfo().setReplayState(ReplayState.REPLAYING);
if (snapshot != null) {
this.raftLog.updateSnapshotMetadata(snapshot.getMeta());
this.raftLog.appliedTo(snapshot.getMeta().getIndex());
Expand All @@ -73,7 +72,7 @@ private void backend() {
this.recover(snapshot, latest);
this.storage.recover();
if (latest == null || (snapshot != null && latest.getIndex() <= snapshot.getMeta().getIndex())) {
nodeInfo().setReplayState(State.ReplayState.REPLAYED);
nodeInfo().setReplayState(ReplayState.REPLAYED);
log.info("state machine replay success, replay state {}", nodeInfo().getReplayState());
}
} catch (IOException e) {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/menina/raft/core/RequestChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.menina.raft.common.Ready;
import org.menina.raft.message.RaftProto;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -21,13 +22,14 @@
/**
* @author zhenghao
* @date 2019/2/20
* <p>
*
* 1.pending message
* 2.event loop interaction
*/
@Slf4j
@ThreadSafe
@Data
@ToString
public class RequestChannel {

private volatile boolean ready = false;
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/menina/raft/core/loop/GroupCommitLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ public void accept(RaftProto.Message message) {
}
});
}

} catch (Throwable t) {
log.error(t.getMessage(), t);
} finally {
requestChannel.setCanCommit(false);
requestChannel.setAdvance(true);
requestChannel.setReady(false);
} catch (Throwable t) {
log.error(t.getMessage(), t);
}
}
}
Expand Down
Loading

0 comments on commit 8e1b0c1

Please sign in to comment.