From 741a45445ca2e6ad49f18c5ddd151c04de58fb6f Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Fri, 2 May 2014 12:47:35 -0500 Subject: [PATCH 1/8] Added user information and made some other modifications to increase the readability. --- .../streams-provider-twitter/pom.xml | 10 +- .../provider/TwitterStreamConfigurator.java | 62 +++- .../provider/TwitterStreamProvider.java | 1 - .../provider/TwitterTimelineProvider.java | 119 +++----- .../provider/TwitterTimelineProviderTask.java | 7 - .../TwitterUserInformationProvider.java | 286 ++++++++++++++++++ .../com/twitter/TwitterConfiguration.json | 70 +++++ .../twitter/TwitterStreamConfiguration.json | 61 +--- .../TwitterUserInformationConfiguration.json | 17 ++ .../streams/twitter/test/SimpleTweetTest.java | 4 + 10 files changed, 472 insertions(+), 165 deletions(-) create mode 100644 streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java create mode 100644 streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json create mode 100644 streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 3c27b8c639..8a41ca51a2 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -48,11 +48,6 @@ org.apache.streams streams-config - - org.apache.streams - streams-util - ${project.version} - com.google.guava guava @@ -73,9 +68,8 @@ org.twitter4j twitter4j-core - 3.0.5 + [4.0,) - @@ -118,7 +112,9 @@ true true + src/main/jsonschema/com/twitter/TwitterConfiguration.json src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json + src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json src/main/jsonschema/com/twitter/Delete.json src/main/jsonschema/com/twitter/Retweet.json src/main/jsonschema/com/twitter/tweet.json diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java index 9bf2d9a00a..5435f24209 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java @@ -1,15 +1,15 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigException; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.twitter.TwitterBasicAuthConfiguration; -import org.apache.streams.twitter.TwitterOAuthConfiguration; -import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; /** @@ -18,19 +18,18 @@ public class TwitterStreamConfigurator { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class); + private final static ObjectMapper mapper = new ObjectMapper(); - public static TwitterStreamConfiguration detectConfiguration(Config twitter) { - TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration(); - - twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint")); + public static TwitterConfiguration detectTwitterConfiguration(Config config) { + TwitterConfiguration twitterConfiguration = new TwitterConfiguration(); try { Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth"); TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new TwitterBasicAuthConfiguration(); twitterBasicAuthConfiguration.setUsername(basicauth.getString("username")); twitterBasicAuthConfiguration.setPassword(basicauth.getString("password")); - twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration); + twitterConfiguration.setBasicauth(twitterBasicAuthConfiguration); } catch( ConfigException ce ) {} try { @@ -40,27 +39,60 @@ public static TwitterStreamConfiguration detectConfiguration(Config twitter) { twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret")); twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken")); twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret")); - twitterStreamConfiguration.setOauth(twitterOAuthConfiguration); + twitterConfiguration.setOauth(twitterOAuthConfiguration); } catch( ConfigException ce ) {} + twitterConfiguration.setEndpoint(config.getString("endpoint")); + + return twitterConfiguration; + } + + public static TwitterStreamConfiguration detectConfiguration(Config config) { + + TwitterStreamConfiguration twitterStreamConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterStreamConfiguration.class); + try { - twitterStreamConfiguration.setTrack(twitter.getStringList("track")); + twitterStreamConfiguration.setTrack(config.getStringList("track")); } catch( ConfigException ce ) {} try { + // create the array List follows = Lists.newArrayList(); - for( Integer id : twitter.getIntList("follow")) - follows.add(new Long(id)); + // add the ids of the people we want to 'follow' + for(Integer id : config.getIntList("follow")) + follows.add((long)id); + // set the array twitterStreamConfiguration.setFollow(follows); + } catch( ConfigException ce ) {} - twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level")); - twitterStreamConfiguration.setWith(twitter.getString("with")); - twitterStreamConfiguration.setReplies(twitter.getString("replies")); + twitterStreamConfiguration.setFilterLevel(config.getString("filter-level")); + twitterStreamConfiguration.setWith(config.getString("with")); + twitterStreamConfiguration.setReplies(config.getString("replies")); twitterStreamConfiguration.setJsonStoreEnabled("true"); twitterStreamConfiguration.setIncludeEntities("true"); return twitterStreamConfiguration; } + public static TwitterUserInformationConfiguration detectTwitterUserInformationConfiguration(Config config) { + + TwitterUserInformationConfiguration twitterUserInformationConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class); + + try { + if(config.hasPath("info")) + { + List info = new ArrayList(); + + for (String s : config.getStringList("info")) + info.add(s); + } + } + catch(Exception e) { + LOGGER.error("There was an error: {}", e.getMessage()); + } + + return twitterUserInformationConfiguration; + } + } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index 3df7d02ce8..b1785e526e 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -1,6 +1,5 @@ package org.apache.streams.twitter.provider; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index b9551ada48..2c39cf93f5 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -1,11 +1,7 @@ package org.apache.streams.twitter.provider; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; +import com.google.common.collect.Queues; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -49,17 +45,11 @@ public void setConfig(TwitterStreamConfiguration config) { this.config = config; } - protected volatile Queue providerQueue = new LinkedBlockingQueue(); + protected final Queue providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue(500)); - protected Twitter client; + protected int idsCount; protected Iterator ids; - ListenableFuture providerTaskComplete; -// -// public BlockingQueue getInQueue() { -// return inQueue; -// } - protected ListeningExecutorService executor; protected DateTime start; @@ -74,6 +64,7 @@ private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int public TwitterTimelineProvider() { Config config = StreamsConfigurator.config.getConfig("twitter"); this.config = TwitterStreamConfigurator.detectConfiguration(config); + } public TwitterTimelineProvider(TwitterStreamConfiguration config) { @@ -95,43 +86,19 @@ public Queue getProviderQueue() { return this.providerQueue; } -// public void run() { -// -// LOGGER.info("{} Running", STREAMS_ID); -// -// while( ids.hasNext() ) { -// Long currentId = ids.next(); -// LOGGER.info("Provider Task Starting: {}", currentId); -// captureTimeline(currentId); -// } -// -// LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID); -// -// client.shutdown(); -// -// LOGGER.info("{} Exiting", STREAMS_ID); -// -// while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) { -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) {} -// } -// } - @Override public void startStream() { // no op } - private void captureTimeline(long currentId) { + protected void captureTimeline(long currentId) { Paging paging = new Paging(1, 200); List statuses = null; - boolean KeepGoing = true; - boolean hadFailure = false; do { + Twitter client = getTwitterClient(); int keepTrying = 0; // keep trying to load, give it 5 attempts. @@ -143,20 +110,12 @@ private void captureTimeline(long currentId) { { statuses = client.getUserTimeline(currentId, paging); - for (Status tStat : statuses) - { -// if( provider.start != null && -// provider.start.isAfter(new DateTime(tStat.getCreatedAt()))) -// { -// // they hit the last date we wanted to collect -// // we can now exit early -// KeepGoing = false; -// } - // emit the record - String json = DataObjectFactory.getRawJSON(tStat); - - providerQueue.offer(new StreamsDatum(json)); + for (Status tStat : statuses) { + String json = TwitterObjectFactory.getRawJSON(tStat); + while(!providerQueue.offer(new StreamsDatum(json))) { + sleep(); + } } paging.setPage(paging.getPage() + 1); @@ -166,19 +125,36 @@ private void captureTimeline(long currentId) { catch(TwitterException twitterException) { keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); } - catch(Exception e) - { - hadFailure = true; + catch(Exception e) { keepTrying += TwitterErrorHandler.handleTwitterError(client, e); } - finally - { - // Shutdown the twitter to release the resources - client.shutdown(); - } } } - while ((statuses != null) && (statuses.size() > 0) && KeepGoing); + while (shouldContinuePulling(statuses)); + } + + private Map userPullInfo; + + protected boolean shouldContinuePulling(List statuses) { + return (statuses != null) && (statuses.size() > 0); + } + + private void sleep() + { + Thread.yield(); + try { + // wait one tenth of a millisecond + Thread.yield(); + Thread.sleep(new Random().nextInt(2)); + Thread.yield(); + } + catch(IllegalArgumentException e) { + // passing in static values, this will never happen + } + catch(InterruptedException e) { + // noOp, there must have been an issue sleeping + } + Thread.yield(); } public StreamsResultSet readCurrent() { @@ -244,21 +220,19 @@ public void prepare(Object o) { executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); Preconditions.checkNotNull(config.getOauth().getAccessToken()); Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getFollow()); - Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true); - Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true); - + idsCount = config.getFollow().size(); ids = config.getFollow().iterator(); + } + protected Twitter getTwitterClient() + { String baseUrl = "https://api.twitter.com:443/1.1/"; ConfigurationBuilder builder = new ConfigurationBuilder() @@ -266,23 +240,18 @@ public void prepare(Object o) { .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) .setOAuthAccessToken(config.getOauth().getAccessToken()) .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) - .setIncludeEntitiesEnabled(includeEntitiesEnabled) - .setJSONStoreEnabled(jsonStoreEnabled) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) .setAsyncNumThreads(3) .setRestBaseURL(baseUrl) .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setIncludeRTsEnabled(Boolean.TRUE) .setPrettyDebugEnabled(Boolean.TRUE); - client = new TwitterFactory(builder.build()).getInstance(); - + return new TwitterFactory(builder.build()).getInstance(); } @Override public void cleanUp() { - - client.shutdown(); - shutdownAndAwaitTermination(executor); } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java index 9619f4fed6..9a1d4e71f8 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java @@ -74,19 +74,12 @@ public void run() { hadFailure = true; keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e); } - finally - { - // Shutdown the twitter to release the resources - twitter.shutdown(); - } } } while ((statuses != null) && (statuses.size() > 0) && KeepGoing); LOGGER.info("Provider Finished. Cleaning up..."); - twitter.shutdown(); - LOGGER.info("Provider Exiting"); } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java new file mode 100644 index 0000000000..dac5cd61b0 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -0,0 +1,286 @@ +package org.apache.streams.twitter.provider; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; +import twitter4j.*; +import twitter4j.conf.ConfigurationBuilder; +import twitter4j.json.DataObjectFactory; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + +public class TwitterUserInformationProvider implements StreamsProvider, Serializable +{ + + public static final String STREAMS_ID = "TwitterUserInformationProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); + + + private TwitterUserInformationConfiguration twitterUserInformationConfiguration; + + private Class klass; + protected volatile Queue providerQueue = new LinkedBlockingQueue(); + + public TwitterUserInformationConfiguration getConfig() { return twitterUserInformationConfiguration; } + + public void setConfig(TwitterUserInformationConfiguration config) { this.twitterUserInformationConfiguration = config; } + + protected Iterator idsBatches; + protected Iterator screenNameBatches; + + protected ListeningExecutorService executor; + + protected DateTime start; + protected DateTime end; + + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public TwitterUserInformationProvider() { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) { + this.twitterUserInformationConfiguration = config; + } + + public TwitterUserInformationProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + this.klass = klass; + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config, Class klass) { + this.twitterUserInformationConfiguration = config; + this.klass = klass; + } + + public Queue getProviderQueue() { + return this.providerQueue; + } + + @Override + public void startStream() { + // no op + } + + + private void loadBatch(Long[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + long[] toQuery = new long[ids.length]; + for(int i = 0; i < ids.length; i++) + toQuery[i] = ids[i]; + + for (User tStat : client.lookupUsers(toQuery)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + private void loadBatch(String[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + for (User tStat : client.lookupUsers(ids)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + public StreamsResultSet readCurrent() { + + Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); + + LOGGER.info("readCurrent"); + + while(idsBatches.hasNext()) + loadBatch(idsBatches.next()); + + while(screenNameBatches.hasNext()) + loadBatch(screenNameBatches.next()); + + + LOGGER.info("Finished. Cleaning up..."); + + LOGGER.info("Providing {} docs", providerQueue.size()); + + StreamsResultSet result = new StreamsResultSet(providerQueue); + + LOGGER.info("Exiting"); + + return result; + + } + + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public void prepare(Object o) { + + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerKey()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessToken()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getInfo()); + + List screenNames = new ArrayList(); + List screenNameBatches = new ArrayList(); + + List ids = new ArrayList(); + List idsBatches = new ArrayList(); + + for(String s : twitterUserInformationConfiguration.getInfo()) { + if(s != null) + { + String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); + + // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the + // screen name list + try { + ids.add(Long.parseLong(potentialScreenName)); + } catch (Exception e) { + screenNames.add(potentialScreenName); + } + + // Twitter allows for batches up to 100 per request, but you cannot mix types + + if(ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new Long[ids.size()])); + // reset the Ids + ids = new ArrayList(); + } + + if(screenNames.size() >= 100) { + // add the batch + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + // reset the Ids + screenNames = new ArrayList(); + } + } + } + + + if(ids.size() > 0) + idsBatches.add(ids.toArray(new Long[ids.size()])); + + if(screenNames.size() > 0) + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + + this.idsBatches = idsBatches.iterator(); + this.screenNameBatches = screenNameBatches.iterator(); + } + + protected Twitter getTwitterClient() + { + String baseUrl = "https://api.twitter.com:443/1.1/"; + + ConfigurationBuilder builder = new ConfigurationBuilder() + .setOAuthConsumerKey(twitterUserInformationConfiguration.getOauth().getConsumerKey()) + .setOAuthConsumerSecret(twitterUserInformationConfiguration.getOauth().getConsumerSecret()) + .setOAuthAccessToken(twitterUserInformationConfiguration.getOauth().getAccessToken()) + .setOAuthAccessTokenSecret(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret()) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) + .setAsyncNumThreads(3) + .setRestBaseURL(baseUrl) + .setIncludeMyRetweetEnabled(Boolean.TRUE) + .setPrettyDebugEnabled(Boolean.TRUE); + + return new TwitterFactory(builder.build()).getInstance(); + } + + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } +} diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json new file mode 100644 index 0000000000..9e22b93317 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json @@ -0,0 +1,70 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.TwitterConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "protocol": { + "type": "string", + "description": "The protocol" + }, + "host": { + "type": "string", + "description": "The host" + }, + "port": { + "type": "integer", + "description": "The port" + }, + "version": { + "type": "string", + "description": "The version" + }, + "endpoint": { + "type": "string", + "description": "The endpoint" + }, + "jsonStoreEnabled": { + "default" : true, + "type": "string" + }, + "oauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "appName": { + "type": "string" + }, + "consumerKey": { + "type": "string" + }, + "consumerSecret": { + "type": "string" + }, + "accessToken": { + "type": "string" + }, + "accessTokenSecret": { + "type": "string" + } + } + }, + "basicauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + } + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json index c1a0d0c6ab..2ff73627de 100644 --- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json @@ -3,34 +3,12 @@ "$schema": "http://json-schema.org/draft-03/schema", "id": "#", "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration", + "extends": {"$ref":"TwitterConfiguration.json"}, "javaInterfaces": ["java.io.Serializable"], "properties": { - "protocol": { - "type": "string", - "description": "The protocol" - }, - "host": { - "type": "string", - "description": "The host" - }, - "port": { - "type": "integer", - "description": "The port" - }, - "version": { - "type": "string", - "description": "The version" - }, - "endpoint": { - "type": "string", - "description": "The endpoint" - }, "includeEntities": { "type": "string" }, - "jsonStoreEnabled": { - "type": "string" - }, "truncated": { "type": "boolean" }, @@ -59,43 +37,6 @@ "items": { "type": "string" } - }, - "oauth": { - "type": "object", - "dynamic": "true", - "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "appName": { - "type": "string" - }, - "consumerKey": { - "type": "string" - }, - "consumerSecret": { - "type": "string" - }, - "accessToken": { - "type": "string" - }, - "accessTokenSecret": { - "type": "string" - } - } - }, - "basicauth": { - "type": "object", - "dynamic": "true", - "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "username": { - "type": "string" - }, - "password": { - "type": "string" - } - } } } } \ No newline at end of file diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json new file mode 100644 index 0000000000..afd203f90d --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json @@ -0,0 +1,17 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration", + "extends": {"$ref":"TwitterConfiguration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "info": { + "type": "array", + "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream", + "items": { + "type": "string" + } + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java index b8bfe1a21a..31ddfce590 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java @@ -6,6 +6,8 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.processor.TwitterTypeConverter; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; @@ -21,6 +23,8 @@ import java.io.InputStreamReader; import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** From 544a0c92ecfa1ab7e1334721abeeb881cd25cd3f Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Fri, 2 May 2014 13:03:50 -0500 Subject: [PATCH 2/8] S3 Reader / Writer for apache streams --- streams-contrib/pom.xml | 1 + streams-contrib/streams-amazon-aws/pom.xml | 67 +++++ .../streams-persist-s3/pom.xml | 87 ++++++ .../org/apache/streams/s3/S3Configurator.java | 64 +++++ .../s3/S3ObjectInputStreamWrapper.java | 111 ++++++++ .../streams/s3/S3OutputStreamWrapper.java | 128 +++++++++ .../apache/streams/s3/S3PersistReader.java | 141 ++++++++++ .../streams/s3/S3PersistReaderTask.java | 87 ++++++ .../apache/streams/s3/S3PersistWriter.java | 257 ++++++++++++++++++ .../streams/s3/S3PersistWriterTask.java | 37 +++ .../apache/streams/s3/S3Configuration.json | 25 ++ .../streams/s3/S3ReaderConfiguration.json | 14 + .../streams/s3/S3WriterConfiguration.json | 28 ++ 13 files changed, 1047 insertions(+) create mode 100644 streams-contrib/streams-amazon-aws/pom.xml create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json create mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index d80fc632df..c7bbdf4966 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -44,6 +44,7 @@ streams-persist-hdfs streams-persist-kafka streams-persist-mongo + streams-amazon-aws streams-processor-urls diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml new file mode 100644 index 0000000000..57a67cb032 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/pom.xml @@ -0,0 +1,67 @@ + + + + + + streams-contrib + org.apache.streams + 0.1-SNAPSHOT + + 4.0.0 + + streams-amazon-aws + + pom + streams-amazon-aws + + + + + + + streams-persist-s3 + + + + + + com.amazonaws + aws-java-sdk + 1.7.5 + + + org.apache.streams + streams-config + ${project.version} + + + org.apache.streams + streams-core + ${project.version} + + + org.apache.streams + streams-pojo + ${project.version} + + + + diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml new file mode 100644 index 0000000000..4e9b9b1d5f --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml @@ -0,0 +1,87 @@ + + + + streams-amazon-aws + org.apache.streams + 0.1-SNAPSHOT + + 4.0.0 + + streams-persist-s3 + + + + org.apache.streams + streams-config + ${project.version} + + + org.apache.streams + streams-core + ${project.version} + + + org.apache.streams + streams-pojo + ${project.version} + + + com.amazonaws + aws-java-sdk + + + org.apache.streams + streams-util + ${project.version} + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + add-source + generate-sources + + add-source + + + + target/generated-sources/jsonschema2pojo + + + + + + + org.jsonschema2pojo + jsonschema2pojo-maven-plugin + + true + true + + src/main/jsonschema/org/apache/streams/s3/S3Configuration.json + src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json + src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json + + target/generated-sources/jsonschema2pojo + org.apache.streams.s3.pojo + true + true + + + + + generate + + + + + + + diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java new file mode 100644 index 0000000000..81904041e3 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java @@ -0,0 +1,64 @@ +package org.apache.streams.s3; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3Configurator { + + private final static Logger LOGGER = LoggerFactory.getLogger(S3Configurator.class); + + private final static ObjectMapper mapper = new ObjectMapper(); + + public static S3Configuration detectConfiguration(Config s3) { + + S3Configuration s3Configuration = new S3Configuration(); + + s3Configuration.setBucket(s3.getString("bucket")); + s3Configuration.setKey(s3.getString("key")); + s3Configuration.setSecretKey(s3.getString("secretKey")); + + // The Amazon S3 Library defaults to HTTPS + String protocol = (!s3.hasPath("protocol") ? "https": s3.getString("protocol")).toLowerCase(); + + if(!(protocol.equals("https") || protocol.equals("http"))) { + // you must specify either HTTP or HTTPS + } + + s3Configuration.setProtocol(protocol.toLowerCase()); + + return s3Configuration; + } + + public static S3ReaderConfiguration detectReaderConfiguration(Config s3) { + + S3Configuration S3Configuration = detectConfiguration(s3); + S3ReaderConfiguration s3ReaderConfiguration = mapper.convertValue(S3Configuration, S3ReaderConfiguration.class); + + s3ReaderConfiguration.setReaderPath(s3.getString("readerPath")); + + return s3ReaderConfiguration; + } + + public static S3WriterConfiguration detectWriterConfiguration(Config s3) { + + S3Configuration s3Configuration = detectConfiguration(s3); + S3WriterConfiguration s3WriterConfiguration = mapper.convertValue(s3Configuration, S3WriterConfiguration.class); + + String rootPath = s3.getString("writerPath"); + + // if the root path doesn't end in a '/' then we need to force the '/' at the end of the path. + s3WriterConfiguration.setWriterPath(rootPath + (rootPath.endsWith("/") ? "" : "/")); + + s3WriterConfiguration.setWriterFilePrefix(s3.hasPath("writerFilePrefix") ? s3.getString("writerFilePrefix") : "default"); + + if(s3.hasPath("maxFileSize")) + s3WriterConfiguration.setMaxFileSize((long)s3.getInt("maxFileSize")); + if(s3.hasPath("chunk")) + s3WriterConfiguration.setChunk(s3.getBoolean("chunk")); + + return s3WriterConfiguration; + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java new file mode 100644 index 0000000000..2a2dba0377 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java @@ -0,0 +1,111 @@ +package org.apache.streams.s3; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +/** + * There is a stupid nuance associated with reading portions of files in S3. Everything occurs over + * an Apache HTTP client object. Apache defaults to re-using the stream. So, if you only want to read + * a small portion of the file. You must first "abort" the stream, then close. Otherwise, Apache will + * exhaust the stream and transfer a ton of data attempting to do so. + * + * + * Author Smashew + * Date 2014-04-11 + * + * After a few more days and some demos that had some issues with concurrency and high user load. This + * was also discovered. There is an issue with the S3Object's HTTP connection not being released back + * to the connection pool (until it times out) even once the item is garbage collected. So.... + * + * Reference: + * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3 + */ +public class S3ObjectInputStreamWrapper extends InputStream +{ + private final static Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class); + + private final S3Object s3Object; + private final S3ObjectInputStream is; + private boolean isClosed = false; + + public S3ObjectInputStreamWrapper(S3Object s3Object) { + this.s3Object = s3Object; + this.is = this.s3Object.getObjectContent(); + } + + public int hashCode() { return this.is.hashCode(); } + public boolean equals(Object obj) { return this.is.equals(obj); } + public String toString() { return this.is.toString(); } + public int read() throws IOException { return this.is.read(); } + public int read(byte[] b) throws IOException { return this.is.read(b); } + public int read(byte[] b, int off, int len) throws IOException { return this.is.read(b, off, len); } + public long skip(long n) throws IOException { return this.is.skip(n); } + public int available() throws IOException { return this.is.available(); } + public boolean markSupported() { return this.is.markSupported(); } + public synchronized void mark(int readlimit) { this.is.mark(readlimit); } + public synchronized void reset() throws IOException { this.is.reset(); } + + public void close() throws IOException { + ensureEverythingIsReleased(); + } + + public void ensureEverythingIsReleased() + { + if(this.isClosed) + return; + + // THIS WHOLE CLASS IS JUST FOR THIS FUNCTION! + // Amazon S3 - HTTP Exhaust all file contents issue + try { + this.is.abort(); + } + catch(Exception e) { + LOGGER.warn("S3Object[{}]: Issue Aborting Stream - {}", s3Object.getKey(), e.getMessage()); + } + + // close the input Stream Safely + closeSafely(this.is); + + // This corrects the issue with Open HTTP connections + closeSafely(this.s3Object); + this.isClosed = true; + } + + private static void closeSafely(Closeable is) { + try { + if(is != null) + is.close(); + } catch(Exception e) { + e.printStackTrace(); + LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", e.getMessage()); + } + } + + protected void finalize( ) throws Throwable + { + try + { + ensureEverythingIsReleased(); + super.finalize(); + } catch(Exception e) { + // this should never be called, just being very cautious + LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", e.getMessage()); + } + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java new file mode 100644 index 0000000000..8f55983c3d --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java @@ -0,0 +1,128 @@ +package org.apache.streams.s3; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; +import org.apache.commons.io.FilenameUtils; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Date; +import java.util.Map; + +/** + * + * Author: Smashew + * Date: 2014-04-14 + * + * Description: + * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly. + * + * There is a way to upload data in chunks (5mb or so) a peice, but the multi-part upload + * is kind of a PITA to deal with. + * + * // TODO: This should be refactored to allow a user to specify if they want one large file instead of many small ones + */ +public class S3OutputStreamWrapper extends OutputStream +{ + private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class); + + private final AmazonS3Client amazonS3Client; + private final String bucketName; + private final String path; + private final String fileName; + private ByteArrayOutputStream outputStream; + private final Map metaData; + + private boolean isClosed = false; + + public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map metaData) throws IOException + { + this.amazonS3Client = amazonS3Client; + this.bucketName = bucketName; + this.path = path; + this.fileName = fileName; + this.metaData = metaData; + this.outputStream = new ByteArrayOutputStream(); + } + + /* + * The Methods that are overriden to support the 'OutputStream' object. + */ + + public void write(int b) throws IOException { this.outputStream.write(b); } + public void write(byte[] b) throws IOException { this.outputStream.write(b); } + public void write(byte[] b, int off, int len) throws IOException { this.outputStream.write(b, off, len); } + public void flush() throws IOException { this.outputStream.flush(); } + + /** + * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3. + * @throws IOException + * Exception thrown from the FileOutputStream + */ + public void close() throws IOException { + if(!isClosed) + { + try + { + this.addFile(); + this.outputStream.close(); + this.outputStream = null; + } + catch(Exception e) { + e.printStackTrace(); + LOGGER.warn("There was an error adding the temporaryFile to S3"); + } + finally { + // we are done here. + this.isClosed = true; + } + } + } + + private void addFile() throws Exception { + + InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray()); + int contentLength = outputStream.size(); + + TransferManager transferManager = new TransferManager(amazonS3Client); + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setExpirationTime(DateTime.now().plusDays(365*3).toDate()); + metadata.setContentLength(contentLength); + + metadata.addUserMetadata("writer", "org.apache.streams"); + + for(String s : metaData.keySet()) + metadata.addUserMetadata(s, metaData.get(s)); + + String fileNameToWrite = path + fileName; + Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata); + try { + upload.waitForUploadResult(); + + is.close(); + transferManager.shutdownNow(false); + LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName); + } catch (Exception e) { + // No Op + } + + + } + + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java new file mode 100644 index 0000000000..a987a47d22 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -0,0 +1,141 @@ +package org.apache.streams.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Queues; +import org.apache.streams.core.*; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigInteger; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable { + + private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class); + public final static String STREAMS_ID = "S3PersistReader"; + protected final static char DELIMITER = '\t'; + + private S3ReaderConfiguration s3ReaderConfiguration; + private AmazonS3Client amazonS3Client; + private ObjectMapper mapper = new ObjectMapper(); + private Collection files; + private ExecutorService executor; + protected volatile Queue persistQueue; + + protected DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + + public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; } + public S3ReaderConfiguration getS3ReaderConfiguration() { return this.s3ReaderConfiguration; } + public String getBucketName() { return this.s3ReaderConfiguration.getBucket(); } + public StreamsResultSet readNew(BigInteger sequence) { return null; } + public StreamsResultSet readRange(DateTime start, DateTime end) { return null; } + public DatumStatusCounter getDatumStatusCounter() { return countersTotal; } + public Collection getFiles() { return this.files; } + + public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) { + this.s3ReaderConfiguration = s3ReaderConfiguration; + } + + public void prepare(Object configurationObject) { + // Connect to S3 + synchronized (this) + { + // Create the credentials Object + AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey()); + + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase())); + + // We want path style access + S3ClientOptions clientOptions = new S3ClientOptions(); + clientOptions.setPathStyleAccess(true); + + this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + this.amazonS3Client.setS3ClientOptions(clientOptions); + } + + final ListObjectsRequest request = new ListObjectsRequest() + .withBucketName(this.s3ReaderConfiguration.getBucket()) + .withPrefix(s3ReaderConfiguration.getReaderPath()) + .withMaxKeys(50); + + + ObjectListing listing = this.amazonS3Client.listObjects(request); + + this.files = new ArrayList(); + + /** + * If you can list files that are in this path, then you must be dealing with a directory + * if you cannot list files that are in this path, then you are most likely dealing with + * a simple file. + */ + if(listing.getCommonPrefixes().size() > 0) { + // Handle the 'directory' use case + do + { + for (String file : listing.getCommonPrefixes()) + this.files.add(file); + + // get the next batch. + listing = this.amazonS3Client.listNextBatchOfObjects(listing); + } while (listing.isTruncated()); + } + else { + // handle the single file use-case + this.files.add(s3ReaderConfiguration.getReaderPath()); + } + + if(this.files.size() <= 0) + LOGGER.error("There are no files to read"); + + this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue(10000)); + this.executor = Executors.newSingleThreadExecutor(); + } + + public void cleanUp() { } + + public StreamsResultSet readAll() { + startStream(); + return new StreamsResultSet(persistQueue); + } + + public void startStream() { + LOGGER.debug("startStream"); + executor.submit(new S3PersistReaderTask(this)); + } + + public StreamsResultSet readCurrent() { + + StreamsResultSet current; + + synchronized( S3PersistReader.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + persistQueue.clear(); + } + return current; + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java new file mode 100644 index 0000000000..70015fb9e2 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -0,0 +1,87 @@ +package org.apache.streams.s3; + +import com.google.common.base.Strings; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.StreamsDatum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.InputStreamReader; + +public class S3PersistReaderTask implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class); + + private S3PersistReader reader; + + public S3PersistReaderTask(S3PersistReader reader) { + this.reader = reader; + } + + @Override + public void run() { + + for(String file : reader.getFiles()) + { + // Create our buffered reader + + S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file)); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); + LOGGER.info("Reading: {} ", file); + + String line = ""; + try { + while((line = bufferedReader.readLine()) != null) + { + if( !Strings.isNullOrEmpty(line) ) + { + reader.countersCurrent.incrementAttempt(); + String[] fields = line.split(Character.toString(reader.DELIMITER)); + StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); + write( entry ); + reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + LOGGER.warn(e.getMessage()); + reader.countersCurrent.incrementStatus(DatumStatus.FAIL); + } + + LOGGER.info("Completed: " + file); + + try { + closeSafely(file, is); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.error(e.getMessage()); + } + } + } + + private static void closeSafely(String file, Closeable closeable) { + try { + closeable.close(); + } + catch(Exception e) { + LOGGER.error("There was an issue closing file: {}", file); + } + } + + + private void write( StreamsDatum entry ) { + boolean success; + do { + synchronized( S3PersistReader.class ) { + success = reader.persistQueue.offer(entry); + } + Thread.yield(); + } + while( !success ); + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java new file mode 100644 index 0000000000..c46ff03b31 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -0,0 +1,257 @@ +package org.apache.streams.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.AtomicDouble; +import org.apache.streams.core.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable +{ + public final static String STREAMS_ID = "S3PersistWriter"; + + private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class); + + private final static char DELIMITER = '\t'; + + private ObjectMapper objectMapper; + private AmazonS3Client amazonS3Client; + private S3WriterConfiguration s3WriterConfiguration; + private final List writtenFiles = new ArrayList(); + + private final AtomicLong totalBytesWritten = new AtomicLong(); + private AtomicLong bytesWrittenThisFile = new AtomicLong(); + + private final AtomicInteger totalRecordsWritten = new AtomicInteger(); + private AtomicInteger fileLineCounter = new AtomicInteger(); + + private Map objectMetaData = new HashMap() {{ + put("line[0]", "id"); + put("line[1]", "timeStamp"); + put("line[2]", "metaData"); + put("line[3]", "document"); + }}; + + private OutputStreamWriter currentWriter = null; + protected volatile Queue persistQueue; + + public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; } + public S3WriterConfiguration getS3WriterConfiguration() { return this.s3WriterConfiguration; } + public List getWrittenFiles() { return this.writtenFiles; } + public Map getObjectMetaData() { return this.objectMetaData; } + public ObjectMapper getObjectMapper() { return this.objectMapper; } + + public void setObjectMapper(ObjectMapper mapper) { this.objectMapper = mapper; } + public void setObjectMetaData(Map val) { this.objectMetaData = val; } + + /** + * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use. + * @param amazonS3Client + * If you have an existing amazonS3Client, it wont' bother to create another one + * @param s3WriterConfiguration + * Configuration of the write paths and instructions are still required. + */ + public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) { + this.amazonS3Client = amazonS3Client; + this.s3WriterConfiguration = s3WriterConfiguration; + } + + public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) { + this.s3WriterConfiguration = s3WriterConfiguration; + } + + @Override + public void write(StreamsDatum streamsDatum) { + + synchronized (this) + { + // Check to see if we need to reset the file that we are currently working with + if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) { + try { + LOGGER.info("Resetting the file"); + this.currentWriter = resetFile(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + + String line = convertResultToString(streamsDatum); + + try { + this.currentWriter.write(line); + } catch (IOException e) { + e.printStackTrace(); + } + + // add the bytes we've written + int recordSize = line.getBytes().length; + this.totalBytesWritten.addAndGet(recordSize); + this.bytesWrittenThisFile.addAndGet(recordSize); + + // increment the record count + this.totalRecordsWritten.incrementAndGet(); + this.fileLineCounter.incrementAndGet(); + } + + } + + private synchronized OutputStreamWriter resetFile() throws Exception + { + // this will keep it thread safe, so we don't create too many files + if(this.fileLineCounter.get() == 0 && this.currentWriter != null) + return this.currentWriter; + + closeAndDestroyWriter(); + + // Create the path for where the file is going to live. + try + { + // generate a file name + String fileName = this.s3WriterConfiguration.getWriterFilePrefix() + + (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv"; + + // create the output stream + OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client, + this.s3WriterConfiguration.getBucket(), + this.s3WriterConfiguration.getWriterPath(), + fileName, + this.objectMetaData); + + // reset the counter + this.fileLineCounter = new AtomicInteger(); + this.bytesWrittenThisFile = new AtomicLong(); + + // add this to the list of written files + writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName); + + // Log that we are creating this file + LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName); + + // return the output stream + return new OutputStreamWriter(outputStream); + } + catch (Exception e) + { + LOGGER.error(e.getMessage()); + throw e; + } + } + + private synchronized void closeAndDestroyWriter() { + // if there is a current writer, we must close it first. + if (this.currentWriter != null) { + this.safeFlush(this.currentWriter); + this.closeSafely(this.currentWriter); + this.currentWriter = null; + + // + LOGGER.info("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1)); + } + } + + private synchronized void closeSafely(Writer writer) { + if(writer != null) { + try { + writer.flush(); + writer.close(); + } + catch(Exception e) { + // noOp + } + LOGGER.debug("File Closed"); + } + } + + private void safeFlush(Flushable flushable) { + // This is wrapped with a ByteArrayOutputStream, so this is really safe. + if(flushable != null) { + try { + flushable.flush(); + } + catch(IOException e) { + // noOp + } + } + } + + + private String convertResultToString(StreamsDatum entry) + { + String metadata = null; + + try { + metadata = objectMapper.writeValueAsString(entry.getMetadata()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + String documentJson = null; + try { + documentJson = objectMapper.writeValueAsString(entry.getDocument()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + // Save the class name that it came from + entry.metadata.put("class", entry.getDocument().getClass().getName()); + + if(Strings.isNullOrEmpty(documentJson)) + return null; + else + return entry.getId() + DELIMITER + // [0] = Unique id of the verbatim + entry.getTimestamp() + DELIMITER + // [1] = Timestamp of the item + metadata + DELIMITER + // [2] = Metadata of the item + documentJson + "\n"; // [3] = The actual object + } + + public void prepare(Object configurationObject) { + // Connect to S3 + synchronized (this) { + + // if the user has chosen to not set the object mapper, then set a default object mapper for them. + if(this.objectMapper == null) + this.objectMapper = new ObjectMapper(); + + // Create the credentials Object + if(this.amazonS3Client == null) + { + AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); + + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toUpperCase())); + + // We want path style access + S3ClientOptions clientOptions = new S3ClientOptions(); + clientOptions.setPathStyleAccess(true); + + this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + this.amazonS3Client.setS3ClientOptions(clientOptions); + } + } + } + + public void cleanUp() { + closeAndDestroyWriter(); + } + + public DatumStatusCounter getDatumStatusCounter() { + DatumStatusCounter counters = new DatumStatusCounter(); + counters.incrementAttempt(this.totalRecordsWritten.get()); + counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get()); + return counters; + } +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java new file mode 100644 index 0000000000..d791c87d2e --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java @@ -0,0 +1,37 @@ +package org.apache.streams.s3; + +import org.apache.streams.core.StreamsDatum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; + +public class S3PersistWriterTask implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriterTask.class); + + private S3PersistWriter writer; + + public S3PersistWriterTask(S3PersistWriter writer) { + this.writer = writer; + } + + @Override + public void run() { + while(true) { + if( writer.persistQueue.peek() != null ) { + try { + StreamsDatum entry = writer.persistQueue.remove(); + writer.write(entry); + } catch (Exception e) { + e.printStackTrace(); + } + } + try { + Thread.sleep(new Random().nextInt(1)); + } catch (InterruptedException e) {} + } + + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json new file mode 100644 index 0000000000..863668f485 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json @@ -0,0 +1,25 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3Configuration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "key": { + "type": "string", + "description": "Your Amazon Key" + }, + "secretKey": { + "type": "string", + "description": "Your Amazon Secret Key" + }, + "bucket": { + "type": "string", + "description": "The AWS bucket you want to write to" + }, + "protocol": { + "type": "string", + "description": "Whether you are using HTTP or HTTPS" + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json new file mode 100644 index 0000000000..2959b3dd76 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json @@ -0,0 +1,14 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3ReaderConfiguration", + "extends": {"$ref":"S3Configuration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "readerPath": { + "type": "string", + "description": "Path below root path" + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json new file mode 100644 index 0000000000..f43087bb23 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json @@ -0,0 +1,28 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3WriterConfiguration", + "extends": {"$ref":"S3Configuration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "writerPath": { + "type": "string", + "description": "Path " + }, + "writerFilePrefix": { + "type": "string", + "description": "File Prefix" + }, + "maxFileSize": { + "type": "integer", + "default" : 20, + "description": "If files are elected to be 'chunked' which they are by default, this is the maximum size of that file before the byte array stream is vacated and the file is created." + }, + "chunk": { + "type": "boolean", + "default" : true, + "description": "Whether you want the file chunked inside of a folder or not" + } + } +} \ No newline at end of file From 651be7921193eede7218fee62232ac3faf5abfaa Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Fri, 2 May 2014 13:35:39 -0500 Subject: [PATCH 3/8] https://issues.apache.org/jira/browse/STREAMS-73 --- .../local/tasks/LocalStreamProcessMonitorThread.java | 8 +++++++- .../streams/local/tasks/StatusCounterMonitorRunnable.java | 6 ++++++ .../streams/local/tasks/StatusCounterMonitorThread.java | 8 +++++++- 3 files changed, 20 insertions(+), 2 deletions(-) create mode 100644 streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java index 0b254b6292..c1827dfd7b 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java @@ -7,7 +7,7 @@ import java.lang.management.MemoryUsage; import java.util.concurrent.Executor; -public class LocalStreamProcessMonitorThread implements Runnable +public class LocalStreamProcessMonitorThread implements StatusCounterMonitorRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class); @@ -22,10 +22,16 @@ public LocalStreamProcessMonitorThread(Executor executor, int delayInSeconds) { this.seconds = delayInSeconds; } + @Override public void shutdown(){ this.run = false; } + @Override + public boolean isRunning() { + return this.run; + } + @Override public void run() { diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java new file mode 100644 index 0000000000..ee6e102e89 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java @@ -0,0 +1,6 @@ +package org.apache.streams.local.tasks; + +public interface StatusCounterMonitorRunnable extends Runnable { + void shutdown(); + boolean isRunning(); +} diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java index c6febbe3ba..7579209bf4 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java @@ -4,7 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StatusCounterMonitorThread implements Runnable +public class StatusCounterMonitorThread implements StatusCounterMonitorRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class); @@ -19,10 +19,16 @@ public StatusCounterMonitorThread(DatumStatusCountable task, int delayInSeconds) this.seconds = delayInSeconds; } + @Override public void shutdown(){ this.run = false; } + @Override + public boolean isRunning() { + return this.run; + } + @Override public void run() { From 4f30bd2a8cec2869765fa875d303e98492242b7b Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Mon, 5 May 2014 16:37:53 -0500 Subject: [PATCH 4/8] Sorry, this isn't even being used. --- .../streams/s3/S3PersistWriterTask.java | 37 ------------------- 1 file changed, 37 deletions(-) delete mode 100644 streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java deleted file mode 100644 index d791c87d2e..0000000000 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriterTask.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.apache.streams.s3; - -import org.apache.streams.core.StreamsDatum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Random; - -public class S3PersistWriterTask implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriterTask.class); - - private S3PersistWriter writer; - - public S3PersistWriterTask(S3PersistWriter writer) { - this.writer = writer; - } - - @Override - public void run() { - while(true) { - if( writer.persistQueue.peek() != null ) { - try { - StreamsDatum entry = writer.persistQueue.remove(); - writer.write(entry); - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(new Random().nextInt(1)); - } catch (InterruptedException e) {} - } - - } - -} From 1a03d5f67ae0b634f903733dd14a26236331690c Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Mon, 5 May 2014 17:13:06 -0500 Subject: [PATCH 5/8] Changes based on feedback. --- .../org/apache/streams/s3/S3Configurator.java | 1 + .../s3/S3ObjectInputStreamWrapper.java | 108 +++++++++++------- .../streams/s3/S3OutputStreamWrapper.java | 67 ++++++----- .../apache/streams/s3/S3PersistReader.java | 42 +++++-- .../streams/s3/S3PersistReaderTask.java | 19 +-- .../apache/streams/s3/S3PersistWriter.java | 5 +- 6 files changed, 140 insertions(+), 102 deletions(-) diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java index 81904041e3..3413ef707b 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java @@ -24,6 +24,7 @@ public static S3Configuration detectConfiguration(Config s3) { if(!(protocol.equals("https") || protocol.equals("http"))) { // you must specify either HTTP or HTTPS + throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol"); } s3Configuration.setProtocol(protocol.toLowerCase()); diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java index 2a2dba0377..900ebfbd78 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java @@ -9,28 +9,15 @@ import java.io.IOException; import java.io.InputStream; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; - /** - * There is a stupid nuance associated with reading portions of files in S3. Everything occurs over - * an Apache HTTP client object. Apache defaults to re-using the stream. So, if you only want to read - * a small portion of the file. You must first "abort" the stream, then close. Otherwise, Apache will - * exhaust the stream and transfer a ton of data attempting to do so. - * + * There is a nuance associated with reading portions of files in S3. Everything occurs over + * an Apache HTTP client object. Apache and therefore Amazon defaults to re-using the stream. + * As a result, if you only intend read a small portion of the file. You must first "abort" the + * stream, then close the 'inputStream'. Otherwise, Apache will exhaust the entire stream + * and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file + * this becomes problematic. * - * Author Smashew - * Date 2014-04-11 - * - * After a few more days and some demos that had some issues with concurrency and high user load. This - * was also discovered. There is an issue with the S3Object's HTTP connection not being released back - * to the connection pool (until it times out) even once the item is garbage collected. So.... + * This class operates as a wrapper to fix the aforementioned nuances. * * Reference: * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3 @@ -43,39 +30,82 @@ public class S3ObjectInputStreamWrapper extends InputStream private final S3ObjectInputStream is; private boolean isClosed = false; + /** + * Create an input stream safely from + * @param s3Object + */ public S3ObjectInputStreamWrapper(S3Object s3Object) { this.s3Object = s3Object; this.is = this.s3Object.getObjectContent(); } - public int hashCode() { return this.is.hashCode(); } - public boolean equals(Object obj) { return this.is.equals(obj); } - public String toString() { return this.is.toString(); } - public int read() throws IOException { return this.is.read(); } - public int read(byte[] b) throws IOException { return this.is.read(b); } - public int read(byte[] b, int off, int len) throws IOException { return this.is.read(b, off, len); } - public long skip(long n) throws IOException { return this.is.skip(n); } - public int available() throws IOException { return this.is.available(); } - public boolean markSupported() { return this.is.markSupported(); } - public synchronized void mark(int readlimit) { this.is.mark(readlimit); } - public synchronized void reset() throws IOException { this.is.reset(); } + public int hashCode() { + return this.is.hashCode(); + } + + public boolean equals(Object obj) { + return this.is.equals(obj); + } + + public String toString() { + return this.is.toString(); + } + + public int read() throws IOException { + return this.is.read(); + } + + public int read(byte[] b) throws IOException { + return this.is.read(b); + } + + public int read(byte[] b, int off, int len) throws IOException { + return this.is.read(b, off, len); + } + + public long skip(long n) throws IOException { + return this.is.skip(n); + } + + public int available() throws IOException { + return this.is.available(); + } + + public boolean markSupported() { + return this.is.markSupported(); + } + + public synchronized void mark(int readlimit) { + this.is.mark(readlimit); + } + + public synchronized void reset() throws IOException { + this.is.reset(); + } public void close() throws IOException { ensureEverythingIsReleased(); } - public void ensureEverythingIsReleased() - { + public void ensureEverythingIsReleased() { if(this.isClosed) return; - // THIS WHOLE CLASS IS JUST FOR THIS FUNCTION! - // Amazon S3 - HTTP Exhaust all file contents issue + try { + // ensure that the S3 Object is closed properly. + this.s3Object.close(); + } catch(Throwable e) { + LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage()); + } + + + try { + // Abort the stream this.is.abort(); } - catch(Exception e) { - LOGGER.warn("S3Object[{}]: Issue Aborting Stream - {}", s3Object.getKey(), e.getMessage()); + catch(Throwable e) { + LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage()); } // close the input Stream Safely @@ -98,8 +128,8 @@ private static void closeSafely(Closeable is) { protected void finalize( ) throws Throwable { - try - { + try { + // If there is an accidental leak where the user did not close, call this on the classes destructor ensureEverythingIsReleased(); super.finalize(); } catch(Exception e) { diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java index 8f55983c3d..c488b489fc 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java @@ -9,33 +9,12 @@ import org.slf4j.LoggerFactory; import java.io.*; - -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.transfer.TransferManager; -import com.amazonaws.services.s3.transfer.Upload; -import org.apache.commons.io.FilenameUtils; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.Date; import java.util.Map; /** - * - * Author: Smashew - * Date: 2014-04-14 - * - * Description: - * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly. - * - * There is a way to upload data in chunks (5mb or so) a peice, but the multi-part upload - * is kind of a PITA to deal with. - * - * // TODO: This should be refactored to allow a user to specify if they want one large file instead of many small ones + * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly. The stream is written to the + * in memory ByteArrayOutPutStream before it is finally written to Amazon S3. The size the file is allowed to become + * is directly controlled by the S3PersistWriter. */ public class S3OutputStreamWrapper extends OutputStream { @@ -47,11 +26,24 @@ public class S3OutputStreamWrapper extends OutputStream private final String fileName; private ByteArrayOutputStream outputStream; private final Map metaData; - private boolean isClosed = false; - public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map metaData) throws IOException - { + /** + * Create an OutputStream Wrapper + * @param amazonS3Client + * The Amazon S3 Client which will be handling the file + * @param bucketName + * The Bucket Name you are wishing to write to. + * @param path + * The path where the object will live + * @param fileName + * The fileName you ware wishing to write. + * @param metaData + * Any meta data that is to be written along with the object + * @throws IOException + * If there is an issue creating the stream, this + */ + public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map metaData) throws IOException { this.amazonS3Client = amazonS3Client; this.bucketName = bucketName; this.path = path; @@ -60,14 +52,21 @@ public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, S this.outputStream = new ByteArrayOutputStream(); } - /* - * The Methods that are overriden to support the 'OutputStream' object. - */ + public void write(int b) throws IOException { + this.outputStream.write(b); + } - public void write(int b) throws IOException { this.outputStream.write(b); } - public void write(byte[] b) throws IOException { this.outputStream.write(b); } - public void write(byte[] b, int off, int len) throws IOException { this.outputStream.write(b, off, len); } - public void flush() throws IOException { this.outputStream.flush(); } + public void write(byte[] b) throws IOException { + this.outputStream.write(b); + } + + public void write(byte[] b, int off, int len) throws IOException { + this.outputStream.write(b, off, len); + } + + public void flush() throws IOException { + this.outputStream.flush(); + } /** * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3. diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java index a987a47d22..938dc665b1 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -15,11 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.math.BigInteger; -import java.net.URI; -import java.net.URISyntaxException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Queue; @@ -43,13 +39,33 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab protected DatumStatusCounter countersTotal = new DatumStatusCounter(); protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); - public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; } - public S3ReaderConfiguration getS3ReaderConfiguration() { return this.s3ReaderConfiguration; } - public String getBucketName() { return this.s3ReaderConfiguration.getBucket(); } - public StreamsResultSet readNew(BigInteger sequence) { return null; } - public StreamsResultSet readRange(DateTime start, DateTime end) { return null; } - public DatumStatusCounter getDatumStatusCounter() { return countersTotal; } - public Collection getFiles() { return this.files; } + public AmazonS3Client getAmazonS3Client() { + return this.amazonS3Client; + } + + public S3ReaderConfiguration getS3ReaderConfiguration() { + return this.s3ReaderConfiguration; + } + + public String getBucketName() { + return this.s3ReaderConfiguration.getBucket(); + } + + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + public DatumStatusCounter getDatumStatusCounter() { + return countersTotal; + } + + public Collection getFiles() { + return this.files; + } public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) { this.s3ReaderConfiguration = s3ReaderConfiguration; @@ -111,7 +127,9 @@ public void prepare(Object configurationObject) { this.executor = Executors.newSingleThreadExecutor(); } - public void cleanUp() { } + public void cleanUp() { + // no Op + } public StreamsResultSet readAll() { startStream(); diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java index 70015fb9e2..5b4abe4709 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -23,20 +23,17 @@ public S3PersistReaderTask(S3PersistReader reader) { @Override public void run() { - for(String file : reader.getFiles()) - { - // Create our buffered reader + for(String file : reader.getFiles()) { + // Create our buffered reader S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file)); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); LOGGER.info("Reading: {} ", file); String line = ""; try { - while((line = bufferedReader.readLine()) != null) - { - if( !Strings.isNullOrEmpty(line) ) - { + while((line = bufferedReader.readLine()) != null) { + if( !Strings.isNullOrEmpty(line) ) { reader.countersCurrent.incrementAttempt(); String[] fields = line.split(Character.toString(reader.DELIMITER)); StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); @@ -44,9 +41,7 @@ public void run() { reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); } } - } - catch (Exception e) - { + } catch (Exception e) { e.printStackTrace(); LOGGER.warn(e.getMessage()); reader.countersCurrent.incrementStatus(DatumStatus.FAIL); @@ -57,7 +52,6 @@ public void run() { try { closeSafely(file, is); } catch (Exception e) { - e.printStackTrace(); LOGGER.error(e.getMessage()); } } @@ -66,8 +60,7 @@ public void run() { private static void closeSafely(String file, Closeable closeable) { try { closeable.close(); - } - catch(Exception e) { + } catch(Exception e) { LOGGER.error("There was an issue closing file: {}", file); } } diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index c46ff03b31..3685012d76 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -9,7 +9,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; -import com.google.common.util.concurrent.AtomicDouble; import org.apache.streams.core.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,7 +187,6 @@ private void safeFlush(Flushable flushable) { } } - private String convertResultToString(StreamsDatum entry) { String metadata = null; @@ -227,8 +225,7 @@ public void prepare(Object configurationObject) { this.objectMapper = new ObjectMapper(); // Create the credentials Object - if(this.amazonS3Client == null) - { + if(this.amazonS3Client == null) { AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); ClientConfiguration clientConfig = new ClientConfiguration(); From 361d122a5a8c93064e10f8c1817a8268d0bea3ea Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Mon, 5 May 2014 17:22:20 -0500 Subject: [PATCH 6/8] That was a debugging statement, pardon. --- .../provider/TwitterStreamConfigurator.java | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java index 5435f24209..7c7ef1b46b 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java @@ -9,7 +9,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.List; /** @@ -76,23 +75,7 @@ public static TwitterStreamConfiguration detectConfiguration(Config config) { } public static TwitterUserInformationConfiguration detectTwitterUserInformationConfiguration(Config config) { - - TwitterUserInformationConfiguration twitterUserInformationConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class); - - try { - if(config.hasPath("info")) - { - List info = new ArrayList(); - - for (String s : config.getStringList("info")) - info.add(s); - } - } - catch(Exception e) { - LOGGER.error("There was an error: {}", e.getMessage()); - } - - return twitterUserInformationConfiguration; + return mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class); } } From d1018e9070d8976ac3175f2b89af91c933d711f9 Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Mon, 5 May 2014 17:32:35 -0500 Subject: [PATCH 7/8] Added licenses, and more code formatting to comply with styles. --- .../streams-persist-s3/pom.xml | 17 +++++ .../org/apache/streams/s3/S3Configurator.java | 17 +++++ .../s3/S3ObjectInputStreamWrapper.java | 17 +++++ .../streams/s3/S3OutputStreamWrapper.java | 17 +++++ .../apache/streams/s3/S3PersistReader.java | 17 +++++ .../streams/s3/S3PersistReaderTask.java | 17 +++++ .../apache/streams/s3/S3PersistWriter.java | 76 +++++++++++++------ .../TwitterUserInformationProvider.java | 24 +++++- 8 files changed, 175 insertions(+), 27 deletions(-) diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml index 4e9b9b1d5f..5cadd5c4cb 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml @@ -1,4 +1,21 @@ + diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java index 3413ef707b..dfa0426636 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.s3; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java index 900ebfbd78..c13314d4a2 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.s3; import com.amazonaws.services.s3.model.S3Object; diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java index c488b489fc..08fc7748e2 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.s3; import com.amazonaws.services.s3.AmazonS3Client; diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java index 938dc665b1..5c7413eb15 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.s3; import com.amazonaws.ClientConfiguration; diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java index 5b4abe4709..996721623c 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.s3; import com.google.common.base.Strings; diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index 3685012d76..98671ba8d3 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.s3; import com.amazonaws.ClientConfiguration; @@ -47,14 +64,33 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab private OutputStreamWriter currentWriter = null; protected volatile Queue persistQueue; - public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; } - public S3WriterConfiguration getS3WriterConfiguration() { return this.s3WriterConfiguration; } - public List getWrittenFiles() { return this.writtenFiles; } - public Map getObjectMetaData() { return this.objectMetaData; } - public ObjectMapper getObjectMapper() { return this.objectMapper; } + public AmazonS3Client getAmazonS3Client() { + return this.amazonS3Client; + } + + public S3WriterConfiguration getS3WriterConfiguration() { + return this.s3WriterConfiguration; + } + + public List getWrittenFiles() { + return this.writtenFiles; + } - public void setObjectMapper(ObjectMapper mapper) { this.objectMapper = mapper; } - public void setObjectMetaData(Map val) { this.objectMetaData = val; } + public Map getObjectMetaData() { + return this.objectMetaData; + } + + public ObjectMapper getObjectMapper() { + return this.objectMapper; + } + + public void setObjectMapper(ObjectMapper mapper) { + this.objectMapper = mapper; + } + + public void setObjectMetaData(Map val) { + this.objectMetaData = val; + } /** * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use. @@ -75,15 +111,13 @@ public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) { @Override public void write(StreamsDatum streamsDatum) { - synchronized (this) - { + synchronized (this) { // Check to see if we need to reset the file that we are currently working with if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) { try { LOGGER.info("Resetting the file"); this.currentWriter = resetFile(); - } - catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); } } @@ -108,8 +142,7 @@ public void write(StreamsDatum streamsDatum) { } - private synchronized OutputStreamWriter resetFile() throws Exception - { + private synchronized OutputStreamWriter resetFile() throws Exception { // this will keep it thread safe, so we don't create too many files if(this.fileLineCounter.get() == 0 && this.currentWriter != null) return this.currentWriter; @@ -117,8 +150,7 @@ private synchronized OutputStreamWriter resetFile() throws Exception closeAndDestroyWriter(); // Create the path for where the file is going to live. - try - { + try { // generate a file name String fileName = this.s3WriterConfiguration.getWriterFilePrefix() + (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv"; @@ -142,9 +174,7 @@ private synchronized OutputStreamWriter resetFile() throws Exception // return the output stream return new OutputStreamWriter(outputStream); - } - catch (Exception e) - { + } catch (Exception e) { LOGGER.error(e.getMessage()); throw e; } @@ -157,8 +187,8 @@ private synchronized void closeAndDestroyWriter() { this.closeSafely(this.currentWriter); this.currentWriter = null; - // - LOGGER.info("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1)); + // Logging of information to alert the user to the activities of this class + LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1)); } } @@ -167,8 +197,7 @@ private synchronized void closeSafely(Writer writer) { try { writer.flush(); writer.close(); - } - catch(Exception e) { + } catch(Exception e) { // noOp } LOGGER.debug("File Closed"); @@ -180,8 +209,7 @@ private void safeFlush(Flushable flushable) { if(flushable != null) { try { flushable.flush(); - } - catch(IOException e) { + } catch(IOException e) { // noOp } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index dac5cd61b0..04aa1fe04c 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -1,7 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.twitter.provider; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; @@ -9,13 +25,15 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.twitter.TwitterStreamConfiguration; import org.apache.streams.twitter.TwitterUserInformationConfiguration; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.reflect.generics.reflectiveObjects.NotImplementedException; -import twitter4j.*; +import twitter4j.Twitter; +import twitter4j.TwitterException; +import twitter4j.TwitterFactory; +import twitter4j.User; import twitter4j.conf.ConfigurationBuilder; import twitter4j.json.DataObjectFactory; From ae27541e08674f4db6996e065516b32b8fe0f45d Mon Sep 17 00:00:00 2001 From: Matthew Hager Date: Mon, 5 May 2014 18:05:09 -0500 Subject: [PATCH 8/8] Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request. --- .../apache/streams/s3/S3PersistReaderTask.java | 16 ++-------------- .../org/apache/streams/s3/S3PersistWriter.java | 1 - .../provider/TwitterTimelineProvider.java | 14 ++++++-------- .../provider/TwitterUserInformationProvider.java | 3 ++- 4 files changed, 10 insertions(+), 24 deletions(-) diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java index 996721623c..73763e68da 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -20,6 +20,7 @@ import com.google.common.base.Strings; import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public void run() { reader.countersCurrent.incrementAttempt(); String[] fields = line.split(Character.toString(reader.DELIMITER)); StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); - write( entry ); + ComponentUtils.offerUntilSuccess(entry, reader.persistQueue); reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); } } @@ -81,17 +82,4 @@ private static void closeSafely(String file, Closeable closeable) { LOGGER.error("There was an issue closing file: {}", file); } } - - - private void write( StreamsDatum entry ) { - boolean success; - do { - synchronized( S3PersistReader.class ) { - success = reader.persistQueue.offer(entry); - } - Thread.yield(); - } - while( !success ); - } - } diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index 98671ba8d3..058f7487ba 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -62,7 +62,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab }}; private OutputStreamWriter currentWriter = null; - protected volatile Queue persistQueue; public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index 2c39cf93f5..b456fa4268 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -2,7 +2,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Queues; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; @@ -11,18 +10,21 @@ import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.reflect.generics.reflectiveObjects.NotImplementedException; import twitter4j.*; import twitter4j.conf.ConfigurationBuilder; -import twitter4j.json.DataObjectFactory; import java.io.Serializable; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Created by sblackmon on 12/10/13. @@ -105,17 +107,13 @@ protected void captureTimeline(long currentId) { //while (keepTrying < 10) while (keepTrying < 1) { - try { statuses = client.getUserTimeline(currentId, paging); for (Status tStat : statuses) { String json = TwitterObjectFactory.getRawJSON(tStat); - - while(!providerQueue.offer(new StreamsDatum(json))) { - sleep(); - } + ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue); } paging.setPage(paging.getPage() + 1); diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 04aa1fe04c..049c3bb617 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,7 +123,7 @@ private void loadBatch(Long[] ids) { for (User tStat : client.lookupUsers(toQuery)) { String json = DataObjectFactory.getRawJSON(tStat); - providerQueue.offer(new StreamsDatum(json)); + ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue); } keepTrying = 10; }