Skip to content

Commit

Permalink
Merge ff2f61e into da2b9dc
Browse files Browse the repository at this point in the history
  • Loading branch information
ahjohannessen committed Jan 9, 2020
2 parents da2b9dc + ff2f61e commit 642f9c3
Show file tree
Hide file tree
Showing 22 changed files with 92 additions and 75 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Expand Up @@ -29,6 +29,12 @@ TAGS
# Scala
*.sc

# Metals
.metals/*
.bloop/*
project/.bloop/*
project/metals.sbt

# Mac
.DS_Store

Expand Down
12 changes: 6 additions & 6 deletions .travis.yml
@@ -1,12 +1,12 @@
jdk:
- oraclejdk8
- openjdk8
- openjdk11

language: scala

scala:
- 2.13.0
- 2.12.9
- 2.13.1
- 2.12.10

sudo: required

Expand All @@ -22,11 +22,11 @@ after_success:
jobs:
include:
- stage: integration
scala: 2.13.0
scala: 2.13.1
env: AKKA_TEST_TIMEFACTOR=1.5 AKKA_TEST_LOGLEVEL=OFF
before_install:
- docker pull eventstore/eventstore:release-5.0.2
- docker run -d --rm --name eventstore-node -p 2113:2113 -p 1113:1113 -e EVENTSTORE_MEM_DB=True -e EVENTSTORE_STATS_PERIOD_SEC=1200 eventstore/eventstore:release-5.0.2
- docker pull eventstore/eventstore:release-5.0.5
- docker run -d --rm --name eventstore-node -p 2113:2113 -p 1113:1113 -e EVENTSTORE_MEM_DB=True -e EVENTSTORE_STATS_PERIOD_SEC=2400 eventstore/eventstore:release-5.0.5
script:
- sbt test:compile
- travis_retry sbt it:test
Expand Down
24 changes: 24 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,30 @@
This change log is ordered chronologically, so each release contains all changes described below
it. Changes start from v6.0.0 and only relevant or interesting changes are mentioned.

## v7.1.0 (2020-01-08)

### Breaking changes

* Moving to Akka 2.6.1
* Change constructor of `eventstore.akka.EsConnection` to require implicit `ActorSystem`.

### Dependencies

* akka-*: 2.5.25 -> 2.6.1
* akka-http-*: 10.1.9 -> 10.1.11
* config: 1.3.4 -> 1.4.0
* reactive-streams: 1.0.2 -> 1.0.3
* specs2-core: 4.5.1 -> 4.8.3

### Misc / Build

* scala: 2.13.0 -> 2.13.1, 2.12.9 -> 2.12.10
* sbt: 1.2.8 -> 1.3.6

* sbt-plugins:
* sbt-scoverage: 1.6.0 -> 1.6.1
* sbt-tpolecat: 0.1.6 -> 0.1.10

## v7.0.2 (2019-08-22)

### Dependencies
Expand Down
14 changes: 6 additions & 8 deletions README.md
Expand Up @@ -3,11 +3,11 @@
<table border="0">
<tr>
<td><a href="http://www.scala-lang.org">Scala</a> </td>
<td>2.13.0 / 2.12.9</td>
<td>2.13.1 / 2.12.10</td>
</tr>
<tr>
<td><a href="http://akka.io">Akka</a> </td>
<td>2.5.25</td>
<td>2.6.1</td>
</tr>
<tr>
<td><a href="https://eventstore.org">Event Store</a></td>
Expand Down Expand Up @@ -51,15 +51,15 @@ connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

#### Sbt
```scala
libraryDependencies += "com.geteventstore" %% "eventstore-client" % "7.0.2"
libraryDependencies += "com.geteventstore" %% "eventstore-client" % "7.1.0"
```

#### Maven
```xml
<dependency>
<groupId>com.geteventstore</groupId>
<artifactId>eventstore-client_${scala.version}</artifactId>
<version>7.0.2</version>
<version>7.1.0</version>
</dependency>
```

Expand Down Expand Up @@ -432,12 +432,11 @@ Here is a short example on how to use it:

```scala
import _root_.akka.actor.ActorSystem
import _root_.akka.stream.ActorMaterializer

object ListAllStreamsExample extends App {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()

val connection = EventStoreExtension(system).connection
val source = connection.streamSource(EventStream.System.`$streams`, infinite = false, resolveLinkTos = true)

Expand All @@ -456,14 +455,13 @@ Here is a short example on how to accomplish that:

```scala
import _root_.akka.actor.ActorSystem
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl._
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.duration._

object MessagesPerSecondReactiveStreams extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val connection = EventStoreExtension(system).connection

val publisher: Publisher[String] = connection.allStreamsSource()
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -6,7 +6,7 @@ lazy val commonSettings = Seq(

organization := "com.geteventstore",
scalaVersion := crossScalaVersions.value.head,
crossScalaVersions := Seq("2.13.0", "2.12.9"),
crossScalaVersions := Seq("2.13.1", "2.12.10"),
releaseCrossBuild := true,
licenses := Seq("BSD 3-Clause" -> url("http://raw.github.com/EventStore/EventStore.JVM/master/LICENSE")),
homepage := Some(new URL("http://github.com/EventStore/EventStore.JVM")),
Expand Down
10 changes: 4 additions & 6 deletions client/src/main/scala/eventstore/akka/EsConnection.scala
Expand Up @@ -6,7 +6,6 @@ import scala.concurrent.{ExecutionContext, Future}
import _root_.akka.NotUsed
import _root_.akka.actor._
import _root_.akka.pattern.ask
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl._
import _root_.akka.util.Timeout
import org.reactivestreams.Publisher
Expand All @@ -24,8 +23,7 @@ class EsConnection(
connection: ActorRef,
factory: ActorRefFactory,
settings: Settings = Settings.Default
) {
private val materializer = ActorMaterializer()(factory)
)(implicit system: ActorSystem) {

implicit val timeout = Timeout(settings.operationTimeout)

Expand Down Expand Up @@ -273,7 +271,7 @@ class EsConnection(
credentials,
infinite,
readBatchSize
).runWith(Sink.asPublisher(fanout = true))(materializer)
).runWith(Sink.asPublisher(fanout = true))
}

/**
Expand Down Expand Up @@ -348,7 +346,7 @@ class EsConnection(
credentials,
infinite,
readBatchSize
).runWith(Sink.asPublisher(fanout = true))(materializer)
).runWith(Sink.asPublisher(fanout = true))
}

/**
Expand Down Expand Up @@ -395,6 +393,6 @@ object EsConnection {
connection = system actorOf props,
factory = system,
settings = settings
)
)(system)
}
}
Expand Up @@ -9,7 +9,7 @@ class EventStoreExtension(system: ActorSystem) extends Extension {
def settings: Settings = Settings(system.settings.config)

lazy val actor: ActorRef = system.actorOf(ConnectionActor.props(settings), "eventstore-connection")
lazy val connection: EsConnection = new EsConnection(actor, system, settings)
lazy val connection: EsConnection = new EsConnection(actor, system, settings)(system)
lazy val projectionsClient: ProjectionsClient = new ProjectionsClient(settings, system)

/**
Expand Down
5 changes: 2 additions & 3 deletions client/src/main/scala/eventstore/akka/ProjectionsClient.scala
Expand Up @@ -9,7 +9,6 @@ import _root_.akka.http.scaladsl.Http
import _root_.akka.http.scaladsl.model._
import _root_.akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials}
import _root_.akka.http.scaladsl.unmarshalling.Unmarshal
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl.{Flow, Sink, Source}
import ProjectionsClient.ProjectionCreationResult._
import ProjectionsClient.ProjectionDeleteResult._
Expand Down Expand Up @@ -103,10 +102,10 @@ object ProjectionException {
*/
class ProjectionsClient(settings: Settings = Settings.Default, system: ActorSystem) extends ProjectionsUrls {

implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)
private implicit val as = system

