Skip to content

Commit

Permalink
Add Pekko support (#2899)
Browse files Browse the repository at this point in the history
* Add module

* Add module

* peko streams

* Bump Scala 3 to 3.3.0 and make pekkostreams compile to 3

* Remove scalamock

* Remove scalamock

* Fix for Scala 3

* Scala 3.3.0 in build
  • Loading branch information
dragisak committed Aug 17, 2023
1 parent e2a9a5b commit 76fabf6
Show file tree
Hide file tree
Showing 20 changed files with 1,332 additions and 80 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:
run: docker run -d -it -p 39227:9200 -p 39337:9300 -e "discovery.type=single-node" -v /home/runner/work/elastic4s/elastic4s/elastic4s-tests/src/test/resources/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml docker.elastic.co/elasticsearch/elasticsearch:8.5.3

- name: run tests
run: sbt ++3.2.0 elastic4s-scala3/test
run: sbt ++3.3.0 elastic4s-scala3/test

- name: Import GPG key
id: import_gpg
Expand All @@ -125,7 +125,7 @@ jobs:
echo "email: ${{ steps.import_gpg.outputs.email }}"
- name: publish snapshot
run: sbt ++3.2.0 elastic4s-scala3/publish
run: sbt ++3.3.0 elastic4s-scala3/publish
env:
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ jobs:
run: docker run -d -it -p 39227:9200 -p 39337:9300 -e "discovery.type=single-node" -v /home/runner/work/elastic4s/elastic4s/elastic4s-tests/src/test/resources/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml docker.elastic.co/elasticsearch/elasticsearch:8.5.3

- name: run tests
run: sbt ++3.2.0 elastic4s-scala3/test
run: sbt ++3.3.0 elastic4s-scala3/test
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}

- name: publish 3.0 release
run: sbt ++3.2.0 elastic4s-scala3/publishSigned
run: sbt ++3.3.0 elastic4s-scala3/publishSigned
env:
RELEASE_VERSION: ${{ github.event.inputs.version }}
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
Expand Down
24 changes: 19 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def ossrhUsername = sys.env.getOrElse("OSSRH_USERNAME", "")
def ossrhPassword = sys.env.getOrElse("OSSRH_PASSWORD", "")

val scala2Versions = Seq("2.12.17", "2.13.11")
val scalaAllVersions = scala2Versions :+ "3.2.2"
val scalaAllVersions = scala2Versions :+ "3.3.0"
lazy val commonScalaVersionSettings = Seq(
scalaVersion := "2.12.17",
crossScalaVersions := Nil
Expand Down Expand Up @@ -139,7 +139,8 @@ lazy val scala3Projects: Seq[ProjectReference] = Seq(
ziojson,
clientsttp,
httpstreams,
akkastreams
akkastreams,
pekkostreams
)
lazy val scala3_root = Project("elastic4s-scala3", file("scala3"))
.settings(name := "elastic4s")
Expand All @@ -157,7 +158,7 @@ lazy val root = Project("elastic4s", file("."))
noPublishSettings
)
.aggregate(
Seq[ProjectReference](scalaz, sprayjson, ziojson_1, clientakka) ++ scala3Projects: _*
Seq[ProjectReference](scalaz, sprayjson, ziojson_1, clientakka, clientpekko) ++ scala3Projects: _*
)

lazy val domain = (project in file("elastic4s-domain"))
Expand Down Expand Up @@ -269,6 +270,12 @@ lazy val akkastreams = (project in file("elastic4s-streams-akka"))
.settings(scala3Settings)
.settings(libraryDependencies += Dependencies.akkaStream)

lazy val pekkostreams = (project in file("elastic4s-streams-pekko"))
.dependsOn(core, testkit % "test", jackson % "test")
.settings(name := "elastic4s-streams-pkko")
.settings(scala3Settings)
.settings(libraryDependencies += Dependencies.pekkoStream)

lazy val jackson = (project in file("elastic4s-json-jackson"))
.dependsOn(core)
.settings(name := "elastic4s-json-jackson")
Expand Down Expand Up @@ -324,8 +331,15 @@ lazy val clientsttp = (project in file("elastic4s-client-sttp"))
lazy val clientakka = (project in file("elastic4s-client-akka"))
.dependsOn(core, testkit % "test")
.settings(name := "elastic4s-client-akka")
.settings(scala2Settings) // tests need re-writing to not use scalaMock. We also need akka-http to be cross-published, which depends on an akka bump with restrictive licensing changes
.settings(libraryDependencies ++= Seq(akkaHTTP, akkaStream, scalaMock))
.settings(scala2Settings) // We need akka-http to be cross-published, which depends on an akka bump with restrictive licensing changes
.settings(libraryDependencies ++= Seq(akkaHTTP, akkaStream))

lazy val clientpekko = (project in file("elastic4s-client-pekko"))
.dependsOn(core, testkit % "test")
.settings(name := "elastic4s-client-pekko")
.settings(scala3Settings)
.settings(libraryDependencies ++= Seq(pekkoHTTP, pekkoStream))


lazy val tests = (project in file("elastic4s-tests"))
.settings(name := "elastic4s-tests")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,33 @@ package com.sksamuel.elastic4s.akka
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes, Uri}
import com.sksamuel.elastic4s.{ElasticRequest, HttpEntity => ElasticEntity, HttpResponse => ElasticResponse}
import org.scalamock.function.MockFunction1
import org.scalamock.scalatest.MockFactory
import org.mockito.ArgumentMatchers._
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatestplus.mockito.MockitoSugar
import org.mockito.Mockito._

