Simple Example

Lucas Torri edited this page Aug 19, 2013 · 8 revisions

Simple Example - Word Count

The Dempsy Examples repository has several versions of the WordCount example. You can find the final version of this tutorial's code in the userguide-wordcount sub-project.

What does a Dempsy application look like?

In order to understand how to use Dempsy you need to understand how a Dempsy application is structured, and the best way to see this is through a simple example. The "Hello World" of BigData applications seems to be the "word counter." If you're familiar with Hadoop, then you probably started with the WordCount example. In this simple example, let's suppose we have a source of words from some text. In traditional batch-based BigData systems, the source for these words would be a file or perhaps data already partitioned across some distributed storage. In a Dempsy application, we receive these words in real time through a data stream. Maybe we're getting all of the live data from a large social media application and we want to calculate the histogram of word occurrences.

The Message Processor Prototype

What would a Dempsy application that accomplishes this look like? Imagine that each word from this hypothetical live stream of text is broken into its own message. Each of these messages is routed to an instance of a class (let's call that instance a message processor) that is responsible for keeping track of the count for a single word. That is, there is one instance of the word-counting message processor per distinct word. For example, every time the word "Google" flows through the stream, it's routed to the same message processor (the one dedicated to the word "Google"). Every time the word "is" is encountered, it's routed to another message processor instance. Likewise with the word "Skynet."

How easy would it be to write the code for that message processor class? Looking at the problem this way, in a functional programming manner, makes the code fairly simple. It could be as simple as this:

class WordCount
{
  private long count = 0;

  public void countWord(String word)
  {
     count++;
  }
}

Notice that we write the message processor assuming each instance is responsible for a single word and that, in the larger application, there will be many instances, each operating on its piece of the stream. These instances can be (and usually are) spread out over a large number of machines.

Of course, something is missing. How does each word get to its respective message processor? How are the WordCount instances instantiated, deleted, and provided their word messages? Where are they instantiated? What about synchronization? What about sending out messages? All of these are primarily Dempsy's responsibility, as will be explained.
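To make the one-instance-per-word idea concrete, here is a minimal single-JVM sketch of the bookkeeping that Dempsy takes over for you: a map from each distinct word to its own counter instance. The classes SimpleWordCount and LocalWordRouter, and the getCount() accessor, are hypothetical and exist only for this illustration; they are not part of Dempsy, which spreads these instances across nodes instead of holding them in one local map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the WordCount class above, with an accessor
// added so the count can be observed.
class SimpleWordCount
{
   private long count = 0;

   public void countWord(String word)
   {
      count++;
   }

   public long getCount()
   {
      return count;
   }
}

// Hypothetical single-JVM "router": one SimpleWordCount instance per distinct word.
class LocalWordRouter
{
   private final Map<String, SimpleWordCount> instances = new HashMap<String, SimpleWordCount>();

   public void route(String word)
   {
      SimpleWordCount mp = instances.get(word);
      if (mp == null)
      {
         // First time this word is seen: create its dedicated instance.
         mp = new SimpleWordCount();
         instances.put(word, mp);
      }
      mp.countWord(word);
   }

   public long countFor(String word)
   {
      SimpleWordCount mp = instances.get(word);
      return mp == null ? 0 : mp.getCount();
   }

   public int instanceCount()
   {
      return instances.size();
   }

   public static void main(String[] args)
   {
      LocalWordRouter router = new LocalWordRouter();
      for (String w : "Google is Skynet Google is Google".split(" "))
         router.route(w);
      System.out.println(router.countFor("Google")); // prints 3
      System.out.println(router.instanceCount());    // prints 3
   }
}
```

Everything in LocalWordRouter (creating instances, finding the right one, delivering the word) is exactly the part the application developer does not write when using Dempsy.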

Message Processor Addressing

At this point we have a nice little piece of POJO functionality completely unmoored from infrastructural concerns. Let's look at how Dempsy handles some of these concerns. As mentioned, one of Dempsy's primary responsibilities is, given a message, to find the message processor responsible for handling it. Now that we have a POJO that accomplishes some business functionality, we need to tell Dempsy how instances are addressed, that is, how Dempsy is to find which message processor is responsible for which messages.

The way Dempsy does this is through the use of a message key. Each message that flows through Dempsy needs to have a message key. Dempsy is annotation-driven, so classes that represent messages need to identify, through an annotation, a means of obtaining the message's MessageKey. An important concept to grasp here is that a MessageKey is essentially the address of a MessageProcessor instance.

In our example, each word is a message.

import com.nokia.dempsy.annotations.MessageKey;

public class Word
{
   private String wordText;

   public Word() {} // see Note on serialization below.

   public Word(String wordText)
   {
      this.wordText = wordText;
   }

   @MessageKey
   public String getWordText()
   {
      return wordText;
   }
}

So when Dempsy receives a message of type Word, it retrieves the key using the annotated method getWordText(). That key becomes the address of a message processor somewhere in the system. Dempsy will find the message processor instance (in this case an instance of the class WordCount) within the cluster of nodes responsible for running the WordCount message processing. If the instance doesn't already exist, Dempsy will clone() the WordCount prototype instance.

If you're paying attention, you might notice there are two gaps in the WordCount implementation that need to be filled. First, how does Dempsy know that WordCount handles the Word message? Second, how is a WordCount prototype cloned? (Notice that the existing WordCount class cannot yet simply be cloned.)

This requires us to revisit the WordCount implementation. We need to do several things to satisfy Dempsy:

  1. We need to identify the WordCount class as a MessageProcessor, which is done with a class annotation.
  2. We need to identify WordCount.countWord() as the point where the MessageProcessor handles the message. This means:
     a. WordCount.countWord() needs to be annotated with the @MessageHandler annotation.
     b. WordCount.countWord() needs to take the actual message type, in this case Word.
  3. We need to make sure WordCount is Cloneable.

This would be accomplished by the following:

import com.nokia.dempsy.annotations.MessageHandler;
import com.nokia.dempsy.annotations.MessageProcessor;

@MessageProcessor
class WordCount implements Cloneable
{
  private long count = 0;

  @MessageHandler
  public void countWord(Word word)
  {
     count++;
  }

  public Object clone() throws CloneNotSupportedException
  {
     return (WordCount)super.clone();
  }
}
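Since every new message processor starts life as a clone of the prototype, it helps to check what Object.clone() actually copies. The plain-Java sketch below has no Dempsy dependency; CloneableCounter, its seenWords list, and NotCloneable are hypothetical. It shows that a primitive field like count is copied into each clone, that a mutable field such as a List is shared by every shallow clone unless clone() copies it, and that without "implements Cloneable" Object.clone() throws CloneNotSupportedException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class used only to illustrate what Object.clone() copies.
class CloneableCounter implements Cloneable
{
   long count = 0;                                   // primitive: copied into each clone
   List<String> seenWords = new ArrayList<String>(); // reference: shared by a shallow clone

   // Shallow copy, equivalent in effect to WordCount.clone() above.
   @Override
   public CloneableCounter clone() throws CloneNotSupportedException
   {
      return (CloneableCounter) super.clone();
   }

   // Copy that also gives the new instance its own list.
   public CloneableCounter cloneWithOwnList() throws CloneNotSupportedException
   {
      CloneableCounter copy = clone();
      copy.seenWords = new ArrayList<String>(seenWords);
      return copy;
   }
}

// Same idea, but without "implements Cloneable".
class NotCloneable
{
   Object tryClone() throws CloneNotSupportedException
   {
      return super.clone(); // Object.clone() throws CloneNotSupportedException here
   }
}

class CloneDemo
{
   public static void main(String[] args) throws CloneNotSupportedException
   {
      CloneableCounter prototype = new CloneableCounter();
      CloneableCounter a = prototype.clone();
      CloneableCounter b = prototype.clone();

      a.count++;
      a.seenWords.add("Google");

      System.out.println(b.count);            // prints 0: each clone has its own count
      System.out.println(b.seenWords.size()); // prints 1: the list is shared

      try
      {
         new NotCloneable().tryClone();
      }
      catch (CloneNotSupportedException e)
      {
         System.out.println("NotCloneable cannot be cloned");
      }
   }
}
```

For a processor like WordCount, whose only state is a primitive, the shallow copy is enough; a processor holding collections or other mutable objects would likely need a clone() along the lines of cloneWithOwnList().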

The Dempsy framework now has enough information that it can understand:

  1. How to create instances of the WordCount message processor given an already existing instance it will use as a prototype.
  2. That instances of WordCount handle messages of type Word using the WordCount.countWord() method.
  3. That the key for a given Word message, which represents the "address" of a unique WordCount instance to which the Word message should be routed, is provided by a call on the message instance, Word.getWordText().

Note: Currently the default Serializer implementation is Kryo. This at least requires messages to have a default constructor defined (though Kryo can invoke private default constructors under the right circumstances; see the Kryo documentation for more information). Dempsy can also be configured with the standard Java Serializer (though I'm not sure why anyone would ever want to do this).
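The three points above can be approximated in plain Java with reflection. The sketch below is a hypothetical, single-JVM stand-in, not Dempsy's implementation: it declares local annotations named after Dempsy's (so it compiles without the Dempsy jar), reads the key through the @MessageKey method, clones the prototype when no instance exists yet for that key, and invokes the @MessageHandler method. MiniContainer and instanceFor() are invented names.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Local stand-ins named after Dempsy's annotations so the sketch compiles on its own.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface MessageKey {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface MessageHandler {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @interface MessageProcessor {}

class Word
{
   private String wordText;
   public Word() {}
   public Word(String wordText) { this.wordText = wordText; }
   @MessageKey public String getWordText() { return wordText; }
}

@MessageProcessor
class WordCount implements Cloneable
{
   long count = 0;
   @MessageHandler public void countWord(Word word) { count++; }
   @Override public Object clone() throws CloneNotSupportedException { return super.clone(); }
}

// Hypothetical single-JVM approximation of the routing described in the text.
class MiniContainer
{
   private final Object prototype;
   private final Method handler;
   private final Map<Object, Object> instances = new HashMap<Object, Object>(); // key -> processor

   MiniContainer(Object prototype)
   {
      if (!prototype.getClass().isAnnotationPresent(MessageProcessor.class))
         throw new IllegalArgumentException("prototype is not a @MessageProcessor");
      this.prototype = prototype;
      this.handler = findAnnotated(prototype.getClass(), MessageHandler.class);
   }

   // Returns the first public method carrying the given annotation.
   static Method findAnnotated(Class<?> type, Class<? extends Annotation> annotation)
   {
      for (Method m : type.getMethods())
         if (m.isAnnotationPresent(annotation))
            return m;
      throw new IllegalArgumentException(type.getName() + " has no @" + annotation.getSimpleName() + " method");
   }

   void dispatch(Object message) throws Exception
   {
      // 1. The key comes from the message's @MessageKey method.
      Object key = findAnnotated(message.getClass(), MessageKey.class).invoke(message);

      // 2. Look up the instance for this key; clone the prototype if there is none yet.
      Object mp = instances.get(key);
      if (mp == null)
      {
         mp = prototype.getClass().getMethod("clone").invoke(prototype);
         instances.put(key, mp);
      }

      // 3. Deliver the message to the @MessageHandler method.
      handler.invoke(mp, message);
   }

   Object instanceFor(Object key)
   {
      return instances.get(key);
   }

   public static void main(String[] args) throws Exception
   {
      MiniContainer container = new MiniContainer(new WordCount());
      for (String w : "Google is Google".split(" "))
         container.dispatch(new Word(w));
      System.out.println(((WordCount) container.instanceFor("Google")).count); // prints 2
   }
}
```

The real framework also deals with serialization, distribution across nodes, and concurrency, none of which this sketch attempts.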

A word about the message key

It is critical that the Object Dempsy obtains as the message key (in the example, the result of the call to Word.getWordText()) has the appropriate identity semantics. In all cases that means it needs non-default equals() and hashCode() methods. Part of the reason is obvious: a "unique" message key corresponds to an instance of a message processor, so it's important to get the definition of "unique" right. The default Object behavior is not adequate. Think of Dempsy as using the key as if it were a key in a HashMap containing all of the current message processor instances. The default implementations of Object.equals() and Object.hashCode() are based on object identity, so two separately constructed keys for the same word would be treated as different keys, and each would get its own message processor.
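The difference is easy to see with a HashMap standing in for Dempsy's table of message processor instances. In this sketch, IdentityKey and ValueKey are hypothetical key classes: the first inherits Object's identity-based equals() and hashCode(), the second defines value-based ones, so only the second collapses two separately constructed keys for the same word onto a single entry. ValueKey assumes its text is never null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key WITHOUT equals()/hashCode(): inherits Object's identity semantics.
class IdentityKey
{
   final String text;
   IdentityKey(String text) { this.text = text; }
}

// Hypothetical key WITH value semantics: equal text means equal key.
class ValueKey
{
   final String text; // assumed non-null
   ValueKey(String text) { this.text = text; }

   @Override
   public boolean equals(Object o)
   {
      if (this == o) return true;
      if (!(o instanceof ValueKey)) return false;
      return text.equals(((ValueKey) o).text);
   }

   @Override
   public int hashCode()
   {
      return text.hashCode(); // consistent with equals()
   }
}

class KeySemanticsDemo
{
   // How many "message processors" a HashMap keyed this way would hold.
   static int distinctInstances(Object... keys)
   {
      Map<Object, Integer> instances = new HashMap<Object, Integer>();
      for (Object k : keys)
         instances.put(k, instances.containsKey(k) ? instances.get(k) + 1 : 1);
      return instances.size();
   }

   public static void main(String[] args)
   {
      // Every incoming message carries a freshly constructed key.
      System.out.println(distinctInstances(new IdentityKey("Google"), new IdentityKey("Google"))); // prints 2
      System.out.println(distinctInstances(new ValueKey("Google"), new ValueKey("Google")));       // prints 1
   }
}
```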

But this is not all. Given that instances of a message processor are distributed across many nodes, Dempsy's default routing behavior uses hashCode() to determine which node a particular message processor runs on. Therefore, while strictly speaking most Java applications would still work (though very poorly) if, for example, hashCode() were implemented to simply return 1, in Dempsy this would cause ALL message processors to be instantiated on the same node of a cluster.
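To see why a degenerate hashCode() matters, here is a sketch of hash-based node selection. The rule used, Math.floorMod(key.hashCode(), nodeCount), is an assumption for illustration; the text only says that Dempsy's default routing uses hashCode(), not which formula it applies. BadKey is a hypothetical key whose hashCode() always returns 1, which is legal under the equals/hashCode contract but sends every key to the same node.

```java
import java.util.Arrays;

// Hypothetical key whose hashCode() is legal but degenerate.
class BadKey
{
   final String text;
   BadKey(String text) { this.text = text; }

   @Override public boolean equals(Object o) { return o instanceof BadKey && ((BadKey) o).text.equals(text); }
   @Override public int hashCode() { return 1; } // satisfies the contract, ruins distribution
}

class NodeSelection
{
   // Assumed routing rule for illustration only: node index = hashCode mod node count.
   // floorMod keeps the index non-negative even when hashCode() is negative.
   static int nodeFor(Object key, int nodeCount)
   {
      return Math.floorMod(key.hashCode(), nodeCount);
   }

   // How many keys land on each node.
   static int[] histogram(Object[] keys, int nodeCount)
   {
      int[] perNode = new int[nodeCount];
      for (Object k : keys)
         perNode[nodeFor(k, nodeCount)]++;
      return perNode;
   }

   public static void main(String[] args)
   {
      String[] words = {"Google", "is", "Skynet", "the", "a", "of", "and", "to"};
      BadKey[] badKeys = new BadKey[words.length];
      for (int i = 0; i < words.length; i++)
         badKeys[i] = new BadKey(words[i]);

      System.out.println(Arrays.toString(histogram(words, 4)));   // String keys spread over several nodes
      System.out.println(Arrays.toString(histogram(badKeys, 4))); // prints [0, 8, 0, 0]
   }
}
```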

In the example, the MessageKey is a java.lang.String, which has appropriate identity semantics. Note: The message key is not restricted to String; any type with appropriate equals() and hashCode() implementations, as described above, can be used.

Adaptors

So where do Word messages come from and how do they get to Dempsy in order to be routed to the appropriate WordCount message processor? Dempsy provides an interface that needs to be implemented by the application developer in order to adapt sources of stream data to the Dempsy message bus. An Adaptor implementation:

  1. will be given a handle to the Dempsy message bus through an interface called a Dispatcher.
  2. will need to obtain data from an external source and use the Dispatcher to send that data into Dempsy.

The API for an Adaptor is very simple, so we will extend the Word Count example with the following class:

import com.nokia.dempsy.Adaptor;
import com.nokia.dempsy.Dispatcher;

public class WordAdaptor implements Adaptor
{
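   private Dispatcher dempsy;
   private volatile boolean running = false; // written by stop() from a different thread than start()
   ...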

   @Override
   public void setDispatcher(Dispatcher dispatcher)
   {
      this.dempsy = dispatcher;
   }

   @Override
   public void start()
   {
      running = true;
      while (running)
      {
         // obtain data from an external source
         String wordString = ... set this from an external source ...

         dempsy.dispatch(new Word(wordString));
      }
   }

   @Override
   public void stop()
   {
      running = false;
   }
}

The WordAdaptor will be instantiated by Dempsy. It will be given a handle to a Dispatcher, and then Adaptor.start() will be called. The application developer is responsible for using data from an external source to create Dempsy-compliant messages: as described above, a message must be serializable by the configured Serializer (with the default Kryo serializer, this at least means having a default constructor) and must have a method annotated with @MessageKey.

Notice the lifecycle. start() is called by the framework and, in this example, does not return until stop() is called. If it ever does return, it will not be called again without restarting the node the Adaptor was instantiated in. Note: It's very important that you manage this. You are allowed to return from start() whenever you want, either because the Adaptor is finished (if such a case exists) or because you decided to do the work in another thread (or many other threads), but Dempsy will not re-invoke start().

Dempsy will invoke the stop() method to shut down the Adaptor when the node shuts down. Well-behaved Adaptors must return from start() at this time, if they have not done so already. Not doing so will hang the VM on exit since, by default, the Adaptor runs in a non-daemon thread (though this is a configurable option for ill-behaved Adaptors).
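The lifecycle described above can be sketched without Dempsy. In this hypothetical example, Dispatcher and Adaptor are local interfaces that only mirror the method names used in the text (they are not Dempsy's classes), an in-memory BlockingQueue plays the external source, and runUntilDrained() plays the framework: it calls start() on a non-daemon thread, later calls stop(), and waits for start() to return. The stop flag is volatile because stop() is called from a different thread than start().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Local stand-ins that only mirror the method names used in the text.
interface Dispatcher { void dispatch(Object message); }

interface Adaptor
{
   void setDispatcher(Dispatcher dispatcher);
   void start();
   void stop();
}

// Hypothetical adaptor whose "external source" is an in-memory queue of words.
class QueueWordAdaptor implements Adaptor
{
   private final BlockingQueue<String> source;
   private volatile boolean stopRequested = false; // written by stop() on another thread
   private Dispatcher dempsy;

   QueueWordAdaptor(BlockingQueue<String> source) { this.source = source; }

   @Override public void setDispatcher(Dispatcher dispatcher) { this.dempsy = dispatcher; }

   @Override
   public void start()
   {
      try
      {
         while (!stopRequested)
         {
            // Poll with a timeout so the loop re-checks stopRequested regularly.
            String word = source.poll(100, TimeUnit.MILLISECONDS);
            if (word != null)
               dempsy.dispatch(word); // the text dispatches new Word(...); a String keeps this short
         }
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt(); // preserve the interrupt and return from start()
      }
   }

   @Override public void stop() { stopRequested = true; }
}

// Plays the framework's role: start() on its own thread, stop() at "shutdown".
class LifecycleDemo
{
   static List<Object> runUntilDrained(List<String> words) throws InterruptedException
   {
      List<Object> dispatched = new CopyOnWriteArrayList<Object>();
      QueueWordAdaptor adaptor = new QueueWordAdaptor(new LinkedBlockingQueue<String>(words));
      adaptor.setDispatcher(dispatched::add);

      // Inherits daemon status from the creating thread, which is non-daemon when run from main().
      Thread adaptorThread = new Thread(adaptor::start);
      adaptorThread.start();

      long deadline = System.currentTimeMillis() + 5000;
      while (dispatched.size() < words.size() && System.currentTimeMillis() < deadline)
         Thread.sleep(10);

      adaptor.stop();
      adaptorThread.join(5000); // a well-behaved adaptor returns from start() shortly after stop()
      if (adaptorThread.isAlive())
         throw new IllegalStateException("adaptor did not return from start()");
      return dispatched;
   }

   public static void main(String[] args) throws InterruptedException
   {
      System.out.println(runUntilDrained(Arrays.asList("Google", "is", "Skynet")));
   }
}
```

Polling with a timeout, rather than blocking indefinitely on take(), is what lets start() notice the stop request promptly; an adaptor that blocks forever on its external source would need another way to wake up, such as closing the source or interrupting the thread. Using a "stop requested" flag, rather than setting a running flag inside start(), also means a stop() that arrives before start() is not lost.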

Application Definition

Dempsy uses, and heavily emphasizes, a Dependency Injection based approach to application development. An application can be configured with any DI container the user prefers. Out of the box, Dempsy works with Spring and Guice. The examples that follow will show how to do the configuration by hand, which means the translation to any DI container should be obvious to users of those containers. We will also include Spring examples.

At this point we should begin to have an understanding of what a Dempsy application is. It's a series of instances of message processors across a number of compute nodes, being routed messages based on their keys, and being supplied message data by Adaptors. The configuration of an application is simply a formalization of these specifics. To define (configure) the Word Count application we've been walking through, we simply need to lay out the specifics. Doing this programmatically we would have:

import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;

 ...
      ApplicationDefinition myWordCountApplication =
        new ApplicationDefinition("word-count").add(
            new ClusterDefinition("adaptor", new WordAdaptor()),
            new ClusterDefinition("mp", new WordCount()));
 ...

Notice what we are doing here. We are defining the topology of a Dempsy application (and accepting all of the defaults). The application, called "word-count," consists of two clusters, "adaptor" and "mp." The first contains our Adaptor, which, as we have seen, sources Word messages. The second contains a message processor whose prototype is an instance of WordCount.

Fig. 1 Message Processor Lifecycle (WordCount pipeline so far)

Although messages coming from the WordAdaptor flow to the WordCount message processor, the order in the definition doesn't actually matter. Dempsy determines where messages are sent based on the type of the message and the type of object that the MessageHandler on the MessageProcessor takes. In our example, when the WordAdaptor produces a message of type Word, Dempsy knows that the WordCount message processor can handle it because the method WordCount.countWord() (which is annotated with @MessageHandler) takes the type Word. If other message processors also have handlers that take a Word, the messages will also be routed to the appropriate message processor within each of those clusters.
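Type-based routing can also be sketched with reflection. In this hypothetical example, TypeRouter records, for each cluster, the parameter type of its @MessageHandler method, and a message goes to every cluster whose handler type accepts it. The local MessageHandler annotation, WordLength, and Sentence are invented for illustration, and using Class.isInstance (which also matches subclasses) is an assumption; the text does not say how Dempsy treats message subtypes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-in for Dempsy's @MessageHandler so the sketch compiles on its own.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface MessageHandler {}

class Word
{
   final String text;
   Word(String text) { this.text = text; }
}

class Sentence {} // hypothetical message type that no processor below handles

class WordCount
{
   long count = 0;
   @MessageHandler public void countWord(Word word) { count++; }
}

class WordLength // hypothetical second processor that also takes Word
{
   @MessageHandler public void measure(Word word) { /* e.g. record word.text.length() */ }
}

class TypeRouter
{
   // cluster name -> parameter type of that cluster's @MessageHandler (one handler per cluster here)
   private final Map<String, Class<?>> accepts = new LinkedHashMap<>();

   void addCluster(String name, Object prototype)
   {
      for (Method m : prototype.getClass().getMethods())
         if (m.isAnnotationPresent(MessageHandler.class) && m.getParameterTypes().length == 1)
            accepts.put(name, m.getParameterTypes()[0]);
   }

   // Every cluster whose handler accepts this message; the choice depends only on types.
   List<String> destinationsFor(Object message)
   {
      List<String> result = new ArrayList<>();
      for (Map.Entry<String, Class<?>> e : accepts.entrySet())
         if (e.getValue().isInstance(message))
            result.add(e.getKey());
      return result;
   }

   public static void main(String[] args)
   {
      TypeRouter router = new TypeRouter();
      router.addCluster("mp", new WordCount());
      router.addCluster("lengths", new WordLength());
      System.out.println(router.destinationsFor(new Word("Google"))); // prints [mp, lengths]
      System.out.println(router.destinationsFor(new Sentence()));     // prints []
   }
}
```

Because only the types are consulted, calling addCluster() in a different order changes the order of the returned list but not which clusters receive the message.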

What do we do with the ApplicationDefinition? That depends on the Dependency Injection framework you're using. If you're using either Spring or Guice, you don't need to do much else to run your application. If you're using a different dependency injection container, then you'll need to obtain a reference to the Dempsy object and give it the ApplicationDefinition, but this is a more advanced topic for a later section. From here on we will show how the Spring implementation works.

The above application definition could be defined using Spring as follows:

<beans>
  <bean class="com.nokia.dempsy.config.ApplicationDefinition">
    <constructor-arg value="word-count" />
    <property name="clusterDefinitions">
      <list>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="adaptor"/>
          <property name="adaptor">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" />
          </property>
        </bean>
        <bean class="com.nokia.dempsy.config.ClusterDefinition">
          <constructor-arg value="mp"/>
          <property name="messageProcessorPrototype">
            <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
          </property>
        </bean>
      </list>
    </property>
  </bean>
</beans>

Running the example

By default, there are two modes in which you can run a Dempsy application: entirely within a local Java VM, or distributed across a set of machines. For the purposes of this tutorial we will demonstrate how to run it in a local Java VM, which is easy to set up in an IDE like Eclipse.

Several "main" implementations are provided, depending on which mode you're running a Dempsy application in and which DI framework you're using. In fact, these supplied "main" applications are the only place any particular DI container is assumed, so adding support for other, currently unsupported DI containers is straightforward.

To run your application using the Spring container on the command line you would use:

java -Dapplication=WordCount.xml -cp [classpath] com.nokia.dempsy.spring.RunAppInVm

Your classpath will need to contain all of the main Dempsy artifacts plus the Dempsy Spring library. See the section on the codebase structure for more information.

Next section: Terminology