Skip to content

Latest commit

 

History

History
1042 lines (780 loc) · 35.6 KB

scala_api_guide.md

File metadata and controls

1042 lines (780 loc) · 35.6 KB
title
Scala API Programming Guide
Scala Programming Guide =======================

This guide explains how to develop Flink programs with the Scala programming interface.

Here we will look at the general structure of a Scala job. You will learn how to write data sources, data sinks, and operators to create data flows that can be executed using the Flink system.

Writing Scala jobs requires an understanding of Scala, there is excellent documentation available here. Most of the examples can be understood by someone with a good understanding of programming in general, though.

Back to top

Word Count Example ------------------

To start, let's look at a Word Count job implemented in Scala. This program is very simple but it will give you a basic idea of what a Scala job looks like.

import org.apache.flinkclient.LocalExecutor

import org.apache.flinkapi.scala._
import org.apache.flinkapi.scala.operators._

object WordCount {
  def main(args: Array[String]) {
    val input = TextFile(textInput)

    val words = input.flatMap { _.split(" ") map { (_, 1) } }

    val counts = words.groupBy { case (word, _) => word }
      .reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }

    val output = counts.write(wordsOutput, CsvOutputFormat())
    val plan = new ScalaPlan(Seq(output))

    LocalExecutor.execute(plan)
  }
}

Same as any Flink job a Scala job consists of one or several data sources, one or several data sinks and operators in between these that transform data. Together these parts are referred to as the data flow graph. It dictates the way data is passed when a job is executed.

When using Scala in Flink an important concept to grasp is that of the DataSet. DataSet is an abstract concept that represents actual data sets at runtime and which has operations that transform data to create a new transformed data set. In this example the TextFile("/some/input") call creates a DataSet[String] that represents the lines of text from the input. The flatMap operation that looks like a regular Scala flatMap is in fact an operation on DataSet that passes (at runtime) the data items through the provided anonymous function to transform them. The result of the flatMap operation is a new DataSet that represents the transformed data. On this other operations be performed. Another such operation are groupBy and reduce, but we will go into details of those later in this guide.

The write operation of DataSet is used to create a data sink. You provide it with a path where the data is to be written to and an output format. This is enough for now but we will discuss data formats (for sources and sinks) later.

To execute a data flow graph one or several sinks have to wrapped in a Plan which can then be executed on a cluster using RemoteExecutor. Here, the LocalExecutor is used to run the flow on the local computer. This is useful for debugging your job before running it on an actual cluster.

Back to top

Project Setup -------------

We will only cover maven here but the concepts should work equivalently with other build systems such as Gradle or sbt. When wanting to develop a Scala job all that is needed as dependency is is flink-scala (and flink-clients, if you want to execute your jobs). So all that needs to be done is to add the following lines to your POM.

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala</artifactId>
    <version>{{site.FLINK_VERSION_STABLE}}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>{{site.FLINK_VERSION_STABLE}}</version>
  </dependency>
</dependencies>

To quickly get started you can use the Flink Scala quickstart available here. This will give you a completeMaven project with some working example code that you can use to explore the system or as basis for your own projects.

These imports are normally enough for any project:

import org.apache.flinkapi.scala._
import org.apache.flinkapi.scala.operators._

import org.apache.flinkclient.LocalExecutor
import org.apache.flinkclient.RemoteExecutor

The first two imports contain things like DataSet, Plan, data sources, data sinks, and the operations. The last two imports are required if you want to run a data flow on your local machine, respectively cluster.

Back to top

The DataSet Abstraction -----------------------

As already alluded to in the introductory example you write Scala jobs by using operations on a DataSet to create new transformed DataSet. This concept is the core of the Flink Scala API so it merits some more explanation. A DataSet can look and behave like a regular Scala collection in your code but it does not contain any actual data but only represents data. For example: when you use TextFile() you get back a DataSource[String] that represents each line of text in the input as a String. No data is actually loaded or available at this point. The set is only used to apply further operations which themselves are not executed until the data flow is executed. An operation on DataSet creates a new DataSet that represents the transformation and has a pointer to the DataSet that represents the data to be transformed. In this way a tree of data sets is created that contains both the specification of the flow of data as well as all the transformations. This graph can be wrapped in a Plan and executed.

