Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding test data sources designed for benchmarking purposes #2382

Merged
merged 28 commits into from Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e29ddb3
add methods for distributed batch and stream stages w/ long values fo…
Jun 30, 2020
3830ffc
advanced source creation for performance testing
Jul 18, 2020
8fcf108
incorporated PR feedback (formatting, function call reordering), rena…
Jul 19, 2020
5672b33
added javadoc, minor improvements
Jul 20, 2020
d476f97
javadoc updated, renaming
Jul 23, 2020
4f6fa02
Improve Javadoc
Jul 23, 2020
c514b80
Rename StreamSourceLong -> LongStreamSourceP
Jul 23, 2020
7b97dd6
test renaming
Jul 23, 2020
8d33a22
Extended Javadoc, updated annotations
Jul 23, 2020
a82bac8
Added paragraph in Javadoc
Jul 23, 2020
2752257
Simplifications and formatting changes
Jul 24, 2020
2981be3
Reflow para
Jul 24, 2020
05f23c8
Fix source name
Jul 28, 2020
703d740
Use the same startTime for all members
Jul 28, 2020
905d7c5
Rename counter -> emittedCount
Jul 28, 2020
4b6a3a9
Improve event scheduling
Jul 28, 2020
da22d82
Expose private method in EventTimeMapper
Jul 28, 2020
f50b4ca
Use EventTimeMapper
Jul 28, 2020
81e3d4f
Remove extra line
Jul 29, 2020
f7edb81
Remove unused imports in TestSources.java
Jul 29, 2020
df1ad0b
Change unit of hiccup threshold from ms to ns in order to reduce requ…
Jul 29, 2020
82b4287
Retrieve logger from Context in init method
Jul 29, 2020
680f042
Added note to Javadoc mentioning the consequences of a clock skew
Jul 29, 2020
71d01db
Removing unused import from LongStreamSourceP.java
Jul 30, 2020
fdf3cea
Use native timestamps instead of ingestion ts in test
Jul 30, 2020
16b48a8
Merge branch 'master' into guenter-hesse/master
viliam-durina Aug 4, 2020
46d7606
Fix test failure
viliam-durina Aug 4, 2020
2cfed4b
Simplify the test
viliam-durina Aug 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.pipeline.test;

import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;

import javax.annotation.Nonnull;

