Skip to content

Commit

Permalink
More docs and added me to the list of contributors.
Browse files Browse the repository at this point in the history
  • Loading branch information
cmcmteixeira committed Jun 14, 2019
1 parent f7009df commit f38e1af
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 679 deletions.
22 changes: 7 additions & 15 deletions build.sbt
Expand Up @@ -119,6 +119,13 @@ lazy val kernelSettings = Seq(
<organization>ITV</organization>
<organizationUrl>http://www.itv.com</organizationUrl>
</developer>
<developer>
<id>cmcmteixeira</id>
<name>Carlos Teixeira</name>
<url>http://cmcmteixeira.github.io</url>
<organization>ITV</organization>
<organizationUrl>http://www.itv.com</organizationUrl>
</developer>
</developers>
)
)
Expand Down Expand Up @@ -260,18 +267,3 @@ lazy val root = (project in file("."))
.aggregate(xml, circe, kamon, argonaut, example, test, core)
.settings(publishArtifact := false)

lazy val readme = scalatex
.ScalatexReadme(
projectId = "readme",
wd = file(""),
url = "https://github.com/ITV/bucky/tree/master",
source = "readme"
)
.settings(
publishArtifact in (Compile, packageDoc) := false,
publishSigned := (),
publishLocalSigned := (),
publish := (),
publishLocal := (),
publishArtifact := false
)
216 changes: 49 additions & 167 deletions docs/consumer.md
@@ -1,36 +1,31 @@
## Consuming messages

Consuming messages with Bucky is done by creating a consumer that executes
a handler function every time a message arrives.

```scala
import cats.effect.IO
import com.itv.bucky._
import com.itv.bucky.circe._
import com.itv.bucky.consume._
import com.itv.bucky.decl.Queue
import com.typesafe.scalalogging.StrictLogging
import io.circe.generic.auto._

object ConsumeMessages extends StrictLogging {
case class Person(name: String, age: Int)

val queue = Queue(QueueName("people-messages"))
### Simple Handler
Consuming messages with Bucky is done in two stages. First you create a handler that's going to be
executed whenever you receive a message:

def handler: Handler[IO, Person] = { message =>
IO.delay {
logger.info(s"${message.name} is ${message.age} years old")
Ack
}
}
```scala
case class Message(foo: String)
class MyHandler extends Handler[IO, Message] {
override def apply(m: Message): IO[ConsumeAction] =
IO(Ack)
}
```

def consume(client: AmqpClient[IO]) = {
for {
_ <- client.declare(List(queue))
_ <- client.registerConsumerOf[Person](queue.name, handler)
} yield ()
and then you register it like so:
```scala
object MyApp extends IOApp {
val config = AmqpClientConfig(host = "127.0.0.1", port = 5672, username = "guest", password = "guest")
override def run(args: List[String]): IO[ExitCode] = {
implicit val ec: ExecutionContext = ExecutionContext.global
(for {
client <- AmqpClient[IO](config)
handler = new MyHandler
_ <- client.declareR(declarations)
_ <- client.registerConsumerOf(QueueName("queue-name"), handler)
} yield ()).use(_ => IO.never)
}

}
```

Expand All @@ -41,156 +36,43 @@ the message that you were processing. Bucky gives you tools to create the
re-queueing behaviour out of the box. You can configure both how many times you
want a message to be re-queued as well as the delay between messages.

```scala

import cats.effect.IO
import com.itv.bucky._
import com.itv.bucky.consume._
import com.itv.bucky.circe._
import com.itv.bucky.decl.Queue
import com.itv.bucky.pattern.requeue.RequeuePolicy
import com.typesafe.scalalogging.StrictLogging
import concurrent.duration._
import io.circe.generic.auto._

