diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties index bf96f17ede16..e1fa368328eb 100644 --- a/iotdb/iotdb/conf/iotdb-engine.properties +++ b/iotdb/iotdb/conf/iotdb-engine.properties @@ -19,18 +19,24 @@ # rpc_port=6667 + # Write ahead log configuration # Is write ahead log enable enable_wal=true -# When the total number of write ahead log in the file and memory reaches the specified size, all the logs are compressed and the unused logs are removed -# Increase this value, it will lead to short write pause. Decrease this value, it will increase IO and CPU consumption -wal_cleanup_threshold=500000 + # When a certain amount ofwrite ahead log is reached, it will be flushed to disk # It is possible to lose at most flush_wal_threshold operations flush_wal_threshold=10000 + # The cycle when write ahead log is periodically refreshed to disk(in milliseconds) # It is possible to lose at most flush_wal_period_in_ms ms operations flush_wal_period_in_ms=10 + +# The cycle when write ahead log is periodically forced to be written to disk(in milliseconds) +# If force_wal_period_in_ms = 0 it means force write ahead log to be written to disk after each refreshment +# Set this parameter to 0 may slow down the ingestion on slow disk. +force_wal_period_in_ms=10 + # database features configuration # data dir # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/data). @@ -43,6 +49,7 @@ flush_wal_period_in_ms=10 # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # data_dir=/path/iotdb/data/data + # tsfile dir # For this property, multiple directories should be set, and all directories should be separated by ",". All TsFiles will be allocated separately in all these directories. Moreover, setting absolute directories is suggested. # If this property is unset, system will save the TsFiles in the default relative path directory under the data_dir folder(i.e., %IOTDB_HOME%/data/data/settled). @@ -55,6 +62,7 @@ flush_wal_period_in_ms=10 # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # tsfile_dir=/path/iotdb/data/data,data/data + # mult_dir_strategy # The strategy is used to choose a directory from tsfile_dir for the system to store a new tsfile. # System provides three strategies to choose from, or user can create his own strategy by extending org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy. @@ -66,6 +74,7 @@ flush_wal_period_in_ms=10 # If this property is unset, system will use MaxDiskUsableSpaceFirstStrategy as default strategy. # For this property, fully-qualified class name (include package name) and simple class name are both acceptable. # mult_dir_strategy=MaxDiskUsableSpaceFirstStrategy + # system dir # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data/system). # If it is absolute, system will save the data in exact location it points to. @@ -77,6 +86,7 @@ flush_wal_period_in_ms=10 # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # sys_dir=/path/iotdb/data/system + # wal dir # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/data). # If it is absolute, system will save the data in the exact location it points to. @@ -88,53 +98,70 @@ flush_wal_period_in_ms=10 # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # wal_dir=/path/iotdb/data + # The maximum concurrent thread number for merging overflow # Increase this value, it will increase IO and CPU consumption # Decrease this value, when there is much overflow data, it will increase disk usage, which will reduce read speed # When the value<=0 or > CPU core number, use the CPU core number. merge_concurrent_threads=0 + # Maximum number of folders open at the same time # Increase this value, it will use more memory, random I/O becomes smaller, file fragmentation (i.e., group) is more neat. # Decrease this value, it will use less memory, random I/O becomes greater, file fragmentation is less neat. # group_size_in_byte * max_opened_folder = theoretical value of maximum memory occupancy # For an application, the total amount of folder is equal to the number of storage_group settings in SQL max_opened_folder=100 + # The amount of data that is read every time when IoTDB merge data. fetch_size=10000 + # The period time of flushing data from memory to file. # The unit is second. period_time_for_flush_in_second=3600 + # The period time for merge overflow data with tsfile data. # The unit is second. period_time_for_merge_in_second=7200 + #When set to true, start timing flush and merge service. False, stop timing flush and merge service. #Default is true. enable_timing_close_and_Merge=true + # time zone of server side # default value is +08:00 # eg. +08:00, -01:00 time_zone=+08:00 + # if memory used by write reaches this threshold, auto flush will be triggered, percentile of Java heap memory mem_threshold_warning=0.5 + # if memory used by write reaches this threshold, write will be blocked, percentile of Java heap memory mem_threshold_dangerous=0.6 + # every such interval, a thread will check if memory exceeds mem_threshold_warning # if do exceed, auto flush will be triggered, in ms, 1s by default mem_monitor_interval=1000 + # Decide how to control memory used by inserting data. # 0 is RecordMemController, which count the size of every record (tuple). # 1 is JVMMemController, which use JVM heap memory as threshold. mem_controller_type=0 + # When a bufferwrite's metadata size (in byte) exceed this, the bufferwrite is forced closed. bufferwrite_meta_size_threshold=209715200 + # When a bufferwrite's file size (in byte) exceeds this, the bufferwrite is forced closed. bufferwrite_file_size_threshold=2147483648 + # When a overflow's metadata size (in byte) exceeds this, the overflow is forced closed. overflow_meta_size_threshold=20971520 + # When a overflow's file size (in byte) exceeds this, the overflow is forced closed. overflow_file_size_threshold=209715200 + # How many thread can concurrently flush. When <= 0, use CPU core number. concurrent_flush_thread=0 + # Statistics Monitor configuration # Set enable_stat_monitor true(or false) to enable(or disable) the StatMonitor that stores statistics info periodically. # back_loop_period_sec decides the period when StatMonitor writes statistics info into IoTDB. @@ -143,36 +170,49 @@ concurrent_flush_thread=0 # Note: IoTDB requires stat_monitor_detect_freq_sec >= 600s and stat_monitor_retain_interval_sec >= 600s. # The monitor, which writes statistics info to IoTDB periodically, is disabled by default. enable_stat_monitor=false + # The period that StatMonitor stores statistics info back_loop_period_sec=5 + # The interval at which StatMonitor starts to check whether statistics info can be deleted due to exceeding the retention volume stat_monitor_detect_freq_sec=600 + # The minimum age of statistics storage information to be eligible for deletion due to age stat_monitor_retain_interval_sec=600 + # When set false, MemMonitorThread and MemStatisticThread will not be created. enable_mem_monitor=true + # When set to true, small flush will be triggered periodically even if memory threshold is not exceeded. enable_small_flush=true + # The interval of small flush in ms. small_flush_interval=60000 + # The threshold of lines of external sort external_sort_threshold=50 + # cache size for MManager. # This cache is used to improve write speed where all path check and TSDataType will be cached in MManager with corresponding Path. schema_manager_cache_size=300000 + # If the size in byte of a WAL log exceeds this, it won't be written. # Generally the default value 4MB is enough. max_log_entry_size=4194304 + # IoTDB postBack server properties # Whether to allow to post back, the default allowed is_postback_enable=true + # PostBack server port address postback_server_port=5555 + # White IP list of Postback client. # Please use the form of network segment to present the range of IP, for example: 192.168.0.0/16 # If there are more than one IP segment, please separate them by commas # The default is to allow all IP to postback IP_white_list=0.0.0.0/0 + # Choose a postBack strategy of merging historical data: #1. It's more likely to update historical data, please choose "true". #2. It's more likely not to update historical data or you don't know exactly, please choose "false". diff --git a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java index 69c018196c14..c7ea74172c35 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java @@ -20,18 +20,22 @@ package org.apache.iotdb.db.concurrent; public enum ThreadName { - JDBC_SERVICE("JDBC-ServerServiceImpl"), JDBC_CLIENT("JDBC-Client"), MERGE_SERVICE( - "Merge-ServerServiceImpl"), CLOSE_MERGE_SERVICE( - "Close-Merge-ServerServiceImpl"), CLOSE_MERGE_DAEMON( - "Close-Merge-Daemon-Thread"), CLOSE_DAEMON("Close-Daemon-Thread"), MERGE_DAEMON( - "Merge-Daemon-Thread"), MEMORY_MONITOR("IoTDB-MemMonitor-Thread"), MEMORY_STATISTICS( - "IoTDB-MemStatistic-Thread"), FLUSH_PARTIAL_POLICY( - "IoTDB-FlushPartialPolicy-Thread"), FORCE_FLUSH_ALL_POLICY( - "IoTDB-ForceFlushAllPolicy-Thread"), STAT_MONITOR( - "StatMonitor-ServerServiceImpl"), FLUSH_SERVICE( - "Flush-ServerServiceImpl"), WAL_DAEMON( - "IoTDB-MultiFileLogNodeManager-Sync-Thread"), INDEX_SERVICE( - "Index-ServerServiceImpl"); + JDBC_SERVICE("JDBC-ServerServiceImpl"), + JDBC_CLIENT("JDBC-Client"), + MERGE_SERVICE("Merge-ServerServiceImpl"), + CLOSE_MERGE_SERVICE("Close-Merge-ServerServiceImpl"), + CLOSE_MERGE_DAEMON("Close-Merge-Daemon-Thread"), + CLOSE_DAEMON("Close-Daemon-Thread"), + MERGE_DAEMON("Merge-Daemon-Thread"), + MEMORY_MONITOR("IoTDB-MemMonitor-Thread"), + MEMORY_STATISTICS("IoTDB-MemStatistic-Thread"), + FLUSH_PARTIAL_POLICY("IoTDB-FlushPartialPolicy-Thread"), + FORCE_FLUSH_ALL_POLICY("IoTDB-ForceFlushAllPolicy-Thread"), + STAT_MONITOR("StatMonitor-ServerServiceImpl"), + FLUSH_SERVICE("Flush-ServerServiceImpl"), + WAL_DAEMON("IoTDB-MultiFileLogNodeManager-Sync-Thread"), + WAL_FORCE_DAEMON("IoTDB-MultiFileLogNodeManager-Force-Thread"), + INDEX_SERVICE("Index-ServerServiceImpl"); private String name; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 290efb076ae5..5d71cd71a409 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -52,22 +52,32 @@ public class IoTDBConfig { public int flushWalThreshold = 10000; /** - * The cycle when write ahead logs are periodically refreshed to disk(in milliseconds). - * It is possible to lose at most flush_wal_period_in_ms ms operations. + * The cycle when write ahead logs are periodically refreshed to disk(in milliseconds). It is + * possible to lose at most flush_wal_period_in_ms ms operations. */ public long flushWalPeriodInMs = 10000; + + /** + * The cycle when write ahead log is periodically forced to be written to disk(in milliseconds) If + * set this parameter to 0 it means call outputStream.force(true) after every each write + */ + public long forceWalPeriodInMs = 10; + /** * Data directory. */ public String dataDir = null; + /** * System directory. */ public String sysDir = null; + /** * Wal directory. */ public String walDir = null; + /** * Data directory of Overflow data. */ @@ -110,14 +120,13 @@ public class IoTDBConfig { public String indexFileDir = "index"; /** - * Temporary directory for temporary files of read (External Sort). - * TODO: unused field + * Temporary directory for temporary files of read (External Sort). TODO: unused field */ public String readTmpFileDir = "readTmp"; /** - * The maximum concurrent thread number for merging overflow. - * When the value <=0 or > CPU core number, use the CPU core number. + * The maximum concurrent thread number for merging overflow. When the value <=0 or > CPU core + * number, use the CPU core number. */ public int mergeConcurrentThreads = Runtime.getRuntime().availableProcessors(); @@ -149,8 +158,7 @@ public class IoTDBConfig { /** * When set true, start timed flush and merge service. Else, stop timed flush and merge service. - * The default value is true. - * TODO: 'timed' better explains this than 'timing'. + * The default value is true. TODO: 'timed' better explains this than 'timing'. */ public boolean enableTimingCloseAndMerge = true; @@ -161,7 +169,8 @@ public class IoTDBConfig { public ZoneId zoneID = ZoneId.systemDefault(); /** - * BufferWriteProcessor and OverflowProcessor will immediately flush if this threshold is reached. + * BufferWriteProcessor and OverflowProcessor will immediately flush if this threshold is + * reached. */ public long memThresholdWarning = (long) (0.5 * Runtime.getRuntime().maxMemory()); /** @@ -175,8 +184,8 @@ public class IoTDBConfig { public long memMonitorInterval = 1000; /** * Decide how to control memory usage of inserting data. 0 is RecordMemController, which sums the - * size of each record (tuple). 1 is JVMMemController, which uses the JVM heap memory as - * the memory usage indicator. + * size of each record (tuple). 1 is JVMMemController, which uses the JVM heap memory as the + * memory usage indicator. */ public int memControllerType = 1; /** @@ -200,8 +209,8 @@ public class IoTDBConfig { */ public boolean enableMemMonitor = false; /** - * When set to true, small flushes will be triggered periodically even if the memory threshold - * is not exceeded. + * When set to true, small flushes will be triggered periodically even if the memory threshold is + * not exceeded. */ public boolean enableSmallFlush = false; /** @@ -209,8 +218,8 @@ public class IoTDBConfig { */ public long smallFlushInterval = 60 * 1000; /** - * The statMonitor writes statistics info into IoTDB every backLoopPeriodSec secs. - * The default value is 5s. + * The statMonitor writes statistics info into IoTDB every backLoopPeriodSec secs. The default + * value is 5s. */ public int backLoopPeriodSec = 5; /** @@ -222,8 +231,8 @@ public class IoTDBConfig { */ public int statMonitorDetectFreqSec = 60 * 10; /** - * Set the maximum time to keep monitor statistics information in IoTDB. - * The default value is 600s. + * Set the maximum time to keep monitor statistics information in IoTDB. The default value is + * 600s. */ public int statMonitorRetainIntervalSec = 60 * 10; /** @@ -236,8 +245,8 @@ public class IoTDBConfig { */ public int mManagerCacheSize = 400000; /** - * The maximum size of a single log in byte. If a log exceeds this size, - * it cannot be written to the WAL file and an exception is thrown. + * The maximum size of a single log in byte. If a log exceeds this size, it cannot be written to + * the WAL file and an exception is thrown. */ public int maxLogEntrySize = 4 * 1024 * 1024; /** @@ -253,9 +262,9 @@ public class IoTDBConfig { */ public String languageVersion = "EN"; /** - * Choose a postBack strategy of merging historical data: - * 1. It's more likely to update historical data, choose "true". - * 2. It's more likely not to update historical data or you don't know exactly, choose "false". + * Choose a postBack strategy of merging historical data: 1. It's more likely to update historical + * data, choose "true". 2. It's more likely not to update historical data or you don't know + * exactly, choose "false". */ public boolean update_historical_data_possibility = false; public String ipWhiteList = "0.0.0.0/0"; diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index d5c8c1fd3632..291333e8f7ed 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -40,7 +40,7 @@ private IoTDBDescriptor() { } public static final IoTDBDescriptor getInstance() { - return TsfileDBDescriptorHolder.INSTANCE; + return IoTDBDescriptorHolder.INSTANCE; } public IoTDBConfig getConfig() { @@ -49,7 +49,6 @@ public IoTDBConfig getConfig() { /** * load an property file and set TsfileDBConfig variables. - * */ private void loadProps() { InputStream inputStream = null; @@ -123,6 +122,9 @@ private void loadProps() { conf.flushWalPeriodInMs = Integer .parseInt(properties.getProperty("flush_wal_period_in_ms", conf.flushWalPeriodInMs + "")); + conf.forceWalPeriodInMs = Integer + .parseInt(properties.getProperty("force_wal_period_in_ms", + conf.forceWalPeriodInMs + "")); conf.dataDir = properties.getProperty("data_dir", conf.dataDir); conf.bufferWriteDirs = properties.getProperty("tsfile_dir", conf.default_tsfile_dir) @@ -265,7 +267,7 @@ private void loadProps() { } } - private static class TsfileDBDescriptorHolder { + private static class IoTDBDescriptorHolder { private static final IoTDBDescriptor INSTANCE = new IoTDBDescriptor(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java index 485c95d2c12a..694bf1f0f3d3 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/ILogWriter.java @@ -26,5 +26,7 @@ public interface ILogWriter { void write(List logCache) throws IOException; + void force() throws IOException; + void close() throws IOException; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java index af3e0e11e2a6..863a9ae076e7 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/io/LogWriter.java @@ -26,12 +26,15 @@ import java.nio.channels.FileChannel; import java.util.List; import java.util.zip.CRC32; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; public class LogWriter implements ILogWriter { private File logFile; private FileChannel outputStream; private CRC32 checkSummer = new CRC32(); + private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); public LogWriter(String logFilePath) { logFile = new File(logFilePath); @@ -56,7 +59,16 @@ public void write(List logCache) throws IOException { } buffer.flip(); outputStream.write(buffer); - outputStream.force(true); + if (config.forceWalPeriodInMs == 0) { + outputStream.force(true); + } + } + + @Override + public void force() throws IOException { + if (outputStream != null) { + outputStream.force(true); + } } @Override diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java index a675baf1c676..4a5fbcf39d3b 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java @@ -44,7 +44,9 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService { private Map nodeMap; private Thread syncThread; + private Thread forceThread; private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private final Runnable syncTask = new Runnable() { @Override public void run() { @@ -72,6 +74,33 @@ public void run() { } }; + private final Runnable forceTask = new Runnable() { + @Override + public void run() { + while (true) { + if (Thread.interrupted()) { + logger.info("WAL force thread exits."); + break; + } + logger.debug("Timed force starts, {} nodes to be flushed", nodeMap.size()); + for (WriteLogNode node : nodeMap.values()) { + try { + node.force(); + } catch (IOException e) { + logger.error("Cannot force {}, because {}", node.toString(), e.toString()); + } + } + logger.debug("Timed force finished"); + try { + Thread.sleep(config.forceWalPeriodInMs); + } catch (InterruptedException e) { + logger.info("WAL force thread exits."); + break; + } + } + } + }; + private MultiFileLogNodeManager() { nodeMap = new ConcurrentHashMap<>(); } @@ -123,16 +152,24 @@ public void recover() throws RecoverException { @Override public void close() { - if (syncThread == null || !syncThread.isAlive()) { + if (!isActivated(syncThread) && !isActivated(forceThread)) { logger.error("MultiFileLogNodeManager has not yet started"); return; } - logger.info("LogNodeManager starts closing.."); - syncThread.interrupt(); - logger.info("Waiting for sync thread to stop"); - while (syncThread.isAlive()) { - // wait + if (isActivated(syncThread)) { + syncThread.interrupt(); + logger.info("Waiting for sync thread to stop"); + while (syncThread.isAlive()) { + // wait for syncThread + } + } + if (isActivated(forceThread)) { + forceThread.interrupt(); + logger.info("Waiting for force thread to stop"); + while (forceThread.isAlive()) { + // wait for forceThread + } } logger.info("{} nodes to be closed", nodeMap.size()); for (WriteLogNode node : nodeMap.values()) { @@ -179,10 +216,15 @@ public void start() throws StartupException { if (!config.enableWal) { return; } - if (syncThread == null || !syncThread.isAlive()) { + if (!isActivated(syncThread)) { InstanceHolder.instance.syncThread = new Thread(InstanceHolder.instance.syncTask, ThreadName.WAL_DAEMON.getName()); InstanceHolder.instance.syncThread.start(); + if (config.forceWalPeriodInMs > 0 && !isActivated(forceThread)) { + InstanceHolder.instance.forceThread = new Thread(InstanceHolder.instance.forceTask, + ThreadName.WAL_FORCE_DAEMON.getName()); + InstanceHolder.instance.forceThread.start(); + } } else { logger.warn("MultiFileLogNodeManager has already started"); } @@ -207,6 +249,10 @@ public ServiceType getID() { return ServiceType.WAL_SERVICE; } + private boolean isActivated(Thread thread) { + return thread != null && thread.isAlive(); + } + private static class InstanceHolder { private static MultiFileLogNodeManager instance = new MultiFileLogNodeManager(); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java index cc8c65f5dae1..1d94c194237c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java @@ -65,6 +65,8 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable(200l, 300l)); } @@ -204,7 +220,8 @@ public void encodeDecodeTest() throws IOException { @Test public void SQLEncodingComparisonTest() throws WALOverSizedException { - String sql = "INSERT INTO root.logTestDevice(time,s1,s2,s3,s4) VALUES (100,1.0,15,\"str\",false)"; + String sql = "INSERT INTO root.logTestDevice(time,s1,s2,s3,s4) " + + "VALUES (100,1.0,15,\"str\",false)"; InsertPlan bwInsertPlan = new InsertPlan(1, "root.logTestDevice", 100, Arrays.asList("s1", "s2", "s3", "s4"), Arrays.asList("1.0", "15", "str", "false"));