Skip to content

Commit

Permalink
Merge fe7b0fe into 0734ec1
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Jan 3, 2020
2 parents 0734ec1 + fe7b0fe commit 4e8cf64
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 70 deletions.
54 changes: 22 additions & 32 deletions server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
Expand Up @@ -21,7 +21,6 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
Expand All @@ -39,6 +38,8 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
Expand All @@ -48,6 +49,7 @@
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.MNode;
Expand All @@ -71,7 +73,7 @@ public class StorageEngine implements IService {

private final Logger logger;
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000;
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;

/**
* a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
Expand All @@ -94,6 +96,7 @@ public static StorageEngine getInstance() {
}

private ScheduledExecutorService ttlCheckThread;
private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();

private StorageEngine() {
logger = LoggerFactory.getLogger(StorageEngine.class);
Expand All @@ -116,7 +119,7 @@ private StorageEngine() {
for (MNode storageGroup : sgNodes) {
futures.add(recoveryThreadPool.submit((Callable<Void>) () -> {
StorageGroupProcessor processor = new StorageGroupProcessor(systemDir,
storageGroup.getFullPath());
storageGroup.getFullPath(), fileFlushPolicy);
processor.setDataTTL(storageGroup.getDataTTL());
processorMap.put(storageGroup.getFullPath(), processor);
logger.info("Storage Group Processor {} is recovered successfully",
Expand Down Expand Up @@ -182,7 +185,7 @@ public StorageGroupProcessor getProcessor(String path) throws StorageEngineExcep
if (processor == null) {
logger.info("construct a processor instance, the storage group is {}, Thread is {}",
storageGroupName, Thread.currentThread().getId());
processor = new StorageGroupProcessor(systemDir, storageGroupName);
processor = new StorageGroupProcessor(systemDir, storageGroupName, fileFlushPolicy);
processor.setDataTTL(
MManager.getInstance().getNodeByPathWithCheck(storageGroupName).getDataTTL());
processorMap.put(storageGroupName, processor);
Expand Down Expand Up @@ -264,6 +267,21 @@ public void syncCloseAllProcessor() {
}
}

public void asyncCloseProcessor(String storageGroupName, boolean isSeq)
throws StorageGroupNotSetException {
StorageGroupProcessor processor = processorMap.get(storageGroupName);
if (processor != null) {
processor.writeLock();
try {
processor.moveOneWorkProcessorToClosingList(isSeq);
} finally {
processor.writeUnlock();
}
} else {
throw new StorageGroupNotSetException(storageGroupName);
}
}

/**
* update data.
*/
Expand Down Expand Up @@ -307,34 +325,6 @@ public Set calTopKMeasurement(String deviceId, String sensorId, double k)
return storageGroupProcessor.calTopKMeasurement(sensorId, k);
}

/**
* Append one specified tsfile to the storage group. <b>This method is only provided for
* transmission module</b>
*
* @param storageGroupName the seriesPath of storage group
* @param appendFile the appended tsfile information
*/
@SuppressWarnings("unused") // reimplement sync module
public boolean appendFileToStorageGroupProcessor(String storageGroupName,
TsFileResource appendFile,
String appendFilePath) throws StorageEngineException {
// TODO reimplement sync module
return true;
}

/**
* get all overlap TsFiles which are conflict with the appendFile.
*
* @param storageGroupName the seriesPath of storage group
* @param appendFile the appended tsfile information
*/
@SuppressWarnings("unused") // reimplement sync module
public List<String> getOverlapFiles(String storageGroupName, TsFileResource appendFile,
String uuid) throws StorageEngineException {
// TODO reimplement sync module
return Collections.emptyList();
}

/**
* count all Tsfiles which need to be upgraded
*
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.flush;

import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone
* IoTDB, the flush or close is executed without constraint. But in the distributed version, the
* close is controlled by the leader and should not be performed by the follower alone.
*/
public interface TsFileFlushPolicy {

void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor processor, boolean isSeq);

class DirectFlushPolicy implements TsFileFlushPolicy{

private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class);

@Override
public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor,
boolean isSeq) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq);
} else {
tsFileProcessor.asyncFlush();
}
}
}
}
Expand Up @@ -42,6 +42,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
Expand Down Expand Up @@ -184,14 +185,16 @@ public class StorageGroupProcessor {
private long dataTTL = Long.MAX_VALUE;

private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
private TsFileFlushPolicy fileFlushPolicy;

// allDirectFileVersions records the versions of the direct TsFiles (generated by flush), not
// including the files generated by merge
private Set<Long> allDirectFileVersions = new HashSet<>();

public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
public StorageGroupProcessor(String systemInfoDir, String storageGroupName, TsFileFlushPolicy fileFlushPolicy)
throws StorageGroupProcessorException {
this.storageGroupName = storageGroupName;
this.fileFlushPolicy = fileFlushPolicy;

// construct the file schema
this.schema = constructSchema(storageGroupName);
Expand Down Expand Up @@ -448,15 +451,7 @@ private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan,

// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
moveOneWorkProcessorToClosingList(sequence);
} else {
tsFileProcessor.asyncFlush();
}
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
}

