From e2d5294600c48d012f030f7ef8e4c76e6c3cd055 Mon Sep 17 00:00:00 2001 From: Erik Schmiegelow Date: Wed, 11 Feb 2015 10:38:33 +0100 Subject: [PATCH 1/3] added ProxySupport for TwitterSource --- .../flume/source/twitter/TwitterSource.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java index 27b2c3ff3d..b9b76f48a8 100644 --- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java +++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java @@ -34,6 +34,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; +import org.apache.commons.lang.StringUtils; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; @@ -54,6 +55,7 @@ import twitter4j.TwitterStreamFactory; import twitter4j.User; import twitter4j.auth.AccessToken; +import twitter4j.conf.ConfigurationBuilder; /** * Demo Flume source that connects via Streaming API to the 1% sample twitter @@ -106,13 +108,27 @@ public void configure(Context context) { String consumerSecret = context.getString("consumerSecret"); String accessToken = context.getString("accessToken"); String accessTokenSecret = context.getString("accessTokenSecret"); + String proxyHost = context.getString("http.proxyHost"); + String proxyPort = context.getString("http.proxyPort"); + String proxyUser = context.getString("http.proxyUser"); + String proxyPassword = context.getString("http.proxyPassword"); + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true); + if (StringUtils.isNotEmpty(proxyHost)) + cb.setHttpProxyHost(proxyHost); + if (StringUtils.isNotEmpty(proxyPort)) + cb.setHttpProxyPort(Integer.valueOf(proxyPort)); + if (StringUtils.isNotEmpty(proxyUser)) + cb.setHttpProxyUser(proxyUser); + if (StringUtils.isNotEmpty(proxyPassword)) + cb.setHttpProxyPassword(proxyPassword); LOGGER.info("Consumer Key: '" + consumerKey + "'"); LOGGER.info("Consumer Secret: '" + consumerSecret + "'"); LOGGER.info("Access Token: '" + accessToken + "'"); LOGGER.info("Access Token Secret: '" + accessTokenSecret + "'"); - twitterStream = new TwitterStreamFactory().getInstance(); + twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); twitterStream.setOAuthConsumer(consumerKey, consumerSecret); twitterStream.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret)); From a588a31e19d2041b2c447815024c208a27a5bb3a Mon Sep 17 00:00:00 2001 From: Erik Schmiegelow Date: Thu, 19 Feb 2015 13:42:06 +0100 Subject: [PATCH 2/3] added ProxySupport for TwitterSource --- .../source/kafka/KafkaSourceConstants.java | 2 +- .../flume/source/twitter/TwitterSource.java | 47 ++++++++++++------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 169cc1001e..10d57977ed 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -25,7 +25,7 @@ public class KafkaSourceConstants { public static final String AUTO_COMMIT_ENABLED = "auto.commit.enabled"; public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect"; - public static final String GROUP_ID = "group.id"; + public static final String GROUP_ID = "groupid"; public static final String GROUP_ID_FLUME = "groupId"; public static final String PROPERTY_PREFIX = "kafka."; diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java index b9b76f48a8..17550adebf 100644 --- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java +++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java @@ -71,6 +71,17 @@ public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable, StatusListener { + + public static final String CONSUMER_KEY = "consumerKey"; + public static final String CONSUMER_SECRET = "consumerSecret"; + public static final String ACCESS_TOKEN = "accessToken"; + public static final String ACCESS_TOKEN_SECRET = "accessTokenSecret"; + public static final String HTTP_PROXY_HOST = "http.proxyHost"; + public static final String HTTP_PROXY_PORT = "http.proxyPort"; + public static final String HTTP_PROXY_USER = "http.proxyUser"; + public static final String HTTP_PROXY_PASSWORD = "http.proxyPassword"; + public static final String MAX_BATCH_SIZE = "maxBatchSize"; + public static final String MAX_BATCH_DURATION_MILLIS = "maxBatchDurationMillis"; private TwitterStream twitterStream; private Schema avroSchema; @@ -104,24 +115,24 @@ public TwitterSource() { @Override public void configure(Context context) { - String consumerKey = context.getString("consumerKey"); - String consumerSecret = context.getString("consumerSecret"); - String accessToken = context.getString("accessToken"); - String accessTokenSecret = context.getString("accessTokenSecret"); - String proxyHost = context.getString("http.proxyHost"); - String proxyPort = context.getString("http.proxyPort"); - String proxyUser = context.getString("http.proxyUser"); - String proxyPassword = context.getString("http.proxyPassword"); + String consumerKey = context.getString(CONSUMER_KEY); + String consumerSecret = context.getString(CONSUMER_SECRET); + String accessToken = context.getString(ACCESS_TOKEN); + String accessTokenSecret = context.getString(ACCESS_TOKEN_SECRET); + String proxyHost = context.getString(HTTP_PROXY_HOST); + String proxyPort = context.getString(HTTP_PROXY_PORT); + String proxyUser = context.getString(HTTP_PROXY_USER); + String proxyPassword = context.getString(HTTP_PROXY_PASSWORD); ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true); - if (StringUtils.isNotEmpty(proxyHost)) - cb.setHttpProxyHost(proxyHost); - if (StringUtils.isNotEmpty(proxyPort)) - cb.setHttpProxyPort(Integer.valueOf(proxyPort)); - if (StringUtils.isNotEmpty(proxyUser)) - cb.setHttpProxyUser(proxyUser); - if (StringUtils.isNotEmpty(proxyPassword)) - cb.setHttpProxyPassword(proxyPassword); + if (StringUtils.isNotEmpty(proxyHost) && StringUtils.isNotEmpty(proxyPort)) { + cb.setHttpProxyHost(proxyHost); + cb.setHttpProxyPort(Integer.valueOf(proxyPort)); + if (StringUtils.isNotEmpty(proxyUser) && StringUtils.isNotEmpty(proxyPassword)) { + cb.setHttpProxyUser(proxyUser); + cb.setHttpProxyPassword(proxyPassword); + } + } LOGGER.info("Consumer Key: '" + consumerKey + "'"); LOGGER.info("Consumer Secret: '" + consumerSecret + "'"); @@ -137,8 +148,8 @@ public void configure(Context context) { dataFileWriter = new DataFileWriter( new GenericDatumWriter(avroSchema)); - maxBatchSize = context.getInteger("maxBatchSize", maxBatchSize); - maxBatchDurationMillis = context.getInteger("maxBatchDurationMillis", + maxBatchSize = context.getInteger(MAX_BATCH_SIZE, maxBatchSize); + maxBatchDurationMillis = context.getInteger(MAX_BATCH_DURATION_MILLIS, maxBatchDurationMillis); } From 958132263076e4ca39bd9d3d9e78f7a79ca082d1 Mon Sep 17 00:00:00 2001 From: Erik Schmiegelow Date: Thu, 9 Apr 2015 11:27:19 +0200 Subject: [PATCH 3/3] added ProxySupport for TwitterSource --- .../java/org/apache/flume/source/twitter/TwitterSource.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java index 17550adebf..180db1cb65 100644 --- a/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java +++ b/flume-ng-sources/flume-twitter-source/src/main/java/org/apache/flume/source/twitter/TwitterSource.java @@ -68,8 +68,7 @@ @InterfaceAudience.Private @InterfaceStability.Unstable -public class TwitterSource - extends AbstractSource +public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable, StatusListener { public static final String CONSUMER_KEY = "consumerKey";