Skip to content

Commit

Permalink
Merge 1f5d4e5 into 4a760ac
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Jan 14, 2020
2 parents 4a760ac + 1f5d4e5 commit d46dc73
Showing 1 changed file with 11 additions and 1 deletion.
Expand Up @@ -43,6 +43,7 @@ public class MemTableFlushTask {
private static final Logger logger = LoggerFactory.getLogger(MemTableFlushTask.class);
private static final FlushSubTaskPoolManager subTaskPoolManager = FlushSubTaskPoolManager
.getInstance();
private Future encodingTaskFuture;
private Future ioTaskFuture;
private RestorableTsFileIOWriter writer;

Expand All @@ -61,7 +62,7 @@ public MemTableFlushTask(IMemTable memTable, Schema schema, RestorableTsFileIOWr
this.schema = schema;
this.writer = writer;
this.storageGroup = storageGroup;
subTaskPoolManager.submit(encodingTask);
this.encodingTaskFuture = subTaskPoolManager.submit(encodingTask);
this.ioTaskFuture = subTaskPoolManager.submit(ioTask);
logger.debug("flush task of Storage group {} memtable {} is created ",
storageGroup, memTable.getVersion());
Expand Down Expand Up @@ -94,6 +95,15 @@ public void syncFlushMemTable() throws ExecutionException, InterruptedException
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
storageGroup, memTable.getVersion(), sortTime);

try {
encodingTaskFuture.get();
} catch (InterruptedException | ExecutionException e) {
// avoid ioTask waiting forever
noMoreIOTask = true;
ioTaskFuture.cancel(true);
throw e;
}

ioTaskFuture.get();

logger.info(
Expand Down

0 comments on commit d46dc73

Please sign in to comment.