Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-294]online upgrade from v0.8.0 to current version #467

Merged
merged 27 commits into from Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7937d2e
update first version
EJTTianYu Oct 16, 2019
87a5cbb
upgrade iotdb version
EJTTianYu Oct 18, 2019
4ef0e61
Merge branch 'master' of https://github.com/apache/incubator-iotdb
EJTTianYu Oct 18, 2019
dd326cd
finish upgrade tool for v0.8.0
EJTTianYu Oct 22, 2019
06edb83
fix some problem for upgrade
EJTTianYu Oct 22, 2019
eabec02
Merge branch 'master' into master
EJTTianYu Oct 22, 2019
20e78f4
fix maven compile error
EJTTianYu Oct 23, 2019
54f7d0d
Merge branch 'master' of https://github.com/EJTTianYu/incubator-iotdb
EJTTianYu Oct 23, 2019
15757dc
fix IO problem for upgrade
EJTTianYu Oct 24, 2019
45051c9
upgrade tmp
EJTTianYu Oct 30, 2019
83d4824
merge origin master
EJTTianYu Oct 30, 2019
33e1b38
[fix]fix not compatible bug
EJTTianYu Nov 1, 2019
35fdca5
[function]complete recover upgrade process
EJTTianYu Nov 4, 2019
6694280
[function]finish upgrade tool
EJTTianYu Nov 5, 2019
fb1760c
[fix]fix review comment in pr
EJTTianYu Nov 5, 2019
e9a1c3b
[fix]fix review comment
EJTTianYu Nov 6, 2019
7c9fdfc
[fix] fix review pr
EJTTianYu Nov 6, 2019
2ab4300
[fix] resolve pr review
EJTTianYu Nov 7, 2019
e7edd0f
[fix]fix review comment
EJTTianYu Nov 7, 2019
7d1a29e
[function] add script for offline upgrade
EJTTianYu Nov 7, 2019
74af359
Merge branch 'master' of https://github.com/apache/incubator-iotdb
EJTTianYu Nov 8, 2019
0ea801f
[fix] read from v0.8.0
EJTTianYu Nov 9, 2019
81c00f4
[fix] fix conflict with merge process
EJTTianYu Nov 11, 2019
a37bc55
[fix] fix merge conflict
EJTTianYu Nov 11, 2019
d1b12b3
[fix] fix merge conflict
EJTTianYu Nov 12, 2019
403141b
[fix] fix name details
EJTTianYu Nov 12, 2019
5a06115
[fix] fix name details
EJTTianYu Nov 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions server/src/assembly/resources/conf/iotdb-engine.properties
Expand Up @@ -201,6 +201,15 @@ chunk_buffer_pool_enable=false
# data.
# default_ttl=36000000

####################
### Upgrade Configurations
####################

# When there exists old version(v0.8.x) data, how many thread will be set up to perform upgrade tasks, 1 by default.
# Set to 1 when less than or equal to 0.
upgrade_thread_num=1


####################
### Merge Configurations
####################
Expand Down
27 changes: 27 additions & 0 deletions server/src/assembly/resources/tools/upgrade/config.properties
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# how many thread will be set up to perform offline upgrade tasks
upgrade_thread_num=10
# Used to specify the data dirs that need to be upgraded
# Commas could be used to separate the folder paths if there are more than one data dir that needs to be upgraded
old_version_data_dirs=/Users/tianyu/3sjar/data
# Used to specify the upgrading data dirs
# It is worth noting that the length of the old_version_data_dirs and new_version_data_dirs parameters should be equal.
new_version_data_dirs=/Users/tianyu/3sjar/data1
67 changes: 67 additions & 0 deletions server/src/assembly/resources/tools/upgrade/offline-upgrade.bat
@@ -0,0 +1,67 @@
@REM
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM


@echo off
if "%OS%" == "Windows_NT" setlocal

pushd %~dp0..\..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd

set IOTDB_CONF=%IOTDB_HOME%\conf

if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.upgrade.OfflineUpgradeTool
if NOT DEFINED JAVA_HOME goto :err

@REM -----------------------------------------------------------------------------
@REM JVM Opts we'll use in legacy run or installation
set JAVA_OPTS=-ea^
-Dlogback.configurationFile="%IOTDB_CONF%\logback-tool.xml"^
-DIOTDB_HOME=%IOTDB_HOME%