Expand All @@ -481,15 +476,7 @@ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence)

// check memtable size and may asyncTryToFlush the work memtable
if (tsFileProcessor.shouldFlush()) {
logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
tsFileProcessor.getWorkMemTableMemory(),
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());

if (tsFileProcessor.shouldClose()) {
moveOneWorkProcessorToClosingList(sequence);
} else {
tsFileProcessor.asyncFlush();
}
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
}

Expand Down Expand Up @@ -552,18 +539,18 @@ private TsFileProcessor createTsFileProcessor(boolean sequence)


/**
* only called by insert(), thread-safety should be ensured by caller
* thread-safety should be ensured by caller
*/
private void moveOneWorkProcessorToClosingList(boolean sequence) {
public void moveOneWorkProcessorToClosingList(boolean sequence) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
if (sequence) {
if (sequence && workSequenceTsFileProcessor != null) {
closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
updateEndTimeMap(workSequenceTsFileProcessor);
workSequenceTsFileProcessor.asyncClose();
workSequenceTsFileProcessor = null;
logger.info("close a sequence tsfile processor {}", storageGroupName);
} else {
} else if (workUnSequenceTsFileProcessor != null){
closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
workUnSequenceTsFileProcessor.asyncClose();
workUnSequenceTsFileProcessor = null;
Expand Down Expand Up @@ -777,11 +764,11 @@ public Set calTopKMeasurement(String sensorId, double k) {
return sensorSet;
}

private void writeLock() {
public void writeLock() {
insertLock.writeLock().lock();
}

private void writeUnlock() {
public void writeUnlock() {
insertLock.writeLock().unlock();
}

Expand Down
Expand Up @@ -226,7 +226,7 @@ public void deleteDataInMemory(Deletion deletion) {
}
}

TsFileResource getTsFileResource() {
public TsFileResource getTsFileResource() {
return tsFileResource;
}

Expand All @@ -252,7 +252,7 @@ private long getMemtableSizeThresholdBasedOnSeriesNum() {
}


boolean shouldClose() {
public boolean shouldClose() {
long fileSize = tsFileResource.getFileSize();
long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
.getTsFileSizeThreshold();
Expand Down Expand Up @@ -543,7 +543,7 @@ public int getFlushingMemTableSize() {
return flushingMemTables.size();
}

long getWorkMemTableMemory() {
public long getWorkMemTableMemory() {
return workMemTable.memSize();
}

Expand Down
Expand Up @@ -56,6 +56,7 @@
import org.apache.iotdb.db.exception.path.PathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metrics.server.SqlArgument;
import org.apache.iotdb.db.qp.QueryProcessor;
Expand Down Expand Up @@ -469,10 +470,15 @@ private boolean execAdminCommand(String statement, long sessionId) throws Storag
return false;
}
statement = statement.toLowerCase();
if (statement.startsWith("flush")) {
try {
execFlush(statement);
} catch (StorageGroupNotSetException e) {
throw new StorageEngineException(e);
}
return true;
}
switch (statement) {
case "flush":
StorageEngine.getInstance().syncCloseAllProcessor();
return true;
case "merge":
StorageEngine.getInstance()
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
Expand All @@ -485,6 +491,25 @@ private boolean execAdminCommand(String statement, long sessionId) throws Storag
}
}

private void execFlush(String statement) throws StorageGroupNotSetException {
String[] args = statement.split("\\s+");
if (args.length == 1) {
StorageEngine.getInstance().syncCloseAllProcessor();
} else if (args.length == 2){
String[] storageGroups = args[1].split(",");
for (String storageGroup : storageGroups) {
StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true);
StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false);
}
} else {
String[] storageGroups = args[1].split(",");
boolean isSeq = Boolean.parseBoolean(args[2]);
for (String storageGroup : storageGroups) {
StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq);
}
}
}

@Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
MetadataManagerHelper.initMetadata();
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup);
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
insertData();
}

Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
Expand Down Expand Up @@ -262,7 +263,7 @@ public void testMerge() throws QueryProcessException {
class DummySGP extends StorageGroupProcessor {

DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
super(systemInfoDir, storageGroupName);
super(systemInfoDir, storageGroupName, new TsFileFlushPolicy.DirectFlushPolicy());
}

@Override
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.StartupException;
Expand Down Expand Up @@ -90,7 +91,7 @@ private void createSchemas()
MManager.getInstance().setStorageGroupToMTree(sg1);
MManager.getInstance().setStorageGroupToMTree(sg2);
storageGroupProcessor = new StorageGroupProcessor(IoTDBDescriptor.getInstance().getConfig()
.getSystemDir(), sg1);
.getSystemDir(), sg1, new DirectFlushPolicy());
MManager.getInstance().addPathToMTree(g1s1, TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.UNCOMPRESSED, Collections.emptyMap());
storageGroupProcessor.addMeasurement("s1", TSDataType.INT64, TSEncoding.PLAIN,
Expand Down

0 comments on commit 4e8cf64

Please sign in to comment.