Working with the system is like working with lazy collections, where execution is postponed until the user submits the job.

DataSet has a generic parameter, this is the type of each data item or record that would be processed by further transformations. This is similar to how List[A] in Scala would behave. For example in:

val input: DataSet[(String, Int)] = ...
val mapped = input map { a => (a._1, a._2 + 1)}

The anonymous function would retrieve in a tuples of type (String, Int).

Back to top

Data Types ----------

There are some restrictions regarding the data types that can be used in Scala jobs (basically the generic parameter of DataSet). The usable types are the primitive Scala types, case classes (which includes tuples), and custom data types.

Custom data types must implement the interface {% gh_link /flink-core/src/main/java/org/apache/flink/types/Value.java "Value" %}. For custom data types that should also be used as a grouping key or join key the {% gh_link /flink-core/src/main/java/org/apache/flink/types/Key.java "Key" %} interface must be implemented.

Back to top

Creating Data Sources ---------------------

To get an initial DataSet on which to perform operations to build a data flow graph the following construct is used:

val input = DataSource("<file-path>", <input-format>)

The value input is now a DataSet with the generic type depending on the input format.

The file path can be on of either file:///some/file to acces files on the local machine or hdfs://some/path to read files from HDFS. The input format can be one of our builtin formats or a custom input format. The builtin formats are:

We will now have a look at each of them and show how they are employed and in which situations.

Back to top

#### TextInputFormat

This input format simply reads a text file line wise and creates a String for each line. It is used as:

TextInputFormat()

As you have already seen in the Word Count Example there is a shortcut for this. Instead of using a DataSource with TextInputFormat you can simply write:

val input = TextFile("<file-path>")

The input would then be a DataSet[String].

Back to top

#### CsvInputFormat

This input format is mainly used to read Csv-Files, as the name suggests. Input files must be text files. You can specify the String that should be used as the separator between individual records (this would often be newline) and also the separator between fields of a record (this would often be a comma). The CsvInputFormat will automatically read the records and create Scala tuples or custom case class objects for you. The format can be used in one of the following ways:

CsvInputFormat[Out]()
CsvInputFormat[Out](recordDelim: String)
CsvInputFormat[Out](recordDelim: String, fieldDelim: Char)

CsvInputFormat[Out](fieldIndices: Seq[Int])
CsvInputFormat[Out](fieldIndices: Seq[Int], recordDelim: String)
CsvInputFormat[Out](fieldIndices: Seq[Int], recordDelim: String, fieldDelim: Char)

The default record delimiter is a newline, the default field delimiter is a comma. The type parameter Out must be a case class type, which also includes tuple types since they are internally case classes.

Normally, all the fields of a record are read. If you want to explicitly specify which fields of the record should be read you can use one of the tree variants with a fieldIndices parameter. Here you give a list of the fields that should be read. Field indices start from zero.

An example usage could look as follows:

val input = DataSource("file:///some/file", CsvInputFormat[(Int, Int, String)](Seq(1, 17, 42), "\n", ','))

Here only the specified fields would be read and 3-tuples created for you. The type of input would be DataSet[(Int, Int, String)].

Back to top

#### DelimitedInputFormat

This input format is meant for textual records that are separated by some delimiter. The delimiter could be a newline, for example. It is used like this:

DelimitedInputFormat[Out](parseFunction: String => Out, delim: String = "\n")

The input files will be split on the supplied delimiter (or the default newline) and the supplied parse function must parse the textual representation in the String and return an object. The type of this object will then also be the type of the DataSet created by the DataSource operation.

Just as with BinaryInputFormat the function can be an anonymous function, so you could have:

