Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-294]online upgrade from v0.8.0 to current version #467

Merged
merged 27 commits into from Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7937d2e
update first version
EJTTianYu Oct 16, 2019
87a5cbb
upgrade iotdb version
EJTTianYu Oct 18, 2019
4ef0e61
Merge branch 'master' of https://github.com/apache/incubator-iotdb
EJTTianYu Oct 18, 2019
dd326cd
finish upgrade tool for v0.8.0
EJTTianYu Oct 22, 2019
06edb83
fix some problem for upgrade
EJTTianYu Oct 22, 2019
eabec02
Merge branch 'master' into master
EJTTianYu Oct 22, 2019
20e78f4
fix maven compile error
EJTTianYu Oct 23, 2019
54f7d0d
Merge branch 'master' of https://github.com/EJTTianYu/incubator-iotdb
EJTTianYu Oct 23, 2019
15757dc
fix IO problem for upgrade
EJTTianYu Oct 24, 2019
45051c9
upgrade tmp
EJTTianYu Oct 30, 2019
83d4824
merge origin master
EJTTianYu Oct 30, 2019
33e1b38
[fix]fix not compatible bug
EJTTianYu Nov 1, 2019
35fdca5
[function]complete recover upgrade process
EJTTianYu Nov 4, 2019
6694280
[function]finish upgrade tool
EJTTianYu Nov 5, 2019
fb1760c
[fix]fix review comment in pr
EJTTianYu Nov 5, 2019
e9a1c3b
[fix]fix review comment
EJTTianYu Nov 6, 2019
7c9fdfc
[fix] fix review pr
EJTTianYu Nov 6, 2019
2ab4300
[fix] resolve pr review
EJTTianYu Nov 7, 2019
e7edd0f
[fix]fix review comment
EJTTianYu Nov 7, 2019
7d1a29e
[function] add script for offline upgrade
EJTTianYu Nov 7, 2019
74af359
Merge branch 'master' of https://github.com/apache/incubator-iotdb
EJTTianYu Nov 8, 2019
0ea801f
[fix] read from v0.8.0
EJTTianYu Nov 9, 2019
81c00f4
[fix] fix conflict with merge process
EJTTianYu Nov 11, 2019
a37bc55
[fix] fix merge conflict
EJTTianYu Nov 11, 2019
d1b12b3
[fix] fix merge conflict
EJTTianYu Nov 12, 2019
403141b
[fix] fix name details
EJTTianYu Nov 12, 2019
5a06115
[fix] fix name details
EJTTianYu Nov 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/src/assembly/resources/conf/iotdb-engine.properties
Expand Up @@ -192,10 +192,10 @@ chunk_buffer_pool_enable=false
# default_ttl=36000000

####################
### Update Configurations
### Upgrade Configurations
####################

# When there exists old version(v0.8.0) data, how many thread will be set up to perform upgrade tasks, 1 by default.
# When there exists old version(v0.8.x) data, how many thread will be set up to perform upgrade tasks, 1 by default.
# Set to 1 when less than or equal to 0.
upgrade_thread_num=1

Expand Down
Expand Up @@ -315,6 +315,10 @@ public List<String> getOverlapFiles(String storageGroupName, TsFileResource appe
return Collections.emptyList();
}

