Skip to content

Commit

Permalink
Test with Akka Streams 2.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
waxzce authored and mkiedys committed Jan 4, 2016
1 parent 385b6fd commit e3c27c1
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -12,7 +12,7 @@ Available at Maven Central for Scala 2.10 and 2.11:
Example
----

#### Akka Streams - 1.0
#### Akka Streams - 2.0.1

```Scala
import akka.actor.ActorSystem
Expand All @@ -30,5 +30,5 @@ val exchange = connection.publish(exchange = "accounting_department",
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

Source(queue).map(_.message).to(Sink(exchange)).run()
Source.fromPublisher(queue).map(_.message).runWith(Sink.fromSubscriber(exchange))
```
2 changes: 1 addition & 1 deletion build.sbt
Expand Up @@ -31,7 +31,7 @@ libraryDependencies ++= Seq(
"com.google.code.findbugs" % "jsr305" % "3.0.0",
"org.scalatest" %% "scalatest" % "2.2.4" % "test", // for TCK
"org.reactivestreams" % "reactive-streams-tck" % "1.0.0" % "test",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0" % "test"
"com.typesafe.akka" %% "akka-stream-experimental" % "2.0.1" % "test"
)

publishMavenStyle := true
Expand Down
Expand Up @@ -29,13 +29,13 @@ class ExchangeSubscriberBlackboxSpec(defaultTimeout: FiniteDuration) extends Sub
/** if `elements` is 0 the `Publisher` should signal `onComplete` immediately. */
case 0 Source.empty
/** if `elements` is [[Long.MaxValue]] the produced stream must be infinite. */
case Long.MaxValue Source(() Iterator.continually(message))
case Long.MaxValue Source.fromIterator(() => Iterator.continually(message))
/** It must create a `Publisher` for a stream with exactly the given number of elements. */
case n if n <= Int.MaxValue Source(List.fill(n.toInt)(message))
/** I assume that the number of elements is always less or equal to [[Int.MaxValue]] */
case n sys.error("n > Int.MaxValue")
}

override def createHelperPublisher(elements: Long) = createHelperSource(elements).runWith(Sink.publisher)
override def createHelperPublisher(elements: Long) = createHelperSource(elements).runWith(Sink.asPublisher(true))
override def createElement(element: Int) = message
}
Expand Up @@ -64,7 +64,7 @@ class RabbitConnectionSpec extends FlatSpec with Matchers with ScalaFutures with
it should "return number of consumers" in {
val queue = Queue(name = UUID.randomUUID().toString, exclusive = true)
connection.queueDeclare(queue).flatMap { _
Source(connection.consume(queue.name)).to(Sink.ignore).run()
Source.fromPublisher(connection.consume(queue.name)).runWith(Sink.ignore)
connection.queueDeclare(queue)
}.futureValue should have (
'consumerCount (1)
Expand Down

0 comments on commit e3c27c1

Please sign in to comment.