Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abstract TsFileFlushPolicy and allow specifying storage groups in flush command #685

Merged
merged 5 commits into from Jan 3, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 22 additions & 32 deletions server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
Expand Up @@ -21,7 +21,6 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
Expand All @@ -39,6 +38,8 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
Expand All @@ -48,6 +49,7 @@
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MNode;
Expand All @@ -71,7 +73,7 @@ public class StorageEngine implements IService {

private final Logger logger;
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000;
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;

/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
Expand All @@ -94,6 +96,7 @@ public static StorageEngine getInstance() {
}

private ScheduledExecutorService ttlCheckThread;
private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();

private StorageEngine() {
logger = LoggerFactory.getLogger(StorageEngine.class);
Expand All @@ -116,7 +119,7 @@ private StorageEngine() {
for (MNode storageGroup : sgNodes) {
futures.add(recoveryThreadPool.submit((Callable<Void>) () -> {
StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
storageGroup.getFullPath());
storageGroup.getFullPath(), fileFlushPolicy);
processor.setDataTTL(storageGroup.getDataTTL());
processorMap.put(storageGroup.getFullPath(), processor);
logger.info("Storage Group Processor {} is recovered successfully",
Expand Down Expand Up @@ -182,7 +185,7 @@ public StorageGroupProcessor getProcessor(String path) throws StorageEngineExcep
if (processor == null) {
logger.info("construct a processor instance, the storage group is {}, Thread is {}",
storageGroupName, Thread.currentThread().getId());
processor = new StorageGroupProcessor(systemDir, storageGroupName);
processor = new StorageGroupProcessor(systemDir, storageGroupName, fileFlushPolicy);
processor.setDataTTL(
MManager.getInstance().getNodeByPathWithCheck(storageGroupName).getDataTTL());
processorMap.put(storageGroupName, processor);
Expand Down Expand Up @@ -264,6 +267,21 @@ public void syncCloseAllProcessor() {
}
}

public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
throws StorageGroupNotSetException {
StorageGroupProcessor processor = processorMap.get(storageGroupName);
if (storageGroupName != null) {
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
processor.writeLock();
try {
processor.moveOneWorkProcessorToClosingList(isSeq);
} finally {
processor.writeUnlock();
}
} else {
throw new StorageGroupNotSetException(storageGroupName);
}
}

/**
* update data.
*/
Expand Down Expand Up @@ -307,34 +325,6 @@ public Set calTopKMeasurement(String deviceId, String sensorId, double k)
return storageGroupProcessor.calTopKMeasurement(sensorId, k);
}

/**
* Append one specified tsfile to the storage group. <b>This method is only provided for
* transmission module</b>
*
* @param storageGroupName the seriesPath of storage group
* @param appendFile the appended tsfile information
*/
@SuppressWarnings("unused") // reimplement sync module
public boolean appendFileToStorageGroupProcessor(String storageGroupName,
TsFileResource appendFile,
String appendFilePath) throws StorageEngineException {
// TODO reimplement sync module
return true;
}

/**
* get all overlap TsFiles which are conflict with the appendFile.
*
* @param storageGroupName the seriesPath of storage group
* @param appendFile the appended tsfile information
*/
@SuppressWarnings("unused") // reimplement sync module
public List<String> getOverlapFiles(String storageGroupName, TsFileResource appendFile,
String uuid) throws StorageEngineException {
// TODO reimplement sync module
return Collections.emptyList();
}

/**
* count all Tsfiles which need to be upgraded
*
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.flush;

import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone
* IoTDB, the flush or close is executed without constraint. But in the distributed version, the
* close is controlled by the leader and should not be performed by the follower alone.
*/
public interface TsFileFlushPolicy {

void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq);

class DirectFlushPolicy implements TsFileFlushPolicy{

private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class);

@Override
public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
boolean isSeq) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq);
} else {
tsFileProcessor.asyncFlush();
}
}
}
}
Expand Up @@ -42,6 +42,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
Expand Down Expand Up @@ -184,10 +185,12 @@ public class StorageGroupProcessor {
private long dataTTL = Long.MAX_VALUE;

private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
private TsFileFlushPolicy fileFlushPolicy;

public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
this.fileFlushPolicy = fileFlushPolicy;

// construct the file schema
this.schema = constructSchema(storageGroupName);
Expand Down Expand Up @@ -437,15 +440,7 @@ private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,

// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
moveOneWorkProcessorToClosingList(sequence);
} else {
tsFileProcessor.asyncFlush();
}
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
}

