Add keyed version of mapUsingServiceAsyncBatched (#1929)
Jozsef Bartok committed Feb 6, 2020
1 parent 6702338 commit e6df12d
Showing 12 changed files with 513 additions and 23 deletions.
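
For orientation, here is a minimal usage sketch of the keyed batched mapping this commit adds (BatchStageWithKey.mapUsingServiceAsyncBatched, which passes a key list alongside the batched item list). The test source, the parity grouping key, the batch size of 128, and the shared string prefix standing in for a real async service are illustrative assumptions, not part of the commit:

import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class KeyedBatchedMappingSketch {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        // Stand-in "service": a shared string prefix instead of, say, a remote async client.
        ServiceFactory<?, String> prefixService = ServiceFactories.sharedService(ctx -> "user-");

        BatchStage<String> labeled = p
                .readFrom(TestSources.items(1, 2, 3, 4, 5))
                .groupingKey(i -> i % 2)            // the grouping keys travel with each batch
                .mapUsingServiceAsyncBatched(prefixService, 128,
                        // keys.get(i) is the grouping key of items.get(i); the returned
                        // list has one result per input item, in the same order
                        (prefix, keys, items) -> CompletableFuture.completedFuture(
                                IntStream.range(0, items.size())
                                         .mapToObj(i -> prefix + keys.get(i) + ":" + items.get(i))
                                         .collect(Collectors.toList())));
        labeled.writeTo(Sinks.logger());
        // Submit with e.g. Jet.bootstrappedInstance().newJob(p) against a running cluster.
    }
}
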
@@ -16,6 +16,7 @@

package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
@@ -35,10 +36,12 @@
import com.hazelcast.jet.pipeline.ServiceFactory;

import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;

import static com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.DO_NOT_ADAPT;
import static com.hazelcast.jet.impl.util.Util.toList;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;

@@ -99,6 +102,28 @@ public <S, R> BatchStage<R> mapUsingServiceAsync(
(s, k, t) -> mapAsyncFn.apply(s, k, t).thenApply(Traversers::singleton));
}

@Nonnull @Override
public <S, R> BatchStage<R> mapUsingServiceAsyncBatched(
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn
) {
return attachTransformUsingServiceAsyncBatched("map", serviceFactory, maxBatchSize,
(s, items) -> mapAsyncFn.apply(s, items).thenApply(list -> toList(list, Traversers::singleton)));
}

@Nonnull @Override
public <S, R> BatchStage<R> mapUsingServiceAsyncBatched(
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull TriFunction<? super S, ? super List<K>, ? super List<T>,
? extends CompletableFuture<List<R>>> mapAsyncFn
) {
return attachTransformUsingServiceAsyncBatched("map", serviceFactory, maxBatchSize,
(s, keys, items) -> mapAsyncFn.apply(s, keys, items)
.thenApply(list -> toList(list, Traversers::singleton)));
}

@Nonnull @Override
public <S> BatchStage<T> filterUsingService(
@Nonnull ServiceFactory<?, S> serviceFactory,
@@ -61,6 +61,7 @@
import static com.hazelcast.jet.core.WatermarkPolicy.limitingLag;
import static com.hazelcast.jet.impl.JetEvent.jetEvent;
import static com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform.filterUsingServicePartitionedTransform;
import static com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform.flatMapUsingServiceAsyncBatchedPartitionedTransform;
import static com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform.flatMapUsingServiceAsyncPartitionedTransform;
import static com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform.flatMapUsingServicePartitionedTransform;
import static com.hazelcast.jet.impl.pipeline.transform.PartitionedProcessorTransform.mapUsingServicePartitionedTransform;
@@ -301,10 +302,9 @@ <S, R, RET> RET attachFlatMapUsingServiceAsyncBatched(
return f.thenApply(res -> traverseIterable(res).flatMap(Function.identity()));
};

-        return (RET) attach(
-                flatMapUsingServiceAsyncBatchedTransform(
-                        transform, operationName, serviceFactory, MAX_CONCURRENT_ASYNC_BATCHES, maxBatchSize, flattenedFn),
-                fnAdapter);
+        ProcessorTransform processorTransform = flatMapUsingServiceAsyncBatchedTransform(
+                transform, operationName, serviceFactory, MAX_CONCURRENT_ASYNC_BATCHES, maxBatchSize, flattenedFn);
+        return (RET) attach(processorTransform, fnAdapter);
}

@Nonnull
@@ -387,6 +387,43 @@ <S, K, R, RET> RET attachTransformUsingPartitionedServiceAsync(
return (RET) attach(processorTransform, fnAdapter);
}

@Nonnull
@SuppressWarnings({"unchecked", "rawtypes"})
<S, K, R, RET> RET attachTransformUsingPartitionedServiceAsyncBatched(
@Nonnull String operationName,
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn,
@Nonnull BiFunctionEx<? super S, ? super List<T>, CompletableFuture<List<Traverser<R>>>> flatMapAsyncFn
) {
checkSerializable(flatMapAsyncFn, operationName + "AsyncFn");
checkSerializable(partitionKeyFn, "partitionKeyFn");
serviceFactory = moveAttachedFilesToPipeline(serviceFactory);
BiFunctionEx adaptedFlatMapFn = fnAdapter.adaptFlatMapUsingServiceAsyncBatchedFn(flatMapAsyncFn);
FunctionEx adaptedPartitionKeyFn = fnAdapter.adaptKeyFn(partitionKeyFn);

// Here we flatten the result from List<Traverser<R>> to Traverser<R>.
// The former is used in pipeline API, the latter in core API.
BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<Traverser<R>>> flattenedFn =
(svc, items) -> {
// R might actually be JetEvent<R> -- we can't represent this with static types
CompletableFuture<List<Traverser<R>>> f =
(CompletableFuture<List<Traverser<R>>>) adaptedFlatMapFn.apply(svc, items);
return f.thenApply(res -> traverseIterable(res).flatMap(Function.identity()));
};

PartitionedProcessorTransform processorTransform = flatMapUsingServiceAsyncBatchedPartitionedTransform(
transform,
operationName,
serviceFactory,
MAX_CONCURRENT_ASYNC_BATCHES,
maxBatchSize,
flattenedFn,
adaptedPartitionKeyFn
);
return (RET) attach(processorTransform, fnAdapter);
}

@Nonnull
private <S> ServiceFactory<?, S> moveAttachedFilesToPipeline(@Nonnull ServiceFactory<?, S> serviceFactory) {
pipelineImpl.attachFiles(serviceFactory.attachedFiles());
@@ -16,6 +16,7 @@

package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Traverser;
@@ -27,7 +28,9 @@

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.hazelcast.jet.impl.util.Util.checkSerializable;

@@ -124,6 +127,37 @@ <S, R, RET> RET attachTransformUsingServiceAsync(
});
}

@Nonnull
<S, R, RET> RET attachTransformUsingServiceAsyncBatched(
@Nonnull String operationName,
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull BiFunctionEx<? super S, ? super List<T>, CompletableFuture<List<Traverser<R>>>> flatMapAsyncFn
) {
FunctionEx<? super T, ? extends K> keyFn = keyFn();
return computeStage.attachTransformUsingPartitionedServiceAsyncBatched(
operationName, serviceFactory, maxBatchSize, keyFn, flatMapAsyncFn);
}

@Nonnull
<S, R, RET> RET attachTransformUsingServiceAsyncBatched(
@Nonnull String operationName,
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull TriFunction<? super S, ? super List<K>, ? super List<T>, CompletableFuture<List<Traverser<R>>>>
flatMapAsyncFn
) {
FunctionEx<? super T, ? extends K> keyFn = keyFn();
return computeStage.attachTransformUsingPartitionedServiceAsyncBatched(
operationName, serviceFactory, maxBatchSize, keyFn,
(s, items) -> {
List<K> keys = items.stream()
.map(t -> (K) keyFn.apply(t))
.collect(Collectors.toList());
return flatMapAsyncFn.apply(s, keys, items);
});
}

static Transform transformOf(GeneralStageWithKey stage) {
return ((StageWithGroupingBase) stage).computeStage.transform;
}
@@ -16,6 +16,7 @@

package com.hazelcast.jet.impl.pipeline;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
@@ -30,8 +31,11 @@
import com.hazelcast.jet.pipeline.WindowDefinition;

import javax.annotation.Nonnull;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.hazelcast.jet.impl.util.Util.toList;

public class StreamStageWithKeyImpl<T, K> extends StageWithGroupingBase<T, K> implements StreamStageWithKey<T, K> {

StreamStageWithKeyImpl(
@@ -110,6 +114,28 @@ public <S, R> StreamStage<R> mapUsingServiceAsync(
(s, k, t) -> mapAsyncFn.apply(s, k, t).thenApply(Traversers::singleton));
}

@Nonnull @Override
public <S, R> StreamStage<R> mapUsingServiceAsyncBatched(
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn
) {
return attachTransformUsingServiceAsyncBatched("map", serviceFactory, maxBatchSize,
(s, items) -> mapAsyncFn.apply(s, items).thenApply(list -> toList(list, Traversers::singleton)));
}

@Nonnull @Override
public <S, R> StreamStage<R> mapUsingServiceAsyncBatched(
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull TriFunction<? super S, ? super List<K>, ? super List<T>,
? extends CompletableFuture<List<R>>> mapAsyncFn
) {
return attachTransformUsingServiceAsyncBatched("map", serviceFactory, maxBatchSize,
(s, keys, items) -> mapAsyncFn.apply(s, keys, items)
.thenApply(list -> toList(list, Traversers::singleton)));
}

@Nonnull @Override
public <S> StreamStage<T> filterUsingService(
@Nonnull ServiceFactory<?, S> serviceFactory,
@@ -28,6 +28,7 @@
import com.hazelcast.jet.pipeline.ServiceFactory;

import javax.annotation.Nonnull;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.hazelcast.jet.core.processor.Processors.filterUsingServiceP;
@@ -105,6 +106,22 @@ public static <S, T, K, R> PartitionedProcessorTransform<T, K> flatMapUsingServi
return new PartitionedProcessorTransform<>(name, upstream, metaSupplier, partitionKeyFn);
}

public static <S, T, K, R> PartitionedProcessorTransform<T, K> flatMapUsingServiceAsyncBatchedPartitionedTransform(
@Nonnull Transform upstream,
@Nonnull String operationName,
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxConcurrentOps,
int maxBatchSize,
@Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<Traverser<R>>> flatMapAsyncFn,
@Nonnull FunctionEx<? super T, ? extends K> partitionKeyFn
) {
String name = operationName + "UsingPartitionedServiceAsync";
ProcessorSupplier supplier = flatMapUsingServiceAsyncBatchedP(
serviceFactory, maxConcurrentOps, maxBatchSize, flatMapAsyncFn);
ProcessorMetaSupplier metaSupplier = ProcessorMetaSupplier.of(getPreferredLP(serviceFactory), supplier);
return new PartitionedProcessorTransform<>(name, upstream, metaSupplier, partitionKeyFn);
}

@Override
public void addToDag(Planner p) {
PlannerVertex pv = p.addVertex(this, name(), localParallelism(), processorSupplier);
@@ -110,12 +110,13 @@ public static <S, T, R> ProcessorTransform flatMapUsingServiceAsyncBatchedTransf
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxConcurrentOps,
int maxBatchSize,
-        @Nonnull BiFunctionEx<? super S, ? super List<T>,
-                ? extends CompletableFuture<Traverser<R>>> flatMapAsyncFn
+        @Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<Traverser<R>>> flatMapAsyncFn
) {
-        return new ProcessorTransform(operationName + "UsingServiceAsyncBatched", upstream,
-                ProcessorMetaSupplier.of(getPreferredLP(serviceFactory),
-                        flatMapUsingServiceAsyncBatchedP(serviceFactory, maxConcurrentOps, maxBatchSize, flatMapAsyncFn)));
+        String name = operationName + "UsingServiceAsyncBatched";
+        ProcessorSupplier supplier = flatMapUsingServiceAsyncBatchedP(
+                serviceFactory, maxConcurrentOps, maxBatchSize, flatMapAsyncFn);
+        ProcessorMetaSupplier metaSupplier = ProcessorMetaSupplier.of(getPreferredLP(serviceFactory), supplier);
+        return new ProcessorTransform(name, upstream, metaSupplier);
}

static int getPreferredLP(@Nonnull ServiceFactory<?, ?> serviceFactory) {
@@ -33,6 +33,7 @@
import com.hazelcast.map.IMap;

import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;

@@ -127,6 +128,21 @@ <S, R> BatchStage<R> mapUsingServiceAsync(
@Nonnull TriFunction<? super S, ? super K, ? super T, CompletableFuture<R>> mapAsyncFn
);

@Nonnull @Override
<S, R> BatchStage<R> mapUsingServiceAsyncBatched(
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull BiFunctionEx<? super S, ? super List<T>, ? extends CompletableFuture<List<R>>> mapAsyncFn
);

@Nonnull @Override
<S, R> BatchStage<R> mapUsingServiceAsyncBatched(
@Nonnull ServiceFactory<?, S> serviceFactory,
int maxBatchSize,
@Nonnull TriFunction<? super S, ? super List<K>, ? super List<T>,
? extends CompletableFuture<List<R>>> mapAsyncFn
);

@Nonnull @Override
<S> BatchStage<T> filterUsingService(
@Nonnull ServiceFactory<?, S> serviceFactory,
@@ -385,7 +385,7 @@ <S, R> GeneralStage<R> mapUsingServiceAsync(
);

/**
- * Batched version of {@link #mapUsingService}: {@code mapAsyncFn} takes
+ * Batched version of {@link #mapUsingServiceAsync}: {@code mapAsyncFn} takes
* a list of input items and returns a {@code CompletableFuture<List<R>>}.
* The size of the input list is limited by the given {@code maxBatchSize}.
* <p>
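
The Javadoc above describes the non-keyed variant on GeneralStage. A minimal call-shape sketch, reusing the p and prefixService assumed in the earlier example (the batch size of 64 and the upper-casing logic are likewise illustrative):

// Non-keyed shape: the service function sees only the batched items, no key list.
BatchStage<String> shouted = p
        .readFrom(TestSources.items("a", "b", "c"))
        .mapUsingServiceAsyncBatched(prefixService, 64,
                (prefix, items) -> CompletableFuture.completedFuture(
                        items.stream()
                             .map(item -> prefix + item.toUpperCase())
                             .collect(Collectors.toList())));

Compared with mapUsingServiceAsync, a single future covers the whole batch, which typically lets the service issue one bulk request per batch instead of one per item.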
