Permalink
Browse files

Custom Flume source and Flume configuration

This adds a Flume source which will access the Twitter Streaming API and stream
raw JSON tweets through Flume. The configuration defines a setup which will
take the JSON data and load it into HDFS via a Memory Channel.
  • Loading branch information...
1 parent 41c7dbd commit 2b930878293825317e83405376639e549d7d84ab Jon Natkins committed Aug 29, 2012
View
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# The configuration file needs to define the sources,
+# the channels and the sinks.
+# Sources, channels and sinks are defined per agent,
+# in this case called 'TwitterAgent'
+
+# Names of the components that make up the TwitterAgent pipeline.
+TwitterAgent.sources = Twitter
+TwitterAgent.channels = MemChannel
+TwitterAgent.sinks = HDFS
+
+# Custom source that streams raw JSON tweets. Every <required> placeholder
+# must be replaced with the corresponding Twitter OAuth credential.
+TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
+# The source must write to a channel declared in TwitterAgent.channels.
+TwitterAgent.sources.Twitter.channels = MemChannel
+TwitterAgent.sources.Twitter.consumerKey = <required>
+TwitterAgent.sources.Twitter.consumerSecret = <required>
+TwitterAgent.sources.Twitter.accessToken = <required>
+TwitterAgent.sources.Twitter.accessTokenSecret = <required>
+
+# HDFS sink. It must drain the same channel the source writes to.
+TwitterAgent.sinks.HDFS.channel = MemChannel
+TwitterAgent.sinks.HDFS.type = hdfs
+# Output is bucketed into year/month/day/hour directories.
+TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
+TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
+TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
+TwitterAgent.sinks.HDFS.hdfs.batchSize = 10000
+TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
+TwitterAgent.sinks.HDFS.hdfs.rollCount = 0
+
+# In-memory channel connecting the Twitter source to the HDFS sink.
+TwitterAgent.channels.MemChannel.type = memory
+TwitterAgent.channels.MemChannel.capacity = 10000
+TwitterAgent.channels.MemChannel.transactionCapacity = 10000
View
@@ -0,0 +1,116 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.cloudera</groupId>
+  <artifactId>flume-sources</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>flume-sources</name>
+  <url>http://www.cloudera.com</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <flume.version>1.1.0-cdh4.0.1</flume.version>
+    <hadoop.version>2.0.0-cdh4.0.1</hadoop.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-eclipse-plugin</artifactId>
+        <version>2.9</version>
+        <configuration>
+          <buildOutputDirectory>eclipse-classes</buildOutputDirectory>
+          <downloadSources>true</downloadSources>
+          <downloadJavadocs>false</downloadJavadocs>
+        </configuration>
+      </plugin>
+
+      <!-- Builds a single jar that also contains the non-provided dependencies -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>1.7.1</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>2.3.2</version>
+          <configuration>
+            <source>1.6</source>
+            <target>1.6</target>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.2</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.6.1</version>
+    </dependency>
+
+    <!-- For the Twitter API -->
+    <!-- Pinned to a fixed release: an open-ended range makes builds
+         non-reproducible and can resolve to a later release whose API
+         differs from the one TwitterSource is written against. -->
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+      <version>2.2.6</version>
+    </dependency>
+
+    <!-- Hadoop Dependencies -->
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>${flume.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+      <version>${flume.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>cloudera</id>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+</project>
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.flume.source;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.auth.AccessToken;
+import twitter4j.conf.ConfigurationBuilder;
+import twitter4j.json.DataObjectFactory;
+
+/**
+ * A Flume Source, which pulls data from Twitter's streaming API. Currently,
+ * this only supports pulling from the sample API, and only gets new status
+ * updates.
+ */
+public class TwitterSource extends AbstractSource
+ implements EventDrivenSource, Configurable {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(TwitterSource.class);
+
+ /** Information necessary for accessing the Twitter API */
+ private String consumerKey;
+ private String consumerSecret;
+ private String accessToken;
+ private String accessTokenSecret;
+
+ /** Size of event batches */
+ private long batchSize;
+
+ /** The actual Twitter stream. It's set up to collect raw JSON data */
+ private final TwitterStream twitterStream = new TwitterStreamFactory(
+ new ConfigurationBuilder()
+ .setJSONStoreEnabled(true)
+ .build()).getInstance();
+
+ /**
+ * The initialization method for the Source. The context contains all the
+ * Flume configuration info, and can be used to retrieve any configuration
+ * values necessary to set up the Source.
+ */
+ @Override
+ public void configure(Context context) {
+ consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
+ consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
+ accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
+ accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);
+
+ batchSize = context.getLong(TwitterSourceConstants.BATCH_SIZE_KEY,
+ TwitterSourceConstants.DEFAULT_BATCH_SIZE);
+ }
+
+ /**
+ * Start processing events. This uses the Twitter Streaming API to sample
+ * Twitter, and process tweets.
+ */
+ @Override
+ public void start() {
+ // The channel is the piece of Flume that sits between the Source and Sink,
+ // and is used to process events.
+ final ChannelProcessor channel = getChannelProcessor();
+
+ final List<Event> eventList = new ArrayList<Event>();
+ final Map<String, String> headers = new HashMap<String, String>();
+
+ // The StatusListener is a twitter4j API, which can be added to a Twitter
+ // stream, and will execute methods every time a message comes in through
+ // the stream.
+ StatusListener listener = new StatusListener() {
+ // The onStatus method is executed every time a new tweet comes in.
+ public void onStatus(Status status) {
+ // The EventBuilder is used to build an event using the headers and
+ // the raw JSON of a tweet
+ eventList.add(EventBuilder.withBody(
+ DataObjectFactory.getRawJSON(status).getBytes(), headers));
+
+ // When we've filled up a batch, we use the channel to process the
+ // list of events.
+ if (eventList.size() >= batchSize) {
+ logger.debug("Processing a batch of {} events", eventList.size());
+ channel.processEventBatch(eventList);
+ eventList.clear();
+ }
+ }
+
+ // This listener will ignore everything except for new tweets
+ public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
+ public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
+ public void onScrubGeo(long userId, long upToStatusId) {}
+ public void onException(Exception ex) {}
+ };
+
+ logger.debug("Setting up Twitter sample stream using consumer key {} and" +
+ " access token {}", new String[] { consumerKey, accessToken });
+ // Set up the stream's listener (defined above), and set any necessary
+ // security information.
+ twitterStream.addListener(listener);
+ twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
+ AccessToken token = new AccessToken(accessToken, accessTokenSecret);
+ twitterStream.setOAuthAccessToken(token);
+
+ // Start sampling Twitter!
+ twitterStream.sample();
+ super.start();
+ }
+
+ /**
+ * Stops the Source's event processing and shuts down the Twitter stream.
+ */
+ @Override
+ public void stop() {
+ logger.debug("Shutting down Twitter sample stream...");
+ twitterStream.shutdown();
+ super.stop();
+ }
+}
@@ -0,0 +1,13 @@
+package com.cloudera.flume.source;
+
+public class TwitterSourceConstants {
+
+ public static final String CONSUMER_KEY_KEY = "consumerKey";
+ public static final String CONSUMER_SECRET_KEY = "consumerSecret";
+ public static final String ACCESS_TOKEN_KEY = "accessToken";
+ public static final String ACCESS_TOKEN_SECRET_KEY = "accessTokenSecret";
+
+ public static final String BATCH_SIZE_KEY = "batchSize";
+ public static final long DEFAULT_BATCH_SIZE = 1000L;
+
+}

0 comments on commit 2b93087

Please sign in to comment.