Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ public void flushOneMemTable() {
MemTableFlushTask flushTask =
new MemTableFlushTask(memTableToFlush, writer, storageGroupName);
flushTask.syncFlushMemTable();
} catch (Exception e) {
} catch (Throwable e) {
if (writer == null) {
logger.info(
"{}: {} is closed during flush, abandon flush task",
Expand Down
9 changes: 4 additions & 5 deletions server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ private void setUp() throws StartupException, QueryProcessException {
logger.info("recover the schema...");
initMManager();
initServiceProvider();
// in cluster mode, RPC service is not enabled.
if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
registerManager.register(RPCService.getInstance());
}
registerManager.register(JMXService.getInstance());
registerManager.register(FlushManager.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
Expand All @@ -149,11 +153,6 @@ private void setUp() throws StartupException, QueryProcessException {
registerManager.register(UDFClassLoaderManager.getInstance());
registerManager.register(UDFRegistrationService.getInstance());

// in cluster mode, RPC service is not enabled.
if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
registerManager.register(RPCService.getInstance());
}

initProtocols();
// in cluster mode, InfluxDBMManager has been initialized, so there is no need to init again to
// avoid wasting time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,45 +222,13 @@ private THsHaServer.Args initAsyncedHshaPoolArgs(

@SuppressWarnings("java:S2259")
private TServerTransport openTransport(String bindAddress, int port) throws TTransportException {
int maxRetry = 5;
long retryIntervalMS = 5000;
TTransportException lastExp = null;
for (int i = 0; i < maxRetry; i++) {
try {
return new TServerSocket(new InetSocketAddress(bindAddress, port));
} catch (TTransportException e) {
lastExp = e;
try {
Thread.sleep(retryIntervalMS);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
break;
}
}
}
throw lastExp == null ? new TTransportException() : lastExp;
return new TServerSocket(new InetSocketAddress(bindAddress, port));
}

private TServerTransport openNonblockingTransport(
String bindAddress, int port, int connectionTimeoutInMS) throws TTransportException {
int maxRetry = 5;
long retryIntervalMS = 5000;
TTransportException lastExp = null;
for (int i = 0; i < maxRetry; i++) {
try {
return new TNonblockingServerSocket(
new InetSocketAddress(bindAddress, port), connectionTimeoutInMS);
} catch (TTransportException e) {
lastExp = e;
try {
Thread.sleep(retryIntervalMS);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
break;
}
}
}
throw lastExp == null ? new TTransportException() : lastExp;
return new TNonblockingServerSocket(
new InetSocketAddress(bindAddress, port), connectionTimeoutInMS);
}

public void setThreadStopLatch(CountDownLatch threadStopLatch) {
Expand Down