Skip to content

Commit

Permalink
akka 2.6 only
Browse files Browse the repository at this point in the history
  • Loading branch information
seglo committed Oct 5, 2020
1 parent 4ffeb8d commit ccf1dda
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 48 deletions.
4 changes: 0 additions & 4 deletions .travis.yml
Expand Up @@ -25,8 +25,6 @@ jobs:
name: "Code style check. Run locally with: sbt verifyCodeStyle"
- env: CMD=";++2.13.1 Test/compile ;++2.13.1 It/compile"
name: "Compile all code with fatal warnings for Scala 2.13. Run locally with: env CI=true sbt ';++2.13.1 Test/compile ;++2.13.1 It/compile'"
- env: CMD="+clusterSharding/Test/compile
name: "Compile cluster-sharding module for all supported Scala versions. Run locally with: env CI=true sbt '+clusterSharding/Test/compile'"
- env: CMD="verifyDocs"
name: "Create all API docs for artifacts/website and all reference docs. Run locally with: sbt verifyDocs"
- env: CMD="mimaReportBinaryIssues"
Expand Down Expand Up @@ -61,8 +59,6 @@ jobs:
- stage: publish
env: CMD="+publish"
name: "Publish artifacts for all Scala versions"
- env: CMD="+clusterSharding/publish"
name: "Publish akka-stream-kafka-cluster-sharding for all its supported Scala versions"
- script: openssl aes-256-cbc -K $encrypted_d80875c2ae41_key -iv $encrypted_d80875c2ae41_iv -in .travis/travis_alpakka_kafka_rsa.enc -out .travis/id_rsa -d && eval "$(ssh-agent -s)" && chmod 600 .travis/id_rsa && ssh-add .travis/id_rsa && sbt -jvm-opts .jvmopts-travis '++2.13.1 docs/publishRsync'
name: "Publish API and reference documentation"

Expand Down
48 changes: 18 additions & 30 deletions build.sbt
Expand Up @@ -8,24 +8,25 @@ val Nightly = sys.env.get("TRAVIS_EVENT_TYPE").contains("cron")

val Scala212 = "2.12.10"
val Scala213 = "2.13.1"
val akkaVersion26 = "2.6.6"
val akkaVersion = if (Nightly) akkaVersion26 else "2.5.31"
val AkkaBinaryVersion25 = "2.5"
val AkkaBinaryVersion26 = "2.6"
val AkkaBinaryVersion = if (Nightly) AkkaBinaryVersion26 else AkkaBinaryVersion25

val AkkaBinaryVersionForDocs = "2.6"
val KafkaVersionForDocs = "24"

val akkaVersion = "2.6.9"

val kafkaVersion = "2.6.0"
val embeddedKafkaVersion = "2.6.0"
val embeddedKafka = "io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion
val kafkaVersionForDocs = "24"
val scalatestVersion = "3.1.4"
val testcontainersVersion = "1.14.3"
val slf4jVersion = "1.7.30"
// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
val confluentAvroSerializerVersion = "6.0.0"
val scalapb = "com.thesamet.scalapb" %% "scalapb-runtime" % "0.10.8"

val scalatestVersion = "3.1.4"
val testcontainersVersion = "1.14.3"

val slf4jVersion = "1.7.30"

val kafkaBrokerWithoutSlf4jLog4j = "org.apache.kafka" %% "kafka" % kafkaVersion % Provided exclude ("org.slf4j", "slf4j-log4j12")

