Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FORWARDPORT] Make comparators able to act on keys and values #15380

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1285,11 +1285,18 @@ public Set<Entry<K, V>> entrySet() {
return setBuilder.build();
}

@SuppressWarnings("unchecked")
@Override
public Set<K> keySet(@Nonnull Predicate<K, V> predicate) {
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
if (containsPagingPredicate(predicate)) {
return keySetWithPagingPredicate(predicate);
PagingPredicate pagingPredicate = unwrapPagingPredicate(predicate);
if (pagingPredicate.getComparator() == null) {
return keySetWithPagingPredicate(predicate);
} else {
// custom comparator may act on keys and values at the same time
return entrySetWithPagingPredicate(predicate, IterationType.KEY);
}
}

ClientMessage request = MapKeySetWithPredicateCodec.encodeRequest(name, toData(predicate));
Expand Down Expand Up @@ -1324,11 +1331,12 @@ private Set<K> keySetWithPagingPredicate(Predicate predicate) {
return (Set<K>) getSortedQueryResultSet(resultList, pagingPredicate, IterationType.KEY);
}

@SuppressWarnings("unchecked")
@Override
public Set<Entry<K, V>> entrySet(@Nonnull Predicate predicate) {
checkNotNull(predicate, NULL_PREDICATE_IS_NOT_ALLOWED);
if (containsPagingPredicate(predicate)) {
return entrySetWithPagingPredicate(predicate);
return entrySetWithPagingPredicate(predicate, IterationType.ENTRY);
}
ClientMessage request = MapEntriesWithPredicateCodec.encodeRequest(name, toData(predicate));

Expand All @@ -1345,8 +1353,7 @@ public Set<Entry<K, V>> entrySet(@Nonnull Predicate predicate) {
return setBuilder.build();
}

@SuppressWarnings("unchecked")
private Set<Entry<K, V>> entrySetWithPagingPredicate(Predicate predicate) {
private Set entrySetWithPagingPredicate(Predicate predicate, IterationType iterationType) {
PagingPredicate pagingPredicate = unwrapPagingPredicate(predicate);

PagingPredicateAccessor.setIterationType(pagingPredicate, IterationType.ENTRY);
Expand All @@ -1363,8 +1370,7 @@ private Set<Entry<K, V>> entrySetWithPagingPredicate(Predicate predicate) {
V value = toObject(entry.getValue());
resultList.add(new AbstractMap.SimpleEntry<>(key, value));
}
Set result = getSortedQueryResultSet(resultList, pagingPredicate, IterationType.ENTRY);
return (Set<Entry<K, V>>) result;
return getSortedQueryResultSet(resultList, pagingPredicate, iterationType);
}

@Override
Expand Down
Expand Up @@ -85,6 +85,7 @@ public QueryEngineImpl(MapServiceContext mapServiceContext) {
this.resultProcessorRegistry = mapServiceContext.getResultProcessorRegistry();
}

@SuppressWarnings("unchecked")
@Override
public Result execute(Query query, Target target) {
Query adjustedQuery = adjustQuery(query);
Expand Down Expand Up @@ -234,17 +235,22 @@ private void assertAllPartitionsQueried(PartitionIdSet mutablePartitionIds) {
private IterationType getRetrievalIterationType(Predicate predicate, IterationType iterationType) {
IterationType retrievalIterationType = iterationType;
if (predicate instanceof PagingPredicate) {
// in case of value, we also need to get the keys for sorting.
retrievalIterationType = (iterationType == IterationType.VALUE) ? IterationType.ENTRY : iterationType;
PagingPredicate pagingPredicate = (PagingPredicate) predicate;
if (pagingPredicate.getComparator() != null) {
// custom comparators may act on keys and values at the same time
retrievalIterationType = IterationType.ENTRY;
} else {
// in case of value, we also need to get the keys for sorting
retrievalIterationType = iterationType == IterationType.VALUE ? IterationType.ENTRY : iterationType;
}
}
return retrievalIterationType;
}

private PartitionIdSet getLocalPartitionIds() {
int partitionCount = partitionService.getPartitionCount();
PartitionIdSet partitionIds = new PartitionIdSet(partitionCount,
partitionService.getMemberPartitions(nodeEngine.getThisAddress()));
return partitionIds;
List<Integer> memberPartitions = partitionService.getMemberPartitions(nodeEngine.getThisAddress());
return new PartitionIdSet(partitionCount, memberPartitions);
}

private PartitionIdSet getAllPartitionIds() {
Expand Down