val input = DataSource("file:///some/file", BinaryInputFormat( { line =>
  line match {
    case EdgeInputPattern(from, to) => Path(from.toInt, to.toInt, 1)
  }
}))

In this example EdgeInputPattern is some regular expression used for parsing a line of text and Path is a custom case class that is used to represent the data. The type of input would in this case be DataSet[Path].

Back to top

#### BinaryInputFormat

This input format is best used when you have a custom binary format that you store the data in. It is created using one of the following:

BinaryInputFormat[Out](readFunction: DataInput => Out)
BinaryInputFormat[Out](readFunction: DataInput => Out, blocksize: Long)

So you have to provide a function that gets a java.io.DataInput and returns the object that contains the data. The type of this object will then also be the type of the DataSet created by the DataSource operation.

The provided function can also be an anonymous function, so you could have something like this:

val input = DataSource("file:///some/file", BinaryInputFormat( { input =>
  val one = input.readInt
  val two = input.readDouble
  (one, two)  
}))

Here input would be of type DataSet[(Int, Double)].

Back to top

#### BinarySerializedInputFormat

This input format is only meant to be used in conjunction with BinarySerializedOutputFormat. You can use these to write elements to files using a Flink-internal format that can efficiently be read again. You should only use this when output is only meant to be consumed by other Flink jobs. The format can be used on one of two ways:

BinarySerializedInputFormat[Out]()
BinarySerializedInputFormat[Out](blocksize: Long)

So if input files contain elements of type (String, Int) (a tuple type) you could use:

val input = DataSource("file:///some/file", BinarySerializedInputFormat[(String, Int)]())

Back to top

#### FixedLengthInputFormat

This input format is for cases where you want to read binary blocks of a fixed size. The size of a block must be specified and you must provide code that reads elements from a byte array.

The format is used like this:

FixedLengthInputFormat[Out](readFunction: (Array[Byte], Int) => Out, recordLength: Int)

The specified function gets an array and a position at which it must start reading the array and returns the element read from the binary data.

Back to top

Operations on DataSet ---------------------

As explained in Programming Model, a Flink job is a graph of operators that process data coming from sources that is finally written to sinks. When you use the Scala front end these operators as well as the graph is created behind the scenes. For example, when you write code like this:

val input = TextFile("file:///some/file")

val words = input.map { x => (x, 1) }

val output = counts.write(words, CsvOutputFormat()))

val plan = new ScalaPlan(Seq(output))

What you get is a graph that has a data source, a map operator (that contains the code written inside the anonymous function block), and a data sink. You do not have to know about this to be able to use the Scala front end but it helps to remember, that when you are using Scala you are building a data flow graph that processes data only when executed.

There are operations on DataSet that correspond to all the types of operators that the Flink system supports. We will shortly go trough all of them with some examples.

Back to top

#### Basic Operator Templates

Most of the operations have three similar versions and we will explain them here for all of the operators together. The three versions are map, flatMap, and filter. All of them accept an anonymous function that defines what the operation does but the semantics are different.

The map version is a simple one to one mapping. Take a look at the following code:

val input: DataSet[(String, Int)]

val mapped = input.map { x => (x._1, x._2 + 3) }

This defines a map operator that operates on tuples of String and Int and just adds three to the Int (the second fields of the tuple). So, if the input set had the tuples (a, 1), (b, 2), and (c, 3) the result after the operator would be (a, 4), (b, 5), and (c, 6).

The flatMap version works a bit differently, here you return something iterable from the anonymous function. The iterable could be a list or an array. The elements in this iterable are unnested. So for every element in the input data you get a list of elements. The concatenation of those is the result of the operator. If you had the following code:

val input: DataSet[(String, Int)]

val mapped = input.flatMap { x => List( (x._1, x._2), (x._1, x._2 + 1) ) }

and as input the tuples (a, 1) and (b, 1) you would get (a, 1), (a, 2), (b, 1), and (b, 2) as result. It is one flat list, and not the individual lists returned from the anonymous function.