import system.dispatcher
import ProjectionsClient._
import materializer.executionContext

private val connection: Flow[HttpRequest, Try[HttpResponse], NotUsed] = {
val httpExt = Http(system)
Expand Down
Expand Up @@ -8,7 +8,6 @@ import scala.concurrent._
import scala.util.Try
import _root_.akka.actor.ActorSystem
import _root_.akka.http.scaladsl.Http.HostConnectionPool
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl._
import _root_.akka.http.scaladsl.Http
import _root_.akka.http.scaladsl.model._
Expand All @@ -28,7 +27,6 @@ private[eventstore] object ClusterInfoOf {

val http = Http(system)
val acceptHeader = headers.Accept(MediaRange(MediaTypes.`application/json`))
implicit val materializer = ActorMaterializer()

val pools = TrieMap.empty[Uri, Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), HostConnectionPool]]

Expand Down
28 changes: 22 additions & 6 deletions client/src/main/scala/eventstore/akka/tcp/ConnectionActor.scala
Expand Up @@ -7,7 +7,7 @@ import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import _root_.akka.actor._
import _root_.akka.stream.scaladsl._
import _root_.akka.stream.{ActorMaterializer, BufferOverflowException, StreamTcpException}
import _root_.akka.stream.{CompletionStrategy, BufferOverflowException, StreamTcpException}
import eventstore.core.{HeartbeatRequest, HeartbeatResponse}
import eventstore.core.NotHandled.NotMaster
import eventstore.core.settings.ClusterSettings
Expand Down Expand Up @@ -362,15 +362,31 @@ private[eventstore] class ConnectionActor(settings: Settings) extends Actor with
}

