diff --git a/.asf.yaml b/.asf.yaml index 6f67934f8f78..82a4ccad7072 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -30,4 +30,4 @@ github: - tsdb features: wiki: true - issues: true \ No newline at end of file + issues: true diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java index a599bd2d95b0..d8e2a665d294 100644 --- a/.mvn/wrapper/MavenWrapperDownloader.java +++ b/.mvn/wrapper/MavenWrapperDownloader.java @@ -112,4 +112,4 @@ protected PasswordAuthentication getPasswordAuthentication() { } } -} \ No newline at end of file +} diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 index e3e37ad63e93..464f359739f0 100644 --- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 +++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 @@ -45,6 +45,7 @@ statement | DROP INDEX indexName=ID ON prefixPath #dropIndex //not support yet | MERGE #merge | FLUSH prefixPath? (COMMA prefixPath)* (booleanClause)?#flush + | SETTLE pathOrString #settle | FULL MERGE #fullMerge | CLEAR CACHE #clearcache | CREATE USER userName=ID password= stringLiteral#createUser @@ -1068,6 +1069,10 @@ FLUSH : F L U S H ; +SETTLE + : S E T T L E + ; + TASK : T A S K ; @@ -1193,7 +1198,7 @@ PLA ; LZ4 - : L Z '4' + : L Z '4' ; LATEST @@ -1423,6 +1428,11 @@ stringLiteral | DOUBLE_QUOTE_STRING_LITERAL ; +pathOrString + : prefixPath + | stringLiteral + ; + INT : [0-9]+; EXPONENT : INT ('e'|'E') ('+'|'-')? INT ; diff --git a/jenkins.pom b/jenkins.pom index dab9a4b82cb0..046af1064889 100644 --- a/jenkins.pom +++ b/jenkins.pom @@ -64,4 +64,4 @@ - \ No newline at end of file + diff --git a/server/src/assembly/resources/tools/tsfileToolSet/settle.bat b/server/src/assembly/resources/tools/tsfileToolSet/settle.bat new file mode 100644 index 000000000000..43cb97653469 --- /dev/null +++ b/server/src/assembly/resources/tools/tsfileToolSet/settle.bat @@ -0,0 +1,62 @@ +@REM +@REM Licensed to the Apache Software Foundation (ASF) under one +@REM or more contributor license agreements. See the NOTICE file +@REM distributed with this work for additional information +@REM regarding copyright ownership. The ASF licenses this file +@REM to you under the Apache License, Version 2.0 (the +@REM "License"); you may not use this file except in compliance +@REM with the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, +@REM software distributed under the License is distributed on an +@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +@REM KIND, either express or implied. See the License for the +@REM specific language governing permissions and limitations +@REM under the License. +@REM + + +@echo off +echo ```````````````````````` +echo Starting Settling the TsFile +echo ```````````````````````` + +if "%OS%" == "Windows_NT" setlocal + +pushd %~dp0..\.. +if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% +popd + +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool +if NOT DEFINED JAVA_HOME goto :err + +@REM ----------------------------------------------------------------------------- +@REM ***** CLASSPATH library setting ***** +@REM Ensure that any user defined CLASSPATH variables are not used on startup +set CLASSPATH="%IOTDB_HOME%\lib\*" + +goto okClasspath + +:append +set CLASSPATH=%CLASSPATH%;%1 +goto :eof + +@REM ----------------------------------------------------------------------------- +:okClasspath + +"%JAVA_HOME%\bin\java" -cp "%CLASSPATH%" %MAIN_CLASS% %* + +goto finally + + +:err +echo JAVA_HOME environment variable must be set! +pause + + +@REM ----------------------------------------------------------------------------- +:finally + +ENDLOCAL diff --git a/server/src/assembly/resources/tools/tsfileToolSet/settle.sh b/server/src/assembly/resources/tools/tsfileToolSet/settle.sh new file mode 100644 index 000000000000..0deea5d01a53 --- /dev/null +++ b/server/src/assembly/resources/tools/tsfileToolSet/settle.sh @@ -0,0 +1,48 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +echo --------------------- +echo Starting Settling the TsFile +echo --------------------- + +if [ -z "${IOTDB_HOME}" ]; then + export IOTDB_HOME="$(cd "`dirname "$0"`"/../..; pwd)" +fi + +if [ -n "$JAVA_HOME" ]; then + for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do + if [ -x "$java" ]; then + JAVA="$java" + break + fi + done +else + JAVA=java +fi + +CLASSPATH="" +for f in ${IOTDB_HOME}/lib/*.jar; do + CLASSPATH=${CLASSPATH}":"$f +done + +MAIN_CLASS=org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool + +"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@" +exit $? diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index d3b218ec7ce8..cbcae557f696 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -535,6 +535,9 @@ public class IoTDBConfig { /** How many threads will be set up to perform upgrade tasks. */ private int upgradeThreadNum = 1; + /** How many threads will be set up to perform settle tasks. */ + private int settleThreadNum = 1; + /** How many threads will be set up to perform main merge tasks. */ private int mergeThreadNum = 1; @@ -2055,6 +2058,10 @@ public int getUpgradeThreadNum() { return upgradeThreadNum; } + public int getSettleThreadNum() { + return settleThreadNum; + } + void setUpgradeThreadNum(int upgradeThreadNum) { this.upgradeThreadNum = upgradeThreadNum; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 7fcaca944579..05ea3d860de7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -797,6 +797,31 @@ public void upgradeAll() throws StorageEngineException { } } + public void getResourcesToBeSettled( + PartialPath sgPath, + List seqResourcesToBeSettled, + List unseqResourcesToBeSettled, + List tsFilePaths) + throws StorageEngineException { + VirtualStorageGroupManager vsg = processorMap.get(sgPath); + if (vsg == null) { + throw new StorageEngineException( + "The Storage Group " + sgPath.toString() + " is not existed."); + } + if (!vsg.getIsSettling().compareAndSet(false, true)) { + throw new StorageEngineException( + "Storage Group " + sgPath.getFullPath() + " is already being settled now."); + } + vsg.getResourcesToBeSettled(seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths); + } + + public void setSettling(PartialPath sgPath, boolean isSettling) { + if (processorMap.get(sgPath) == null) { + return; + } + processorMap.get(sgPath).setSettling(isSettling); + } + /** * merge all storage groups. * diff --git a/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleLog.java b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleLog.java new file mode 100644 index 000000000000..f576fce719af --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleLog.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.settle; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class SettleLog { + private static final Logger logger = LoggerFactory.getLogger(SettleLog.class); + public static final String COMMA_SEPERATOR = ","; + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final String SETTLE_DIR = "settle"; + private static final String SETTLE_LOG_NAME = "settle.txt"; + private static BufferedWriter settleLogWriter; + private static File settleLogPath = // the path of upgrade log is "data/system/settle/settle.txt" + SystemFileFactory.INSTANCE.getFile( + SystemFileFactory.INSTANCE.getFile(config.getSystemDir(), SETTLE_DIR), SETTLE_LOG_NAME); + + private static final ReadWriteLock settleLogFileLock = new ReentrantReadWriteLock(); + + public static boolean createSettleLog() { + try { + if (!settleLogPath.getParentFile().exists()) { + settleLogPath.getParentFile().mkdirs(); + } + settleLogPath.createNewFile(); + settleLogWriter = new BufferedWriter(new FileWriter(getSettleLogPath(), true)); + return true; + } catch (IOException e) { + logger.error("meet error when creating settle log, file path:{}", settleLogPath, e); + return false; + } + } + + public static boolean writeSettleLog(String content) { + settleLogFileLock.writeLock().lock(); + try { + settleLogWriter.write(content); + settleLogWriter.newLine(); + settleLogWriter.flush(); + return true; + } catch (IOException e) { + logger.error("write settle log file failed, the log file:{}", getSettleLogPath(), e); + return false; + } finally { + settleLogFileLock.writeLock().unlock(); + } + } + + public static void closeLogWriter() { + try { + if (settleLogWriter != null) { + settleLogWriter.close(); + } + } catch (IOException e) { + logger.error("close upgrade log file failed, the log file:{}", getSettleLogPath(), e); + } + } + + public static String getSettleLogPath() { // "data/system/settle/settle.txt" + return settleLogPath.getAbsolutePath(); + } + + public static void setSettleLogPath(File settleLogPath) { + SettleLog.settleLogPath = settleLogPath; + } + + public enum SettleCheckStatus { + BEGIN_SETTLE_FILE(1), + AFTER_SETTLE_FILE(2), + SETTLE_SUCCESS(3); + + private final int checkStatus; + + SettleCheckStatus(int checkStatus) { + this.checkStatus = checkStatus; + } + + public int getCheckStatus() { + return checkStatus; + } + + @Override + public String toString() { + return String.valueOf(checkStatus); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleTask.java b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleTask.java new file mode 100644 index 000000000000..6a01e09b190a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/settle/SettleTask.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.settle; + +import org.apache.iotdb.db.concurrent.WrappedRunnable; +import org.apache.iotdb.db.engine.settle.SettleLog.SettleCheckStatus; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.service.SettleService; +import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class SettleTask extends WrappedRunnable { + private static final Logger logger = LoggerFactory.getLogger(SettleTask.class); + private final TsFileResource resourceToBeSettled; + + public SettleTask(TsFileResource resourceToBeSettled) { + this.resourceToBeSettled = resourceToBeSettled; + } + + @Override + public void runMayThrow() { + try { + settleTsFile(); + } catch (Exception e) { + logger.error( + "meet error when settling file:{}", resourceToBeSettled.getTsFile().getAbsolutePath(), e); + } + } + + public void settleTsFile() throws WriteProcessException { + List settledResources = new ArrayList<>(); + if (!resourceToBeSettled.isClosed()) { + logger.warn( + "The tsFile {} should be sealed when settling.", + resourceToBeSettled.getTsFile().getAbsolutePath()); + return; + } + TsFileAndModSettleTool tsFileAndModSettleTool = TsFileAndModSettleTool.getInstance(); + try { + if (tsFileAndModSettleTool.isSettledFileGenerated(resourceToBeSettled)) { + logger.info("find settled file for {}", resourceToBeSettled.getTsFile()); + settledResources = tsFileAndModSettleTool.findSettledFile(resourceToBeSettled); + } else { + logger.info("generate settled file for {}", resourceToBeSettled.getTsFile()); + // Write Settle Log, Status 1 + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFile().getAbsolutePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.BEGIN_SETTLE_FILE); + tsFileAndModSettleTool.settleOneTsFileAndMod(resourceToBeSettled, settledResources); + // Write Settle Log, Status 2 + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFile().getAbsolutePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.AFTER_SETTLE_FILE); + } + } catch (IllegalPathException + | IOException + | org.apache.iotdb.tsfile.exception.write.WriteProcessException e) { + resourceToBeSettled.readUnlock(); + logger.error("Exception to parse the tsfile in settling", e); + throw new WriteProcessException( + "Meet error when settling file: " + resourceToBeSettled.getTsFile().getAbsolutePath(), e); + } + resourceToBeSettled.getSettleTsFileCallBack().call(resourceToBeSettled, settledResources); + + // Write Settle Log, Status 3 + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFile().getAbsolutePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.SETTLE_SUCCESS); + logger.info( + "Settle completes, file path:{} , the remaining file to be settled num: {}", + resourceToBeSettled.getTsFile().getAbsolutePath(), + SettleService.getINSTANCE().getFilesToBeSettledCount().get()); + + if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() == 0) { + SettleLog.closeLogWriter(); + SettleService.getINSTANCE().stop(); + logger.info("All files settled successfully! "); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index b5d1363a81d5..5cc8faa17743 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -23,6 +23,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager; import org.apache.iotdb.db.engine.compaction.TsFileManagement; import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement; @@ -62,9 +64,12 @@ import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.query.control.QueryFileManager; import org.apache.iotdb.db.rescon.TsFileResourceManager; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.service.SettleService; +import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; import org.apache.iotdb.db.utils.CopyOnReadLinkedList; import org.apache.iotdb.db.utils.MmapUtil; import org.apache.iotdb.db.utils.TestOnly; @@ -155,6 +160,8 @@ public class StorageGroupProcessor { /** indicating the file to be loaded overlap with some files. */ private static final int POS_OVERLAP = -3; + private static final int WAL_BUFFER_SIZE = + IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2; private final boolean enableMemControl = config.isEnableMemControl(); /** * a read write lock for guaranteeing concurrent safety when accessing all fields in this class @@ -173,15 +180,15 @@ public class StorageGroupProcessor { private final TreeMap workSequenceTsFileProcessors = new TreeMap<>(); /** time partition id in the storage group -> tsFileProcessor for this time partition */ private final TreeMap workUnsequenceTsFileProcessors = new TreeMap<>(); + + private final Deque walByteBufferPool = new LinkedList<>(); /** compactionMergeWorking is used to wait for last compaction to be done. */ private volatile boolean compactionMergeWorking = false; // upgrading sequence TsFile resource list private List upgradeSeqFileList = new LinkedList<>(); - /** sequence tsfile processors which are closing */ private CopyOnReadLinkedList closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>(); - // upgrading unsequence TsFile resource list private List upgradeUnseqFileList = new LinkedList<>(); @@ -205,7 +212,6 @@ public class StorageGroupProcessor { * unsequential file. */ private Map> partitionLatestFlushedTimeForEachDevice = new HashMap<>(); - /** used to record the latest flush time while upgrading and inserting */ private Map> newlyFlushedPartitionLatestFlushedTimeForEachDevice = new HashMap<>(); @@ -216,16 +222,12 @@ public class StorageGroupProcessor { * partitionLatestFlushedTimeForEachDevice */ private Map globalLatestFlushedTimeForEachDevice = new HashMap<>(); - /** virtual storage group id */ private String virtualStorageGroupId; - /** logical storage group name */ private String logicalStorageGroupName; - /** storage group system directory */ private File storageGroupSysDir; - /** manage seqFileList and unSeqFileList */ private TsFileManagement tsFileManagement; @@ -243,13 +245,10 @@ public class StorageGroupProcessor { * eventually removed. */ private long dataTTL = Long.MAX_VALUE; - /** file system factory (local or hdfs) */ private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); - /** file flush policy */ private TsFileFlushPolicy fileFlushPolicy; - /** * The max file versions in each partition. By recording this, if several IoTDB instances have the * same policy of closing file and their ingestion is identical, then files of the same version in @@ -257,7 +256,6 @@ public class StorageGroupProcessor { * across different instances. partition number -> max version number */ private Map partitionMaxFileVersions = new HashMap<>(); - /** storage group info for mem control */ private StorageGroupInfo storageGroupInfo = new StorageGroupInfo(this); /** @@ -266,21 +264,13 @@ public class StorageGroupProcessor { * files should have similar numbers of devices. Default value: INIT_ARRAY_SIZE = 64 */ private int deviceNumInLastClosedTsFile = DeviceTimeIndex.INIT_ARRAY_SIZE; - /** whether it's ready from recovery */ private boolean isReady = false; - /** close file listeners */ private List customCloseFileListeners = Collections.emptyList(); - /** flush listeners */ private List customFlushListeners = Collections.emptyList(); - private static final int WAL_BUFFER_SIZE = - IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2; - - private final Deque walByteBufferPool = new LinkedList<>(); - private int currentWalPoolSize = 0; // this field is used to avoid when one writer release bytebuffer back to pool, @@ -299,6 +289,47 @@ public class StorageGroupProcessor { */ private String insertWriteLockHolder = ""; + /** + * constrcut a storage group processor + * + * @param systemDir system dir path + * @param virtualStorageGroupId virtual storage group id e.g. 1 + * @param fileFlushPolicy file flush policy + * @param logicalStorageGroupName logical storage group name e.g. root.sg1 + */ + public StorageGroupProcessor( + String systemDir, + String virtualStorageGroupId, + TsFileFlushPolicy fileFlushPolicy, + String logicalStorageGroupName) + throws StorageGroupProcessorException { + this.virtualStorageGroupId = virtualStorageGroupId; + this.logicalStorageGroupName = logicalStorageGroupName; + this.fileFlushPolicy = fileFlushPolicy; + + storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, virtualStorageGroupId); + if (storageGroupSysDir.mkdirs()) { + logger.info( + "Storage Group system Directory {} doesn't exist, create it", + storageGroupSysDir.getPath()); + } else if (!storageGroupSysDir.exists()) { + logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath()); + } + this.tsFileManagement = + IoTDBDescriptor.getInstance() + .getConfig() + .getCompactionStrategy() + .getTsFileManagement(logicalStorageGroupName, storageGroupSysDir.getAbsolutePath()); + + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + executorService.scheduleWithFixedDelay( + this::trimTask, + config.getWalPoolTrimIntervalInMS(), + config.getWalPoolTrimIntervalInMS(), + TimeUnit.MILLISECONDS); + recover(); + } + /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */ public ByteBuffer[] getWalDirectByteBuffer() { ByteBuffer[] res = new ByteBuffer[2]; @@ -378,47 +409,6 @@ private void trimTask() { } } - /** - * constrcut a storage group processor - * - * @param systemDir system dir path - * @param virtualStorageGroupId virtual storage group id e.g. 1 - * @param fileFlushPolicy file flush policy - * @param logicalStorageGroupName logical storage group name e.g. root.sg1 - */ - public StorageGroupProcessor( - String systemDir, - String virtualStorageGroupId, - TsFileFlushPolicy fileFlushPolicy, - String logicalStorageGroupName) - throws StorageGroupProcessorException { - this.virtualStorageGroupId = virtualStorageGroupId; - this.logicalStorageGroupName = logicalStorageGroupName; - this.fileFlushPolicy = fileFlushPolicy; - - storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, virtualStorageGroupId); - if (storageGroupSysDir.mkdirs()) { - logger.info( - "Storage Group system Directory {} doesn't exist, create it", - storageGroupSysDir.getPath()); - } else if (!storageGroupSysDir.exists()) { - logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath()); - } - this.tsFileManagement = - IoTDBDescriptor.getInstance() - .getConfig() - .getCompactionStrategy() - .getTsFileManagement(logicalStorageGroupName, storageGroupSysDir.getAbsolutePath()); - - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - executorService.scheduleWithFixedDelay( - this::trimTask, - config.getWalPoolTrimIntervalInMS(), - config.getWalPoolTrimIntervalInMS(), - TimeUnit.MILLISECONDS); - recover(); - } - public String getLogicalStorageGroupName() { return logicalStorageGroupName; } @@ -637,6 +627,54 @@ private void updateLatestFlushedTime() throws IOException { } } + public void addSettleFilesToList( + List seqResourcesToBeSettled, + List unseqResourcesToBeSettled, + List tsFilePaths) { + if (tsFilePaths.size() == 0) { + for (TsFileResource resource : tsFileManagement.getTsFileList(true)) { + if (!resource.isClosed()) { + continue; + } + resource.setSettleTsFileCallBack(this::settleTsFileCallBack); + seqResourcesToBeSettled.add(resource); + } + for (TsFileResource resource : tsFileManagement.getTsFileList(false)) { + if (!resource.isClosed()) { + continue; + } + resource.setSettleTsFileCallBack(this::settleTsFileCallBack); + unseqResourcesToBeSettled.add(resource); + } + } else { + for (String tsFilePath : tsFilePaths) { + File fileToBeSettled = new File(tsFilePath); + if (fileToBeSettled + .getParentFile() + .getParentFile() + .getParentFile() + .getParentFile() + .getName() + .equals("sequence")) { + for (TsFileResource resource : tsFileManagement.getTsFileList(true)) { + if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) { + resource.setSettleTsFileCallBack(this::settleTsFileCallBack); + seqResourcesToBeSettled.add(resource); + break; + } + } + } else { + for (TsFileResource resource : tsFileManagement.getTsFileList(false)) { + if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) { + unseqResourcesToBeSettled.add(resource); + break; + } + } + } + } + } + } + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private Pair, List> getAllFiles(List folders) throws IOException, StorageGroupProcessorException { @@ -1756,6 +1794,7 @@ public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorExcepti } // TODO need a read lock, please consider the concurrency with flush manager threads. + /** * build query data source by searching all tsfile which fit in query filter * @@ -1918,6 +1957,10 @@ public void delete( throw new IOException( "Delete failed. " + "Please do not delete until the old files upgraded."); } + if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) { + throw new IOException( + "Delete failed. " + "Please do not delete until the old files settled."); + } // TODO: how to avoid partial deletion? // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened // mod files in mergingModification, sequenceFileList, and unsequenceFileList @@ -2307,6 +2350,43 @@ private void upgradeTsFileResourceCallBack(TsFileResource tsFileResource) { } } + /** + * After finishing settling tsfile, we need to do 2 things : (1) move the new tsfile to the + * correct folder, including deleting its old mods file (2) update the relevant data of this old + * tsFile in memory ,eg: FileSequenceReader, TsFileManagement, cache, etc. + */ + private void settleTsFileCallBack( + TsFileResource oldTsFileResource, List newTsFileResources) + throws WriteProcessException { + oldTsFileResource.readUnlock(); + oldTsFileResource.writeLock(); + try { + TsFileAndModSettleTool.moveNewTsFile(oldTsFileResource, newTsFileResources); + if (TsFileAndModSettleTool.getInstance().recoverSettleFileMap.size() != 0) { + TsFileAndModSettleTool.getInstance() + .recoverSettleFileMap + .remove(oldTsFileResource.getTsFile().getAbsolutePath()); + } + // clear Cache , including chunk cache and timeseriesMetadata cache + ChunkCache.getInstance().clear(); + TimeSeriesMetadataCache.getInstance().clear(); + + // if old tsfile is being deleted in the process due to its all data's being deleted. + if (!oldTsFileResource.getTsFile().exists()) { + tsFileManagement.remove(oldTsFileResource, oldTsFileResource.isSeq()); + } + FileReaderManager.getInstance().closeFileAndRemoveReader(oldTsFileResource.getTsFilePath()); + oldTsFileResource.setSettleTsFileCallBack(null); + SettleService.getINSTANCE().getFilesToBeSettledCount().addAndGet(-1); + } catch (IOException e) { + logger.error("Exception to move new tsfile in settling", e); + throw new WriteProcessException( + "Meet error when settling file: " + oldTsFileResource.getTsFile().getAbsolutePath(), e); + } finally { + oldTsFileResource.writeUnlock(); + } + } + private void loadUpgradedResources(List resources, boolean isseq) { if (resources.isEmpty()) { return; @@ -3222,6 +3302,10 @@ public void setCustomFlushListeners(List customFlushListeners) { this.customFlushListeners = customFlushListeners; } + public String getInsertWriteLockHolder() { + return insertWriteLockHolder; + } + private enum LoadTsFileType { LOAD_SEQUENCE, LOAD_UNSEQUENCE @@ -3245,6 +3329,13 @@ public interface UpgradeTsFileResourceCallBack { void call(TsFileResource caller); } + @FunctionalInterface + public interface SettleTsFileCallBack { + + void call(TsFileResource oldTsFileResource, List newTsFileResources) + throws WriteProcessException; + } + @FunctionalInterface public interface CloseCompactionMergeCallBack { @@ -3256,8 +3347,4 @@ public interface TimePartitionFilter { boolean satisfy(String storageGroupName, long timePartitionId); } - - public String getInsertWriteLockHolder() { - return insertWriteLockHolder; - } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 5bc207057b26..dfbb57b5f984 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.SettleTsFileCallBack; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpgradeTsFileResourceCallBack; import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex; import org.apache.iotdb.db.engine.storagegroup.timeindex.FileTimeIndex; @@ -135,6 +136,8 @@ public TsFileProcessor getProcessor() { */ private UpgradeTsFileResourceCallBack upgradeTsFileResourceCallBack; + private SettleTsFileCallBack settleTsFileCallBack; + /** * indicate if this tsfile resource belongs to a sequence tsfile or not used for upgrading * v0.9.x/v1 -> 0.10/v2 @@ -711,6 +714,14 @@ public UpgradeTsFileResourceCallBack getUpgradeTsFileResourceCallBack() { return upgradeTsFileResourceCallBack; } + public SettleTsFileCallBack getSettleTsFileCallBack() { + return settleTsFileCallBack; + } + + public void setSettleTsFileCallBack(SettleTsFileCallBack settleTsFileCallBack) { + this.settleTsFileCallBack = settleTsFileCallBack; + } + /** make sure Either the deviceToIndex is not empty Or the path contains a partition folder */ public long getTimePartition() { return timeIndex.getTimePartition(file.getAbsolutePath()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java index 27bea071859f..7f6b90379a22 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java @@ -62,6 +62,8 @@ public class VirtualStorageGroupManager { */ private AtomicBoolean[] isVsgReady; + private AtomicBoolean isSettling = new AtomicBoolean(); + /** value of root.stats."root.sg".TOTAL_POINTS */ private long monitorSeriesValue; @@ -334,6 +336,18 @@ public void upgradeAll() { } } + public void getResourcesToBeSettled( + List seqResourcesToBeSettled, + List unseqResourcesToBeSettled, + List tsFilePaths) { + for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) { + if (storageGroupProcessor != null) { + storageGroupProcessor.addSettleFilesToList( + seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths); + } + } + } + /** push mergeAll operation down to all virtual storage group processors */ public void mergeAll(boolean isFullMerge) { for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) { @@ -446,4 +460,12 @@ public void releaseWalDirectByteBufferPool() { public void reset() { Arrays.fill(virtualStorageGroupProcessor, null); } + + public void setSettling(boolean settling) { + isSettling.set(settling); + } + + public AtomicBoolean getIsSettling() { + return isSettling; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java index 9572839c3583..ad20d774d1fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java @@ -183,6 +183,8 @@ private SQLConstant() { public static final int TOK_SET_SYSTEM_MODE = 110; + public static final int TOK_SETTLE = 111; + public static final Map tokenNames = new HashMap<>(); public static String[] getSingleRootArray() { @@ -253,6 +255,8 @@ public static String[] getSingleTimeArray() { tokenNames.put(TOK_SHOW_CONTINUOUS_QUERIES, "TOK_SHOW_CONTINUOUS_QUERIES"); tokenNames.put(TOK_SELECT_INTO, "TOK_SELECT_INTO"); + + tokenNames.put(TOK_SETTLE, "TOK_SETTLE"); } public static boolean isReservedPath(PartialPath pathStr) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java index 9f56a528cd00..6b5a11b2241b 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.exception.TriggerExecutionException; import org.apache.iotdb.db.exception.TriggerManagementException; import org.apache.iotdb.db.exception.UDFRegistrationException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; @@ -104,6 +105,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan; import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan; +import org.apache.iotdb.db.qp.physical.sys.SettlePlan; import org.apache.iotdb.db.qp.physical.sys.ShowChildNodesPlan; import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan; import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan; @@ -130,6 +132,7 @@ import org.apache.iotdb.db.query.udf.service.UDFRegistrationInformation; import org.apache.iotdb.db.query.udf.service.UDFRegistrationService; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.service.SettleService; import org.apache.iotdb.db.tools.TsFileRewriteTool; import org.apache.iotdb.db.utils.AuthUtils; import org.apache.iotdb.db.utils.FileLoaderUtils; @@ -373,6 +376,9 @@ public boolean processNonQuery(PhysicalPlan plan) return operateCreateContinuousQuery((CreateContinuousQueryPlan) plan); case DROP_CONTINUOUS_QUERY: return operateDropContinuousQuery((DropContinuousQueryPlan) plan); + case SETTLE: + settle((SettlePlan) plan); + return true; default: throw new UnsupportedOperationException( String.format("operation %s is not supported", plan.getOperatorType())); @@ -1145,6 +1151,9 @@ private void loadFile(File file, OperateFilePlan plan) throws QueryProcessExcept "try to split the tsFile={} du to it spans multi partitions", tsFileResource.getTsFile().getPath()); TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResources); + tsFileResource.writeLock(); + tsFileResource.removeModFile(); + tsFileResource.writeUnlock(); logger.info( "after split, the old tsFile was split to {} new tsFiles", splitResources.size()); } @@ -2039,4 +2048,40 @@ List checkStorageGroupExist(List storageGroups) { } return noExistSg; } + + private void settle(SettlePlan plan) throws StorageEngineException { + if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) { + throw new StorageEngineException( + "Current system mode is read only, does not support file settle"); + } + if (!SettleService.getINSTANCE().isRecoverFinish()) { + throw new StorageEngineException("Existing sg that is not ready, please try later."); + } + PartialPath sgPath = null; + try { + List seqResourcesToBeSettled = new ArrayList<>(); + List unseqResourcesToBeSettled = new ArrayList<>(); + List tsFilePaths = new ArrayList<>(); + if (plan.isSgPath()) { + sgPath = plan.getSgPath(); + } else { + String tsFilePath = plan.getTsFilePath(); + if (new File(tsFilePath).isDirectory()) { + throw new WriteProcessException("The file should not be a directory."); + } else if (!new File(tsFilePath).exists()) { + throw new WriteProcessException("The tsFile " + tsFilePath + " is not existed."); + } + sgPath = SettleService.getINSTANCE().getSGByFilePath(tsFilePath); + tsFilePaths.add(tsFilePath); + } + StorageEngine.getInstance() + .getResourcesToBeSettled( + sgPath, seqResourcesToBeSettled, unseqResourcesToBeSettled, tsFilePaths); + SettleService.getINSTANCE().startSettling(seqResourcesToBeSettled, unseqResourcesToBeSettled); + StorageEngine.getInstance().setSettling(sgPath, false); + } catch (WriteProcessException e) { + if (sgPath != null) StorageEngine.getInstance().setSettling(sgPath, false); + throw new StorageEngineException(e.getMessage()); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java index fe9575e9a140..05432cb5dcc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java @@ -171,6 +171,8 @@ public enum OperatorType { CREATE_CONTINUOUS_QUERY, DROP_CONTINUOUS_QUERY, SHOW_CONTINUOUS_QUERIES, - SET_SYSTEM_MODE + SET_SYSTEM_MODE, + + SETTLE } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SettleOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SettleOperator.java new file mode 100644 index 000000000000..5c6174c2c143 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/SettleOperator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.qp.logical.sys; + +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.sys.SettlePlan; +import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; + +public class SettleOperator extends Operator { + PartialPath sgPath; + String tsFilePath; + boolean isSgPath = false; + + public SettleOperator(int tokenIntType) { + super(tokenIntType); + operatorType = OperatorType.SETTLE; + } + + public PartialPath getSgPath() { + return sgPath; + } + + public void setSgPath(PartialPath sgPath) { + this.sgPath = sgPath; + } + + public String getTsFilePath() { + return tsFilePath; + } + + public void setTsFilePath(String tsFilePath) { + this.tsFilePath = tsFilePath; + } + + public boolean getIsSgPath() { + return isSgPath; + } + + public void setIsSgPath(boolean isSgPath) { + this.isSgPath = isSgPath; + } + + @Override + public PhysicalPlan generatePhysicalPlan(PhysicalGenerator generator) + throws QueryProcessException { + if (isSgPath) { + return new SettlePlan(getSgPath()); + } else { + return new SettlePlan(getTsFilePath()); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SettlePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SettlePlan.java new file mode 100644 index 000000000000..19cf0a73441e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SettlePlan.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.qp.physical.sys; + +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.logical.Operator.OperatorType; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; + +import java.util.Collections; +import java.util.List; + +public class SettlePlan extends PhysicalPlan { + PartialPath sgPath; + String tsFilePath; + boolean isSgPath; + + public SettlePlan(PartialPath sgPath) { + super(false, OperatorType.SETTLE); + this.sgPath = sgPath; + setIsSgPath(true); + } + + public SettlePlan(String tsFilePath) { + super(false, OperatorType.SETTLE); + this.tsFilePath = tsFilePath; + setIsSgPath(false); + } + + public boolean isSgPath() { + return isSgPath; + } + + public void setIsSgPath(boolean isSgPath) { + this.isSgPath = isSgPath; + } + + @Override + public List getPaths() { + return Collections.singletonList(sgPath); + } + + public PartialPath getSgPath() { + return sgPath; + } + + public String getTsFilePath() { + return tsFilePath; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java index 1d4fc6d95921..15481643d388 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java @@ -83,6 +83,7 @@ import org.apache.iotdb.db.qp.logical.sys.SetStorageGroupOperator; import org.apache.iotdb.db.qp.logical.sys.SetSystemModeOperator; import org.apache.iotdb.db.qp.logical.sys.SetTTLOperator; +import org.apache.iotdb.db.qp.logical.sys.SettleOperator; import org.apache.iotdb.db.qp.logical.sys.ShowChildNodesOperator; import org.apache.iotdb.db.qp.logical.sys.ShowChildPathsOperator; import org.apache.iotdb.db.qp.logical.sys.ShowContinuousQueriesOperator; @@ -564,6 +565,24 @@ public Operator visitFlush(FlushContext ctx) { return flushOperator; } + @Override + public Operator visitSettle(SqlBaseParser.SettleContext ctx) { + SettleOperator settleOperator = new SettleOperator(SQLConstant.TOK_SETTLE); + if (ctx.pathOrString().prefixPath() != null) { + PartialPath sgPath = parsePrefixPath(ctx.pathOrString().prefixPath()); + settleOperator.setSgPath(sgPath); + settleOperator.setIsSgPath(true); + } else if (!ctx.pathOrString().stringLiteral().getText().equals("")) { + String tsFilePath = removeStringQuote(ctx.pathOrString().stringLiteral().getText()); + settleOperator.setTsFilePath(tsFilePath); + settleOperator.setIsSgPath(false); + } else { + // do nothing + } + + return settleOperator; + } + @Override public Operator visitFullMerge(FullMergeContext ctx) { return new MergeOperator(SQLConstant.TOK_FULL_MERGE); diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java index cf329af0198b..01cde88d1f4c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java @@ -103,9 +103,7 @@ public synchronized void closeFileAndRemoveReader(String filePath) throws IOExce } private void clearUnUsedFilesInFixTime() { - long examinePeriod = IoTDBDescriptor.getInstance().getConfig().getCacheFileReaderClearPeriod(); - executorService.scheduleAtFixedRate( () -> { synchronized (this) { diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index d26fa4bac1b6..f937037fadb1 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -155,6 +155,7 @@ private void setUp() throws StartupException { registerManager.register(StatMonitor.getInstance()); registerManager.register(SyncServerManager.getInstance()); registerManager.register(UpgradeSevice.getINSTANCE()); + registerManager.register(SettleService.getINSTANCE()); logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!"); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java index 2f61d9024e16..7751ecabc6d0 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@ -36,6 +36,7 @@ public enum ServiceType { FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""), SYNC_SERVICE("SYNC ServerService", ""), UPGRADE_SERVICE("UPGRADE DataService", ""), + SETTLE_SERVICE("SETTLE DataService", ""), MERGE_SERVICE("Merge Manager", "Merge Manager"), COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"), PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"), diff --git a/server/src/main/java/org/apache/iotdb/db/service/SettleService.java b/server/src/main/java/org/apache/iotdb/db/service/SettleService.java new file mode 100644 index 000000000000..0d165f0d7d38 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/service/SettleService.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.service; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.settle.SettleLog; +import org.apache.iotdb.db.engine.settle.SettleTask; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +public class SettleService implements IService { + private static final Logger logger = LoggerFactory.getLogger(SettleService.class); + + private AtomicInteger threadCnt = new AtomicInteger(); + private ExecutorService settleThreadPool; + private boolean isRecoverFinish; + + private static AtomicInteger filesToBeSettledCount = new AtomicInteger(); + + public static SettleService getINSTANCE() { + return InstanceHolder.INSTANCE; + } + + public static class InstanceHolder { + private static final SettleService INSTANCE = new SettleService(); + + private InstanceHolder() {} + } + + @Override + public void start() { + if (settleThreadPool == null) { + int settleThreadNum = IoTDBDescriptor.getInstance().getConfig().getSettleThreadNum(); + settleThreadPool = + Executors.newFixedThreadPool( + settleThreadNum, r -> new Thread(r, "SettleThread-" + threadCnt.getAndIncrement())); + } + TsFileAndModSettleTool.findFilesToBeRecovered(); + + /* Classify the file paths by the SG, and then call the methods of StorageGroupProcessor of each + SG in turn to get the TsFileResources.*/ + Map> tmpSgResourcesMap = new HashMap<>(); // sgPath -> tsFilePaths + try { + for (String filePath : TsFileAndModSettleTool.getInstance().recoverSettleFileMap.keySet()) { + PartialPath sgPath = getSGByFilePath(filePath); + if (tmpSgResourcesMap.containsKey(sgPath)) { + List filePaths = tmpSgResourcesMap.get(sgPath); + filePaths.add(filePath); + tmpSgResourcesMap.put(sgPath, filePaths); + } else { + List tsFilePaths = new ArrayList<>(); + tsFilePaths.add(filePath); + tmpSgResourcesMap.put(sgPath, tsFilePaths); + } + } + while (!StorageEngine.getInstance().isAllSgReady()) { + // wait for all sg ready + } + List seqResourcesToBeSettled = new ArrayList<>(); + List unseqResourcesToBeSettled = new ArrayList<>(); + for (Map.Entry> entry : tmpSgResourcesMap.entrySet()) { + try { + StorageEngine.getInstance() + .getResourcesToBeSettled( + entry.getKey(), + seqResourcesToBeSettled, + unseqResourcesToBeSettled, + entry.getValue()); + } catch (StorageEngineException e) { + e.printStackTrace(); + } finally { + StorageEngine.getInstance().setSettling(entry.getKey(), false); + } + } + startSettling(seqResourcesToBeSettled, unseqResourcesToBeSettled); + setRecoverFinish(true); + } catch (WriteProcessException e) { + e.printStackTrace(); + } + } + + public void startSettling( + List seqResourcesToBeSettled, List unseqResourcesToBeSettled) + throws WriteProcessException { + filesToBeSettledCount.addAndGet( + seqResourcesToBeSettled.size() + unseqResourcesToBeSettled.size()); + if (!SettleLog.createSettleLog() || filesToBeSettledCount.get() == 0) { + stop(); + return; + } + logger.info( + "Totally find " + + (seqResourcesToBeSettled.size() + unseqResourcesToBeSettled.size()) + + " tsFiles to be settled."); + // settle seqTsFile + for (TsFileResource resource : seqResourcesToBeSettled) { + resource.readLock(); + resource.setSeq(true); + submitSettleTask(new SettleTask(resource)); + } + // settle unseqTsFile + for (TsFileResource resource : unseqResourcesToBeSettled) { + resource.readLock(); + resource.setSeq(false); + submitSettleTask(new SettleTask(resource)); + } + } + + @Override + public void stop() { + SettleLog.closeLogWriter(); + TsFileAndModSettleTool.clearRecoverSettleFileMap(); + filesToBeSettledCount.set(0); + if (settleThreadPool != null) { + settleThreadPool.shutdownNow(); + logger.info("Waiting for settle task pool to shut down"); + settleThreadPool = null; + logger.info("Settle service stopped"); + } + } + + @Override + public ServiceType getID() { + return ServiceType.SETTLE_SERVICE; + } + + public AtomicInteger getFilesToBeSettledCount() { + return filesToBeSettledCount; + } + + public PartialPath getSGByFilePath(String tsFilePath) throws WriteProcessException { + PartialPath sgPath = null; + try { + sgPath = + new PartialPath( + new File(tsFilePath).getParentFile().getParentFile().getParentFile().getName()); + } catch (IllegalPathException e) { + throw new WriteProcessException( + "Fail to get sg of this tsFile while parsing the file path.", e); + } + return sgPath; + } + + private void submitSettleTask(SettleTask settleTask) throws WriteProcessException { + // settleThreadPool.submit(settleTask); + settleTask.settleTsFile(); + } + + public boolean isRecoverFinish() { + return isRecoverFinish; + } + + public void setRecoverFinish(boolean recoverFinish) { + isRecoverFinish = recoverFinish; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java index cc4cbef0b6ab..d71b823392f0 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; @@ -34,12 +36,14 @@ import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2; @@ -69,12 +73,9 @@ public class TsFileRewriteTool implements AutoCloseable { protected TsFileSequenceReader reader; protected File oldTsFile; protected List oldModification; + protected TsFileResource oldTsFileResource; protected Iterator modsIterator; - /** new tsFile writer -> list of new modification */ - protected Map fileModificationMap; - - protected Deletion currentMod; protected Decoder defaultTimeDecoder = Decoder.getDecoderByType( TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), @@ -97,6 +98,7 @@ public class TsFileRewriteTool implements AutoCloseable { * @throws IOException If some I/O error occurs */ public TsFileRewriteTool(TsFileResource resourceToBeRewritten) throws IOException { + oldTsFileResource = resourceToBeRewritten; oldTsFile = resourceToBeRewritten.getTsFile(); String file = oldTsFile.getAbsolutePath(); reader = new TsFileSequenceReader(file); @@ -104,12 +106,12 @@ public TsFileRewriteTool(TsFileResource resourceToBeRewritten) throws IOExceptio if (FSFactoryProducer.getFSFactory().getFile(file + ModificationFile.FILE_SUFFIX).exists()) { oldModification = (List) resourceToBeRewritten.getModFile().getModifications(); modsIterator = oldModification.iterator(); - fileModificationMap = new HashMap<>(); } } public TsFileRewriteTool(TsFileResource resourceToBeRewritten, boolean needReaderForV2) throws IOException { + oldTsFileResource = resourceToBeRewritten; oldTsFile = resourceToBeRewritten.getTsFile(); String file = oldTsFile.getAbsolutePath(); if (needReaderForV2) { @@ -121,7 +123,6 @@ public TsFileRewriteTool(TsFileResource resourceToBeRewritten, boolean needReade if (FSFactoryProducer.getFSFactory().getFile(file + ModificationFile.FILE_SUFFIX).exists()) { oldModification = (List) resourceToBeRewritten.getModFile().getModifications(); modsIterator = oldModification.iterator(); - fileModificationMap = new HashMap<>(); } } @@ -133,7 +134,7 @@ public TsFileRewriteTool(TsFileResource resourceToBeRewritten, boolean needReade */ public static void rewriteTsFile( TsFileResource resourceToBeRewritten, List rewrittenResources) - throws IOException, WriteProcessException { + throws IOException, WriteProcessException, IllegalPathException { try (TsFileRewriteTool rewriteTool = new TsFileRewriteTool(resourceToBeRewritten)) { rewriteTool.parseAndRewriteFile(rewrittenResources); } @@ -151,18 +152,22 @@ public void close() throws IOException { */ @SuppressWarnings({"squid:S3776", "deprecation"}) // Suppress high Cognitive Complexity warning public void parseAndRewriteFile(List rewrittenResources) - throws IOException, WriteProcessException { + throws IOException, WriteProcessException, IllegalPathException { // check if the TsFile has correct header if (!fileCheck()) { return; } - int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; + int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length; reader.position(headerLength); + if (reader.readMarker() != 3) { + throw new WriteProcessException( + "The version of this tsfile is too low, please upgrade it to the version 3."); + } // start to scan chunks and chunkGroups byte marker; - String deviceId = null; boolean firstChunkInChunkGroup = true; + long chunkHeaderOffset; try { while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { @@ -174,6 +179,7 @@ public void parseAndRewriteFile(List rewrittenResources) break; case MetaMarker.CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: + chunkHeaderOffset = reader.position() - 1; ChunkHeader header = reader.readChunkHeader(marker); UnaryMeasurementSchema measurementSchema = new UnaryMeasurementSchema( @@ -191,7 +197,8 @@ public void parseAndRewriteFile(List rewrittenResources) // a new Page PageHeader pageHeader = reader.readPageHeader(dataType, header.getChunkType() == MetaMarker.CHUNK_HEADER); - boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader); + boolean needToDecode = + checkIfNeedToDecode(measurementSchema, deviceId, pageHeader, chunkHeaderOffset); needToDecodeInfo.add(needToDecode); ByteBuffer pageData = !needToDecode @@ -207,7 +214,8 @@ public void parseAndRewriteFile(List rewrittenResources) measurementSchema, pageHeadersInChunk, dataInChunk, - needToDecodeInfo); + needToDecodeInfo, + chunkHeaderOffset); firstChunkInChunkGroup = false; break; case MetaMarker.OPERATION_INDEX_RANGE: @@ -238,25 +246,7 @@ public void parseAndRewriteFile(List rewrittenResources) for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) { rewrittenResources.add(endFileAndGenerateResource(tsFileIOWriter)); } - // write the remain modification for new file - if (oldModification != null) { - while (currentMod != null || modsIterator.hasNext()) { - if (currentMod == null) { - currentMod = (Deletion) modsIterator.next(); - } - for (Entry entry : fileModificationMap.entrySet()) { - TsFileIOWriter tsFileIOWriter = entry.getKey(); - ModificationFile newMods = entry.getValue(); - newMods.write( - new Deletion( - currentMod.getPath(), - tsFileIOWriter.getFile().length(), - currentMod.getStartTime(), - currentMod.getEndTime())); - } - currentMod = null; - } - } + } catch (IOException e2) { throw new IOException( "TsFile rewrite process cannot proceed at position " @@ -275,10 +265,28 @@ public void parseAndRewriteFile(List rewrittenResources) * false. */ protected boolean checkIfNeedToDecode( - TSDataType dataType, TSEncoding encoding, PageHeader pageHeader) { + UnaryMeasurementSchema schema, String deviceId, PageHeader pageHeader, long chunkHeaderOffset) + throws IllegalPathException { if (pageHeader.getStatistics() == null) { return true; } + // Decode is required if the page has data to be deleted. Otherwise, decode is not required + if (oldModification != null) { + modsIterator = oldModification.iterator(); + Deletion currentDeletion = null; + while (modsIterator.hasNext()) { + currentDeletion = (Deletion) modsIterator.next(); + if (currentDeletion + .getPath() + .matchFullPath(new PartialPath(deviceId + "." + schema.getMeasurementId())) + && currentDeletion.getFileOffset() > chunkHeaderOffset) { + if (pageHeader.getStartTime() <= currentDeletion.getEndTime() + && pageHeader.getEndTime() >= currentDeletion.getStartTime()) { + return true; + } + } + } + } return StorageEngine.getTimePartition(pageHeader.getStartTime()) != StorageEngine.getTimePartition(pageHeader.getEndTime()); } @@ -294,13 +302,15 @@ protected void reWriteChunk( UnaryMeasurementSchema schema, List pageHeadersInChunk, List pageDataInChunk, - List needToDecodeInfoInChunk) - throws IOException, PageException { + List needToDecodeInfoInChunk, + long chunkHeaderOffset) + throws IOException, PageException, IllegalPathException { valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType()); Map partitionChunkWriterMap = new HashMap<>(); for (int i = 0; i < pageDataInChunk.size(); i++) { if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) { - decodeAndWritePage(schema, pageDataInChunk.get(i), partitionChunkWriterMap); + decodeAndWritePage( + deviceId, schema, pageDataInChunk.get(i), partitionChunkWriterMap, chunkHeaderOffset); } else { writePage( schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap); @@ -350,10 +360,6 @@ protected TsFileIOWriter getOrDefaultTsFileIOWriter(File oldTsFile, long partiti logger.error("Create new TsFile {} failed because it exists", newFile); } TsFileIOWriter writer = new TsFileIOWriter(newFile); - if (oldModification != null) { - fileModificationMap.put( - writer, new ModificationFile(newFile + ModificationFile.FILE_SUFFIX)); - } return writer; } catch (IOException e) { logger.error("Create new TsFile {} failed ", newFile, e); @@ -376,17 +382,46 @@ protected void writePage( } protected void decodeAndWritePage( + String deviceId, UnaryMeasurementSchema schema, ByteBuffer pageData, - Map partitionChunkWriterMap) - throws IOException { + Map partitionChunkWriterMap, + long chunkHeaderOffset) + throws IOException, IllegalPathException { valueDecoder.reset(); PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null); + // read delete time range from old modification file + List deleteIntervalList = + getOldSortedDeleteIntervals(deviceId, schema, chunkHeaderOffset); + pageReader.setDeleteIntervalList(deleteIntervalList); BatchData batchData = pageReader.getAllSatisfiedPageData(); rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap); } + private List getOldSortedDeleteIntervals( + String deviceId, UnaryMeasurementSchema schema, long chunkHeaderOffset) + throws IllegalPathException { + if (oldModification != null) { + ChunkMetadata chunkMetadata = new ChunkMetadata(); + modsIterator = oldModification.iterator(); + Deletion currentDeletion = null; + while (modsIterator.hasNext()) { + currentDeletion = (Deletion) modsIterator.next(); + // if deletion path match the chunkPath, then add the deletion to the list + if (currentDeletion + .getPath() + .matchFullPath(new PartialPath(deviceId + "." + schema.getMeasurementId())) + && currentDeletion.getFileOffset() > chunkHeaderOffset) { + chunkMetadata.insertIntoSortedDeletions( + currentDeletion.getStartTime(), currentDeletion.getEndTime()); + } + } + return chunkMetadata.getDeleteIntervalList(); + } + return null; + } + protected void rewritePageIntoFiles( BatchData batchData, UnaryMeasurementSchema schema, diff --git a/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileAndModSettleTool.java b/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileAndModSettleTool.java new file mode 100644 index 000000000000..3e7a7006ba7d --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/tools/settle/TsFileAndModSettleTool.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.tools.settle; + +import org.apache.iotdb.db.engine.settle.SettleLog; +import org.apache.iotdb.db.engine.settle.SettleLog.SettleCheckStatus; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.tools.TsFileRewriteTool; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +/** + * Offline Settle tool, which is used to settle TsFile and its corresponding mods file to a new + * TsFile. + */ +public class TsFileAndModSettleTool { + private static final Logger logger = LoggerFactory.getLogger(TsFileAndModSettleTool.class); + // TsFilePath -> SettleCheckStatus + public Map recoverSettleFileMap = new HashMap<>(); + private static final TsFileAndModSettleTool tsFileAndModSettleTool = new TsFileAndModSettleTool(); + + private TsFileAndModSettleTool() {} + + public static TsFileAndModSettleTool getInstance() { + return tsFileAndModSettleTool; + } + + public static void main(String[] args) { + Map oldTsFileResources = new HashMap<>(); + findFilesToBeRecovered(); + for (Map.Entry entry : getInstance().recoverSettleFileMap.entrySet()) { + String path = entry.getKey(); + TsFileResource resource = new TsFileResource(new File(path)); + resource.setClosed(true); + oldTsFileResources.put(resource.getTsFile().getName(), resource); + } + List tsFiles = checkArgs(args); + for (File file : tsFiles) { + if (!oldTsFileResources.containsKey(file.getName())) { + if (new File(file + TsFileResource.RESOURCE_SUFFIX).exists()) { + TsFileResource resource = new TsFileResource(file); + resource.setClosed(true); + oldTsFileResources.put(file.getName(), resource); + } + } + } + System.out.println( + "Totally find " + + oldTsFileResources.size() + + " tsFiles to be settled, including " + + getInstance().recoverSettleFileMap.size() + + " tsFiles to be recovered."); + settleTsFilesAndMods(oldTsFileResources); + } + + public static List checkArgs(String[] args) { + String filePath = "test.tsfile"; + List files = new ArrayList<>(); + if (args.length == 0) { + return null; + } else { + for (String arg : args) { + if (arg.endsWith(TSFILE_SUFFIX)) { // it's a file + File f = new File(arg); + if (!f.exists()) { + logger.warn("Cannot find TsFile : " + arg); + continue; + } + files.add(f); + } else { // it's a dir + List tmpFiles = getAllFilesInOneDirBySuffix(arg, TSFILE_SUFFIX); + files.addAll(tmpFiles); + } + } + } + return files; + } + + private static List getAllFilesInOneDirBySuffix(String dirPath, String suffix) { + File dir = new File(dirPath); + if (!dir.isDirectory()) { + logger.warn("It's not a directory path : " + dirPath); + return Collections.emptyList(); + } + if (!dir.exists()) { + logger.warn("Cannot find Directory : " + dirPath); + return Collections.emptyList(); + } + List tsFiles = + new ArrayList<>( + Arrays.asList(FSFactoryProducer.getFSFactory().listFilesBySuffix(dirPath, suffix))); + File[] tmpFiles = dir.listFiles(); + if (tmpFiles != null) { + for (File f : tmpFiles) { + if (f.isDirectory()) { + tsFiles.addAll(getAllFilesInOneDirBySuffix(f.getAbsolutePath(), suffix)); + } + } + } + return tsFiles; + } + + /** + * This method is used to settle tsFiles and mods files, so that each old TsFile corresponds to + * one or several new TsFiles. This method is only applicable to V3 TsFile. Each old TsFile + * corresponds to one or several new TsFileResources of the new TsFiles + */ + public static void settleTsFilesAndMods(Map resourcesToBeSettled) { + int successCount = 0; + Map> newTsFileResources = new HashMap<>(); + SettleLog.createSettleLog(); + for (Map.Entry entry : resourcesToBeSettled.entrySet()) { + TsFileResource resourceToBeSettled = entry.getValue(); + List settledTsFileResources = new ArrayList<>(); + try { + TsFileAndModSettleTool tsFileAndModSettleTool = TsFileAndModSettleTool.getInstance(); + System.out.println("Start settling for tsFile : " + resourceToBeSettled.getTsFilePath()); + if (tsFileAndModSettleTool.isSettledFileGenerated(resourceToBeSettled)) { + settledTsFileResources = tsFileAndModSettleTool.findSettledFile(resourceToBeSettled); + newTsFileResources.put(resourceToBeSettled.getTsFile().getName(), settledTsFileResources); + } else { + // Write Settle Log, Status 1 + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFilePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.BEGIN_SETTLE_FILE); + tsFileAndModSettleTool.settleOneTsFileAndMod(resourceToBeSettled, settledTsFileResources); + // Write Settle Log, Status 2 + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFilePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.AFTER_SETTLE_FILE); + newTsFileResources.put(resourceToBeSettled.getTsFile().getName(), settledTsFileResources); + } + + moveNewTsFile(resourceToBeSettled, settledTsFileResources); + // Write Settle Log, Status 3 + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFilePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.SETTLE_SUCCESS); + System.out.println( + "Finish settling successfully for tsFile : " + resourceToBeSettled.getTsFilePath()); + successCount++; + } catch (Exception e) { + System.out.println( + "Meet error while settling the tsFile : " + resourceToBeSettled.getTsFilePath()); + e.printStackTrace(); + } + } + if (resourcesToBeSettled.size() == successCount) { + SettleLog.closeLogWriter(); + System.out.println("Finish settling all tsfiles Successfully!"); + } else { + System.out.println( + "Finish Settling, " + + (resourcesToBeSettled.size() - successCount) + + " tsfiles meet errors."); + } + } + + /** + * The size of settledResources will be 0 in one of the following conditions: (1) old TsFile is + * not closed (2) old ModFile is not existed (3) all data in the old tsfile is being deleted after + * settling + */ + public void settleOneTsFileAndMod( + TsFileResource resourceToBeSettled, List settledResources) + throws WriteProcessException, IllegalPathException, IOException { + if (!resourceToBeSettled.isClosed()) { + logger.warn( + "The tsFile {} should be sealed when rewritting.", resourceToBeSettled.getTsFilePath()); + return; + } + // if no deletions to this tsfile, then return. + if (!resourceToBeSettled.getModFile().exists()) { + return; + } + try (TsFileRewriteTool tsFileRewriteTool = new TsFileRewriteTool(resourceToBeSettled)) { + tsFileRewriteTool.parseAndRewriteFile(settledResources); + } + if (settledResources.size() == 0) { + resourceToBeSettled.setDeleted(true); + } + } + + public static void findFilesToBeRecovered() { + if (FSFactoryProducer.getFSFactory().getFile(SettleLog.getSettleLogPath()).exists()) { + try (BufferedReader settleLogReader = + new BufferedReader( + new FileReader( + FSFactoryProducer.getFSFactory().getFile(SettleLog.getSettleLogPath())))) { + String line = null; + while ((line = settleLogReader.readLine()) != null && !line.equals("")) { + String oldFilePath = line.split(SettleLog.COMMA_SEPERATOR)[0]; + int settleCheckStatus = Integer.parseInt(line.split(SettleLog.COMMA_SEPERATOR)[1]); + if (settleCheckStatus == SettleCheckStatus.SETTLE_SUCCESS.getCheckStatus()) { + getInstance().recoverSettleFileMap.remove(oldFilePath); + continue; + } + getInstance().recoverSettleFileMap.put(oldFilePath, settleCheckStatus); + } + } catch (IOException e) { + logger.error( + "meet error when reading settle log, log path:{}", SettleLog.getSettleLogPath(), e); + } finally { + FSFactoryProducer.getFSFactory().getFile(SettleLog.getSettleLogPath()).delete(); + } + } + } + + /** this method is used to check whether the new file is settled when recovering old tsFile. */ + public boolean isSettledFileGenerated(TsFileResource oldTsFileResource) { + String oldFilePath = oldTsFileResource.getTsFilePath(); + return TsFileAndModSettleTool.getInstance().recoverSettleFileMap.containsKey(oldFilePath) + && TsFileAndModSettleTool.getInstance().recoverSettleFileMap.get(oldFilePath) + == SettleCheckStatus.AFTER_SETTLE_FILE.getCheckStatus(); + } + + /** when the new file is settled , we need to find and deserialize it. */ + public List findSettledFile(TsFileResource resourceToBeSettled) + throws IOException { + List settledTsFileResources = new ArrayList<>(); + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFilePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.BEGIN_SETTLE_FILE); + + File[] tmpFiles = resourceToBeSettled.getTsFile().getParentFile().listFiles(); + if (tmpFiles != null) { + for (File tempPartitionDir : tmpFiles) { + if (tempPartitionDir.isDirectory() + && FSFactoryProducer.getFSFactory() + .getFile( + tempPartitionDir, + resourceToBeSettled.getTsFile().getName() + TsFileResource.RESOURCE_SUFFIX) + .exists()) { + TsFileResource settledTsFileResource = + new TsFileResource( + FSFactoryProducer.getFSFactory() + .getFile(tempPartitionDir, resourceToBeSettled.getTsFile().getName())); + settledTsFileResource.deserialize(); + settledTsFileResources.add(settledTsFileResource); + } + } + } + SettleLog.writeSettleLog( + resourceToBeSettled.getTsFilePath() + + SettleLog.COMMA_SEPERATOR + + SettleCheckStatus.AFTER_SETTLE_FILE); + return settledTsFileResources; + } + + /** + * This method is used to move a new TsFile and its corresponding resource file to the correct + * folder. + * + * @param oldTsFileResource + * @param newTsFileResources if the old TsFile has not any deletions or all the data in which has + * been deleted or its modFile does not exist, then this size will be 0. + * @throws IOException + */ + public static void moveNewTsFile( + TsFileResource oldTsFileResource, List newTsFileResources) + throws IOException { + // delete old mods + oldTsFileResource.removeModFile(); + + File newPartitionDir = + new File( + oldTsFileResource.getTsFile().getParent() + + File.separator + + oldTsFileResource.getTimePartition()); + if (newTsFileResources.size() == 0) { // if the oldTsFile has no mods, it should not be deleted. + if (oldTsFileResource.isDeleted()) { + oldTsFileResource.remove(); + } + if (newPartitionDir.exists()) { + newPartitionDir.delete(); + } + return; + } + FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + File oldTsFile = oldTsFileResource.getTsFile(); + boolean isOldFileExisted = oldTsFile.exists(); + oldTsFile.delete(); + for (TsFileResource newTsFileResource : newTsFileResources) { + newPartitionDir = + new File( + oldTsFileResource.getTsFile().getParent() + + File.separator + + newTsFileResource.getTimePartition()); + // if old TsFile has been deleted by other threads, then delete its new TsFile. + if (!isOldFileExisted) { + newTsFileResource.remove(); + } else { + File newTsFile = newTsFileResource.getTsFile(); + + // move TsFile + fsFactory.moveFile(newTsFile, oldTsFile); + + // move .resource File + newTsFileResource.setFile(fsFactory.getFile(oldTsFile.getParent(), newTsFile.getName())); + newTsFileResource.setClosed(true); + try { + newTsFileResource.serialize(); + } catch (IOException e) { + e.printStackTrace(); + } + File tmpResourceFile = + fsFactory.getFile( + newPartitionDir, newTsFile.getName() + TsFileResource.RESOURCE_SUFFIX); + if (tmpResourceFile.exists()) { + tmpResourceFile.delete(); + } + } + // if the newPartition folder is empty, then it will be deleted + if (newPartitionDir.exists()) newPartitionDir.delete(); + } + } + + public static void clearRecoverSettleFileMap() { + getInstance().recoverSettleFileMap.clear(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java index 7b181b2cd85e..1457df4e3f79 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java @@ -19,9 +19,8 @@ package org.apache.iotdb.db.tools.upgrade; import org.apache.iotdb.db.engine.StorageEngine; -import org.apache.iotdb.db.engine.modification.Deletion; -import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.tools.TsFileRewriteTool; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; @@ -47,7 +46,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { @@ -97,10 +95,12 @@ private void upgradeFile(List upgradedResources) boolean firstChunkInChunkGroup = true; String deviceId = null; boolean skipReadingChunk = true; + long chunkHeaderOffset; try { while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { case MetaMarker.CHUNK_HEADER: + chunkHeaderOffset = reader.position() - 1; if (skipReadingChunk || deviceId == null) { ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader(); int dataSize = header.getDataSize(); @@ -138,7 +138,14 @@ private void upgradeFile(List upgradedResources) // a new Page PageHeader pageHeader = ((TsFileSequenceReaderForV2) reader).readPageHeader(dataType); - boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader); + boolean needToDecode = + checkIfNeedToDecode( + dataType, + encoding, + pageHeader, + measurementSchema, + deviceId, + chunkHeaderOffset); needToDecodeInfo.add(needToDecode); ByteBuffer pageData = !needToDecode @@ -163,7 +170,8 @@ private void upgradeFile(List upgradedResources) measurementSchema, pageHeadersInChunk, dataInChunk, - needToDecodeInfo); + needToDecodeInfo, + chunkHeaderOffset); if (firstChunkInChunkGroup) { firstChunkInChunkGroup = false; } @@ -188,26 +196,6 @@ private void upgradeFile(List upgradedResources) break; case MetaMarker.VERSION: long version = ((TsFileSequenceReaderForV2) reader).readVersion(); - // convert old Modification to new - if (oldModification != null && modsIterator.hasNext()) { - if (currentMod == null) { - currentMod = (Deletion) modsIterator.next(); - } - if (currentMod.getFileOffset() <= version) { - for (Entry entry : - fileModificationMap.entrySet()) { - TsFileIOWriter tsFileIOWriter = entry.getKey(); - ModificationFile newMods = entry.getValue(); - newMods.write( - new Deletion( - currentMod.getPath(), - tsFileIOWriter.getFile().length(), - currentMod.getStartTime(), - currentMod.getEndTime())); - } - currentMod = null; - } - } // write plan indices for ending memtable for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) { tsFileIOWriter.writePlanIndices(); @@ -223,25 +211,9 @@ private void upgradeFile(List upgradedResources) for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) { upgradedResources.add(endFileAndGenerateResource(tsFileIOWriter)); } - // write the remain modification for new file - if (oldModification != null) { - while (currentMod != null || modsIterator.hasNext()) { - if (currentMod == null) { - currentMod = (Deletion) modsIterator.next(); - } - for (Entry entry : fileModificationMap.entrySet()) { - TsFileIOWriter tsFileIOWriter = entry.getKey(); - ModificationFile newMods = entry.getValue(); - newMods.write( - new Deletion( - currentMod.getPath(), - tsFileIOWriter.getFile().length(), - currentMod.getStartTime(), - currentMod.getEndTime())); - } - currentMod = null; - } - } + + oldTsFileResource.removeModFile(); + } catch (Exception e2) { throw new IOException( "TsFile upgrade process cannot proceed at position " @@ -267,17 +239,22 @@ public String upgradeTsFileName(String oldTsFileName) { * PLAIN encoding, and also add a sum statistic for BOOLEAN data, these types of data need to * decode to points and rewrite in new TsFile. */ - @Override protected boolean checkIfNeedToDecode( - TSDataType dataType, TSEncoding encoding, PageHeader pageHeader) { + TSDataType dataType, + TSEncoding encoding, + PageHeader pageHeader, + UnaryMeasurementSchema schema, + String deviceId, + long chunkHeaderOffset) + throws IllegalPathException { return dataType == TSDataType.BOOLEAN || dataType == TSDataType.TEXT || (dataType == TSDataType.INT32 && encoding == TSEncoding.PLAIN) || StorageEngine.getTimePartition(pageHeader.getStartTime()) - != StorageEngine.getTimePartition(pageHeader.getEndTime()); + != StorageEngine.getTimePartition(pageHeader.getEndTime()) + || super.checkIfNeedToDecode(schema, deviceId, pageHeader, chunkHeaderOffset); } - @Override protected void decodeAndWritePage( UnaryMeasurementSchema schema, ByteBuffer pageData, diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSettleIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSettleIT.java new file mode 100644 index 000000000000..921d25fc2bfe --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSettleIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.integration; + +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class IoTDBSettleIT { + private static List sqls = new ArrayList<>(); + private static Connection connection; + + @BeforeClass + public static void setUp() throws Exception { + EnvironmentUtils.closeStatMonitor(); + initCreateSQLStatement(); + EnvironmentUtils.envSetUp(); + executeSql(); + } + + @AfterClass + public static void tearDown() throws Exception { + close(); + EnvironmentUtils.cleanEnv(); + } + + @Test + public void onlineSettleSGTest() { + try (Statement statement = connection.createStatement()) { + statement.execute("settle root.st1"); + } catch (SQLException e) { + Assert.fail(e.getMessage()); + } + } + + private static void close() throws SQLException { + if (Objects.nonNull(connection)) { + try { + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private static void initCreateSQLStatement() { + sqls.add("SET STORAGE GROUP TO root.st1"); + sqls.add("CREATE TIMESERIES root.st1.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN"); + for (int i = 1; i <= 10; i++) { + sqls.add("insert into root.st1.wf01.wt01(timestamp,status) values(" + 100 * i + ",false)"); + } + sqls.add("flush"); + sqls.add("delete from root.st1.wf01.wt01.* where time<500"); + } + + private static void executeSql() throws ClassNotFoundException, SQLException { + Class.forName(Config.JDBC_DRIVER_NAME); + connection = + DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + try (Statement statement = connection.createStatement()) { + for (String sql : sqls) { + statement.execute(sql); + } + } + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/tools/TsFileAndModSettleToolTest.java b/server/src/test/java/org/apache/iotdb/db/tools/TsFileAndModSettleToolTest.java new file mode 100644 index 000000000000..caebc4c2db14 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/tools/TsFileAndModSettleToolTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.tools; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.Planner; +import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; +import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint; +import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; + +public class TsFileAndModSettleToolTest { + private final boolean newEnablePartition = true; + private final long newPartitionInterval = 3600_000; + protected final long maxTimestamp = 50000L; // 100000000L; + protected final String folder = "target" + File.separator + "settle"; + protected final String STORAGE_GROUP = "root.sg_0"; + protected final String DEVICE1 = STORAGE_GROUP + ".device_1"; + protected final String DEVICE2 = STORAGE_GROUP + ".device_2"; + protected final String SENSOR1 = "sensor_1"; + protected final String SENSOR2 = "sensor_2"; + private final long VALUE_OFFSET = 1; + private final Planner processor = new Planner(); + private String path = null; + private IoTDBConfig config; + private boolean originEnablePartition; + private long originPartitionInterval; + + @Before + public void setUp() { + EnvironmentUtils.envSetUp(); + + config = IoTDBDescriptor.getInstance().getConfig(); + originEnablePartition = config.isEnablePartition(); + originPartitionInterval = config.getPartitionInterval(); + + config.setEnablePartition(newEnablePartition); + config.setPartitionInterval(newPartitionInterval); + + StorageEngine.setEnablePartition(newEnablePartition); + StorageEngine.setTimePartitionInterval(newPartitionInterval); + + File f = new File(folder); + if (!f.exists()) { + boolean success = f.mkdir(); + Assert.assertTrue(success); + } + path = folder + File.separator + System.currentTimeMillis() + "-" + 0 + "-0.tsfile"; + } + + @After + public void tearDown() { + File[] fileLists = FSFactoryProducer.getFSFactory().listFilesBySuffix(folder, TSFILE_SUFFIX); + for (File f : fileLists) { + if (f.exists()) { + boolean deleteSuccess = f.delete(); + Assert.assertTrue(deleteSuccess); + } + } + config.setEnablePartition(originEnablePartition); + config.setPartitionInterval(originPartitionInterval); + + StorageEngine.setEnablePartition(originEnablePartition); + StorageEngine.setTimePartitionInterval(originPartitionInterval); + + File directory = new File(folder); + try { + FileUtils.deleteDirectory(directory); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + try { + EnvironmentUtils.cleanEnv(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void settleTsFilesAndModsTest() { // offline settleTool test + try { + List resourcesToBeSettled = createFiles(); + List settledResources = new ArrayList<>(); + for (TsFileResource oldResource : resourcesToBeSettled) { + TsFileAndModSettleTool tsFileAndModSettleTool = TsFileAndModSettleTool.getInstance(); + tsFileAndModSettleTool.settleOneTsFileAndMod(oldResource, settledResources); + } + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + public List createFiles() throws IOException, InterruptedException { + List resourcesToBeSettled = new ArrayList<>(); + HashMap> deviceSensorsMap = new HashMap<>(); + List sensors = new ArrayList<>(); + + // first File + sensors.add(SENSOR1); + deviceSensorsMap.put(DEVICE1, sensors); + String timeseriesPath = STORAGE_GROUP + DEVICE1 + SENSOR1; + createFile(resourcesToBeSettled, deviceSensorsMap, timeseriesPath); + + // second file + path = folder + File.separator + System.currentTimeMillis() + "-" + 0 + "-0.tsfile"; + sensors.add(SENSOR2); + deviceSensorsMap.put(DEVICE1, sensors); + timeseriesPath = STORAGE_GROUP + DEVICE1 + SENSOR2; + createFile(resourcesToBeSettled, deviceSensorsMap, timeseriesPath); + + Thread.sleep(100); + // third file + path = folder + File.separator + System.currentTimeMillis() + "-" + 0 + "-0.tsfile"; + createOneTsFile(deviceSensorsMap); + TsFileResource tsFileResource = new TsFileResource(new File(path)); + tsFileResource.serialize(); + tsFileResource.close(); + resourcesToBeSettled.add(tsFileResource); + + return resourcesToBeSettled; + } + + private void createFile( + List resourcesToBeSettled, + HashMap> deviceSensorsMap, + String timeseriesPath) + throws IOException { + createOneTsFile(deviceSensorsMap); + createlModificationFile(timeseriesPath); + TsFileResource tsFileResource = new TsFileResource(new File(path)); + tsFileResource.setModFile( + new ModificationFile(tsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)); + tsFileResource.serialize(); + tsFileResource.close(); + resourcesToBeSettled.add(tsFileResource); + } + + public void createlModificationFile(String timeseriesPath) { + String modFilePath = path + ModificationFile.FILE_SUFFIX; + ModificationFile modificationFile = new ModificationFile(modFilePath); + List mods = new ArrayList<>(); + try { + PartialPath partialPath = new PartialPath(timeseriesPath); + mods.add(new Deletion(partialPath, 10000000, 1500, 10000)); + mods.add(new Deletion(partialPath, 10000000, 20000, 30000)); + mods.add(new Deletion(partialPath, 10000000, 45000, 50000)); + for (Modification mod : mods) { + modificationFile.write(mod); + } + modificationFile.close(); + } catch (IllegalPathException | IOException e) { + Assert.fail(e.getMessage()); + } + } + + protected void createOneTsFile(HashMap> deviceSensorsMap) { + try { + File f = FSFactoryProducer.getFSFactory().getFile(path); + TsFileWriter tsFileWriter = new TsFileWriter(f); + // add measurements into file schema + try { + for (Map.Entry> entry : deviceSensorsMap.entrySet()) { + String device = entry.getKey(); + for (String sensor : entry.getValue()) { + tsFileWriter.registerTimeseries( + new Path(device, sensor), + new UnaryMeasurementSchema(sensor, TSDataType.INT64, TSEncoding.RLE)); + } + } + } catch (WriteProcessException e) { + Assert.fail(e.getMessage()); + } + + for (long timestamp = 0; timestamp < maxTimestamp; timestamp += 1000) { + for (Map.Entry> entry : deviceSensorsMap.entrySet()) { + String device = entry.getKey(); + TSRecord tsRecord = new TSRecord(timestamp, device); + for (String sensor : entry.getValue()) { + DataPoint dataPoint = new LongDataPoint(sensor, timestamp + VALUE_OFFSET); + tsRecord.addTuple(dataPoint); + } + tsFileWriter.write(tsRecord); + } + } + tsFileWriter.flushAllChunkGroups(); + tsFileWriter.close(); + } catch (Throwable e) { + Assert.fail(e.getMessage()); + } + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java index ba3b9a42dee3..469f6c69eb8a 100644 --- a/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java +++ b/server/src/test/java/org/apache/iotdb/db/utils/TsFileRewriteToolTest.java @@ -21,8 +21,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.Planner; import org.apache.iotdb.db.qp.executor.IPlanExecutor; import org.apache.iotdb.db.qp.executor.PlanExecutor; @@ -60,28 +65,22 @@ public class TsFileRewriteToolTest { - private String path = null; - - private IoTDBConfig config; - private boolean originEnablePartition; - private long originPartitionInterval; - private final boolean newEnablePartition = true; private final long newPartitionInterval = 3600_000; - - private final long maxTimestamp = 100000000L; - - private final String folder = "target" + File.separator + "split"; - - private final String STORAGE_GROUP = "root.sg_0"; - private final String DEVICE1 = STORAGE_GROUP + ".device_1"; - private final String DEVICE2 = STORAGE_GROUP + ".device_2"; - private final String SENSOR1 = "sensor_1"; - private final String SENSOR2 = "sensor_2"; + protected final long maxTimestamp = 100000000L; + protected final String folder = "target" + File.separator + "split"; + protected final String STORAGE_GROUP = "root.sg_0"; + protected final String DEVICE1 = STORAGE_GROUP + ".device_1"; + protected final String DEVICE2 = STORAGE_GROUP + ".device_2"; + protected final String SENSOR1 = "sensor_1"; + protected final String SENSOR2 = "sensor_2"; private final long VALUE_OFFSET = 1; - private final IPlanExecutor queryExecutor = new PlanExecutor(); private final Planner processor = new Planner(); + private String path = null; + private IoTDBConfig config; + private boolean originEnablePartition; + private long originPartitionInterval; public TsFileRewriteToolTest() throws QueryProcessException {} @@ -199,13 +198,46 @@ public void loadFileWithOnlyOnePageTest() { } } + private void createFile( + List resourcesToBeSettled, + HashMap> deviceSensorsMap, + String timeseriesPath) + throws IOException { + createOneTsFile(deviceSensorsMap); + createlModificationFile(timeseriesPath); + TsFileResource tsFileResource = new TsFileResource(new File(path)); + tsFileResource.setModFile( + new ModificationFile(tsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)); + tsFileResource.serialize(); + tsFileResource.close(); + resourcesToBeSettled.add(tsFileResource); + } + + public void createlModificationFile(String timeseriesPath) { + String modFilePath = path + ModificationFile.FILE_SUFFIX; + ModificationFile modificationFile = new ModificationFile(modFilePath); + List mods = new ArrayList<>(); + try { + PartialPath partialPath = new PartialPath(timeseriesPath); + mods.add(new Deletion(partialPath, 10000000, 1500, 10000)); + mods.add(new Deletion(partialPath, 10000000, 20000, 30000)); + mods.add(new Deletion(partialPath, 10000000, 45000, 50000)); + for (Modification mod : mods) { + modificationFile.write(mod); + } + modificationFile.close(); + } catch (IllegalPathException | IOException e) { + Assert.fail(e.getMessage()); + } + } + private void splitFileAndQueryCheck(HashMap> deviceSensorsMap) { File tsFile = new File(path); TsFileResource tsFileResource = new TsFileResource(tsFile); List splitResource = new ArrayList<>(); try { TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResource); - } catch (IOException | WriteProcessException e) { + } catch (IOException | WriteProcessException | IllegalPathException e) { Assert.fail(e.getMessage()); } Assert.assertEquals(maxTimestamp / newPartitionInterval + 1, splitResource.size()); @@ -262,7 +294,7 @@ private void createOneTsFileWithOnlyOnePage(HashMap> device } } - private void createOneTsFile(HashMap> deviceSensorsMap) { + protected void createOneTsFile(HashMap> deviceSensorsMap) { try { File f = FSFactoryProducer.getFSFactory().getFile(path); TsFileWriter tsFileWriter = new TsFileWriter(f); @@ -388,7 +420,7 @@ private void splitTwoPagesFileAndQueryCheck(String device, String sensor) { List splitResource = new ArrayList<>(); try { TsFileRewriteTool.rewriteTsFile(tsFileResource, splitResource); - } catch (IOException | WriteProcessException e) { + } catch (IOException | WriteProcessException | IllegalPathException e) { Assert.fail(e.getMessage()); } Assert.assertEquals(2, splitResource.size()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index bbc4f6d6cdc2..322bdc27508d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -75,7 +75,7 @@ public class ChunkMetadata implements Accountable, IChunkMetadata { private String filePath; private byte mask; - private ChunkMetadata() {} + public ChunkMetadata() {} /** * constructor of ChunkMetaData.