Skip to content

Commit

Permalink
Merge 0e12e94 into 0b32149
Browse files Browse the repository at this point in the history
  • Loading branch information
ahjohannessen committed Jul 11, 2020
2 parents 0b32149 + 0e12e94 commit 728aff7
Show file tree
Hide file tree
Showing 39 changed files with 356 additions and 134 deletions.
23 changes: 17 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ jdk:
language: scala

scala:
- 2.13.1
- 2.12.10
- 2.13.3
- 2.12.11

sudo: required

Expand All @@ -21,16 +21,27 @@ after_success:

jobs:
include:
- stage: integration
scala: 2.13.1
- stage: integration-5.x
scala: 2.13.3
env: AKKA_TEST_TIMEFACTOR=1.5 AKKA_TEST_LOGLEVEL=OFF
before_install:
- docker pull eventstore/eventstore:release-5.0.5
- docker run -d --rm --name eventstore-node -p 2113:2113 -p 1113:1113 -e EVENTSTORE_MEM_DB=True -e EVENTSTORE_STATS_PERIOD_SEC=2400 eventstore/eventstore:release-5.0.5
- docker pull eventstore/eventstore:release-5.0.8
- docker run -d --rm --name eventstore-node-5 -p 2113:2113 -p 1113:1113 -e EVENTSTORE_MEM_DB=True -e EVENTSTORE_STATS_PERIOD_SEC=2400 eventstore/eventstore:release-5.0.8
script:
- sbt test:compile
- travis_retry sbt it:test

- stage: integration-20.x
scala: 2.13.3
env: AKKA_TEST_TIMEFACTOR=1.5 AKKA_TEST_LOGLEVEL=OFF ES_TEST_ADDRESS_PORT=1114 ES_TEST_HTTP_PORT=2114 ES_TEST_IS_20_SERIES=true
before_install:
- docker pull quay.io/ahjohannessen/eventstore-20.6.0-bionic
- docker run -d --rm --name eventstore-node-20 -p 2114:2113 -p 1114:1113 -e EVENTSTORE_DEV=True -e EVENTSTORE_MEM_DB=True -e EVENTSTORE_ENABLE_EXTERNAL_TCP=True -e EVENTSTORE_RUN_PROJECTIONS=All quay.io/ahjohannessen/eventstore-20.6.0-bionic
script:
- sbt test:compile
- travis_retry sbt it:test


cache:
# These directories are cached to S3 at the end of the build
directories:
Expand Down
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,26 @@
This change log is ordered chronologically, so each release contains all changes described below
it. Changes start from v6.0.0 and only relevant or interesting changes are mentioned.

## v7.2.0 (2020-07-10)

### Enhancements

* Ability to connect to ES v20.6.0+ node(s) using non-TLS.

### Dependencies

* akka-*: 2.6.1 -> 2.6.7
* akka-http-*: 10.1.11 -> 10.1.12
* specs2-core: 4.8.3 -> 4.10.0

### Misc / Build

* scala: 2.13.1 -> 2.13.3, 2.12.10 -> 2.12.11
* sbt: 1.3.6 -> 1.3.13

* sbt-plugins:
* sbt-tpolecat: 0.1.10 -> 0.1.13

## v7.1.0 (2020-01-08)

### Breaking changes
Expand Down
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
<table border="0">
<tr>
<td><a href="http://www.scala-lang.org">Scala</a> </td>
<td>2.13.1 / 2.12.10</td>
<td>2.13.3 / 2.12.11</td>
</tr>
<tr>
<td><a href="http://akka.io">Akka</a> </td>
<td>2.6.1</td>
<td>2.6.7</td>
</tr>
<tr>
<td><a href="https://eventstore.org">Event Store</a></td>
<td>v5.0.0 and higher is supported</td>
</tr>
<td>v5.x is supported - v20.x using non-TLS is supported</td>
</tr>
</table>


Expand Down Expand Up @@ -51,15 +51,15 @@ connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

#### Sbt
```scala
libraryDependencies += "com.geteventstore" %% "eventstore-client" % "7.1.0"
libraryDependencies += "com.geteventstore" %% "eventstore-client" % "7.2.0"
```

