Skip to content
Permalink
Browse files
[FLINK-13863] Update playgrounds to Flink 1.9.0
* Update Operations Playground (example job, dockerfile, docker-compose.yaml)
* Update README.md
  • Loading branch information
fhueske committed Aug 27, 2019
1 parent 626ca07 commit ffd349616ac91528090ed8dff0faf369666ce163
Showing 8 changed files with 52 additions and 28 deletions.
@@ -11,7 +11,7 @@ Currently, the following playgrounds are available:

* The **Flink Operations Playground** in the (`operations-playground` folder) lets you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example
Flink job. The playground is presented in detail in the
["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.

* The interactive SQL playground is still under development and will be added shortly.

@@ -32,7 +32,7 @@ RUN mvn clean install
# Build Operations Playground Image
###############################################################################

FROM flink:1.8.1-scala_2.11
FROM flink:1.9.0-scala_2.11

WORKDIR /opt/flink/bin

@@ -22,7 +22,7 @@ under the License.

<groupId>org.apache.flink</groupId>
<artifactId>flink-playground-clickcountjob</artifactId>
<version>1-FLINK-1.8_2.11</version>
<version>1-FLINK-1.9_2.11</version>

<name>flink-playground-clickcountjob</name>
<packaging>jar</packaging>
@@ -44,7 +44,7 @@ under the License.

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.8.1</flink.version>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
@@ -86,10 +86,11 @@ public long extractTimestamp(final ClickEvent element) {
.aggregate(new CountingAggregator(),
new ClickEventStatisticsCollector())
.name("ClickEvent Counter")
.addSink(new FlinkKafkaProducer<ClickEventStatistics>(
.addSink(new FlinkKafkaProducer<>(
outputTopic,
new ClickEventStatisticsSerializationSchema(),
kafkaProps))
new ClickEventStatisticsSerializationSchema(outputTopic),
kafkaProps,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
.name("ClickEventStatistics Sink");

env.execute("Click Event Count");
@@ -18,11 +18,10 @@

package org.apache.flink.playgrounds.ops.clickcount;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -67,12 +66,13 @@ public static void main(String[] args) throws Exception {
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);

ClickIterator clickIterator = new ClickIterator();
SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema();

while (true) {

byte[] message = clickSerializer.serialize(clickIterator.next());
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message);
ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize(
clickIterator.next(),
null);

producer.send(record);

Thread.sleep(DELAY);
@@ -17,28 +17,37 @@

package org.apache.flink.playgrounds.ops.clickcount.records;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

/**
* A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON.
* A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON.
*
*/
public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> {
public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> {

private static final ObjectMapper objectMapper = new ObjectMapper();
private String topic;

public ClickEventSerializationSchema(){
}

public ClickEventSerializationSchema() {
super();
public ClickEventSerializationSchema(String topic) {
this.topic = topic;
}

@Override
public byte[] serialize(ClickEvent message) {
public ProducerRecord<byte[], byte[]> serialize(
final ClickEvent message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
return objectMapper.writeValueAsBytes(message);
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
@@ -18,23 +18,37 @@
package org.apache.flink.playgrounds.ops.clickcount.records;


import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

/**
* A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
* A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
*
*/
public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {

private static final ObjectMapper objectMapper = new ObjectMapper();
private String topic;

public ClickEventStatisticsSerializationSchema(){
}

public ClickEventStatisticsSerializationSchema(String topic) {
this.topic = topic;
}

@Override
public byte[] serialize(ClickEventStatistics message) {
public ProducerRecord<byte[], byte[]> serialize(
final ClickEventStatistics message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
return objectMapper.writeValueAsBytes(message);
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
@@ -20,7 +20,7 @@ version: "2.1"
services:
client:
build: ../docker/ops-playground-image
image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
depends_on:
- jobmanager
@@ -30,12 +30,12 @@ services:
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
clickevent-generator:
image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
depends_on:
- kafka
jobmanager:
image: flink:1.8-scala_2.11
image: flink:1.9-scala_2.11
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
@@ -46,7 +46,7 @@ services:
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.8-scala_2.11
image: flink:1.9-scala_2.11
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"

0 comments on commit ffd3496

Please sign in to comment.