From fc3f475459e5db685219c2db106454e26b9a9135 Mon Sep 17 00:00:00 2001 From: Seth Wiesman Date: Tue, 22 Jun 2021 12:04:27 -0500 Subject: [PATCH] [hotfix] Update ClickEventCount to use modern API --- .../ops/clickcount/ClickEventCount.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java index 534ef9f..e56a39e 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java @@ -17,6 +17,7 @@ package org.apache.flink.playgrounds.ops.clickcount; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.playgrounds.ops.clickcount.functions.BackpressureMap; import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector; @@ -25,17 +26,20 @@ import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializationSchema; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import java.time.Duration; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -48,7 +52,7 @@ * *

The Job can be configured via the command line:

* * "--checkpointing": enables checkpointing - * * "--event-time": set the StreamTimeCharacteristic to EventTime + * * "--event-time": use an event time window assigner * * "--backpressure": insert an operator that causes periodic backpressure * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to @@ -80,15 +84,17 @@ public static void main(String[] args) throws Exception { kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count"); + WatermarkStrategy strategy = WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofMillis(200)) + .withTimestampAssigner((clickEvent, l) -> clickEvent.getTimestamp().getTime()); + DataStream clicks = - env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) - .name("ClickEvent Source") - .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.of(200, TimeUnit.MILLISECONDS)) { - @Override - public long extractTimestamp(final ClickEvent element) { - return element.getTimestamp().getTime(); - } - }); + env.addSource(new FlinkKafkaConsumer<>( + inputTopic, + new ClickEventDeserializationSchema(), + kafkaProps) + .assignTimestampsAndWatermarks(strategy)) + .name("ClickEvent Source"); if (inflictBackpressure) { // Force a network shuffle so that the backpressure will affect the buffer pools @@ -98,9 +104,13 @@ public long extractTimestamp(final ClickEvent element) { .name("Backpressure"); } + WindowAssigner assigner = params.has(EVENT_TIME_OPTION) ? + TumblingEventTimeWindows.of(WINDOW_SIZE) : + TumblingProcessingTimeWindows.of(WINDOW_SIZE); + DataStream statistics = clicks .keyBy(ClickEvent::getPage) - .timeWindow(WINDOW_SIZE) + .window(assigner) .aggregate(new CountingAggregator(), new ClickEventStatisticsCollector()) .name("ClickEvent Counter"); @@ -121,17 +131,12 @@ private static void configureEnvironment( final StreamExecutionEnvironment env) { boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION); - boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION); boolean enableChaining = params.has(OPERATOR_CHAINING_OPTION); if (checkpointingEnabled) { env.enableCheckpointing(1000); } - if (eventTimeSemantics) { - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - } - if(!enableChaining){ //disabling Operator chaining to make it easier to follow the Job in the WebUI env.disableOperatorChaining();