Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Scala 3 on more connectors #3014

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ object AzureQueueWithTimeoutsSink {
*/
def create(cloudQueue: Supplier[CloudQueue]): Sink[MessageWithTimeouts, CompletionStage[Done]] =
AzureQueueSink.fromFunction(
{ input: MessageWithTimeouts =>
{ (input: MessageWithTimeouts) =>
AzureQueueSinkFunctions
.addMessage(() => cloudQueue.get)(input.message, input.timeToLive, input.initialVisibility)
}
Expand Down
25 changes: 14 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ lazy val awslambda = alpakkaProject("awslambda", "aws.lambda", Dependencies.AwsL
lazy val azureStorageQueue = alpakkaProject(
"azure-storage-queue",
"azure.storagequeue",
Dependencies.AzureStorageQueue
Dependencies.AzureStorageQueue,
Scala3.settings
)

lazy val cassandra =
Expand All @@ -157,7 +158,7 @@ lazy val elasticsearch = alpakkaProject(
)

// The name 'file' is taken by `sbt.file`, hence 'files'
lazy val files = alpakkaProject("file", "file", Dependencies.File)
lazy val files = alpakkaProject("file", "file", Dependencies.File, Scala3.settings)

lazy val ftp = alpakkaProject(
"ftp",
Expand Down Expand Up @@ -245,9 +246,9 @@ lazy val googleCloudStorage = alpakkaProject(
lazy val googleFcm = alpakkaProject("google-fcm", "google.firebase.fcm", Dependencies.GoogleFcm, Test / fork := true)
.dependsOn(googleCommon)

lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, Test / fork := true)
lazy val hbase = alpakkaProject("hbase", "hbase", Dependencies.HBase, Scala3.settings, Test / fork := true)

lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs)
lazy val hdfs = alpakkaProject("hdfs", "hdfs", Dependencies.Hdfs, Scala3.settings)

lazy val huaweiPushKit =
alpakkaProject("huawei-push-kit", "huawei.pushkit", Dependencies.HuaweiPushKit)
Expand All @@ -256,6 +257,7 @@ lazy val influxdb = alpakkaProject(
"influxdb",
"influxdb",
Dependencies.InfluxDB,
Scala3.settings,
Compile / scalacOptions ++= Seq(
// JDK 11: method isAccessible in class AccessibleObject is deprecated
"-Wconf:cat=deprecation:s"
Expand All @@ -275,11 +277,11 @@ lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Depe

lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis).settings(Scala3.settings)

lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu)
lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu, Scala3.settings)

lazy val mongodb = alpakkaProject("mongodb", "mongodb", Dependencies.MongoDb)

lazy val mqtt = alpakkaProject("mqtt", "mqtt", Dependencies.Mqtt)
lazy val mqtt = alpakkaProject("mqtt", "mqtt", Dependencies.Mqtt, Scala3.settings)

lazy val mqttStreaming =
alpakkaProject("mqtt-streaming", "mqttStreaming", Dependencies.MqttStreaming)
Expand All @@ -306,6 +308,7 @@ lazy val pravega = alpakkaProject(
"pravega",
"pravega",
Dependencies.Pravega,
Scala3.settings,
Test / fork := true
)

Expand All @@ -315,7 +318,7 @@ lazy val springWeb = alpakkaProject(
Dependencies.SpringWeb
)

lazy val simpleCodecs = alpakkaProject("simple-codecs", "simplecodecs")
lazy val simpleCodecs = alpakkaProject("simple-codecs", "simplecodecs", Scala3.settings)

lazy val slick = alpakkaProject("slick", "slick", Dependencies.Slick)

Expand All @@ -324,20 +327,20 @@ lazy val eventbridge =

lazy val sns = alpakkaProject("sns", "aws.sns", Dependencies.Sns).settings(Scala3.settings)

lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr)
lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr, Scala3.settings)

lazy val sqs = alpakkaProject("sqs", "aws.sqs", Dependencies.Sqs).settings(Scala3.settings)

lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse)
lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse, Scala3.settings)

lazy val text = alpakkaProject("text", "text")
lazy val text = alpakkaProject("text", "text", Scala3.settings)

lazy val udp = alpakkaProject("udp", "udp")

lazy val unixdomainsocket =
alpakkaProject("unix-domain-socket", "unixdomainsocket", Dependencies.UnixDomainSocket)

lazy val xml = alpakkaProject("xml", "xml", Dependencies.Xml)
lazy val xml = alpakkaProject("xml", "xml", Dependencies.Xml, Scala3.settings)

