This repository has been archived by the owner on Dec 31, 2020. It is now read-only.
Eron Wright edited this page Apr 18, 2017 · 12 revisions

flink-tensorflow

Welcome! flink-tensorflow (FTF) is an open-source library for machine intelligence in Apache Flink, using TensorFlow for numerical computation within a Flink program. A TensorFlow model becomes a function that you can use to transform a stream. This can be combined with Flink connectors and other Flink libraries to produce scalable, stateful, intelligent stream processing applications.

The flink-tensorflow library is intended for use with the Flink Scala API.

Note: flink-tensorflow doesn't provide a way to run ordinary TensorFlow programs unaltered. It provides a way for Flink programs to leverage the TensorFlow library while retaining the full power of Flink.

More information:

Getting Started

  • See the Building section to build the flink-tensorflow library
  • See the Developing an Application section to package your Flink application with the TensorFlow dependencies.
  • See the various example programs, e.g. Johnny, which demonstrates the use of a well-known image-labeling model called 'Inception' to identify a time-based sequence of images.

The sections below cover the key concepts of the library.

Introducing TensorFlow

TensorFlow has a lot in common with Apache Flink. It is based on the dataflow programming model, where you define a graph of numerical computations over a stream of data records. TensorFlow executes the computations using the CPU and GPU (when available). An interesting aspect of its design is that the 'pump' that streams records through the graph is not internal to TensorFlow; it is the responsibility of application code. With flink-tensorflow, the Flink dataflow engine acts as the pump!

All inputs and outputs to a TensorFlow graph are stored as multi-dimensional arrays called tensors. This library provides various converters to read and write data records as tensors.
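To make the record-to-tensor idea concrete, here is a minimal self-contained sketch. The record type, the `FloatTensor` stand-in, and the `toTensor` helper are all illustrative assumptions; the library's real converters target TensorFlow's native tensor types.

```scala
// Illustrative sketch only: flink-tensorflow's real converters produce
// native TensorFlow tensors. Here a "tensor" is just data plus a shape.
object RecordToTensor {
  // A simple input record with three numeric features (hypothetical).
  case class SensorReading(temperature: Float, pressure: Float, humidity: Float)

  // Stand-in for a rank-2 float tensor: row-major data plus a shape.
  case class FloatTensor(data: Array[Float], shape: Array[Long])

  // Pack a batch of records into a single [batchSize, 3] tensor.
  def toTensor(batch: Seq[SensorReading]): FloatTensor = {
    val data = batch.flatMap(r => Seq(r.temperature, r.pressure, r.humidity)).toArray
    FloatTensor(data, Array(batch.length.toLong, 3L))
  }
}
```

The key point is that a batch of fixed-width records flattens to a row-major array whose shape records the batch size and feature count.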

TensorFlow Models

A model is a pre-built TensorFlow graph with associated data and with well-defined interfaces. A given model might support image classification, text analysis, regression, or other forms of inference. An advanced model might even be stateful, learning from input data or identifying sequences using internal state.

TensorFlow defines a standard format for such models called the saved model format. The flink-tensorflow library fully supports the saved model format. It is also possible to use arbitrary TensorFlow graphs.

Using TensorFlow in Flink

The core functionality of the library is to enable the use of TensorFlow models within Flink data transformations (e.g. map, window, iterate). The library is designed to work with any transformation function, so a model can be applied across the full range of Flink transformations.
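Conceptually, a model becomes an ordinary function of the record type, which is why it composes with any transformation. The following library-free sketch illustrates that idea only; the `Image`, `Label`, and `classify` names are invented for this example, and real inference would run a TensorFlow graph rather than the toy rule shown.

```scala
// Conceptual sketch: a "model" reduced to a plain Scala function, so it
// can be passed to any transformation (map, window apply, etc.).
object ModelAsFunction {
  case class Image(pixels: Array[Float])
  case class Label(name: String, confidence: Float)

  // Stand-in for inference: picks a label from the mean pixel value.
  // A real flink-tensorflow model would run a TensorFlow graph here.
  val classify: Image => Label = { img =>
    val mean = img.pixels.sum / img.pixels.length
    if (mean > 0.5f) Label("bright", mean) else Label("dark", 1.0f - mean)
  }

  // With Flink this would be `stream.map(classify)`; on a plain
  // collection the composition is identical.
  def classifyAll(images: Seq[Image]): Seq[Label] = images.map(classify)
}
```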

Importing a TensorFlow Model

A good TensorFlow model exposes its functionality as functions over defined inputs. The signature of a given function (which we call a method) may be standardized across many models, allowing for mix-and-match of models and tools. For example, some models implement a classification method. A given model may implement numerous methods.

To use a given model in Flink, you implement the Model class and define the functions that the model supports. Reuse the standard method signatures (e.g. RegressionMethod, ClassificationMethod) where possible.

A model consists of a graph definition, stored variable data (such as pre-trained weights), and metadata defining the methods supported by the model. You have some flexibility as to how Flink obtains this information about the model.

  1. Use a model that is encoded in the standard saved model format and stored on the filesystem.
  2. Use an exported TensorFlow graph, supplemented with metadata that you provide.
  3. Construct an in-memory TensorFlow graph.

See the Models section for more information.

Using a Model

With a model defined as described above, you can now use the model in a Flink transformation function. There are two aspects:

  1. Integrating the model into the function lifecycle. Flink functions participate in lifecycle events using the RichFunction interface. The model must likewise participate. For this purpose, the library provides abstract function classes, as well as a mix-in trait called ModelAwareFunction.

  2. Processing input records using the functions exposed by the model. Convert your input records to tensor(s), apply the model function, and then convert the output tensors to output records.
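The two aspects can be sketched together without Flink. Everything below is a stand-in: `LoadedModel` imitates a loaded TensorFlow session, and the `open`/`map`/`close` shape imitates the hooks that Flink's RichFunction (or the library's ModelAwareFunction) would actually provide.

```scala
// Sketch of the lifecycle + per-record flow. Stand-ins throughout:
// in flink-tensorflow the hooks come from RichFunction/ModelAwareFunction.
object LifecycleSketch {
  // Stand-in for a loaded TensorFlow model session.
  final class LoadedModel {
    def run(input: Array[Float]): Array[Float] = input.map(_ * 2f)
  }

  class ModelMapFunction {
    private var model: LoadedModel = null

    // Called once before any records (cf. RichFunction.open).
    def open(): Unit = { model = new LoadedModel }

    // Per-record: record -> tensor -> model -> tensor -> record.
    def map(record: Seq[Float]): Seq[Float] = {
      val inputTensor  = record.toArray          // convert in
      val outputTensor = model.run(inputTensor)  // apply the model
      outputTensor.toSeq                         // convert out
    }

    // Called once at shutdown (cf. RichFunction.close).
    def close(): Unit = { model = null }
  }
}
```

Loading the model once in `open` matters: model initialization is expensive, and the per-record path should only convert, apply, and convert back.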

Managing Tensors

Most interactions with TensorFlow involve the use of tensors, which are multi-dimensional arrays stored in off-heap memory allocated by TensorFlow's native engine. Tensors aren't garbage-collected by the JVM and must be closed after use.
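The consequence is that every code path creating a tensor must also release it, typically with try/finally. A minimal sketch, where `NativeTensor` is a stand-in for a natively-backed tensor (the real tensor type exposes a similar `close()`):

```scala
// Sketch of manual tensor cleanup. NativeTensor stands in for a tensor
// backed by off-heap memory; close() would free the native allocation.
object ManualClose {
  final class NativeTensor(val data: Array[Float]) extends AutoCloseable {
    var closed = false
    def close(): Unit = { closed = true } // would free native memory
  }

  // Use the tensor, guaranteeing release even if `use` throws.
  def withTensor[A](t: NativeTensor)(use: NativeTensor => A): A =
    try use(t) finally t.close()
}
```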

ARM

To simplify the safe use of tensors, flink-tensorflow relies on the Scala Automatic Resource Management (ARM) library. The library provides type classes to treat tensors as ARM managed resources. The invocation of a model function produces a resource container containing output tensors. You can use an imperative or monadic style to convert tensor data to output records.
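A self-contained imitation of the pattern follows. It reimplements a tiny slice of the idea rather than depending on scala-arm, whose real API is richer (type classes, monadic composition over multiple resources); `Managed`, `managed`, and `NativeTensor` here are all stand-ins.

```scala
// Minimal stand-in for scala-arm-style managed resources.
object MiniArm {
  final class NativeTensor(val data: Array[Float]) extends AutoCloseable {
    var closed = false
    def close(): Unit = { closed = true } // would free native memory
  }

  // A managed resource: use it, always release it.
  final class Managed[R <: AutoCloseable](resource: R) {
    // Imperative style: run `f`, then close, returning f's result.
    def acquireAndGet[A](f: R => A): A =
      try f(resource) finally resource.close()

    // Monadic style: map over the resource's value while it is open.
    def map[A](f: R => A): A = acquireAndGet(f)
  }

  def managed[R <: AutoCloseable](r: R): Managed[R] = new Managed(r)
}
```

The payoff is that extraction of output records and release of the underlying tensors happen in one expression, so a forgotten `close()` can't leak native memory.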

TensorValue

To use a tensor as the data type of a Flink data stream, wrap the tensor in a TensorValue object. TensorValue instances are on-heap, serializable objects that may be converted to/from tensors.
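The sketch below illustrates the idea behind such a wrapper — an on-heap, serializable snapshot of a tensor's contents and shape, convertible in both directions. It is not the real TensorValue definition, and `NativeTensor` again stands in for a natively-backed tensor.

```scala
// Illustration of the TensorValue idea: an on-heap, serializable snapshot
// of tensor data, safe to ship through a Flink data stream.
// Not the real flink-tensorflow TensorValue definition.
object TensorValueSketch {
  // Stand-in for a native, off-heap tensor.
  final class NativeTensor(val data: Array[Float], val shape: Array[Long])

  // On-heap, serializable snapshot of a tensor's data and shape.
  case class TensorValue(data: Vector[Float], shape: Vector[Long]) extends Serializable {
    // Convert back to a (stand-in) native tensor.
    def toTensor: NativeTensor = new NativeTensor(data.toArray, shape.toArray)
  }

  object TensorValue {
    // Copy a native tensor's contents onto the JVM heap.
    def fromTensor(t: NativeTensor): TensorValue =
      TensorValue(t.data.toVector, t.shape.toVector)
  }
}
```

Because the snapshot lives on the heap and is serializable, Flink can checkpoint it and move it between operators, which off-heap native tensors do not allow.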