Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ target_config_nodes=127.0.0.1:22277
# data_dirs=data/data


# mult_dir_strategy
# multi_dir_strategy
# The strategy is used to choose a directory from data_dirs for the system to store a new tsfile.
# System provides four strategies to choose from, or user can create his own strategy by extending org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy.
# The info of the four strategies are as follows:
Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,20 @@ void setMultiDirStrategyClassName(String multiDirStrategyClassName) {
this.multiDirStrategyClassName = multiDirStrategyClassName;
}

public void checkMultiDirStrategyClassName() {
if (isClusterMode
&& !(multiDirStrategyClassName.equals(DEFAULT_MULTI_DIR_STRATEGY)
|| multiDirStrategyClassName.equals(
MULTI_DIR_STRATEGY_PREFIX + DEFAULT_MULTI_DIR_STRATEGY))) {
String msg =
String.format(
"Cannot set multi_dir_strategy to %s, because cluster mode only allows MaxDiskUsableSpaceFirstStrategy.",
multiDirStrategyClassName);
logger.error(msg);
throw new RuntimeException(msg);
}
}

public int getBatchSize() {
return batchSize;
}
Expand Down Expand Up @@ -2973,6 +2987,7 @@ public boolean isClusterMode() {

public void setClusterMode(boolean isClusterMode) {
this.isClusterMode = isClusterMode;
checkMultiDirStrategyClassName();
}

public int getDataNodeId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,15 @@ public void loadProperties(Properties properties) {
conf.setSyncMlogPeriodInMs(forceMlogPeriodInMs);
}

String oldMultiDirStrategyClassName = conf.getMultiDirStrategyClassName();
conf.setMultiDirStrategyClassName(
properties.getProperty("multi_dir_strategy", conf.getMultiDirStrategyClassName()));
try {
conf.checkMultiDirStrategyClassName();
} catch (Exception e) {
conf.setMultiDirStrategyClassName(oldMultiDirStrategyClassName);
throw e;
}

conf.setBatchSize(
Integer.parseInt(
Expand Down
42 changes: 16 additions & 26 deletions server/src/main/java/org/apache/iotdb/db/service/DataNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ protected void serverCheckAndInit() throws ConfigurationException, IOException {
if (config.getRpcAddress().equals("0.0.0.0")) {
config.setRpcAddress(config.getInternalAddress());
}
thisNode.setIp(IoTDBDescriptor.getInstance().getConfig().getInternalAddress());
thisNode.setPort(IoTDBDescriptor.getInstance().getConfig().getInternalPort());
thisNode.setIp(config.getInternalAddress());
thisNode.setPort(config.getInternalPort());
}

protected void doAddNode() {
Expand Down Expand Up @@ -161,16 +161,15 @@ private void prepareDataNode() throws StartupException {
// Register services
JMXService.registerMBean(getInstance(), mbeanName);
// set the mpp mode to true
IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
IoTDBDescriptor.getInstance().getConfig().setClusterMode(true);
config.setMppMode(true);
config.setClusterMode(true);
}

/** register DataNode with ConfigNode */
private void registerInConfigNode() throws StartupException {
int retry = DEFAULT_JOIN_RETRY;

ConfigNodeInfo.getInstance()
.updateConfigNodeList(IoTDBDescriptor.getInstance().getConfig().getTargetConfigNodeList());
ConfigNodeInfo.getInstance().updateConfigNodeList(config.getTargetConfigNodeList());
while (retry > 0) {
logger.info("start registering to the cluster.");
try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
Expand Down Expand Up @@ -227,7 +226,7 @@ private void registerInConfigNode() throws StartupException {

try {
// wait 5s to start the next try
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getJoinClusterTimeOutMs());
Thread.sleep(config.getJoinClusterTimeOutMs());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when waiting to register to the cluster", e);
Expand Down Expand Up @@ -331,7 +330,7 @@ private void setUpRPCService() throws StartupException {
IoTDBDescriptor.getInstance()
.getConfig()
.setRpcImplClassName(ClientRPCServiceImpl.class.getName());
if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
if (config.isEnableRpcService()) {
registerManager.register(RPCService.getInstance());
}
// init service protocols
Expand Down Expand Up @@ -369,27 +368,18 @@ private void registerUdfServices() throws StartupException {
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(
UDFExecutableManager.setupAndGetInstance(
IoTDBDescriptor.getInstance().getConfig().getUdfTemporaryLibDir(),
IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
registerManager.register(
UDFClassLoaderManager.setupAndGetInstance(
IoTDBDescriptor.getInstance().getConfig().getUdfDir()));
config.getUdfTemporaryLibDir(), config.getUdfDir()));
registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir()));
registerManager.register(
UDFRegistrationService.setupAndGetInstance(
IoTDBDescriptor.getInstance().getConfig().getSystemDir()
+ File.separator
+ "udf"
+ File.separator));
config.getSystemDir() + File.separator + "udf" + File.separator));
}

private void registerTriggerServices() throws StartupException {
registerManager.register(
TriggerExecutableManager.setupAndGetInstance(
IoTDBDescriptor.getInstance().getConfig().getTriggerTemporaryLibDir(),
IoTDBDescriptor.getInstance().getConfig().getTriggerDir()));
registerManager.register(
TriggerClassLoaderManager.setupAndGetInstance(
IoTDBDescriptor.getInstance().getConfig().getTriggerDir()));
config.getTriggerTemporaryLibDir(), config.getTriggerDir()));
registerManager.register(TriggerClassLoaderManager.setupAndGetInstance(config.getTriggerDir()));
registerManager.register(TriggerManagementService.setupAndGetInstance());
}

Expand All @@ -400,8 +390,8 @@ private void initSchemaEngine() {
logger.info("spend {}ms to recover schema.", end);
logger.info(
"After initializing, sequence tsFile threshold is {}, unsequence tsFile threshold is {}",
IoTDBDescriptor.getInstance().getConfig().getSeqTsFileSize(),
IoTDBDescriptor.getInstance().getConfig().getUnSeqTsFileSize());
config.getSeqTsFileSize(),
config.getUnSeqTsFileSize());
}

public void stop() {
Expand All @@ -421,11 +411,11 @@ private void initServiceProvider() throws QueryProcessException {
}

private void initProtocols() throws StartupException {
if (IoTDBDescriptor.getInstance().getConfig().isEnableInfluxDBRpcService()) {
if (config.isEnableInfluxDBRpcService()) {
registerManager.register(InfluxDBRPCService.getInstance());
IoTDB.initInfluxDBMManager();
}
if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
if (config.isEnableMQTTService()) {
registerManager.register(MQTTService.getInstance());
}
if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
Expand Down
4 changes: 0 additions & 4 deletions server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ public static void setServiceProvider(ServiceProvider serviceProvider) {
IoTDB.serviceProvider = serviceProvider;
}

public static void setClusterMode() {
config.setClusterMode(true);
}

public void active() {
StartupChecks checks = new StartupChecks().withDefaultTest();
try {
Expand Down