Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use external Kafka in tests #775

Merged
merged 5 commits into from Apr 30, 2019
Merged
Changes from 4 commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -7,6 +7,7 @@ val akkaVersion = "2.5.21"
val kafkaVersion = "2.1.1"
val kafkaVersionForDocs = "21"
val scalatestVersion = "3.0.5"
val testcontainersVersion = "1.11.2"
val slf4jVersion = "1.7.26"
val confluentAvroSerializerVersion = "5.0.1"

@@ -155,6 +156,7 @@ lazy val testkit = project
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion,
"net.manub" %% "scalatest-embedded-kafka" % "2.0.0" exclude ("log4j", "log4j"),
"org.testcontainers" % "kafka" % testcontainersVersion % Provided,
"org.apache.commons" % "commons-compress" % "1.18", // embedded Kafka pulls in Avro which pulls in commons-compress 1.8.1
"org.scalatest" %% "scalatest" % scalatestVersion % Provided,
"junit" % "junit" % "4.12" % Provided,
@@ -182,6 +184,7 @@ lazy val tests = project
// See https://github.com/sbt/sbt/issues/3618#issuecomment-448951808
"javax.ws.rs" % "javax.ws.rs-api" % "2.1.1" artifacts Artifact("javax.ws.rs-api", "jar", "jar"),
"net.manub" %% "scalatest-embedded-schema-registry" % "2.0.0" % Test exclude ("log4j", "log4j") exclude ("org.slf4j", "slf4j-log4j12"),
"org.testcontainers" % "kafka" % testcontainersVersion % Test,
"org.apache.commons" % "commons-compress" % "1.18", // embedded Kafka pulls in Avro, which pulls in commons-compress 1.8.1, see testing.md
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"io.spray" %% "spray-json" % "1.3.5" % Test,
@@ -245,13 +248,14 @@ lazy val docs = project
("\\.java\\.scala".r, _ => ".java")
),
Paradox / siteSubdirName := s"docs/alpakka-kafka/${projectInfoVersion.value}",
Paradox / sourceDirectory := sourceDirectory.value / "main" / "paradox",
Paradox / sourceDirectory := sourceDirectory.value / "main",
Paradox / paradoxGroups := Map("Language" -> Seq("Java", "Scala")),
Paradox / paradoxProperties ++= Map(
"akka.version" -> akkaVersion,
"kafka.version" -> kafkaVersion,
"confluent.version" -> confluentAvroSerializerVersion,
"scalatest.version" -> scalatestVersion,
"testcontainers.version" -> testcontainersVersion,
"extref.akka-docs.base_url" -> s"https://doc.akka.io/docs/akka/$akkaVersion/%s",
"extref.kafka-docs.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/documentation/%s",
"extref.java-docs.base_url" -> "https://docs.oracle.com/en/java/javase/11/%s",
@@ -10,7 +10,6 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.annotation.InternalApi
import akka.kafka.ProducerMessage._
import akka.stream._
import akka.stream.stage._
import org.apache.kafka.clients.producer.Producer

import scala.concurrent.Future
@@ -1,6 +1,10 @@
# Testing

To simplify testing of streaming integrations with Alpakka Kafka, it provides the **Alpakka Kafka testkit**.
To simplify testing of streaming integrations with Alpakka Kafka, it provides the **Alpakka Kafka testkit**. It provides help for

* @ref:[mocking the Alpakka Kafka Consumers and Producers](#mocking-the-consumer-or-producer)
* @ref:[using an embedded Kafka](#testing-with-an-embedded-kafka-server)
* @ref:[starting and stopping Kafka in Docker](#testing-with-kafka-in-docker)

@@project-info{ projectId="testkit" }

@@ -10,7 +14,7 @@ To simplify testing of streaming integrations with Alpakka Kafka, it provides th
version=$project.version$
}

Note that Akka testkits do not promise binary compatibility. The API might be changed even between minor versions.
Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.

The table below shows Alpakka Kafka testkits's direct dependencies and the second tab shows all libraries it depends on transitively. We've overriden the `commons-compress` library to use a version with [fewer known security vulnerabilities](https://commons.apache.org/proper/commons-compress/security-reports.html).

@@ -61,7 +65,7 @@ Java JUnit 4
Java JUnit 5
: @@snip [snip](/tests/src/test/java/docs/javadsl/ProducerExampleTest.java) { #testkit }

The JUnit test base classes run the [`assertAllStagesStopped`](https://doc.akka.io/api/akka/current/akka/stream/testkit/javadsl/StreamTestKit$.html#assertAllStagesStopped(mat:akka.stream.Materializer):Unit) check from Akka Stream testkit to ensure all stages are shut down properly within each test. This may interfere with the `stop-timeout` which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your `application.conf` for tests.
The JUnit test base classes run the [`assertAllStagesStopped`](https://doc.akka.io/api/akka/current/akka/stream/testkit/javadsl/StreamTestKit$.html#assertAllStagesStopped) check from Akka Stream testkit to ensure all stages are shut down properly within each test. This may interfere with the `stop-timeout` which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your `application.conf` for tests.


### Testing from Scala code
@@ -76,18 +80,60 @@ The `KafkaSpec` class offers access to

`EmbeddedKafkaLike` extends `KafkaSpec` to add automatic starting and stopping of the embedded Kafka broker.

Most Alpakka Kafka tests implemented in Scala use [Scalatest](http://www.scalatest.org/) with the mix-ins shown below. You need to add Scalatest explicitly in your test dependencies (this release of Alpakka Kafka uses Scalatest $scalatest.version$.)
Some Alpakka Kafka tests implemented in Scala use [Scalatest](http://www.scalatest.org/) with the mix-ins shown below. You need to add Scalatest explicitly in your test dependencies (this release of Alpakka Kafka uses Scalatest $scalatest.version$.)

@@dependency [Maven,sbt,Gradle] {
group=org.scalatest
artifact=scalatest
version=$scalatest.version$
scope=test
}

By mixin in `EmbeddedKafkaLike` an embedded Kafka instance will be started before the tests in this test class execute shut down after all tests in this test class are finished.

This comment has been minimized.

Copy link
@2m

2m Apr 30, 2019

Member
Suggested change
By mixin in `EmbeddedKafkaLike` an embedded Kafka instance will be started before the tests in this test class execute shut down after all tests in this test class are finished.
By mixing in `EmbeddedKafkaLike` an embedded Kafka instance will be started before the tests in this test class execute shut down after all tests in this test class are finished.

Scala
: @@snip [snip](/tests/src/test/scala/akka/kafka/scaladsl/SpecBase.scala) { #testkit }
: @@snip [snip](/tests/src/test/scala/akka/kafka/scaladsl/SpecBase.scala) { #testkit #embeddedkafka }

With this `EmbeddedKafkaSpecBase` class test classes can extend it to automatically start and stop a Kafka broker to test with. To configure the Kafka broker non-default, override the `createKafkaConfig` as shown above.

To ensure proper shutdown of all stages in every test, wrap your test code in [`assertAllStagesStopped`](https://doc.akka.io/api/akka/current/akka/stream/testkit/scaladsl/StreamTestKit$.html#assertAllStagesStopped). This may interfere with the `stop-timeout` which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your `application.conf` for tests.


## Testing with Kafka in Docker

The [Testcontainers](https://www.testcontainers.org/) project contains a nice API to start and stop Apache Kafka in Docker containers. This becomes very relevant when your application code uses a Scala version which Apache Kafka doesn't support so that *EmbeddedKafka* can't be used.

@@@note

The Testcontainers support is new to Alpakka Kafka since 1.0.2 and may evolve a bit more.

@@@

With this `SpecBase` class test classes can extend it to automatically start and stop a Kafka broker to test with.
### Testing from Java code

Testcontainers is designed to be used with JUnit and you can follow [their documentation](https://www.testcontainers.org/modules/kafka/) to start and stop Kafka. To start a single instance for many tests see [Singleton containers](https://www.testcontainers.org/test_framework_integration/manual_lifecycle_control/).


### Testing from Scala code

The Testcontainers dependency must be added to your project explicitly.

@@dependency [Maven,sbt,Gradle] {
group=org.testcontainers
artifact=kafka
version=$testcontainers.version$
scope=test
}

By mixin in `TestcontainersKafkaLike` the Kafka Docker container will be started before the first test and shut down after all tests are finished.

This comment has been minimized.

Copy link
@2m

2m Apr 30, 2019

Member
Suggested change
By mixin in `TestcontainersKafkaLike` the Kafka Docker container will be started before the first test and shut down after all tests are finished.
By mixing in `TestcontainersKafkaLike` the Kafka Docker container will be started before the first test and shut down after all tests are finished.

Scala
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/AssignmentSpec.scala) { #testkit }
: @@snip [snip](/tests/src/test/scala/akka/kafka/scaladsl/SpecBase.scala) { #testkit #testcontainers}


With this `TestcontainersSampleSpec` class test classes can extend it to automatically start and stop a Kafka broker to test with.

To ensure proper shutdown of all stages in every test, wrap your test code in [`assertAllStagesStopped`](https://doc.akka.io/api/akka/current/akka/stream/testkit/scaladsl/StreamTestKit$.html#assertAllStagesStopped[T](block:=%3ET)(implicitmaterializer:akka.stream.Materializer):T). This may interfere with the `stop-timeout` which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your `application.conf` for tests.
To ensure proper shutdown of all stages in every test, wrap your test code in [`assertAllStagesStopped`](https://doc.akka.io/api/akka/current/akka/stream/testkit/scaladsl/StreamTestKit$.html#assertAllStagesStopped). This may interfere with the `stop-timeout` which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your `application.conf` for tests.


## Alternative testing libraries
@@ -37,7 +37,8 @@ import scala.util.{Failure, Success, Try}
trait EmbeddedKafkaLike extends KafkaSpec {

lazy implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = createKafkaConfig
def createKafkaConfig: EmbeddedKafkaConfig

def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)

This comment has been minimized.

Copy link
@ennru

ennru Apr 30, 2019

Author Member

I don't know why we didn't have a default config here before.


override def bootstrapServers =
s"localhost:${embeddedKafkaConfig.kafkaPort}"
@@ -53,10 +54,12 @@ trait EmbeddedKafkaLike extends KafkaSpec {
}
}

abstract class KafkaSpec(val kafkaPort: Int, val zooKeeperPort: Int, actorSystem: ActorSystem)
abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: ActorSystem)
extends TestKit(actorSystem)
with KafkaTestKit {

def kafkaPort: Int = _kafkaPort

def this(kafkaPort: Int) = this(kafkaPort, kafkaPort + 1, ActorSystem("Spec"))

val log: Logger = LoggerFactory.getLogger(getClass)
@@ -8,7 +8,7 @@ package akka.kafka.testkit.scaladsl
import akka.kafka.testkit.internal.TestFrameworkInterface
import org.scalatest.Suite

abstract class ScalatestKafkaSpec(override val kafkaPort: Int)
abstract class ScalatestKafkaSpec(kafkaPort: Int)
extends KafkaSpec(kafkaPort)
with Suite
with TestFrameworkInterface.Scalatest { this: Suite
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <http://softwaremill.com>
* Copyright (C) 2016 - 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.kafka.testkit.scaladsl

import org.testcontainers.containers.KafkaContainer

/**
* Uses [Testcontainers](https://www.testcontainers.org/) to start a Kafka broker in a Docker container.
* The Testcontainers dependency has to be added explicitly.
*/
trait TestcontainersKafkaLike extends KafkaSpec {
import TestcontainersKafkaLike._

override def kafkaPort: Int = {
requireStarted()
kafkaPortInternal
}

override def bootstrapServers: String = {
requireStarted()
kafkaBootstrapServersInternal
}

override def setUp(): Unit = {
if (kafkaPortInternal == -1) {
val kafkaContainer = new KafkaContainer()
kafkaContainer.start()
kafkaBootstrapServersInternal = kafkaContainer.getBootstrapServers
kafkaPortInternal =
kafkaBootstrapServersInternal.substring(kafkaBootstrapServersInternal.lastIndexOf(":") + 1).toInt
}
super.setUp()
}
}

private object TestcontainersKafkaLike {

private var kafkaBootstrapServersInternal: String = _
private var kafkaPortInternal: Int = -1

private def requireStarted(): Unit =
require(kafkaPortInternal != -1, "Testcontainers Kafka hasn't been started via `setUp`")

}
@@ -16,25 +16,24 @@ object KafkaPorts {
val TransactionsSpec = 9022
val ReconnectSpec = 9032
val ReconnectSpecProxy = 9034
val TimestampSpec = 9042
// val _ = 9042

This comment has been minimized.

Copy link
@ennru

ennru Apr 30, 2019

Author Member

I kept those for re-use.

val MultiConsumerSpec = 9052
val ScalaConsumerExamples = 9062
// val _ = 9062
val ScalaPartitionExamples = 9072
val ScalaAtLeastOnceExamples = 9082
val ScalaFetchMetadataExamples = 9092
// val _ = 9082
// val _ = 9092
val ScalaTransactionsExamples = 9102
val ScalaProducerExamples = 9112
val PartitionedSourcesSpec = 9122
// val _ = 9112
// val _ = 9122
val AssignmentSpec = 9132
val AssignmentTest = 9142
val ScalaAvroSerialization = 9152
val SerializationTest = 9162
val NoBrokerSpec = 9172
// val _ = 9172
val AtLeastOnceToManyTest = 9182
val FetchMetadataTest = 9192
val JavaProducerExamples = 9202
val JavaTransactionsExamples = 9212
val ConsumerExamplesTest = 9222
val CommittingSpec = 9232

}
@@ -10,12 +10,12 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.ProducerMessage.MultiMessage
import akka.kafka._
import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestProbe
import akka.{Done, NotUsed}
import net.manub.embeddedkafka.EmbeddedKafkaConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.scalatest._
@@ -24,17 +24,10 @@ import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._

class CommittingSpec extends SpecBase(kafkaPort = KafkaPorts.CommittingSpec) with Inside {
class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside {

implicit val patience: PatienceConfig = PatienceConfig(30.seconds, 500.millis)

def createKafkaConfig: EmbeddedKafkaConfig =
EmbeddedKafkaConfig(kafkaPort,
zooKeeperPort,
Map(
"num.partitions" -> "2",
"offsets.topic.replication.factor" -> "1"
))
final val Numbers = (1 to 200).map(_.toString)
final val partition1 = 1

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.