Skip to content

Commit

Permalink
Implement scan processor partition pruning [HZ-2497] (#24686)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fly-Style committed Jun 16, 2023
1 parent 112e744 commit aab1256
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private SourceProcessors() {
*/
@Nonnull
public static ProcessorMetaSupplier readMapP(@Nonnull String mapName) {
return HazelcastReaders.readLocalMapSupplier(mapName);
return HazelcastReaders.readLocalMapSupplier(mapName, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.hazelcast.spi.impl.InternalCompletableFuture;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.security.Permission;
import java.util.Map.Entry;
Expand All @@ -74,7 +75,8 @@ private HazelcastReaders() {
public static ProcessorMetaSupplier readLocalCacheSupplier(@Nonnull String cacheName) {
return new LocalProcessorMetaSupplier<
InternalCompletableFuture<CacheEntriesWithCursor>, CacheEntriesWithCursor, Entry<Data, Data>>(
new LocalCacheReaderFunction(cacheName)
new LocalCacheReaderFunction(cacheName),
null
) {
@Override
public Permission getRequiredPermission() {
Expand Down Expand Up @@ -172,10 +174,10 @@ public int getClassId() {
}

@Nonnull
public static ProcessorMetaSupplier readLocalMapSupplier(@Nonnull String mapName) {
return new LocalProcessorMetaSupplier<
InternalCompletableFuture<MapEntriesWithCursor>, MapEntriesWithCursor, Entry<Data, Data>>(
new LocalMapReaderFunction(mapName)
public static ProcessorMetaSupplier readLocalMapSupplier(@Nonnull String mapName, @Nullable int[] partitions) {
return new LocalProcessorMetaSupplier<>(
new LocalMapReaderFunction(mapName),
partitions
) {
@Override
public Permission getRequiredPermission() {
Expand Down Expand Up @@ -233,7 +235,8 @@ public static <K, V, T> ProcessorMetaSupplier readLocalMapSupplier(
checkSerializable(Objects.requireNonNull(projection), "projection");

return new LocalProcessorMetaSupplier<InternalCompletableFuture<ResultSegment>, ResultSegment, QueryResultRow>(
new LocalMapQueryReaderFunction<>(mapName, predicate, projection)
new LocalMapQueryReaderFunction<>(mapName, predicate, projection),
null
) {
@Override
public Permission getRequiredPermission() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.security.Permission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -272,16 +274,43 @@ abstract static class LocalProcessorMetaSupplier<F extends CompletableFuture, B,

private static final long serialVersionUID = 1L;
private final BiFunctionEx<HazelcastInstance, InternalSerializationService, Reader<F, B, R>> readerSupplier;
private Map<Address, int[]> partitionAssignment;
private int[] partitionsToScan;

LocalProcessorMetaSupplier(
@Nonnull BiFunctionEx<HazelcastInstance, InternalSerializationService, Reader<F, B, R>> readerSupplier
@Nonnull BiFunctionEx<HazelcastInstance, InternalSerializationService, Reader<F, B, R>> readerSupplier,
@Nullable int[] partitionsToScan
) {
this.readerSupplier = readerSupplier;
this.partitionsToScan = partitionsToScan;
}

@Override @Nonnull
public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
return address -> new LocalProcessorSupplier<>(readerSupplier);
if (partitionsToScan == null) {
return address -> new LocalProcessorSupplier<>(readerSupplier);
} else {
return address -> {
int[] partitions = partitionAssignment.get(address);
List<Integer> partitionsToScanList = new ArrayList<>();
// partitionAssignment is sorted, so we can use binary search
for (int pId : partitionsToScan) {
if (Arrays.binarySearch(partitions, pId) > 0) {
partitionsToScanList.add(pId);
}
}

int[] memberPartitionsToScan = partitionsToScanList.stream().mapToInt(i -> i).toArray();
return new LocalProcessorSupplier<>(readerSupplier, memberPartitionsToScan);
};
}
}

@Override
public void init(@Nonnull Context context) throws Exception {
if (partitionsToScan != null) {
this.partitionAssignment = context.partitionAssignment();
}
}

@Override
Expand Down Expand Up @@ -315,9 +344,9 @@ public static final class LocalProcessorSupplier<F extends CompletableFuture, B,

private BiFunction<HazelcastInstance, InternalSerializationService, Reader<F, B, R>> readerSupplier;

private transient int[] memberPartitions;
private transient HazelcastInstance hzInstance;
private transient InternalSerializationService serializationService;
private transient int[] partitionsToScan;

public LocalProcessorSupplier() {
}
Expand All @@ -328,16 +357,26 @@ private LocalProcessorSupplier(
this.readerSupplier = readerSupplier;
}

private LocalProcessorSupplier(
@Nonnull BiFunction<HazelcastInstance, InternalSerializationService, Reader<F, B, R>> readerSupplier,
int[] partitionsToScan
) {
this.readerSupplier = readerSupplier;
this.partitionsToScan = partitionsToScan;
}

@Override
public void init(@Nonnull Context context) {
hzInstance = context.hazelcastInstance();
serializationService = ((ProcSupplierCtx) context).serializationService();
memberPartitions = context.partitionAssignment().get(hzInstance.getCluster().getLocalMember().getAddress());
if (partitionsToScan == null) {
partitionsToScan = context.partitionAssignment().get(hzInstance.getCluster().getLocalMember().getAddress());
}
}

@Override @Nonnull
public List<Processor> get(int count) {
return Arrays.stream(distributeObjects(count, memberPartitions))
return Arrays.stream(distributeObjects(count, partitionsToScan))
.map(partitions ->
new ReadMapOrCacheP<>(readerSupplier.apply(hzInstance, serializationService), partitions))
.collect(toList());
Expand Down

0 comments on commit aab1256

Please sign in to comment.