Commit: mat and 2.6.10 updates

seglo committed Oct 13, 2020
1 parent 8fb079d commit e17c7da
Showing 39 changed files with 221 additions and 257 deletions.
4 changes: 2 additions & 2 deletions .travis.yml

@@ -23,8 +23,8 @@ jobs:
     - stage: check
       env: CMD="verifyCodeStyle"
       name: "Code style check. Run locally with: sbt verifyCodeStyle"
-    - env: CMD=";++2.13.1 Test/compile ;++2.13.1 It/compile"
-      name: "Compile all code with fatal warnings for Scala 2.13. Run locally with: env CI=true sbt ';++2.13.1 Test/compile ;++2.13.1 It/compile'"
+    - env: CMD=";++2.13.2 Test/compile ;++2.13.2 It/compile"
+      name: "Compile all code with fatal warnings for Scala 2.13. Run locally with: env CI=true sbt ';++2.13.2 Test/compile ;++2.13.2 It/compile'"
     - env: CMD="verifyDocs"
       name: "Create all API docs for artifacts/website and all reference docs. Run locally with: sbt verifyDocs"
@@ -43,11 +43,8 @@ public abstract class BaseKafkaTest extends KafkaTestKitClass {

   public final Logger log = LoggerFactory.getLogger(getClass());

-  protected final Materializer materializer;
-
-  protected BaseKafkaTest(ActorSystem system, Materializer materializer, String bootstrapServers) {
+  protected BaseKafkaTest(ActorSystem system, String bootstrapServers) {
     super(system, bootstrapServers);
-    this.materializer = materializer;
   }

   @Override
@@ -67,7 +64,7 @@ protected CompletionStage<Done> produceString(String topic, int messageCount, int partition) {
     return Source.fromIterator(() -> IntStream.range(0, messageCount).iterator())
         .map(Object::toString)
         .map(n -> new ProducerRecord<String, String>(topic, partition, DefaultKey(), n))
-        .runWith(Producer.plainSink(producerDefaults()), materializer);
+        .runWith(Producer.plainSink(producerDefaults()), system());
   }

   protected CompletionStage<Done> produceString(String topic, String message) {
@@ -83,8 +80,7 @@ protected final <K, V> CompletionStage<Done> produce(
       Pair<K, V>... messages) {
     return Source.from(Arrays.asList(messages))
         .map(pair -> new ProducerRecord<>(topic, pair.first(), pair.second()))
-        .runWith(
-            Producer.plainSink(producerDefaults(keySerializer, valueSerializer)), materializer);
+        .runWith(Producer.plainSink(producerDefaults(keySerializer, valueSerializer)), system());
   }

   protected Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumeString(
@@ -101,7 +97,7 @@ protected <K, V> Consumer.DrainingControl<List<ConsumerRecord<K, V>>> consume(
             Subscriptions.topics(topic))
         .take(take)
         .toMat(Sink.seq(), Consumer::createDrainingControl)
-        .run(materializer);
+        .run(system());
   }

   /**
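
Note: the testkit can drop its Materializer field because, since Akka 2.6, the javadsl run methods accept a ClassicActorSystemProvider and use the ActorSystem's system-wide materializer; `system()` is passed where a Materializer used to go. A minimal standalone sketch of that pattern (the class and stream contents are illustrative, not from this commit):

    import akka.Done;
    import akka.actor.ActorSystem;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;
    import java.util.concurrent.CompletionStage;

    public class RunWithSystemExample {
      public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("example");
        // Akka 2.6: pass the ActorSystem itself; the stream runs on the
        // system-wide materializer instead of a hand-created one.
        CompletionStage<Done> done =
            Source.range(1, 10)
                .map(Object::toString)
                .runWith(Sink.foreach(System.out::println), system);
        done.thenRun(system::terminate);
      }
    }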
@@ -36,15 +36,14 @@ private static EmbeddedKafkaConfig embeddedKafkaConfig(
   protected final int kafkaPort;
   protected final int replicationFactor;

-  public EmbeddedKafkaJunit4Test(
-      ActorSystem system, Materializer materializer, int kafkaPort, int replicationFactor) {
-    super(system, materializer, "localhost:" + kafkaPort);
+  public EmbeddedKafkaJunit4Test(ActorSystem system, int kafkaPort, int replicationFactor) {
+    super(system, "localhost:" + kafkaPort);
     this.kafkaPort = kafkaPort;
     this.replicationFactor = replicationFactor;
   }

-  protected EmbeddedKafkaJunit4Test(ActorSystem system, Materializer materializer, int kafkaPort) {
-    this(system, materializer, kafkaPort, 1);
+  protected EmbeddedKafkaJunit4Test(ActorSystem system, int kafkaPort) {
+    this(system, kafkaPort, 1);
   }

   protected static void startEmbeddedKafka(int kafkaPort, int replicationFactor) {
@@ -6,7 +6,6 @@
 package akka.kafka.testkit.javadsl;

 import akka.actor.ActorSystem;
-import akka.stream.Materializer;
 import net.manub.embeddedkafka.EmbeddedKafka$;
 import net.manub.embeddedkafka.EmbeddedKafkaConfig;
 import net.manub.embeddedkafka.EmbeddedKafkaConfig$;
@@ -38,15 +37,14 @@ private static EmbeddedKafkaConfig embeddedKafkaConfig(
   protected final int kafkaPort;
   protected final int replicationFactor;

-  protected EmbeddedKafkaTest(
-      ActorSystem system, Materializer materializer, int kafkaPort, int replicationFactor) {
-    super(system, materializer, "localhost:" + kafkaPort);
+  protected EmbeddedKafkaTest(ActorSystem system, int kafkaPort, int replicationFactor) {
+    super(system, "localhost:" + kafkaPort);
     this.kafkaPort = kafkaPort;
     this.replicationFactor = replicationFactor;
   }

-  protected EmbeddedKafkaTest(ActorSystem system, Materializer materializer, int kafkaPort) {
-    this(system, materializer, kafkaPort, 1);
+  protected EmbeddedKafkaTest(ActorSystem system, int kafkaPort) {
+    this(system, kafkaPort, 1);
   }

   protected void startEmbeddedKafka(int kafkaPort, int replicationFactor) {
@@ -14,9 +14,8 @@
 /** JUnit 4 base-class with some convenience for accessing a Kafka broker. */
 public abstract class KafkaJunit4Test extends BaseKafkaTest {

-  protected KafkaJunit4Test(
-      ActorSystem system, Materializer materializer, String bootstrapServers) {
-    super(system, materializer, bootstrapServers);
+  protected KafkaJunit4Test(ActorSystem system, String bootstrapServers) {
+    super(system, bootstrapServers);
   }

   @Before
@@ -33,6 +32,6 @@ public void cleanUpAdmin() {
   public void checkForStageLeaks() {
     // you might need to configure `stop-timeout` in your `application.conf`
     // as the default of 30s will fail this
-    StreamTestKit.assertAllStagesStopped(materializer);
+    StreamTestKit.assertAllStagesStopped(Materializer.matFromSystem(system()));
   }
 }
@@ -20,8 +20,8 @@
  */
 public abstract class KafkaTest extends BaseKafkaTest {

-  protected KafkaTest(ActorSystem system, Materializer materializer, String bootstrapServers) {
-    super(system, materializer, bootstrapServers);
+  protected KafkaTest(ActorSystem system, String bootstrapServers) {
+    super(system, bootstrapServers);
   }

   @BeforeAll
@@ -38,6 +38,6 @@ public void cleanUpAdmin() {
   public void checkForStageLeaks() {
     // you might need to configure `stop-timeout` in your `application.conf`
     // as the default of 30s will fail this
-    StreamTestKit.assertAllStagesStopped(materializer);
+    StreamTestKit.assertAllStagesStopped(Materializer.matFromSystem(system()));
   }
 }
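
StreamTestKit.assertAllStagesStopped still expects an explicit Materializer, so the leak checks above look up the system-wide one via Materializer.matFromSystem. A sketch of that bridge in isolation (the wrapper class and method names are illustrative):

    import akka.actor.ActorSystem;
    import akka.stream.Materializer;
    import akka.stream.testkit.javadsl.StreamTestKit;

    public class StageLeakCheck {
      static void assertNoLeakedStages(ActorSystem system) {
        // matFromSystem returns the materializer owned by the ActorSystem,
        // bridging APIs that still take a Materializer parameter.
        Materializer materializer = Materializer.matFromSystem(system);
        StreamTestKit.assertAllStagesStopped(materializer);
      }
    }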
@@ -8,7 +8,6 @@
 import akka.actor.ActorSystem;
 import akka.kafka.testkit.KafkaTestkitTestcontainersSettings;
 import akka.kafka.testkit.internal.TestcontainersKafka;
-import akka.stream.Materializer;
 import org.junit.After;
 import org.junit.Before;

@@ -24,20 +23,19 @@ public abstract class TestcontainersKafkaJunit4Test extends KafkaJunit4Test {
   private static final KafkaTestkitTestcontainersSettings settings =
       TestcontainersKafka.Singleton().testcontainersSettings();

-  protected TestcontainersKafkaJunit4Test(ActorSystem system, Materializer materializer) {
-    super(system, materializer, startKafka(settings));
+  protected TestcontainersKafkaJunit4Test(ActorSystem system) {
+    super(system, startKafka(settings));
   }

   /** @deprecated Use constructor with `testcontainersSettings` instead. since 2.0.0 */
   @Deprecated
-  protected TestcontainersKafkaJunit4Test(
-      ActorSystem system, Materializer materializer, String confluentPlatformVersion) {
-    super(system, materializer, startKafka(confluentPlatformVersion));
+  protected TestcontainersKafkaJunit4Test(ActorSystem system, String confluentPlatformVersion) {
+    super(system, startKafka(confluentPlatformVersion));
   }

   protected TestcontainersKafkaJunit4Test(
-      ActorSystem system, Materializer materializer, KafkaTestkitTestcontainersSettings settings) {
-    super(system, materializer, startKafka(settings));
+      ActorSystem system, KafkaTestkitTestcontainersSettings settings) {
+    super(system, startKafka(settings));
   }

   /** @deprecated Use method with `testcontainersSettings` instead. since 2.0.0 */
@@ -26,20 +26,20 @@ public abstract class TestcontainersKafkaTest extends KafkaTest {
   public static final KafkaTestkitTestcontainersSettings settings =
       TestcontainersKafka.Singleton().testcontainersSettings();

-  protected TestcontainersKafkaTest(ActorSystem system, Materializer materializer) {
-    super(system, materializer, startKafka(settings));
+  protected TestcontainersKafkaTest(ActorSystem system) {
+    super(system, startKafka(settings));
   }

   protected TestcontainersKafkaTest(
-      ActorSystem system, Materializer materializer, KafkaTestkitTestcontainersSettings settings) {
-    super(system, materializer, startKafka(settings));
+      ActorSystem system, KafkaTestkitTestcontainersSettings settings) {
+    super(system, startKafka(settings));
   }

   /** @deprecated Use constructor with `testcontainersSettings` instead. since 2.0.0 */
   @Deprecated
-  protected TestcontainersKafkaTest(
-      ActorSystem system, Materializer materializer, String confluentPlatformVersion) {
-    super(system, materializer, startKafka(confluentPlatformVersion));
+  protected TestcontainersKafkaTest(ActorSystem system, String confluentPlatformVersion) {
+    super(system, startKafka(confluentPlatformVersion));
   }

   /** @deprecated Use method with `testcontainersSettings` instead. since 2.0.0 */
@@ -72,7 +72,7 @@ class TransactionsPartitionedSourceSpec

   def runStream(id: String): UniqueKillSwitch =
     RestartSource
-      .onFailuresWithBackoff(10.millis, 100.millis, 0.2)(
+      .onFailuresWithBackoff(RestartSettings(10.millis, 100.millis, 0.2))(
         () => {
           transactionalPartitionedCopyStream(
             consumerSettings,
@@ -84,12 +84,12 @@ class TransactionsPartitionedSourceSpec
             maxPartitions = sourcePartitions,
             restartAfter = Some(restartAfter)
           ).recover {
-              case e: TimeoutException =>
-                if (completedWithTimeout.incrementAndGet() > 10)
-                  "no more messages to copy"
-                else
-                  throw new Error("Continue restarting copy stream")
-            }
+            case e: TimeoutException =>
+              if (completedWithTimeout.incrementAndGet() > 10)
+                "no more messages to copy"
+              else
+                throw new Error("Continue restarting copy stream")
+          }
         }
       )
       .viaMat(KillSwitches.single)(Keep.right)
2 changes: 1 addition & 1 deletion tests/src/it/scala/akka/kafka/TransactionsSourceSpec.scala

@@ -70,7 +70,7 @@ class TransactionsSourceSpec

   def runStream(id: String): UniqueKillSwitch =
     RestartSource
-      .onFailuresWithBackoff(10.millis, 100.millis, 0.2)(
+      .onFailuresWithBackoff(RestartSettings(10.millis, 100.millis, 0.2))(
         () => {
           val transactionId = s"$group-$id"
           transactionalCopyStream(consumerSettings,
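
Both transactional specs move from the deprecated (minBackoff, maxBackoff, randomFactor) overload to the RestartSettings class introduced in Akka 2.6.10. The same API exists in the Java DSL; a sketch under the assumption of a trivial wrapped source (class name and stream contents are illustrative):

    import java.time.Duration;
    import akka.actor.ActorSystem;
    import akka.stream.RestartSettings;
    import akka.stream.javadsl.RestartSource;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    public class RestartSettingsExample {
      public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("example");
        // 2.6.10 groups the three backoff parameters into one settings object.
        RestartSettings settings =
            RestartSettings.create(Duration.ofMillis(10), Duration.ofMillis(100), 0.2);
        RestartSource.onFailuresWithBackoff(settings, () -> Source.single("element"))
            .runWith(Sink.foreach(System.out::println), system);
      }
    }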
25 changes: 11 additions & 14 deletions tests/src/test/java/docs/javadsl/AssignmentTest.java

@@ -18,8 +18,6 @@
 // #testkit
 import akka.kafka.javadsl.Producer;
 import akka.kafka.tests.javadsl.LogCapturingJunit4;
-import akka.stream.ActorMaterializer;
-import akka.stream.Materializer;
 import akka.stream.javadsl.Sink;
 import akka.stream.javadsl.Source;
 // #testkit
@@ -49,10 +47,9 @@ public class AssignmentTest extends EmbeddedKafkaJunit4Test {
   @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();

   private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
-  private static final Materializer mat = ActorMaterializer.create(sys);

   public AssignmentTest() {
-    super(sys, mat, KafkaPorts.AssignmentTest());
+    super(sys, KafkaPorts.AssignmentTest());
   }

   // #testkit
@@ -65,7 +62,7 @@ public void mustConsumeFromTheSpecifiedSingleTopic() throws Exception {
     final CompletionStage<Done> producerCompletion =
         Source.range(1, totalMessages)
             .map(msg -> new ProducerRecord<>(topic, 0, DefaultKey(), msg.toString()))
-            .runWith(Producer.plainSink(producerDefaults()), mat);
+            .runWith(Producer.plainSink(producerDefaults()), sys);

     resultOf(producerCompletion);

@@ -79,7 +76,7 @@
         resultOf(
                 consumer
                     .takeWhile(m -> Integer.valueOf(m.value()) < totalMessages, true)
-                    .runWith(Sink.seq(), mat))
+                    .runWith(Sink.seq(), sys))
             .size();

     assertEquals(totalMessages, receivedMessages);
@@ -99,7 +96,7 @@ public void mustConsumeFromTheSpecifiedTopicPattern() throws Exception {
                         .map(t -> new ProducerRecord<>(t, 0, DefaultKey(), msg.toString()))
                         .collect(Collectors.toList())))
             .via(Producer.flexiFlow(producerDefaults()))
-            .runWith(Sink.ignore(), mat);
+            .runWith(Sink.ignore(), sys);

     resultOf(producerCompletion);

@@ -112,7 +109,7 @@

     int expectedTotal = totalMessages * topics.size();
     final Integer receivedMessages =
-        resultOf(consumer.take(expectedTotal).runWith(Sink.seq(), mat)).size();
+        resultOf(consumer.take(expectedTotal).runWith(Sink.seq(), sys)).size();

     assertEquals(expectedTotal, (int) receivedMessages);
   }
@@ -128,7 +125,7 @@ public void mustConsumeFromTheSpecifiedPartition() throws Exception {
                   final Integer partition = msg % 2;
                   return new ProducerRecord<>(topic, partition, DefaultKey(), msg.toString());
                 })
-            .runWith(Producer.plainSink(producerDefaults()), mat);
+            .runWith(Producer.plainSink(producerDefaults()), sys);

     resultOf(producerCompletion);

@@ -144,7 +141,7 @@
         consumer
             .take(totalMessages / 2)
             .map(msg -> Integer.valueOf(msg.value()))
-            .runWith(Sink.seq(), mat);
+            .runWith(Sink.seq(), sys);
     final List<Integer> messages = resultOf(consumerCompletion);
     messages.forEach(m -> assertEquals(0, m % 2));
   }
@@ -156,7 +153,7 @@ public void mustConsumeFromTheSpecifiedPartitionAndOffset() throws Exception {
     final CompletionStage<Done> producerCompletion =
         Source.range(1, totalMessages)
             .map(msg -> new ProducerRecord<>(topic, 0, DefaultKey(), msg.toString()))
-            .runWith(Producer.plainSink(producerDefaults()), mat);
+            .runWith(Producer.plainSink(producerDefaults()), sys);

     resultOf(producerCompletion);

@@ -170,7 +167,7 @@
     // #assingment-single-partition-offset

     final CompletionStage<List<Long>> consumerCompletion =
-        consumer.take(totalMessages / 2).map(ConsumerRecord::offset).runWith(Sink.seq(), mat);
+        consumer.take(totalMessages / 2).map(ConsumerRecord::offset).runWith(Sink.seq(), sys);
     final List<Long> messages = resultOf(consumerCompletion);
     IntStream.range(0, (int) offset).forEach(idx -> assertEquals(idx, messages.get(idx) - offset));
   }
@@ -185,7 +182,7 @@ public void mustConsumeFromTheSpecifiedPartitionAndTimestamp() throws Exception {
             msg ->
                 new ProducerRecord<>(
                     topic, 0, System.currentTimeMillis(), DefaultKey(), msg.toString()))
-            .runWith(Producer.plainSink(producerDefaults()), mat);
+            .runWith(Producer.plainSink(producerDefaults()), sys);

     resultOf(producerCompletion);

@@ -204,7 +201,7 @@
         consumer
             .takeWhile(m -> Integer.valueOf(m.value()) < totalMessages, true)
             .map(ConsumerRecord::timestamp)
-            .runWith(Sink.seq(), mat);
+            .runWith(Sink.seq(), sys);
     final long oldMessages =
         resultOf(consumerCompletion).stream().map(t -> t - now).filter(t -> t > 5000).count();
     assertEquals(0, oldMessages);
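
For code that extends the testkit, the AssignmentTest migration above is the template: delete the ActorMaterializer field and pass only the ActorSystem to the base-class constructor. A before/after sketch for a hypothetical user test (class name and port value are illustrative):

    import akka.actor.ActorSystem;
    import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test;

    public class MyKafkaTest extends EmbeddedKafkaJunit4Test {
      private static final ActorSystem sys = ActorSystem.create("MyKafkaTest");

      public MyKafkaTest() {
        // Before this commit: super(sys, ActorMaterializer.create(sys), 9092);
        super(sys, 9092);
      }
    }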