The third template is filter. Here you give an anonymous function that returns a Boolean. The elements for which this Boolean is true are part of the result of the operation, the others are culled. An example for a filter is this code:

val input: DataSet[(String, Int)]

val mapped = input.filter { x => x._2 >= 3 }

Back to top

#### Field/Key Selectors

For some operations (group, join, and cogroup) it is necessary to specify which parts of a data type are to be considered the key. This key is used for grouping elements together for reduce and for joining in case of a join or cogroup. In Scala the key is specified using a special anonymous function called a key selector. The key selector has as input an element of the type of the DataSet and must return a single value or a tuple of values that should be considered the key. This will become clear with some examples: (Note that we use the reduce operation here as an example, we will have a look at that further down):

val input: DataSet[(String, Int)]
val reduced = input groupBy { x => (x._1) } reduce { ... }
val reduced2 = input groupBy { case (w, c) => w } reduce { ... }

case class Test(a: String, b: Int, c: Int)
val input2: DataSet[Test]
val reduced3 = input2 groupBy { x => (x.a, x.b) } reduce { ... }
val reduced4 = input2 groupBy { case Test(x,y,z) => (x,y) } reduce { ... }

The anonymous function block passed to groupBy is the key selector. The first two examples both specify the String field of the tuple as key. In the second set of examples we see a custom case class and here we select the first two fields as a compound key.

It is worth noting that the key selector function is not actually executed at runtime but it is parsed at job creation time where the key information is extracted and stored for efficient computation at runtime.

Map Operation

Map is an operation that gets one element at a time and can output one or several elements. The operations that result in a MapOperator in the graph are exactly those mentioned in the previous section. For completeness' sake we will mention their signatures here (in this and the following such lists In is the type of the input data set, DataSet[In]):

def map[Out](fun: In => Out): DataSet[Out]
def flatMap[Out](fun: In => Iterator[Out]): DataSet[Out]
def filter(fun: In => Boolean): DataSet[Out]

Reduce Operation

Reduce is an operation that looks at groups of elements at a time and can, for one group, output one or several elements. To specify how elements should be grouped you need to give a key selection function, as explained above.

The basic template of the reduce operation is:

input groupBy { <key selector> } reduce { <reduce function> }

The signature of the reduce function depends on the variety of reduce operation selected. There are right now three different versions:

def reduce(fun: (In, In) => In): DataSet[In]

def reduceGroup[Out](fun: Iterator[In] => Out): DataSet[Out]
def combinableReduceGroup(fun: Iterator[In] => In): DataSet[In]

The reduce variant is like a reduceLeft on a Scala collection with the limitation that the output data type must be the same as the input data type. You specify how to elements of the selection should be combined, this is then used to reduce the elements in one group (of the same key) down to one element. This can be used to implement aggregation operators, for example:

val words: DataSet[(String, Int)]
val counts = words.groupBy { case (word, count) => word}
  .reduce { (w1, w1) => (w1._1, w1._2 + w2._2) }

This would add up the Int fields of those tuples that have the same String in the first fields. As is for example required in Word Count.

The reduceGroup variant can be used when more control is required. Here your reduce function gets an Iterator that can be used to iterate over all the elements in a group. With this type or reduce operation the output data type can be different from the input data type. An example of this kind of operation is this:

val words: DataSet[(String, Int)]
val minCounts = words.groupBy { case (word, count) => word}
  .reduceGroup { words => words.minBy { _._2 } }

Here we use the minBy function of Scala collections to determine the element with the minimum count in a group.

The combinableGroupReduce works like the groupReduce with the difference that the reduce operation is combinable. This is an optimization one can use, please have a look at Programming Model for the details.

Join Operation

The join operation is similar to a database equi-join. It is a two input iteration where you have to specify a key selector for each of the inputs and then the anonymous function is called for every pair of matching elements from the two input sides.

