Skip to content

Commit

Permalink
Akka 2.5.31 and prefer ClassicActorSystemProvider (#2254)
Browse files Browse the repository at this point in the history
* Use Akka 2.5.31
* Classic/typed support in Akka extensions
* Accept ClassicActorSystem where possible
  • Loading branch information
ennru committed Apr 2, 2020
1 parent f541d7b commit 3cfc035
Show file tree
Hide file tree
Showing 25 changed files with 186 additions and 122 deletions.
2 changes: 1 addition & 1 deletion cassandra/src/main/resources/reference.conf
@@ -1,7 +1,7 @@
alpakka.cassandra {
# The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
# used for creating the `CqlSession`.
# It may optionally have a constructor with an `ActorSystem` and `Config` parameters.
# It may optionally have a constructor with an `ClassicActorSystemProvider` and `Config` parameters.
session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"

# Configure Akka Discovery by setting a service name
Expand Down
Expand Up @@ -39,16 +39,6 @@ object CassandraMetricsRegistry extends ExtensionId[CassandraMetricsRegistry] wi
override def createExtension(system: ExtendedActorSystem) =
new CassandraMetricsRegistry

/**
* Get the CassandraMetricsRegistry extension with the classic actors API.
*/
override def apply(system: akka.actor.ActorSystem): CassandraMetricsRegistry = super.apply(system)

/**
* Get the CassandraMetricsRegistry extension with the new actors API.
*/
def apply(system: ClassicActorSystemProvider): CassandraMetricsRegistry = super.apply(system.classicSystem)

/**
* Java API.
* Get the CassandraMetricsRegistry extension with the classic actors API.
Expand All @@ -59,5 +49,5 @@ object CassandraMetricsRegistry extends ExtensionId[CassandraMetricsRegistry] wi
* Java API.
* Get the CassandraMetricsRegistry extension with the classic actors API.
*/
def get(system: ClassicActorSystemProvider): CassandraMetricsRegistry = super.apply(system.classicSystem)
override def get(system: ClassicActorSystemProvider): CassandraMetricsRegistry = super.apply(system)
}
Expand Up @@ -96,9 +96,9 @@ object CqlSessionProvider {
*/
def driverConfig(system: ActorSystem, config: Config): Config = {
val driverConfigPath = config.getString("datastax-java-driver-config")
system.settings.config.getConfig(driverConfigPath).withFallback {
system.classicSystem.settings.config.getConfig(driverConfigPath).withFallback {
if (driverConfigPath == "datastax-java-driver") ConfigFactory.empty()
else system.settings.config.getConfig("datastax-java-driver")
else system.classicSystem.settings.config.getConfig("datastax-java-driver")
}
}
}
Expand Up @@ -24,7 +24,7 @@ object CassandraSessionRegistry {
* Get the session registry with new actors API.
*/
def get(system: ClassicActorSystemProvider): CassandraSessionRegistry =
new CassandraSessionRegistry(scaladsl.CassandraSessionRegistry(system.classicSystem))
new CassandraSessionRegistry(scaladsl.CassandraSessionRegistry(system))

/**
* Get the session registry with the classic actors API.
Expand Down
Expand Up @@ -10,14 +10,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import akka.Done
import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import akka.annotation.InternalStableApi
import akka.event.Logging
import akka.stream.alpakka.cassandra.{CassandraSessionSettings, CqlSessionProvider}
Expand All @@ -34,12 +27,6 @@ object CassandraSessionRegistry extends ExtensionId[CassandraSessionRegistry] wi
def createExtension(system: ExtendedActorSystem): CassandraSessionRegistry =
new CassandraSessionRegistry(system)

override def apply(system: ActorSystem): CassandraSessionRegistry = super.apply(system)

// This is not source compatible with Akka 2.6 as it lacks `overrride`
def apply(system: ClassicActorSystemProvider): CassandraSessionRegistry =
apply(system.classicSystem)

override def lookup(): ExtensionId[CassandraSessionRegistry] = this

/** Hash key for `sessions`. */
Expand Down
Expand Up @@ -5,6 +5,7 @@
package akka.stream.alpakka.couchbase.javadsl;

import akka.actor.ActorSystem;
import akka.actor.ClassicActorSystemProvider;
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import com.typesafe.config.Config;

Expand Down Expand Up @@ -40,5 +41,15 @@ public final class DiscoverySupport {
system.settings().config().getConfig(CouchbaseSessionSettings.configPath()), system);
}

/**
* Expects a `service` section in the given Config and reads the given service name's address to
* be used as Couchbase `nodes`.
*/
public static final java.util.function.Function<
CouchbaseSessionSettings, CompletionStage<CouchbaseSessionSettings>>
getNodes(ClassicActorSystemProvider system) {
return getNodes(system.classicSystem());
}

private DiscoverySupport() {}
}
@@ -0,0 +1,2 @@
# Internal API now accepts ClassicActorSystemProvider
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.couchbase.scaladsl.DiscoverySupport.getNodes")
Expand Up @@ -26,25 +26,11 @@ object CouchbaseSessionRegistry extends ExtensionId[CouchbaseSessionRegistry] wi
def createExtension(system: ExtendedActorSystem): CouchbaseSessionRegistry =
new CouchbaseSessionRegistry(system)

/**
* Get the session registry with new actors API.
*/
// This is not source compatible with Akka 2.6 as it lacks `overrride`
def apply(system: ClassicActorSystemProvider): CouchbaseSessionRegistry =
super.apply(system.classicSystem)

/**
* Get the session registry with the classic actors API.
*/
override def apply(system: akka.actor.ActorSystem): CouchbaseSessionRegistry =
super.apply(system)

/**
* Java API: Get the session registry with new actors API.
*/
// This is not source compatible with Akka 2.6 as it lacks `overrride`
def get(system: ClassicActorSystemProvider): CouchbaseSessionRegistry =
super.apply(system.classicSystem)
override def get(system: ClassicActorSystemProvider): CouchbaseSessionRegistry =
super.apply(system)

/**
* Java API: Get the session registry with the classic actors API.
Expand Down
Expand Up @@ -6,7 +6,7 @@ package akka.stream.alpakka.couchbase.scaladsl

import java.util.concurrent.CompletionStage

import akka.actor.ActorSystem
import akka.actor.{ActorSystem, ClassicActorSystemProvider}
import akka.annotation.InternalApi
import akka.discovery.Discovery
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings
Expand All @@ -30,8 +30,8 @@ sealed class DiscoverySupport private {
private def readNodes(
serviceName: String,
lookupTimeout: FiniteDuration
)(implicit system: ActorSystem): Future[immutable.Seq[String]] = {
import system.dispatcher
)(implicit system: ClassicActorSystemProvider): Future[immutable.Seq[String]] = {
implicit val ec = system.classicSystem.dispatcher
val discovery = Discovery(system).discovery
discovery.lookup(serviceName, lookupTimeout).map { resolved =>
resolved.addresses.map(_.host)
Expand All @@ -41,7 +41,7 @@ sealed class DiscoverySupport private {
/**
* Expect a `service` section in Config and use Akka Discovery to read the addresses for `name` within `lookup-timeout`.
*/
private def readNodes(config: Config)(implicit system: ActorSystem): Future[immutable.Seq[String]] =
private def readNodes(config: Config)(implicit system: ClassicActorSystemProvider): Future[immutable.Seq[String]] =
if (config.hasPath("service")) {
val serviceName = config.getString("service.name")
val lookupTimeout = config.getDuration("service.lookup-timeout").asScala
Expand All @@ -54,30 +54,43 @@ sealed class DiscoverySupport private {
*/
def nodes(
config: Config
)(implicit system: ActorSystem): CouchbaseSessionSettings => Future[CouchbaseSessionSettings] = {
import system.dispatcher
)(implicit system: ClassicActorSystemProvider): CouchbaseSessionSettings => Future[CouchbaseSessionSettings] = {
implicit val ec = system.classicSystem.dispatcher
settings =>
readNodes(config)
.map { nodes =>
settings.withNodes(nodes)
}
}

private[couchbase] def nodes(config: Config,
system: ActorSystem): CouchbaseSessionSettings => Future[CouchbaseSessionSettings] =
nodes(config)(system)

/**
* Internal API: Java wrapper.
*/
@InternalApi
private[couchbase] def getNodes(
config: Config,
system: ActorSystem
system: ClassicActorSystemProvider
): java.util.function.Function[CouchbaseSessionSettings, CompletionStage[CouchbaseSessionSettings]] =
nodes(config)(system).andThen(_.toJava).asJava

/**
* Expects a `service` section in `alpakka.couchbase.session` and reads the given service name's address
* to be used as Couchbase `nodes`.
*/
def nodes()(implicit system: ActorSystem): CouchbaseSessionSettings => Future[CouchbaseSessionSettings] =
def nodes()(
implicit system: ClassicActorSystemProvider
): CouchbaseSessionSettings => Future[CouchbaseSessionSettings] =
nodes(system.classicSystem)

/**
* Expects a `service` section in `alpakka.couchbase.session` and reads the given service name's address
* to be used as Couchbase `nodes`.
*/
def nodes(system: ActorSystem): CouchbaseSessionSettings => Future[CouchbaseSessionSettings] =
nodes(system.settings.config.getConfig(CouchbaseSessionSettings.configPath))(system)

}
Expand Down
Expand Up @@ -131,11 +131,11 @@ object GooglePubSub {
attr
.get[PubSubAttributes.Publisher]
.map(_.publisher)
.getOrElse(GrpcPublisherExt()(mat.system).publisher)
.getOrElse(GrpcPublisherExt.get(mat.system).publisher)

private def subscriber(mat: ActorMaterializer, attr: Attributes) =
attr
.get[PubSubAttributes.Subscriber]
.map(_.subscriber)
.getOrElse(GrpcSubscriberExt()(mat.system).subscriber)
.getOrElse(GrpcSubscriberExt.get(mat.system).subscriber)
}
Expand Up @@ -67,6 +67,7 @@ object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdPr
/**
* Access to extension.
*/
@deprecated("use get() instead", since = "2.0.0")
def apply()(implicit system: ActorSystem): GrpcPublisherExt = super.apply(system)

/**
Expand All @@ -81,5 +82,5 @@ object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdPr
*
* Access to the extension from the new actors API.
*/
def get(system: ClassicActorSystemProvider): GrpcPublisherExt = super.get(system.classicSystem)
override def get(system: ClassicActorSystemProvider): GrpcPublisherExt = super.get(system)
}
Expand Up @@ -66,6 +66,7 @@ object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionId
/**
* Access to extension.
*/
@deprecated("use get() instead", since = "2.0.0")
def apply()(implicit system: ActorSystem): GrpcSubscriberExt = super.apply(system)

/**
Expand All @@ -80,5 +81,5 @@ object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionId
*
* Access to the extension from the new actors API.
*/
def get(system: ClassicActorSystemProvider): GrpcSubscriberExt = super.get(system.classicSystem)
override def get(system: ClassicActorSystemProvider): GrpcSubscriberExt = super.get(system.classicSystem)
}
Expand Up @@ -32,38 +32,51 @@ final class GrpcPublisher private (settings: PubSubSettings, sys: ActorSystem) {
}

