Skip to content

Commit

Permalink
Merge pull request #135 from liyue2008/fix-store-seek-exception
Browse files Browse the repository at this point in the history
修复某些情况下Follower不响应Leader复制请求的问题。
  • Loading branch information
liyue2008 committed Mar 3, 2020
2 parents 8ae0ea0 + ed90109 commit e29122a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,12 @@ public Command handle(Transport transport, Command command) throws TransportExce
new AppendEntriesResponse.Build().success(false).nextPosition(-1L).build());
}

return leaderElection.handleAppendEntriesRequest(request);

Command response = leaderElection.handleAppendEntriesRequest(request);
if (null == response) {
response = new Command(new JoyQueueHeader(Direction.RESPONSE, CommandType.RAFT_APPEND_ENTRIES_RESPONSE),
new AppendEntriesResponse.Build().success(false).nextPosition(-1L).build());
}
return response;
} catch (Exception e) {
logger.warn("Handle append entries request of topic {} partition group {} fail",
request.getTopic(), request.getPartitionGroup(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ private PositioningStore.Config getIndexStoreConfig(StoreConfig config) {

private PositioningStore.Config getMessageStoreConfig(StoreConfig config) {
return new PositioningStore.Config(config.getMessageFileSize(),
config.getFileHeaderSize(), config.getDiskFullRatio());
config.getFileHeaderSize(), config.getDiskFullRatio(),config.getMaxMessageLength());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class PositioningStore<T> implements Closeable {
private final int fileHeaderSize;
private final int fileDataSize;
private final int diskFullRatio;
private final int maxMessageLength;
private final File base;
private final LogSerializer<T> serializer;
private final PreloadBufferPool bufferPool;
Expand All @@ -67,6 +68,7 @@ public PositioningStore(File base, Config config, PreloadBufferPool bufferPool,
this.base = base;
this.fileHeaderSize = config.fileHeaderSize;
this.fileDataSize = config.fileDataSize;
this.maxMessageLength = config.maxMessageLength;
if(config.diskFullRatio <= 0 || config.diskFullRatio > 100) {
logger.warn("Invalid config diskFullRatio: {}, using default: {}.", config.diskFullRatio, Config.DEFAULT_DISK_FULL_RATIO);
diskFullRatio = Config.DEFAULT_DISK_FULL_RATIO;
Expand Down Expand Up @@ -284,7 +286,8 @@ private void recoverFileMap() throws IOException {

private long toLogTail(long position) {
T t = null;
while (position >= left()) {
long seekEndPosition = Math.max(position - 2 * maxMessageLength, left());
while (position >= seekEndPosition) {
try {
t = tryRead(position--);
} catch (Throwable ignored) {
Expand All @@ -297,7 +300,8 @@ private long toLogTail(long position) {
public long toLogStart(long position) {

T t = null;
while (position >= left()) {
long seekEndPosition = Math.max(position - 2 * maxMessageLength, left());
while (position >= seekEndPosition) {
try {
t = tryRead(position--);
} catch (Throwable ignored) {
Expand Down Expand Up @@ -508,7 +512,7 @@ private void checkReadPosition(long position) {
}

public long physicalSize() {
return storeFileMap.values().stream().map(StoreFile::file).mapToLong(File::length).sum();
return right() - left();
}

/**
Expand Down Expand Up @@ -675,6 +679,7 @@ public static class Config {
public static final int DEFAULT_FILE_HEADER_SIZE = 128;
public static final int DEFAULT_FILE_DATA_SIZE = 128 * 1024 * 1024;
public static final int DEFAULT_DISK_FULL_RATIO = 90;
public static final int DEFAULT_MAX_MESSAGE_LENGTH = 4 * 1024 * 1024;

/**
* 文件头长度
Expand All @@ -687,6 +692,8 @@ public static class Config {

private final int diskFullRatio;

private final int maxMessageLength;

public Config() {
this(DEFAULT_FILE_DATA_SIZE,
DEFAULT_FILE_HEADER_SIZE);
Expand All @@ -701,9 +708,13 @@ public Config(int fileDataSize, int fileHeaderSize) {
this(fileDataSize, fileHeaderSize, DEFAULT_DISK_FULL_RATIO);
}
public Config(int fileDataSize, int fileHeaderSize, int diskFullRatio) {
this(fileDataSize, fileHeaderSize, diskFullRatio, DEFAULT_MAX_MESSAGE_LENGTH);
}
public Config(int fileDataSize, int fileHeaderSize, int diskFullRatio, int maxMessageLength) {
this.fileDataSize = fileDataSize;
this.fileHeaderSize = fileHeaderSize;
this.diskFullRatio = diskFullRatio;
this.maxMessageLength = maxMessageLength;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void loadRoUnsafe() throws IOException {
bufferType = MAPPED_BUFFER;
pageBuffer.clear();
} catch (Throwable t) {
logger.warn("Exception: ", t);
// logger.warn("Exception: ", t);
bufferPool.releaseMMap(this);
pageBuffer = null;
throw t;
Expand Down

0 comments on commit e29122a

Please sign in to comment.