// Java Platform version for JavaDoc creation
val JavaDocLinkVersion = "11"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ presentation by Colin Breck, ScalaDays New York, June 2018
[Stream a file to AWS S3 using Akka Streams (via Alpakka) in Play Framework](https://blog.knoldus.com/stream-a-file-to-aws-s3-using-akka-streams-via-alpakka-in-play-framework/)
blog by Sidharth Khattri, May 2018

[Alpakka (Akka Streams) vs Apache Camel: who wins?](http://www.thedevpiece.com/alpakka-akka-streams-vs-apache-camel-who-wins/) Blog by Gabriel Francisco, May 2018

[Alpakka – a new world of connectors for Reactive Enterprise Integration](https://www.youtube.com/watch?v=EcNZ2mJZmCk)
presentation by Jan Pustelnik, Actyx, ReactSphere Kraków, April 2018

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ trait ElasticsearchConnectorBehaviour {

val indexName = "sink7"
val createBooks = Source(books)
.map { book: (String, Book) =>
.map { (book: (String, Book)) =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed syntax in Elasticsearch, but more intricate things made me not enable Scala 3, yet.

WriteMessage.createUpsertMessage(id = book._1, source = book._2)
}
.via(
Expand Down Expand Up @@ -411,7 +411,7 @@ trait ElasticsearchConnectorBehaviour {

// Update sink7/_doc with the second dataset
val upserts = Source(updatedBooks)
.map { book: (String, JsObject) =>
.map { (book: (String, JsObject)) =>
WriteMessage.createUpsertMessage(id = book._1, source = book._2)
}
.via(
Expand Down Expand Up @@ -483,7 +483,7 @@ trait ElasticsearchConnectorBehaviour {
"read and write document-version if configured to do so" in {

case class VersionTestDoc(id: String, name: String, value: Int)
implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc)
implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc.apply)

val indexName = "version-test-scala"
val typeName = "_doc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ trait ElasticsearchSpecUtils { this: AnyWordSpec with ScalaFutures =>

case class Book(title: String, shouldSkip: Option[Boolean] = None, price: Int = 10)

implicit val format: JsonFormat[Book] = jsonFormat3(Book)
implicit val format: JsonFormat[Book] = jsonFormat3(Book.apply)
//#define-class

def register(connectionSettings: ElasticsearchConnectionSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[spray.json.JsObject] =>
.map { (message: ReadResult[spray.json.JsObject]) =>
val book: Book = jsonReader[Book].read(message.source)
WriteMessage.createIndexMessage(message.id, book)
}
Expand Down Expand Up @@ -93,7 +93,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[Book] =>
.map { (message: ReadResult[Book]) =>
WriteMessage.createIndexMessage(message.id, message.source)
}
.runWith(
Expand Down Expand Up @@ -129,7 +129,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[Book] =>
.map { (message: ReadResult[Book]) =>
WriteMessage.createIndexMessage(message.id, message.source)
}
.via(
Expand Down Expand Up @@ -209,7 +209,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

val indexName = "sink6"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -261,7 +261,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

val indexName = "sink6-bulk"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -316,7 +316,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

val indexName = "sink6-nop"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -373,7 +373,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
register(connectionSettings, indexName, "dummy", 10) // need to create index else exception in reading below

val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -464,7 +464,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[Book] =>
.map { (message: ReadResult[Book]) =>
WriteMessage
.createIndexMessage(message.id, message.source)
.withIndexName(customIndexName) // Setting the index-name to use for this document
Expand Down Expand Up @@ -500,7 +500,7 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
case class TestDoc(id: String, a: String, b: Option[String], c: String)
//#custom-search-params

implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc)
implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc.apply)

val indexName = "custom-search-params-test-scala"
val typeName = "_doc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[spray.json.JsObject] =>
.map { (message: ReadResult[spray.json.JsObject]) =>
val book: Book = jsonReader[Book].read(message.source)
WriteMessage.createIndexMessage(message.id, book)
}
Expand Down Expand Up @@ -84,7 +84,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[Book] =>
.map { (message: ReadResult[Book]) =>
WriteMessage.createIndexMessage(message.id, message.source)
}
.runWith(
Expand Down Expand Up @@ -119,7 +119,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[Book] =>
.map { (message: ReadResult[Book]) =>
WriteMessage.createIndexMessage(message.id, message.source)
}
.via(
Expand Down Expand Up @@ -196,7 +196,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

val indexName = "sink6"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -248,7 +248,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

val indexName = "sink6-bulk"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -304,7 +304,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

val indexName = "sink6-nop"
val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -362,7 +362,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
register(connectionSettings, indexName, "dummy", 10) // need to create index else exception in reading below

val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka
.map { kafkaMessage: KafkaMessage =>
.map { (kafkaMessage: KafkaMessage) =>
val book = kafkaMessage.book
val id = book.title

Expand Down Expand Up @@ -451,7 +451,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
query = """{"match_all": {}}""",
settings = baseSourceSettings
)
.map { message: ReadResult[Book] =>
.map { (message: ReadResult[Book]) =>
WriteMessage
.createIndexMessage(message.id, message.source)
.withIndexName(customIndexName) // Setting the index-name to use for this document
Expand Down Expand Up @@ -484,7 +484,7 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

case class TestDoc(id: String, a: String, b: Option[String], c: String)

implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc)
implicit val formatVersionTestDoc: JsonFormat[TestDoc] = jsonFormat4(TestDoc.apply)

val indexName = "custom-search-params-test-scala"
val typeName = "_doc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ trait OpensearchConnectorBehaviour {

val indexName = "sink7"
val createBooks = Source(books)
.map { book: (String, Book) =>
.map { (book: (String, Book)) =>
WriteMessage.createUpsertMessage(id = book._1, source = book._2)
}
.via(
Expand Down Expand Up @@ -411,7 +411,7 @@ trait OpensearchConnectorBehaviour {

// Update sink7/_doc with the second dataset
val upserts = Source(updatedBooks)
.map { book: (String, JsObject) =>
.map { (book: (String, JsObject)) =>
WriteMessage.createUpsertMessage(id = book._1, source = book._2)
}
.via(
Expand Down Expand Up @@ -483,7 +483,7 @@ trait OpensearchConnectorBehaviour {
"read and write document-version if configured to do so" in {

case class VersionTestDoc(id: String, name: String, value: Int)
implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc)
implicit val formatVersionTestDoc: JsonFormat[VersionTestDoc] = jsonFormat3(VersionTestDoc.apply)

val indexName = "version-test-scala"
val typeName = "_doc"
Expand Down
Loading
Loading