The basic template is:

input1 join input2 where { <key selector 1> } isEqualTo { <key selector 2>} map { <join function> }

or, because lines will get to long fast:

input1.join(input2)
  .where { <key selector 1> }
  .isEqualTo { <key selector 2>}
  .map { <join function> }

(Scala can sometimes be quite finicky about where you can omit dots and parentheses, so it's best to use dots in multi-line code like this.)

As mentioned in here there are three versions of this operator, so you can use one of these in the last position:

def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out]
def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out]
def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)]

One example where this can be used is database-style joining with projection:

input1.join(input2)
  .where { case (a, b, c) => (a, b) }
  .isEqualTo { case (a, b, c, d) => (c, d) }
  .map { (left, right) => (left._3, right._1) }

Here the join key for the left input is a compound of the first two tuple fields while the key for the second input is a compound of the last two fields. We then pick one field each from both sides as the result of the operation.

CoGroup Operation

The cogroup operation is a cross between join and reduce. It has two inputs and you have to specify a key selector for each of them. This is where the similarities with join stop. Instead of having one invocation of your user code per pair of matching elements all elements from the left and from the right are grouped together for one single invocation. In your function you get an Iterator for the elements from the left input and another Iterator for the elements from the right input.

The basic template is:

input1 cogroup input2 where { <key selector 1> } isEqualTo { <key selector 2>} map { <cogroup function> }

or, because lines will get to long fast:

input1.cogroup(input2)
  .where { <key selector 1> }
  .isEqualTo { <key selector 2>}
  .map { <cogroup function> }

There are to variants you can use, with the semantics explained here.

def map[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Out): DataSet[Out]
def flatMap[Out](fun: (Iterator[LeftIn], Iterator[RightIn]) => Iterator[Out]): DataSet[Out]

Cross Operation

The cross operation is used to form the Cartesian product of the elements from two inputs. The basic template is:

input1 cross input2 map { <cogroup function> }

Again there are three variants, with the semantics explained here.

def map[Out](fun: (LeftIn, RightIn) => Out): DataSet[Out]
def flatMap[Out](fun: (LeftIn, RightIn) => Iterator[Out]): DataSet[Out]
def filter(fun: (LeftIn, RightIn) => Boolean): DataSet[(LeftIn, RightIn)]

Union

When you want to have the combination of several data sets as the input of an operation you can use a union to combine them. It is used like this

val input1: DataSet[String]
val input2: DataSet[String]
val unioned = input1.union(input2)

The signature of union is:

def union(secondInput: DataSet[A])

Where A is the generic type of the DataSet on which you execute the union.

Back to top

Iterations ----------

Iterations allow you to implement loops in Flink programs. This page gives a general introduction to iterations. This section here provides quick examples of how to use the concepts using the Scala API. The iteration operators encapsulate a part of the program and execute it repeatedly, feeding back the result of one iteration (the partial solution) into the next iteration. Flink has two different types of iterations, Bulk Iteration and Delta Iteration.

For both types of iterations you provide the iteration body as a function that has data sets as input and returns a new data set. The difference is that bulk iterations map from one data set two one new data set while delta iterations map two data sets to two new data sets.

Bulk Iteration

The signature of the bulk iterate method is this:

def iterate(n: Int, stepFunction: DataSet[A] => DataSet[A])

where A is the type of the DataSet on which iterate is called. The number of steps is given in n. This is how you use it in practice:

val dataPoints = DataSource(dataPointInput, DelimitedInputFormat(parseInput))
val clusterPoints = DataSource(clusterInput, DelimitedInputFormat(parseInput))

def kMeansStep(centers: DataSet[(Int, Point)]) = {

  val distances = dataPoints cross centers map computeDistance
  val nearestCenters = distances.groupBy { case (pid, _) => pid }
    .reduceGroup { ds => ds.minBy(_._2.distance) } map asPointSum.tupled
  val newCenters = nearestCenters.groupBy { case (cid, _) => cid }
    .reduceGroup sumPointSums map { case (cid, pSum) => cid -> pSum.toPoint() }

  newCenters
}

