Skip to content

Commit

Permalink
[streaming] WindowWordCount example added
Browse files Browse the repository at this point in the history
  • Loading branch information
mbalassi committed Apr 6, 2015
1 parent 12c555f commit 4f4c6b9
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 3 deletions.
Expand Up @@ -46,7 +46,7 @@ public TestStreamEnvironment(int parallelism, long memorySize){

/**
 * Creates a TestStreamEnvironment that keeps the given mini cluster as its
 * executor and configures its parallelism.
 *
 * @param executor the mini cluster stored in {@code this.executor}
 * @param parallelism the value handed to the parallelism setter(s) below
 */
public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){
this.executor = executor;
// NOTE(review): this text comes from a diff hunk whose +/- markers were lost;
// the two calls below are presumably the removed and the added line of the
// same change rather than two calls that are both intended - confirm against
// the actual file before relying on either.
setDefaultLocalParallelism(parallelism);
setParallelism(parallelism);
}

@Override
Expand Down
24 changes: 24 additions & 0 deletions flink-staging/flink-streaming/flink-streaming-examples/pom.xml
Expand Up @@ -265,6 +265,30 @@ under the License.
</configuration>
</execution>

<!-- WindowWordCount: builds a separate jar for the WindowWordCount example during the package phase -->
<execution>
<id>WindowWordCount</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<!-- The classifier distinguishes this example jar from the other jars built by this module -->
<classifier>WindowWordCount</classifier>

<archive>
<manifestEntries>
<!-- Manifest entry naming the example's main class -->
<program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class>
</manifestEntries>
</archive>

<!-- Only the classes the example references are packaged: the program itself,
     the Tokenizer it reuses from the streaming WordCount example, and the
     built-in default input data -->
<includes>
<include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include>
<include>org/apache/flink/streaming/examples/wordcount/WordCount$Tokenizer.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution>

<!-- SocketTextStreamWordCount -->
<execution>
<id>SocketTextStreamWordCount</id>
Expand Down
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.windowing;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.examples.wordcount.WordCount;

/**
 * Implements a windowed version of the streaming "WordCount" program.
 *
 * <p>
 * The input is a plain text file with lines separated by newline characters.
 *
 * <p>
 * Usage: <code>WindowWordCount &lt;text path&gt; &lt;result path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link org.apache.flink.examples.java.wordcount.util.WordCountData} and the
 * result is printed instead of being written to a file.
 *
 * <p>
 * This example shows how to:
 * <ul>
 * <li>write a simple Flink Streaming program,
 * <li>use tuple data types,
 * <li>use basic windowing abstractions.
 * </ul>
 *
 */
public class WindowWordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    /**
     * Builds and executes the windowed word count job.
     *
     * @param args either empty (built-in data, printed result) or exactly
     *             {@code <text path> <result path>}
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {

        // any other argument count is rejected; the usage message has already been printed
        if (!parseParameters(args)) {
            return;
        }

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataStream<String> text = getTextDataStream(env);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new WordCount.Tokenizer())
                // create windows of 250 records, sliding every 150 records
                .window(Count.of(250)).every(Count.of(150))
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0).sum(1)
                // flatten the windows to a single stream
                .flatten();

        // emit result
        if (fileOutput) {
            counts.writeAsText(outputPath);
        } else {
            counts.print();
        }

        // execute program
        env.execute("WindowWordCount");
    }


    // *************************************************************************
    // UTIL METHODS
    // *************************************************************************

    /** True once both a text path and a result path have been accepted. */
    private static boolean fileOutput = false;
    /** Input file path; only set together with {@link #outputPath}. */
    private static String textPath;
    /** Result file path; only set together with {@link #textPath}. */
    private static String outputPath;

    /**
     * Interprets the command line arguments.
     *
     * <p>
     * No arguments selects the built-in data; exactly two arguments select
     * file input and file output. State is only modified after the argument
     * count has been validated, so a rejected call leaves the defaults intact.
     *
     * @param args the command line arguments
     * @return {@code true} if the program should run, {@code false} if the
     *         arguments were rejected and usage was printed
     */
    private static boolean parseParameters(String[] args) {

        if (args.length == 0) {
            System.out.println("Executing WindowWordCount example with built-in default data.");
            System.out.println(" Provide parameters to read input data from a file.");
            System.out.println(" Usage: WindowWordCount <text path> <result path>");
            return true;
        }

        if (args.length != 2) {
            System.err.println("Usage: WindowWordCount <text path> <result path>");
            return false;
        }

        // parse input arguments
        textPath = args[0];
        outputPath = args[1];
        fileOutput = true;
        return true;
    }

    /**
     * Creates the stream of input lines.
     *
     * @param env the environment the source is created on
     * @return lines read from {@code textPath} when file mode is active,
     *         otherwise the built-in {@code WordCountData.WORDS}
     */
    private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
        // fileOutput doubles as the "file mode" switch: the input and result
        // paths are always supplied together, so it also implies textPath is set
        if (fileOutput) {
            // read the text file from given input path
            return env.readTextFile(textPath);
        } else {
            // get default test text data
            return env.fromElements(WordCountData.WORDS);
        }
    }
}
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.examples.test;
package org.apache.flink.streaming.examples.test.wordcount;

import org.apache.flink.streaming.examples.wordcount.PojoExample;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.examples.test;
package org.apache.flink.streaming.examples.test.wordcount;

import org.apache.flink.streaming.examples.wordcount.WordCount;
import org.apache.flink.test.testdata.WordCountData;
Expand Down

0 comments on commit 4f4c6b9

Please sign in to comment.