Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b060f39
use differential logs in wal
jt2594838 Oct 12, 2020
e2a379a
fix issues
jt2594838 Oct 12, 2020
f60f1a0
fix unset datatypes
jt2594838 Oct 12, 2020
6c2f90a
fix serialization
jt2594838 Oct 12, 2020
81c359c
add restart logs
jt2594838 Oct 13, 2020
14826d0
Merge branch 'add_restart_logs' into improve_wal
jt2594838 Oct 13, 2020
053fc5d
merge add restart logs
jt2594838 Oct 13, 2020
abaada5
reduce log level
jt2594838 Oct 13, 2020
2b4de7a
reduce log level
jt2594838 Oct 13, 2020
24b4154
add more logs
jt2594838 Oct 13, 2020
1f568ff
Merge branch 'add_restart_logs' into improve_wal
jt2594838 Oct 13, 2020
b12e9a5
remove time encoding
jt2594838 Oct 13, 2020
59e6bca
reduce index size
jt2594838 Oct 16, 2020
3731655
add RandomAccessArrayDeque
jt2594838 Oct 16, 2020
929295f
add series sort time
jt2594838 Oct 16, 2020
3792164
Merge branch 'add_restart_logs' into improve_wal
jt2594838 Oct 16, 2020
e932398
fix log loss
jt2594838 Oct 16, 2020
6c0f5b1
Merge branch 'master' into improve_wal
jt2594838 Oct 27, 2020
cf793ea
Remove ShardedSecrets
jt2594838 Oct 27, 2020
a5104c2
Merge branch 'master' into improve_wal
jt2594838 Nov 17, 2020
5995838
fix InsertRowPlanSerialization
jt2594838 Nov 17, 2020
e43152f
Merge branch 'master' into improve_wal
jt2594838 Dec 14, 2020
27704a3
fix inheritence
jt2594838 Dec 14, 2020
5df3e60
fix review
jt2594838 Dec 15, 2020
e30139c
Merge branch 'master' into improve_wal
jt2594838 Dec 15, 2020
d26f729
add test header
jt2594838 Dec 15, 2020
28e3629
merge master branch
HTHou Sep 15, 2021
708959b
Fix merge
HTHou Sep 15, 2021
4706dca
fix tests
HTHou Sep 15, 2021
faa8699
fix sonar issue
HTHou Sep 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ private Map<Long, List<TsFileResource>> splitResourcesByPartition(

/** recover from file */
private void recover() throws StorageGroupProcessorException {
long recoverStart = System.currentTimeMillis();
logger.info(
String.format(
"start recovering virtual storage group %s[%s]",
Expand Down Expand Up @@ -464,12 +465,16 @@ private void recover() throws StorageGroupProcessorException {
splitResourcesByPartition(tmpSeqTsFiles);
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
splitResourcesByPartition(tmpUnseqTsFiles);
long innerStart = System.currentTimeMillis();
for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
recoverTsFiles(value, true);
}
logger.info("Recovery of seq files costs {}ms", System.currentTimeMillis() - innerStart);
innerStart = System.currentTimeMillis();
for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
recoverTsFiles(value, false);
}
logger.info("Recovery of unseq files costs {}ms", System.currentTimeMillis() - innerStart);

String taskName =
logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis();
Expand All @@ -487,6 +492,7 @@ private void recover() throws StorageGroupProcessorException {
taskName,
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
logicalStorageGroupName);
innerStart = System.currentTimeMillis();
logger.info(
"{} - {} a RecoverMergeTask {} starts...",
logicalStorageGroupName,
Expand All @@ -497,6 +503,12 @@ private void recover() throws StorageGroupProcessorException {
if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
mergingMods.delete();
}
logger.info(
"{} - {} a RecoverMergeTask {} ends after {}ms",
logicalStorageGroupName,
virtualStorageGroupId,
taskName,
System.currentTimeMillis() - innerStart);
recoverCompaction();
for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
long partitionNum = resource.getTimePartition();
Expand Down Expand Up @@ -543,11 +555,11 @@ private void recover() throws StorageGroupProcessorException {
timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
}
}

logger.info(
String.format(
"the virtual storage group %s[%s] is recovered successfully",
logicalStorageGroupName, virtualStorageGroupId));
"the virtual storage group {}[{}] is recovered successfully after {}ms",
logicalStorageGroupName,
virtualStorageGroupId,
System.currentTimeMillis() - recoverStart);
}

private void recoverCompaction() {
Expand Down Expand Up @@ -753,6 +765,7 @@ private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) {

RestorableTsFileIOWriter writer;
try {
long start = System.currentTimeMillis();
// this tsfile is not zero level, no need to perform redo wal
if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) {
writer =
Expand All @@ -768,6 +781,10 @@ private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) {
writer =
recoverPerformer.recover(true, this::getWalDirectByteBuffer, this::releaseWalBuffer);
}
logger.debug(
"Recovery of {} costs {}ms",
tsFileResource.getTsFile(),
System.currentTimeMillis() - start);
} catch (StorageGroupProcessorException e) {
logger.warn(
"Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), e);
Expand Down
134 changes: 127 additions & 7 deletions server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan;
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
import org.apache.iotdb.db.utils.datastructure.RandomAccessArrayDeque;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
Expand Down Expand Up @@ -111,10 +112,6 @@ protected PhysicalPlan(boolean isQuery, Operator.OperatorType operatorType) {
this.operatorType = operatorType;
}

public String printQueryPlan() {
return "abstract plan";
}

public abstract List<PartialPath> getPaths();

public void setPaths(List<PartialPath> paths) {}
Expand Down Expand Up @@ -181,6 +178,26 @@ public void deserialize(ByteBuffer buffer) throws IllegalPathException, IOExcept
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}

/**
* Serialize the plan into the given buffer. This is provided for WAL, so fields that can be
* recovered will not be serialized.
*
* @param buffer
*/
public void serialize(ByteBuffer buffer, PhysicalPlan base) {
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}

/**
* Deserialize the plan from the given buffer. This is provided for WAL, and must be used with
* serializeToWAL.
*
* @param buffer
*/
public void deserialize(ByteBuffer buffer, PhysicalPlan base) {
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}

protected void putString(ByteBuffer buffer, String value) {
if (value == null) {
buffer.putInt(NULL_VALUE_LEN);
Expand Down Expand Up @@ -237,18 +254,24 @@ public void setLoginUserName(String loginUserName) {

public static class Factory {

public static final String UNRECOGNIZED_LOG_TYPE = "unrecognized log type ";

private Factory() {
// hidden initializer
}

public static PhysicalPlan create(ByteBuffer buffer) throws IOException, IllegalPathException {
int typeNum = buffer.get();
if (typeNum >= PhysicalPlanType.values().length) {
throw new IOException("unrecognized log type " + typeNum);
throw new IOException(UNRECOGNIZED_LOG_TYPE + typeNum);
}
PhysicalPlanType type = PhysicalPlanType.values()[typeNum];
return create(type, buffer);
}

public static PhysicalPlan create(PhysicalPlanType type, ByteBuffer buffer)
throws IllegalPathException, IOException {
PhysicalPlan plan;
// TODO-Cluster: support more plans
switch (type) {
case INSERT:
plan = new InsertRowPlan();
Expand Down Expand Up @@ -419,11 +442,52 @@ public static PhysicalPlan create(ByteBuffer buffer) throws IOException, Illegal
plan = new SetSystemModePlan();
break;
default:
throw new IOException("unrecognized log type " + type);
throw new IOException(UNRECOGNIZED_LOG_TYPE + type);
}
plan.deserialize(buffer);
return plan;
}

public static PhysicalPlan create(
ByteBuffer buffer, RandomAccessArrayDeque<PhysicalPlan> planWindow)
throws IOException, IllegalPathException {
short baseIndex = buffer.getShort();
int typeNum = buffer.get();
if (typeNum >= PhysicalPlanType.values().length) {
throw new IOException(UNRECOGNIZED_LOG_TYPE + typeNum);
}
PhysicalPlanType type = PhysicalPlanType.values()[typeNum];
PhysicalPlan plan;
switch (type) {
case INSERT:
plan = new InsertRowPlan();
if (baseIndex < 0) {
plan.deserialize(buffer);
} else {
InsertRowPlan baseInsertRowPlan = (InsertRowPlan) getPlan(planWindow, baseIndex);
plan.deserialize(buffer, baseInsertRowPlan);
}
break;
case BATCHINSERT:
plan = new InsertTabletPlan();
if (baseIndex < 0) {
plan.deserialize(buffer);
} else {
InsertTabletPlan baseInsertTabletPlan =
(InsertTabletPlan) getPlan(planWindow, baseIndex);
plan.deserialize(buffer, baseInsertTabletPlan);
}
break;
default:
plan = create(type, buffer);
}
return plan;
}

private static PhysicalPlan getPlan(
RandomAccessArrayDeque<PhysicalPlan> planWindow, int index) {
return planWindow.get(index);
}
}

/** If you want to add new PhysicalPlanType, you must add it in the last. */
Expand Down Expand Up @@ -495,6 +559,62 @@ public void setIndex(long index) {
this.index = index;
}

protected void putDiffTime(long time, long base, ByteBuffer buffer) {
long timeDiff = time - base;
TimeDiffType diffType = TimeDiffType.fromDiff(timeDiff);
buffer.put((byte) diffType.ordinal());
switch (diffType) {
case INT:
buffer.putInt((int) timeDiff);
break;
case BYTE:
buffer.put((byte) timeDiff);
break;
case SHORT:
buffer.putShort((short) timeDiff);
break;
case LONG:
default:
// NOTICE HERE
buffer.putLong(time);
}
}

protected long getDiffTime(ByteBuffer buffer, long base) {
TimeDiffType diffType = TimeDiffType.values()[buffer.get()];
switch (diffType) {
case INT:
return buffer.getInt() + base;
case BYTE:
return buffer.get() + base;
case SHORT:
return buffer.getShort() + base;
case LONG:
default:
// NOTICE HERE
return buffer.getLong();
}
}

public enum TimeDiffType {
BYTE,
SHORT,
INT,
LONG;

public static TimeDiffType fromDiff(long timeDiff) {
if (Byte.MIN_VALUE <= timeDiff && timeDiff <= Byte.MAX_VALUE) {
return BYTE;
} else if (Short.MIN_VALUE <= timeDiff && timeDiff <= Short.MAX_VALUE) {
return SHORT;
} else if (Integer.MIN_VALUE <= timeDiff && timeDiff <= Integer.MAX_VALUE) {
return INT;
} else {
return LONG;
}
}
}

/**
* Check the integrity of the plan in case that the plan is generated by a careless user through
* Session API.
Expand Down
Loading