Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new ClusterPartitionFetcher interface #12848

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
Expand All @@ -52,8 +54,6 @@

import org.apache.thrift.TException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -64,9 +64,10 @@
import java.util.Map;
import java.util.Set;

import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;

public class ClusterPartitionFetcher implements IPartitionFetcher {

private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

private final SeriesPartitionExecutor partitionExecutor;
Expand Down Expand Up @@ -96,44 +97,23 @@ private ClusterPartitionFetcher() {

@Override
public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
patternTree.constructTree();
List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
Map<String, List<IDeviceID>> storageGroupToDeviceMap =
partitionCache.getStorageGroupToDevice(deviceIDs, true, false, null);
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
if (null == schemaPartition) {
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
partitionCache.updateSchemaPartitionCache(
schemaPartitionTableResp.getSchemaPartitionTable());
} else {
throw new RuntimeException(
new IoTDBException(
schemaPartitionTableResp.getStatus().getMessage(),
schemaPartitionTableResp.getStatus().getCode()));
}
}
return schemaPartition;
} catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getSchemaPartition():" + e.getMessage());
}
return getOrCreateSchemaPartition(patternTree, false, null);
}

@Override
public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree, String userName) {
return getOrCreateSchemaPartition(patternTree, true, userName);
}

private SchemaPartition getOrCreateSchemaPartition(
PathPatternTree patternTree, boolean isAutoCreate, String userName) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
patternTree.constructTree();
List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
Map<String, List<IDeviceID>> storageGroupToDeviceMap =
partitionCache.getStorageGroupToDevice(deviceIDs, true, true, userName);
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(storageGroupToDeviceMap);
Map<String, List<IDeviceID>> databaseToDevice =
partitionCache.getDatabaseToDevice(deviceIDs, true, isAutoCreate, userName);
SchemaPartition schemaPartition = partitionCache.getSchemaPartition(databaseToDevice);
if (null == schemaPartition) {
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
Expand Down Expand Up @@ -295,21 +275,54 @@ public void invalidAllCache() {

@Override
public SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
String database, List<IDeviceID> deviceIDs, String userName) {
return getOrCreateSchemaPartition(database, deviceIDs, true, userName);
}

@Override
public SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
public SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDs) {
return getOrCreateSchemaPartition(database, deviceIDs, false, null);
}

private SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDs, boolean isAutoCreate, String userName) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
partitionCache.checkAndAutoCreateDatabase(database, isAutoCreate, userName);
SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(
new HashMap<String, List<IDeviceID>>() {
{
put(database, deviceIDs);
}
});
if (null == schemaPartition) {
PathPatternTree tree = new PathPatternTree();
tree.appendPathPattern(new PartialPath(database + "." + MULTI_LEVEL_PATH_WILDCARD));
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getSchemaPartitionTable(constructSchemaPartitionReq(tree));
if (schemaPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
schemaPartition = parseSchemaPartitionTableResp(schemaPartitionTableResp);
partitionCache.updateSchemaPartitionCache(
schemaPartitionTableResp.getSchemaPartitionTable());
} else {
throw new RuntimeException(
new IoTDBException(
schemaPartitionTableResp.getStatus().getMessage(),
schemaPartitionTableResp.getStatus().getCode()));
}
}
return schemaPartition;
} catch (ClientManagerException | TException | IllegalPathException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getSchemaPartition():" + e.getMessage());
}
}

@Override
public SchemaPartition getSchemaPartition(String database) {
// todo implement related logic @Potato
throw new UnsupportedOperationException("Unsupported schema partition operation");
return partitionCache.getSchemaPartition(database);
}

/** split data partition query param by database */
Expand All @@ -321,14 +334,14 @@ private Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParam(
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
deviceIDs.add(dataPartitionQueryParam.getDeviceID());
}
Map<IDeviceID, String> deviceToStorageGroupMap =
partitionCache.getDeviceToStorageGroup(deviceIDs, true, isAutoCreate, userName);
Map<IDeviceID, String> deviceToDatabase =
partitionCache.getDeviceToDatabase(deviceIDs, true, isAutoCreate, userName);
Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
IDeviceID deviceID = dataPartitionQueryParam.getDeviceID();
if (deviceToStorageGroupMap.containsKey(deviceID)) {
String storageGroup = deviceToStorageGroupMap.get(deviceID);
result.computeIfAbsent(storageGroup, key -> new ArrayList<>()).add(dataPartitionQueryParam);
if (deviceToDatabase.containsKey(deviceID)) {
String database = deviceToDatabase.get(deviceID);
result.computeIfAbsent(database, key -> new ArrayList<>()).add(dataPartitionQueryParam);
}
}
return result;
Expand Down Expand Up @@ -360,6 +373,7 @@ private TSchemaNodeManagementReq constructSchemaNodeManagementPartitionReq(
}

private static class ComplexTimeSlotList {

Set<TTimePartitionSlot> timeSlotList;
boolean needLeftAll;
boolean needRightAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
void invalidAllCache();

// ======================== Table Model Schema Partition Interface ========================

/**
* Get or create schema partition, used in data insertion with enable_auto_create_schema is true.
* if schemaPartition does not exist, then automatically create.
Expand All @@ -103,7 +104,7 @@ SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName);
String database, List<IDeviceID> deviceIDs, String userName);

/**
* For data query with completed id.
Expand All @@ -112,7 +113,7 @@ SchemaPartition getOrCreateSchemaPartition(
*
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDList);
SchemaPartition getSchemaPartition(String database, List<IDeviceID> deviceIDs);

/**
* For data query with partial device id conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;

public abstract class StorageGroupCacheResult<K, V> {
public abstract class DatabaseCacheResult<K, V> {
/** the result */
private boolean success = true;

Expand Down Expand Up @@ -56,7 +56,7 @@ public Map<K, V> getMap() {
return map;
}

public abstract void put(IDeviceID device, String storageGroupName);
public abstract void put(IDeviceID device, String databaseName);

/** set failed and clear the map */
public void setFailed() {
Expand Down
Loading
Loading