Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchStage.sort() #2469

Merged
merged 39 commits into from Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
cfcd5fe
add in-memory sort
MohamedMandouh Aug 26, 2020
25cfa0f
add in-memory sort
MohamedMandouh Aug 26, 2020
7478050
Merge remote-tracking branch 'origin/in-memory-sorting' into in-memor…
MohamedMandouh Aug 27, 2020
26e76f7
remove SerializableComparator
MohamedMandouh Aug 27, 2020
139910d
replace TreeMap with TreeSet
MohamedMandouh Aug 27, 2020
b7e2393
replace ComparatorEx by Comparator in SortPrepareP
MohamedMandouh Aug 27, 2020
8d4013f
add @NonNull/@Nullable
MohamedMandouh Aug 27, 2020
a1e488d
add BatchStage.sort()
MohamedMandouh Aug 29, 2020
1176d43
fix returning NO_PROGRESS whenever a queue returns null
MohamedMandouh Aug 31, 2020
4c8897d
pass checkstyle
MohamedMandouh Aug 31, 2020
9f4a0b1
apply requested changes
MohamedMandouh Sep 2, 2020
9b9cba5
Add @Nonnull
Sep 3, 2020
532e4ab
Remove the requirement for Comparator<Object>
Sep 3, 2020
1c8a39d
Improve Javadoc
Sep 3, 2020
a7b9ea5
Assert monotonic order in CIES
Sep 3, 2020
18a7d4f
Simplify draining logic in CIES
Sep 3, 2020
ca64396
Remove numActiveQueues
Sep 3, 2020
8b884ea
Merge nested ifs
Sep 3, 2020
a6ad9b2
Upgraded deps in NOTICE
Sep 4, 2020
9499597
Improve Javadoc of Processors.sortPrepareP
Sep 4, 2020
193663d
Improve generic type treatment
Sep 4, 2020
c02162c
Simplify SortPrepareP
Sep 4, 2020
89d5109
Rename SortPrepareP -> SortP
Sep 4, 2020
bf4907a
Fix generics
Sep 4, 2020
4f30b5c
Merge branch 'master' into in-memory-sorting
Sep 4, 2020
2a3be96
Work around JDK 8 compiler bug
Sep 4, 2020
8cb3897
Fix two bugs
Sep 5, 2020
f1bbf30
Add BatchStageTest.sort()
Sep 5, 2020
d629a5c
add ProcessorsTest.sort
MohamedMandouh Sep 5, 2020
a23a817
add missing Override
MohamedMandouh Sep 8, 2020
eb6751d
add sample usage, param and return annotaitions
MohamedMandouh Sep 8, 2020
5f59a0d
add "only available for batch" in javadoc
MohamedMandouh Sep 12, 2020
0e38170
introduce BatchStage.partialSort
MohamedMandouh Sep 12, 2020
c0d102b
cleanup
MohamedMandouh Sep 14, 2020
029432b
Revert "cleanup"
MohamedMandouh Sep 14, 2020
a045a23
Revert "introduce BatchStage.partialSort"
MohamedMandouh Sep 14, 2020
b3d124d
Improve Javadoc
Sep 14, 2020
85a31da
Improved Edge doc
Sep 14, 2020
befdc1d
Add jet-start.sh docs
Sep 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,11 +17,13 @@
package com.hazelcast.jet.core;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.impl.MasterJobContext;
import com.hazelcast.jet.impl.execution.ConcurrentInboundEdgeStream;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.util.ConstantFunctionEx;
import com.hazelcast.nio.ObjectDataInput;
Expand Down Expand Up @@ -94,7 +96,7 @@ public class Edge implements IdentifiedDataSerializable {
private Address distributedTo;
private Partitioner<?> partitioner;
private RoutingPolicy routingPolicy = RoutingPolicy.UNICAST;

private ComparatorEx<Object> comparator;
private EdgeConfig config;

protected Edge() {
Expand Down Expand Up @@ -371,6 +373,27 @@ public Partitioner<?> getPartitioner() {
return partitioner;
}

/**
* Sets a comparator on this edge. The comparator is used by {@link ConcurrentInboundEdgeStream}
* to determine which cluster member to receive the next item from over this edge.
* <p>
* When a comparator is present, the receiving stream looks at the head item of each
* of its input queues and forwards the one that the comparator ranks lowest.
* The comparator is written out together with the rest of the edge in
* {@code writeData}.
* <p>
* NOTE(review): this looks like it relies on every sender already emitting its
* items in the comparator's order; confirm against the sending processors.
*
* @param comparator the ordering applied to the items arriving over this edge
* @return this edge, so that calls can be chained
* @since 4.3
*/
public Edge monotonicOrder(@Nonnull ComparatorEx<Object> comparator) {
this.comparator = comparator;
return this;
}

/**
* Returns the comparator defined on this edge.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could add a reference back to monotonicOrder here

*
* @since 4.3
**/
@Nullable
public ComparatorEx<Object> getComparator() {
// The field has no initializer: it stays null unless monotonicOrder() was
// called or readData() restored a value.
return comparator;
}

/**
* Returns the {@link RoutingPolicy} in effect on the edge.
*/
Expand Down Expand Up @@ -568,6 +591,7 @@ public void writeData(@Nonnull ObjectDataOutput out) throws IOException {
out.writeUTF(getDestName());
out.writeInt(getDestOrdinal());
out.writeInt(getPriority());
out.writeObject(getComparator());
out.writeObject(getDistributedTo());
out.writeObject(getRoutingPolicy());
CustomClassLoadedObject.write(out, getPartitioner());
Expand All @@ -581,6 +605,7 @@ public void readData(@Nonnull ObjectDataInput in) throws IOException {
destName = in.readUTF();
destOrdinal = in.readInt();
priority = in.readInt();
comparator = in.readObject();
distributedTo = in.readObject();
routingPolicy = in.readObject();
try {
Expand Down
Expand Up @@ -46,15 +46,18 @@
import com.hazelcast.jet.impl.processor.InsertWatermarksP;
import com.hazelcast.jet.impl.processor.SessionWindowP;
import com.hazelcast.jet.impl.processor.SlidingWindowP;
import com.hazelcast.jet.impl.processor.SortPrepareP;
import com.hazelcast.jet.impl.processor.TransformP;
import com.hazelcast.jet.impl.processor.TransformStatefulP;
import com.hazelcast.jet.impl.processor.TransformUsingServiceP;
import com.hazelcast.jet.pipeline.ServiceFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand Down Expand Up @@ -972,6 +975,20 @@ public static <C, S, T, R> ProcessorSupplier flatMapUsingServiceP(
(singletonTraverser, service, item) -> flatMapFn.apply(service, item));
}

/**
* Returns a supplier of processors for a vertex that performs the prepare phase of sorting.
* Each processor sorts the input dataset locally on its cluster member using an in-memory {@link TreeMap}
* to prepare it for the global sorting phase.
* There can be only one {@link SortPrepareP} processor per cluster member for
* each sort stage.
*
* @since 4.3
ufukyilmaz marked this conversation as resolved.
Show resolved Hide resolved
*/
@Nonnull
public static <V> SupplierEx<Processor> sortPrepareP(Comparator<V> comparator) {
    // The supplier captures the comparator and passes it to every processor it creates.
    SupplierEx<Processor> processorSupplier = () -> new SortPrepareP<>(comparator);
    return processorSupplier;
}

/**
* Returns a supplier of a processor that swallows all its normal input (if
* any), does nothing with it, forwards the watermarks, produces no output
Expand Down
Expand Up @@ -16,10 +16,12 @@

package com.hazelcast.jet.impl.execution;

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ObjectWithPartitionId;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
Expand All @@ -33,6 +35,7 @@
import static com.hazelcast.jet.impl.execution.WatermarkCoalescer.NO_NEW_WM;
import static com.hazelcast.jet.impl.util.ProgressState.DONE;
import static com.hazelcast.jet.impl.util.ProgressState.MADE_PROGRESS;
import static com.hazelcast.jet.impl.util.ProgressState.NO_PROGRESS;
import static com.hazelcast.jet.impl.util.Util.toLocalTime;

/**
Expand All @@ -47,7 +50,7 @@ public class ConcurrentInboundEdgeStream implements InboundEdgeStream {
private final ConcurrentConveyor<Object> conveyor;
private final ProgressTracker tracker = new ProgressTracker();
private final ItemDetector itemDetector = new ItemDetector();

private ComparatorEx<Object> comparator;
private final WatermarkCoalescer watermarkCoalescer;
private final BitSet receivedBarriers; // indicates if current snapshot is received on the queue
private final ILogger logger;
Expand Down Expand Up @@ -81,6 +84,12 @@ public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> conveyor, int ordi
logger.finest("Coalescing " + conveyor.queueCount() + " input queues");
}

/**
* Creates an inbound edge stream that additionally carries an item comparator.
* All other setup is delegated to the five-argument constructor.
*
* @param comparator ordering for the items of this edge; when it is non-null,
*                   {@code drainTo} hands the work to the comparator-based
*                   draining routine instead of draining queue by queue
*/
public ConcurrentInboundEdgeStream(ConcurrentConveyor<Object> conveyor, int ordinal, int priority,
boolean waitForAllBarriers, String debugName, ComparatorEx<Object> comparator) {
this(conveyor, ordinal, priority, waitForAllBarriers, debugName);
this.comparator = comparator;
}

@Override
public int ordinal() {
return ordinal;
Expand All @@ -93,6 +102,9 @@ public int priority() {

@Override
public ProgressState drainTo(Predicate<Object> dest) {
if (comparator != null) {
return drainToWithComparator(dest);
}
tracker.reset();
for (int queueIndex = 0; queueIndex < conveyor.queueCount(); queueIndex++) {
final QueuedPipe<Object> q = conveyor.queue(queueIndex);
Expand Down Expand Up @@ -165,6 +177,54 @@ public ProgressState drainTo(Predicate<Object> dest) {
return tracker.toProgressState();
}

/**
 * Drains the input queues in the order given by {@code comparator}.
 * <p>
 * Each round peeks at the head of every live queue, picks the head that the
 * comparator ranks lowest, polls it and passes it to {@code dest}. A round can
 * only pick a winner when every live queue has a head to compare, so the
 * method returns as soon as it meets an empty queue. The number of items
 * forwarded per call is capped by the size the first winning queue had when
 * it was first polled.
 *
 * @param dest receiver of the drained items; it is asserted to accept each one
 * @return {@code DONE} when no live queue remains, {@code NO_PROGRESS} when an
 *         empty queue was met before any item was forwarded, and
 *         {@code MADE_PROGRESS} otherwise
 */
private ProgressState drainToWithComparator(Predicate<Object> dest) {
    int batchSize = -1;
    while (true) {
        int minIndex = 0;
        // Lowest-ranked head seen so far in this round, unwrapped for comparison.
        Object minItem = null;
        // The same head exactly as it is stored in the queue (possibly an
        // ObjectWithPartitionId wrapper); this is what poll() will return.
        Object minHead = null;
        for (int queueIndex = 0; queueIndex < conveyor.queueCount(); queueIndex++) {
            final QueuedPipe<Object> q = conveyor.queue(queueIndex);
            if (q == null) {
                // This slot's queue has already been removed.
                continue;
            }
            Object headObject = q.peek();
            Object headItem;
            if (headObject instanceof ObjectWithPartitionId) {
                headItem = ((ObjectWithPartitionId) headObject).getItem();
            } else {
                headItem = headObject;
            }
            if (headItem == null) {
                // A live queue is empty: the minimum cannot be decided yet.
                if (batchSize == -1) {
                    return NO_PROGRESS;
                }
                return MADE_PROGRESS;
            }
            if (headItem == DONE_ITEM) {
                conveyor.removeQueue(queueIndex);
                continue;
            }
            if (minItem == null || comparator.compare(minItem, headItem) > 0) {
                minIndex = queueIndex;
                minItem = headItem;
                minHead = headObject;
            }
        }
        if (conveyor.liveQueueCount() == 0) {
            return DONE;
        }
        if (batchSize == -1) {
            batchSize = conveyor.queue(minIndex).size();
        }
        Object polledItem = conveyor.queue(minIndex).poll();
        // Compare with the raw head, not the unwrapped item: for a wrapped
        // element the two are different objects and the check would fail
        // even though the right element was polled.
        assert polledItem == minHead : "polledItem != minHead";
        boolean testResult = dest.test(minItem);
        assert testResult : "testResult is false";
        if (--batchSize == 0) {
            return MADE_PROGRESS;
        }
    }
}

private boolean maybeEmitWm(long timestamp, Predicate<Object> dest) {
if (timestamp != NO_NEW_WM) {
boolean res = dest.test(new Watermark(timestamp));
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.hazelcast.jet.impl.execution.init;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Edge.RoutingPolicy;
Expand All @@ -39,6 +40,7 @@ public class EdgeDef implements IdentifiedDataSerializable {
private RoutingPolicy routingPolicy;
private Partitioner<?> partitioner;
private EdgeConfig config;
private ComparatorEx<Object> comparator;

// transient fields populated and used after deserialization
private transient String id;
Expand All @@ -57,6 +59,7 @@ public class EdgeDef implements IdentifiedDataSerializable {
this.routingPolicy = edge.getRoutingPolicy();
this.partitioner = edge.getPartitioner();
this.config = config;
this.comparator = edge.getComparator();
}

void initTransientFields(Map<Integer, VertexDef> vMap, VertexDef nearVertex, boolean isOutbound) {
Expand Down Expand Up @@ -115,6 +118,10 @@ EdgeConfig getConfig() {
return config;
}

/**
* Returns the comparator of this edge definition: the value copied from
* {@code Edge.getComparator()} in the constructor, or the one restored by
* {@code readData}. May be {@code null}.
*/
ComparatorEx<Object> getComparator() {
return comparator;
}


// IdentifiedDataSerializable implementation

Expand All @@ -134,6 +141,7 @@ public void writeData(ObjectDataOutput out) throws IOException {
out.writeInt(destOrdinal);
out.writeInt(sourceOrdinal);
out.writeInt(priority);
out.writeObject(comparator);
out.writeObject(distributedTo);
out.writeObject(routingPolicy);
CustomClassLoadedObject.write(out, partitioner);
Expand All @@ -146,6 +154,7 @@ public void readData(ObjectDataInput in) throws IOException {
destOrdinal = in.readInt();
sourceOrdinal = in.readInt();
priority = in.readInt();
comparator = in.readObject();
distributedTo = in.readObject();
routingPolicy = in.readObject();
partitioner = CustomClassLoadedObject.read(in);
Expand Down
Expand Up @@ -598,7 +598,7 @@ private ConcurrentInboundEdgeStream newEdgeStream(EdgeDef inEdge, ConcurrentConv
String debugName) {
return new ConcurrentInboundEdgeStream(conveyor, inEdge.destOrdinal(), inEdge.priority(),
jobConfig.getProcessingGuarantee() == ProcessingGuarantee.EXACTLY_ONCE,
debugName);
debugName, inEdge.getComparator());
}

public List<Processor> getProcessors() {
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.SupplierEx;
Expand Down Expand Up @@ -84,6 +85,16 @@ public BatchStage<T> rebalance() {
return new BatchStageImpl<>(this, true);
}

@Nonnull @Override
public BatchStage<T> sort() {
// A null comparator makes SortTransform fall back to the items' own
// Comparable ordering.
return attachSort(null);
}

@Nonnull @Override
public BatchStage<T> sort(@Nonnull ComparatorEx<T> comparator) {
// Same path as the no-argument sort(), but with the caller's ordering.
return attachSort(comparator);
}

@Nonnull @Override
public <R> BatchStage<R> map(@Nonnull FunctionEx<? super T, ? extends R> mapFn) {
return attachMap(mapFn);
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.BiPredicateEx;
import com.hazelcast.function.ComparatorEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.function.SupplierEx;
Expand All @@ -39,6 +40,7 @@
import com.hazelcast.jet.impl.pipeline.transform.PeekTransform;
import com.hazelcast.jet.impl.pipeline.transform.ProcessorTransform;
import com.hazelcast.jet.impl.pipeline.transform.SinkTransform;
import com.hazelcast.jet.impl.pipeline.transform.SortTransform;
import com.hazelcast.jet.impl.pipeline.transform.TimestampTransform;
import com.hazelcast.jet.impl.pipeline.transform.Transform;
import com.hazelcast.jet.pipeline.BatchStage;
Expand Down Expand Up @@ -133,6 +135,12 @@ public StreamStage<T> addTimestamps(@Nonnull ToLongFunctionEx<? super T> timesta
return new StreamStageImpl<>(tsTransform, ADAPT_TO_JET_EVENT, pipelineImpl);
}

@Nonnull
@SuppressWarnings({"unchecked"})
<RET> RET attachSort(@Nullable ComparatorEx<T> comparator) {
    // Build the sort transform on top of this stage's transform, then attach
    // it with the stage's function adapter. A null comparator is passed on
    // unchanged.
    SortTransform<T> sortTransform = new SortTransform<>(this.transform, comparator);
    return (RET) attach(sortTransform, fnAdapter);
}

@Nonnull
@SuppressWarnings({"unchecked", "rawtypes"})
<R, RET> RET attachMap(@Nonnull FunctionEx<? super T, ? extends R> mapFn) {
Expand Down
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.impl.pipeline.transform;

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.pipeline.Planner;
import com.hazelcast.jet.impl.pipeline.Planner.PlannerVertex;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static com.hazelcast.function.FunctionEx.identity;
import static com.hazelcast.jet.core.Edge.between;
import static com.hazelcast.jet.core.processor.Processors.sortPrepareP;


public class SortTransform<V> extends AbstractTransform {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should use T here instead of V


private static final String FIRST_STAGE_VERTEX_NAME_SUFFIX = "-prepare";
private final ComparatorEx<V> comparator;

/**
 * Creates a sort transform on top of {@code upstream}.
 *
 * @param upstream   the transform whose output is sorted
 * @param comparator the ordering to apply; when {@code null}, the items are
 *                   cast to {@link Comparable} and compared with
 *                   {@code compareTo}
 */
public SortTransform(@Nonnull Transform upstream, @Nullable ComparatorEx<V> comparator) {
    super("sort", upstream);
    this.comparator = comparator != null
            ? comparator
            : (ComparatorEx<V>) (left, right) -> ((Comparable<V>) left).compareTo(right);
}

/**
 * Adds two vertices to the DAG: a "-prepare" vertex running
 * {@code sortPrepareP} with this transform's comparator, and a vertex with
 * total parallelism one running an identity {@code mapP}. The two are joined
 * by a distributed all-to-one edge that carries the same comparator as its
 * monotonic order.
 */
@Override
public void addToDag(Planner p) {
    String prepareVertexName = name() + FIRST_STAGE_VERTEX_NAME_SUFFIX;
    Vertex prepareVertex = p.dag.newVertex(prepareVertexName, sortPrepareP(comparator));

    ProcessorMetaSupplier collectSupplier = ProcessorMetaSupplier.forceTotalParallelismOne(
            ProcessorSupplier.of(Processors.mapP(identity())), name());
    PlannerVertex collectVertex = p.addVertex(this, name(), 1, collectSupplier);

    p.addEdges(this, prepareVertex);
    p.dag.edge(between(prepareVertex, collectVertex.v)
            .distributed()
            .allToOne(name().hashCode())
            .monotonicOrder((ComparatorEx<Object>) comparator));
}
}