From ba7ff96f39e5e6835218eeb784b72f60cb0dde3d Mon Sep 17 00:00:00 2001 From: Jiang Tian Date: Fri, 3 Jan 2020 14:41:16 +0800 Subject: [PATCH] abstract TsFileFlushPolicy and allow specifying storage groups in flush command (#685) * abstract TsFileFlushPolicy and allow specifying storage groups in flush command --- .../apache/iotdb/db/engine/StorageEngine.java | 54 +++++++--------- .../db/engine/flush/TsFileFlushPolicy.java | 54 ++++++++++++++++ .../storagegroup/StorageGroupProcessor.java | 37 ++++------- .../engine/storagegroup/TsFileProcessor.java | 6 +- .../iotdb/db/service/TSServiceImpl.java | 31 ++++++++- .../engine/cache/DeviceMetaDataCacheTest.java | 3 +- .../StorageGroupProcessorTest.java | 3 +- .../iotdb/db/engine/storagegroup/TTLTest.java | 3 +- .../integration/IoTDBFlushQueryMergeTest.java | 64 ++++++++++++++++++- .../db/query/reader/ReaderTestHelper.java | 3 +- 10 files changed, 188 insertions(+), 70 deletions(-) create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index f379f0d06d3e..6e95fdfda347 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; @@ -39,6 +38,8 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; +import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; +import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -48,6 +49,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; import org.apache.iotdb.db.exception.storageGroup.StorageGroupException; +import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException; import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.MNode; @@ -71,7 +73,7 @@ public class StorageEngine implements IService { private final Logger logger; private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private static final long TTL_CHECK_INTERVAL = 60 * 1000; + private static final long TTL_CHECK_INTERVAL = 60 * 1000L; /** * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor @@ -94,6 +96,7 @@ public static StorageEngine getInstance() { } private ScheduledExecutorService ttlCheckThread; + private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy(); private StorageEngine() { logger = LoggerFactory.getLogger(StorageEngine.class); @@ -116,7 +119,7 @@ private StorageEngine() { for (MNode storageGroup : sgNodes) { futures.add(recoveryThreadPool.submit((Callable) () -> { StorageGroupProcessor processor = new StorageGroupProcessor(systemDir, - storageGroup.getFullPath()); + storageGroup.getFullPath(), fileFlushPolicy); processor.setDataTTL(storageGroup.getDataTTL()); processorMap.put(storageGroup.getFullPath(), processor); logger.info("Storage Group Processor {} is recovered successfully", @@ -182,7 +185,7 @@ public StorageGroupProcessor getProcessor(String path) throws StorageEngineExcep if (processor == null) { logger.info("construct a processor instance, the storage group is {}, Thread is {}", storageGroupName, Thread.currentThread().getId()); - processor = new StorageGroupProcessor(systemDir, storageGroupName); + processor = new StorageGroupProcessor(systemDir, storageGroupName, fileFlushPolicy); processor.setDataTTL( MManager.getInstance().getNodeByPathWithCheck(storageGroupName).getDataTTL()); processorMap.put(storageGroupName, processor); @@ -264,6 +267,21 @@ public void syncCloseAllProcessor() { } } + public void asyncCloseProcessor(String storageGroupName, boolean isSeq) + throws StorageGroupNotSetException { + StorageGroupProcessor processor = processorMap.get(storageGroupName); + if (processor != null) { + processor.writeLock(); + try { + processor.moveOneWorkProcessorToClosingList(isSeq); + } finally { + processor.writeUnlock(); + } + } else { + throw new StorageGroupNotSetException(storageGroupName); + } + } + /** * update data. */ @@ -307,34 +325,6 @@ public Set calTopKMeasurement(String deviceId, String sensorId, double k) return storageGroupProcessor.calTopKMeasurement(sensorId, k); } - /** - * Append one specified tsfile to the storage group. This method is only provided for - * transmission module - * - * @param storageGroupName the seriesPath of storage group - * @param appendFile the appended tsfile information - */ - @SuppressWarnings("unused") // reimplement sync module - public boolean appendFileToStorageGroupProcessor(String storageGroupName, - TsFileResource appendFile, - String appendFilePath) throws StorageEngineException { - // TODO reimplement sync module - return true; - } - - /** - * get all overlap TsFiles which are conflict with the appendFile. - * - * @param storageGroupName the seriesPath of storage group - * @param appendFile the appended tsfile information - */ - @SuppressWarnings("unused") // reimplement sync module - public List getOverlapFiles(String storageGroupName, TsFileResource appendFile, - String uuid) throws StorageEngineException { - // TODO reimplement sync module - return Collections.emptyList(); - } - /** * count all Tsfiles which need to be upgraded * diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java new file mode 100644 index 000000000000..06d8d191f8ec --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.flush; + +import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; +import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone + * IoTDB, the flush or close is executed without constraint. But in the distributed version, the + * close is controlled by the leader and should not be performed by the follower alone. + */ +public interface TsFileFlushPolicy { + + void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq); + + class DirectFlushPolicy implements TsFileFlushPolicy{ + + private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class); + + @Override + public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor, + boolean isSeq) { + logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}", + tsFileProcessor.getWorkMemTableMemory(), + tsFileProcessor.getTsFileResource().getFile().getAbsolutePath()); + + if (tsFileProcessor.shouldClose()) { + storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq); + } else { + tsFileProcessor.asyncFlush(); + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index d58b83a8a8a1..810295560399 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; +import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.engine.merge.manage.MergeResource; import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector; @@ -184,14 +185,16 @@ public class StorageGroupProcessor { private long dataTTL = Long.MAX_VALUE; private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + private TsFileFlushPolicy fileFlushPolicy; // allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not // including the files generated by merge private Set allDirectFileVersions = new HashSet<>(); - public StorageGroupProcessor(String systemInfoDir, String storageGroupName) + public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException { this.storageGroupName = storageGroupName; + this.fileFlushPolicy = fileFlushPolicy; // construct the file schema this.schema = constructSchema(storageGroupName); @@ -448,15 +451,7 @@ private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan, // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor.shouldFlush()) { - logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}", - tsFileProcessor.getWorkMemTableMemory(), - tsFileProcessor.getTsFileResource().getFile().getAbsolutePath()); - - if (tsFileProcessor.shouldClose()) { - moveOneWorkProcessorToClosingList(sequence); - } else { - tsFileProcessor.asyncFlush(); - } + fileFlushPolicy.apply(this, tsFileProcessor, sequence); } } @@ -481,15 +476,7 @@ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence) // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor.shouldFlush()) { - logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}", - tsFileProcessor.getWorkMemTableMemory(), - tsFileProcessor.getTsFileResource().getFile().getAbsolutePath()); - - if (tsFileProcessor.shouldClose()) { - moveOneWorkProcessorToClosingList(sequence); - } else { - tsFileProcessor.asyncFlush(); - } + fileFlushPolicy.apply(this, tsFileProcessor, sequence); } } @@ -552,18 +539,18 @@ private TsFileProcessor createTsFileProcessor(boolean sequence) /** - * only called by insert(), thread-safety should be ensured by caller + * thread-safety should be ensured by caller */ - private void moveOneWorkProcessorToClosingList(boolean sequence) { + public void moveOneWorkProcessorToClosingList(boolean sequence) { //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed. //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes. - if (sequence) { + if (sequence && workSequenceTsFileProcessor != null) { closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor); updateEndTimeMap(workSequenceTsFileProcessor); workSequenceTsFileProcessor.asyncClose(); workSequenceTsFileProcessor = null; logger.info("close a sequence tsfile processor {}", storageGroupName); - } else { + } else if (workUnSequenceTsFileProcessor != null){ closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor); workUnSequenceTsFileProcessor.asyncClose(); workUnSequenceTsFileProcessor = null; @@ -777,11 +764,11 @@ public Set calTopKMeasurement(String sensorId, double k) { return sensorSet; } - private void writeLock() { + public void writeLock() { insertLock.writeLock().lock(); } - private void writeUnlock() { + public void writeUnlock() { insertLock.writeLock().unlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index cdf649eb81f5..2680b96dee2f 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -226,7 +226,7 @@ public void deleteDataInMemory(Deletion deletion) { } } - TsFileResource getTsFileResource() { + public TsFileResource getTsFileResource() { return tsFileResource; } @@ -252,7 +252,7 @@ private long getMemtableSizeThresholdBasedOnSeriesNum() { } - boolean shouldClose() { + public boolean shouldClose() { long fileSize = tsFileResource.getFileSize(); long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig() .getTsFileSizeThreshold(); @@ -543,7 +543,7 @@ public int getFlushingMemTableSize() { return flushingMemTables.size(); } - long getWorkMemTableMemory() { + public long getWorkMemTableMemory() { return workMemTable.memSize(); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 1f7357233fd7..7fb8456f6904 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -56,6 +56,7 @@ import org.apache.iotdb.db.exception.path.PathException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.runtime.SQLParserException; +import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metrics.server.SqlArgument; import org.apache.iotdb.db.qp.QueryProcessor; @@ -469,10 +470,15 @@ private boolean execAdminCommand(String statement, long sessionId) throws Storag return false; } statement = statement.toLowerCase(); + if (statement.startsWith("flush")) { + try { + execFlush(statement); + } catch (StorageGroupNotSetException e) { + throw new StorageEngineException(e); + } + return true; + } switch (statement) { - case "flush": - StorageEngine.getInstance().syncCloseAllProcessor(); - return true; case "merge": StorageEngine.getInstance() .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); @@ -485,6 +491,25 @@ private boolean execAdminCommand(String statement, long sessionId) throws Storag } } + private void execFlush(String statement) throws StorageGroupNotSetException { + String[] args = statement.split("\\s+"); + if (args.length == 1) { + StorageEngine.getInstance().syncCloseAllProcessor(); + } else if (args.length == 2){ + String[] storageGroups = args[1].split(","); + for (String storageGroup : storageGroups) { + StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true); + StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false); + } + } else { + String[] storageGroups = args[1].split(","); + boolean isSeq = Boolean.parseBoolean(args[2]); + for (String storageGroup : storageGroups) { + StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq); + } + } + } + @Override public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) { long t1 = System.currentTimeMillis(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java index d14e21e1e7b7..5cec03d1b910 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.MetadataManagerHelper; +import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -68,7 +69,7 @@ public void setUp() throws Exception { EnvironmentUtils.envSetUp(); MetadataManagerHelper.initMetadata(); ActiveTimeSeriesCounter.getInstance().init(storageGroup); - storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup); + storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy()); insertData(); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 410de23b6eca..aed98a1a89c5 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.MetadataManagerHelper; +import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; @@ -262,7 +263,7 @@ public void testMerge() throws QueryProcessException { class DummySGP extends StorageGroupProcessor { DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException { - super(systemInfoDir, storageGroupName); + super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy()); } @Override diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index 334cfe28555f..318706370ffd 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.directories.DirectoryManager; +import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.StartupException; @@ -90,7 +91,7 @@ private void createSchemas() MManager.getInstance().setStorageGroupToMTree(sg1); MManager.getInstance().setStorageGroupToMTree(sg2); storageGroupProcessor = new StorageGroupProcessor(IoTDBDescriptor.getInstance().getConfig() - .getSystemDir(), sg1); + .getSystemDir(), sg1, new DirectFlushPolicy()); MManager.getInstance().addPathToMTree(g1s1, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, Collections.emptyMap()); storageGroupProcessor.addMeasurement("s1", TSDataType.INT64, TSEncoding.PLAIN, diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java index 286b6ced75ae..c3c9513b36c3 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFlushQueryMergeTest.java @@ -18,6 +18,14 @@ */ package org.apache.iotdb.db.integration; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; @@ -25,8 +33,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.sql.*; -import static org.junit.Assert.fail; public class IoTDBFlushQueryMergeTest { @@ -81,7 +87,7 @@ private static void insertData() throws ClassNotFoundException { } @Test - public void selectAllSQLTest() throws ClassNotFoundException, SQLException { + public void selectAllSQLTest() throws ClassNotFoundException { Class.forName(Config.JDBC_DRIVER_NAME); try (Connection connection = DriverManager @@ -102,4 +108,56 @@ public void selectAllSQLTest() throws ClassNotFoundException, SQLException { fail(e.getMessage()); } } + + @Test + public void testFlushGivenGroup() throws ClassNotFoundException { + Class.forName(Config.JDBC_DRIVER_NAME); + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true); + String insertTemplate = + "INSERT INTO root.group%d(timestamp, s1, s2, s3) VALUES (%d, %d, %f, %s)"; + try (Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + statement.execute("SET STORAGE GROUP TO root.group1"); + statement.execute("SET STORAGE GROUP TO root.group2"); + statement.execute("SET STORAGE GROUP TO root.group3"); + + for (int i = 1; i <= 3; i++) { + for (int j = 10; j < 20; j++) { + statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j))); + } + } + statement.execute("FLUSH"); + + for (int i = 1; i <= 3; i++) { + for (int j = 0; j < 10; j++) { + statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j))); + } + } + statement.execute("FLUSH root.group1"); + statement.execute("FLUSH root.group2,root.group3"); + + for (int i = 1; i <= 3; i++) { + for (int j = 0; j < 30; j++) { + statement.execute(String.format(insertTemplate, i, j, j, j*0.1, String.valueOf(j))); + } + } + statement.execute("FLUSH root.group1 true"); + statement.execute("FLUSH root.group2,root.group3 false"); + + ResultSet resultSet = statement.executeQuery("SELECT * from root.group1,root.group2,root" + + ".group3"); + int i = 0; + while (resultSet.next()) { + i ++; + } + assertEquals(30, i); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false); + } + } } diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java index 2677c9bf386d..5173405071c7 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.MetadataManagerHelper; +import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; @@ -52,7 +53,7 @@ public void setUp() throws Exception { EnvironmentUtils.envSetUp(); MetadataManagerHelper.initMetadata(); ActiveTimeSeriesCounter.getInstance().init(storageGroup); - storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup); + storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy()); insertData(); }