Skip to content

Commit

Permalink
Implement new ClusterPartitionFetcher interface
Browse files Browse the repository at this point in the history
  • Loading branch information
OneSizeFitsQuorum committed Jul 3, 2024
1 parent a472f9d commit 00100fa
Show file tree
Hide file tree
Showing 14 changed files with 414 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
Expand All @@ -52,8 +54,6 @@

import org.apache.thrift.TException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -64,9 +64,10 @@
import java.util.Map;
import java.util.Set;

import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;

public class ClusterPartitionFetcher implements IPartitionFetcher {

private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

private final SeriesPartitionExecutor partitionExecutor;
Expand Down Expand Up @@ -96,44 +97,23 @@ private ClusterPartitionFetcher() {

@Override
public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
patternTree.constructTree();
List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
Map<String, List<IDeviceID>> storageGroupToDeviceMap =
partitionCache.getStorageGroupToDevice(deviceIDs, true, false, null);
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
if (null == schemaPartition) {
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
partitionCache.updateSchemaPartitionCache(
schemaPartitionTableResp.getSchemaPartitionTable());
} else {
throw new RuntimeException(
new IoTDBException(
schemaPartitionTableResp.getStatus().getMessage(),
schemaPartitionTableResp.getStatus().getCode()));
}
}
return schemaPartition;
} catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getSchemaPartition():" + e.getMessage());
}
return getOrCreateSchemaPartition(patternTree, false, null);
}

@Override
public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree, String userName) {
return getOrCreateSchemaPartition(patternTree, true, userName);
}

private SchemaPartition getOrCreateSchemaPartition(
PathPatternTree patternTree, boolean isAutoCreate, String userName) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
patternTree.constructTree();
List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
Map<String, List<IDeviceID>> storageGroupToDeviceMap =
partitionCache.getStorageGroupToDevice(deviceIDs, true, true, userName);
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
Map<String, List<IDeviceID>> databaseToDevice =
partitionCache.getDatabaseToDevice(deviceIDs, true, isAutoCreate, userName);
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(databaseToDevice);
if (null == schemaPartition) {
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
Expand Down Expand Up @@ -295,21 +275,54 @@ public void invalidAllCache() {

@Override
public SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
String database, List<IDeviceID> deviceIDs, String userName) {
return getOrCreateSchemaPartition(database, deviceIDs, true, userName);
}

@Override
public SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
public SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDs) {
return getOrCreateSchemaPartition(database, deviceIDs, false, null);
}

private SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDs, boolean isAutoCreate, String userName) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
partitionCache.checkAndAutoCreateDatabase(database, isAutoCreate, userName);
SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(
new HashMap<String, List<IDeviceID>>() {
{
put(database, deviceIDs);
}
});
if (null == schemaPartition) {
PathPatternTree tree = new PathPatternTree();
tree.appendPathPattern(new PartialPath(database + "." + MULTI_LEVEL_PATH_WILDCARD));
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getSchemaPartitionTable(constructSchemaPartitionReq(tree));
if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
partitionCache.updateSchemaPartitionCache(
schemaPartitionTableResp.getSchemaPartitionTable());
} else {
throw new RuntimeException(
new IoTDBException(
schemaPartitionTableResp.getStatus().getMessage(),
schemaPartitionTableResp.getStatus().getCode()));
}
}
return schemaPartition;
} catch (ClientManagerException | TException | IllegalPathException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getSchemaPartition():" + e.getMessage());
}
}

@Override
public SchemaPartition getSchemaPartition(String database) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
return partitionCache.getSchemaPartition(database);
}

/** split data partition query param by database */
Expand All @@ -321,14 +334,14 @@ private Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParam(
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
deviceIDs.add(dataPartitionQueryParam.getDeviceID());
}
Map<IDeviceID, String> deviceToStorageGroupMap =
partitionCache.getDeviceToStorageGroup(deviceIDs, true, isAutoCreate, userName);
Map<IDeviceID, String> deviceToDatabase =
partitionCache.getDeviceToDatabase(deviceIDs, true, isAutoCreate, userName);
Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
IDeviceID deviceID = dataPartitionQueryParam.getDeviceID();
if (deviceToStorageGroupMap.containsKey(deviceID)) {
String storageGroup = deviceToStorageGroupMap.get(deviceID);
result.computeIfAbsent(storageGroup, key -> new ArrayList<>()).add(dataPartitionQueryParam);
if (deviceToDatabase.containsKey(deviceID)) {
String database = deviceToDatabase.get(deviceID);
result.computeIfAbsent(database, key -> new ArrayList<>()).add(dataPartitionQueryParam);
}
}
return result;
Expand Down Expand Up @@ -360,6 +373,7 @@ private TSchemaNodeManagementReq constructSchemaNodeManagementPartitionReq(
}

private static class ComplexTimeSlotList {

Set<TTimePartitionSlot> timeSlotList;
boolean needLeftAll;
boolean needRightAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
void invalidAllCache();

// ======================== Table Model Schema Partition Interface ========================

/**
* Get or create schema partition, used in data insertion with enable_auto_create_schema is true.
* if schemaPartition does not exist, then automatically create.
Expand All @@ -103,7 +104,7 @@ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName);
String database, List<IDeviceID> deviceIDs, String userName);

/**
* For data query with completed id.
Expand All @@ -112,7 +113,7 @@ SchemaPartition getOrCreateSchemaPartition(
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList);
SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDs);

/**
* For data query with partial device id conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;

public abstract class StorageGroupCacheResult<K, V> {
public abstract class DatabaseCacheResult<K, V> {
/** the result */
private boolean success = true;

Expand Down Expand Up @@ -56,7 +56,7 @@ public Map<K, V> getMap() {
return map;
}

public abstract void put(IDeviceID device, String storageGroupName);
public abstract void put(IDeviceID device, String databaseName);

/** set failed and clear the map */
public void setFailed() {
Expand Down
Loading

0 comments on commit 00100fa

Please sign in to comment.