def connect(address: InetSocketAddress): Unit = {
implicit val materializer = ActorMaterializer()(context)

import settings.{connectionTimeout, heartbeatTimeout, bufferSize, bufferOverflowStrategy}

val connection = tcp.outgoingConnection(
remoteAddress = address,
connectTimeout = settings.connectionTimeout,
idleTimeout = settings.heartbeatTimeout
connectTimeout = connectionTimeout,
idleTimeout = heartbeatTimeout
)
val source = Source.actorRef(settings.bufferSize, settings.bufferOverflowStrategy.toAkka)
val sink = Sink.actorRef(self, Disconnected(address))

val completionMatcher: PartialFunction[Any, CompletionStrategy] = {
case Status.Success(s: CompletionStrategy) => s
case Status.Success(_) => CompletionStrategy.draining
case Status.Success => CompletionStrategy.draining
}

val failureMatcher: PartialFunction[Any, Throwable] = {
case Status.Failure(cause) => cause
}

// completionMatcher & failureMatcher are the same as the deprecated `Source.actorRef` Akka 2.6.0 uses.
val source = Source.actorRef(completionMatcher, failureMatcher, bufferSize, bufferOverflowStrategy.toAkka)

// onFailureMessage is the same as the deprecated `Sink.actorRef` in Akka 2.6.0 uses.
val sink = Sink.actorRef(self, Disconnected(address), th => Status.Failure(th))

val (ref, connected) = source.viaMat(connection.join(flow))(Keep.both).toMat(sink)(Keep.left).run()
for { _ <- connected } self.tell(Connected(address), ref)
}
Expand Down
@@ -1,17 +1,8 @@
package eventstore
package akka

import _root_.akka.stream.ActorMaterializer

abstract class AbstractStreamsITest extends ActorSpec {

implicit val materializer = ActorMaterializer()

override def afterAll() = {
materializer.shutdown()
super.afterAll()
}

trait TestScope extends ActorScope {
def settings = Settings.Default
}
Expand Down
Expand Up @@ -3,11 +3,9 @@ package akka
package streams

import scala.concurrent.duration._
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.testkit.scaladsl.TestSink

class AllStreamsSourceITest extends TestConnection {
implicit val materializer = ActorMaterializer()

"AllStreamsSource" should {

Expand Down
Expand Up @@ -4,7 +4,6 @@ package streams

import scala.concurrent.duration._
import _root_.akka.NotUsed
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.scaladsl._
import _root_.akka.stream.testkit.scaladsl._
import _root_.akka.testkit._
Expand All @@ -13,8 +12,6 @@ abstract class SourceSpec extends ActorSpec {

abstract class AbstractSourceScope[T] extends ActorScope {

implicit val materializer: ActorMaterializer = ActorMaterializer()

val duration = 1.second
val readBatchSize = 2
val resolveLinkTos = false
Expand Down
Expand Up @@ -3,13 +3,10 @@ package akka
package streams

import scala.concurrent.duration._
import _root_.akka.stream.ActorMaterializer
import _root_.akka.stream.testkit.scaladsl.TestSink

class StreamSourceITest extends TestConnection {

implicit val materializer = ActorMaterializer()

"StreamSource" should {
"subscribe to stream" in new Scope {
val events = appendMany(3)
Expand Down
Expand Up @@ -5,16 +5,14 @@ package tcp
import scala.util.{Success, Try}
import scala.collection.mutable.{Queue => MQueue}
import _root_.akka.stream.scaladsl._
import _root_.akka.stream.{ActorMaterializer, OverflowStrategy}
import _root_.akka.stream.OverflowStrategy
import _root_.akka.testkit.TestProbe
import eventstore.core.{HeartbeatRequest, HeartbeatResponse}
import eventstore.core.tcp.{PackIn, PackOut}
import testutil.TestLogger

class BidiLoggingSpec extends ActorSpec {

implicit val materializer = ActorMaterializer()

"BidiLogging" should {

"log incoming & outgoing if enabled" in new TestScope {
Expand Down Expand Up @@ -78,8 +76,8 @@ class BidiLoggingSpec extends ActorSpec {
case PackIn(message, _) => sys error s"unexpected $message"
}
val (source, _) = (logging join flow).runWith(
Source.actorRef(100, OverflowStrategy.fail),
Sink.actorRef[PackOut](sink.ref, "completed")
Source.actorRef(PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.fail),
Sink.actorRef[PackOut](sink.ref, "completed", { _: Throwable => "failed" })
)
}
}

0 comments on commit 642f9c3

Please sign in to comment.