Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding test data sources designed for benchmarking purposes #2382

Merged
merged 28 commits into from Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e29ddb3
add methods for distributed batch and stream stages w/ long values fo…
Jun 30, 2020
3830ffc
advanced source creation for performance testing
Jul 18, 2020
8fcf108
incorporated PR feedback (formatting, function call reordering), rena…
Jul 19, 2020
5672b33
added javadoc, minor improvements
Jul 20, 2020
d476f97
javadoc updated, renaming
Jul 23, 2020
4f6fa02
Improve Javadoc
Jul 23, 2020
c514b80
Rename StreamSourceLong -> LongStreamSourceP
Jul 23, 2020
7b97dd6
test renaming
Jul 23, 2020
8d33a22
Extended Javadoc, updated annotations
Jul 23, 2020
a82bac8
Added paragraph in Javadoc
Jul 23, 2020
2752257
Simplifications and formatting changes
Jul 24, 2020
2981be3
Reflow para
Jul 24, 2020
05f23c8
Fix source name
Jul 28, 2020
703d740
Use the same startTime for all members
Jul 28, 2020
905d7c5
Rename counter -> emittedCount
Jul 28, 2020
4b6a3a9
Improve event scheduling
Jul 28, 2020
da22d82
Expose private method in EventTimeMapper
Jul 28, 2020
f50b4ca
Use EventTimeMapper
Jul 28, 2020
81e3d4f
Remove extra line
Jul 29, 2020
f7edb81
Remove unused imports in TestSources.java
Jul 29, 2020
df1ad0b
Change unit of hiccup threshold from ms to ns in order to reduce requ…
Jul 29, 2020
82b4287
Retrieve logger from Context in init method
Jul 29, 2020
680f042
Added note to Javadoc mentioning the consequences of a clock skew
Jul 29, 2020
71d01db
Removing unused import from LongStreamSourceP.java
Jul 30, 2020
fdf3cea
Use native timestamps instead of ingestion ts in test
Jul 30, 2020
16b48a8
Merge branch 'master' into guenter-hesse/master
viliam-durina Aug 4, 2020
46d7606
Fix test failure
viliam-durina Aug 4, 2020
2cfed4b
Simplify the test
viliam-durina Aug 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2008-2020, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.jet.pipeline.test;

import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Watermark;

import javax.annotation.Nonnull;

