Skip to content

Commit

Permalink
[dingo-net-netty, dingo-store-mpu] Fix checkstyle warnings.
Browse files Browse the repository at this point in the history
Signed-off-by: Ketor <d.ketor@gmail.com>
  • Loading branch information
ketor authored and astor-oss committed Nov 19, 2022
1 parent 5e52f8e commit 4a93592
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 80 deletions.
Expand Up @@ -27,7 +27,41 @@
import io.dingodb.mpu.storage.Storage;
import io.dingodb.net.service.FileTransferService;
import lombok.extern.slf4j.Slf4j;
import org.rocksdb.*;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.BackgroundErrorReason;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.CompressionType;
import org.rocksdb.ConfigOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.FileOperationInfo;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.FlushOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.MemTableInfo;
import org.rocksdb.OptionsUtil;
import org.rocksdb.Range;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.Status;
import org.rocksdb.StringAppendOperator;
import org.rocksdb.TableFileCreationBriefInfo;
import org.rocksdb.TableFileCreationInfo;
import org.rocksdb.TableFileDeletionInfo;
import org.rocksdb.TtlDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.rocksdb.WriteStallInfo;

import java.io.File;
import java.nio.file.Files;
Expand All @@ -36,11 +70,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.Comparator;

import static io.dingodb.common.codec.PrimitiveCodec.encodeLong;
import static io.dingodb.mpu.Constant.API;
Expand Down Expand Up @@ -76,7 +108,7 @@ public class RocksStorage implements Storage {
public final WriteOptions writeOptions;
public final LinkedRunner runner;

public Checkpoint checkpoint;
public Checkpoint checkPoint;
public RocksDB instruction;
public RocksDB db;

Expand All @@ -89,7 +121,7 @@ public class RocksStorage implements Storage {
private ColumnFamilyDescriptor icfDesc;

private boolean destroy = false;
private boolean disable_checkpoint_purge = false;
private boolean disableCheckpointPurge = false;

private static final int MAX_BLOOM_HASH_NUM = 10;
private static final int MAX_PREFIX_LENGTH = 4;
Expand Down Expand Up @@ -122,7 +154,7 @@ public RocksStorage(CoreMeta coreMeta, String path, final String dbRocksOptionsF
this.db = createDB();
this.writeOptions = new WriteOptions();
log.info("Create {} db, ttl: {}.", coreMeta.label, this.ttl);
checkpoint = Checkpoint.create(db);
checkPoint = Checkpoint.create(db);
log.info("Create rocks storage for {} success.", coreMeta.label);
}

Expand Down Expand Up @@ -153,9 +185,9 @@ private RocksDB createInstruction() throws RocksDBException {
cfs.add(icfDesc);

List<ColumnFamilyHandle> handles = new ArrayList<>();
RocksDB instruction = RocksDB.open(options, this.instructionPath.toString(), cfs, handles);
log.info("RocksStorage createInstruction, RocksDB open, path: {}, options file: {}, handles size: {}.",
this.instructionPath, this.logRocksOptionsFile, handles.size());
RocksDB instruction = RocksDB.open(options, this.instructionPath.toString(), cfs, handles);
this.icfHandler = handles.get(0);
assert (this.icfHandler != null);

Expand Down Expand Up @@ -236,8 +268,8 @@ public void destroy() {
this.icfHandler = null;
this.instruction.close();
this.instruction = null;
this.checkpoint.close();
this.checkpoint = null;
this.checkPoint.close();
this.checkPoint = null;
/**
* to avoid the file handle leak when drop table
*/
Expand Down Expand Up @@ -268,10 +300,10 @@ public CompletableFuture<Void> transferTo(CoreMeta meta) {
String target = storageApi.transferBackup(meta.mpuId, meta.coreId);

//disable checkpoint purge while transferring checkpoint
this.disable_checkpoint_purge = true;
String checkpoint_name = GetLatestCheckpointName(LOCAL_CHECKPOINT_PREFIX);
FileTransferService.transferTo(meta.location, checkpointPath.resolve(checkpoint_name), Paths.get(target));
this.disable_checkpoint_purge = false;
this.disableCheckpointPurge = true;
String checkpointName = getLatestCheckpointName(LOCAL_CHECKPOINT_PREFIX);
FileTransferService.transferTo(meta.location, checkpointPath.resolve(checkpointName), Paths.get(target));
this.disableCheckpointPurge = false;

//call remote node to apply checkpoint into db
storageApi.applyBackup(meta.mpuId, meta.coreId);
Expand Down Expand Up @@ -300,7 +332,7 @@ private void flushInstruction() {
}
}

/**
/*
* Create new RocksDB checkpoint in backup dir
*
* @throws RuntimeException
Expand All @@ -310,56 +342,59 @@ public void createNewCheckpoint() {
return;
}
try {
if (checkpoint != null) {
// use nanoTime as checkpoint_name
String checkpoint_name = String.format("%s%d", LOCAL_CHECKPOINT_PREFIX, System.nanoTime());
String checkpoint_dir_name = this.checkpointPath.resolve(checkpoint_name).toString();
checkpoint.createCheckpoint(checkpoint_dir_name);
if (checkPoint != null) {
// use nanoTime as checkpointName
String checkpointName = String.format("%s%d", LOCAL_CHECKPOINT_PREFIX, System.nanoTime());
String checkpointDirName = this.checkpointPath.resolve(checkpointName).toString();

log.info("RocksStorage::createNewCheckpoint start " + checkpointName);
checkPoint.createCheckpoint(checkpointDirName);
log.info("RocksStorage::createNewCheckpoint finish " + checkpointName);
}
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
}

/**
/*
* Get Latest Checkpoint Dir Name
*
* @param prefix Prefix of the checkpoint_name
* Use prefix to identify remote or local checkpoint.
*/
public String GetLatestCheckpointName(String prefix){
String latest_checkpoint_name = "";
if (checkpoint != null) {
public String getLatestCheckpointName(String prefix) {
String latestCheckpointName = "";
if (checkPoint != null) {
File[] directories = new File(this.checkpointPath.toString()).listFiles(File::isDirectory);
if (directories.length > 0) {
for (File checkpoint_dir: directories) {
for (File checkpointDir: directories) {
// dir name end with ".tmp" maybe temp dir or litter dir
if((!checkpoint_dir.getName().endsWith(".tmp")) && checkpoint_dir.getName().startsWith(prefix)) {
if (checkpoint_dir.getName().compareTo(latest_checkpoint_name) > 0) {
latest_checkpoint_name = checkpoint_dir.getName();
if ((!checkpointDir.getName().endsWith(".tmp")) && checkpointDir.getName().startsWith(prefix)) {
if (checkpointDir.getName().compareTo(latestCheckpointName) > 0) {
latestCheckpointName = checkpointDir.getName();
}
}
}
}
}
return latest_checkpoint_name;
return latestCheckpointName;
}

/**
/*
* Purge Old checkpoint, only retain latest [count] checkpoints.
*
* @param count Count of checkpoint to retain
*
* @throws RuntimeException
*/
public void purgeOldCheckpoint(int count) {
if (destroy || disable_checkpoint_purge) {
if (destroy || disableCheckpointPurge) {
return;
}
try {
// Sort directory names by alphabetical order
Comparator<File> comparatorFileName = new Comparator<File>(){
public int compare(File p1,File p2){
Comparator<File> comparatorFileName = new Comparator<File>() {
public int compare(File p1,File p2) {
return p2.getName().compareTo(p1.getName());
}
};
Expand All @@ -372,11 +407,12 @@ public int compare(File p1,File p2){
if (directories.length > count) {
for (int i = 0; i < directories.length; i++) {
// dir name end with ".tmp" is delayed to delete
if(!directories[i].getName().endsWith(".tmp")) {
if (!directories[i].getName().endsWith(".tmp")) {
persistCount++;
}

if(persistCount > count){
if (persistCount > count) {
log.info("RocksStorage::purgeOldCheckpoint delete " + directories[i].toString());
FileUtils.deleteIfExists(directories[i].toPath());
}
}
Expand All @@ -386,47 +422,47 @@ public int compare(File p1,File p2){
}
}

/**
/*
* Restore db from latest checkpoint
*
* @throws RuntimeException
*/
@Override
public void applyBackup() {
log.info("RocksStorage::restoreFromLatestCheckpoint");
if (destroy) {
throw new RuntimeException();
}
try {
String remote_checkpoint_dir = String.format("%s%s", REMOTE_CHECKPOINT_PREFIX, "checkpoint");
if (remote_checkpoint_dir.length() == 0){
String remoteCheckpointDir = String.format("%s%s", REMOTE_CHECKPOINT_PREFIX, "checkpoint");
if (remoteCheckpointDir.length() == 0) {
throw new RuntimeException("GetLatestCheckpointName return null string");
}
log.info("RocksStorage::restoreFromLatestCheckpoint remote_checkpoint_dir=" + remote_checkpoint_dir);
log.info("RocksStorage::applyBackup start [" + remoteCheckpointDir + "]");

//1.generate temp new db dir for new RocksDB
Path temp_new_db_path = this.path.resolve("load_from_"+ remote_checkpoint_dir);
Path temp_old_db_path = this.path.resolve("will_delete_soon_"+ remote_checkpoint_dir);
Path tempNewDbPath = this.path.resolve("load_from_" + remoteCheckpointDir);

//2.rename remote checkpoint to temp_new_db_path
Files.move(this.path.resolve(remote_checkpoint_dir), temp_new_db_path);
//2.rename remote checkpoint to tempNewDbPath
Files.move(this.path.resolve(remoteCheckpointDir), tempNewDbPath);

//3.rename old db to will_delete_soon_[checkpoint_name]
checkpoint.close();
checkpoint = null;
checkPoint.close();
checkPoint = null;
closeDB();
Files.move(this.dbPath, temp_old_db_path);

Path tempOldDbPath = this.path.resolve("will_delete_soon_" + remoteCheckpointDir);
Files.move(this.dbPath, tempOldDbPath);

//4.rename temp new db dir to new db dir
Files.move(temp_new_db_path, this.dbPath);
Files.move(tempNewDbPath, this.dbPath);

//5.createDB()
db = createDB();
checkpoint = Checkpoint.create(db);
checkPoint = Checkpoint.create(db);

//6.delete old db thoroughly
FileUtils.deleteIfExists(temp_old_db_path);
log.info("RocksStorage::restoreFromLatestCheckpoint finished =" + remote_checkpoint_dir);
FileUtils.deleteIfExists(tempOldDbPath);
log.info("RocksStorage::applyBackup finished [" + remoteCheckpointDir + "]");

} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -448,13 +484,13 @@ public void backup() {

@Override
public String receiveBackup() {
String checkpoint_name = String.format("%s%s", REMOTE_CHECKPOINT_PREFIX, "checkpoint");
log.info(String.format("receiveBackup path=[%s]\n", this.path.resolve(checkpoint_name).toString()));
String checkpointName = String.format("%s%s", REMOTE_CHECKPOINT_PREFIX, "checkpoint");
log.info(String.format("receiveBackup path=[%s]\n", this.path.resolve(checkpointName).toString()));

FileUtils.deleteIfExists(this.path.resolve(checkpoint_name));
FileUtils.createDirectories(this.path.resolve(checkpoint_name));
FileUtils.deleteIfExists(this.path.resolve(checkpointName));
FileUtils.createDirectories(this.path.resolve(checkpointName));

return this.path.resolve(checkpoint_name).toString();
return this.path.resolve(checkpointName).toString();
}

@Override
Expand Down

0 comments on commit 4a93592

Please sign in to comment.