Skip to content

Commit

Permalink
New blog post on Scala implementation of the state pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
francois committed Mar 1, 2012
1 parent e3826fc commit 7670a5c
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
---
title: "Scala Case Classes and State Machines Using Akka Actors"
created_at: 2012-02-29 21:04:54
id: 20120229210454
tags:
- scala
blog_post: true
filter:
- erb
- textile
---
At Seevibes we use the Twitter Streaming API to harvest what people say about television shows. The Twitter Streaming API docs state that:

bq. Upon a change, reconnect immediately if no changes have occurred for some time. For example, reconnect no more than twice every four minutes, or three times per six minutes, or some similar metric.

From "Updating Filter Predicates":https://dev.twitter.com/docs/streaming-api/concepts#updating-filter-predicates

The way my system is architected, whenever we change the list of keywords, a <code>ShowDataErased</code> message is sent, followed by a bunch of <code>ShowKeywordsReplaced</code> events, one per show. The system uses <a href="http://en.wikipedia.org/wiki/CQRS"><acronym title="Command-Query Responsibility Segregation">CQRS</acronym></a> where each change that occurs is reflected in the system through one or more events, hence the two events above.

Here's my state machine in all it's gory details:

<img src="<%= same_folder_url("streaming-harvester-state-machine.png") %>" width="480" alt="Seevibes Streaming Harvester State Machine">

What the machine does is:

* When streaming, and time passes, nothing happens;
* When streaming and we change keywords (either reset or add keyword), we enter a state where we're pending some changes;
* When we're pending, and we change keywords (reset or add again), we stay in the pending state;
* When we're pending, and insufficient time has passed, then we must stay in the same state, in case we receive more keyword change events;
* Finally, if we're pending and sufficient time has passed, we just go back to the streaming state.

The obvious first test is to start with the fact that when we're streaming, we must still be streaming. Here's an implementation of this test.

<% code :lang => :scala do %>import org.junit.Test
import org.scalatest.Assertions
import org.joda.time.Instant

class StreamingStateMachineTest extends Assertions {
val KEY2 = Set("b")

@Test
def whenStreaming_andReceiveTick_thenShouldStillBeStreaming() {
expect(Streaming(KEY2)) {
Streaming(KEY2).tick(new Instant())
}
}
}
<% end %>

For the Rubyists out there, case classes are a kind of class that has a few nice properties:

