Skip to content
markheger edited this page Jul 6, 2020 · 13 revisions

Scala Support

Overview

Scala: http://www.scala-lang.org/

Since Scala’s interoperability with Java is completely unparalleled(*) Scala code can call the Java Application API to build a streaming application. This means a Scala developer can build applications for IBM Streams using Scala code, including Scala anonymous functions and Scala objects for tuples in streams.

Since Scala will call the Java Application API it is recommended to read the getting started guide and take a look at the samples.

(*) http://www.codecommit.com/blog/java/interop-between-java-and-scala

Developing an Application

Just like Java, the Scala application creates a Topology object:

object FizzBuzzScala {
  def main(args: Array[String]) {
  val topology = new Topology("FizzBuzzScala")

This is from the FizzBuzzScala sample application, its full source is here - https://github.com/IBMStreams/streamsx.topology/blob/develop/samples/scala/src/games/FizzBuzzScala.scala

Then source streams are created, here a single stream of type TStream[Long] containing integral values is created using a utility method:

var counting = BeaconStreams.longBeacon(topology)

Then functional transformations are applied to the streams to implement the required application analytics, here the stream of numbers is converted to a stream of strings following the rules of FizzBuzz.

var shouts = counting.transform(
   (c:java.lang.Long) => {
       if (c == 0)
            null
       else {
            var shout = ""
            if (c % 3 == 0)
               shout = "Fizz"
            if (c % 5 == 0)
               shout += "Buzz"

            if (shout.isEmpty())
               c.toString()
            else
               shout + "!"
       }
   }  , classOf[String]);

An anonymous function is used to provide the functional transformation, the anonymous function is implicitly converted to the required Java function class (Function<Long,String> in this case). This occurs because the Scala code imports FunctionConversions provided by com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar.

import com.ibm.streamsx.topology.functions.FunctionConversions._

Note also that the required tuple type (Java's Class<T>) is defined by the Scala expression classOf[String], equivalent to Java's String.class.

Submitting an application

Once the topology has been built, the application is submitted,

val future = StreamsContextFactory.getStreamsContext("EMBEDDED").submit(topology)

Here the application is submitted to run embedded, within the current JVM. This works even when the Scala application is executed using scala. Streaming applications written in Scala may be submitted to any context supported by the Java Application API, including DISTRIBUTED to support executing using a distributed IBM Streams instance.

Scala objects as tuples

Scala objects can be used as tuples on a stream, TStream[T]. Like Java, the object must be serializable, so a simple example is:

@SerialVersionUID(1L)
class Person(name: String, val age: Int)  extends Serializable {
    override def toString() : String = name + " is " + age 
}

and then a simple example of its use is:

val emma = List(new Person("Emma", 20), new Person("George", 37), new Person("Harriet", 17), new Person("Jane", 20))
var peopleStream = topology.constants(emma, classOf[Person])
peopleStream = peopleStream.filter((p : Person) => p.age >= 20)
var strings = StringStreams.toString(peopleStream)

The initial stream contains four Scala Person objects, which are then filtered based upon the person's age before being converted into a stream of strings.

Note that the Scala List was implicitly converted to a java.util.List through the set of implicit conversions provided by:

import scala.collection.JavaConversions._

Specific functional transformations

Examples of specific calls into the Java Application API using Scala functions.

Source streams / Supplier

Create a source stream that uses a Scala Iterable, in this case a List.

val emma = List(new Person("Emma", 20), new Person("George", 37), new Person("Harriet", 17), new Person("Jane", 20))
val getCharacters : () => java.util.List[Person] = () =>  { emma };  
var characters : TStream[Person] = topology.source(getCharacters, classOf[Person])

Note:

  • getCharacters function needs to explicitly declare its Java collection type java.util.List[Person].
  • The Scala collection is implicitly converted to the Java collection using scala.collection.JavaConversions.
  • The type of characters stream needs to be explicitly declared TStream[Person] .
  • The Scala function getCharacters is implicitly converted to com.ibm.streams.topology.function.Supplier<String> using com.ibm.streamsx.topology.functions.FunctionConversions.

Transform

Transform a stream of Person into a stream of String using an anonymous function.

var strings : TStream[String] = peopleStream.transform((p : Person) => p.toString(), classOf[String])

Note:

  • The type of strings stream needs to be explicitly declared TStream[String] .
  • The Scala anonymous function is implicitly converted to com.ibm.streams.topology.function.Function<Person,String> using com.ibm.streamsx.topology.functions.FunctionConversions.

MultiTransform

Split sentences into individual words.

val source = topology.strings("mary had a little lamb",
        "its fleece was white as snow");
val words : TStream[String] = source.multiTransform(
   (s : String) =>
        {
           val sseq : Seq[String] = s.split(" ")
           sseq
        } : java.lang.Iterable[String] , classOf[String]);
  • The type of words stream needs to be explicitly declared TStream[String] .
  • The Scala anonymous function needs to be explicitly declare its return type java.lang.Iterable[String].
  • The Scala anonymous function is implicitly converted to com.ibm.streams.topology.function.Function<String,Iterable<String>> using com.ibm.streamsx.topology.functions.FunctionConversions.
  • The Scala Seq[String] is implicitly converted to java.lang.Iterable<String> using scala.collection.JavaConversions.

Aggregate

From a stream of Person tuples (TStream[Person]) continually aggregate to find the oldest person in the last three tuples.

var characterWindow = characters.last(3);

var oldestPerson = characterWindow .aggregate((people: java.util.List[Person]) => {
  var oldest: Person = new Person("", -1)
  people.foreach { person =>
    if (person.age > oldest.age)
      oldest = person
  }
  oldest
},
  classOf[Person])

Note

  • The Scala anonymous function for aggregation needs to be explicitly declare its parameter type as the Java type java.lang.List[Person].

Details

The version of Scala used is defined by the value of the environment variable SCALA_HOME. When a IBM Streams application bundle is created, then $SCALA_HOME/lib/scala-library.jar is copied into the bundle for use during application execution.

These libraries must be added to the Scala classpath for compilation and execution:

  • com.ibm.streamsx.topology/lib/com.ibm.streamsx.topology.jar - Scala & Java Application APIs for IBM Streams
  • $STREAMS_INSTALL/lib/com.ibm.streams.operator.samples.jar - IBM Streams Java Operator API and its samples

When compiling with scalac the flag '-usemanifestcp` is required.