#### Maven
```xml
<dependency>
<groupId>com.geteventstore</groupId>
<artifactId>eventstore-client_${scala.version}</artifactId>
<version>7.1.0</version>
<version>7.2.0</version>
</dependency>
```

Expand Down Expand Up @@ -262,7 +262,7 @@ object ReadEventExample extends App {
)

val connection = system.actorOf(ConnectionActor.props(settings))
implicit val readResult = system.actorOf(Props[ReadResult])
implicit val readResult = system.actorOf(Props[ReadResult]())

connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

Expand Down Expand Up @@ -353,7 +353,7 @@ import eventstore.akka.tcp.ConnectionActor
object CountAll extends App {
val system = ActorSystem()
val connection = system.actorOf(ConnectionActor.props(), "connection")
val countAll = system.actorOf(Props[CountAll], "count-all")
val countAll = system.actorOf(Props[CountAll](), "count-all")
system.actorOf(SubscriptionActor.props(connection, countAll, None, None, Settings.Default), "subscription")
}

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ lazy val commonSettings = Seq(

organization := "com.geteventstore",
scalaVersion := crossScalaVersions.value.head,
crossScalaVersions := Seq("2.13.1", "2.12.10"),
crossScalaVersions := Seq("2.13.3", "2.12.11"),
releaseCrossBuild := true,
licenses := Seq("BSD 3-Clause" -> url("http://raw.github.com/EventStore/EventStore.JVM/master/LICENSE")),
homepage := Some(new URL("http://github.com/EventStore/EventStore.JVM")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ object OverflowStrategy {
private lazy val map = Values.map(x => (x.toString.toLowerCase, x)).toMap

def apply(x: String): OverflowStrategy =
map getOrElse (x.toLowerCase, sys error s"No OverflowStrategy found by $x")
map.getOrElse(x.toLowerCase, sys error s"No OverflowStrategy found by $x")

case object DropHead extends OverflowStrategy
case object DropTail extends OverflowStrategy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,40 +90,40 @@ private[eventstore] class PersistentSubscriptionActor private (
}
// Ignore events sent while unsubscribed
case Event(PS.EventAppeared(_), _) =>
stay
stay()
}

when(PSA.LiveProcessing) {
case Event(PS.EventAppeared(event), details: PSA.SubscriptionDetails) =>
if (autoAck) toConnection(Ack(details.subscriptionId, getEventId(event) :: Nil))
client ! event
stay
stay()
case Event(PSA.ManualAck(eventId), details: PSA.SubscriptionDetails) =>
toConnection(Ack(details.subscriptionId, eventId :: Nil))
stay
stay()
case Event(PSA.ManualNak(eventId), details: PSA.SubscriptionDetails) =>
toConnection(Nak(details.subscriptionId, List(eventId), Retry, None))
stay
stay()
}

when(PSA.CatchingUp) {
case Event(PS.EventAppeared(event), details: PSA.SubscriptionDetails) =>
if (autoAck) toConnection(Ack(details.subscriptionId, getEventId(event) :: Nil))
client ! event
if (details.lastEventNum.exists(_ <= event.number)) goto(PSA.LiveProcessing) using details
else stay
else stay()
case Event(PSA.ManualAck(eventId), details: PSA.SubscriptionDetails) =>
toConnection(Ack(details.subscriptionId, eventId :: Nil))
stay
stay()
case Event(PSA.ManualNak(eventId), details: PSA.SubscriptionDetails) =>
toConnection(Nak(details.subscriptionId, List(eventId), Retry, None))
stay
stay()
}

whenUnhandled {
// If a reconnect is launched in LiveProcessing or CatchingUp, then renew subId
case Event(PS.Connected(subId, _, eventNum), _) =>
stay using subscriptionDetails(subId, eventNum)
stay() using subscriptionDetails(subId, eventNum)
// Error conditions
// This handles when the client or connection is terminated (unrecoverable)
case Event(Terminated(_), _) =>
Expand All @@ -138,7 +138,7 @@ private[eventstore] class PersistentSubscriptionActor private (
stop()
case Event(e, s) =>
log.warning(s"Received unhandled $e in state $stateName with state $s")
stay
stay()
}

initialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ private[eventstore] object ClusterJsonProtocol extends DefaultJsonProtocol {
}

implicit object MemberInfoFormat extends RootJsonFormat[MemberInfo] {
private val MappingFormat = jsonFormat21(Mapping)
private val MappingFormatPreSeries20 = jsonFormat21(MappingPreSeries20)
private val MappingFormatPostSeries20 = jsonFormat19(MappingPostSeries20)

def read(json: JsValue): MemberInfo = {
val m = MappingFormat.read(json)

MemberInfo(
def pre20(m: MappingPreSeries20) = MemberInfo(
instanceId = m.instanceId,
timestamp = m.timeStamp,
state = m.state,
Expand All @@ -61,10 +61,36 @@ private[eventstore] object ClusterJsonProtocol extends DefaultJsonProtocol {
epochId = m.epochId,
nodePriority = m.nodePriority
)

def post20(m: MappingPostSeries20) = MemberInfo(
instanceId = m.instanceId,
timestamp = m.timeStamp,
state = m.state,
isAlive = m.isAlive,
internalTcp = m.internalTcpIp :: m.internalTcpPort,
externalTcp = m.externalTcpIp :: m.externalTcpPort,
internalSecureTcp = m.internalTcpIp :: m.internalSecureTcpPort,
externalSecureTcp = m.externalTcpIp :: m.externalSecureTcpPort,
internalHttp = m.httpEndPointIp :: m.httpEndPointPort,
externalHttp = m.httpEndPointIp :: m.httpEndPointPort,
lastCommitPosition = m.lastCommitPosition,
writerCheckpoint = m.writerCheckpoint,
chaserCheckpoint = m.chaserCheckpoint,
epochPosition = m.epochPosition,
epochNumber = m.epochNumber,
epochId = m.epochId,
nodePriority = m.nodePriority
)

json.asJsObject.fields.get("httpEndPointIp").fold(
pre20(MappingFormatPreSeries20.read(json)))(
_ => post20(MappingFormatPostSeries20.read(json))
)

}

def write(x: MemberInfo): JsValue = {
val m = Mapping(
val m = MappingPreSeries20(
instanceId = x.instanceId,
timeStamp = x.timestamp,
state = x.state,
Expand All @@ -88,10 +114,10 @@ private[eventstore] object ClusterJsonProtocol extends DefaultJsonProtocol {
nodePriority = x.nodePriority
)

MappingFormat.write(m)
MappingFormatPreSeries20.write(m)
}

final case class Mapping(
final case class MappingPreSeries20(
instanceId: Uuid,
timeStamp: ZonedDateTime,
state: NodeState,
Expand All @@ -114,6 +140,31 @@ private[eventstore] object ClusterJsonProtocol extends DefaultJsonProtocol {
epochId: Uuid,
nodePriority: Int
)

///

final case class MappingPostSeries20(
instanceId: Uuid,
timeStamp: ZonedDateTime,
state: NodeState,
isAlive: Boolean,
internalTcpIp: String,
internalTcpPort: Int,
internalSecureTcpPort: Int,
externalTcpIp: String,
externalTcpPort: Int,
externalSecureTcpPort: Int,
httpEndPointIp: String,
httpEndPointPort: Int,
lastCommitPosition: Long,
writerCheckpoint: Long,
chaserCheckpoint: Long,
epochPosition: Long,
epochNumber: Int,
epochId: Uuid,
nodePriority: Int
)

}

implicit object ClusterInfoFormat extends RootJsonFormat[ClusterInfo] {
Expand Down
19 changes: 19 additions & 0 deletions client/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,23 @@ akka {

test.timefactor = 1.0
test.timefactor = ${?AKKA_TEST_TIMEFACTOR}
}

eventstore {
# IP & port of Event Store
address {
host = "127.0.0.1"
host = ${?ES_TEST_ADDRESS_HOST}
port = 1113
port = ${?ES_TEST_ADDRESS_PORT}
}

http {
protocol = "http"
protocol = ${?ES_TEST_HTTP_PROTOCOL}
port = 2113
port = ${?ES_TEST_HTTP_PORT}
prefix = ""
prefix = ${?ES_TEST_HTTP_PREFIX}
}
}

0 comments on commit 728aff7

Please sign in to comment.