Skip to content

Commit

Permalink
Revert "Fix bugs of add duplicated metadata (#918)"
Browse files Browse the repository at this point in the history
This reverts commit a92b80c.
  • Loading branch information
fanhualta committed Mar 18, 2020
1 parent a92b80c commit e51b03b
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1537,9 +1537,8 @@ public void loadNewTsFileForSync(TsFileResource newTsFileResource)
writeLock();
mergeLock.writeLock().lock();
try {
if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource)){
updateLatestTimeMap(newTsFileResource);
}
loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource);
updateLatestTimeMap(newTsFileResource);
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
Expand Down Expand Up @@ -1781,9 +1780,8 @@ private void updateLatestTimeMap(TsFileResource newTsFileResource) {
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @UsedBy sync module, load external tsfile module.
* @return load the file successfully
*/
private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
private void loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource)
throws TsFileProcessorException, DiskSpaceInsufficientException {
File targetFile;
Expand All @@ -1795,10 +1793,6 @@ private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
storageGroupName + File.separatorChar + timeRangeId + File.separator + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
if(unSequenceFileList.contains(tsFileResource)){
logger.error("The file {} has already been loaded in unsequence list", tsFileResource);
return false;
}
unSequenceFileList.add(tsFileResource);
logger.info("Load tsfile in unsequence list, move file from {} to {}",
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
Expand All @@ -1809,10 +1803,6 @@ private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
storageGroupName + File.separatorChar + timeRangeId + File.separator
+ tsFileResource.getFile().getName());
tsFileResource.setFile(targetFile);
if(sequenceFileTreeSet.contains(tsFileResource)){
logger.error("The file {} has already been loaded in sequence list", tsFileResource);
return false;
}
sequenceFileTreeSet.add(tsFileResource);
logger.info("Load tsfile in sequence list, move file from {} to {}",
syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath());
Expand Down Expand Up @@ -1850,7 +1840,6 @@ private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(),
e.getMessage()));
}
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,13 @@ private String deleteOneTimeseriesAndUpdateStatisticsAndLog(String path)
public void setStorageGroup(String storageGroup) throws MetadataException {
lock.writeLock().lock();
try {
mtree.setStorageGroup(storageGroup);
if (writeToLog) {
BufferedWriter writer = getLogWriter();
writer.write(MetadataOperationType.SET_STORAGE_GROUP + "," + storageGroup);
writer.newLine();
writer.flush();
}
mtree.setStorageGroup(storageGroup);
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
seriesNumberInStorageGroups.put(storageGroup, 0);
Expand Down
3 changes: 0 additions & 3 deletions server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ void createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
}
MNode leaf = new LeafMNode(cur, nodeNames[nodeNames.length - 1], dataType, encoding,
compressor, props);
if (cur.hasChild(leaf.getName())) {
throw new MetadataException(String.format("The timeseries %s has already existed.", path));
}
cur.addChild(leaf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.security.NoSuchAlgorithmException;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
Expand All @@ -46,7 +45,6 @@
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.SyncUtils;
import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -77,20 +75,8 @@ public class SyncServiceImpl implements SyncService.Iface {
* Verify IP address of sender
*/
@Override
public SyncStatus check(ConfirmInfo info) {
String ipAddress = info.address, uuid = info.uuid;
public SyncStatus check(String ipAddress, String uuid) {
Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
if (!info.version.equals(IoTDBConstant.VERSION)) {
return getErrorResult(String.format("Version mismatch: the sender <%s>, the receiver <%s>",
info.version, IoTDBConstant.VERSION));
}
if (info.partitionInterval != IoTDBDescriptor.getInstance().getConfig()
.getPartitionInterval()) {
return getErrorResult(String
.format("Partition interval mismatch: the sender <%d>, the receiver <%d>",
info.partitionInterval,
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval()));
}
if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
senderName.set(ipAddress + SyncConstant.SYNC_DIR_NAME_SEPARATOR + uuid);
if (checkRecovery()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public void getCurrentLocalFiles(String dataDir) {
.equals(TsFileConstant.PATH_UPGRADE)) {
continue;
}
try {
if (!MManager.getInstance().getStorageGroupName(sgFolder.getName())
.equals(sgFolder.getName())) {
// the folder is not a sg folder
continue;
}
} catch (MetadataException e) {
// the folder is not a sg folder
continue;
}
allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>());
currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>());
for (File timeRangeFolder : sgFolder.listFiles()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.SyncConnectionException;
Expand All @@ -68,7 +67,6 @@
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
import org.apache.iotdb.tsfile.utils.BytesUtils;
Expand Down Expand Up @@ -288,12 +286,9 @@ public void establishConnection(String serverIp, int serverPort) throws SyncConn

@Override
public void confirmIdentity() throws SyncConnectionException {
try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) {
ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(),
getOrCreateUUID(getUuidFile()),
IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION);
try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())){
SyncStatus status = serviceClient
.check(info);
.check(socket.getLocalAddress().getHostAddress(), getOrCreateUUID(getUuidFile()));
if (status.code != SUCCESS_CODE) {
throw new SyncConnectionException(
"The receiver rejected the synchronization task because " + status.msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public static void checkTsFileResource(TsFileResource tsFileResource) throws IOE
} else {
tsFileResource.deserialize();
}
tsFileResource.setClosed(true);
}

public static void updateTsFileResource(TsFileMetaData metaData, TsFileSequenceReader reader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void testGetAllFileNamesByPath() {
root.setStorageGroup("root.laptop.d2");
root.createTimeseries("root.laptop.d1.s1", TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.GZIP, null);
root.createTimeseries("root.laptop.d1.s2", TSDataType.INT32, TSEncoding.PLAIN,
root.createTimeseries("root.laptop.d1.s1", TSDataType.INT32, TSEncoding.PLAIN,
CompressionType.GZIP, null);

List<String> list = new ArrayList<>();
Expand Down
23 changes: 5 additions & 18 deletions service-rpc/src/main/thrift/sync.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,15 @@
*/
namespace java org.apache.iotdb.service.sync.thrift

struct SyncStatus{
1:required i32 code
2:required string msg
}

// The sender and receiver need to check some info to confirm validity
struct ConfirmInfo{
// check whether the ip of sender is in thw white list of receiver.
1:string address
typedef i32 int

// Sender needs to tell receiver its identity.
2:string uuid

// The partition interval of sender and receiver need to be the same.
3:i64 partitionInterval

// The version of sender and receiver need to be the same.
4:string version
struct SyncStatus{
required i32 code
required string msg
}

service SyncService{
SyncStatus check(ConfirmInfo info)
SyncStatus check(1:string address, 2:string uuid)
SyncStatus startSync();
SyncStatus init(1:string storageGroupName)
SyncStatus syncDeletedFileName(1:string fileName)
Expand Down

0 comments on commit e51b03b

Please sign in to comment.