From 02b74128275b305716b423bbfdf445d694ea8313 Mon Sep 17 00:00:00 2001 From: smarthi Date: Wed, 18 Nov 2015 12:48:38 -0500 Subject: [PATCH] FLINK-3041: Twitter Streaming Description section of Streaming Programming guide refers to an incorrect example 'TwitterLocal' --- docs/apis/streaming_guide.md | 6 +++--- .../connectors/twitter/TwitterFilterSource.java | 17 ++++++++--------- .../connectors/twitter/TwitterSource.java | 4 ++-- .../connectors/twitter/TwitterStreaming.java | 4 ++-- .../connectors/twitter/TwitterTopology.java | 4 ++-- .../connectors/json/JSONParserTest.java | 1 - .../connectors/json/JSONParserTest2.java | 1 - .../examples/iteration/IterateExample.java | 14 ++++++-------- .../examples/twitter/TwitterStream.java | 4 ++-- .../GroupedProcessingTimeWindowExample.java | 2 +- .../examples/windowing/TopSpeedWindowing.java | 8 ++++---- 11 files changed, 30 insertions(+), 35 deletions(-) diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md index 5f4b1f6bf7838..1c97dd9aacbfb 100644 --- a/docs/apis/streaming_guide.md +++ b/docs/apis/streaming_guide.md @@ -3930,7 +3930,7 @@ In order to connect to Twitter stream the user has to register their program and #### Acquiring the authentication information First of all, a Twitter account is needed. Sign up for free at [twitter.com/signup](https://twitter.com/signup) or sign in at Twitter's [Application Management](https://apps.twitter.com/) and register the application by clicking on the "Create New App" button. Fill out a form about your program and accept the Terms and Conditions. -After selecting the application, the API key and API secret (called `consumerKey` and `sonsumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary access token data (`token` and `secret`) can be acquired here. +After selecting the application, the API key and API secret (called `consumerKey` and `consumerSecret` in `TwitterSource` respectively) is located on the "API Keys" tab. The necessary OAuth Access Token data (`token` and `secret` in `TwitterSource`) can be generated and acquired on the "Keys and Access Tokens" tab. Remember to keep these pieces of information secret and do not push them to public repositories. #### Accessing the authentication information @@ -3948,7 +3948,7 @@ consumerKey=*** The `TwitterSource` class has two constructors. 1. `public TwitterSource(String authPath, int numberOfTweets);` -to emit finite number of tweets +to emit a finite number of tweets 2. `public TwitterSource(String authPath);` for streaming @@ -3991,7 +3991,7 @@ function which can be use to acquire the value of a given field. There are two basic types of tweets. The usual tweets contain information such as date and time of creation, id, user, language and many more details. The other type is the delete information. #### Example -`TwitterLocal` is an example how to use `TwitterSource`. It implements a language frequency counter program. +`TwitterStream` is an example of how to use `TwitterSource`. It implements a language frequency counter program. [Back to top](#top) diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java index 8dd44587624f3..289432232353a 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java @@ -39,22 +39,21 @@ */ public class TwitterFilterSource extends TwitterSource { - private static final Logger LOG = LoggerFactory - .getLogger(TwitterFilterSource.class); + private static final Logger LOG = LoggerFactory.getLogger(TwitterFilterSource.class); private static final long serialVersionUID = 1L; - private List trackTerms = new LinkedList(); + private List trackTerms = new LinkedList<>(); - private List languages = new LinkedList(); + private List languages = new LinkedList<>(); - private List followings = new LinkedList(); + private List followings = new LinkedList<>(); - private List locations = new LinkedList(); + private List locations = new LinkedList<>(); - private Map queryParameters = new HashMap(); + private Map queryParameters = new HashMap<>(); - private Map postParameters = new HashMap(); + private Map postParameters = new HashMap<>(); public TwitterFilterSource(String authPath) { super(authPath); @@ -66,7 +65,7 @@ protected void initializeConnection() { if (LOG.isInfoEnabled()) { LOG.info("Initializing Twitter Streaming API connection"); } - queue = new LinkedBlockingQueue(queueSize); + queue = new LinkedBlockingQueue<>(queueSize); StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint(); configEndpoint(endpoint); diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java index d290f98dbe313..69f136e16fbe5 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java @@ -77,7 +77,7 @@ public TwitterSource(String authPath) { * @param authPath * Location of the properties file containing the required * authentication information. - * @param numberOfTweets + * @param numberOfTweets max number of tweets * */ public TwitterSource(String authPath, int numberOfTweets) { @@ -101,7 +101,7 @@ protected void initializeConnection() { LOG.info("Initializing Twitter Streaming API connection"); } - queue = new LinkedBlockingQueue(queueSize); + queue = new LinkedBlockingQueue<>(queueSize); StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); endpoint.stallWarnings(false); diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java index a80c32aca67af..3e8ce1b37bcd3 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java @@ -58,7 +58,7 @@ public static class SelectDataFlatMap extends public void flatMap(String value, Collector> out) throws Exception { try { - out.collect(new Tuple5( + out.collect(new Tuple5<>( getLong(value, "id"), getInt(value, "entities.hashtags[0].indices[1]"), getString(value, "lang"), @@ -74,7 +74,7 @@ public void flatMap(String value, Collector out) throws Exception { public static void main(String[] args) throws Exception { - String path = new String(); + String path; if (args != null && args.length == 1) { path = args[0]; @@ -79,7 +79,7 @@ public static void main(String[] args) throws Exception { @Override public Tuple2 map(String value) throws Exception { - return new Tuple2(value, 1); + return new Tuple2<>(value, 1); } }) .keyBy(0) diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java index b1d4115247fc8..33108c96731e4 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collection; -import org.apache.flink.streaming.connectors.json.JSONParser; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; import org.junit.Test; diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java index 885108627e6ef..eb796b4f94567 100644 --- a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java +++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.flink.streaming.connectors.json.JSONParser; import org.apache.sling.commons.json.JSONException; import org.apache.sling.commons.json.JSONObject; import org.junit.Test; diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index 52ec896e8034a..b6e1a612a66a4 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -121,7 +121,7 @@ public void run(SourceContext> ctx) throws Exception { int first = rnd.nextInt(BOUND / 2 - 1) + 1; int second = rnd.nextInt(BOUND / 2 - 1) + 1; - ctx.collect(new Tuple2(first, second)); + ctx.collect(new Tuple2<>(first, second)); counter++; Thread.sleep(50L); } @@ -143,7 +143,7 @@ private static class FibonacciInputMap implements MapFunction map(String value) throws Exception { String record = value.substring(1, value.length() - 1); String[] splitted = record.split(","); - return new Tuple2(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); + return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1])); } } @@ -158,7 +158,7 @@ public static class InputMap implements MapFunction, Tu @Override public Tuple5 map(Tuple2 value) throws Exception { - return new Tuple5(value.f0, value.f1, value.f0, value.f1, 0); + return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0); } } @@ -173,8 +173,7 @@ public static class Step implements @Override public Tuple5 map(Tuple5 value) throws Exception { - return new Tuple5(value.f0, value.f1, value.f3, value.f2 + - value.f3, ++value.f4); + return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4); } } @@ -186,7 +185,7 @@ public static class MySelector implements OutputSelector select(Tuple5 value) { - List output = new ArrayList(); + List output = new ArrayList<>(); if (value.f2 < BOUND && value.f3 < BOUND) { output.add("iterate"); } else { @@ -207,8 +206,7 @@ public static class OutputMap implements MapFunction, Integer> map(Tuple5 value) throws Exception { - return new Tuple2, Integer>(new Tuple2(value.f0, value.f1), - value.f4); + return new Tuple2<>(new Tuple2<>(value.f0, value.f1), value.f4); } } diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java index 06872f03e0244..d26dc4257413c 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java @@ -110,8 +110,8 @@ public void flatMap(String value, Collector> out) throws while (tokenizer.hasMoreTokens()) { String result = tokenizer.nextToken().replaceAll("\\s*", "").toLowerCase(); - if (result != null && !result.equals("")) { - out.collect(new Tuple2(result, 1)); + if (!result.equals("")) { + out.collect(new Tuple2<>(result, 1)); } } } diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java index 982b73de0d788..f08069b310ec9 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -59,7 +59,7 @@ public void run(SourceContext> ctx) throws Exception { while (running && count < numElements) { count++; - ctx.collect(new Tuple2(val++, 1L)); + ctx.collect(new Tuple2<>(val++, 1L)); if (val > numKeys) { val = 1L; diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index df3402ed7e217..30eda67c118f8 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -64,9 +64,12 @@ public static void main(String[] args) throws Exception { if (fileInput) { carData = env.readTextFile(inputPath).map(new ParseCarData()); } else { + int numOfCars = 2; carData = env.addSource(CarSource.create(numOfCars)); } + int evictionSec = 10; + double triggerMeters = 50; DataStream> topSpeeds = carData .assignTimestamps(new CarTimestamp()) .keyBy(0) @@ -133,7 +136,7 @@ public void run(SourceContext> ctx) throw speeds[carId] = Math.max(0, speeds[carId] - 5); } distances[carId] += speeds[carId] / 3.6d; - Tuple4 record = new Tuple4(carId, + Tuple4 record = new Tuple4<>(carId, speeds[carId], distances[carId], System.currentTimeMillis()); ctx.collect(record); counter++; @@ -186,9 +189,6 @@ public long getCurrentWatermark() { private static boolean fileInput = false; private static boolean fileOutput = false; - private static int numOfCars = 2; - private static int evictionSec = 10; - private static double triggerMeters = 50; private static String inputPath; private static String outputPath;