import static com.hazelcast.jet.impl.JetEvent.jetEvent;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Represents a StreamSource containing Long values which can be used
* for performance pipeline testing and development.
*
* @since 4.3
*/
public class StreamSourceLong extends AbstractProcessor {

private static final long SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS = 10;
private static final long SIMPLE_TIME_SPAN_MILLIS = HOURS.toMillis(3);

private static final long REPORT_PERIOD_NANOS = SECONDS.toNanos(SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS);
private static final long HICCUP_REPORT_THRESHOLD_MILLIS = 10;
private final long nanoTimeMillisToCurrentTimeMillis = determineTimeOffset();
private final long startTime;
private final long itemsPerSecond;
private final boolean isReportingThroughput;
private final long wmGranularity;
private final long wmOffset;
private long globalProcessorIndex;
private long totalParallelism;
private long emitPeriod;

private final AppendableTraverser<Object> traverser = new AppendableTraverser<>(2);
private long emitSchedule;
private long lastReport;
private long counterAtLastReport;
private long lastCallNanos;
private long counter;
private long lastEmittedWm;
private long nowNanos;

/**
* Creates a stream source for Long values that is supposed to be used for performance testing purposes.
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
*
* @since 4.3
*/
@Nonnull
public StreamSourceLong(
long startTime,
long itemsPerSecond,
EventTimePolicy<? super Long> eventTimePolicy,
boolean shouldReportThroughput
) {
this.wmGranularity = eventTimePolicy.watermarkThrottlingFrameSize();
this.wmOffset = eventTimePolicy.watermarkThrottlingFrameOffset();
this.startTime = MILLISECONDS.toNanos(startTime + nanoTimeMillisToCurrentTimeMillis);
this.itemsPerSecond = itemsPerSecond;
this.isReportingThroughput = shouldReportThroughput;
}

@Override
protected void init(Context context) {
totalParallelism = context.totalParallelism();
globalProcessorIndex = context.globalProcessorIndex();
emitPeriod = SECONDS.toNanos(1) * totalParallelism / itemsPerSecond;
lastCallNanos = lastReport = emitSchedule =
startTime + SECONDS.toNanos(1) * globalProcessorIndex / itemsPerSecond;
}

@Override
public boolean complete() {
nowNanos = System.nanoTime();
detectAndReportHiccup();
emitEvents();
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
if (isReportingThroughput) {
reportThroughput();
}
return false;
}

private void emitEvents() {
while (emitFromTraverser(traverser) && emitSchedule <= nowNanos) {
long timestamp = NANOSECONDS.toMillis(emitSchedule) - nanoTimeMillisToCurrentTimeMillis;
traverser.append(jetEvent(timestamp, counter * totalParallelism + globalProcessorIndex));
counter++;
emitSchedule += emitPeriod;
if (timestamp >= lastEmittedWm + wmGranularity) {
long wmToEmit = timestamp - (timestamp % wmGranularity) + wmOffset;
traverser.append(new Watermark(wmToEmit));
lastEmittedWm = wmToEmit;
}
}
}

private void detectAndReportHiccup() {
long millisSinceLastCall = NANOSECONDS.toMillis(nowNanos - lastCallNanos);
if (millisSinceLastCall > HICCUP_REPORT_THRESHOLD_MILLIS) {
System.out.printf("*** Source #%d hiccup: %,d ms%n", globalProcessorIndex, millisSinceLastCall);
}
lastCallNanos = nowNanos;
}

private void reportThroughput() {
long nanosSinceLastReport = nowNanos - lastReport;
if (nanosSinceLastReport < REPORT_PERIOD_NANOS) {
return;
}
lastReport = nowNanos;
long itemCountSinceLastReport = counter - counterAtLastReport;
counterAtLastReport = counter;
System.out.printf("%,d p%d: %,.0f items/second%n",
simpleTime(NANOSECONDS.toMillis(nowNanos)),
globalProcessorIndex,
itemCountSinceLastReport / ((double) nanosSinceLastReport / SECONDS.toNanos(1))
);
}

@Override
public boolean tryProcessWatermark(Watermark watermark) {
throw new UnsupportedOperationException("Source processor shouldn't be asked to process a watermark");
}

private static long determineTimeOffset() {
long milliTime = System.currentTimeMillis();
long nanoTime = System.nanoTime();
return NANOSECONDS.toMillis(nanoTime) - milliTime;
}

private static long simpleTime(long timeMillis) {
return timeMillis % SIMPLE_TIME_SPAN_MILLIS;
}

}
Expand Up @@ -16,12 +16,17 @@

package com.hazelcast.jet.pipeline.test;

import com.hazelcast.cluster.Address;
import com.hazelcast.jet.annotation.EvolvingApi;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.SourceBuilder.TimestampedSourceBuffer;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamSourceStage;
import com.hazelcast.jet.pipeline.SourceBuilder.TimestampedSourceBuffer;

import javax.annotation.Nonnull;
import java.util.Arrays;
Expand Down Expand Up @@ -161,4 +166,71 @@ void fillBuffer(TimestampedSourceBuffer<T> buf) throws Exception {
}
}
}

/**
 * Returns a streaming source of {@code long} values meant for
 * high-throughput testing. This overload uses the default local
 * parallelism and does not report the throughput; it delegates to the
 * overload taking all four parameters.
 *
 * @param itemsPerSecond the number of items to emit per second
 * @param initialDelay the delay before the first item is emitted; the
 *                     delegate adds it to {@code System.currentTimeMillis()}
 * @return the stream source
 *
 * @since 4.3
 */
@Nonnull
public static StreamSource<Long> longStreamSource(long itemsPerSecond, long initialDelay) {
    final int defaultLocalParallelism = Vertex.LOCAL_PARALLELISM_USE_DEFAULT;
    final boolean reportThroughput = false;
    return longStreamSource(itemsPerSecond, initialDelay, defaultLocalParallelism, reportThroughput);
}

/**
* Returns a stream source that contains long values and does late materialization
* of values after distributing them across the cluster, which is useful for high-throughput
* testing.
*
* @since 4.3
*
*/
@Nonnull
public static StreamSource<Long> longStreamSource(long itemsPerSecond, long initialDelay,
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
int preferredLocalParallelism) {
return longStreamSource(itemsPerSecond, initialDelay, preferredLocalParallelism, false);
}