val finalCenters = clusterPoints.iterate(numIterations, kMeansStep)

val output = finalCenters.write(clusterOutput, DelimitedOutputFormat(formatOutput.tupled))

Not that we use some functions here which we don't show. If you want, you can check out the complete code in our KMeans example.

Delta Iteration

The signature of the delta iterate method is this:

def iterateWithDelta(workset: DataSet[W], solutionSetKey: A => K, stepFunction: (DataSet[A], DataSet[W]) => (DataSet[A], DataSet[W]), maxIterations: Int)

where A is the type of the DataSet on which iterateWithDelta is called, W is the type of the DataSet that represents the workset and K is the key type. The maximum number of iterations must always be given.

For information on how delta iterations in general work on our system, please refer to iterations. A working example job is available here: Scala Connected Components Example

Back to top

Creating Data Sinks -------------------

The creation of data sinks is analog to the creation of data sources. DataSet has a write method that is used to create a sink that writes the output of the operation to a file in the local file system or HDFS. The general pattern is this:

val sink = out.write("<file-path>", <output-format>)

Where out is some DataSet. Just as for data sources, the file path can be on of either file:///some/file to acces files on the local machine or hdfs://some/path to read files from HDFS. The output format can be one of our builtin formats or a custom output format. The builtin formats are:

We will now have a look at each of them and show how they are employed and in which situations.

Back to top

#### DelimitedOutputFormat

This output format is meant for writing textual records that are separated by some delimiter. The delimiter could be a newline, for example. It is used like this:

DelimitedOutputFormat[In](formatFunction: In => String, delim: String = "\n")

For every element in the DataSet the formatting function is called and the result of that is appended to the output file. In between the elements the delim string is inserted.

An example would be:

val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", DelimitedOutputFormat( { elem =>
  "%s|%d".format(elem._1, elem._2)
}))

Here we use Scala String formatting to write the two fields of the tuple separated by a pipe character. The default newline delimiter will be inserted between the elements in the output files.

Back to top

#### CsvOutputFormat

This output format can be used to automatically write fields of tuple elements or case classes to CSV files. You can specify what separator should be used between fields of an element and also the separator between elements.

CsvOutputFormat[In]()
CsvOutputFormat[In](recordDelim: String)
CsvOutputFormat[In](recordDelim: String, fieldDelim: Char)

The default record delimiter is a newline, the default field delimiter is a comma.

An example usage could look as follows:

val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", CsvOutputFormat())

Notice how we don't need to specify the generic type here, it is inferred.

Back to top

#### RawOutputFormat

This input format can be used when you want to have complete control over what gets written. You get an OutputStream and can write the elements of the DataSet exactly as you see fit.

A RawOutputFormat is created like this:

RawOutputFormat[In](writeFunction: (In, OutputStream) => Unit)

The function you pass in gets one element from the DataSet and must write it to the given OutputStream. An example would be the following:

val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", RawOutputFormat( { (elem, output) =>
  /* write elem._1 and elem._2 to output */ 
}))
#### BinaryOutputFormat

This format is very similar to the RawOutputFormat. The difference is that instead of an OutputStream you get a DataOutput to which you can write binary data. You can also specify the block size for the binary output file. When you don't specify a block size some default is used.

A BinaryOutputFormat is created like this:

BinaryOutputFormat[In](writeFunction: (In, DataOutput) => Unit)
BinaryOutputFormat[In](writeFunction: (In, DataOutput) => Unit, blockSize: Long)

Back to top

#### BinarySerializedOutputFormat

This output format is only meant to be used in conjunction with BinarySerializedInputFormat. You can use these to write elements to files using a Flink-internal format that can efficiently be read again. You should only use this when output is only meant to be consumed by other Flink jobs. The output format can be used on one of two ways:

BinarySerializedOutputFormat[In]()
BinarySerializedOutputFormat[In](blocksize: Long)

So to write elements of some DataSet[A] to a binary file you could use:

val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", BinarySerializedInputFormat())

As you can see the type of the elements need not be specified, it is inferred by Scala.

Back to top

Executing Jobs --------------

To execute a data flow graph the sinks need to be wrapped in a {% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala/ScalaPlan.scala "ScalaPlan" %} object like this:

val out: DataSet[(String, Int)]
val sink = out.write("file:///some/file", CsvOutputFormat())

val plan = new ScalaPlan(Seq(sink))

You can put several sinks into the Seq that is passed to the constructor.

There are two ways one can execute a data flow plan: local execution and remote/cluster execution. When using local execution the plan is executed on the local computer. This is handy while developing jobs because you can easily debug your code and iterate quickly. When a job is ready to be used on bigger data sets it can be executed on a cluster. We will now give an example for each of the two execution modes.

First up is local execution:

import org.apache.flinkclient.LocalExecutor

...

val plan: ScalaPlan = ...
LocalExecutor.execute(plan)

This is all there is to it.

Remote (or cluster) execution is a bit more complicated because you have to package your code in a jar file so that it can be distributed on the cluster. Have a look at the scala quickstart to see how you can set up a maven project that does the packaging. Remote execution is done using the {% gh_link /flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java "RemoteExecutor" %}, like this:

import org.apache.flinkclient.RemoteExecutor

...

val plan: ScalaPlan = ...
val ex = new RemoteExecutor("<job manager ip address>", <job manager port>, "your.jar");
ex.executePlan(plan);

The IP address and the port of the Flink job manager depend on your setup. Have a look at cluster quickstart for a quick guide about how to set up a cluster. The default cluster port is 6123, so if you run a job manger on your local computer you can give this and "localhost" as the first to parameters to the RemoteExecutor constructor.

Back to top

Rich Functions --------------

Sometimes having a single function that is passed to an operation is not enough. Using Rich Functions it is possible to have state inside your operations and have code executed before the first element is processed and after the last element is processed. For example, instead of a simple function as in this example:

val mapped = input map { x => x + 1 }

you can have a rich function like this:

val mapped = input map( new MapFunction[(String, Int), (String, Int)] {
  val someState: SomeType = ...
  override def open(config: Configuration) = {
    // one-time initialization code
  }

  override def close() = {
    // one-time clean-up code
  }

  override def apply(in: (String, Int)) = {
    // do complex stuff
    val result = ...
    result
  }
})

You could also create a custom class that derives from MapFunction instead of the anonymous class we used here.

There are rich functions for all the various operator types. The basic template is the some, though. The common interface that they implement is {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/functions/Function.java "Function" %}. The open and close methods can be overridden to run set-up and tear-down code. The other methods can be used in a rich function to work with the runtime context which gives information about the context of the operator. Your operation code must now reside in an apply method that has the same signature as the anonymous function you would normally supply.

The rich functions reside in the package org.apache.flinkapi.scala.functions. This is a list of all the rich functions can can be used instead of simple functions in the respective operations:

abstract class MapFunction[In, Out] 
abstract class FlatMapFunction[In, Out] 
abstract class FilterFunction[In, Out] 

abstract class ReduceFunction[In]
abstract class GroupReduceFunction[In, Out]
abstract class CombinableGroupReduceFunction[In, Out]

abstract class JoinFunction[LeftIn, RightIn, Out]
abstract class FlatJoinFunction[LeftIn, RightIn, Out]

abstract class CoGroupFunction[LeftIn, RightIn, Out]
abstract class FlatCoGroupFunction[LeftIn, RightIn, Out]

abstract class CrossFunction[LeftIn, RightIn, Out]
abstract class FlatCrossFunction[LeftIn, RightIn, Out]

Note that for all the rich stubs, you need to specify the generic type of the input (or inputs) and the output type.

Back to top