@REM ***** CLASSPATH library setting *****
@REM Ensure that any user defined CLASSPATH variables are not used on startup
set CLASSPATH="%IOTDB_HOME%\lib"

@REM For each jar in the IOTDB_HOME lib directory call append to build the CLASSPATH variable.
for %%i in ("%IOTDB_HOME%\lib\*.jar") do call :append "%%i"
goto okClasspath

:append
set CLASSPATH=%CLASSPATH%;%1
goto :eof

@REM -----------------------------------------------------------------------------
:okClasspath

"%JAVA_HOME%\bin\java" %JAVA_OPTS% %JAVA_OPTS% -cp "%CLASSPATH%" %MAIN_CLASS% %*

goto finally


:err
echo JAVA_HOME environment variable must be set!
pause


@REM -----------------------------------------------------------------------------
:finally

ENDLOCAL
47 changes: 47 additions & 0 deletions server/src/assembly/resources/tools/upgrade/offline-upgrade.sh
@@ -0,0 +1,47 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

if [ -z "${IOTDB_HOME}" ]; then
export IOTDB_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
fi

IOTDB_CONF=${IOTDB_HOME}/conf

CLASSPATH=""
for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done

MAIN_CLASS=org.apache.iotdb.db.tools.upgrade.OfflineUpgradeTool

if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
if [ -x "$java" ]; then
JAVA="$java"
break
fi
done
else
JAVA=java
fi

iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback-tool.xml"

exec "$JAVA" $iotdb_parms -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
13 changes: 13 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Expand Up @@ -336,6 +336,11 @@ public class IoTDBConfig {
*/
private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);

/**
* How many threads will be set up to perform upgrade tasks.
fanhualta marked this conversation as resolved.
Show resolved Hide resolved
*/
private int upgradeThreadNum = 1;

/**
* How many threads will be set up to perform main merge tasks.
*/
Expand Down Expand Up @@ -1169,6 +1174,14 @@ public void setHdfsPort(String hdfsPort) {
this.hdfsPort = hdfsPort;
}

public int getUpgradeThreadNum() {
return upgradeThreadNum;
}

public void setUpgradeThreadNum(int upgradeThreadNum) {
this.upgradeThreadNum = upgradeThreadNum;
}

public String getDfsNameServices() {
return dfsNameServices;
}
Expand Down
Expand Up @@ -232,6 +232,8 @@ private void loadProps() {
conf.setExternalSortThreshold(Integer.parseInt(properties
.getProperty("external_sort_threshold",
Integer.toString(conf.getExternalSortThreshold()))));
conf.setUpgradeThreadNum(Integer.parseInt(properties.getProperty("upgrade_thread_num",
Integer.toString(conf.getUpgradeThreadNum()))));
conf.setMergeMemoryBudget(Long.parseLong(properties.getProperty("merge_memory_budget",
Long.toString(conf.getMergeMemoryBudget()))));
conf.setMergeThreadNum(Integer.parseInt(properties.getProperty("merge_thread_num",
Expand Down
30 changes: 30 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
Expand Up @@ -58,6 +58,7 @@
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
Expand Down Expand Up @@ -105,6 +106,8 @@ private StorageEngine() {
throw new StorageEngineFailureException("create system directory failed!");
}

// recover upgrade process
UpgradeUtils.recoverUpgrade();
/*
* recover all storage group processors.
*/
Expand Down Expand Up @@ -330,6 +333,33 @@ public List<String> getOverlapFiles(String storageGroupName, TsFileResource appe
return Collections.emptyList();
}

/**
* count all Tsfiles which need to be upgraded
* @return total num of the tsfiles which need to be upgraded
*/
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
totalUpgradeFileNum += storageGroupProcessor.countUpgradeFiles();
}
return totalUpgradeFileNum;
}

/**
* upgrade all storage groups.
*
* @throws StorageEngineException StorageEngineException
*/
public void upgradeAll() throws StorageEngineException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new StorageEngineException(
"Current system mode is read only, does not support file upgrade");
}
for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
storageGroupProcessor.upgrade();
}
}

