Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-294]online upgrade from v0.8.0 to current version #467

Merged
merged 27 commits into from Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7937d2e
update first version
EJTTianYu Oct 16, 2019
87a5cbb
upgrade iotdb version
EJTTianYu Oct 18, 2019
4ef0e61
Merge branch 'master' of https://github.com/apache/incubator-iotdb
EJTTianYu Oct 18, 2019
dd326cd
finish upgrade tool for v0.8.0
EJTTianYu Oct 22, 2019
06edb83
fix some problem for upgrade
EJTTianYu Oct 22, 2019
eabec02
Merge branch 'master' into master
EJTTianYu Oct 22, 2019
20e78f4
fix maven compile error
EJTTianYu Oct 23, 2019
54f7d0d
Merge branch 'master' of https://github.com/EJTTianYu/incubator-iotdb
EJTTianYu Oct 23, 2019
15757dc
fix IO problem for upgrade
EJTTianYu Oct 24, 2019
45051c9
upgrade tmp
EJTTianYu Oct 30, 2019
83d4824
merge origin master
EJTTianYu Oct 30, 2019
33e1b38
[fix]fix not compatible bug
EJTTianYu Nov 1, 2019
35fdca5
[function]complete recover upgrade process
EJTTianYu Nov 4, 2019
6694280
[function]finish upgrade tool
EJTTianYu Nov 5, 2019
fb1760c
[fix]fix review comment in pr
EJTTianYu Nov 5, 2019
e9a1c3b
[fix]fix review comment
EJTTianYu Nov 6, 2019
7c9fdfc
[fix] fix review pr
EJTTianYu Nov 6, 2019
2ab4300
[fix] resolve pr review
EJTTianYu Nov 7, 2019
e7edd0f
[fix]fix review comment
EJTTianYu Nov 7, 2019
7d1a29e
[function] add script for offline upgrade
EJTTianYu Nov 7, 2019
74af359
Merge branch 'master' of https://github.com/apache/incubator-iotdb
EJTTianYu Nov 8, 2019
0ea801f
[fix] read from v0.8.0
EJTTianYu Nov 9, 2019
81c00f4
[fix] fix conflict with merge process
EJTTianYu Nov 11, 2019
a37bc55
[fix] fix merge conflict
EJTTianYu Nov 11, 2019
d1b12b3
[fix] fix merge conflict
EJTTianYu Nov 12, 2019
403141b
[fix] fix name details
EJTTianYu Nov 12, 2019
5a06115
[fix] fix name details
EJTTianYu Nov 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions server/src/assembly/resources/conf/iotdb-engine.properties
Expand Up @@ -191,6 +191,15 @@ chunk_buffer_pool_enable=false
# data.
# default_ttl=36000000

####################
### Upgrade Configurations
####################

# When there exists old version(v0.8.x) data, how many thread will be set up to perform upgrade tasks, 1 by default.
# Set to 1 when less than or equal to 0.
upgrade_thread_num=1


####################
### Merge Configurations
####################
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Expand Up @@ -336,6 +336,11 @@ public class IoTDBConfig {
*/
private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);

/**
* How many threads will be set up to perform upgrade tasks.
fanhualta marked this conversation as resolved.
Show resolved Hide resolved
*/
private int upgradeThreadNum = 1;

/**
* How many threads will be set up to perform main merge tasks.
*/
Expand Down Expand Up @@ -1153,6 +1158,14 @@ public void setHdfsPort(String hdfsPort) {
this.hdfsPort = hdfsPort;
}

public int getUpgradeThreadNum() {
return upgradeThreadNum;
}

public void setUpgradeThreadNum(int upgradeThreadNum) {
this.upgradeThreadNum = upgradeThreadNum;
}

