Skip to content
Permalink
Browse files
[IOTDB-3024] Implement SchemaRegion Memory mode snapshot (#5925)
  • Loading branch information
MarcosZyk committed May 18, 2022
1 parent 80475f7 commit 05fc9f5421a47b589d4bec87b8655a7e3515beaa
Showing 10 changed files with 283 additions and 14 deletions.
@@ -165,7 +165,8 @@ public RSchemaRegion(
throws MetadataException {
this.schemaRegionId = schemaRegionId;
storageGroupFullPath = storageGroup.getFullPath();
init(storageGroupMNode);
this.storageGroupMNode = storageGroupMNode;
init();
try {
readWriteHandler = new RSchemaReadWriteHandler(schemaRegionDirPath, rSchemaConfLoader);
} catch (RocksDBException e) {
@@ -175,8 +176,7 @@ public RSchemaRegion(
}

@Override
public void init(IStorageGroupMNode storageGroupMNode) throws MetadataException {
this.storageGroupMNode = storageGroupMNode;
public void init() throws MetadataException {
schemaRegionDirPath =
config.getSchemaDir()
+ File.separator
@@ -218,6 +218,20 @@ public void deleteSchemaRegion() throws MetadataException {
SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
}

@Override
public boolean createSnapshot(File snapshotDir) {
// todo implement this
throw new UnsupportedOperationException(
"Rocksdb mode currently doesn't support snapshot feature.");
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {
// todo implement this
throw new UnsupportedOperationException(
"Rocksdb mode currently doesn't support snapshot feature.");
}

@Override
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
try {
@@ -52,11 +52,13 @@ public void stop() {}

@Override
public boolean takeSnapshot(File snapshotDir) {
return false;
return schemaRegion.createSnapshot(snapshotDir);
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {}
public void loadSnapshot(File latestSnapshotRootDir) {
schemaRegion.loadSnapshot(latestSnapshotRootDir);
}

@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
@@ -45,6 +45,11 @@ private MetadataConstant() {
public static final String SCHEMA_FILE_NAME = "schema_file.pst";
public static final String SCHEMA_LOG_FILE_NAME = "schema_file_log.bin";

public static final String METADATA_LOG_SNAPSHOT = "mlog.bin.snapshot";
public static final String METADATA_LOG_SNAPSHOT_TMP = "mlog.bin.snapshot.tmp";
public static final String TAG_LOG_SNAPSHOT = "tlog.txt.snapshot";
public static final String TAG_LOG_SNAPSHOT_TMP = "tlog.txt.snapshot.tmp";

public static final String[] ALL_RESULT_NODES = new String[] {"root", "**"};
public static final PartialPath ALL_MATCH_PATTERN = new PartialPath(new String[] {"root", "**"});

@@ -51,6 +51,7 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -99,6 +100,14 @@ public void close() throws IOException {
logWriter.close();
}

public synchronized void copyTo(File targetFile) throws IOException {
// flush the mlogBuffer
sync();
// flush the os buffer
force();
FileUtils.copyFile(logFile, targetFile);
}

private void sync() {
try {
logWriter.write(mlogBuffer);
@@ -26,7 +26,6 @@
import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -43,6 +42,7 @@
import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.tsfile.utils.Pair;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -76,7 +76,7 @@
public interface ISchemaRegion {

// region Interfaces for initialization、recover and clear
void init(IStorageGroupMNode storageGroupMNode) throws MetadataException;
void init() throws MetadataException;

/** clear all metadata components of this schemaRegion */
void clear();
@@ -91,6 +91,10 @@ public interface ISchemaRegion {

// delete this schemaRegion and clear all resources
void deleteSchemaRegion() throws MetadataException;

boolean createSnapshot(File snapshotDir);

void loadSnapshot(File latestSnapshotRootDir);
// endregion

// region Interfaces for Timeseries operation
@@ -84,6 +84,7 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.commons.io.FileUtils;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -159,6 +160,8 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion {

private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();

private final IStorageGroupMNode storageGroupMNode;
private MTreeBelowSGMemoryImpl mtree;
// device -> DeviceMNode
private LoadingCache<PartialPath, IMNode> mNodeCache;
@@ -186,11 +189,12 @@ public SchemaRegionMemoryImpl(
}
});

init(storageGroupMNode);
this.storageGroupMNode = storageGroupMNode;
init();
}

@SuppressWarnings("squid:S2093")
public synchronized void init(IStorageGroupMNode storageGroupMNode) throws MetadataException {
public synchronized void init() throws MetadataException {
if (initialized) {
return;
}
@@ -409,6 +413,79 @@ public synchronized void deleteSchemaRegion() throws MetadataException {
SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
}

@Override
public synchronized boolean createSnapshot(File snapshotDir) {
File mLogSnapshotTmp =
SystemFileFactory.INSTANCE.getFile(snapshotDir, MetadataConstant.METADATA_LOG_SNAPSHOT_TMP);
File mLogSnapshot =
SystemFileFactory.INSTANCE.getFile(snapshotDir, MetadataConstant.METADATA_LOG_SNAPSHOT);

try {
logWriter.copyTo(mLogSnapshotTmp);
if (mLogSnapshot.exists() && !mLogSnapshot.delete()) {
logger.error(
"Failed to delete old snapshot file {} while creating snapshot for schemaRegion {}.",
mLogSnapshot.getName(),
schemaRegionId);
return false;
}

if (!mLogSnapshotTmp.renameTo(mLogSnapshot)) {
logger.error(
"Failed to rename {} to {} while creating snapshot for schemaRegion {}.",
mLogSnapshotTmp.getName(),
mLogSnapshot.getName(),
schemaRegionId);
mLogSnapshot.delete();
return false;
}

return tagManager.createSnapshot(snapshotDir);
} catch (IOException e) {
logger.error(
"Failed to create snapshot for schemaRegion {} due to {}",
schemaRegionId,
e.getMessage(),
e);
mLogSnapshot.delete();
return false;
} finally {
mLogSnapshotTmp.delete();
}
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {
File mLogSnapshot =
SystemFileFactory.INSTANCE.getFile(
latestSnapshotRootDir, MetadataConstant.METADATA_LOG_SNAPSHOT);
File tagSnapshot =
SystemFileFactory.INSTANCE.getFile(
latestSnapshotRootDir, MetadataConstant.TAG_LOG_SNAPSHOT);

clear();

File mLog =
SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath, MetadataConstant.METADATA_LOG);
File tagFile =
SystemFileFactory.INSTANCE.getFile(schemaRegionDirPath, MetadataConstant.TAG_LOG);
mLog.delete();
tagFile.delete();

try {
FileUtils.copyFile(mLogSnapshot, mLog);
FileUtils.copyFile(tagSnapshot, tagFile);

init();
} catch (IOException | MetadataException e) {
logger.error(
"Failed to load snapshot for schemaRegion {} ue to {}",
schemaRegionId,
e.getMessage(),
e);
}
}

// endregion

// region Interfaces and Implementation for Timeseries operation
@@ -161,6 +161,8 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {

private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance();
private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance();

private final IStorageGroupMNode storageGroupMNode;
private MTreeBelowSGCachedImpl mtree;
// device -> DeviceMNode
private LoadingCache<PartialPath, IMNode> mNodeCache;
@@ -194,12 +196,12 @@ public SchemaRegionSchemaFileImpl(
return mtree.getNodeByPath(partialPath);
}
});

init(storageGroupMNode);
this.storageGroupMNode = storageGroupMNode;
init();
}

@SuppressWarnings("squid:S2093")
public synchronized void init(IStorageGroupMNode storageGroupMNode) throws MetadataException {
public synchronized void init() throws MetadataException {
if (initialized) {
return;
}
@@ -420,6 +422,20 @@ public synchronized void deleteSchemaRegion() throws MetadataException {
SchemaRegionUtils.deleteSchemaRegionFolder(schemaRegionDirPath, logger);
}

@Override
public boolean createSnapshot(File snapshotDir) {
// todo implement this
throw new UnsupportedOperationException(
"Schema_File mode currently doesn't support snapshot feature.");
}

@Override
public void loadSnapshot(File latestSnapshotRootDir) {
// todo implement this
throw new UnsupportedOperationException(
"Schema_File mode currently doesn't support snapshot feature.");
}

// endregion

// region Interfaces and Implementation for Timeseries operation
@@ -24,6 +24,7 @@
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,6 +41,7 @@
public class TagLogFile implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(TagLogFile.class);
private File tagFile;
private FileChannel fileChannel;
private static final String LENGTH_EXCEED_MSG =
"Tag/Attribute exceeds the max length limit. "
@@ -63,11 +65,11 @@ public TagLogFile(String schemaDir, String logFileName) throws IOException {
}
}

File logFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);
tagFile = SystemFileFactory.INSTANCE.getFile(schemaDir + File.separator + logFileName);

this.fileChannel =
FileChannel.open(
logFile.toPath(),
tagFile.toPath(),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);
@@ -79,6 +81,12 @@ public TagLogFile(String schemaDir, String logFileName) throws IOException {
}
}

public synchronized void copyTo(File targetFile) throws IOException {
// flush os buffer
fileChannel.force(true);
FileUtils.copyFile(tagFile, targetFile);
}

/** @return tags map, attributes map */
public Pair<Map<String, String>, Map<String, String>> read(int size, long position)
throws IOException {
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.metadata.tag;

import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -38,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -73,6 +75,38 @@ public TagManager(String sgSchemaDirPath) throws IOException {
tagLogFile = new TagLogFile(sgSchemaDirPath, MetadataConstant.TAG_LOG);
}

public synchronized boolean createSnapshot(File targetDir) {
File tagLogSnapshot =
SystemFileFactory.INSTANCE.getFile(targetDir, MetadataConstant.TAG_LOG_SNAPSHOT);
File tagLogSnapshotTmp =
SystemFileFactory.INSTANCE.getFile(targetDir, MetadataConstant.TAG_LOG_SNAPSHOT_TMP);
try {
tagLogFile.copyTo(tagLogSnapshotTmp);
if (tagLogSnapshot.exists() && !tagLogSnapshot.delete()) {
logger.error(
"Failed to delete old snapshot {} while creating tagManager snapshot.",
tagLogSnapshot.getName());
return false;
}
if (!tagLogSnapshotTmp.renameTo(tagLogSnapshot)) {
logger.error(
"Failed to rename {} to {} while creating tagManager snapshot.",
tagLogSnapshotTmp.getName(),
tagLogSnapshot.getName());
tagLogSnapshot.delete();
return false;
}

return true;
} catch (IOException e) {
logger.error("Failed to create tagManager snapshot due to {}", e.getMessage(), e);
tagLogSnapshot.delete();
return false;
} finally {
tagLogSnapshotTmp.delete();
}
}

public boolean recoverIndex(long offset, IMeasurementMNode measurementMNode) throws IOException {
Map<String, String> tags = tagLogFile.readTag(config.getTagAttributeTotalSize(), offset);
if (tags == null || tags.isEmpty()) {

0 comments on commit 05fc9f5

Please sign in to comment.