val confluentLibsExclusionRules = Seq(
Expand Down Expand Up @@ -201,7 +202,7 @@ lazy val `alpakka-kafka` =
| run a single benchmark backed by Docker containers
""".stripMargin
)
.aggregate(core, testkit, tests, benchmarks, docs)
.aggregate(core, testkit, clusterSharding, tests, benchmarks, docs)

lazy val core = project
.enablePlugins(AutomateHeaderPlugin)
Expand Down Expand Up @@ -250,12 +251,6 @@ lazy val testkit = project
)
)

/**
* TODO: Once Akka 2.5 is dropped:
* - add to `alpakka-kafka` aggregate project
* - move `ClusterShardingExample` to `tests` project
* - remove all akka26 paradox properties
*/
lazy val clusterSharding = project
.in(file("./cluster-sharding"))
.dependsOn(core)
Expand All @@ -266,7 +261,7 @@ lazy val clusterSharding = project
name := "akka-stream-kafka-cluster-sharding",
AutomaticModuleName.settings("akka.stream.alpakka.kafka.cluster.sharding"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion26
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion
) ++ silencer,
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value
Expand All @@ -275,7 +270,7 @@ lazy val clusterSharding = project
)

lazy val tests = project
.dependsOn(core, testkit)
.dependsOn(core, testkit, clusterSharding)
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(MimaPlugin, SitePlugin)
.configs(IntegrationTest.extend(Test))
Expand Down Expand Up @@ -365,22 +360,15 @@ lazy val docs = project
"javadoc.akka.kafka.base_url" -> "",
// Akka
"akka.version" -> akkaVersion,
"extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion/%s",
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersion/",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/$AkkaBinaryVersion/",
"extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersionForDocs/%s",
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersionForDocs/",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/$AkkaBinaryVersionForDocs/",
"javadoc.akka.link_style" -> "direct",
"extref.akka-management.base_url" -> s"https://doc.akka.io/docs/akka-management/current/%s",
// Akka 2.6. These can be removed when we drop Akka 2.5 support.
"akka.version26" -> akkaVersion26,
"extref.akka26.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion26/%s",
"scaladoc.akka.actor.typed.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersion26/",
"extref.akka.actor.typed.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion26/%s",
"scaladoc.akka.cluster.sharding.typed.base_url" -> s"https://doc.akka.io/api/akka/$AkkaBinaryVersion26/",
"extref.akka.cluster.sharding.typed.base_url" -> s"https://doc.akka.io/docs/akka/$AkkaBinaryVersion26/%s",
// Kafka
"kafka.version" -> kafkaVersion,
"extref.kafka.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/%s",
"javadoc.org.apache.kafka.base_url" -> s"https://kafka.apache.org/$kafkaVersionForDocs/javadoc/",
"extref.kafka.base_url" -> s"https://kafka.apache.org/$KafkaVersionForDocs/%s",
"javadoc.org.apache.kafka.base_url" -> s"https://kafka.apache.org/$KafkaVersionForDocs/javadoc/",
"javadoc.org.apache.kafka.link_style" -> "frames",
// Java
"extref.java-docs.base_url" -> "https://docs.oracle.com/en/java/javase/11/%s",
Expand Down
20 changes: 10 additions & 10 deletions docs/src/main/paradox/cluster-sharding.md
Expand Up @@ -3,11 +3,11 @@ project.description: Alpakka Kafka provides a module to use Kafka with Akka Clus
---
# Akka Cluster Sharding

Akka Cluster allows the user to use an @extref[external shard allocation](akka26:/typed/cluster-sharding.html#external-shard-allocation) strategy in order to give the user more control over how many shards are created and what cluster nodes they are assigned to.
Akka Cluster allows the user to use an @extref[external shard allocation](akka:/typed/cluster-sharding.html#external-shard-allocation) strategy in order to give the user more control over how many shards are created and what cluster nodes they are assigned to.
If you consume Kafka messages into your Akka Cluster application then it's possible to run an Alpakka Kafka Consumer on each cluster node and co-locate Kafka partitions with Akka Cluster shards.
When partitions and shards are co-located together then there is less chance that a message must be transmitted over the network by the Akka Cluster Shard Coordinator to a destination user sharded entity.

This module directly depends on `akka-cluster-sharding-typed` version 2.6.6 or later.
This module directly depends on `akka-cluster-sharding-typed` and requires Akka version 2.6.6 or later.

@@project-info{ projectId="clusterSharding" }

Expand Down Expand Up @@ -48,26 +48,26 @@ In the following example we asynchronously request an extractor that does not us
Given a user entity.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #user-entity }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #user-entity }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #user-entity }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #user-entity }

Create a `MessageExtractor`.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #message-extractor }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #message-extractor }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #message-extractor }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #message-extractor }

Setup Akka Typed Cluster Sharding.

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #setup-cluster-sharding }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #setup-cluster-sharding }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #setup-cluster-sharding }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #setup-cluster-sharding }

## Rebalance Listener

Expand All @@ -89,7 +89,7 @@ The same message type is used by separate Alpakka Kafka consumers, but the messa
Create the rebalance listener using the extension and pass it into an Alpakka Kafka @scaladoc[Subscription](akka.kafka.Subscription).

Scala
: @@snip [snip](/cluster-sharding/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #rebalance-listener }
: @@snip [snip](/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala) { #rebalance-listener }

Java
: @@snip [snip](/cluster-sharding/src/test/java/docs/javadsl/ClusterShardingExample.java) { #rebalance-listener }
: @@snip [snip](/tests/src/test/java/docs/javadsl/ClusterShardingExample.java) { #rebalance-listener }
Expand Up @@ -6,9 +6,9 @@
package docs.scaladsl

import akka.NotUsed
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorSystem, Behavior}
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
Expand All @@ -23,8 +23,7 @@ import scala.concurrent.duration._
import scala.util.{Failure, Success}

/**
* This is compile-only code meant for documentation purposes. This file can't be in the tests project while we still
* build with Akka 2.5.
* This is compile-only code meant for documentation purposes.
* A full sample application exists in the akka-samples repository:
*
* https://github.com/akka/akka-samples/tree/2.6/akka-sample-kafka-to-sharding-scala
Expand Down

0 comments on commit ccf1dda

Please sign in to comment.