import static com.hazelcast.jet.impl.JetEvent.jetEvent;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Implements the {@link TestSources#longStream} source.
*
* @since 4.3
*/
public class LongStreamSourceP extends AbstractProcessor {

private static final long SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS = 10;
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved

private static final long REPORT_PERIOD_NANOS = SECONDS.toNanos(SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS);
private static final long HICCUP_REPORT_THRESHOLD_MILLIS = 10;
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
private final long nanoTimeMillisToCurrentTimeMillis = determineTimeOffset();
private final long startTime;
private final long itemsPerSecond;
private final ILogger logger = Logger.getLogger(LongStreamSourceP.class);
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
private final long wmGranularity;
private final long wmOffset;
private long globalProcessorIndex;
private long totalParallelism;
private long emitPeriod;

private final AppendableTraverser<Object> traverser = new AppendableTraverser<>(2);
private long emitSchedule;
private long lastReport;
private long counterAtLastReport;
private long lastCallNanos;
private long counter;
private long lastEmittedWm;
private long nowNanos;

LongStreamSourceP(
long startTime,
long itemsPerSecond,
EventTimePolicy<? super Long> eventTimePolicy
) {
this.wmGranularity = eventTimePolicy.watermarkThrottlingFrameSize();
this.wmOffset = eventTimePolicy.watermarkThrottlingFrameOffset();
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
this.startTime = MILLISECONDS.toNanos(startTime + nanoTimeMillisToCurrentTimeMillis);
this.itemsPerSecond = itemsPerSecond;
}

@Override
protected void init(@Nonnull Context context) {
totalParallelism = context.totalParallelism();
globalProcessorIndex = context.globalProcessorIndex();
emitPeriod = SECONDS.toNanos(1) * totalParallelism / itemsPerSecond;
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
lastCallNanos = lastReport = emitSchedule =
startTime + SECONDS.toNanos(1) * globalProcessorIndex / itemsPerSecond;
}

@Override
public boolean complete() {
nowNanos = System.nanoTime();
emitEvents();
detectAndReportHiccup();
if (logger.isFineEnabled()) {
reportThroughput();
}
return false;
}

private void emitEvents() {
while (emitFromTraverser(traverser) && emitSchedule <= nowNanos) {
long timestamp = NANOSECONDS.toMillis(emitSchedule) - nanoTimeMillisToCurrentTimeMillis;
traverser.append(jetEvent(timestamp, counter * totalParallelism + globalProcessorIndex));
counter++;
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
emitSchedule += emitPeriod;
if (timestamp >= lastEmittedWm + wmGranularity) {
long wmToEmit = timestamp - (timestamp % wmGranularity) + wmOffset;
traverser.append(new Watermark(wmToEmit));
lastEmittedWm = wmToEmit;
}
}
}

private void detectAndReportHiccup() {
long millisSinceLastCall = NANOSECONDS.toMillis(nowNanos - lastCallNanos);
if (millisSinceLastCall > HICCUP_REPORT_THRESHOLD_MILLIS) {
logger.info(String.format("*** Source #%d hiccup: %,d ms%n", globalProcessorIndex, millisSinceLastCall));
}
lastCallNanos = nowNanos;
}

private void reportThroughput() {
long nanosSinceLastReport = nowNanos - lastReport;
if (nanosSinceLastReport < REPORT_PERIOD_NANOS) {
return;
}
lastReport = nowNanos;
long itemCountSinceLastReport = counter - counterAtLastReport;
counterAtLastReport = counter;
logger.fine(String.format("p%d: %,.0f items/second%n",
globalProcessorIndex,
itemCountSinceLastReport / ((double) nanosSinceLastReport / SECONDS.toNanos(1))));
}

private static long determineTimeOffset() {
return NANOSECONDS.toMillis(System.nanoTime()) - System.currentTimeMillis();
}

}
Expand Up @@ -16,12 +16,16 @@

package com.hazelcast.jet.pipeline.test;

import com.hazelcast.cluster.Address;
import com.hazelcast.jet.annotation.EvolvingApi;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.TimestampedSourceBuffer;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamSourceStage;
import com.hazelcast.jet.pipeline.SourceBuilder.TimestampedSourceBuffer;

import javax.annotation.Nonnull;
import java.util.Arrays;
Expand All @@ -31,8 +35,8 @@
import static com.hazelcast.jet.impl.util.Util.checkSerializable;

/**
* Contains factory methods for various mock sources which can be used
* for pipeline testing and development.
* Contains factory methods for various mock sources which can be used for
* pipeline testing and development.
*
* @since 3.2
*/
Expand All @@ -43,8 +47,8 @@ private TestSources() {
}

/**
* Returns a batch source which iterates through the supplied iterable and then
* terminates.
* Returns a batch source which iterates through the supplied iterable and
* then terminates.
*
* @since 3.2
*/
Expand All @@ -59,8 +63,8 @@ public static <T> BatchSource<T> items(@Nonnull Iterable<? extends T> items) {
}

/**
* Returns a batch source which iterates through the supplied items and then
* terminates.
* Returns a batch source which iterates through the supplied items and
* then terminates.
*
* @since 3.2
*/
Expand All @@ -71,20 +75,20 @@ public static <T> BatchSource<T> items(@Nonnull T... items) {
}

/**
* Returns a streaming source which generates events of type {@link SimpleEvent} at
* the specified rate infinitely.
* Returns a streaming source that generates events of type {@link
* SimpleEvent} at the specified rate.
* <p>
* The source supports {@linkplain
* StreamSourceStage#withNativeTimestamps(long) native timestamps}. The
* timestamp is the current system time at the moment they are
* generated. The source is not distributed and all the items are
* generated on the same node. This source is not fault-tolerant.
* The sequence will be reset once a job is restarted.
* timestamp is the current system time at the moment they are generated.
* The source is not distributed and all the items are generated on the
* same node. This source is not fault-tolerant. The sequence will be
* reset once a job is restarted.
* <p>
* <b>Note:</b>
* There is no absolute guarantee that the actual rate of emitted
* items will match the supplied value. It is done on a best-effort
* basis.
* <strong>Note:</strong>
* There is no absolute guarantee that the actual rate of emitted items
* will match the supplied value. It is ensured that no emitted event's
* timestamp will be in the future.
*
* @param itemsPerSecond how many items should be emitted each second
*
Expand All @@ -97,20 +101,20 @@ public static StreamSource<SimpleEvent> itemStream(int itemsPerSecond) {
}

/**
* Returns a streaming source which generates events created by the {@code
* generatorFn} at the specified rate infinitely.
* Returns a streaming source that generates events created by the {@code
* generatorFn} at the specified rate.
* <p>
* The source supports {@linkplain
* StreamSourceStage#withNativeTimestamps(long) native timestamps}. The
* timestamp is the current system time at the moment they are
* generated. The source is not distributed and all the items are
* generated on the same node. This source is not fault-tolerant.
* The sequence will be reset once a job is restarted.
* timestamp is the current system time at the moment they are generated.
* The source is not distributed and all the items are generated on the
* same node. This source is not fault-tolerant. The sequence will be
* reset once a job is restarted.
* <p>
* <b>Note:</b>
* There is no absolute guarantee that the actual rate of emitted
* items will match the supplied value. It is done on a best-effort
* basis.
* <strong>Note:</strong>
* There is no absolute guarantee that the actual rate of emitted items
* will match the supplied value. It is ensured that no emitted event's
* timestamp will be in the future.
*
* @param itemsPerSecond how many items should be emitted each second
* @param generatorFn a function which takes the timestamp and the sequence of the generated
Expand All @@ -132,6 +136,50 @@ public static <T> StreamSource<T> itemStream(
.build();
}

/**
* Returns a {@link com.hazelcast.jet.pipeline.StreamSource} that emits an
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
* ever-increasing sequence of {@code Long} numbers with native timestamps
* that are exactly the same amount of time apart, as specified by the
* supplied {@code eventsPerSecond} parameter. The source is distributed and
* suitable for high-throughput performance testing. It emits the events at
* the maximum possible speed, constrained by the invariant that it will
* never emit an event whose timestamp is in the future.
* <p>
* The emission of events is distributed across the parallel processors in
* a round-robin fashion: processor 0 emits the first event, processor 1
* the second one, and so on. There is no coordination that would prevent
* processor 1 from emitting its event before processor 0, though, so this
* only applies to the event timestamps.
* <p>
* Use the {@code initialDelayMillis} parameter to give enough time to the
* Jet cluster to initialize the job on the whole cluster before the time
* of the first event arrives, so that there is no initial flood of events
* from the past. The point of reference is the moment at which the
* coordinator node creates the job's execution plan, before sending it out
* to the rest of the cluster.
* <p>
* This source is not fault-tolerant. The sequence will be reset once a job
* is restarted.
viliam-durina marked this conversation as resolved.
Show resolved Hide resolved
*
* @param eventsPerSecond the desired event rate
* @param initialDelayMillis initial delay in milliseconds before emitting values
*
* @since 4.3
*
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
*/
@Nonnull
public static StreamSource<Long> longStream(long eventsPerSecond, long initialDelayMillis) {
return Sources.streamFromProcessorWithWatermarks("longValues",
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
true,
eventTimePolicy -> ProcessorMetaSupplier.of(
(Address ignored) -> {
long startTime = System.currentTimeMillis() + initialDelayMillis;
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
return ProcessorSupplier.of(() ->
new LongStreamSourceP(startTime, eventsPerSecond, eventTimePolicy));
})
);
}

private static final class ItemStreamSource<T> {
private static final int MAX_BATCH_SIZE = 1024;

Expand Down Expand Up @@ -161,4 +209,5 @@ void fillBuffer(TimestampedSourceBuffer<T> buf) throws Exception {
}
}
}

}
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static com.hazelcast.jet.pipeline.test.Assertions.assertCollectedEventually;
import static org.junit.Assert.assertEquals;
Expand All @@ -48,6 +49,30 @@ public void test_items() {
jet().newJob(p).join();
}

@Test
public void test_longStream() throws Throwable {
int itemsPerSecond = 10;
int timeout = 10;
int numberOfExpectedValues = timeout * itemsPerSecond;
Long[] input = LongStream.range(0, numberOfExpectedValues).boxed().toArray(Long[]::new);
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
List<Long> expected = Arrays.asList(input);

p.readFrom(TestSources.longStream(itemsPerSecond, 0)).
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
withIngestionTimestamps().
apply(assertCollectedEventually(timeout, items -> {
assertTrue("list should contain at least " + numberOfExpectedValues + " items",
items.size() >= numberOfExpectedValues);
assertTrue("list should contain less than " + 2 * numberOfExpectedValues + " items",
items.size() < 2 * numberOfExpectedValues);
for (Long value : items) {
assertTrue(expected.contains(value));
}
}));

expectedException.expectMessage(AssertionCompletedException.class.getName());
executeAndPeel();
}

@Test
public void test_itemStream() throws Throwable {
int expectedItemCount = 20;
Expand Down