Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,28 +273,28 @@ public DataPartition getOrCreateDataPartition(
@Override
public DataPartition getOrCreateDataPartition(
final List<DataPartitionQueryParam> dataPartitionQueryParams, final String userName) {
DataPartition dataPartition;
final Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
splitDataPartitionQueryParam(
dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), userName);
DataPartition dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);
if (null != dataPartition) {
return dataPartition;
}

try (final ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams =
splitDataPartitionQueryParam(
dataPartitionQueryParams, config.isAutoCreateSchemaEnabled(), userName);
dataPartition = partitionCache.getDataPartition(splitDataPartitionQueryParams);

if (null == dataPartition) {
final TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
final TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(req);
final TDataPartitionReq req = constructDataPartitionReq(splitDataPartitionQueryParams);
final TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(req);

if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
} else {
throw new IoTDBRuntimeException(
dataPartitionTableResp.getStatus().getMessage(),
dataPartitionTableResp.getStatus().getCode());
}
if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
} else {
throw new IoTDBRuntimeException(
dataPartitionTableResp.getStatus().getMessage(),
dataPartitionTableResp.getStatus().getCode());
}
} catch (final ClientManagerException | TException e) {
throw new StatementAnalyzeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,31 @@ public boolean needDecodeTsFile(
return true;
}

List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
resource
.getDevices()
.forEach(
o -> {
// iterating the index, must present
slotList.add(
new Pair<>(
o, TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(o).get())));
slotList.add(
new Pair<>(
o, TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(o).get())));
});
List<Pair<IDeviceID, TTimePartitionSlot>> slotList =
new ArrayList<>(resource.getDevices().size() << 1);
for (final IDeviceID device : resource.getDevices()) {
// iterating the index, must present
final TTimePartitionSlot startSlot =
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
final TTimePartitionSlot endSlot =
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
slotList.add(new Pair<>(device, startSlot));
if (!startSlot.equals(endSlot)) {
slotList.add(new Pair<>(device, endSlot));
}
}

if (slotList.isEmpty()) {
throw new IllegalStateException(
String.format("Devices in TsFile %s is empty, this should not happen here.", tsFile));
} else if (slotList.stream()
.anyMatch(slotPair -> !slotPair.getRight().equals(slotList.get(0).right))) {
needDecodeTsFile = true;
} else {
final TTimePartitionSlot firstSlot = slotList.get(0).right;
for (int i = 1, size = slotList.size(); i < size; i++) {
if (!slotList.get(i).right.equals(firstSlot)) {
needDecodeTsFile = true;
return true;
}
}
needDecodeTsFile = !isDispatchedToLocal(new HashSet<>(partitionFetcher.apply(slotList)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,14 +782,29 @@ private void routeChunkData() throws LoadFileException {
return;
}

final List<Pair<IDeviceID, TTimePartitionSlot>> partitionSlotList = new ArrayList<>();
final int[] chunkPartitionIndexes = new int[nonDirectionalChunkData.size()];
final Map<IDeviceID, Map<TTimePartitionSlot, Integer>> partitionSlotIndexes = new HashMap<>();
for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
final ChunkData chunkData = nonDirectionalChunkData.get(i);
final IDeviceID device = chunkData.getDevice();
final TTimePartitionSlot timePartitionSlot = chunkData.getTimePartitionSlot();
final Map<TTimePartitionSlot, Integer> slotIndexes =
partitionSlotIndexes.computeIfAbsent(device, key -> new HashMap<>());
Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
if (partitionSlotIndex == null) {
partitionSlotIndex = partitionSlotList.size();
slotIndexes.put(timePartitionSlot, partitionSlotIndex);
partitionSlotList.add(new Pair<>(device, timePartitionSlot));
}
chunkPartitionIndexes[i] = partitionSlotIndex;
}

List<TRegionReplicaSet> replicaSets =
scheduler.partitionFetcher.queryDataPartition(
nonDirectionalChunkData.stream()
.map(data -> new Pair<>(data.getDevice(), data.getTimePartitionSlot()))
.collect(Collectors.toList()),
scheduler.queryContext.getSession().getUserName());
for (int i = 0; i < replicaSets.size(); i++) {
final TRegionReplicaSet replicaSet = replicaSets.get(i);
partitionSlotList, scheduler.queryContext.getSession().getUserName());
for (int i = 0, size = nonDirectionalChunkData.size(); i < size; i++) {
final TRegionReplicaSet replicaSet = replicaSets.get(chunkPartitionIndexes[i]);
final TConsensusGroupId regionId = replicaSet.getRegionId();
if (regionId2ReplicaSetAndNode.containsKey(regionId)
&& !Objects.equals(regionId2ReplicaSetAndNode.get(regionId).getLeft(), replicaSet)) {
Expand Down Expand Up @@ -864,50 +879,49 @@ public void setDatabase(String database) {

public List<TRegionReplicaSet> queryDataPartition(
List<Pair<IDeviceID, TTimePartitionSlot>> slotList, String userName) {
List<TRegionReplicaSet> replicaSets = new ArrayList<>();
List<TRegionReplicaSet> replicaSets = new ArrayList<>(slotList.size());
int size = slotList.size();

for (int i = 0; i < size; i += TRANSMIT_LIMIT) {
List<Pair<IDeviceID, TTimePartitionSlot>> subSlotList =
slotList.subList(i, Math.min(size, i + TRANSMIT_LIMIT));
DataPartition dataPartition =
fetcher.getOrCreateDataPartition(toQueryParam(subSlotList), userName);
replicaSets.addAll(
subSlotList.stream()
.map(
pair ->
// database is an explicit database hint for table-model loads and
// pipe-generated tree-model loads.
database != null
? dataPartition.getDataRegionReplicaSetForWriting(
pair.left, pair.right, database)
: dataPartition.getDataRegionReplicaSetForWriting(
pair.left, pair.right))
.collect(Collectors.toList()));
for (final Pair<IDeviceID, TTimePartitionSlot> pair : subSlotList) {
// database is an explicit database hint for table-model loads and
// pipe-generated tree-model loads.
replicaSets.add(
database != null
? dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right, database)
: dataPartition.getDataRegionReplicaSetForWriting(pair.left, pair.right));
}
}
return replicaSets;
}

private List<DataPartitionQueryParam> toQueryParam(
List<Pair<IDeviceID, TTimePartitionSlot>> slots) {
return slots.stream()
.collect(
Collectors.groupingBy(
Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())))
.entrySet()
.stream()
.map(
entry -> {
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue()));
// database is an explicit database hint for table-model loads and
// pipe-generated tree-model loads.
if (database != null) {
queryParam.setDatabaseName(database);
}
return queryParam;
})
.collect(Collectors.toList());
final Map<IDeviceID, Set<TTimePartitionSlot>> device2TimePartitionSlots = new HashMap<>();
for (final Pair<IDeviceID, TTimePartitionSlot> slot : slots) {
device2TimePartitionSlots
.computeIfAbsent(slot.left, key -> new HashSet<>())
.add(slot.right);
}

final List<DataPartitionQueryParam> queryParams =
new ArrayList<>(device2TimePartitionSlots.size());
for (final Map.Entry<IDeviceID, Set<TTimePartitionSlot>> entry :
device2TimePartitionSlots.entrySet()) {
final DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue()));
// database is an explicit database hint for table-model loads and
// pipe-generated tree-model loads.
if (database != null) {
queryParam.setDatabaseName(database);
}
queryParams.add(queryParam);
}
return queryParams;
}
}
}
Loading