import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

class AkkaHttpClientMockTest
extends AnyWordSpec
with Matchers
with MockFactory
with MockitoSugar
with ScalaFutures
with IntegrationPatience
with BeforeAndAfterAll {

private implicit lazy val system: ActorSystem = ActorSystem()

override def afterAll: Unit = {
override def afterAll(): Unit = {
system.terminate()
}

def mockHttpPool(): (MockFunction1[HttpRequest, Try[HttpResponse]], TestHttpPoolFactory) = {
val sendRequest = mockFunction[HttpRequest, Try[HttpResponse]]
def mockHttpPool(): (Function[HttpRequest, Try[HttpResponse]], TestHttpPoolFactory) = {
val sendRequest = mock[Function[HttpRequest, Try[HttpResponse]]]
val poolFactory = new TestHttpPoolFactory(sendRequest)
(sendRequest, poolFactory)
}
Expand All @@ -49,22 +50,22 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.add _).expects("host1").returns(true)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Success(HttpResponse(StatusCodes.BadGateway)))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host1/test")
}))
.thenReturn(Success(HttpResponse(StatusCodes.BadGateway)))

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("ok")))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host2/test")
}))
.thenReturn(Success(HttpResponse().withEntity("ok")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -91,14 +92,14 @@ class AkkaHttpClientMockTest
blacklist,
httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.add _).expects("host1").returns(true)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)

sendRequest
.expects(argThat { (r: HttpRequest) =>
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Success(HttpResponse(StatusCodes.BadGateway)))
}))
.thenReturn(Success(HttpResponse(StatusCodes.BadGateway)))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -122,22 +123,22 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.add _).expects("host1").returns(true)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Success(HttpResponse(StatusCodes.BadGateway)))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host1/test")
}))
.thenReturn(Success(HttpResponse(StatusCodes.BadGateway)))

sendRequest
.expects(argThat { (r: HttpRequest) =>
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("host2")))
}))
.thenReturn(Success(HttpResponse().withEntity("host2")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -158,22 +159,22 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(false)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.add _).expects("host1").returns(true)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(false)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.add("host1")).thenReturn(true)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host1/test")
})
.returns(Failure(new Exception("Some exception")))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host1/test")
}))
.thenReturn(Failure(new Exception("Some exception")))

sendRequest
.expects(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("host2")))
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r != null && r.uri == Uri("http://host2/test")
}))
.thenReturn(Success(HttpResponse().withEntity("host2")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand All @@ -194,16 +195,16 @@ class AkkaHttpClientMockTest
val client =
new AkkaHttpClient(AkkaHttpClientSettings(hosts), blacklist, httpPool)

(blacklist.contains _).expects("host1").returns(true)
(blacklist.size _).expects().returns(1)
(blacklist.contains _).expects("host2").returns(false)
(blacklist.remove _).expects("host2").returns(false)
when(blacklist.contains("host1")).thenReturn(true)
when(blacklist.size).thenReturn(1)
when(blacklist.contains("host2")).thenReturn(false)
when(blacklist.remove("host2")).thenReturn(false)

sendRequest
.expects(argThat { (r: HttpRequest) =>
when(sendRequest
.apply(argThat { (r: HttpRequest) =>
r.uri == Uri("http://host2/test")
})
.returns(Success(HttpResponse().withEntity("host2")))
}))
.thenReturn(Success(HttpResponse().withEntity("host2")))

client
.sendAsync(ElasticRequest("GET", "/test"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ class AkkaHttpClientTest extends AnyFlatSpec with Matchers with DockerTests with

private implicit lazy val system: ActorSystem = ActorSystem()

override def beforeAll: Unit = {
override def beforeAll(): Unit = {
Try {
client.execute {
deleteIndex("testindex")
}.await
}
}

override def afterAll: Unit = {
override def afterAll(): Unit = {
Try {
client.execute {
deleteIndex("testindex")
Expand Down
18 changes: 18 additions & 0 deletions elastic4s-client-pekko/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
com.sksamuel.elastic4s.pekko {
hosts = []
https = false
verify-ssl-certificate = true
// optionally provide credentials
// username = ...
// password = ...
queue-size = 1000
blacklist {
min-duration = 1m
max-duration = 30m
}
max-retry-timeout = 30s
pekko.http {
// pekko-http settings specific for elastic4s
// can be overwritten in this section
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.sksamuel.elastic4s.pekko

/**
* List of 'bad' hosts.
* Implementation must have expiration logic backed-in.
*/
private[pekko] trait Blacklist {

/**
* Adds a host to the blacklist.
*
* @param host host
* @return true if record is blacklisted for the first time
*/
def add(host: String): Boolean

/**
* Removes a host from the blacklist.
*
* @param host host
* @return true if host was blacklisted
*/
def remove(host: String): Boolean

/**
* Checks if a host can be used.
*
* @param host host
* @return true if host is not in a blacklist or temporary removed from it
*/
def contains(host: String): Boolean

/**
* Number of hosts in blacklist
*/
def size: Int

/**
* List all hosts in the blacklist
*
* @return
*/
def list: List[String]
}

Loading

0 comments on commit 76fabf6

Please sign in to comment.