Skip to content
This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Commit

Permalink
Merge a242e4f into 9f9051a
Browse files Browse the repository at this point in the history
  • Loading branch information
ahjohannessen committed Mar 13, 2018
2 parents 9f9051a + a242e4f commit 4f46e74
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 27 deletions.
34 changes: 24 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
jdk: oraclejdk8

language: scala

scala:
- 2.12.4
- 2.11.12
sudo: required

services:
- docker

env:
global:
- AKKA_TEST_TIMEFACTOR=2.0
matrix:
include:

sudo: false
- scala: 2.11.12
script:
- travis_retry sbt clean coverage test

script: travis_retry sbt clean coverage test
- scala: 2.12.4
script:
- travis_retry sbt clean coverage test

after_success: sbt coverageReport coveralls
- scala: 2.12.4
env: IT=true AKKA_TEST_TIMEFACTOR=2.0
before_install:
- docker pull eventstore/eventstore
before_script:
- docker run -d --rm --name eventstore-node -p 2113:2113 -p 1113:1113 -e EVENTSTORE_STATS_PERIOD_SEC=1200 eventstore/eventstore
- sleep 10
script:
- travis_retry sbt it:test

jdk: oraclejdk8
after_success: sbt coverageReport coveralls
2 changes: 1 addition & 1 deletion src/test/scala/eventstore/ReadAllEventsForwardITest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ReadAllEventsForwardITest extends TestConnection {
preparePosition = position.preparePosition - 1
)
readAllEventsFailed(wrongPosition, 10) must throwA[ServerErrorException]
}
}.pendingUntilFixed("It seems this behavior has changed in ES?")