/**
 * Returns a streaming source of {@code long} values meant for
 * high-throughput testing, optionally reporting the throughput. This
 * overload uses the default local parallelism; it delegates to the
 * overload taking all four parameters.
 *
 * @param itemsPerSecond the number of items to emit per second
 * @param initialDelay the delay before the first item is emitted; the
 *                     delegate adds it to {@code System.currentTimeMillis()}
 * @param shouldReportThroughput whether the source should report its
 *                               throughput
 * @return the stream source
 *
 * @since 4.3
 */
@Nonnull
public static StreamSource<Long> longStreamSource(
        long itemsPerSecond,
        long initialDelay,
        boolean shouldReportThroughput
) {
    final int defaultLocalParallelism = Vertex.LOCAL_PARALLELISM_USE_DEFAULT;
    return longStreamSource(itemsPerSecond, initialDelay, defaultLocalParallelism, shouldReportThroughput);
}

/**
 * Returns a streaming source of {@code long} values meant for
 * high-throughput testing. The items are produced by
 * {@link StreamSourceLong} processors, which also emit the watermarks.
 *
 * @param itemsPerSecond the number of items to emit per second; must be
 *                       positive
 * @param initialDelay the delay, in milliseconds, between the creation
 *                     of the processor supplier and the emission of the
 *                     first item
 * @param preferredLocalParallelism the preferred local parallelism of
 *                                  the source
 * @param shouldReportThroughput whether the processors should report
 *                               their throughput
 * @return the stream source
 * @throws IllegalArgumentException if {@code itemsPerSecond} is not positive
 *
 * @since 4.3
 */
@Nonnull
public static StreamSource<Long> longStreamSource(long itemsPerSecond, long initialDelay,
                                                  int preferredLocalParallelism, boolean shouldReportThroughput) {
    // Fail fast while the pipeline is being built; otherwise a non-positive
    // rate only surfaces later, as a division by zero inside the processor.
    if (itemsPerSecond <= 0) {
        throw new IllegalArgumentException("itemsPerSecond must be positive, but was " + itemsPerSecond);
    }
    return Sources.streamFromProcessorWithWatermarks("longs",
            // NOTE(review): presumably the "supports native timestamps" flag - confirm against Sources
            true,
            eventTimePolicy -> ProcessorMetaSupplier.of(
                    preferredLocalParallelism,
                    (Address ignored) -> {
                        // Computed when the supplier is created, so that all processors
                        // obtained from this supplier share the same start time
                        long startTime = System.currentTimeMillis() + initialDelay;
                        return ProcessorSupplier.of(() ->
                                new StreamSourceLong(startTime, itemsPerSecond,
                                        eventTimePolicy, shouldReportThroughput));
                    })
    );
}
}
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static com.hazelcast.jet.pipeline.test.Assertions.assertCollectedEventually;
import static org.junit.Assert.assertEquals;
Expand All @@ -48,6 +49,30 @@ public void test_items() {
jet().newJob(p).join();
}

@Test
public void test_stream_source_long() throws Throwable {
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
int itemsPerSecond = 10;
int timeout = 10;
int numberOfExpectedValues = timeout * itemsPerSecond;
Long[] input = LongStream.range(0, numberOfExpectedValues).boxed().toArray(Long[]::new);
guenter-hesse marked this conversation as resolved.
Show resolved Hide resolved
List<Long> expected = Arrays.asList(input);

p.readFrom(TestSources.longStreamSource(itemsPerSecond, 0)).
withIngestionTimestamps().
apply(assertCollectedEventually(timeout, items -> {
assertTrue("list should contain at least " + numberOfExpectedValues + " items",
items.size() >= numberOfExpectedValues);
assertTrue("list should contain less than " + 2 * numberOfExpectedValues + " items",
items.size() < 2 * numberOfExpectedValues);
for (Long value : items) {
assertTrue(expected.contains(value));
}
}));

expectedException.expectMessage(AssertionCompletedException.class.getName());
executeAndPeel();
}

@Test
public void test_itemStream() throws Throwable {
int expectedItemCount = 20;
Expand Down