object GrpcPublisher {
def apply(settings: PubSubSettings)(implicit sys: ActorSystem): GrpcPublisher =
def apply(settings: PubSubSettings)(implicit sys: ClassicActorSystemProvider): GrpcPublisher =
new GrpcPublisher(settings, sys.classicSystem)

def apply(settings: PubSubSettings, sys: ActorSystem): GrpcPublisher =
new GrpcPublisher(settings, sys)

def apply()(implicit sys: ActorSystem): GrpcPublisher =
def apply()(implicit sys: ClassicActorSystemProvider): GrpcPublisher =
apply(PubSubSettings(sys))

def apply(sys: ActorSystem): GrpcPublisher =
apply(PubSubSettings(sys), sys)
}

/**
* An extension that manages a single gRPC scala publisher client per actor system.
*/
final class GrpcPublisherExt private (sys: ExtendedActorSystem) extends Extension {
implicit val publisher = GrpcPublisher()(sys)
implicit val publisher = GrpcPublisher(sys: ActorSystem)
}

object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdProvider {
override def lookup = GrpcPublisherExt
override def createExtension(system: ExtendedActorSystem) = new GrpcPublisherExt(system)

/**
* Access to extension.
* Access to extension from the new and classic actors API.
*/
def apply()(implicit system: ActorSystem): GrpcPublisherExt = super.apply(system)
def apply()(implicit system: ClassicActorSystemProvider): GrpcPublisherExt = super.apply(system)

/**
* Access to the extension from the new actors API.
* Access to the extension from the classic actors API.
*/
def apply(system: ClassicActorSystemProvider): GrpcPublisherExt = super.apply(system.classicSystem)
override def apply(system: akka.actor.ActorSystem): GrpcPublisherExt = super.apply(system)

/**
* Java API
*
* Access to extension.
*/
override def get(system: ActorSystem): GrpcPublisherExt = super.get(system)

/**
* Java API
*
* Access to extension.
*/
override def get(system: ClassicActorSystemProvider): GrpcPublisherExt = super.get(system)
}
Expand Up @@ -32,38 +32,51 @@ final class GrpcSubscriber private (settings: PubSubSettings, sys: ActorSystem)
}