public String getDfsNameServices() {
return dfsNameServices;
}
Expand Down
Expand Up @@ -233,6 +233,8 @@ private void loadProps() {
conf.setExternalSortThreshold(Integer.parseInt(properties
.getProperty("external_sort_threshold",
Integer.toString(conf.getExternalSortThreshold()))));
conf.setUpgradeThreadNum(Integer.parseInt(properties.getProperty("upgrade_thread_num",
Integer.toString(conf.getUpgradeThreadNum()))));
conf.setMergeMemoryBudget(Long.parseLong(properties.getProperty("merge_memory_budget",
Long.toString(conf.getMergeMemoryBudget()))));
conf.setMergeThreadNum(Integer.parseInt(properties.getProperty("merge_thread_num",
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
Expand Down Expand Up @@ -94,6 +95,8 @@ private StorageEngine() {
throw new StorageEngineFailureException("create system directory failed!");
}

// recover upgrade process
UpgradeUtils.recoverUpgrade();
/*
* recover all storage group processors.
*/
Expand Down Expand Up @@ -209,6 +212,7 @@ public boolean insert(InsertPlan insertPlan) throws ProcessorException {

/**
* insert a BatchInsertPlan to a storage group
*
* @return result of each row
*/
public Integer[] insertBatch(BatchInsertPlan batchInsertPlan) throws StorageEngineException {
Expand Down Expand Up @@ -311,6 +315,33 @@ public List<String> getOverlapFiles(String storageGroupName, TsFileResource appe
return Collections.emptyList();
}

/**
* count all Tsfiles which need to be upgraded
* @return total num of the tsfiles which need to be upgraded
*/
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
}
return totalUpgradeFileNum;
}

/**
* upgrade all storage groups.
*
* @throws StorageEngineException StorageEngineException
*/
public void upgradeAll() throws StorageEngineException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new StorageEngineException(
"Current system mode is read only, does not support file upgrade");
}
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
storageGroupProcessor.upgrade();
}
}

/**
* merge all storage groups.
*
Expand Down Expand Up @@ -371,7 +402,7 @@ public void setTTL(String storageGroup, long dataTTL) throws StorageEngineExcept
public void deleteStorageGroup(String storageGroupName) {
deleteAllDataFilesInOneStorageGroup(storageGroupName);
StorageGroupProcessor processor = processorMap.remove(storageGroupName);
if(processor != null) {
if (processor != null) {
processor.deleteFolder(systemDir);
}
}
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
Expand Down Expand Up @@ -69,14 +70,15 @@ public class MergeResource {
private boolean cacheDeviceMeta = false;

public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
this.seqFiles =
seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
this.unseqFiles =
unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
this.seqFiles = seqFiles.stream().filter(p -> p.isClosed() && !UpgradeUtils.isNeedUpgrade(p))
.collect(Collectors.toList());
this.unseqFiles = unseqFiles.stream().filter(p -> p.isClosed() && !UpgradeUtils.isNeedUpgrade(p))
.collect(Collectors.toList());
}

private boolean filterResource(TsFileResource res) {
return res.isClosed() && !res.isDeleted() && res.stillLives(timeLowerBound);
return res.isClosed() && !res.isDeleted() && res.stillLives(timeLowerBound) && !UpgradeUtils
.isNeedUpgrade(res);
}

public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
Expand Down
Expand Up @@ -116,7 +116,7 @@ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
return;
}

seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
TsFileMetaDataCache.getInstance().remove(seqFile);
DeviceMetaDataCache.getInstance().remove(seqFile);
Expand Down Expand Up @@ -163,7 +163,7 @@ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}

Expand Down Expand Up @@ -217,7 +217,7 @@ private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
mergeLogger.logFileMergeEnd();
logger.debug("{} moved unmerged chunks of {} to the new file", taskName, seqFile);

seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
resource.removeFileReader(seqFile);
TsFileMetaDataCache.getInstance().remove(seqFile);
Expand All @@ -232,7 +232,7 @@ private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}

Expand Down
Expand Up @@ -72,6 +72,8 @@
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.JobFileManager;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
Expand Down Expand Up @@ -185,8 +187,8 @@ public class StorageGroupProcessor {
private static final int MAX_CACHE_SENSORS = 5000;

/**
* when the data in a storage group is older than dataTTL, it is considered invalid and will
* be eventually removed.
* when the data in a storage group is older than dataTTL, it is considered invalid and will be
* eventually removed.
*/
private long dataTTL = Long.MAX_VALUE;

Expand Down Expand Up @@ -360,7 +362,7 @@ public void addMeasurement(String measurementId, TSDataType dataType, TSEncoding
public boolean insert(InsertPlan insertPlan) throws QueryProcessorException {
// reject insertions that are out of ttl
if (!checkTTL(insertPlan.getTime())) {
throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
}
writeLock();
try {
Expand Down Expand Up @@ -417,8 +419,6 @@ public Integer[] insertBatch(BatchInsertPlan batchInsertPlan) throws QueryProces
}

/**
*
* @param time
* @return whether the given time falls in ttl
*/
private boolean checkTTL(long time) {
Expand Down Expand Up @@ -668,7 +668,7 @@ private void checkFileTTL(TsFileResource resource, long timeLowerBound, boolean
return;
}
// ensure that the file is not used by any queries
if (resource.getMergeQueryLock().writeLock().tryLock()) {
if (resource.getWriteQueryLock().writeLock().tryLock()) {
try {
// physical removal
resource.remove();
Expand All @@ -682,7 +682,7 @@ private void checkFileTTL(TsFileResource resource, long timeLowerBound, boolean
unSequenceFileList.remove(resource);
}
} finally {
resource.getMergeQueryLock().writeLock().unlock();
resource.getWriteQueryLock().writeLock().unlock();
}
}
} finally {
Expand Down Expand Up @@ -830,9 +830,6 @@ private List<TsFileResource> getFileReSourceListForQuery(List<TsFileResource> ts
}

/**
*
* @param tsFileResource
* @param deviceId
* @return true if the device is contained in the TsFile and it lives beyond TTL
*/
private boolean testResourceDevice(TsFileResource tsFileResource, String deviceId) {
Expand Down Expand Up @@ -976,6 +973,35 @@ private void closeUnsealedTsFileProcessor(
}
}

/**
* count all Tsfiles in the storage group which need to be upgraded
*
* @return total num of the tsfiles which need to be upgraded in the storage group
*/
public int countUpgradeFiles() {
int cntUpgradeFileNum = 0;
for (TsFileResource seqTsFileResource : sequenceFileList) {
if (UpgradeUtils.isNeedUpgrade(seqTsFileResource)) {
cntUpgradeFileNum += 1;
}
}
for (TsFileResource unseqTsFileResource : unSequenceFileList) {
if (UpgradeUtils.isNeedUpgrade(unseqTsFileResource)) {
cntUpgradeFileNum += 1;
}
}
return cntUpgradeFileNum;
}

public void upgrade() {
for (TsFileResource seqTsFileResource : sequenceFileList) {
seqTsFileResource.doUpgrade();
}
for (TsFileResource unseqTsFileResource : unSequenceFileList) {
unseqTsFileResource.doUpgrade();
}
}

public void merge(boolean fullMerge) {
writeLock();
try {
Expand All @@ -993,7 +1019,8 @@ public void merge(boolean fullMerge) {

long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
long timeLowerBound = System.currentTimeMillis() - dataTTL;
MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList, timeLowerBound);
MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList,
timeLowerBound);

IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
try {
Expand Down Expand Up @@ -1059,17 +1086,17 @@ private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
}

for (TsFileResource unseqFile : unseqFiles) {
unseqFile.getMergeQueryLock().writeLock().lock();
unseqFile.getWriteQueryLock().writeLock().lock();
try {
unseqFile.remove();
} finally {
unseqFile.getMergeQueryLock().writeLock().unlock();
unseqFile.getWriteQueryLock().writeLock().unlock();
}
}
}

private void updateMergeModification(TsFileResource seqFile) {
seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
// remove old modifications and write modifications generated during merge
seqFile.removeModFile();
Expand All @@ -1082,7 +1109,7 @@ private void updateMergeModification(TsFileResource seqFile) {
logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
seqFile.getFile(), e);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}

Expand Down Expand Up @@ -1314,12 +1341,12 @@ public void deleteTsfile(File deletedTsfile) {
if (deletedTsFileResource == null) {
return;
}
deletedTsFileResource.getMergeQueryLock().writeLock().lock();
deletedTsFileResource.getWriteQueryLock().writeLock().lock();
try {
logger.info("Delete tsfile {} in sync loading process.", deletedTsFileResource.getFile());
deletedTsFileResource.remove();
} finally {
deletedTsFileResource.getMergeQueryLock().writeLock().unlock();
deletedTsFileResource.getWriteQueryLock().writeLock().unlock();
}
}

Expand Down
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.iotdb.db.engine.storagegroup;

import java.io.*;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +31,9 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
Expand Down Expand Up @@ -59,6 +65,7 @@ public class TsFileResource {
private volatile boolean deleted = false;
private volatile boolean isMerging = false;


/**
* Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
* process.
Expand All @@ -70,7 +77,7 @@ public class TsFileResource {
*/
private ReadOnlyMemChunk readOnlyMemChunk;

private ReentrantReadWriteLock mergeQueryLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock writeQueryLock = new ReentrantReadWriteLock();

private FSFactory fsFactory = FSFactoryProducer.getFSFactory();

Expand Down Expand Up @@ -230,8 +237,14 @@ public TsFileProcessor getUnsealedFileProcessor() {
return processor;
}

public ReentrantReadWriteLock getMergeQueryLock() {
return mergeQueryLock;
public ReentrantReadWriteLock getWriteQueryLock() {
return writeQueryLock;
}

public void doUpgrade() {
if (UpgradeUtils.isNeedUpgrade(this)) {
UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this));
}
}

public void removeModFile() throws IOException {
Expand Down