Skip to content
Browse files
Change model for state management to use Ref
  • Loading branch information
MartinSnyder committed Apr 7, 2019
1 parent 312517b commit a52d13614eb29fd7b115c7cb7929a2afce5bd100
Showing with 13 additions and 13 deletions.
  1. +5 −9
  2. +8 −4 src/main/scala/com/martinsnyder/chatserver/ChatServer.scala
@@ -15,7 +15,7 @@ This project is a practical example of:
3. InputMessage routed to Queue
### In ChatServer (processingStream)
4. Queued InputMessages pumped through the current ChatState, producing a "next" state and some number of OutputMessage objects.
5. "Next" state is preserved for the next iteration
5. "Next" state is managed by a functional mutable reference (Ref)
6. OutputMessage objects are routed to a topic (publish/subscribe pattern)
### In ChatRoutes (toClient)
7. OutputMessage objects are received from the topic
@@ -24,7 +24,7 @@ This project is a practical example of:

## Notes
### Concurrency in fs2
This implementation relies heavily on the concurrency objects Queue and Topic. They are
This implementation relies heavily on the concurrency objects Ref, Queue and Topic. They are
used both to move messages between streams and also to control the flow of messages in general.

A more complicated implementation would queue on a per-room basis. That would require per-room
@@ -33,13 +33,9 @@ queues though, and would distract from what this example is trying to demonstrat
### Limitations
The biggest weakness of what I've done here is that *all* traffic is routed through a single queue.
This neutralizes many of the benefits of http4s. Queueing is necessary in this case because we
need to sequence our messages in order to conduct the state transformations on ChatState.

Another weakness is that the current state is embedded entirely within "processingStream." This
prevents other services from accessing that state. For instance, it would be useful to
implement a monitoring RESTful endpoint that reported on usage metrics, but that is not
possible without refactoring the state to be more accessible. See the "Ref" link in
[Further reading](#further-reading) for the technique to address that.
need to sequence our messages in order to conduct the state transformations on ChatState. A more
complicated model would use multiple queues (and potentially multiple state objects), perhaps
along room boundaries.

### Functional enhancements
Functional improvements to this application can be implemented by modifying InputMessage,
@@ -2,6 +2,7 @@ package com.martinsnyder.chatserver

import scala.util.Try
import cats.effect._
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.Stream
import fs2.concurrent.{Queue, Topic}
@@ -24,6 +25,7 @@ object HelloWorldServer extends IOApp {

for (
// Synchronization objects must be created at the top-level so they can be shared across the application
ref <- Ref.of[IO, ChatState](ChatState());
queue <- Queue.unbounded[IO, InputMessage];
topic <- Topic[IO, OutputMessage](SendToUsers(Set.empty, ""));
exitCode <- {
@@ -36,13 +38,15 @@ object HelloWorldServer extends IOApp {
val keepAlive = Stream.awakeEvery[IO](30.seconds).map(_ => KeepAlive).through(topic.publish)

// Stream to process items from the queue and publish the results to the topic
// Note mapAccumulate below which performs our state manipulation
// 1. Dequeue
// 2. apply message to state reference
// 3. Convert resulting output messages to a stream
// 4. Publish output messages to the publish/subscribe topic
val processingStream =
.mapAccumulate(ChatState())((prevState, inputMsg) => prevState.process(inputMsg))
.map(_._2) // Strip our state object from the stream and propagate only our messages
.flatMap(Stream.emits) // Lift our messages into a stream of their own
.flatMap(msg => Stream.eval(ref.modify(_.process(msg))))

// fs2 Streams must be "pulled" to process messages. Drain will perpetually pull our top-level streams

0 comments on commit a52d136

Please sign in to comment.