/**
* merge all storage groups.
*
Expand Down
Expand Up @@ -36,14 +36,12 @@
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.chunk.ChunkBuffer;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
Expand All @@ -69,10 +67,10 @@ public class MergeResource {
private boolean cacheDeviceMeta = false;

public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
this.seqFiles =
seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
this.unseqFiles =
unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
this.seqFiles = seqFiles.stream().filter(this::filterResource)
.collect(Collectors.toList());
this.unseqFiles = unseqFiles.stream().filter(this::filterResource)
.collect(Collectors.toList());
}

private boolean filterResource(TsFileResource res) {
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -155,6 +156,24 @@ void select(boolean useTightBound) throws IOException {

selectOverlappedSeqFiles(unseqFile);

// skip if the unseqFile and tmpSelectedSeqFiles has TsFileResources that need to be upgraded
boolean isNeedUpgrade = false;
if (UpgradeUtils.isNeedUpgrade(unseqFile)) {
isNeedUpgrade = true;
}
for (Integer seqIdx : tmpSelectedSeqFiles) {
if (UpgradeUtils.isNeedUpgrade(resource.getSeqFiles().get(seqIdx))) {
isNeedUpgrade = true;
break;
}
}
if (isNeedUpgrade) {
tmpSelectedSeqFiles.clear();
unseqIndex++;
timeConsumption = System.currentTimeMillis() - startTime;
continue;
}

tempMaxSeqFileCost = maxSeqFileCost;
long newCost = useTightBound ? calculateTightMemoryCost(unseqFile, tmpSelectedSeqFiles,
startTime, timeLimit) :
Expand All @@ -181,7 +200,7 @@ void select(boolean useTightBound) throws IOException {
}

private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
if (seqSelectedNum == resource.getSeqFiles().size()) {
if (seqSelectedNum == resource.getSeqFiles().size() || UpgradeUtils.isNeedUpgrade(unseqFile)) {
return;
}
int tmpSelectedNum = 0;
Expand All @@ -200,7 +219,7 @@ private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
if (unseqEndTime <= seqEndTime) {
// the unseqFile overlaps current seqFile
tmpSelectedSeqFiles.add(i);
tmpSelectedNum ++;
tmpSelectedNum++;
// the device of the unseqFile can not merge with later seqFiles
noMoreOverlap = true;
} else if (unseqStartTime <= seqEndTime) {
Expand Down Expand Up @@ -251,7 +270,8 @@ private long calculateLooseMemoryCost(TsFileResource tmpSelectedUnseqFile,
private long calculateTightMemoryCost(TsFileResource tmpSelectedUnseqFile,
Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime, timeLimit);
this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime,
timeLimit);
}

private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
Expand All @@ -264,11 +284,13 @@ private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
return cost;
}

private long calculateTightFileMemoryCost(TsFileResource seqFile, IFileQueryMemMeasurement measurement)
private long calculateTightFileMemoryCost(TsFileResource seqFile,
IFileQueryMemMeasurement measurement)
throws IOException {
Long cost = maxSeriesQueryCostMap.get(seqFile);
if (cost == null) {
long[] chunkNums = MergeUtils.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
long[] chunkNums = MergeUtils
.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
long totalChunkNum = chunkNums[0];
long maxChunkNum = chunkNums[1];
cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;
Expand Down
Expand Up @@ -116,7 +116,7 @@ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
return;
}

seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
TsFileMetaDataCache.getInstance().remove(seqFile);
DeviceMetaDataCache.getInstance().remove(seqFile);
Expand Down Expand Up @@ -163,7 +163,7 @@ private void moveMergedToOld(TsFileResource seqFile) throws IOException {
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}

Expand Down Expand Up @@ -217,7 +217,7 @@ private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
mergeLogger.logFileMergeEnd();
logger.debug("{} moved unmerged chunks of {} to the new file", taskName, seqFile);

seqFile.getMergeQueryLock().writeLock().lock();
seqFile.getWriteQueryLock().writeLock().lock();
try {
resource.removeFileReader(seqFile);
TsFileMetaDataCache.getInstance().remove(seqFile);
Expand All @@ -232,7 +232,7 @@ private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
new File(nextMergeVersionFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
seqFile.setFile(nextMergeVersionFile);
} finally {
seqFile.getMergeQueryLock().writeLock().unlock();
seqFile.getWriteQueryLock().writeLock().unlock();
}
}

Expand Down