* They act like methods, thus we can call the <code>Streaming</code> method (which the compiler turns into a call of <code>Streaming.apply()</code>);
* They have an <code>unapply()</code> method, which enables pattern-matching (which I won't use in this case);
* They are serializable, externalizable, clonable, have a free <code>equals()</code> and <code>hashCode()</code> implementation that is correct and consistent with the Java <code>equals()</code> and <code>hashCode()</code> guidelines.

Case classes are an ideal vehicle to transport data with one or more methods. One thing that you must realize when using case classes is that you must use only immutable objects as parameters to your case classes. If you used a mutable object, your case class would suddenly be mutable, and the <code>equals()</code> and <code>hashCode()</code> methods would be useless for you. The Scala compiler has no way to enfore this.

Getting back to our business, the obvious implementation is a no-op:

<% code :lang => :scala do %>import org.joda.time.ReadableInstant
case class Streaming(keywords: Set[String]) {
def tick(now: ReadableInstant) = this
}
<% end %>

One event down, 6 to go! Let's change state : go from the streaming state to the pending keywords change state. I'll start with the simpler case: resetting the keywords list. Here's an implementation:

<% code :lang => :scala do %>@Test
def whenStreaming_andReceiveReset_thenShouldBePending() {
expect(PendingKeywordsChange(Set.empty[String], now)) {
Streaming(KEY1).resetKeywords(now)
}
}
<% end %>

Notice that I'm passing explicit times to my methods. This is to make testing much easier. It doesn't really bother me to send the current time along. It's certainly easier than sending a factory which returns the current time, which would add a level of indirection, and be more complicated.

The implementation is also pretty simple:

<% code :lang => :scala do %>def resetKeywords(now: ReadableInstant) =
PendingKeywordsChange(Set.empty[String], now)
<% end %>

And so on for the other implementations of Streaming. The guts of the state machine are really in the PendingKeywordsChange class. First, a test:

<% code :lang => :scala do %>@Test
def whenStreaming_andReceiveAddKeyword_thenShouldBePending() {
expect(PendingKeywordsChange(KEY1 + "b", now)) {
Streaming(KEY1).addKeyword("b", now)
}
}

case class PendingKeywordsChange(keywords: Set[String],
lastChangedAt: ReadableInstant) {

def addKeyword(keyword: String, now: ReadableInstant) =
PendingKeywordsChange(newKeywords + keyword, now)

}
<% end %>

When I wrote the code, I was really excited by how concise and clear my intents shined through the code. The final method I'd like to show is the <code>tick()</code> method on <code>PendingKeywordsChange</code>:

<% code :lang => :scala do %>@Test
def whenPending_andReceiveTick_andInsufficientTimeHasPassed_thenShouldStayPending() {
expect(PendingKeywordsChange(KEY1, oneMinuteAgo)) {
PendingKeywordsChange(KEY1, oneMinuteAgo).tick(now)
}

expect(PendingKeywordsChange(KEY1, twoMinutesAgo)) {
PendingKeywordsChange(KEY1, twoMinutesAgo).tick(now)
}

expect(PendingKeywordsChange(KEY1, twoMinutesAgo)) {
PendingKeywordsChange(KEY1, twoMinutesAgo).tick(twoMinutesAgo)
}
}

@Test
def whenPending_andReceiveTick_andSufficientTimeHasPassed_thenShouldBeStreaming() {
expect(Streaming(KEY1, now)) {
PendingKeywordsChange(KEY1, twoMinutesAgo.minus(TimeUnit.SECONDS.toMillis(1))).tick(now)
}
}

// in class PendingKeywordsChange
def nextChangeAt = lastChangedAt.toInstant.plus(TimeUnit.MINUTES.toMillis(2))

def tick(now: ReadableInstant = new Instant()) =
if (now.isAfter(nextChangeAt))
Streaming(newKeywords)
else
this
<% end %>

The final part is where and how I regularly call the tick event. Because the rest of my infrastructure is tied to Akka actors, I used the Akka scheduler API to send a message to an actor, which hid my state machine behind a nice and consistent facade:

<% code :lang => :scala do %>import akka.actor.Actor

case object Tick
case object ResetKeywords
case class AddKeyword(keyword: String)

trait State {
def resetKeywords(now: ReadableInstant): State
def addKeyword(keyword: String, now: ReadableInstant): State
def tick(now: ReadableInstant): State
def keywords: Set[String]
}

case class Streaming(keywords: Set[String],
lastChangedAt: ReadableInstant) extends State {
def resetKeywords(now: ReadableInstant) =
PendingKeywordsChange(Set.empty[String], now)

def addKeyword(keyword: String, now: ReadableInstant) =
PendingKeywordsChange(Set(keyword), now)

def tick(now: ReadableInstant) = this
}

case class PendingKeywordsChange(keywords: Set[String],
lastChangedAt: ReadableInstant) extends State {
def resetKeywords(now: ReadableInstant) =
PendingKeywordsChange(Set.empty[String], now)

def addKeyword(keyword: String, now: ReadableInstant) =
PendingKeywordsChange(keywords + keyword, now)

def nextChangeAt = lastChangedAt.toInstant.plus(TimeUnit.MINUTES.toMillis(2))

def tick(now: ReadableInstant = new Instant()) =
if (now.isAfter(nextChangeAt))
Streaming(keywords)
else
this
}

class KeywordsStateMachine extends Actor {
private var state: State = Streaming(Set.empty[String], new Instant())
private var currentKeywords = Set.empty[String]
private val stream = ... // Twitter4J Streaming API

def receive = {
case Tick =>
state = state.tick(new Instant)
if (state.keywords != currentKeywords)
// reset stream with new keywords

case AddKeyword(keyword) =>
state = state.addKeyword(keyword, new Instant())

case ResetKeywords =>
state = state.resetKeywords(new Instant())
}
}

object Main {
def main(args : Array[String]) {
val keywordsStateMachine =
Actor.actorOf[KeywordsStateMachine].start()

// Send the Tick message to the keywordsStateMachine now,
// and every minute thereafter
Scheduler.schedule(keywordsStateMachine, Tick, 0, 1, TimeUnit.MINUTES)
}
}
<% end %>

A few things contributed to the clarity of the implementation:

* The Joda Time library is really easy to use and understand. You don't need to use java.util.Date or java.util.Calendar : ditch them as soon as you can;
* Scala's case classes reduced boilerplate: no <code>new</code> operator in sight;
* Public by default enhances clarity by removing keywords where they don't matter;
* Libraries, yes, but not when something simple is required. Clarity of implementation before reuse.

Note that this state machine does not have to deal with dropped connections, or rate limiting or anything of the sort, because it's all hidden behind Twitter4J's interface. The resulting state machine is much easier to understand and reason about.

In the interest of fun, profit and learning, I've made available a GitHub repository with similar code to what's here at "streaming-state-machine":https://github.com/francois/streaming-state-machine. The code is different because it doesn't actually connect to Twitter, and thus only needs to demonstrate how changing the states works. The time scale was changed from minutes to seconds.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 7670a5c

Please sign in to comment.