[FLINK-27507] update operations-walkthrough playground for Flink 1.14
* [FLINK-27507] update operations-walkthrough playground for Flink 1.14

* [FLINK-27507] removed docker socket mount
shba24 committed May 23, 2022
1 parent a3cc5af commit de4d2d7
Showing 8 changed files with 34 additions and 42 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -11,7 +11,7 @@ Currently, the following playgrounds are available:

* The **Flink Operations Playground** (in the `operations-playground` folder) lets you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example
Flink job. The playground is presented in detail in
["Flink Operations Playground"](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/flink-operations-playground), which is part of the _Try Flink_ section of the Flink documentation.
["Flink Operations Playground"](https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/try-flink/flink-operations-playground), which is part of the _Try Flink_ section of the Flink documentation.

* The **Table Walkthrough** (in the `table-walkthrough` folder) shows how to use the Table API to build an analytics pipeline that reads streaming data from Kafka and writes results to MySQL, along with a real-time dashboard in Grafana. The walkthrough is presented in detail in ["Real Time Reporting with the Table API"](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/table_api), which is part of the _Try Flink_ section of the Flink documentation.

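As an aside, the Table Walkthrough's architecture can be sketched in a few lines of Table API code. The following is a minimal illustration, not the walkthrough's actual job: the table names, schemas, connector options, and credentials are assumptions made up for the example.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableWalkthroughSketch {
    public static void main(String[] args) {
        // Streaming Table API environment.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source: events arriving on a Kafka topic (all names and options are illustrative).
        tEnv.executeSql(
                "CREATE TABLE transactions (" +
                "  account_id BIGINT," +
                "  amount     BIGINT," +
                "  ts         TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'transactions'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Sink: aggregated results written to MySQL via the JDBC connector.
        tEnv.executeSql(
                "CREATE TABLE spend_report (" +
                "  account_id BIGINT," +
                "  log_ts     TIMESTAMP(3)," +
                "  amount     BIGINT," +
                "  PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://mysql:3306/sql-demo'," +
                "  'table-name' = 'spend_report'," +
                "  'username' = 'sql-demo'," +
                "  'password' = 'demo-sql'" +
                ")");

        // Continuous query: hourly spend per account, streamed into the MySQL table,
        // which a Grafana dashboard can then read.
        tEnv.executeSql(
                "INSERT INTO spend_report " +
                "SELECT account_id, TUMBLE_START(ts, INTERVAL '1' HOUR) AS log_ts, SUM(amount) AS amount " +
                "FROM transactions " +
                "GROUP BY account_id, TUMBLE(ts, INTERVAL '1' HOUR)");
    }
}
```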
4 changes: 2 additions & 2 deletions docker/ops-playground-image/Dockerfile
@@ -20,7 +20,7 @@
# Build Click Count Job
###############################################################################

FROM maven:3.6-jdk-8-slim AS builder
FROM maven:3.8-jdk-8-slim AS builder

# Get Click Count job and compile it
COPY ./java/flink-playground-clickcountjob /opt/flink-playground-clickcountjob
@@ -32,7 +32,7 @@ RUN mvn clean install
# Build Operations Playground Image
###############################################################################

FROM apache/flink:1.13.1-scala_2.12-java8
FROM apache/flink:1.14.4-scala_2.12-java8

WORKDIR /opt/flink/bin

4 changes: 2 additions & 2 deletions docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@ under the License.

<groupId>org.apache.flink</groupId>
<artifactId>flink-playground-clickcountjob</artifactId>
<version>1-FLINK-1.13_2.12</version>
<version>1-FLINK-1.14_2.12</version>

<name>flink-playground-clickcountjob</name>
<packaging>jar</packaging>
@@ -44,7 +44,7 @@ under the License.

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.1</flink.version>
<flink.version>1.14.4</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
23 changes: 15 additions & 8 deletions docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -19,6 +19,9 @@

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
@@ -34,7 +37,6 @@
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -115,13 +117,18 @@ public static void main(String[] args) throws Exception {
new ClickEventStatisticsCollector())
.name("ClickEvent Counter");

statistics
.addSink(new FlinkKafkaProducer<>(
outputTopic,
new ClickEventStatisticsSerializationSchema(outputTopic),
kafkaProps,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
.name("ClickEventStatistics Sink");
statistics.sinkTo(
KafkaSink.<ClickEventStatistics>builder()
.setBootstrapServers(kafkaProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
.setKafkaProducerConfig(kafkaProps)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic(outputTopic)
.setValueSerializationSchema(new ClickEventStatisticsSerializationSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build())
.name("ClickEventStatistics Sink");

env.execute("Click Event Count");
}
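The change above replaces the legacy `FlinkKafkaProducer` with the unified `KafkaSink` API introduced in Flink 1.14. As a reference for migrating other jobs in the same way, here is a minimal, self-contained sketch of the pattern; the broker address, topic names, and consumer group are illustrative, and `SimpleStringSchema` stands in for the playground's JSON (de)serialization schemas.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSinkMigrationSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reading with the unified KafkaSource API.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")                  // illustrative broker address
                .setTopics("input")                                 // illustrative topic
                .setGroupId("kafka-sink-migration-sketch")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> events =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Writing with KafkaSink, which replaces FlinkKafkaProducer: the target topic is set on
        // KafkaRecordSerializationSchema and the delivery guarantee on the sink builder.
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output")                 // illustrative topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        events.sinkTo(sink).name("Kafka Sink");

        env.execute("KafkaSink migration sketch");
    }
}
```

In this model the output topic moves from the serialization schema's constructor onto `KafkaRecordSerializationSchema`, and `FlinkKafkaProducer.Semantic.AT_LEAST_ONCE` becomes `DeliveryGuarantee.AT_LEAST_ONCE`, which is why the schema class in the next file loses its `topic` field and now implements the plain `SerializationSchema` interface.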
29 changes: 8 additions & 21 deletions docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
@@ -17,40 +17,27 @@

package org.apache.flink.playgrounds.ops.clickcount.records;


import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.IOException;

/**
* A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
* A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
*
*/
public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {

public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
private static final ObjectMapper objectMapper = new ObjectMapper();
private String topic;

public ClickEventStatisticsSerializationSchema(){
}

public ClickEventStatisticsSerializationSchema(String topic) {
this.topic = topic;
}

@Override
public ProducerRecord<byte[], byte[]> serialize(
final ClickEventStatistics message, @Nullable final Long timestamp) {
public byte[] serialize(ClickEventStatistics event) {
try {
//if topic is null, default topic will be used
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
return objectMapper.writeValueAsBytes(event);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
throw new IllegalArgumentException("Could not serialize record: " + event, e);
}
}
}
2 changes: 1 addition & 1 deletion operations-playground/README.md
@@ -61,4 +61,4 @@ docker-compose down
## Further instructions

The playground setup and more detailed instructions are presented in the
["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.13/try-flink/flink-operations-playground.html) of Flink's documentation.
["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.14/try-flink/flink-operations-playground.html) of Flink's documentation.
10 changes: 5 additions & 5 deletions operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@ version: "2.1"
services:
client:
build: ../docker/ops-playground-image
image: apache/flink-ops-playground:1-FLINK-1.13-scala_2.12
image: apache/flink-ops-playground:1-FLINK-1.14-scala_2.12
command: "flink run -d /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
depends_on:
- jobmanager
@@ -30,12 +30,12 @@ services:
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
clickevent-generator:
image: apache/flink-ops-playground:1-FLINK-1.13-scala_2.12
image: apache/flink-ops-playground:1-FLINK-1.14-scala_2.12
command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
depends_on:
- kafka
jobmanager:
image: apache/flink:1.13.1-scala_2.12-java8
image: apache/flink:1.14.4-scala_2.12-java8
command: "jobmanager.sh start-foreground"
ports:
- 8081:8081
@@ -46,7 +46,7 @@ services:
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: apache/flink:1.13.1-scala_2.12-java8
image: apache/flink:1.14.4-scala_2.12-java8
depends_on:
- jobmanager
command: "taskmanager.sh start-foreground"
@@ -59,7 +59,7 @@ services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
kafka:
image: wurstmeister/kafka:2.12-2.2.1
image: wurstmeister/kafka:2.13-2.8.1
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
2 changes: 0 additions & 2 deletions table-walkthrough/docker-compose.yml
@@ -61,8 +61,6 @@ services:
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "kafka:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
data-generator:
image: apache/data-generator:1
build: ../docker/data-generator
