bump: Scala 2.13.8 (drop 2.12), Akka 2.6.18 #2777

Merged: 19 commits, Jan 18, 2022
8 changes: 3 additions & 5 deletions .travis.yml
@@ -42,10 +42,8 @@ jobs:
env: CMD="verifyCodeStyle; mimaReportBinaryIssues"
name: "Code style check and MiMa. Run locally with: sbt verifyCodeStyle; mimaReportBinaryIssues"
if: type != cron
- env: CMD="++2.12.11 Test/compile"
name: "Compile all code with Scala 2.12 and fatal warnings enabled. Run locally with: env CI=true sbt ++2.12.11 Test/compile"
- env: CMD="++2.13.3 Test/compile"
name: "Compile all code with Scala 2.13"
- env: CMD="++2.13.8 Test/compile"
name: "Compile all code with Scala 2.13 and fatal warnings enabled. Run locally with: env CI=true sbt ++2.13.8 Test/compile"
- env: CMD="unidoc; docs/paradox"
name: "Create all API docs and create site with Paradox"
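Note: with 2.12 dropped, CI compiles only against Scala 2.13.8. A minimal sketch of the matching sbt cross-build change (illustrative names, not lines from this PR's build.sbt):

// Hypothetical sketch: shrinking the cross-build axis after dropping 2.12.
val scala213 = "2.13.8"
ThisBuild / crossScalaVersions := Seq(scala213) // previously also contained a 2.12.x version
ThisBuild / scalaVersion := scala213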

@@ -150,7 +148,7 @@ jobs:
- stage: licenses
script: echo "License checking is temporarily disabled"

- name: "Publish artifacts for Scala 2.12 and 2.13"
- name: "Publish artifacts for Scala 2.13"
env: CMD="ci-release"
script: openssl aes-256-cbc -K $encrypted_74014e1c3c6a_key -iv $encrypted_74014e1c3c6a_iv -in .travis/travis_gpg_secret.enc -out .travis/travis_gpg_secret.gpg -d && export PGP_SECRET=$(cat .travis/travis_gpg_secret.gpg) && ./scripts/travis.sh
- script: openssl aes-256-cbc -K $encrypted_bbf1dc4f2a07_key -iv $encrypted_bbf1dc4f2a07_iv -in .travis/travis_alpakka_rsa.enc -out .travis/id_rsa -d && eval "$(ssh-agent -s)" && chmod 600 .travis/id_rsa && ssh-add .travis/id_rsa && sbt -jvm-opts .jvmopts-travis docs/publishRsync
@@ -13,7 +13,7 @@ import javax.net.ssl.{SSLContext, TrustManager}

import scala.annotation.tailrec
import scala.collection.immutable
- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._

/**
* Only for internal implementations
@@ -121,7 +121,7 @@ final class AmqpDetailsConnectionProvider private (
copy(connectionName = Option(name))

override def get: Connection = {
- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._
val factory = new ConnectionFactory
credentials.foreach { credentials =>
factory.setUsername(credentials.username)
@@ -331,7 +331,7 @@ final class AmqpConnectionFactoryConnectionProvider private (val factory: Connec
copy(hostAndPorts = hostAndPorts.asScala.map(_.toScala).toIndexedSeq)

override def get: Connection = {
- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._
factory.newConnection(hostAndPortList.map(hp => new Address(hp._1, hp._2)).asJava)
}
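Note: every JavaConverters import in this PR moves to scala.jdk.CollectionConverters, the Scala 2.13 replacement that keeps the same asScala/asJava decorator API. A small standalone sketch of the migrated pattern (example values, not repository code):

import scala.jdk.CollectionConverters._ // 2.13 replacement for scala.collection.JavaConverters

val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b")
val asScalaSeq: Seq[String] = javaList.asScala.toSeq // Java -> Scala view, then copy
val asJavaAgain: java.util.List[String] = asScalaSeq.asJava // Scala -> Java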

@@ -7,7 +7,7 @@ package akka.stream.alpakka.amqp
import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._

- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._
import scala.collection.immutable
import scala.concurrent.duration._

@@ -32,7 +32,7 @@ private trait AmqpConnectorLogic { this: GraphStageLogic =>
connection.addShutdownListener(shutdownListener)
channel.addShutdownListener(shutdownListener)

- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._

settings.declarations.foreach {
case d: QueueDeclaration =>
@@ -50,7 +50,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
private var unackedMessages = 0

override def whenConnected(): Unit = {
- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._
channel.basicQos(bufferSize)
val consumerCallback = getAsyncCallback(handleDelivery)

@@ -87,7 +87,7 @@ class AmqpConnectorsSpec extends AmqpSpec {

val input = Vector("one", "two", "three", "four", "five")
val (rpcQueueF, probe) =
- Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run
+ Source(input).map(s => ByteString(s)).viaMat(amqpRpcFlow)(Keep.right).toMat(TestSink.probe)(Keep.both).run()
rpcQueueF.futureValue

val amqpSink = AmqpSink.replyTo(
@@ -215,8 +215,8 @@ class AmqpConnectorsSpec extends AmqpSpec {

val publisher = TestPublisher.probe[ByteString]()
val subscriber = TestSubscriber.probe[ReadResult]()
- amqpSink.addAttributes(Attributes.inputBuffer(1, 1)).runWith(Source.fromPublisher(publisher))
- amqpSource.addAttributes(Attributes.inputBuffer(1, 1)).runWith(Sink.fromSubscriber(subscriber))
+ Source.fromPublisher(publisher).to(amqpSink).addAttributes(Attributes.inputBuffer(1, 1)).run()
+ amqpSource.to(Sink.fromSubscriber(subscriber)).addAttributes(Attributes.inputBuffer(1, 1)).run()

// note that this essentially is testing rabbitmq just as much as it tests our sink and source
publisher.ensureSubscription()
@@ -335,7 +335,7 @@ class AmqpConnectorsSpec extends AmqpSpec {
.viaMat(amqpRpcFlow)(Keep.right)
.mapAsync(1)(cm => cm.ack().map(_ => cm.message))
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()
rpcQueueF.futureValue

val amqpSink = AmqpSink.replyTo(
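Note: the many .run -> .run() edits in this PR track Scala 2.13's deprecation of auto-application: a method declared with an empty parameter list must now be invoked with explicit parentheses, and with fatal warnings enabled in CI the bare form would fail the build. A minimal standalone illustration (not repository code):

// Scala 2.13 deprecates "auto-application" of methods declared with ().
class Runner {
  def run(): String = "ran" // declared with an empty parameter list
}

val r = new Runner
r.run() // fine
// r.run // 2.13 deprecation: "Auto-application to `()` is deprecated"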
@@ -152,7 +152,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.viaMat(mockedFlowWithContextAndConfirm)(Keep.right)
.asSource
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

probe.request(input.size)

@@ -239,7 +239,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.viaMat(mockedUnorderedFlowWithPassThrough)(Keep.right)
.asSource
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

probe.request(input.size)

@@ -279,7 +279,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

val messages = probe.request(input.size).expectNextN(input.size)

@@ -297,7 +297,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

val messages = probe.request(input.size).expectNextN(input.size)

@@ -314,7 +314,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => (WriteMessage(ByteString(s)), s))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

val messages = probe.request(input.size).expectNextN(input.size)

@@ -352,7 +352,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

probe.request(input.size)

@@ -380,7 +380,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

val messages = probe.request(input.size).expectNextN(input.size)

@@ -399,7 +399,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

probe.request(input.size)

@@ -438,7 +438,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.right)
.toMat(TestSink.probe)(Keep.right)
- .run
+ .run()

probe.request(sourceElements)

@@ -458,7 +458,7 @@ class AmqpFlowSpec extends AmqpSpec with AmqpMocking with BeforeAndAfterEach {
.map(s => WriteMessage(ByteString(s)))
.viaMat(flow)(Keep.left)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()

sinkProbe.request(input.size)
input.foreach(sourceProbe.sendNext)
4 changes: 2 additions & 2 deletions amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala
@@ -98,7 +98,7 @@ class AmqpDocsSpec extends AmqpSpec {
.map(s => ByteString(s))
.viaMat(amqpRpcFlow)(Keep.right)
.toMat(TestSink.probe)(Keep.both)
- .run
+ .run()
//#create-rpc-flow
rpcQueueF.futureValue

@@ -153,7 +153,7 @@ class AmqpDocsSpec extends AmqpSpec {
}
//#create-exchange-source

- val completion = Promise[Done]
+ val completion = Promise[Done]()
val mergingFlow = mergedSources
.viaMat(KillSwitches.single)(Keep.right)
.to(Sink.fold(Set.empty[Int]) {
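Note: Promise[Done] -> Promise[Done]() is the same auto-application cleanup: Promise.apply has an empty parameter list, so 2.13 wants the parentheses spelled out. Sketch (imports assumed):

import scala.concurrent.Promise
import akka.Done

val completion = Promise[Done]() // bare Promise[Done] triggers the deprecation warning
completion.success(Done)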
@@ -18,7 +18,6 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
import org.scalacheck.Gen
import org.scalatest.{BeforeAndAfterAll, Suite}

- import scala.language.higherKinds
import scala.reflect.io.Directory
import scala.util.Random
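Note: the scala.language.higherKinds imports can simply be deleted; current Scala 2.13 releases enable higher-kinded types unconditionally (as of 2.13.1), and the import itself is deprecated as redundant. Standalone sketch:

// Scala 2.13: F[_] compiles without `import scala.language.higherKinds`.
trait Wrapper[F[_]] {
  def wrap[A](a: A): F[A]
}

object ListWrapper extends Wrapper[List] {
  def wrap[A](a: A): List[A] = List(a)
}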

@@ -47,8 +47,6 @@ class AvroParquetSinkSpec
}

"create new parquet file from any subtype of `GenericRecord` " in assertAllStagesStopped {
- import scala.language.higherKinds
-
//given
val n: Int = 3
val file: String = genFinalFile.sample.get
@@ -35,7 +35,7 @@ import scala.collection.mutable.Queue
retrieveMessages()

def retrieveMessages(): Unit = {
- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._
val res = cloudQueueBuilt
.retrieveMessages(settings.batchSize, settings.initialVisibilityTimeout, null, null)
.asScala
@@ -51,7 +51,7 @@
}
} else {
buffer ++= res
- push(out, buffer.dequeue)
+ push(out, buffer.dequeue())
}
}

@@ -60,7 +60,7 @@
new OutHandler {
override def onPull: Unit =
if (!buffer.isEmpty) {
- push(out, buffer.dequeue)
+ push(out, buffer.dequeue())
} else {
retrieveMessages()
}
@@ -15,7 +15,7 @@ import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._
import org.scalatest._

- import scala.collection.JavaConverters._
+ import scala.jdk.CollectionConverters._
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Properties
@@ -45,13 +45,13 @@ class AzureQueueSpec extends TestKit(ActorSystem()) with AsyncFlatSpecLike with
test()
}

- override def beforeAll: Unit =
+ override def beforeAll(): Unit =
queueOpt.map(_.createIfNotExists)

- override def afterAll: Unit = {
+ override def afterAll(): Unit = {
queueOpt.map(_.deleteIfExists)
TestKit.shutdownActorSystem(system)
- super.afterAll
+ super.afterAll()
}

private var testMsgCount = 0
48 changes: 24 additions & 24 deletions build.sbt
@@ -88,7 +88,9 @@ lazy val alpakka = project
mqttStreamingBench,
// googleCloudPubSubGrpc and googleCloudBigQueryStorage contain the same gRPC generated classes
// don't include ScalaDocs for googleCloudBigQueryStorage to make it work
- googleCloudBigQueryStorage
+ googleCloudBigQueryStorage,
+ // springWeb triggers an esoteric ScalaDoc bug (from Java code)
+ springWeb
),
crossScalaVersions := List() // workaround for https://github.com/sbt/sbt/issues/3465
)
@@ -164,28 +166,23 @@ lazy val geode =
"geode",
Dependencies.Geode,
Test / fork := true,
- Compile / unmanagedSourceDirectories ++= {
-   val sourceDir = (Compile / sourceDirectory).value
-   CrossVersion.partialVersion(scalaVersion.value) match {
-     case Some((2, n)) if n >= 12 => Seq(sourceDir / "scala-2.12+")
-     case _ => Seq.empty
-   }
- }
+ // https://github.com/scala/bug/issues/12072
+ Test / scalacOptions += "-Xlint:-byname-implicit"
)

lazy val googleCommon = alpakkaProject(
"google-common",
"google.common",
Dependencies.GoogleCommon,
- Test / fork := true,
- fatalWarnings := true
+ Test / fork := true
)

lazy val googleCloudBigQuery = alpakkaProject(
"google-cloud-bigquery",
"google.cloud.bigquery",
Dependencies.GoogleBigQuery,
- Test / fork := true
+ Test / fork := true,
+ Compile / scalacOptions += "-Wconf:src=src_managed/.+:s"
).dependsOn(googleCommon).enablePlugins(spray.boilerplate.BoilerplatePlugin)

lazy val googleCloudBigQueryStorage = alpakkaProject(
@@ -198,8 +195,8 @@ lazy val googleCloudBigQueryStorage = alpakkaProject(
Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Server),
akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Scala, AkkaGrpc.Java),
Compile / scalacOptions ++= Seq(
"-P:silencer:pathFilters=akka-grpc/main",
"-P:silencer:pathFilters=akka-grpc/test"
"-Wconf:src=.+/akka-grpc/main/.+:s",
"-Wconf:src=.+/akka-grpc/test/.+:s"
),
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation")
).dependsOn(googleCommon).enablePlugins(AkkaGrpcPlugin)
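Note: the -P:silencer:pathFilters options give way to -Wconf, the warning-configuration flag built into Scala 2.13.2+, which removes the need for the silencer compiler plugin. A sketch of the general syntax (paths and categories here are illustrative, not from this build):

// -Wconf takes <filters>:<action> rules; filters can match a warning
// category, message text, or source path, and actions include
// e(rror), w(arning), i(nfo), and s(ilent).
Compile / scalacOptions ++= Seq(
  "-Wconf:src=.+/generated/.+:s", // silence everything from generated sources
  "-Wconf:cat=deprecation&msg=JavaConverters:s", // silence one specific deprecation
  "-Wconf:any:w" // leave the rest as ordinary warnings
)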
@@ -224,8 +221,8 @@ lazy val googleCloudPubSubGrpc = alpakkaProject(
// for the ExampleApp in the tests
run / connectInput := true,
Compile / scalacOptions ++= Seq(
"-P:silencer:pathFilters=akka-grpc/main",
"-P:silencer:pathFilters=akka-grpc/test"
"-Wconf:src=.+/akka-grpc/main/.+:s",
"-Wconf:src=.+/akka-grpc/test/.+:s"
),
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation")
).enablePlugins(AkkaGrpcPlugin).dependsOn(googleCommon)
@@ -247,7 +244,15 @@ lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs)
lazy val huaweiPushKit =
alpakkaProject("huawei-push-kit", "huawei.pushkit", Dependencies.HuaweiPushKit)

- lazy val influxdb = alpakkaProject("influxdb", "influxdb", Dependencies.InfluxDB)
+ lazy val influxdb = alpakkaProject(
+   "influxdb",
+   "influxdb",
+   Dependencies.InfluxDB,
+   Compile / scalacOptions ++= Seq(
+     // JDK 11: method isAccessible in class AccessibleObject is deprecated
+     "-Wconf:cat=deprecation:s"
+   )
+ )

lazy val ironmq = alpakkaProject(
"ironmq",
@@ -420,10 +425,7 @@ lazy val `doc-examples` = project
.settings(
name := s"akka-stream-alpakka-doc-examples",
publish / skip := true,
- // More projects are not available for Scala 2.13
- crossScalaVersions -= Dependencies.Scala213,
- Dependencies.`Doc-examples`,
- fatalWarnings := true
+ Dependencies.`Doc-examples`
)

def alpakkaProject(projectId: String, moduleName: String, additionalSettings: sbt.Def.SettingsDefinition*): Project = {
@@ -439,8 +441,7 @@ def alpakkaProject(projectId: String, moduleName: String, additionalSettings: sb
.getOrElse(throw new Error("Unable to determine previous version"))
),
mimaBinaryIssueFilters += ProblemFilters.exclude[Problem]("*.impl.*"),
- Test / parallelExecution := false,
- fatalWarnings := true
+ Test / parallelExecution := false
)
.settings(additionalSettings: _*)
.dependsOn(testkit % Test)
@@ -452,8 +453,7 @@ def internalProject(projectId: String, additionalSettings: sbt.Def.SettingsDefin
.disablePlugins(SitePlugin, MimaPlugin)
.settings(
name := s"akka-stream-alpakka-$projectId",
- publish / skip := true,
- fatalWarnings := true
+ publish / skip := true
)
.settings(additionalSettings: _*)
