From 7f1c76de5d2fb54ff4e9836a38bc537098649fb4 Mon Sep 17 00:00:00 2001 From: "tibor.moger" Date: Mon, 28 Nov 2016 16:51:47 +0100 Subject: [PATCH 01/11] [FLINK-5109] fix invalid content-encoding header of webmonitor --- .../webmonitor/HttpRequestHandler.java | 37 +++++++++---------- .../webmonitor/PipelineErrorHandler.java | 1 - .../webmonitor/RuntimeMonitorHandler.java | 4 +- .../handlers/ConstantTextHandler.java | 9 ++--- .../handlers/HandlerRedirectUtils.java | 2 - 5 files changed, 23 insertions(+), 30 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java index bbd29fa6af5af..703b621bfd062 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java @@ -66,7 +66,7 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler { private static final Charset ENCODING = Charset.forName("UTF-8"); - + /** A decoder factory that always stores POST chunks on disk */ private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true); @@ -80,7 +80,7 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler public HttpRequestHandler(File tmpDir) { this.tmpDir = tmpDir; } - + @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { if (currentDecoder != null) { @@ -94,12 +94,12 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { if (msg instanceof HttpRequest) { currentRequest = (HttpRequest) msg; currentRequestPath = null; - + if (currentDecoder != null) { currentDecoder.destroy(); currentDecoder = null; } - + if (currentRequest.getMethod() == HttpMethod.GET || currentRequest.getMethod() == HttpMethod.DELETE) { // directly delegate to the router ctx.fireChannelRead(currentRequest); @@ 
-118,43 +118,43 @@ else if (currentDecoder != null && msg instanceof HttpContent) { // received new chunk, give it to the current decoder HttpContent chunk = (HttpContent) msg; currentDecoder.offer(chunk); - + try { while (currentDecoder.hasNext()) { InterfaceHttpData data = currentDecoder.next(); - + // IF SOMETHING EVER NEEDS POST PARAMETERS, THIS WILL BE THE PLACE TO HANDLE IT // all fields values will be passed with type Attribute. - + if (data.getHttpDataType() == HttpDataType.FileUpload) { DiskFileUpload file = (DiskFileUpload) data; if (file.isCompleted()) { String name = file.getFilename(); - + File target = new File(tmpDir, UUID.randomUUID() + "_" + name); file.renameTo(target); - + QueryStringEncoder encoder = new QueryStringEncoder(currentRequestPath); encoder.addParam("filepath", target.getAbsolutePath()); encoder.addParam("filename", name); - + currentRequest.setUri(encoder.toString()); } } - + data.release(); } } catch (EndOfDataDecoderException ignored) {} - + if (chunk instanceof LastHttpContent) { HttpRequest request = currentRequest; currentRequest = null; currentRequestPath = null; - + currentDecoder.destroy(); currentDecoder = null; - + // fire next channel handler ctx.fireChannelRead(request); } @@ -163,20 +163,19 @@ else if (currentDecoder != null && msg instanceof HttpContent) { catch (Throwable t) { currentRequest = null; currentRequestPath = null; - + if (currentDecoder != null) { currentDecoder.destroy(); currentDecoder = null; } - + if (ctx.channel().isActive()) { byte[] bytes = ExceptionUtils.stringifyException(t).getBytes(ENCODING); - + DefaultFullHttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes)); - - response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); diff 
--git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java index 23a0ba69357f7..b4788ddbcaee7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/PipelineErrorHandler.java @@ -64,7 +64,6 @@ private void sendError(ChannelHandlerContext ctx, String error) { HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(error.getBytes())); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); - response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); ctx.writeAndFlush(response); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index aba4e1711590a..68e17351b21b6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -61,7 +61,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { private static final Charset ENCODING = Charset.forName("UTF-8"); public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address"; - + private final RequestHandler handler; public RuntimeMonitorHandler( @@ -102,7 +102,6 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGa : Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING)); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); - 
response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); LOG.debug("Error while handling request", e); } @@ -111,7 +110,6 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGa response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes)); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); - response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); LOG.debug("Error while handling request", e); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java index aedf0c2c8e804..127efdbc26e24 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java @@ -37,9 +37,9 @@ */ @ChannelHandler.Sharable public class ConstantTextHandler extends SimpleChannelInboundHandler { - + private final byte[] encodedText; - + public ConstantTextHandler(String text) { try { this.encodedText = text.getBytes("UTF-8"); @@ -48,16 +48,15 @@ public ConstantTextHandler(String text) { throw new RuntimeException(e.getMessage(), e); } } - + @Override protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { HttpResponse response = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText)); response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length); - response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); 
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); - + KeepAliveWrite.flush(ctx, routed.request(), response); } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java index 21a0f8c6a9f8f..ca61ec159b0db 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/HandlerRedirectUtils.java @@ -89,7 +89,6 @@ public static HttpResponse getRedirectResponse(String redirectAddress, String pa HttpResponse redirectResponse = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.TEMPORARY_REDIRECT); redirectResponse.headers().set(HttpHeaders.Names.LOCATION, newLocation); - redirectResponse.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); redirectResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0); return redirectResponse; @@ -102,7 +101,6 @@ public static HttpResponse getUnavailableResponse() throws UnsupportedEncodingEx HttpResponse unavailableResponse = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, Unpooled.wrappedBuffer(bytes)); - unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytes.length); unavailableResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, MimeTypes.getMimeTypeForExtension("txt")); From 2a305c54dee1644fd787ec2d707b57c74a6b45f6 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 2 Nov 2016 11:06:01 +0100 Subject: [PATCH 02/11] [FLINK-4993] Don't Allow Trigger.onMerge() to return TriggerResult Allowing Trigger.onMerge() to return a TriggerResult is not necessary since an onMerge() call will always be followed by an onElement() call when adding the 
element that caused the merging to the merged window. Having this complicates the internal logic of the WindowOperator and makes writing Triggers more confusing than it has to be. --- .../windowing/assigners/GlobalWindows.java | 4 +-- .../triggers/ContinuousEventTimeTrigger.java | 3 +-- .../ContinuousProcessingTimeTrigger.java | 3 +-- .../api/windowing/triggers/CountTrigger.java | 7 +---- .../windowing/triggers/EventTimeTrigger.java | 5 ++-- .../triggers/ProcessingTimeTrigger.java | 3 +-- .../windowing/triggers/PurgingTrigger.java | 5 ++-- .../api/windowing/triggers/Trigger.java | 2 +- .../api/windowing/triggers/TriggerResult.java | 26 ------------------- .../windowing/EvictingWindowOperator.java | 15 +++-------- .../operators/windowing/WindowOperator.java | 18 ++++--------- .../windowing/WindowOperatorTest.java | 6 ++--- 12 files changed, 21 insertions(+), 76 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index 1c6284a1adc2f..7ea3158532079 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -92,9 +92,7 @@ public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerCon public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} @Override - public TriggerResult onMerge(GlobalWindow window, - OnMergeContext ctx) { - return TriggerResult.CONTINUE; + public void onMerge(GlobalWindow window, OnMergeContext ctx) { } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 
02c2a4260708b..f3b3e4f5eed23 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -109,13 +109,12 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { + public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get(); if (nextFireTimestamp != null) { ctx.registerEventTimeTimer(nextFireTimestamp); } - return TriggerResult.CONTINUE; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index 287d3df6adced..18c9edb2159ff 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -97,10 +97,9 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(W window, + public void onMerge(W window, OnMergeContext ctx) { ctx.mergePartitionedState(stateDesc); - return TriggerResult.CONTINUE; } @VisibleForTesting diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java index 86c5c4cf36310..ffe74b0cca0f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -76,13 +76,8 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { + public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); - ReducingState count = ctx.getPartitionedState(stateDesc); - if (count.get() >= maxCount) { - return TriggerResult.FIRE; - } - return TriggerResult.CONTINUE; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java index da14ffd646104..ae25e87685973 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java @@ -67,10 +67,9 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(TimeWindow window, + public void onMerge(TimeWindow window, OnMergeContext ctx) { ctx.registerEventTimeTimer(window.maxTimestamp()); - return TriggerResult.CONTINUE; } @Override @@ -88,4 +87,6 @@ public String toString() { public static EventTimeTrigger create() { return new EventTimeTrigger(); } + + } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java index a0102862d205d..cd7869e3d78fb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java @@ -58,10 +58,9 @@ public boolean canMerge() { } @Override - public 
TriggerResult onMerge(TimeWindow window, + public void onMerge(TimeWindow window, OnMergeContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); - return TriggerResult.CONTINUE; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java index f02d1db36d105..ed1d2fc5566eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java @@ -70,9 +70,8 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { - TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx); - return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; + public void onMerge(W window, OnMergeContext ctx) throws Exception { + nestedTrigger.onMerge(window, ctx); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index 3f68e7882f058..a0209aadc0d2d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -104,7 +104,7 @@ public boolean canMerge() { * @param window The new window that results from the merge. * @param ctx A context object that can be used to register timer callbacks and access state. 
*/ - public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { + public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new RuntimeException("This trigger does not support merging."); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java index 2841542693e7a..cb4c01b2953d9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java @@ -67,30 +67,4 @@ public boolean isFire() { public boolean isPurge() { return purge; } - - // ------------------------------------------------------------------------ - - /** - * Merges two {@code TriggerResults}. This specifies what should happen if we have - * two results from a Trigger, for example as a result from - * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and - * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}. - * - *

- * For example, if one result says {@code CONTINUE} while the other says {@code FIRE} - * then {@code FIRE} is the combined result; - */ - public static TriggerResult merge(TriggerResult a, TriggerResult b) { - if (a.purge || b.purge) { - if (a.fire || b.fire) { - return FIRE_AND_PURGE; - } else { - return PURGE; - } - } else if (a.fire || b.fire) { - return FIRE; - } else { - return CONTINUE; - } - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 150f46ec4172b..8c738780cf825 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -28,7 +28,6 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; @@ -98,10 +97,6 @@ public void processElement(StreamRecord element) throws Exception { MergingWindowSet mergingWindows = getMergingWindowSet(); for (W window : elementWindows) { - // If there is a merge, it can only result in a window that contains our new - // element because we always eagerly merge - final Tuple1 mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE); - // adding the new window might result in a merge, in that case the actualWindow // is the merged window and we work with that. 
If we don't merge then @@ -115,8 +110,7 @@ public void merge(W mergeResult, context.key = key; context.window = mergeResult; - // store for later use - mergeTriggerResult.f0 = context.onMerge(mergedWindows); + context.onMerge(mergedWindows); for (W m : mergedWindows) { context.window = m; @@ -152,12 +146,9 @@ public void merge(W mergeResult, evictorContext.key = key; evictorContext.window = actualWindow; - // we might have already fired because of a merge but still call onElement - // on the (possibly merged) window TriggerResult triggerResult = context.onElement(element); - TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0); - if (combinedTriggerResult.isFire()) { + if (triggerResult.isFire()) { Iterable> contents = windowState.get(); if (contents == null) { // if we have no state, there is nothing to do @@ -166,7 +157,7 @@ public void merge(W mergeResult, fire(actualWindow, contents, windowState); } - if (combinedTriggerResult.isPurge()) { + if (triggerResult.isPurge()) { cleanup(actualWindow, windowState, mergingWindows); } else { registerCleanupTimer(actualWindow); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 0ead14a58e7a6..edcd8339c306e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -31,7 +31,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer; @@ -227,9 +226,6 @@ public void processElement(StreamRecord element) throws Exception { MergingWindowSet mergingWindows = getMergingWindowSet(); for (W window: elementWindows) { - // If there is a merge, it can only result in a window that contains our new - // element because we always eagerly merge - final Tuple1 mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE); // adding the new window might result in a merge, in that case the actualWindow // is the merged window and we work with that. If we don't merge then @@ -242,8 +238,7 @@ public void merge(W mergeResult, context.key = key; context.window = mergeResult; - // store for later use - mergeTriggerResult.f0 = context.onMerge(mergedWindows); + context.onMerge(mergedWindows); for (W m: mergedWindows) { context.window = m; @@ -278,12 +273,9 @@ public void merge(W mergeResult, context.key = key; context.window = actualWindow; - // we might have already fired because of a merge but still call onElement - // on the (possibly merged) window TriggerResult triggerResult = context.onElement(element); - TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0); - if (combinedTriggerResult.isFire()) { + if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; @@ -291,7 +283,7 @@ public void merge(W mergeResult, fire(actualWindow, contents); } - if (combinedTriggerResult.isPurge()) { + if (triggerResult.isPurge()) { cleanup(actualWindow, windowState, mergingWindows); } else { registerCleanupTimer(actualWindow); @@ -642,9 +634,9 @@ public TriggerResult onEventTime(long time) throws Exception { return trigger.onEventTime(time, window, this); } - public TriggerResult onMerge(Collection mergedWindows) throws Exception { + public void onMerge(Collection mergedWindows) throws Exception { this.mergedWindows = mergedWindows; - return trigger.onMerge(window, this); + 
trigger.onMerge(window, this); } public void clear() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 2a132942d369e..0e2d1e8e29783 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,7 +32,6 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -2451,10 +2450,9 @@ public boolean canMerge() { } @Override - public TriggerResult onMerge(TimeWindow window, + public void onMerge(TimeWindow window, OnMergeContext ctx) { ctx.registerEventTimeTimer(window.maxTimestamp()); - return TriggerResult.CONTINUE; } @Override From 3ce3e104641728930f7d53a3308b74dac5cd15e5 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 28 Nov 2016 13:51:51 +0100 Subject: [PATCH 03/11] [FLINK-4993] Remove Unused Import in TriggerResult --- .../flink/streaming/api/windowing/triggers/TriggerResult.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java index cb4c01b2953d9..814993038a04f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.api.windowing.triggers; -import org.apache.flink.streaming.api.windowing.windows.Window; - /** * Result type for trigger methods. This determines what happens with the window, * for example whether the window function should be called, or the window From 5d4ececb310d0d3d0d01934cb6015766bb17ca65 Mon Sep 17 00:00:00 2001 From: sergey_sokur Date: Thu, 17 Nov 2016 19:28:20 +0300 Subject: [PATCH 04/11] [FLINK-5050] [build] Remove transitive JSON.org dependency This transitive dependency has an incompatible license. This closes #2824 --- flink-batch-connectors/flink-hcatalog/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flink-batch-connectors/flink-hcatalog/pom.xml b/flink-batch-connectors/flink-hcatalog/pom.xml index 444bd9acb0818..6889e5a3fa90d 100644 --- a/flink-batch-connectors/flink-hcatalog/pom.xml +++ b/flink-batch-connectors/flink-hcatalog/pom.xml @@ -52,6 +52,12 @@ under the License. 
org.apache.hive.hcatalog hcatalog-core 0.12.0 + + + org.json + json + + From 5cd7eb0042c71839c610f25765ac1834561bffb3 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 23 Nov 2016 15:54:15 +0100 Subject: [PATCH 05/11] [hotfix] [tests] Harden timeout logic for TaskManager registration in AbstractTaskManagerProcessFailureRecoveryTest --- ...TaskManagerProcessFailureRecoveryTest.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 0ff2e78f9f466..3acf5bbaa1e5b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -273,25 +273,25 @@ public void run() { public abstract void testTaskManagerFailure(int jobManagerPort, File coordinateDir) throws Exception; - protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay) + protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelayMillis) throws Exception { - final long deadline = System.currentTimeMillis() + maxDelay; - while (true) { - long remaining = deadline - System.currentTimeMillis(); - if (remaining <= 0) { - fail("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)"); - } + final long interval = maxDelayMillis * 1_000_000; + final long deadline = System.nanoTime() + interval; + long remaining = interval; - FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS); + while (remaining > 0) { + FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.NANOSECONDS); try { Future result = Patterns.ask(jobManager, 
JobManagerMessages.getRequestNumberRegisteredTaskManager(), new Timeout(timeout)); - Integer numTMs = (Integer) Await.result(result, timeout); + + int numTMs = (Integer) Await.result(result, timeout); + if (numTMs == numExpected) { - break; + return; } } catch (TimeoutException e) { @@ -300,7 +300,11 @@ protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int nu catch (ClassCastException e) { fail("Wrong response: " + e.getMessage()); } + + remaining = deadline - System.nanoTime(); } + + fail("The TaskManagers did not register within the expected time (" + maxDelayMillis + "msecs)"); } protected static void printProcessLog(String processName, String log) { From ba61d43c87b5228c689a10156950880ffbe23b2f Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Wed, 23 Nov 2016 15:37:05 +0100 Subject: [PATCH 06/11] [hotfix] Flush in CsvOutputFormat before closing, to increase CI stability --- .../flink/api/java/io/CsvOutputFormat.java | 1 + .../api/java/io/CsvOutputFormatTest.java | 47 ++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java index 703128fa75de2..c2fe13cc66fbb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java @@ -165,6 +165,7 @@ public void open(int taskNumber, int numTasks) throws IOException { @Override public void close() throws IOException { if (wrt != null) { + this.wrt.flush(); this.wrt.close(); } super.close(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java index a9288c6124849..a8ce495a2a6da 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java +++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,25 +35,30 @@ import java.nio.file.Paths; import java.util.List; +import static org.junit.Assert.fail; + public class CsvOutputFormatTest { private String path = null; - private CsvOutputFormat> csvOutputFormat; @Before public void createFile() throws Exception { path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath(); - csvOutputFormat = new CsvOutputFormat<>(new Path(path)); } @Test public void testNullAllow() throws Exception { - csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); - csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); - csvOutputFormat.setAllowNullValues(true); - csvOutputFormat.open(0, 1); - csvOutputFormat.writeRecord(new Tuple3("One", null, 8)); - csvOutputFormat.close(); + final CsvOutputFormat> csvOutputFormat = new CsvOutputFormat<>(new Path(path)); + try { + csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); + csvOutputFormat.setAllowNullValues(true); + csvOutputFormat.open(0, 1); + csvOutputFormat.writeRecord(new Tuple3("One", null, 8)); + } + finally { + csvOutputFormat.close(); + } java.nio.file.Path p = Paths.get(path); Assert.assertTrue(Files.exists(p)); @@ -61,19 +67,28 @@ public void testNullAllow() throws Exception { Assert.assertEquals("One,,8", lines.get(0)); } - @Test(expected = RuntimeException.class) + @Test public void testNullDisallowOnDefault() throws Exception { - csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); - csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); - csvOutputFormat.open(0, 1); - 
csvOutputFormat.writeRecord(new Tuple3("One", null, 8)); - csvOutputFormat.close(); + final CsvOutputFormat> csvOutputFormat = new CsvOutputFormat<>(new Path(path)); + try { + csvOutputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE); + csvOutputFormat.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.PARONLY); + csvOutputFormat.open(0, 1); + try { + csvOutputFormat.writeRecord(new Tuple3("One", null, 8)); + fail("should fail with an exception"); + } catch (RuntimeException e) { + // expected + } + + } + finally { + csvOutputFormat.close(); + } } @After public void cleanUp() throws IOException { - csvOutputFormat.close(); Files.deleteIfExists(Paths.get(path)); } - } From dc51242a4cede4684614c86afa0d0724898343b7 Mon Sep 17 00:00:00 2001 From: shijinkui Date: Sat, 26 Nov 2016 16:14:13 +0800 Subject: [PATCH 07/11] [FLINK-5168] Scaladoc annotation link use [[]] instead of {@link} This closes #2875 --- .../src/main/scala/org/apache/flink/graph/scala/Graph.scala | 2 +- .../graph/scala/test/GellyScalaAPICompletenessTest.scala | 2 +- .../clusterframework/RegisteredMesosWorkerNode.scala | 2 +- .../org/apache/flink/streaming/api/scala/DataStream.scala | 2 +- .../org/apache/flink/streaming/api/scala/SplitStream.scala | 6 +++--- .../streaming/api/scala/StreamExecutionEnvironment.scala | 5 ++--- .../streaming/api/scala/StreamingOperatorsITCase.scala | 4 ++-- 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala index 4dd9d12611f72..27bc548d742f6 100644 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala +++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala @@ -294,7 +294,7 @@ object Graph { } /** - * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}. 
+ * Represents a graph consisting of [[Edge]] edges and [[Vertex]] vertices. * * @param jgraph the underlying java api Graph. * @tparam K the key type for vertex and edge identifiers diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala index d7ab1dd44093a..034bf7756e8c8 100644 --- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala +++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/GellyScalaAPICompletenessTest.scala @@ -26,7 +26,7 @@ import org.junit.Test /** * This checks whether the Gelly Scala API is up to feature parity with the Java API. - * Implements the {@link ScalaAPICompletenessTest} for Gelly. + * Implements the [[ScalaAPICompletenessTestBase]] for Gelly. */ class GellyScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala index 59764ef9e8245..7ca388f03ca86 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala @@ -22,7 +22,7 @@ import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable} /** - * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}. + * A representation of a registered Mesos task managed by the [[MesosFlinkResourceManager]]. 
*/ case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable { diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 4fe73e96c12fe..dbc91bd90ba0d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -697,7 +697,7 @@ class DataStream[T](stream: JavaStream[T]) { * For the second case and when the watermarks are required to lag behind the maximum * timestamp seen so far in the elements of the stream by a fixed amount of time, and this * amount is known in advance, use the - * {@link org.apache.flink.streaming.api.functions.TimestampExtractorWithFixedAllowedLateness}. + * [[org.apache.flink.streaming.api.functions.TimestampExtractorWithFixedAllowedLateness]]. * * For cases where watermarks should be created in an irregular fashion, for example * based on certain markers that some element carry, use the diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala index 0b9ac69c75b73..ca4bcc0f9c6bc 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala @@ -23,10 +23,10 @@ import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStrea /** * The SplitStream represents an operator that has been split using an - * {@link OutputSelector}. Named outputs can be selected using the - * {@link #select} function. To apply a transformation on the whole output simply call + * [[org.apache.flink.streaming.api.collector.selector.OutputSelector]]. 
+ * Named outputs can be selected using the [[SplitStream#select()]] function. + * To apply a transformation on the whole output simply call * the appropriate method on this stream. - * */ @Public class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaStream){ diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 2e432babb7397..432e8ac6cd330 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -502,9 +502,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * The files to be excluded from the processing * @return The data stream that represents the data read from the given file * - * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and - * {@link StreamExecutionEnvironment#readFile(FileInputFormat, - * String, FileProcessingMode, long)} + * @deprecated Use [[FileInputFormat#setFilesFilter(FilePathFilter)]] to set a filter and + * [[StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)]] */ @PublicEvolving @Deprecated diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala index c57c29ce62c2f..e08e0b5c294da 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -58,8 +58,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { * The stream is 
grouped by the first field. For each group, the resulting stream is folded by * summing up the second tuple field. * - * This test relies on the hash function used by the {@link DataStream#keyBy}, which is - * assumed to be {@link MathUtils#murmurHash}. + * This test relies on the hash function used by the [[DataStream#keyBy]], which is + * assumed to be [[MathUtils#murmurHash]]. */ @Test def testGroupedFoldOperator(): Unit = { From bb2e2d0fc2eae2a65839e937cebe0034ab81a75d Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 28 Nov 2016 14:55:31 +0100 Subject: [PATCH 08/11] [hotfix] [docs] Add a rouch description about internal types of states and state backends --- docs/internals/state_backends.md | 84 ++++++++++++++++++++++++++ docs/internals/stream_checkpointing.md | 13 +--- 2 files changed, 85 insertions(+), 12 deletions(-) create mode 100644 docs/internals/state_backends.md diff --git a/docs/internals/state_backends.md b/docs/internals/state_backends.md new file mode 100644 index 0000000000000..e9f9fd8a0af2d --- /dev/null +++ b/docs/internals/state_backends.md @@ -0,0 +1,84 @@ +--- +title: "State and State Backends" +nav-title: State Backends +nav-parent_id: internals +nav-pos: 4 +--- + + +* This will be replaced by the TOC +{:toc} + +**NOTE** This document is only a sketch of some bullet points, to be fleshed out. + +**NOTE** The structure of State Backends changed heavily between version 1.1 and 1.2. This documentation is only applicable +to Apache Flink version 1.2 and later. + + +## Keyed State and Operator state + +There are two basic state backends: `Keyed State` and `Operator State`. + +#### Keyed State + +*Keyed State* is always relative to keys and can only be used in functions and operator on a `KeyedStream`. +Examples of keyed state are the `ValueState` or `ListState` that one can create in a function on a `KeyedStream`, as +well at the state of a keyed window operator. + +Keyed State is organized in so called *Key Groups*. 
Key Groups are the unit in which keyed state can be redistributed and +there are as many key groups as the defined maximum parallelism. +During execution, each parallel instance of an operator gets one or more key groups. + +#### Operator State + +*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` interface in Flink 1.0 and Flink 1.1. +The new `CheckpointedFunction` interface is basically a shortcut (syntactic sugar) for the Operator State. + +Operator State needs special re-distribution schemes when parallelism is changed. There can be different variations of such +schemes, out of which the following are currently defined: + + - **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of + all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. + Each operator gets a sublist, which can be empty, or contain one or more elements. + + +## Raw and Managed State + +*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*. + +*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. +Examples are the "ValueState", "ListState", etc. Flink's runtime encodes the states and writes them into the checkpoints. + +*Raw State* is state that users and operators keep in their own data structures. Upon checkpoints, they only write a sequence of bytes into +the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes. + + +## Checkpointing Procedure + +When operator snapshots are taken, there are two parts: The **synchronous** and the **asynchronous** part. + +Operators and state backends provide their snapshots as a Java `FutureTask`. That task contains the state where the *synchronous* part +is completed and the *asynchronous* part is pending.
The asynchronous part is then executed by a background thread for that checkpoint. + +Operators that checkpoint purely synchronously return an already completed `FutureTask`. +If an asynchronous operation needs to be performed, it is executed in the `run()` method of that `FutureTask`. + +The tasks are cancelable, in order to release streams and other resource consuming handles. + diff --git a/docs/internals/stream_checkpointing.md b/docs/internals/stream_checkpointing.md index aaf73860ea433..75493cabe25a9 100644 --- a/docs/internals/stream_checkpointing.md +++ b/docs/internals/stream_checkpointing.md @@ -133,7 +133,6 @@ of the data after checkpoint *n*. Because of that, dataflows with only embarrassingly parallel streaming operations (`map()`, `flatMap()`, `filter()`, ...) actually give *exactly once* guarantees even in *at least once* mode. - +See [State Backends]({{ site.baseurl }}/internals/state_backends.html) for details on the state snapshots. ## Recovery From 764739de663d1bc8de981bd7508f3aa03a4a6a0b Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 23 Nov 2016 12:13:05 +0100 Subject: [PATCH 09/11] [FLINK-5181] Add Tests in StateBackendTestBase that verify Default-Value Behaviour --- .../runtime/state/StateBackendTestBase.java | 143 +++++++++++++++++- 1 file changed, 137 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 9e835ce96e9ff..0a3a0923077c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -56,12 +56,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RunnableFuture; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -658,6 +654,141 @@ public void testFoldingState() { } } + /** + * Verify that {@link ValueStateDescriptor} allows {@code null} as default. + */ + @Test + public void testValueStateNullAsDefaultValue() throws Exception { + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor kvId = new ValueStateDescriptor<>("id", String.class, null); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertEquals(null, state.value()); + + state.update("Ciao"); + assertEquals("Ciao", state.value()); + + state.clear(); + assertEquals(null, state.value()); + + backend.dispose(); + } + + + /** + * Verify that an empty {@code ValueState} will yield the default value. + */ + @Test + public void testValueStateDefaultValue() throws Exception { + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor kvId = new ValueStateDescriptor<>("id", String.class, "Hello"); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertEquals("Hello", state.value()); + + state.update("Ciao"); + assertEquals("Ciao", state.value()); + + state.clear(); + assertEquals("Hello", state.value()); + + backend.dispose(); + } + + /** + * Verify that an empty {@code ReduceState} yields {@code null}. 
+ */ + @Test + public void testReducingStateDefaultValue() throws Exception { + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ReducingStateDescriptor kvId = new ReducingStateDescriptor<>("id", new AppendingReduce(), String.class); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ReducingState state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertNull(state.get()); + + state.add("Ciao"); + assertEquals("Ciao", state.get()); + + state.clear(); + assertNull(state.get()); + + backend.dispose(); + } + + /** + * Verify that an empty {@code FoldingState} yields {@code null}. + */ + @Test + public void testFoldingStateDefaultValue() throws Exception { + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + FoldingStateDescriptor kvId = + new FoldingStateDescriptor<>("id", "Fold-Initial:", new AppendingFold(), String.class); + + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + FoldingState state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertNull(state.get()); + + state.add(1); + state.add(2); + assertEquals("Fold-Initial:,1,2", state.get()); + + state.clear(); + assertNull(state.get()); + + backend.dispose(); + } + + + /** + * Verify that an empty {@code ListState} yields {@code null}. 
+ */ + @Test + public void testListStateDefaultValue() throws Exception { + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + ListStateDescriptor kvId = new ListStateDescriptor<>("id", String.class); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ListState state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + assertNull(state.get()); + + state.add("Ciao"); + state.add("Bello"); + assertThat(state.get(), containsInAnyOrder("Ciao", "Bello")); + + state.clear(); + assertNull(state.get()); + + backend.dispose(); + } + + + + /** * This test verifies that state is correctly assigned to key groups and that restore * restores the relevant key groups in the backend. From ff3470991d0c620ae3ccedeb1326c331c9cfa443 Mon Sep 17 00:00:00 2001 From: twalthr Date: Thu, 17 Nov 2016 14:17:23 +0100 Subject: [PATCH 10/11] [FLINK-4872] [types] Type erasure problem exclusively on cluster execution This closes #2823. --- .../api/java/typeutils/TypeExtractor.java | 64 ++----------------- 1 file changed, 4 insertions(+), 60 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index b41bbc1562b41..08f8c5317142c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -751,24 +751,9 @@ else if (t instanceof GenericArrayType) { // due to a Java 6 bug, it is possible that the JVM classifies e.g. String[] or int[] as GenericArrayType instead of Class if (componentType instanceof Class) { - Class componentClass = (Class) componentType; - String className; - // for int[], double[] etc. - if(componentClass.isPrimitive()) { - className = encodePrimitiveClass(componentClass); - } - // for String[], Integer[] etc. 
- else { - className = "L" + componentClass.getName() + ";"; - } - - Class classArray; - try { - classArray = (Class) Class.forName("[" + className); - } catch (ClassNotFoundException e) { - throw new InvalidTypesException("Could not convert GenericArrayType to Class."); - } + + Class classArray = (Class) (java.lang.reflect.Array.newInstance(componentClass, 0).getClass()); return getForClass(classArray); } else { @@ -778,21 +763,8 @@ else if (t instanceof GenericArrayType) { in1Type, in2Type); - Class classArray; - - try { - String componentClassName = componentInfo.getTypeClass().getName(); - String resultingClassName; - - if (componentClassName.startsWith("[")) { - resultingClassName = "[" + componentClassName; - } else { - resultingClassName = "[L" + componentClassName + ";"; - } - classArray = (Class) Class.forName(resultingClassName); - } catch (ClassNotFoundException e) { - throw new InvalidTypesException("Could not convert GenericArrayType to Class."); - } + Class componentClass = componentInfo.getTypeClass(); + Class classArray = (Class) (java.lang.reflect.Array.newInstance(componentClass, 0).getClass()); return ObjectArrayTypeInfo.getInfoFor(classArray, componentInfo); } @@ -1559,34 +1531,6 @@ private static void validateLambdaGenericParameter(Type t) { + "See the documentation for more information about how to compile jobs containing lambda expressions."); } } - - private static String encodePrimitiveClass(Class primitiveClass) { - if (primitiveClass == boolean.class) { - return "Z"; - } - else if (primitiveClass == byte.class) { - return "B"; - } - else if (primitiveClass == char.class) { - return "C"; - } - else if (primitiveClass == double.class) { - return "D"; - } - else if (primitiveClass == float.class) { - return "F"; - } - else if (primitiveClass == int.class) { - return "I"; - } - else if (primitiveClass == long.class) { - return "J"; - } - else if (primitiveClass == short.class) { - return "S"; - } - throw new InvalidTypesException(); - } /** * 
Tries to find a concrete value (Class, ParameterizedType etc. ) for a TypeVariable by traversing the type hierarchy downwards. From ad9f8dbd6aa0a460c2f763ec1041130f44beeaad Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 11 Nov 2016 10:57:25 +0100 Subject: [PATCH 11/11] [FLINK-5026] Rename TimelyFlatMap to Process --- .../api/datastream/ConnectedStreams.java | 62 +++++++------- .../streaming/api/datastream/KeyedStream.java | 55 +++++++------ ...tMapFunction.java => ProcessFunction.java} | 37 +++++---- ...Function.java => RichProcessFunction.java} | 6 +- ...apFunction.java => CoProcessFunction.java} | 50 ++++++------ ...nction.java => RichCoProcessFunction.java} | 6 +- ...imelyFlatMap.java => ProcessOperator.java} | 14 ++-- ...elyFlatMap.java => CoProcessOperator.java} | 16 ++-- .../flink/streaming/api/DataStreamTest.java | 23 +++--- ...tMapTest.java => ProcessOperatorTest.java} | 54 ++++++------- ...apTest.java => CoProcessOperatorTest.java} | 80 +++++++++---------- .../api/scala/ConnectedStreams.scala | 39 ++++----- .../streaming/api/scala/KeyedStream.scala | 24 +++--- .../streaming/api/scala/DataStreamTest.scala | 20 ++--- 14 files changed, 255 insertions(+), 231 deletions(-) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/{TimelyFlatMapFunction.java => ProcessFunction.java} (73%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/{RichTimelyFlatMapFunction.java => RichProcessFunction.java} (90%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/{TimelyCoFlatMapFunction.java => CoProcessFunction.java} (66%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/{RichTimelyCoFlatMapFunction.java => RichCoProcessFunction.java} (87%) rename flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/{StreamTimelyFlatMap.java => ProcessOperator.java} (89%) rename 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/{CoStreamTimelyFlatMap.java => CoProcessOperator.java} (88%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/{TimelyFlatMapTest.java => ProcessOperatorTest.java} (85%) rename flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/{TimelyCoFlatMapTest.java => CoProcessOperatorTest.java} (80%) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index dc763cb4b3c6e..96a08d32d3876 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -27,11 +27,12 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; +import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap; import org.apache.flink.streaming.api.operators.co.CoStreamMap; -import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap; +import org.apache.flink.streaming.api.operators.co.CoProcessOperator; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import static java.util.Objects.requireNonNull; @@ -234,64 +235,71 @@ CoFlatMapFunction.class, false, true, getType1(), getType2(), } /** - * Applies the given {@link TimelyCoFlatMapFunction} on the connected input 
streams, + * Applies the given {@link CoProcessFunction} on the connected input streams, * thereby creating a transformed output stream. * - *

The function will be called for every element in the streams and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + *

The function will be called for every element in the input streams and can produce zero or + * more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function, this + * function can also query the time and set timers. When reacting to the firing of set timers + * the function can directly emit elements and/or register yet more timers. * - *

A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction} + *

A {@link RichCoProcessFunction} * can be used to gain access to features provided by the * {@link org.apache.flink.api.common.functions.RichFunction} interface. * - * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element + * @param coProcessFunction The {@link CoProcessFunction} that is called for each element * in the stream. * - * @param The of elements emitted by the {@code TimelyCoFlatMapFunction}. + * @param The type of elements emitted by the {@code CoProcessFunction}. * * @return The transformed {@link DataStream}. */ - public SingleOutputStreamOperator flatMap( - TimelyCoFlatMapFunction coFlatMapper) { + @PublicEvolving + public SingleOutputStreamOperator process( + CoProcessFunction coProcessFunction) { - TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper, - TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(), + TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction, + CoProcessFunction.class, false, true, getType1(), getType2(), Utils.getCallLocationName(), true); - return flatMap(coFlatMapper, outTypeInfo); + return process(coProcessFunction, outTypeInfo); } /** - * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams, + * Applies the given {@link CoProcessFunction} on the connected input streams, * thereby creating a transformed output stream. * - *

The function will be called for every element in the streams and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + *

The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function, + * this function can also query the time and set timers. When reacting to the firing of set + * timers the function can directly emit elements and/or register yet more timers. * - *

A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction} + *

A {@link RichCoProcessFunction} * can be used to gain access to features provided by the * {@link org.apache.flink.api.common.functions.RichFunction} interface. * - * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element + * @param coProcessFunction The {@link CoProcessFunction} that is called for each element * in the stream. * - * @param The of elements emitted by the {@code TimelyCoFlatMapFunction}. + * @param The type of elements emitted by the {@code CoProcessFunction}. * * @return The transformed {@link DataStream}. */ @Internal - public SingleOutputStreamOperator flatMap( - TimelyCoFlatMapFunction coFlatMapper, + public SingleOutputStreamOperator process( + CoProcessFunction coProcessFunction, TypeInformation outputType) { - CoStreamTimelyFlatMap operator = new CoStreamTimelyFlatMap<>( - inputStream1.clean(coFlatMapper)); + if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) { + throw new UnsupportedOperationException("A CoProcessFunction can only be applied" + + "when both input streams are keyed."); + } - return transform("Co-Flat Map", outputType, operator); - } + CoProcessOperator operator = new CoProcessOperator<>( + inputStream1.clean(coProcessFunction)); + return transform("Co-Process", outputType, operator); + } @PublicEvolving public SingleOutputStreamOperator transform(String functionName, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 5b00bcddb3977..560ecabab5a61 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; 
+import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.FoldingStateDescriptor; @@ -32,7 +33,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.RichProcessFunction; import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction; import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator; import org.apache.flink.streaming.api.functions.aggregation.SumAggregator; @@ -42,7 +44,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamGroupedFold; import org.apache.flink.streaming.api.operators.StreamGroupedReduce; -import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap; +import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; @@ -173,67 +175,70 @@ public DataStreamSink addSink(SinkFunction sinkFunction) { } /** - * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby + * Applies the given {@link ProcessFunction} on the input stream, thereby * creating a transformed output stream. * - *

The function will be called for every element in the stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + *

The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)} + * function, this function can also query the time and set timers. When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. * - *

A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction} + *

A {@link RichProcessFunction} * can be used to gain access to features provided by the * {@link org.apache.flink.api.common.functions.RichFunction} interface. * - * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element + * @param processFunction The {@link ProcessFunction} that is called for each element * in the stream. * - * @param The of elements emitted by the {@code TimelyFlatMapFunction}. + * @param The type of elements emitted by the {@code ProcessFunction}. * * @return The transformed {@link DataStream}. */ - public SingleOutputStreamOperator flatMap(TimelyFlatMapFunction flatMapper) { + @PublicEvolving + public SingleOutputStreamOperator process(ProcessFunction processFunction) { TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType( - flatMapper, - TimelyFlatMapFunction.class, + processFunction, + ProcessFunction.class, false, true, getType(), Utils.getCallLocationName(), true); - return flatMap(flatMapper, outType); + return process(processFunction, outType); } /** - * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby + * Applies the given {@link ProcessFunction} on the input stream, thereby * creating a transformed output stream. * - *

The function will be called for every element in the stream and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + *

The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)} + * function, this function can also query the time and set timers. When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. * - *

A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction} + *

A {@link RichProcessFunction} * can be used to gain access to features provided by the * {@link org.apache.flink.api.common.functions.RichFunction} interface. * - * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element + * @param processFunction The {@link ProcessFunction} that is called for each element * in the stream. * @param outputType {@link TypeInformation} for the result type of the function. * - * @param The of elements emitted by the {@code TimelyFlatMapFunction}. + * @param The type of elements emitted by the {@code ProcessFunction}. * * @return The transformed {@link DataStream}. */ @Internal - public SingleOutputStreamOperator flatMap( - TimelyFlatMapFunction flatMapper, + public SingleOutputStreamOperator process( + ProcessFunction processFunction, TypeInformation outputType) { - StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(clean(flatMapper)); + ProcessOperator operator = + new ProcessOperator<>(clean(processFunction)); - return transform("Flat Map", outputType, operator); + return transform("Process", outputType, operator); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java similarity index 73% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java index 5f039c4f2a920..fd0a829300d53 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java @@ -24,32 +24,31 @@ import org.apache.flink.streaming.api.TimerService; import org.apache.flink.util.Collector; -import java.io.Serializable; - /** - * Base interface for timely flatMap 
functions. FlatMap functions take elements and transform them, - * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists - * and arrays. - * - *

A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal - * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react - * to them firing. + * A function that processes elements of a stream. * - *

{@code
- * DataStream input = ...;
+ * 

The function will be called for every element in the input stream and can produce + * zero or more output. The function can also query the time and set timers. When + * reacting to the firing of set timers the function can emit yet more elements. * - * DataStream result = input.flatMap(new MyTimelyFlatMapFunction()); - * }

+ *

The function will be called for every element in the input stream and can produce + * zero or more output elements. Contrary to the + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query + * the time (both event and processing) and set timers, through the provided {@link Context}. + * When reacting to the firing of set timers the function can directly emit a result, and/or + * register a timer that will trigger an action in the future. * * @param Type of the input elements. - * @param Type of the returned elements. + * @param Type of the output elements. */ @PublicEvolving -public interface TimelyFlatMapFunction extends Function, Serializable { +public interface ProcessFunction extends Function { /** - * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms - * it into zero, one, or more elements. + * Process one element from the input stream. + * + *

This function can output zero or more elements using the {@link Collector} parameter + * and also update internal state or set timers using the {@link Context} parameter. * * @param value The input value. * @param ctx A {@link Context} that allows querying the timestamp of the element and getting @@ -60,7 +59,7 @@ public interface TimelyFlatMapFunction extends Function, Serializable { * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ - void flatMap(I value, Context ctx, Collector out) throws Exception; + void processElement(I value, Context ctx, Collector out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. @@ -78,7 +77,7 @@ public interface TimelyFlatMapFunction extends Function, Serializable { void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception ; /** - * Information available in an invocation of {@link #flatMap(Object, Context, Collector)} + * Information available in an invocation of {@link #processElement(Object, Context, Collector)} * or {@link #onTimer(long, OnTimerContext, Collector)}. 
*/ interface Context { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java similarity index 90% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java index 0d86da92422c9..834f71775c361 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; /** - * Rich variant of the {@link TimelyFlatMapFunction}. As a + * Rich variant of the {@link ProcessFunction}. As a * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods: * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)} @@ -32,9 +32,9 @@ * @param Type of the returned elements. 
*/ @PublicEvolving -public abstract class RichTimelyFlatMapFunction +public abstract class RichProcessFunction extends AbstractRichFunction - implements TimelyFlatMapFunction { + implements ProcessFunction { private static final long serialVersionUID = 1L; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java similarity index 66% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java index 89c7d79c66695..feff8fb19086e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java @@ -22,41 +22,40 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; import org.apache.flink.util.Collector; import java.io.Serializable; /** - * A {@code TimelyCoFlatMapFunction} implements a flat-map transformation over two - * connected streams. - * - *

The same instance of the transformation function is used to transform - * both of the connected streams. That way, the stream transformations can - * share state. + * A function that processes elements of two streams and produces a single output one. * - *

A {@code TimelyCoFlatMapFunction} can, in addition to the functionality of a normal - * {@link CoFlatMapFunction}, also set timers and react to them firing. - * - *

An example for the use of connected streams would be to apply rules that change over time - * onto elements of a stream. One of the connected streams has the rules, the other stream the - * elements to apply the rules to. The operation on the connected stream maintains the - * current set of rules in the state. It may receive either a rule update (from the first stream) - * and update the state, or a data element (from the second stream) and apply the rules in the - * state to the element. The result of applying the rules would be emitted. + *

The function will be called for every element in the input streams and can produce + * zero or more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also + * query the time (both event and processing) and set timers, through the provided {@link Context}. + * When reacting to the firing of set timers the function can emit yet more elements. + * + *

An example use-case for connected streams would be the application of a set of rules that change + * over time ({@code stream A}) to the elements contained in another stream (stream {@code B}). The rules + * contained in {@code stream A} can be stored in the state and wait for new elements to arrive on + * {@code stream B}. Upon reception of a new element on {@code stream B}, the function can now apply the + * previously stored rules to the element and directly emit a result, and/or register a timer that + * will trigger an action in the future. * * @param Type of the first input. * @param Type of the second input. * @param Output type. */ @PublicEvolving -public interface TimelyCoFlatMapFunction extends Function, Serializable { +public interface CoProcessFunction extends Function, Serializable { /** * This method is called for each element in the first of the connected streams. + * + * This function can output zero or more elements using the {@link Collector} parameter + * and also update internal state or set timers using the {@link Context} parameter. * * @param value The stream element - * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element, + * @param ctx A {@link Context} that allows querying the timestamp of the element, * querying the {@link TimeDomain} of the firing timer and getting a * {@link TimerService} for registering timers and querying the time. * The context is only valid during the invocation of this method, do not store it. @@ -64,13 +63,16 @@ public interface TimelyCoFlatMapFunction extends Function, Serial * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ - void flatMap1(IN1 value, Context ctx, Collector out) throws Exception; + void processElement1(IN1 value, Context ctx, Collector out) throws Exception; /** * This method is called for each element in the second of the connected streams. 
+ * + * This function can output zero or more elements using the {@link Collector} parameter + * and also update internal state or set timers using the {@link Context} parameter. * * @param value The stream element - * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element, + * @param ctx A {@link Context} that allows querying the timestamp of the element, * querying the {@link TimeDomain} of the firing timer and getting a * {@link TimerService} for registering timers and querying the time. * The context is only valid during the invocation of this method, do not store it. @@ -78,7 +80,7 @@ public interface TimelyCoFlatMapFunction extends Function, Serial * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ - void flatMap2(IN2 value, Context ctx, Collector out) throws Exception; + void processElement2(IN2 value, Context ctx, Collector out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. @@ -96,8 +98,8 @@ public interface TimelyCoFlatMapFunction extends Function, Serial void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception ; /** - * Information available in an invocation of {@link #flatMap1(Object, Context, Collector)}/ - * {@link #flatMap2(Object, Context, Collector)} + * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/ + * {@link #processElement2(Object, Context, Collector)} * or {@link #onTimer(long, OnTimerContext, Collector)}. */ interface Context { @@ -119,7 +121,7 @@ interface Context { /** * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. */ - interface OnTimerContext extends TimelyFlatMapFunction.Context { + interface OnTimerContext extends Context { /** * The {@link TimeDomain} of the firing timer. 
*/ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java similarity index 87% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java index 12fe18166125b..0fcea9189ae46 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.RichFunction; /** - * Rich variant of the {@link TimelyCoFlatMapFunction}. As a {@link RichFunction}, it gives + * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)} * and {@link RichFunction#close()}. @@ -33,9 +33,9 @@ * @param Type of the returned elements. 
*/ @PublicEvolving -public abstract class RichTimelyCoFlatMapFunction +public abstract class RichCoProcessFunction extends AbstractRichFunction - implements TimelyCoFlatMapFunction { + implements CoProcessFunction { private static final long serialVersionUID = 1L; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java similarity index 89% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java index bafc435fbd22d..3b1336051cb22 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java @@ -23,15 +23,15 @@ import org.apache.flink.streaming.api.SimpleTimerService; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @Internal -public class StreamTimelyFlatMap - extends AbstractUdfStreamOperator> +public class ProcessOperator + extends AbstractUdfStreamOperator> implements OneInputStreamOperator, Triggerable { private static final long serialVersionUID = 1L; @@ -44,7 +44,7 @@ public class StreamTimelyFlatMap private transient OnTimerContextImpl onTimerContext; - public StreamTimelyFlatMap(TimelyFlatMapFunction flatMapper) { + public ProcessOperator(ProcessFunction flatMapper) { super(flatMapper); chainingStrategy = 
ChainingStrategy.ALWAYS; @@ -88,11 +88,11 @@ public void onProcessingTime(InternalTimer timer) throws Excep public void processElement(StreamRecord element) throws Exception { collector.setTimestamp(element); context.element = element; - userFunction.flatMap(element.getValue(), context, collector); + userFunction.processElement(element.getValue(), context, collector); context.element = null; } - private static class ContextImpl implements TimelyFlatMapFunction.Context { + private static class ContextImpl implements ProcessFunction.Context { private final TimerService timerService; @@ -119,7 +119,7 @@ public TimerService timerService() { } } - private static class OnTimerContextImpl implements TimelyFlatMapFunction.OnTimerContext{ + private static class OnTimerContextImpl implements ProcessFunction.OnTimerContext{ private final TimerService timerService; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java similarity index 88% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java index 75e4c141e00f7..e6c3d3f973944 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java @@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.SimpleTimerService; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; -import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -36,8 +36,8 @@ import static org.apache.flink.util.Preconditions.checkState; @Internal -public class CoStreamTimelyFlatMap - extends AbstractUdfStreamOperator> +public class CoProcessOperator + extends AbstractUdfStreamOperator> implements TwoInputStreamOperator, Triggerable { private static final long serialVersionUID = 1L; @@ -50,7 +50,7 @@ public class CoStreamTimelyFlatMap private transient OnTimerContextImpl onTimerContext; - public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction flatMapper) { + public CoProcessOperator(CoProcessFunction flatMapper) { super(flatMapper); } @@ -72,7 +72,7 @@ public void open() throws Exception { public void processElement1(StreamRecord element) throws Exception { collector.setTimestamp(element); context.element = element; - userFunction.flatMap1(element.getValue(), context, collector); + userFunction.processElement1(element.getValue(), context, collector); context.element = null; } @@ -80,7 +80,7 @@ public void processElement1(StreamRecord element) throws Exception { public void processElement2(StreamRecord element) throws Exception { collector.setTimestamp(element); context.element = element; - userFunction.flatMap2(element.getValue(), context, collector); + userFunction.processElement2(element.getValue(), context, collector); context.element = null; } @@ -108,7 +108,7 @@ protected TimestampedCollector getCollector() { return collector; } - private static class ContextImpl implements TimelyCoFlatMapFunction.Context { + private static class ContextImpl implements CoProcessFunction.Context { private final TimerService timerService; @@ -135,7 +135,7 @@ public TimerService timerService() { } } - private static class OnTimerContextImpl implements TimelyCoFlatMapFunction.OnTimerContext { + private static class OnTimerContextImpl 
implements CoProcessFunction.OnTimerContext { private final TimerService timerService; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 8f002ba7d2982..eaac6b8cc9fac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; @@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap; +import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; @@ -547,18 +547,19 @@ public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception { } /** - * Verify that a timely flat map call is correctly translated to an operator. + * Verify that a {@link KeyedStream#process(ProcessFunction)} call is correctly translated to + * an operator. 
*/ @Test - public void testTimelyFlatMapTranslation() { + public void testProcessTranslation() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource src = env.generateSequence(0, 0); - TimelyFlatMapFunction timelyFlatMapFunction = new TimelyFlatMapFunction() { + ProcessFunction processFunction = new ProcessFunction() { private static final long serialVersionUID = 1L; @Override - public void flatMap( + public void processElement( Long value, Context ctx, Collector out) throws Exception { @@ -574,14 +575,14 @@ public void onTimer( } }; - DataStream flatMapped = src + DataStream processed = src .keyBy(new IdentityKeySelector()) - .flatMap(timelyFlatMapFunction); + .process(processFunction); - flatMapped.addSink(new DiscardingSink()); + processed.addSink(new DiscardingSink()); - assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped)); - assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap); + assertEquals(processFunction, getFunctionForDataStream(processed)); + assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java similarity index 85% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java index 6080ddc416eb8..74fd04494d8ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java @@ -23,8 +23,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import 
org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.TimeDomain; -import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.RichProcessFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; @@ -40,15 +40,15 @@ import static org.junit.Assert.assertEquals; /** - * Tests {@link StreamTimelyFlatMap}. + * Tests {@link ProcessOperator}. */ -public class TimelyFlatMapTest extends TestLogger { +public class ProcessOperatorTest extends TestLogger { @Test public void testTimestampAndWatermarkQuerying() throws Exception { - StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); + ProcessOperator operator = + new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); @@ -77,8 +77,8 @@ public void testTimestampAndWatermarkQuerying() throws Exception { @Test public void testTimestampAndProcessingTimeQuerying() throws Exception { - StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); + ProcessOperator operator = + new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); @@ -105,8 +105,8 @@ public void testTimestampAndProcessingTimeQuerying() throws Exception { @Test public void testEventTimeTimers() throws Exception { - 
StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)); + ProcessOperator operator = + new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); @@ -135,8 +135,8 @@ public void testEventTimeTimers() throws Exception { @Test public void testProcessingTimeTimers() throws Exception { - StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)); + ProcessOperator operator = + new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); @@ -164,8 +164,8 @@ public void testProcessingTimeTimers() throws Exception { @Test public void testEventTimeTimerWithState() throws Exception { - StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)); + ProcessOperator operator = + new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); @@ -204,8 +204,8 @@ public void testEventTimeTimerWithState() throws Exception { @Test public void testProcessingTimeTimerWithState() throws Exception { - StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)); + ProcessOperator operator = + new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), 
BasicTypeInfo.INT_TYPE_INFO); @@ -237,8 +237,8 @@ public void testProcessingTimeTimerWithState() throws Exception { @Test public void testSnapshotAndRestore() throws Exception { - StreamTimelyFlatMap operator = - new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); + ProcessOperator operator = + new ProcessOperator<>(new BothTriggeringFlatMapFunction()); OneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); @@ -253,7 +253,7 @@ public void testSnapshotAndRestore() throws Exception { testHarness.close(); - operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); + operator = new ProcessOperator<>(new BothTriggeringFlatMapFunction()); testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO); @@ -286,7 +286,7 @@ public T getKey(T value) throws Exception { } } - private static class QueryingFlatMapFunction implements TimelyFlatMapFunction { + private static class QueryingFlatMapFunction implements ProcessFunction { private static final long serialVersionUID = 1L; @@ -297,7 +297,7 @@ public QueryingFlatMapFunction(TimeDomain timeDomain) { } @Override - public void flatMap(Integer value, Context ctx, Collector out) throws Exception { + public void processElement(Integer value, Context ctx, Collector out) throws Exception { if (timeDomain.equals(TimeDomain.EVENT_TIME)) { out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } else { @@ -313,7 +313,7 @@ public void onTimer( } } - private static class TriggeringFlatMapFunction implements TimelyFlatMapFunction { + private static class TriggeringFlatMapFunction implements ProcessFunction { private static final long serialVersionUID = 1L; @@ -324,7 +324,7 @@ public TriggeringFlatMapFunction(TimeDomain timeDomain) { } @Override - public void flatMap(Integer value, Context ctx, Collector 
out) throws Exception { + public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect(value); if (timeDomain.equals(TimeDomain.EVENT_TIME)) { ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); @@ -344,7 +344,7 @@ public void onTimer( } } - private static class TriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction { + private static class TriggeringStatefulFlatMapFunction extends RichProcessFunction { private static final long serialVersionUID = 1L; @@ -358,7 +358,7 @@ public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) { } @Override - public void flatMap(Integer value, Context ctx, Collector out) throws Exception { + public void processElement(Integer value, Context ctx, Collector out) throws Exception { out.collect("INPUT:" + value); getRuntimeContext().getState(state).update(value); if (timeDomain.equals(TimeDomain.EVENT_TIME)) { @@ -378,12 +378,12 @@ public void onTimer( } } - private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction { + private static class BothTriggeringFlatMapFunction implements ProcessFunction { private static final long serialVersionUID = 1L; @Override - public void flatMap(Integer value, Context ctx, Collector out) throws Exception { + public void processElement(Integer value, Context ctx, Collector out) throws Exception { ctx.timerService().registerProcessingTimeTimer(5); ctx.timerService().registerEventTimeTimer(6); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java similarity index 80% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java index 
7c2963132f560..a4493597c3317 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java @@ -23,8 +23,8 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.TimeDomain; -import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction; -import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; @@ -40,15 +40,15 @@ import static org.junit.Assert.assertEquals; /** - * Tests {@link CoStreamTimelyFlatMap}. + * Tests {@link CoProcessOperator}. 
*/ -public class TimelyCoFlatMapTest extends TestLogger { +public class CoProcessOperatorTest extends TestLogger { @Test public void testTimestampAndWatermarkQuerying() throws Exception { - CoStreamTimelyFlatMap operator = - new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction()); + CoProcessOperator operator = + new CoProcessOperator<>(new WatermarkQueryingProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -83,8 +83,8 @@ public void testTimestampAndWatermarkQuerying() throws Exception { @Test public void testTimestampAndProcessingTimeQuerying() throws Exception { - CoStreamTimelyFlatMap operator = - new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction()); + CoProcessOperator operator = + new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -115,8 +115,8 @@ public void testTimestampAndProcessingTimeQuerying() throws Exception { @Test public void testEventTimeTimers() throws Exception { - CoStreamTimelyFlatMap operator = - new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction()); + CoProcessOperator operator = + new CoProcessOperator<>(new EventTimeTriggeringProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -154,8 +154,8 @@ public void testEventTimeTimers() throws Exception { @Test public void testProcessingTimeTimers() throws Exception { - CoStreamTimelyFlatMap operator = - new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction()); + CoProcessOperator operator = + new CoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -191,8 +191,8 @@ public void testProcessingTimeTimers() throws Exception { @Test public void testEventTimeTimerWithState() throws 
Exception { - CoStreamTimelyFlatMap operator = - new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction()); + CoProcessOperator operator = + new CoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -240,8 +240,8 @@ public void testEventTimeTimerWithState() throws Exception { @Test public void testProcessingTimeTimerWithState() throws Exception { - CoStreamTimelyFlatMap operator = - new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction()); + CoProcessOperator operator = + new CoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -277,8 +277,8 @@ public void testProcessingTimeTimerWithState() throws Exception { @Test public void testSnapshotAndRestore() throws Exception { - CoStreamTimelyFlatMap operator = - new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); + CoProcessOperator operator = + new CoProcessOperator<>(new BothTriggeringProcessFunction()); TwoInputStreamOperatorTestHarness testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( @@ -298,7 +298,7 @@ public void testSnapshotAndRestore() throws Exception { testHarness.close(); - operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction()); + operator = new CoProcessOperator<>(new BothTriggeringProcessFunction()); testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( operator, @@ -344,17 +344,17 @@ public T getKey(T value) throws Exception { } } - private static class WatermarkQueryingFlatMapFunction implements TimelyCoFlatMapFunction { + private static class WatermarkQueryingProcessFunction implements CoProcessFunction { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, Context ctx, Collector out) throws Exception { + public void 
processElement1(Integer value, Context ctx, Collector out) throws Exception { out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } @Override - public void flatMap2(String value, Context ctx, Collector out) throws Exception { + public void processElement2(String value, Context ctx, Collector out) throws Exception { out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()); } @@ -366,18 +366,18 @@ public void onTimer( } } - private static class EventTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction { + private static class EventTimeTriggeringProcessFunction implements CoProcessFunction { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, Context ctx, Collector out) throws Exception { + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { out.collect("INPUT1:" + value); ctx.timerService().registerEventTimeTimer(5); } @Override - public void flatMap2(String value, Context ctx, Collector out) throws Exception { + public void processElement2(String value, Context ctx, Collector out) throws Exception { out.collect("INPUT2:" + value); ctx.timerService().registerEventTimeTimer(6); } @@ -393,7 +393,7 @@ public void onTimer( } } - private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction { + private static class EventTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction { private static final long serialVersionUID = 1L; @@ -401,14 +401,14 @@ private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTime new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); @Override - public void flatMap1(Integer value, Context ctx, Collector out) throws Exception { + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { out.collect("INPUT1:" + value); 
getRuntimeContext().getState(state).update("" + value); ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); } @Override - public void flatMap2(String value, Context ctx, Collector out) throws Exception { + public void processElement2(String value, Context ctx, Collector out) throws Exception { out.collect("INPUT2:" + value); getRuntimeContext().getState(state).update(value); ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5); @@ -425,18 +425,18 @@ public void onTimer( } } - private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction { + private static class ProcessingTimeTriggeringProcessFunction implements CoProcessFunction { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, Context ctx, Collector out) throws Exception { + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { out.collect("INPUT1:" + value); ctx.timerService().registerProcessingTimeTimer(5); } @Override - public void flatMap2(String value, Context ctx, Collector out) throws Exception { + public void processElement2(String value, Context ctx, Collector out) throws Exception { out.collect("INPUT2:" + value); ctx.timerService().registerProcessingTimeTimer(6); } @@ -452,17 +452,17 @@ public void onTimer( } } - private static class ProcessingTimeQueryingFlatMapFunction implements TimelyCoFlatMapFunction { + private static class ProcessingTimeQueryingProcessFunction implements CoProcessFunction { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, Context ctx, Collector out) throws Exception { + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); } @Override - public void flatMap2(String value, Context ctx, Collector out) throws Exception { 
+ public void processElement2(String value, Context ctx, Collector out) throws Exception { out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()); } @@ -474,7 +474,7 @@ public void onTimer( } } - private static class ProcessingTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction { + private static class ProcessingTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction { private static final long serialVersionUID = 1L; @@ -482,14 +482,14 @@ private static class ProcessingTimeTriggeringStatefulFlatMapFunction extends Ric new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); @Override - public void flatMap1(Integer value, Context ctx, Collector out) throws Exception { + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { out.collect("INPUT1:" + value); getRuntimeContext().getState(state).update("" + value); ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); } @Override - public void flatMap2(String value, Context ctx, Collector out) throws Exception { + public void processElement2(String value, Context ctx, Collector out) throws Exception { out.collect("INPUT2:" + value); getRuntimeContext().getState(state).update(value); ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5); @@ -506,17 +506,17 @@ public void onTimer( } } - private static class BothTriggeringFlatMapFunction implements TimelyCoFlatMapFunction { + private static class BothTriggeringProcessFunction implements CoProcessFunction { private static final long serialVersionUID = 1L; @Override - public void flatMap1(Integer value, Context ctx, Collector out) throws Exception { + public void processElement1(Integer value, Context ctx, Collector out) throws Exception { ctx.timerService().registerEventTimeTimer(6); } @Override - public void flatMap2(String value, Context ctx, Collector out) 
throws Exception { + public void processElement2(String value, Context ctx, Collector out) throws Exception { ctx.timerService().registerProcessingTimeTimer(5); } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala index 50526b578f240..a7325a4dc60ae 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream} -import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction} +import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction, RichCoProcessFunction} import org.apache.flink.streaming.api.operators.TwoInputStreamOperator import org.apache.flink.util.Collector @@ -101,30 +101,33 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { } /** - * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams, + * Applies the given [[CoProcessFunction]] on the connected input streams, * thereby creating a transformed output stream. * - * The function will be called for every element in the streams and can produce - * zero or more output. The function can also query the time and set timers. When - * reacting to the firing of set timers the function can emit yet more elements. + * The function will be called for every element in the input streams and can produce zero + * or more output elements. 
Contrary to the [[flatMap(CoFlatMapFunction)]] function, + * this function can also query the time and set timers. When reacting to the firing of set + * timers the function can directly emit elements and/or register yet more timers. * - * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]] + * A [[RichCoProcessFunction]] * can be used to gain access to features provided by the * [[org.apache.flink.api.common.functions.RichFunction]] interface. * - * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element - * in the stream. - * - * @return The transformed { @link DataStream}. + * @param coProcessFunction The [[CoProcessFunction]] that is called for each element + * in the stream. + * @return The transformed [[DataStream]]. */ - def flatMap[R: TypeInformation]( - coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = { + @PublicEvolving + def process[R: TypeInformation]( + coProcessFunction: CoProcessFunction[IN1, IN2, R]) : DataStream[R] = { - if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.") + if (coProcessFunction == null) { + throw new NullPointerException("CoProcessFunction function must not be null.") + } val outType : TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.flatMap(coFlatMapper, outType)) + asScalaStream(javaStream.process(coProcessFunction, outType)) } @@ -144,14 +147,14 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * @return * The resulting data stream. 
*/ - def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): + def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): DataStream[R] = { - + if (coFlatMapper == null) { throw new NullPointerException("FlatMap function must not be null.") } - - val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]) } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index 66d80c2ec3902..f2999b394fca4 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescr import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream} -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction +import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator} @@ -54,28 +54,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] // 
------------------------------------------------------------------------ /** - * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby + * Applies the given [[ProcessFunction]] on the input stream, thereby * creating a transformed output stream. * * The function will be called for every element in the stream and can produce * zero or more output. The function can also query the time and set timers. When * reacting to the firing of set timers the function can emit yet more elements. * - * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]] + * The function will be called for every element in the input streams and can produce zero + * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]] + * function, this function can also query the time and set timers. When reacting to the firing + * of set timers the function can directly emit elements and/or register yet more timers. + * + * A [[RichProcessFunction]] * can be used to gain access to features provided by the * [[org.apache.flink.api.common.functions.RichFunction]] * - * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element + * @param processFunction The [[ProcessFunction]] that is called for each element * in the stream. 
*/ - def flatMap[R: TypeInformation]( - flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = { + @PublicEvolving + def process[R: TypeInformation]( + processFunction: ProcessFunction[T, R]): DataStream[R] = { - if (flatMapper == null) { - throw new NullPointerException("TimelyFlatMapFunction must not be null.") + if (processFunction == null) { + throw new NullPointerException("ProcessFunction must not be null.") } - asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]])) + asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]])) } // ------------------------------------------------------------------------ diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 967142b193afb..adb59f2f1bf42 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -23,11 +23,11 @@ import java.lang import org.apache.flink.api.common.functions._ import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.flink.streaming.api.collector.selector.OutputSelector -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction -import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction.{Context, OnTimerContext} +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext} import org.apache.flink.streaming.api.functions.co.CoMapFunction import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph} -import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap} +import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, 
StreamOperator} import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger} import org.apache.flink.streaming.api.windowing.windows.GlobalWindow @@ -318,26 +318,26 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { } /** - * Verify that a timely flat map call is correctly translated to an operator. + * Verify that a [[KeyedStream.process()]] call is correctly translated to an operator. */ @Test - def testTimelyFlatMapTranslation(): Unit = { + def testProcessTranslation(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val src = env.generateSequence(0, 0) - val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] { - override def flatMap(value: Long, ctx: Context, out: Collector[Int]): Unit = ??? + val processFunction = new ProcessFunction[Long, Int] { + override def processElement(value: Long, ctx: Context, out: Collector[Int]): Unit = ??? override def onTimer( timestamp: Long, ctx: OnTimerContext, out: Collector[Int]): Unit = ??? } - val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction) + val flatMapped = src.keyBy(x => x).process(processFunction) - assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped)) - assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]]) + assert(processFunction == getFunctionForDataStream(flatMapped)) + assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _, _]]) } @Test def operatorTest() {