danielblazevski edited this page Oct 11, 2016 · 2 revisions

Introduction

Flink is a distributed data processing tool that combines streaming and batch processing in a unified streaming framework. Some key features of Flink are:

  • It is a framework for stateful stream processing. In contrast to many other stream processing tools, Flink keeps the state of in-progress results locally in the stream processor and updates it on the fly, without needing to store the results in an external database.

  • Flink has event-time semantics and a flexible, performant window API. For aggregations such as sums, Flink does not need to buffer all the data in a window; instead it updates the result incrementally as the data comes in, which is more efficient than buffering the data.

  • Event-time processing. Because state is cached at the level of the stream processor, Flink can run queries on streams of data based on the timestamp carried in the data, as opposed to, for example, processing the last second of data to enter your system.

  • Flink has a Complex Event Processing (CEP) library. Again, caching the state of the stream is what makes pattern matching possible in Flink. Flink has a nice example here that takes temperature and power measurements of servers: if two consecutive temperature measurements above a threshold are observed, Flink creates an alert, all without using an external database.

  • Flink has a graph library (Gelly) and a machine learning library (Flink-ML) for batch processing, as well as a DataSet API for distributed batch data processing.
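
The incremental, event-time window computation described above can be sketched in plain Java. This is not Flink code and not how Flink is implemented internally; it is a minimal illustration, assuming a hypothetical class `IncrementalWindowSum` and millisecond timestamps, of keeping one running sum per window instead of buffering every event, and of assigning each event to a window by its own timestamp rather than by its arrival order.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration (not Flink code) of incremental, event-time window sums. */
class IncrementalWindowSum {

    // Window length in milliseconds (illustrative parameter).
    private final long windowSizeMs;

    // One running sum per window start; individual events are never stored.
    private final Map<Long, Double> sumByWindowStart = new HashMap<>();

    IncrementalWindowSum(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Folds one event into the window that its own (non-negative) timestamp falls in. */
    void add(long eventTimestampMs, double value) {
        long windowStart = eventTimestampMs - (eventTimestampMs % windowSizeMs);
        sumByWindowStart.merge(windowStart, value, Double::sum);
    }

    /** Returns the current sum of the window starting at windowStart, or 0.0 if it is empty. */
    double sumFor(long windowStart) {
        return sumByWindowStart.getOrDefault(windowStart, 0.0);
    }

    public static void main(String[] args) {
        IncrementalWindowSum sums = new IncrementalWindowSum(1000);
        sums.add(100, 1.5);   // falls in window [0, 1000)
        sums.add(1200, 2.0);  // falls in window [1000, 2000)
        sums.add(900, 0.5);   // arrives later, but its timestamp puts it in window [0, 1000)
        System.out.println(sums.sumFor(0));     // prints 2.0
        System.out.println(sums.sumFor(1000));  // prints 2.0
    }
}
```

The state held per window is a single number, which is why this style of aggregation stays cheap even when a window receives many events.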

Example of streaming job in Java

The following example is a simplified version of the sensor data example found here.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class streamExample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // create a stream of sensor readings; with IngestionTime, Flink assigns
        // timestamps automatically when records enter the source
        DataStream<Tuple3<Double, Long, String>> readings = env
                .addSource(new SimpleDataGenerator());

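        readings
                .keyBy(2)                                        // group by sensor id (tuple field 2)
                .window(TumblingTimeWindows.of(Time.seconds(1))) // non-overlapping 1-second windows
                .sum(0)                                          // sum the readings (tuple field 0) per key and window
                .writeAsCsv("out");                              // write the results under out/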

        env.execute("Ingestion time example");

    }

The keyBy method hash-partitions the incoming data so that all records with the same key are grouped together. Here keyBy(2) uses the third field of the tuple, the sensor id, as the key.
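
As a rough illustration of what hash partitioning means, the sketch below routes records to partitions by the hash of their key, so equal keys always end up together. It is plain Java under simplifying assumptions, not Flink's actual partitioning logic; the class `HashPartitionSketch`, the method `partitionFor`, and the partition count of 4 are hypothetical names and values chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (not Flink's implementation) of hash partitioning by key. */
class HashPartitionSketch {

    /** Maps a key to one of numPartitions partitions; equal keys always map to the same one. */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // hypothetical degree of parallelism

        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }

        // Sensor ids play the role of the key, as in keyBy(2) above.
        String[] sensorIds = {"7", "42", "7", "13", "42"};
        for (String id : sensorIds) {
            partitions.get(partitionFor(id, numPartitions)).add(id);
        }

        for (int p = 0; p < numPartitions; p++) {
            System.out.println("partition " + p + ": " + partitions.get(p));
        }
    }
}
```

Because both readings with id "7" land in the same partition, a per-key window sum can be computed there without coordinating with the other partitions.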

The class that generates the data is below. Note that this class is only used in this "hello world" example. In most applications of Flink, the data source will be a queueing system like Kafka or Kinesis, and the code and dependencies would have to be modified accordingly.

    /* Class used to generate a stream of data.
       In many practical applications this is not needed, as your source will likely be
       a queueing system like Kafka or Kinesis.
    */
    static class SimpleDataGenerator extends RichParallelSourceFunction<Tuple3<Double, Long, String>> {

        private static final int numSensors = 1000;
        private volatile boolean running = true;

        @Override
        public void run(final SourceContext<Tuple3<Double, Long, String>> ctx) throws Exception {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
            final Random rnd = new Random();
            try {
                while (running) {

                    // create a variably delayed event from all sensors
                    for (int i = 0; i < numSensors; i++) {
                        long cur = System.currentTimeMillis();
                        Double reading = rnd.nextDouble();
                        String id = Integer.toString(i);
                        final Tuple3<Double, Long, String> event = new Tuple3<>(reading, cur, id);

                        exec.schedule(() -> {
                            ctx.collect(event);
                        }, 600, TimeUnit.MILLISECONDS);
                    }
                    Thread.sleep(500);
                }
            } finally {
                exec.shutdownNow();
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Using Java 8

The above example uses Java 1.8, which supports lambdas. In this and many other examples, lambdas significantly reduce the number of lines of code.
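
To make the saving concrete, here is a small self-contained sketch that schedules the same task twice, once as an anonymous inner class (the pre-Java 8 style) and once as a lambda, in the same way the data generator above calls exec.schedule. The class `LambdaVsAnonymous`, the method `runBoth`, and the counter are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Compares an anonymous Runnable with the equivalent Java 8 lambda. */
class LambdaVsAnonymous {

    /** Schedules two equivalent tasks and returns how many of them ran. */
    static int runBoth() {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
        final AtomicInteger counter = new AtomicInteger();

        // Before Java 8: an anonymous inner class implementing Runnable.
        exec.schedule(new Runnable() {
            @Override
            public void run() {
                counter.incrementAndGet();
            }
        }, 10, TimeUnit.MILLISECONDS);

        // Java 8: the same task written as a lambda.
        exec.schedule(() -> {
            counter.incrementAndGet();
        }, 10, TimeUnit.MILLISECONDS);

        // Already scheduled tasks still run after shutdown(); wait for them to finish.
        exec.shutdown();
        try {
            exec.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runBoth());
    }
}
```

Both calls do the same work; the lambda simply drops the boilerplate of declaring the class and the run method.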

Creating a Flink project in IntelliJ

  • Download the community edition of IntelliJ here
  • Click File -> New -> Project...
  • Click Maven and make sure your Project SDK is Java version 1.8. Click on Next
  • Type in flink.example for the GroupID and flinkStreamExample for the ArtifactID
  • Type in flinkStreamExample for project name and click Finish
  • Create a new Java Class via File -> New -> Java Class. Call the Java Class streamExample

Now copy the above two code snippets into that file.

We now need to add the required dependencies to the pom.xml file in the project's root directory. In IntelliJ, open the pom.xml file and add the following lines:

    <properties>
        <flink.version>1.1.1</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <!-- or whatever version you use -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

You should then be prompted that Maven needs to import changes; click on "Import Changes". At this point, no text in your code should be underlined in red.

Running locally using IntelliJ

Running this program on your local machine is straightforward:

  • Click on Run -> Run -> streamExample

After a few seconds have passed, you should have an out/ directory in the base directory of the project. This should contain two files, labeled 1 and 2, that contain the sum of each window of sensor readings.

Running locally using the command line

To test your code locally, you can start Flink on your local machine via

/usr/local/flink/bin/start-local.sh

To build a jar that can be run using Flink's runner, use the maven command

mvn install -Pbuild-jar

To run in Flink, use the command

/usr/local/flink/bin/flink run -c streamExample target/flinkStreamExample-1.0-SNAPSHOT.jar

Running on a cluster

To build the project use the maven command

mvn install -Pbuild-jar

To run use the command

/usr/local/flink/bin/flink run -c streamExample target/flinkStreamExample-1.0-SNAPSHOT.jar

The "-c" flag specifies the entry-point class, that is, the Java class that contains the main method.