Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add filtering capabilities to Twitter source

  • Loading branch information...
commit 2015b85499b286161a8d875d7478034006b2f168 1 parent b8f6256
Jon Natkins authored
27 flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java
View
@@ -63,6 +63,8 @@
/** Size of event batches */
private long batchSize;
+ private String[] keywords;
+
/** The actual Twitter stream. It's set up to collect raw JSON data */
private final TwitterStream twitterStream = new TwitterStreamFactory(
new ConfigurationBuilder()
@@ -83,6 +85,12 @@ public void configure(Context context) {
batchSize = context.getLong(TwitterSourceConstants.BATCH_SIZE_KEY,
TwitterSourceConstants.DEFAULT_BATCH_SIZE);
+
+ String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
+ keywords = keywordString.split(",");
+ for (int i = 0; i < keywords.length; i++) {
+ keywords[i] = keywords[i].trim();
+ }
}
/**
@@ -137,15 +145,16 @@ public void onException(Exception ex) {}
twitterStream.setOAuthAccessToken(token);
// Sample the stream when the keyword array is empty; otherwise filter on the configured keywords
- FilterQuery query = new FilterQuery()
- .track(new String[] { "hadoop", "big data", "analytics",
- "bigdata", "cloudera", "data science",
- "data scientiest", "business intelligence",
- "mapreduce", "data warehouse", "data warehousing",
- "mahout", "hbase", "nosql", "newsql",
- "businessintelligence", "cloudcomputing" })
- .setIncludeEntities(true);
- twitterStream.filter(query);
+ if (keywords.length == 0) {
+ logger.debug("Starting up Twitter sampling...");
+ twitterStream.sample();
+ } else {
+ logger.debug("Starting up Twitter filtering...");
+ FilterQuery query = new FilterQuery()
+ .track(keywords)
+ .setIncludeEntities(true);
+ twitterStream.filter(query);
+ }
super.start();
}
1  flume-sources/src/main/java/com/cloudera/flume/source/TwitterSourceConstants.java
View
@@ -10,4 +10,5 @@
public static final String BATCH_SIZE_KEY = "batchSize";
public static final long DEFAULT_BATCH_SIZE = 1000L;
+ public static final String KEYWORDS_KEY = "keywords";
}
Please sign in to comment.
Something went wrong with that request. Please try again.