Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,11 @@ public boolean updateRegionCache(final TRegionRouteReq req) {
return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), req.getRegionRouteMap());
}

@Override
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
return partitionCache.getRegionReplicaSet(id);
}

@Override
public void invalidAllCache() {
partitionCache.invalidAllCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.queryengine.plan.analyze;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
Expand Down Expand Up @@ -92,6 +94,8 @@ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
/** Update region cache in partition cache when receive request from config node */
boolean updateRegionCache(TRegionRouteReq req);

TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id);

/** Invalid all partition cache */
void invalidAllCache();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ private boolean secondPhase(
stateMachine.transitionToFailed(status);
return false;
}

checkAllReplicaSetsConsistency();
} catch (IOException e) {
LOGGER.warn(
"Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, tsFile: {}",
Expand All @@ -433,6 +435,11 @@ private boolean secondPhase(
LOGGER.warn("Interrupt or Execution error.", e);
stateMachine.transitionToFailed(e);
return false;
} catch (Exception e) {
LOGGER.warn(
String.format("Exception occurred during second phase of loading TsFile %s.", tsFile), e);
stateMachine.transitionToFailed(e);
return false;
}
return true;
}
Expand All @@ -447,6 +454,24 @@ private ByteBuffer assignProgressIndex(TsFileResource tsFileResource) throws IOE
}
}

public void checkAllReplicaSetsConsistency() throws RegionReplicaSetChangedException {
for (final TRegionReplicaSet replicaSet : allReplicaSets) {
final TConsensusGroupId regionId = replicaSet.getRegionId();
if (regionId == null) {
LOGGER.info(
"region id is null during region consistency check, will skip this region: {}",
replicaSet);
continue;
}

final TRegionReplicaSet currentReplicaSet =
partitionFetcher.fetcher.getRegionReplicaSet(regionId);
if (!Objects.equals(replicaSet, currentReplicaSet)) {
throw new RegionReplicaSetChangedException(replicaSet, currentReplicaSet);
}
}
}

private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
LOGGER.info("Start load TsFile {} locally.", node.getTsFileResource().getTsFile().getPath());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
return true;
}

@Override
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
return null;
}

@Override
public void invalidAllCache() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
return false;
}

@Override
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
return null;
}

@Override
public void invalidAllCache() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
return false;
}

@Override
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
return null;
}

@Override
public void invalidAllCache() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.queryengine.plan.relational.analyzer;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
Expand Down Expand Up @@ -431,6 +433,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
return false;
}

@Override
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
return null;
}

@Override
public void invalidAllCache() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.queryengine.plan.relational.analyzer;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
Expand Down Expand Up @@ -505,6 +507,11 @@ public boolean updateRegionCache(TRegionRouteReq req) {
return false;
}

@Override
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
return null;
}

@Override
public void invalidAllCache() {}

Expand Down
Loading