"not read linked events if resolveLinkTos = false" in new TestConnectionScope {
val (linked, link) = linkedAndLink()
Expand Down
4 changes: 4 additions & 0 deletions src/test/scala/eventstore/SoftDeleteStreamITest.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package eventstore

import scala.concurrent.duration._

class SoftDeleteStreamITest extends TestConnection {
implicit def direction: ReadDirection = ReadDirection.Forward

Expand Down Expand Up @@ -51,6 +53,7 @@ class SoftDeleteStreamITest extends TestConnection {
// Long.MaxValue = 9223372036854775807
writeMetadata("""{"$tb":9223372036854775807,"test":"test"}""")
appendEventToRecreate()
expectNoMessage(100.millis) // Give ES a bit time
readMetadata() mustEqual """{"$tb":1,"test":"test"}"""
}

Expand Down Expand Up @@ -112,6 +115,7 @@ class SoftDeleteStreamITest extends TestConnection {
def appendEventToRecreate(expVer: ExpectedVersion = ExpectedVersion.Any): EventData = {
val event = newEventData
write(expVer, event)
Thread.sleep(100) // Give ES time - fix this
lastEvent.data mustEqual event
event
}
Expand Down
12 changes: 7 additions & 5 deletions src/test/scala/eventstore/SubscribeToAllCatchingUpITest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,19 @@ class SubscribeToAllCatchingUpITest extends TestConnection {
}

"catch link events if resolveLinkTos = false" in new SubscribeToAllCatchingUpScope {
newSubscription(Some(lastPosition), resolveLinkTos = false)
fishForLiveProcessingStarted()
val last = lastPosition
newSubscription(Some(last))
fishForLiveProcessingStarted(last)
val (linked, link) = linkedAndLink()
expectIndexedEvent.event mustEqual linked
expectIndexedEvent
expectIndexedEvent.event mustEqual link
}

"catch link events if resolveLinkTos = true" in new SubscribeToAllCatchingUpScope {
newSubscription(Some(lastPosition), resolveLinkTos = true)
fishForLiveProcessingStarted()
val last = lastPosition
newSubscription(Some(last), resolveLinkTos = true)
fishForLiveProcessingStarted(last)
val (linked, link) = linkedAndLink()
expectIndexedEvent.event mustEqual linked
expectIndexedEvent
Expand Down Expand Up @@ -196,7 +198,7 @@ class SubscribeToAllCatchingUpITest extends TestConnection {
testKit: TestKitBase = this
): Position = testKit.expectMsgType[AnyRef] match {
case LiveProcessingStarted => position
case IndexedEvent(_, x) =>
case IndexedEvent(e, x) if !e.streamId.isSystem || !e.streamId.isMetadata =>
x must beGreaterThanOrEqualTo(position)
fishForLiveProcessingStarted(x, testKit)
}
Expand Down
18 changes: 9 additions & 9 deletions src/test/scala/eventstore/SubscribeToAllITest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class SubscribeToAllITest extends TestConnection {

clients.foreach {
case (client, commitPosition) =>
val indexedEvent = expectStreamEventAppeared(client)
val indexedEvent = expectNonSystemStreamEventAppeared(client)
indexedEvent.position.commitPosition must >(commitPosition)
val event = indexedEvent.event
event.number mustEqual EventNumber(1)
Expand All @@ -27,10 +27,10 @@ class SubscribeToAllITest extends TestConnection {
"catch created and deleted events as well" in new SubscribeToAll {
val lastCommitPosition = subscribeToAll()
appendEventToCreateStream()
expectStreamEventAppeared().event.number mustEqual EventNumber.First
expectNonSystemStreamEventAppeared().event.number mustEqual EventNumber.First
deleteStream()

val indexedEvent = expectStreamEventAppeared()
val indexedEvent = expectNonSystemStreamEventAppeared()
indexedEvent.position.commitPosition must >(lastCommitPosition)
indexedEvent.event must beLike {
case Event.StreamDeleted() => ok
Expand All @@ -40,17 +40,17 @@ class SubscribeToAllITest extends TestConnection {
"not catch linked events if resolveLinkTos = false" in new SubscribeToAll {
subscribeToAll(resolveLinkTos = false)
val (linked, link) = linkedAndLink()
expectStreamEventAppeared().event mustEqual linked
expectStreamEventAppeared()
expectStreamEventAppeared().event mustEqual link
expectNonSystemStreamEventAppeared().event mustEqual linked
expectNonSystemStreamEventAppeared()
expectNonSystemStreamEventAppeared().event mustEqual link
}

"catch linked events if resolveLinkTos = true" in new SubscribeToAll {
subscribeToAll(resolveLinkTos = true)
val (linked, link) = linkedAndLink()
expectStreamEventAppeared().event mustEqual linked
expectStreamEventAppeared()
expectStreamEventAppeared().event mustEqual ResolvedEvent(linked, link)
expectNonSystemStreamEventAppeared().event mustEqual linked
expectNonSystemStreamEventAppeared()
expectNonSystemStreamEventAppeared().event mustEqual ResolvedEvent(linked, link)
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/test/scala/eventstore/TestConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ abstract class TestConnection extends util.ActorSpec {
event
}

def expectNonSystemStreamEventAppeared(testKit: TestKitBase = this, max: Duration = Duration.Undefined) = {

val sea = testKit.fishForSpecificMessage[StreamEventAppeared](max) {
case sea @ StreamEventAppeared(IndexedEvent(e, _)) if !(e.streamId.isSystem || e.streamId.isMetadata) sea.fixDate
}

sea.event.event.streamId mustEqual streamId
sea.event
}

def mustBeSorted[T](xs: List[T])(implicit direction: ReadDirection, ordering: Ordering[T]): Unit = {
xs.map {
case ResolvedEvent(_, link) => link.asInstanceOf[T]
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/eventstore/j/EsConnectionITest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class EsConnectionITest extends eventstore.util.ActorSpec {

"publish stream events" in new TestScope {
val streamId = s"java-publish-$randomUuid"
val publisher = connection.streamPublisher(streamId, null, false, null, false)
def publisher = connection.streamPublisher(streamId, null, false, null, false)
await_(connection.writeEvents(streamId, null, events, null))
Source.fromPublisher(publisher)
.map(_.data)
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/eventstore/tcp/ConnectionActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class ConnectionActorSpec extends util.ActorSpec with Mockito {
"use reconnectionDelay from settings" in new TestScope {
connectedAndIdentified()
client ! tcpException
tcp.expectNoMsg(200.millis)
tcp.expectNoMsg(100.millis)
verifyReconnections(settings.maxReconnections)

override def settings = Settings(maxReconnections = 3, reconnectionDelayMin = 500.millis)
Expand Down

0 comments on commit 4f46e74

Please sign in to comment.