Upgrade Akka 2.6.14 and Kafka 2.7.0 (#1355)
seglo committed Apr 19, 2021
1 parent e3011ee commit 66f3703
Showing 24 changed files with 75 additions and 61 deletions.
4 changes: 2 additions & 2 deletions .scala-steward.conf
@@ -5,9 +5,9 @@ updates.ignore = [
{ groupId = "org.scalameta", artifactId = "scalafmt-core" }
]
updates.pin = [
{ groupId = "org.apache.kafka", artifactId="kafka-clients", version="2.6." }
{ groupId = "org.apache.kafka", artifactId="kafka-clients", version="2.7." }
# To be updated in tandem with upstream Akka
{ groupId = "com.fasterxml.jackson.core", version="2.10." }
{ groupId = "com.fasterxml.jackson.core", version="2.11." }
{ groupId = "org.scalatest", artifactId = "scalatest", version = "3.1." }
]

22 changes: 9 additions & 13 deletions build.sbt
@@ -10,32 +10,28 @@ val Scala212 = "2.12.13"
val Scala213 = "2.13.4"

val AkkaBinaryVersionForDocs = "2.6"
val KafkaVersionForDocs = "26"
val KafkaVersionForDocs = "27"

val akkaVersion = "2.6.10"
val kafkaVersion = "2.6.0"
// TODO Jackson is now a provided dependency of kafka-clients
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/2.6.0
val jacksonVersion = "2.10.5.1"
val akkaVersion = "2.6.14"
val kafkaVersion = "2.7.0"
// Jackson is now a provided dependency of kafka-clients
// This should align with the Jackson minor version used in Akka 2.6.x
// https://github.com/akka/akka/blob/master/project/Dependencies.scala#L23
val jacksonVersion = "2.11.4"
val scalatestVersion = "3.1.4"
val testcontainersVersion = "1.15.2"
val slf4jVersion = "1.7.30"
// this depends on Kafka, and should be upgraded to the latest version
// that depends on the same Kafka version as defined above
val confluentAvroSerializerVersion = "6.0.1"
val confluentAvroSerializerVersion = "6.1.1"
val scalapb = "com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.11"
val kafkaBrokerWithoutSlf4jLog4j = "org.apache.kafka" %% "kafka" % kafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12")

val confluentLibsExclusionRules = Seq(
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("com.typesafe.scala-logging"),
ExclusionRule("org.apache.kafka"),
// a transitive dependency of `kafka-avro-serializer` brings in a SNAPSHOT version of `javafx.base` that is no longer
// published to Maven Central. this is a workaround until the upstream confluent `rest-utils` project (which
// `kafka-avro-serializer` depends on) upgrades their version of jersey.
// https://github.com/confluentinc/rest-utils/issues/170
ExclusionRule("org.openjfx", "javafx.base")
ExclusionRule("org.apache.kafka")
)

// Allows to silence scalac compilation warnings selectively by code block or file path
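Since `kafka-clients` now only declares Jackson as a provided dependency, a project that needs Jackson at runtime has to add it explicitly and keep it aligned with Akka's Jackson minor version. A minimal sbt sketch of such a pin — the choice of `jackson-databind` as the module is an assumption for illustration, not necessarily the exact module set this build uses:

// sketch: explicit Jackson pin aligned with the Jackson minor version used by Akka 2.6.x
val jacksonVersion = "2.11.4"

libraryDependencies += "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion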
@@ -110,7 +110,7 @@ import scala.concurrent.{ExecutionContext, Future}

setHandler(shape.out, new OutHandler {
override def onPull(): Unit = pump()
override def onDownstreamFinish(): Unit =
override def onDownstreamFinish(cause: Throwable): Unit =
performShutdown()
})

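The new parameter reflects Akka 2.6's cancellation-cause propagation: `OutHandler.onDownstreamFinish()` is deprecated in favour of the overload that receives the cause of the downstream cancellation. A sketch of a custom stage using the new overload — the stage itself is hypothetical and only illustrates the signature:

import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.{Attributes, Outlet, SourceShape}

// Hypothetical single-element source, used only to show the Akka 2.6 OutHandler signature.
final class SingleElementSource[T](elem: T) extends GraphStage[SourceShape[T]] {
  val out: Outlet[T] = Outlet("SingleElementSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, elem)
          completeStage()
        }
        // Akka 2.6 passes the downstream cancellation cause to the handler
        override def onDownstreamFinish(cause: Throwable): Unit = {
          // release any resources here, then let the default behaviour cancel the stage
          super.onDownstreamFinish(cause)
        }
      })
    }
}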
@@ -75,6 +75,8 @@ private trait MetricsControl extends scaladsl.Consumer.Control {
protected def executionContext: ExecutionContext
protected def consumerFuture: Future[ActorRef]

// FIXME: this can't be accessed until the stream has materialized because the `def executionContext` implementation
// takes the execution context from the materializer. Should it throw an exception, or block, until materialization?
def metrics: Future[Map[MetricName, Metric]] = {
import akka.pattern.ask

@@ -84,7 +86,7 @@ private trait MetricsControl extends scaladsl.Consumer.Control {
consumer
.ask(RequestMetrics)(Timeout(1.minute))
.mapTo[ConsumerMetrics]
.map(_.metrics)(ExecutionContexts.sameThreadExecutionContext)
.map(_.metrics)(ExecutionContexts.parasitic)
}(executionContext)
}
}
@@ -103,7 +105,7 @@ final private[kafka] class ConsumerControlAsJava(underlying: scaladsl.Consumer.C
override def isShutdown: CompletionStage[Done] = underlying.isShutdown.toJava

override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] =
underlying.metrics.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava
underlying.metrics.map(_.asJava)(ExecutionContexts.parasitic).toJava
}

/** Internal API */
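As the new FIXME above points out, `metrics` takes its execution context from the materializer, so it should only be called on the control of an already-running stream. A usage sketch with placeholder bootstrap servers, group id and topic:

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.common.serialization.StringDeserializer

object ConsumerMetricsExample {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("metrics-example")
    import system.dispatcher

    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("example-group")           // placeholder

    // materialize first; only a running stream's Control can answer metrics requests
    val (control, _) =
      Consumer
        .plainSource(settings, Subscriptions.topics("example-topic")) // placeholder topic
        .toMat(Sink.ignore)(Keep.both)
        .run()

    control.metrics.foreach(m => println(s"consumer exposes ${m.size} metrics"))
  }
}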
@@ -71,7 +71,7 @@ private[kafka] trait DeferredProducer[K, V] {
closeAndFailStageCb.invoke(e)
e
}
)(ExecutionContexts.sameThreadExecutionContext)
)(ExecutionContexts.parasitic)
changeProducerAssignmentLifecycle(AsyncCreateRequestSent)
}
}
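The recurring change from `sameThreadExecutionContext` to `parasitic` follows its deprecation in Akka 2.6: `akka.dispatch.ExecutionContexts.parasitic` is the replacement calling-thread executor (on Scala 2.13 the standard library offers the equivalent `scala.concurrent.ExecutionContext.parasitic`). It is only appropriate for trivial, non-blocking callbacks; a small sketch of the intended pattern:

import akka.dispatch.ExecutionContexts
import scala.concurrent.Future

object ParasiticExample {
  // parasitic runs the callback on whichever thread completes the future,
  // so keep the work trivial and non-blocking (field extraction, wrapping, etc.)
  def recordKey(record: Future[(String, Array[Byte])]): Future[String] =
    record.map { case (key, _) => key }(ExecutionContexts.parasitic)
}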
@@ -9,7 +9,7 @@ import akka.actor.{ActorRef, ExtendedActorSystem, Terminated}
import akka.annotation.InternalApi
import akka.kafka.scaladsl.PartitionAssignmentHandler
import akka.kafka.{ConsumerSettings, RestrictedConsumer, Subscription}
import akka.stream.{ActorMaterializerHelper, SourceShape}
import akka.stream.SourceShape
import org.apache.kafka.common.TopicPartition

import scala.concurrent.{Future, Promise}
@@ -32,7 +32,7 @@ import scala.concurrent.{Future, Promise}
final def consumerFuture: Future[ActorRef] = consumerPromise.future

final def createConsumerActor(): ActorRef = {
val extendedActorSystem = ActorMaterializerHelper.downcast(materializer).system.asInstanceOf[ExtendedActorSystem]
val extendedActorSystem = materializer.system.asInstanceOf[ExtendedActorSystem]
val actor =
extendedActorSystem.systemActorOf(akka.kafka.KafkaConsumerActor.props(sourceActor.ref, settings),
s"kafka-consumer-$actorNumber")
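Dropping `ActorMaterializerHelper` is possible because in Akka 2.6 every `Materializer` exposes its actor system directly; the downcast to `ExtendedActorSystem` is still needed to create system actors. A minimal sketch of the access pattern:

import akka.actor.ExtendedActorSystem
import akka.stream.Materializer

object MaterializerSystemAccess {
  // Materializer.system is available on any materializer in Akka 2.6;
  // the downcast is still needed for system-actor creation
  def extendedSystemOf(materializer: Materializer): ExtendedActorSystem =
    materializer.system.asInstanceOf[ExtendedActorSystem]
}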
6 changes: 3 additions & 3 deletions core/src/main/scala/akka/kafka/internal/SubSourceLogic.scala
@@ -216,7 +216,7 @@ private class SubSourceLogic[K, V, Msg](
new OutHandler {
override def onPull(): Unit =
emitSubSourcesForPendingPartitions()
override def onDownstreamFinish(): Unit = performShutdown()
override def onDownstreamFinish(cause: Throwable): Unit = performShutdown()
}
)

@@ -446,9 +446,9 @@ private abstract class SubSourceStageLogic[K, V, Msg](
shape.out,
new OutHandler {
override def onPull(): Unit = pump()
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
subSourceCancelledCb.invoke(tp -> onDownstreamFinishSubSourceCancellationStrategy())
super.onDownstreamFinish()
super.onDownstreamFinish(cause)
}
}
)
8 changes: 4 additions & 4 deletions core/src/main/scala/akka/kafka/javadsl/Consumer.scala
@@ -280,7 +280,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
_ => ()
)
.map {
@@ -311,7 +311,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
(tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava)
)
.map {
@@ -350,7 +350,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
_ => ()
)
.map {
@@ -375,7 +375,7 @@ object Consumer {
settings,
subscription,
(tps: Set[TopicPartition]) =>
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.sameThreadExecutionContext),
getOffsetsOnAssign(tps.asJava).toScala.map(_.asScala.toMap)(ExecutionContexts.parasitic),
(tps: Set[TopicPartition]) => onRevoke.accept(tps.asJava)
)
.map {
14 changes: 7 additions & 7 deletions core/src/main/scala/akka/kafka/javadsl/MetadataClient.scala
@@ -28,13 +28,13 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
.getBeginningOffsets(partitions.asScala.toSet)
.map { beginningOffsets =>
beginningOffsets.view.mapValues(Long.box).toMap.asJava
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
.toJava

def getBeginningOffsetForPartition[K, V](partition: TopicPartition): CompletionStage[java.lang.Long] =
metadataClient
.getBeginningOffsetForPartition(partition)
.map(Long.box)(ExecutionContexts.sameThreadExecutionContext)
.map(Long.box)(ExecutionContexts.parasitic)
.toJava

def getEndOffsets(
@@ -44,29 +44,29 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
.getEndOffsets(partitions.asScala.toSet)
.map { endOffsets =>
endOffsets.view.mapValues(Long.box).toMap.asJava
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
.toJava

def getEndOffsetForPartition(partition: TopicPartition): CompletionStage[java.lang.Long] =
metadataClient
.getEndOffsetForPartition(partition)
.map(Long.box)(ExecutionContexts.sameThreadExecutionContext)
.map(Long.box)(ExecutionContexts.parasitic)
.toJava

def listTopics(): CompletionStage[java.util.Map[java.lang.String, java.util.List[PartitionInfo]]] =
metadataClient
.listTopics()
.map { topics =>
topics.view.mapValues(partitionsInfo => partitionsInfo.asJava).toMap.asJava
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
.toJava

def getPartitionsFor(topic: java.lang.String): CompletionStage[java.util.List[PartitionInfo]] =
metadataClient
.getPartitionsFor(topic)
.map { partitionsInfo =>
partitionsInfo.asJava
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
.toJava

@deprecated("use `getCommittedOffsets`", "2.0.3")
@@ -82,7 +82,7 @@ class MetadataClient private (metadataClient: akka.kafka.scaladsl.MetadataClient
.getCommittedOffsets(partitions.asScala.toSet)
.map { committedOffsets =>
committedOffsets.asJava
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)
.toJava

def close(): Unit =
2 changes: 1 addition & 1 deletion core/src/main/scala/akka/kafka/scaladsl/Committer.scala
@@ -37,7 +37,7 @@ object Committer {
case WaitForAck =>
offsetBatches
.mapAsyncUnordered(settings.parallelism) { batch =>
batch.commitInternal().map(_ => batch)(ExecutionContexts.sameThreadExecutionContext)
batch.commitInternal().map(_ => batch)(ExecutionContexts.parasitic)
}
case SendAndForget =>
offsetBatches.map(_.tellCommit())
10 changes: 5 additions & 5 deletions core/src/main/scala/akka/kafka/scaladsl/Consumer.scala
@@ -104,8 +104,8 @@ object Consumer {
override def shutdown(): Future[Done] =
control
.shutdown()
.flatMap(_ => streamCompletion)(ExecutionContexts.sameThreadExecutionContext)
.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
.flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
.map(_ => Done)(ExecutionContexts.parasitic)

override def drainAndShutdown[S](streamCompletion: Future[S])(implicit ec: ExecutionContext): Future[S] =
control.drainAndShutdown(streamCompletion)
@@ -119,8 +119,8 @@ object Consumer {

override def isShutdown: Future[Done] =
control.isShutdown
.flatMap(_ => streamCompletion)(ExecutionContexts.sameThreadExecutionContext)
.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
.flatMap(_ => streamCompletion)(ExecutionContexts.parasitic)
.map(_ => Done)(ExecutionContexts.parasitic)

override def metrics: Future[Map[MetricName, Metric]] = control.metrics
}
@@ -257,7 +257,7 @@ object Consumer {
def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
committableSource[K, V](settings, subscription).mapAsync(1) { m =>
m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)
m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.parasitic)
}

/**
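`atMostOnceSource` commits each offset before emitting the record, which is what gives it at-most-once semantics: a failure right after the commit can lose that record. A self-contained usage sketch (topic name is a placeholder, settings are assumed to be configured by the caller):

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.Control
import akka.stream.scaladsl.Sink

object AtMostOnceExample {
  // at-most-once: the offset is committed before the record reaches the application
  def printAtMostOnce(settings: ConsumerSettings[String, String])(implicit system: ActorSystem): Control =
    Consumer
      .atMostOnceSource(settings, Subscriptions.topics("example-topic")) // placeholder topic
      .map(_.value())
      .to(Sink.foreach(println))
      .run()
}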
12 changes: 6 additions & 6 deletions core/src/main/scala/akka/kafka/scaladsl/MetadataClient.scala
@@ -30,7 +30,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)

def getBeginningOffsetForPartition(partition: TopicPartition): Future[Long] =
getBeginningOffsets(Set(partition))
@@ -43,7 +43,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)

def getEndOffsetForPartition(partition: TopicPartition): Future[Long] =
getEndOffsets(Set(partition))
@@ -56,7 +56,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)

def getPartitionsFor(topic: String): Future[List[PartitionInfo]] =
(consumerActor ? GetPartitionsFor(topic))(timeout)
@@ -65,7 +65,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)

@deprecated("use `getCommittedOffsets`", "2.0.3")
def getCommittedOffset(partition: TopicPartition): Future[OffsetAndMetadata] =
@@ -75,7 +75,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)

def getCommittedOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, OffsetAndMetadata]] =
(consumerActor ? GetCommittedOffsets(partitions))(timeout)
@@ -84,7 +84,7 @@ class MetadataClient private (consumerActor: ActorRef, timeout: Timeout, managed
.flatMap {
case Success(res) => Future.successful(res)
case Failure(e) => Future.failed(e)
}(ExecutionContexts.sameThreadExecutionContext)
}(ExecutionContexts.parasitic)

def close(): Unit =
if (managedActor) {
2 changes: 1 addition & 1 deletion docs/src/main/paradox/home.md
@@ -12,7 +12,7 @@ This **Alpakka Kafka connector** lets you connect [Apache Kafka](https://kafka.a

|Kafka client | Scala Versions | Akka version | Alpakka Kafka Connector
|-------------|----------------|--------------|-------------------------
|[2.6.0](https://dist.apache.org/repos/dist/release/kafka/2.6.0/RELEASE_NOTES.html) | 2.13, 2.12 | 2.6.10+ | @ref:[release 2.1.0](release-notes/2.1.x.md)
|[2.7.0](https://dist.apache.org/repos/dist/release/kafka/2.7.0/RELEASE_NOTES.html) | 2.13, 2.12 | 2.6.14+ | @ref:[release 2.1.0](release-notes/2.1.x.md)
|[2.4.1](https://dist.apache.org/repos/dist/release/kafka/2.4.1/RELEASE_NOTES.html) | 2.13, 2.12, 2.11 | 2.5.31+, 2.6.6+ | @ref:[release 2.0.5](release-notes/2.0.x.md)
|[2.4.1](https://dist.apache.org/repos/dist/release/kafka/2.4.1/RELEASE_NOTES.html) | 2.13, 2.12, 2.11 | 2.5.30+, 2.6.6+ | @ref:[release 2.0.4](release-notes/2.0.x.md)
|[2.4.1](https://dist.apache.org/repos/dist/release/kafka/2.4.1/RELEASE_NOTES.html) | 2.13, 2.12, 2.11 | 2.5.30+, 2.6.3+ | @ref:[release 2.0.3](release-notes/2.0.x.md)
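Expressed as build coordinates, the new first row of this table corresponds to something like the following sketch; `2.1.0` stands in for whichever 2.1.x release is actually published:

// sketch: dependencies matching the Kafka 2.7.0 / Akka 2.6.14 row above
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-kafka" % "2.1.0",
  "com.typesafe.akka" %% "akka-stream"       % "2.6.14"
)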
15 changes: 13 additions & 2 deletions docs/src/main/paradox/release-notes/2.1.x.md
@@ -9,13 +9,24 @@ In case you are browsing a specific version's documentation: check out the [late

The Alpakka Kafka 2.1 series features

* Kafka 2.6.0 client
* Kafka 2.7.0 client
* Akka 2.6.x (dropped Akka 2.5)
* Scala 2.12 and 2.13 (dropped Scala 2.11)
* Testkit: use ScalaTest 3.1.x
* Testkit: default to use Confluent Platform 6.0.0
* Testkit: default to use Confluent Platform 6.1.1
* Testkit: no longer support Embedded Kafka (Kafka 2.6.0 can't be safely embedded in Scala applications)

# 2.1.0-RC1

@@@ note
The Apache Kafka clients have a provided dependency on Jackson `2.10.5`, but Akka depends on `2.11.4`.
Alpakka Kafka references `2.11.4`.
@@@

Released: fixme



# 2.1.0-M1

Released: 2020-10-22
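For applications that additionally depend on `kafka-clients` directly, the Jackson version skew described in the 2.1.0-RC1 note above can be resolved with an sbt override; a sketch assuming `jackson-databind` is the module that matters for the application:

// sketch: force the Jackson version that Akka 2.6.x (and therefore this release) is built against
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.11.4"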
@@ -32,7 +32,7 @@ public class AlpakkaKafkaContainer extends GenericContainer<AlpakkaKafkaContaine
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";

// Align these confluent platform constants with testkit/src/main/resources/reference.conf
public static final String DEFAULT_CONFLUENT_PLATFORM_VERSION = "6.0.1";
public static final String DEFAULT_CONFLUENT_PLATFORM_VERSION = "6.1.1";

public static final DockerImageName DEFAULT_ZOOKEEPER_IMAGE_NAME =
DockerImageName.parse("confluentinc/cp-zookeeper")
2 changes: 1 addition & 1 deletion testkit/src/main/resources/reference.conf
@@ -25,7 +25,7 @@ akka.kafka.testkit.testcontainers {
kafka-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
schema-registry-image = "confluentinc/cp-schema-registry"
schema-registry-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
confluent-platform-version = "6.0.1"
confluent-platform-version = "6.1.1"

# the number of Kafka brokers to include in a test cluster
num-brokers = 1
2 changes: 1 addition & 1 deletion tests/src/test/java/docs/javadsl/AtLeastOnceTest.java
@@ -6,7 +6,7 @@
package docs.javadsl;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.MatcherAssert.assertThat;

import akka.NotUsed;
import akka.actor.ActorSystem;
3 changes: 2 additions & 1 deletion tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
@@ -48,7 +48,8 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.MatcherAssert.*;
import static org.junit.Assert.assertEquals;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(LogCapturingExtension.class)