Expand All @@ -470,15 +465,7 @@ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)

// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
moveOneWorkProcessorToClosingList(sequence);
} else {
tsFileProcessor.asyncFlush();
}
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
}

Expand Down Expand Up @@ -540,18 +527,18 @@ private TsFileProcessor createTsFileProcessor(boolean sequence)


/**
* only called by insert(), thread-safety should be ensured by caller
* thread-safety should be ensured by caller
*/
private void moveOneWorkProcessorToClosingList(boolean sequence) {
public void moveOneWorkProcessorToClosingList(boolean sequence) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
if (sequence) {
if (sequence && workSequenceTsFileProcessor != null) {
closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
updateEndTimeMap(workSequenceTsFileProcessor);
workSequenceTsFileProcessor.asyncClose();
workSequenceTsFileProcessor = null;
logger.info("close a sequence tsfile processor {}", storageGroupName);
} else {
} else if (workUnSequenceTsFileProcessor != null){
closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor.asyncClose();
workUnSequenceTsFileProcessor = null;
Expand Down Expand Up @@ -765,11 +752,11 @@ public Set calTopKMeasurement(String sensorId, double k) {
return sensorSet;
}

private void writeLock() {
public void writeLock() {
insertLock.writeLock().lock();
}

private void writeUnlock() {
public void writeUnlock() {
insertLock.writeLock().unlock();
}

Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
Expand Down Expand Up @@ -224,7 +223,7 @@ public void deleteDataInMemory(Deletion deletion) {
}
}

TsFileResource getTsFileResource() {
public TsFileResource getTsFileResource() {
return tsFileResource;
}

Expand All @@ -250,7 +249,7 @@ private long getMemtableSizeThresholdBasedOnSeriesNum() {
}


boolean shouldClose() {
public boolean shouldClose() {
long fileSize = tsFileResource.getFileSize();
long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
.getTsFileSizeThreshold();
Expand Down Expand Up @@ -541,7 +540,7 @@ public int getFlushingMemTableSize() {
return flushingMemTables.size();
}

long getWorkMemTableMemory() {
public long getWorkMemTableMemory() {
return workMemTable.memSize();
}

Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metrics.server.SqlArgument;
import org.apache.iotdb.db.qp.QueryProcessor;
Expand Down Expand Up @@ -469,10 +470,15 @@ private boolean execAdminCommand(String statement, long sessionId) throws Storag
return false;
}
statement = statement.toLowerCase();
if (statement.startsWith("flush")) {
try {
execFlush(statement);
} catch (StorageGroupNotSetException e) {
throw new StorageEngineException(e);
}
return true;
}
switch (statement) {
case "flush":
StorageEngine.getInstance().syncCloseAllProcessor();
return true;
case "merge":
StorageEngine.getInstance()
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
Expand All @@ -485,6 +491,25 @@ private boolean execAdminCommand(String statement, long sessionId) throws Storag
}
}

private void execFlush(String statement) throws StorageGroupNotSetException {
String[] args = statement.split("\\s+");
if (args.length == 1) {
StorageEngine.getInstance().syncCloseAllProcessor();
} else if (args.length == 2){
String[] storageGroups = args[1].split(",");
for (String storageGroup : storageGroups) {
StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true);
StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false);
}
} else {
String[] storageGroups = args[1].split(",");
boolean isSeq = Boolean.parseBoolean(args[2]);
for (String storageGroup : storageGroups) {
StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq);
}
}
}

@Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
MetadataManagerHelper.initMetadata();
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
insertData();
}

Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
Expand Down Expand Up @@ -262,7 +263,7 @@ public void testMerge() throws QueryProcessException {
class DummySGP extends StorageGroupProcessor {

DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
super(systemInfoDir, storageGroupName);
super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
}

@Override
Expand Down