Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client-cpp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@
<cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
<thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
<iotdb.server.script>start-server.bat</iotdb.server.script>
<boost.include.dir />
<boost.library.dir />
<boost.include.dir/>
<boost.library.dir/>
</properties>
</profile>
<profile>
Expand Down
4 changes: 1 addition & 3 deletions cluster/src/assembly/resources/conf/iotdb-cluster.properties
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,4 @@ max_client_pernode_permember_number=1000
# If the number of connections created for a node exceeds `max_client_pernode_permember_number`,
# we need to wait so much time for other connections to be released until timeout,
# or a new connection will be created.
wait_client_timeout_ms=5000

enable_query_redirect=false
wait_client_timeout_ms=5000
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ public class ClusterConfig {

private boolean openServerRpcPort = false;

private boolean enableQueryRedirect = false;

public int getSelectorNumOfClientPool() {
return selectorNumOfClientPool;
}
Expand Down Expand Up @@ -469,12 +467,4 @@ public long getWaitClientTimeoutMS() {
public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
this.waitClientTimeoutMS = waitClientTimeoutMS;
}

public boolean isEnableQueryRedirect() {
return enableQueryRedirect;
}

public void setEnableQueryRedirect(boolean enableQueryRedirect) {
this.enableQueryRedirect = enableQueryRedirect;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,6 @@ private void loadProps() {
properties.getProperty(
"wait_client_timeout_ms", String.valueOf(config.getWaitClientTimeoutMS()))));

config.setEnableQueryRedirect(
Boolean.parseBoolean(
properties.getProperty(
"enable_query_redirect", String.valueOf(config.isEnableQueryRedirect()))));

String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
Expand Down Expand Up @@ -307,7 +308,8 @@ public void executeNonQueryPlan(
}

@Override
public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
public void requestCommitIndex(
Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Request commit index");
if (service != null) {
service.requestCommitIndex(header, resultHandler);
Expand Down Expand Up @@ -919,7 +921,7 @@ public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException
}

@Override
public long requestCommitIndex(Node header) throws TException {
public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException {
return getDataSyncService(header).requestCommitIndex(header);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
Expand Down Expand Up @@ -224,7 +225,8 @@ public void executeNonQueryPlan(
}

@Override
public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
public void requestCommitIndex(
Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
asyncService.requestCommitIndex(header, resultHandler);
}

Expand Down Expand Up @@ -331,7 +333,7 @@ public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException
}

@Override
public long requestCommitIndex(Node header) throws TException {
public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException {
return syncService.requestCommitIndex(header);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
Expand Down Expand Up @@ -406,30 +407,7 @@ public HeartBeatResponse processHeartbeatRequest(HeartBeatRequest request) {
response.setLastLogTerm(logManager.getLastLogTerm());
}

if (logManager.getCommitLogIndex() < request.getCommitLogIndex()) {
// there are more local logs that can be committed, commit them in a ThreadPool so the
// heartbeat response will not be blocked
CommitLogTask commitLogTask =
new CommitLogTask(
logManager, request.getCommitLogIndex(), request.getCommitLogTerm());
commitLogTask.registerCallback(new CommitLogCallback(this));
// if the log is not consistent, the commitment will be blocked until the leader makes the
// node catch up
if (commitLogPool != null && !commitLogPool.isShutdown()) {
commitLogPool.submit(commitLogTask);
}

logger.debug(
"{}: Inconsistent log found, leaderCommit: {}-{}, localCommit: {}-{}, "
+ "localLast: {}-{}",
name,
request.getCommitLogIndex(),
request.getCommitLogTerm(),
logManager.getCommitLogIndex(),
logManager.getCommitLogTerm(),
logManager.getLastLogIndex(),
logManager.getLastLogTerm());
}
tryUpdateCommitIndex(leaderTerm, request.getCommitLogIndex(), request.getCommitLogTerm());

if (logger.isTraceEnabled()) {
logger.trace("{} received heartbeat from a valid leader {}", name, request.getLeader());
Expand All @@ -439,6 +417,31 @@ public HeartBeatResponse processHeartbeatRequest(HeartBeatRequest request) {
}
}

private void tryUpdateCommitIndex(long leaderTerm, long commitIndex, long commitTerm) {
if (leaderTerm >= term.get() && logManager.getCommitLogIndex() < commitIndex) {
// there are more local logs that can be committed, commit them in a ThreadPool so the
// heartbeat response will not be blocked
CommitLogTask commitLogTask = new CommitLogTask(logManager, commitIndex, commitTerm);
commitLogTask.registerCallback(new CommitLogCallback(this));
// if the log is not consistent, the commitment will be blocked until the leader makes the
// node catch up
if (commitLogPool != null && !commitLogPool.isShutdown()) {
commitLogPool.submit(commitLogTask);
}

logger.debug(
"{}: Inconsistent log found, leaderCommit: {}-{}, localCommit: {}-{}, "
+ "localLast: {}-{}",
name,
commitIndex,
commitTerm,
logManager.getCommitLogIndex(),
logManager.getCommitLogTerm(),
logManager.getLastLogIndex(),
logManager.getLastLogTerm());
}
}

/**
* Process an ElectionRequest. If the request comes from the last leader, accept it. Else decide
* whether to accept by examining the log status of the elector.
Expand Down Expand Up @@ -872,8 +875,14 @@ public void waitLeader() {
protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
long leaderCommitId = Long.MIN_VALUE;
RequestCommitIndexResponse response;
try {
leaderCommitId = config.isUseAsyncServer() ? requestCommitIdAsync() : requestCommitIdSync();
response = config.isUseAsyncServer() ? requestCommitIdAsync() : requestCommitIdSync();
leaderCommitId = response.getCommitLogIndex();

tryUpdateCommitIndex(
response.getTerm(), response.getCommitLogIndex(), response.getCommitLogTerm());

return syncLocalApply(leaderCommitId);
} catch (TException e) {
logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
Expand Down Expand Up @@ -1057,9 +1066,12 @@ static void setWaitLeaderTimeMs(long waitLeaderTimeMs) {
}

@SuppressWarnings("java:S2274") // enable timeout
protected long requestCommitIdAsync() throws TException, InterruptedException {
protected RequestCommitIndexResponse requestCommitIdAsync()
throws TException, InterruptedException {
// use Long.MAX_VALUE to indicate a timeout
AtomicReference<Long> commitIdResult = new AtomicReference<>(Long.MAX_VALUE);
RequestCommitIndexResponse response =
new RequestCommitIndexResponse(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE);
AtomicReference<RequestCommitIndexResponse> commitIdResult = new AtomicReference<>(response);
AsyncClient client = getAsyncClient(leader.get());
if (client == null) {
// cannot connect to the leader
Expand All @@ -1073,24 +1085,25 @@ protected long requestCommitIdAsync() throws TException, InterruptedException {
return commitIdResult.get();
}

private long requestCommitIdSync() throws TException {
private RequestCommitIndexResponse requestCommitIdSync() throws TException {
Client client = getSyncClient(leader.get());
RequestCommitIndexResponse response;
if (client == null) {
// cannot connect to the leader
logger.warn(MSG_NO_LEADER_IN_SYNC, name);
// use Long.MAX_VALUE to indicate a timeouts
return Long.MAX_VALUE;
response = new RequestCommitIndexResponse(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE);
return response;
}
long commitIndex;
try {
commitIndex = client.requestCommitIndex(getHeader());
response = client.requestCommitIndex(getHeader());
} catch (TException e) {
client.getInputProtocol().getTransport().close();
throw e;
} finally {
ClientUtils.putBackSyncClient(client);
}
return commitIndex;
return response;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.IOUtils;
Expand Down Expand Up @@ -85,10 +86,22 @@ public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long
}

@Override
public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) {
long commitIndex = member.getCommitIndex();
public void requestCommitIndex(
Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
long commitIndex;
long commitTerm;
long curTerm;
synchronized (member.getTerm()) {
commitIndex = member.getLogManager().getCommitLogIndex();
commitTerm = member.getLogManager().getCommitLogTerm();
curTerm = member.getTerm().get();
}

RequestCommitIndexResponse response =
new RequestCommitIndexResponse(curTerm, commitIndex, commitTerm);

if (commitIndex != Long.MIN_VALUE) {
resultHandler.onComplete(commitIndex);
resultHandler.onComplete(response);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
Expand Down Expand Up @@ -93,10 +94,22 @@ public long appendEntries(AppendEntriesRequest request) throws TException {
}

@Override
public long requestCommitIndex(Node header) throws TException {
long commitIndex = member.getCommitIndex();
public RequestCommitIndexResponse requestCommitIndex(Node header) throws TException {

long commitIndex;
long commitTerm;
long curTerm;
synchronized (member.getTerm()) {
commitIndex = member.getLogManager().getCommitLogIndex();
commitTerm = member.getLogManager().getCommitLogTerm();
curTerm = member.getTerm().get();
}

RequestCommitIndexResponse response =
new RequestCommitIndexResponse(curTerm, commitIndex, commitTerm);

if (commitIndex != Long.MIN_VALUE) {
return commitIndex;
return response;
}

member.waitLeader();
Expand All @@ -105,14 +118,14 @@ public long requestCommitIndex(Node header) throws TException {
throw new TException(new LeaderUnknownException(member.getAllNodes()));
}
try {
commitIndex = client.requestCommitIndex(header);
response = client.requestCommitIndex(header);
} catch (TException e) {
client.getInputProtocol().getTransport().close();
throw e;
} finally {
ClientUtils.putBackSyncClient(client);
}
return commitIndex;
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iotdb.cluster.query;

import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
Expand Down Expand Up @@ -52,14 +51,12 @@ public class ClusterDataQueryExecutorTest extends BaseQueryTest {
@Before
public void setUp() throws Exception {
super.setUp();
ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(true);
}

@Override
@After
public void tearDown() throws Exception {
super.tearDown();
ClusterDescriptor.getInstance().getConfig().setEnableQueryRedirect(false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.NodeCharacter;
Expand Down Expand Up @@ -244,11 +245,11 @@ public void pullSnapshot(

@Override
public void requestCommitIndex(
Node header, AsyncMethodCallback<Long> resultHandler) {
Node header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
new Thread(
() -> {
if (enableSyncLeader) {
resultHandler.onComplete(-1L);
resultHandler.onComplete(new RequestCommitIndexResponse());
} else {
resultHandler.onError(new TestException());
}
Expand Down
Loading