object RequeueConsumeMessages extends StrictLogging {
case class Person(name: String, age: Int)
In order to do so, you have to create a requeue handler first:

val queue = Queue(QueueName("people-messages"))

def handler: RequeueHandler[IO, Person] = { message =>
for {
_ <- IO.raiseError(new RuntimeException("Throwing intentional error"))
} yield Ack
}

def consume(client: AmqpClient[IO]) = {
for {
_ <- client.declare(List(queue))
_ <- client.registerRequeueConsumerOf[Person](
queueName = queue.name,
handler = handler,
requeuePolicy = RequeuePolicy(maximumProcessAttempts = 10,requeueAfter = 5.minutes)
)
} yield ()
}
```scala
case class Message(foo: String)
class MyHandler extends RequeueHandler[IO, Message] {
override def apply(m: Message): IO[RequeuConsumeAction] = IO(Ack)
}

```

In the above example the `handler` function will error out every time it runs.
When this happens, the consumer is configured to requeue the message that lead
to the error up to 10 times in a 5 minute interval.

#### Testing handlers

##### Unit testing

When testing handlers, they can be treated similar to pure functions.
They receive a message and produce an output in form of a consume action.

And then register it:
```scala
import cats.effect.IO
import com.itv.bucky.consume.Ack
import com.itv.bucky.{Handler, consume}
import org.scalatest.{Matchers, WordSpec}
object MyApp extends IOApp {
case class Message(foo: String)

class ConsumerSpec extends WordSpec with Matchers {

case class User(id: String)
case class EmailAddress(value: String)
case class Message(email: EmailAddress)

class SendPasswordReset(
fetchEmailAddress: User => IO[EmailAddress],
sendMail: Message => IO[Unit]
) extends Handler[IO, User] {

def apply(recipient: User): IO[consume.ConsumeAction] =
for {
emailAddress <- fetchEmailAddress(recipient)
_ <- sendMail(Message(emailAddress))
} yield Ack
}

"SendPasswordReset" should {
"send emails to the user in question" in new Setup {
result shouldBe Right(Ack)
}
"error out if fetching the email fails" in new Setup {
override val emailResult = IO.raiseError(new RuntimeException("DB failed"))
result.left.map(_.getMessage) shouldBe Left("DB failed")
}
"error out if sending the email fails" in new Setup {
override val sendResult = IO.raiseError(new RuntimeException("Email failed"))
result.left.map(_.getMessage) shouldBe Left("Email failed")
}
val config = AmqpClientConfig(host = "127.0.0.1", port = 5672, username = "guest", password = "guest")
val declarations = List(
Queue(QueueName("queue-name")),
Exchange(ExchangeName("exchange-name")).binding(RoutingKey("rk") -> QueueName("queue-name"))
)

class MyHandler extends RequeueHandler[IO, Message] {
override def apply(m: Message): IO[RequeueConsumeAction] = IO(Ack)
}

trait Setup {
val emailResult = IO.pure(EmailAddress("test@example.com"))
val sendResult = IO.unit

lazy val handler = new SendPasswordReset(
fetchEmailAddress = _ => emailResult,
sendMail = _ => sendResult
)

lazy val result = handler(User("1")).attempt.unsafeRunSync()
}
}

```

##### Integration testing

We can also test handlers by using an actual AMQP client and wiring up the handler
function with a consumer.

```scala

import cats.effect.IO
import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller
import com.itv.bucky.consume.Delivery
import com.itv.bucky.decl.{Exchange, Queue}
import com.itv.bucky.publish.PublishCommandBuilder
import com.itv.bucky.test.{IOAmqpClientTest, StubHandlers}
import com.itv.bucky.{ExchangeName, QueueName, RoutingKey}
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures

class ConsumerTest
extends FunSuite
with IOAmqpClientTest
with ScalaFutures {

test("Consuming messages should work") {
runAmqpTest { client =>
val exchange = ExchangeName("email")
val queue = QueueName("email")
val rk = RoutingKey("email")
val message = "Hello"

val commandBuilder = PublishCommandBuilder
.publishCommandBuilder[String](StringPayloadMarshaller)
.using(exchange)
.using(rk)

val handler = StubHandlers.ackHandler[IO, Delivery]
val declarations = List(Queue(queue), Exchange(exchange).binding((rk, queue)))

for {
_ <- client.declare(declarations)
_ <- client.registerConsumer(queue, handler)
_ <- client.publisher()(commandBuilder.toPublishCommand("Test"))
} yield {
handler.receivedMessages should have size 1
}
}
override def run(args: List[String]): IO[ExitCode] = {
implicit val ec: ExecutionContext = ExecutionContext.global
(for {
client <- AmqpClient[IO](config)
handler = new MyHandler
_ <- client.declareR(declarations)
_ <- client.registerRequeueConsumerOf(QueueName("queue-name"), handler, RequeuePolicy(10, 3.seconds))
} yield client).use{_ => IO.never}
}
}
```

In the above example the `handler` function will error out every time it runs.
When this happens, the consumer is configured to requeue the message that lead
to the error up to 10 times in a 3 seconds interval.

```
86 changes: 49 additions & 37 deletions docs/getting-started.md
Expand Up @@ -4,7 +4,7 @@ In order to get started with bucky, add the following to you `build.sbt`:


```scala
val buckyVersion = "2.0.0-M5"
val buckyVersion = "2.0.0-M10"
libraryDependencies ++= Seq(
"com.itv" %% "bucky-core" % buckyVersion,
"com.itv" %% "bucky-circe" % buckyVersion, //for circe based marshallers/unmarshallers
Expand All @@ -16,8 +16,8 @@ libraryDependencies ++= Seq(

or for ammonite:
```scala
import $ivy.`com.itv::bucky-core:2.0.0-M5`
import $ivy.`com.itv::bucky-circe:2.0.0-M5`
import $ivy.`com.itv::bucky-core:2.0.0-M10`
import $ivy.`com.itv::bucky-circe:2.0.0-M10`
```

Imports, implicits and config:
Expand All @@ -26,54 +26,66 @@ import cats._
import cats.implicits._
import cats.effect._
import io.circe.generic.auto._
import scala.concurrent.{ExecutionContext}


import com.itv.bucky.decl.Exchange
import com.itv.bucky.decl.Queue
import com.itv.bucky._
import com.itv.bucky.circe._
import com.itv.bucky.consume._


implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(10))
implicit val cs : ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)

val config = AmqpClientConfig(host = "127.0.0.1", port= 5672, username="guest", password="guest")
case class Message(foo: String)

import com.itv.bucky.publish._
```
Registering a consumer:
Registering a simple consumer:
```scala
val clientResource = AmqpClient[IO](config)
def handler(s: Message) : IO[ConsumeAction] = {
for {
_ <- IO.delay(println(s"Received: $s"))
} yield Ack
}
clientResource.use { client =>
val declarations = List(
object MyApp extends IOApp {
case class Message(foo: String)

val config = AmqpClientConfig(host = "127.0.0.1", port = 5672, username = "guest", password = "guest")
val declarations = List(
Queue(QueueName("queue-name")),
Exchange(ExchangeName("exchange-name")).binding(RoutingKey("rk") -> QueueName("queue-name"))
)

for {
_ <- client.declare(declarations)
_ <- client.registerConsumerOf[Message](QueueName("queue-name"), handler)
_ <- IO.never //keep running the consumer (for demo purposes only)
} yield ()
}.unsafeRunAsync(println)
)

class MyHandler extends Handler[IO, Message] {
override def apply(m: Message): IO[ConsumeAction] =
IO(Ack)
}

override def run(args: List[String]): IO[ExitCode] = {
implicit val ec: ExecutionContext = ExecutionContext.global
(for {
client <- AmqpClient[IO](config)
handler = new MyHandler
_ <- client.declareR(declarations)
_ <- client.registerConsumerOf(QueueName("queue-name"), handler)
} yield ()).use(_ => IO.never)
}
}
```

Publishing a message:
```scala
val clientResource = AmqpClient[IO](config)
clientResource.use { client =>
val publisher = client.publisherOf[Message](ExchangeName("exchange-name"), RoutingKey("rk"))
for {
_ <- client.declare(List(Exchange(ExchangeName("exchange-name"))))
_ <- publisher(Message("Hello"))
} yield "Message Published"
}.unsafeRunAsync(println)
object MyApp extends IOApp {
case class Message(foo: String)

val config = AmqpClientConfig(host = "127.0.0.1", port = 5672, username = "guest", password = "guest")
val declarations = List(
Queue(QueueName("queue-name")),
Exchange(ExchangeName("exchange-name")).binding(RoutingKey("rk") -> QueueName("queue-name"))
)

override def run(args: List[String]): IO[ExitCode] = {
implicit val ec: ExecutionContext = ExecutionContext.global
(for {
client <- AmqpClient[IO](config)
_ <- client.declareR(declarations)
} yield client).use { client =>
val publisher = client.publisherOf[Message](ExchangeName("exchange-name"), RoutingKey("rk"))
publisher(Message("Hello"))
} *> IO(ExitCode.Success)
}
}
```

For easiness of use, bucky supports the creation of [Wirings](./wiring). A [Wiring](./wiring) centralizes the definition
Expand Down

0 comments on commit f38e1af

Please sign in to comment.