Skip to content
Permalink
Browse files
[FLINK-14160] Add --backpressure option to the ClickEventCount job in…
… the operations playground

This closes #4.
  • Loading branch information
alpinegizmo authored and fhueske committed Sep 23, 2019
1 parent 5d636ae commit 1c7c254fc7827e74db7c3c387348e7ca2219788a
Showing 4 changed files with 71 additions and 6 deletions.
@@ -22,7 +22,7 @@ under the License.

<groupId>org.apache.flink</groupId>
<artifactId>flink-playground-clickcountjob</artifactId>
<version>1-FLINK-1.9_2.11</version>
<version>2-FLINK-1.9_2.11</version>

<name>flink-playground-clickcountjob</name>
<packaging>jar</packaging>
@@ -18,13 +18,15 @@
package org.apache.flink.playgrounds.ops.clickcount;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap;
import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector;
import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializationSchema;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -47,6 +49,7 @@
* <p>The Job can be configured via the command line:</p>
* * "--checkpointing": enables checkpointing
* * "--event-time": set the StreamTimeCharacteristic to EventTime
* * "--backpressure": insert an operator that causes periodic backpressure
* * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
* * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
* * "--bootstrap.servers": comma-separated list of Kafka brokers
@@ -56,6 +59,7 @@ public class ClickEventCount {

public static final String CHECKPOINTING_OPTION = "checkpointing";
public static final String EVENT_TIME_OPTION = "event-time";
public static final String BACKPRESSURE_OPTION = "backpressure";

public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);

@@ -66,26 +70,41 @@ public static void main(String[] args) throws Exception {

configureEnvironment(params, env);

boolean inflictBackpressure = params.has(BACKPRESSURE_OPTION);

String inputTopic = params.get("input-topic", "input");
String outputTopic = params.get("output-topic", "output");
String brokers = params.get("bootstrap.servers", "localhost:9092");
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");

env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
DataStream<ClickEvent> clicks =
env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
.name("ClickEvent Source")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(final ClickEvent element) {
return element.getTimestamp().getTime();
}
})
});

if (inflictBackpressure) {
// Force a network shuffle so that the backpressure will affect the buffer pools
clicks = clicks
.keyBy(ClickEvent::getPage)
.map(new BackpressureMap())
.name("Backpressure");
}

DataStream<ClickEventStatistics> statistics = clicks
.keyBy(ClickEvent::getPage)
.timeWindow(WINDOW_SIZE)
.aggregate(new CountingAggregator(),
new ClickEventStatisticsCollector())
.name("ClickEvent Counter")
.name("ClickEvent Counter");

statistics
.addSink(new FlinkKafkaProducer<>(
outputTopic,
new ClickEventStatisticsSerializationSchema(outputTopic),
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.playgrounds.ops.clickcount.functions;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;

import java.time.LocalTime;

/**
* This MapFunction causes severe backpressure during even-numbered minutes.
* E.g., from 10:12:00 to 10:12:59 it will only process 10 events/sec,
* but from 10:13:00 to 10:13:59 events will pass through unimpeded.
*/
public class BackpressureMap implements MapFunction<ClickEvent, ClickEvent> {

private boolean causeBackpressure() {
return ((LocalTime.now().getMinute() % 2) == 0);
}

@Override
public ClickEvent map(ClickEvent event) throws Exception {
if (causeBackpressure()) {
Thread.sleep(100);
}

return event;
}

}
@@ -20,7 +20,7 @@ version: "2.1"
services:
client:
build: ../docker/ops-playground-image
image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
depends_on:
- jobmanager
@@ -30,7 +30,7 @@ services:
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
clickevent-generator:
image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
image: apache/flink-ops-playground:2-FLINK-1.9-scala_2.11
command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
depends_on:
- kafka

0 comments on commit 1c7c254

Please sign in to comment.