Skip to content

Commit

Permalink
fix flush
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaojialin committed Mar 17, 2020
1 parent f9fd91f commit 9302734
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,7 @@ public TSStatus[] insertBatch(BatchInsertPlan batchInsertPlan) throws StorageEng
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
for (StorageGroupProcessor processor : processorMap.values()) {
processor.waitForAllCurrentTsFileProcessorsClosed();
//TODO do we need to wait for all merging tasks to be finished here?
processor.closeAllResources();
processor.syncCloseAllTsFileProcessors();
}
}

Expand All @@ -321,13 +319,13 @@ public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor : new ArrayList<>(
processor.getWorkSequenceTsFileProcessors())) {
processor.moveOneWorkProcessorToClosingList(true, tsfileProcessor);
processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
}
} else {
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor : new ArrayList<>(
processor.getWorkUnsequenceTsFileProcessor())) {
processor.moveOneWorkProcessorToClosingList(false, tsfileProcessor);
processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor t
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq, tsFileProcessor);
storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
} else {
tsFileProcessor.asyncFlush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
* (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
* shouldClose())<br/>
* <p>
* (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
* (2) someone calls syncCloseAllTsFileProcessors(). (up to now, only flush command from
* cli will call this method)<br/>
* <p>
* UnSequence data has the similar process as above.
Expand Down Expand Up @@ -745,7 +745,7 @@ private TsFileProcessor getOrCreateTsFileProcessorIntern(long timeRangeId,
tsFileProcessorTreeMap.size(),
IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / 2,
storageGroupName);
moveOneWorkProcessorToClosingList(sequence, processorEntry.getValue());
asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
}

// build new processor
Expand Down Expand Up @@ -820,8 +820,7 @@ private String getNewTsFileName(long time, long version, int mergeCnt) {
/**
* thread-safety should be ensured by caller
*/
public void moveOneWorkProcessorToClosingList(boolean sequence,
TsFileProcessor tsFileProcessor) {
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
if (sequence) {
Expand Down Expand Up @@ -852,8 +851,8 @@ public void moveOneWorkProcessorToClosingList(boolean sequence,
*/
public void deleteFolder(String systemDir) {
logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
waitForAllCurrentTsFileProcessorsClosed();
writeLock();
syncCloseAllTsFileProcessors();
try {
File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
if (storageGroupFolder.exists()) {
Expand Down Expand Up @@ -885,7 +884,8 @@ public void closeAllResources() {

public void syncDeleteDataFiles() {
logger.info("{} will close all files for deleting data files", storageGroupName);
waitForAllCurrentTsFileProcessorsClosed();
writeLock();
syncCloseAllTsFileProcessors();
//normally, mergingModification is just need to be closed by after a merge task is finished.
//we close it here just for IT test.
if (this.mergingModification != null) {
Expand All @@ -896,7 +896,6 @@ public void syncDeleteDataFiles() {
}

}
writeLock();
try {
closeAllResources();
List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
Expand Down Expand Up @@ -995,39 +994,44 @@ private void checkFileTTL(TsFileResource resource, long timeLowerBound, boolean
/**
* This method will be blocked until all tsfile processors are closed.
*/
public void waitForAllCurrentTsFileProcessorsClosed() {
synchronized (closeStorageGroupCondition) {
try {
putAllWorkingTsFileProcessorIntoClosingList();
long startTime = System.currentTimeMillis();
while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
.isEmpty()) {
closeStorageGroupCondition.wait(60_000);
if (System.currentTimeMillis() - startTime > 60_000) {
logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
(System.currentTimeMillis() - startTime)/1000);
public void syncCloseAllTsFileProcessors() {
writeLock();
try {
synchronized (closeStorageGroupCondition) {
try {
asyncCloseAllTsFileProcessors();
long startTime = System.currentTimeMillis();
while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
.isEmpty()) {
closeStorageGroupCondition.wait(60_000);
if (System.currentTimeMillis() - startTime > 60_000) {
logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
(System.currentTimeMillis() - startTime)/1000);
}
}
} catch (InterruptedException e) {
logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
+ "group {}", storageGroupName, e);
}
} catch (InterruptedException e) {
logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
+ "group {}", storageGroupName, e);
}
} finally {
writeUnlock();
}
}

public void putAllWorkingTsFileProcessorIntoClosingList() {
public void asyncCloseAllTsFileProcessors() {
writeLock();
try {
logger.info("async force close all files in storage group: {}", storageGroupName);
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workSequenceTsFileProcessors.values())) {
moveOneWorkProcessorToClosingList(true, tsFileProcessor);
asyncCloseOneTsFileProcessor(true, tsFileProcessor);
}
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workUnsequenceTsFileProcessors.values())) {
moveOneWorkProcessorToClosingList(false, tsFileProcessor);
asyncCloseOneTsFileProcessor(false, tsFileProcessor);
}
} finally {
writeUnlock();
Expand Down Expand Up @@ -1370,7 +1374,7 @@ public void merge(boolean fullMerge) {
}
logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
fullMerge);
waitForAllCurrentTsFileProcessorsClosed();
syncCloseAllTsFileProcessors();
if (unSequenceFileList.isEmpty() || sequenceFileTreeSet.isEmpty()) {
logger.info("{} no files to be merged", storageGroupName);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ protected void insertData() throws IOException, QueryProcessException {
for (int j = 11; j <= 20; j++) {
insertOneRecord(j, j);
}
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
storageGroupProcessor.asyncCloseAllTsFileProcessors();

for (int j = 21; j <= 30; j += 2) {
insertOneRecord(j, 0); // will be covered when read
}
storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
storageGroupProcessor.syncCloseAllTsFileProcessors();

for (int j = 21; j <= 30; j += 2) {
insertOneRecord(j, j);
}
storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
storageGroupProcessor.syncCloseAllTsFileProcessors();

insertOneRecord(2, 100);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testUnseqUnsealedDelete() throws QueryProcessException, IOException
TSRecord record = new TSRecord(10000, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
processor.insert(new InsertPlan(record));
processor.waitForAllCurrentTsFileProcessorsClosed();
processor.syncCloseAllTsFileProcessors();

for (int j = 1; j <= 10; j++) {
record = new TSRecord(j, deviceId);
Expand Down Expand Up @@ -137,10 +137,10 @@ public void testSequenceSyncClose() throws QueryProcessException {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
processor.putAllWorkingTsFileProcessorIntoClosingList();
processor.asyncCloseAllTsFileProcessors();
}

processor.waitForAllCurrentTsFileProcessorsClosed();
processor.syncCloseAllTsFileProcessors();
QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);

Expand Down Expand Up @@ -178,7 +178,7 @@ public void testIoTDBRowBatchWriteAndSyncClose() throws WriteProcessException {
batchInsertPlan1.setRowCount(times.length);

processor.insertBatch(batchInsertPlan1);
processor.putAllWorkingTsFileProcessorIntoClosingList();
processor.asyncCloseAllTsFileProcessors();

BatchInsertPlan batchInsertPlan2 = new BatchInsertPlan("root.vehicle.d0", measurements,
dataTypes);
Expand All @@ -193,8 +193,8 @@ public void testIoTDBRowBatchWriteAndSyncClose() throws WriteProcessException {
batchInsertPlan2.setRowCount(times.length);

processor.insertBatch(batchInsertPlan2);
processor.putAllWorkingTsFileProcessorIntoClosingList();
processor.waitForAllCurrentTsFileProcessorsClosed();
processor.asyncCloseAllTsFileProcessors();
processor.syncCloseAllTsFileProcessors();

QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
Expand All @@ -214,18 +214,18 @@ public void testSeqAndUnSeqSyncClose() throws QueryProcessException {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
processor.putAllWorkingTsFileProcessorIntoClosingList();
processor.asyncCloseAllTsFileProcessors();
}
processor.waitForAllCurrentTsFileProcessorsClosed();
processor.syncCloseAllTsFileProcessors();

for (int j = 10; j >= 1; j--) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
processor.putAllWorkingTsFileProcessorIntoClosingList();
processor.asyncCloseAllTsFileProcessors();
}

processor.waitForAllCurrentTsFileProcessorsClosed();
processor.syncCloseAllTsFileProcessors();

QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
Expand All @@ -247,18 +247,18 @@ public void testMerge() throws QueryProcessException {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
processor.putAllWorkingTsFileProcessorIntoClosingList();
processor.asyncCloseAllTsFileProcessors();
}
processor.waitForAllCurrentTsFileProcessorsClosed();
processor.syncCloseAllTsFileProcessors();

for (int j = 10; j >= 1; j--) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
processor.putAllWorkingTsFileProcessorIntoClosingList();
processor.asyncCloseAllTsFileProcessors();
}

processor.waitForAllCurrentTsFileProcessorsClosed();
processor.syncCloseAllTsFileProcessors();
processor.merge(true);
while (mergeLock.get() == 0) {
// wait
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void setUp()

@After
public void tearDown() throws IOException, StorageEngineException {
storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
storageGroupProcessor.syncCloseAllTsFileProcessors();
EnvironmentUtils.cleanEnv();
}

Expand Down Expand Up @@ -160,15 +160,15 @@ private void prepareData() throws QueryProcessException {
insertPlan.setTime(initTime - 2000 + i);
storageGroupProcessor.insert(insertPlan);
if ((i + 1) % 300 == 0) {
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
storageGroupProcessor.asyncCloseAllTsFileProcessors();
}
}
// unsequence data
for (int i = 0; i < 1000; i++) {
insertPlan.setTime(initTime - 2000 + i);
storageGroupProcessor.insert(insertPlan);
if ((i + 1) % 300 == 0) {
storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
storageGroupProcessor.asyncCloseAllTsFileProcessors();
}
}
}
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testTTLRead() throws IOException, QueryProcessException, StorageEngi
public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
prepareData();

storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
storageGroupProcessor.syncCloseAllTsFileProcessors();

// files before ttl
File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
Expand Down Expand Up @@ -335,7 +335,7 @@ public void testShowTTL()
@Test
public void testTTLCleanFile() throws QueryProcessException {
prepareData();
storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
storageGroupProcessor.syncCloseAllTsFileProcessors();

assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());
Expand Down

0 comments on commit 9302734

Please sign in to comment.