/**
* count all Tsfiles which need to upgrade
fanhualta marked this conversation as resolved.
Show resolved Hide resolved
* @return total num of the tsfiles which need to upgrade
fanhualta marked this conversation as resolved.
Show resolved Hide resolved
*/
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
Expand Down
Expand Up @@ -187,8 +187,8 @@ public class StorageGroupProcessor {
private static final int MAX_CACHE_SENSORS = 5000;

/**
* when the data in a storage group is older than dataTTL, it is considered invalid and will
* be eventually removed.
* when the data in a storage group is older than dataTTL, it is considered invalid and will be
* eventually removed.
*/
private long dataTTL = Long.MAX_VALUE;

Expand Down Expand Up @@ -362,7 +362,7 @@ public void addMeasurement(String measurementId, TSDataType dataType, TSEncoding
public boolean insert(InsertPlan insertPlan) throws QueryProcessorException {
// reject insertions that are out of ttl
if (!checkTTL(insertPlan.getTime())) {
throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL));
}
writeLock();
try {
Expand Down Expand Up @@ -419,8 +419,6 @@ public Integer[] insertBatch(BatchInsertPlan batchInsertPlan) throws QueryProces
}

/**
*
* @param time
* @return whether the given time falls in ttl
*/
private boolean checkTTL(long time) {
Expand Down Expand Up @@ -832,9 +830,6 @@ private List<TsFileResource> getFileReSourceListForQuery(List<TsFileResource> ts
}

/**
*
* @param tsFileResource
* @param deviceId
* @return true if the device is contained in the TsFile and it lives beyond TTL
*/
private boolean testResourceDevice(TsFileResource tsFileResource, String deviceId) {
Expand Down Expand Up @@ -978,7 +973,11 @@ private void closeUnsealedTsFileProcessor(
}
}


/**
* count all Tsfiles in the storage group which need to upgrade
fanhualta marked this conversation as resolved.
Show resolved Hide resolved
*
* @return total num of the tsfiles which need to upgrade in the storage group
fanhualta marked this conversation as resolved.
Show resolved Hide resolved
*/
public int countUpgradeFiles() {
int cntUpgradeFileNum = 0;
for (TsFileResource seqTsFileResource : sequenceFileList) {
Expand All @@ -994,22 +993,15 @@ public int countUpgradeFiles() {
return cntUpgradeFileNum;
}


public void upgrade() {
insertLock.readLock().lock();
try {
for (TsFileResource seqTsFileResource : sequenceFileList) {
seqTsFileResource.doUpgrade();
}
for (TsFileResource unseqTsFileResource : unSequenceFileList) {
unseqTsFileResource.doUpgrade();
}
} finally {
insertLock.readLock().unlock();
for (TsFileResource seqTsFileResource : sequenceFileList) {
seqTsFileResource.doUpgrade();
}
for (TsFileResource unseqTsFileResource : unSequenceFileList) {
unseqTsFileResource.doUpgrade();
}
}


public void merge(boolean fullMerge) {
writeLock();
try {
Expand All @@ -1027,7 +1019,8 @@ public void merge(boolean fullMerge) {

long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
long timeLowerBound = System.currentTimeMillis() - dataTTL;
MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList, timeLowerBound);
MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList,
timeLowerBound);

IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
try {
Expand Down
Expand Up @@ -18,9 +18,22 @@
*/
package org.apache.iotdb.db.engine.upgrade;

public class UpgradeCheckStatus {
public enum UpgradeCheckStatus {

public static final long BEGIN_UPGRADE_FILE = 1;
public static final long AFTER_UPGRADE_FILE = 2;
public static final long UPGRADE_SUCCESS = 3;
BEGIN_UPGRADE_FILE(1), AFTER_UPGRADE_FILE(2), UPGRADE_SUCCESS(3);

private final int checkStatusCode;

UpgradeCheckStatus(int checkStatusCode) {
this.checkStatusCode = checkStatusCode;
}

public int getCheckStatusCode() {
return checkStatusCode;
}

@Override
public String toString() {
return String.valueOf(checkStatusCode);
}
}
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.upgrade;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UpgradeLog {

private static final Logger logger = LoggerFactory.getLogger(UpgradeLog.class);

private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String UPGRADE_DIR = "upgrade";
private static final String UPGRADE_LOG_NAME = "upgrade.txt";
private static File upgradeLogPath = SystemFileFactory.INSTANCE
.getFile(SystemFileFactory.INSTANCE.getFile(config.getSystemDir(), UPGRADE_DIR),
UPGRADE_LOG_NAME);

public static boolean createUpgradeLog() {
try {
if (!upgradeLogPath.getParentFile().exists()) {
upgradeLogPath.getParentFile().mkdirs();
}
upgradeLogPath.createNewFile();
return true;
} catch (IOException e) {
logger.error("meet error when create upgrade log, file path:{}",
upgradeLogPath, e);
return false;
}
}

public static String getUpgradeLogPath() {
return upgradeLogPath.getAbsolutePath();
}

public static boolean writeUpgradeLogFile(String content) {
UpgradeUtils.getUpgradeLogLock().writeLock().lock();
try (BufferedWriter upgradeLogWriter = new BufferedWriter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not necessary to open a writer every time, you can reuse one.

FSFactoryProducer.getFSFactory().getBufferedWriter(getUpgradeLogPath(), true))) {
upgradeLogWriter.write(content);
upgradeLogWriter.newLine();
return true;
} catch(IOException e) {
logger.error("write upgrade log file failed, the log file:{}", getUpgradeLogPath(), e);
return false;
} finally {
UpgradeUtils.getUpgradeLogLock().writeLock().unlock();
}
}
}
Expand Up @@ -18,9 +18,7 @@
*/
package org.apache.iotdb.db.engine.upgrade;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.utils.UpgradeUtils;
Expand All @@ -33,61 +31,26 @@ public class UpgradeTask implements Runnable {

private final TsFileResource upgradeResource;
private static final Logger logger = LoggerFactory.getLogger(UpgradeTask.class);
private static final String DOT_SEPERATOR = ",";
private static final String COMMA_SEPERATOR = ",";


public UpgradeTask(TsFileResource upgradeResource) {
this.upgradeResource = upgradeResource;
}


private static boolean writeUpgradeLogFile(RandomAccessFile upgradeLog, String content,
long position) {
UpgradeUtils.getUpgradeLogLock().writeLock().lock();
try {
upgradeLog.seek(position);
upgradeLog.write(content.getBytes());
upgradeLog.write(System.getProperty("line.separator").getBytes());
return true;
} catch (IOException e) {
logger.error("write upgrade log file failed, the log file:{}", upgradeLog);
return false;
} finally {
UpgradeUtils.getUpgradeLogLock().writeLock().unlock();
}
}

private static long getUpgradeLogLength(RandomAccessFile upgradeLog) {
UpgradeUtils.getUpgradeLogLock().readLock().lock();
try {
return upgradeLog.length();
} catch (IOException e) {
logger.error("read upgrade log file failed, the log file:{}", upgradeLog);
return 0;
} finally {
UpgradeUtils.getUpgradeLogLock().readLock().unlock();
}
}

@Override
public void run() {
fanhualta marked this conversation as resolved.
Show resolved Hide resolved
try (RandomAccessFile upgradeLogFile = new RandomAccessFile(UpgradeUtils.getUpgradeLogPath(),
"rw")) {
try {
upgradeResource.getWriteQueryLock().readLock().lock();
String tsfilePathBefore = upgradeResource.getFile().getAbsolutePath();
String tsfilePathAfter =
upgradeResource.getFile().getParentFile().getParent() + File.separator + "tmp"
+ File.separator + "upgrade_" + upgradeResource
.getFile().getName();
String tsfilePathAfter = UpgradeUtils.getUpgradeFileName(upgradeResource.getFile());

long upgradePostion = getUpgradeLogLength(upgradeLogFile);
writeUpgradeLogFile(upgradeLogFile,
tsfilePathBefore + DOT_SEPERATOR + tsfilePathAfter + DOT_SEPERATOR
+ UpgradeCheckStatus.BEGIN_UPGRADE_FILE, upgradePostion);
UpgradeLog.writeUpgradeLogFile(
tsfilePathBefore + COMMA_SEPERATOR + UpgradeCheckStatus.BEGIN_UPGRADE_FILE);
try {
UpgradeTool.upgradeOneTsfile(tsfilePathBefore, tsfilePathAfter);
writeUpgradeLogFile(upgradeLogFile,
tsfilePathBefore + DOT_SEPERATOR + tsfilePathAfter + DOT_SEPERATOR
+ UpgradeCheckStatus.AFTER_UPGRADE_FILE, upgradePostion);
UpgradeLog.writeUpgradeLogFile(
tsfilePathBefore + COMMA_SEPERATOR + UpgradeCheckStatus.AFTER_UPGRADE_FILE);
} catch (IOException e) {
logger.error("generate upgrade file failed, the file to be upgraded:{}", tsfilePathBefore);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the upgrade occurs an error, you just throw an Exception and move the incomplete file to the data directory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in recovery process, system will remove the imcomplete tsfile

Copy link
Contributor

@fanhualta fanhualta Nov 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it occurs in the running of the system, the system engine may read this file which lead to errors.

} finally {
Expand All @@ -99,9 +62,8 @@ public void run() {
FSFactoryProducer.getFSFactory()
.moveFile(FSFactoryProducer.getFSFactory().getFile(tsfilePathAfter),
FSFactoryProducer.getFSFactory().getFile(tsfilePathBefore));
writeUpgradeLogFile(upgradeLogFile,
tsfilePathBefore + DOT_SEPERATOR + tsfilePathAfter + DOT_SEPERATOR
+ UpgradeCheckStatus.UPGRADE_SUCCESS, upgradePostion);
UpgradeLog.writeUpgradeLogFile(
tsfilePathBefore + COMMA_SEPERATOR + UpgradeCheckStatus.UPGRADE_SUCCESS);
FSFactoryProducer.getFSFactory().getFile(tsfilePathAfter).getParentFile().delete();
} finally {
upgradeResource.getWriteQueryLock().writeLock().unlock();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.upgrade.UpgradeLog;
import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
Expand Down Expand Up @@ -55,7 +56,7 @@ public void start() throws StartupException {
}
upgradeThreadPool = Executors.newFixedThreadPool(updateThreadNum,
r -> new Thread(r, "UpgradeThread-" + threadCnt.getAndIncrement()));
UpgradeUtils.createUpgradeLog();
UpgradeLog.createUpgradeLog();
countUpgradeFiles();
upgradeAll();
}
Expand All @@ -80,11 +81,21 @@ public ServiceType getID() {


public static void setCntUpgradeFileNum(int cntUpgradeFileNum) {
UpgradeSevice.cntUpgradeFileNum = cntUpgradeFileNum;
UpgradeUtils.getCntUpgradeFileLock().writeLock().lock();
try {
UpgradeSevice.cntUpgradeFileNum = cntUpgradeFileNum;
} finally {
UpgradeUtils.getCntUpgradeFileLock().writeLock().unlock();
}
}

public static int getCntUpgradeFileNum() {
return cntUpgradeFileNum;
UpgradeUtils.getCntUpgradeFileLock().readLock().lock();
try {
return cntUpgradeFileNum;
} finally {
UpgradeUtils.getCntUpgradeFileLock().readLock().unlock();
}
}

public void submitUpgradeTask(UpgradeTask upgradeTask) {
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -93,6 +94,9 @@ public void getCurrentLocalFiles(String dataDir) {
dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME)
.listFiles();
for (File sgFolder : allSGFolders) {
if (sgFolder.getName().equals(TsFileConstant.PATH_UPGRADE)){
continue;
}
allSGs.add(sgFolder.getName());
currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashSet<>());
File[] files = sgFolder.listFiles();
Expand Down