object GrpcSubscriber {
def apply(settings: PubSubSettings)(implicit sys: ActorSystem): GrpcSubscriber =
def apply(settings: PubSubSettings)(implicit sys: ClassicActorSystemProvider): GrpcSubscriber =
new GrpcSubscriber(settings, sys.classicSystem)

def apply(settings: PubSubSettings, sys: ActorSystem): GrpcSubscriber =
new GrpcSubscriber(settings, sys)

def apply()(implicit sys: ActorSystem): GrpcSubscriber =
def apply()(implicit sys: ClassicActorSystemProvider): GrpcSubscriber =
apply(PubSubSettings(sys))

def apply(sys: ActorSystem): GrpcSubscriber =
apply(PubSubSettings(sys), sys)
}

/**
* An extension that manages a single gRPC scala subscriber client per actor system.
*/
final class GrpcSubscriberExt private (sys: ExtendedActorSystem) extends Extension {
implicit val subscriber = GrpcSubscriber()(sys)
implicit val subscriber = GrpcSubscriber(sys: ActorSystem)
}

object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionIdProvider {
override def lookup = GrpcSubscriberExt
override def createExtension(system: ExtendedActorSystem) = new GrpcSubscriberExt(system)

/**
* Access to extension.
* Access to extension from the new and classic actors API.
*/
def apply()(implicit system: ActorSystem): GrpcSubscriberExt = super.apply(system)
def apply()(implicit system: ClassicActorSystemProvider): GrpcSubscriberExt = super.apply(system)

/**
* Access to the extension from the new actors API.
* Access to the extension from the classic actors API.
*/
def apply(system: ClassicActorSystemProvider): GrpcSubscriberExt = super.apply(system.classicSystem)
override def apply(system: ActorSystem): GrpcSubscriberExt = super.apply(system)

/**
* Java API
*
* Access to extension.
*/
override def get(system: ActorSystem): GrpcSubscriberExt = super.get(system)

/**
* Java API
*
* Access to extension.
*/
override def get(system: ClassicActorSystemProvider): GrpcSubscriberExt = super.get(system)
}

0 comments on commit 3cfc035

Please sign in to comment.