diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index d80fc632df..c7bbdf4966 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -44,6 +44,7 @@ streams-persist-hdfs streams-persist-kafka streams-persist-mongo + streams-amazon-aws streams-processor-urls diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml new file mode 100644 index 0000000000..57a67cb032 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/pom.xml @@ -0,0 +1,67 @@ + + + + + + streams-contrib + org.apache.streams + 0.1-SNAPSHOT + + 4.0.0 + + streams-amazon-aws + + pom + streams-amazon-aws + + + + + + + streams-persist-s3 + + + + + + com.amazonaws + aws-java-sdk + 1.7.5 + + + org.apache.streams + streams-config + ${project.version} + + + org.apache.streams + streams-core + ${project.version} + + + org.apache.streams + streams-pojo + ${project.version} + + + + diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml new file mode 100644 index 0000000000..5cadd5c4cb --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml @@ -0,0 +1,104 @@ + + + + + streams-amazon-aws + org.apache.streams + 0.1-SNAPSHOT + + 4.0.0 + + streams-persist-s3 + + + + org.apache.streams + streams-config + ${project.version} + + + org.apache.streams + streams-core + ${project.version} + + + org.apache.streams + streams-pojo + ${project.version} + + + com.amazonaws + aws-java-sdk + + + org.apache.streams + streams-util + ${project.version} + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.8 + + + add-source + generate-sources + + add-source + + + + target/generated-sources/jsonschema2pojo + + + + + + + org.jsonschema2pojo + jsonschema2pojo-maven-plugin + + true + true + + src/main/jsonschema/org/apache/streams/s3/S3Configuration.json + src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json + src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json + + target/generated-sources/jsonschema2pojo + org.apache.streams.s3.pojo + true + true + + + + + generate + + + + + + + diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java new file mode 100644 index 0000000000..dfa0426636 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.s3; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3Configurator { + + private final static Logger LOGGER = LoggerFactory.getLogger(S3Configurator.class); + + private final static ObjectMapper mapper = new ObjectMapper(); + + public static S3Configuration detectConfiguration(Config s3) { + + S3Configuration s3Configuration = new S3Configuration(); + + s3Configuration.setBucket(s3.getString("bucket")); + s3Configuration.setKey(s3.getString("key")); + s3Configuration.setSecretKey(s3.getString("secretKey")); + + // The Amazon S3 Library defaults to HTTPS + String protocol = (!s3.hasPath("protocol") ? "https": s3.getString("protocol")).toLowerCase(); + + if(!(protocol.equals("https") || protocol.equals("http"))) { + // you must specify either HTTP or HTTPS + throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol"); + } + + s3Configuration.setProtocol(protocol.toLowerCase()); + + return s3Configuration; + } + + public static S3ReaderConfiguration detectReaderConfiguration(Config s3) { + + S3Configuration S3Configuration = detectConfiguration(s3); + S3ReaderConfiguration s3ReaderConfiguration = mapper.convertValue(S3Configuration, S3ReaderConfiguration.class); + + s3ReaderConfiguration.setReaderPath(s3.getString("readerPath")); + + return s3ReaderConfiguration; + } + + public static S3WriterConfiguration detectWriterConfiguration(Config s3) { + + S3Configuration s3Configuration = detectConfiguration(s3); + S3WriterConfiguration s3WriterConfiguration = mapper.convertValue(s3Configuration, S3WriterConfiguration.class); + + String rootPath = s3.getString("writerPath"); + + // if the root path doesn't end in a '/' then we need to force the '/' at the end of the path. + s3WriterConfiguration.setWriterPath(rootPath + (rootPath.endsWith("/") ? "" : "/")); + + s3WriterConfiguration.setWriterFilePrefix(s3.hasPath("writerFilePrefix") ? s3.getString("writerFilePrefix") : "default"); + + if(s3.hasPath("maxFileSize")) + s3WriterConfiguration.setMaxFileSize((long)s3.getInt("maxFileSize")); + if(s3.hasPath("chunk")) + s3WriterConfiguration.setChunk(s3.getBoolean("chunk")); + + return s3WriterConfiguration; + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java new file mode 100644 index 0000000000..c13314d4a2 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.s3; + +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; + +/** + * There is a nuance associated with reading portions of files in S3. Everything occurs over + * an Apache HTTP client object. Apache and therefore Amazon defaults to re-using the stream. + * As a result, if you only intend read a small portion of the file. You must first "abort" the + * stream, then close the 'inputStream'. Otherwise, Apache will exhaust the entire stream + * and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file + * this becomes problematic. + * + * This class operates as a wrapper to fix the aforementioned nuances. + * + * Reference: + * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3 + */ +public class S3ObjectInputStreamWrapper extends InputStream +{ + private final static Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class); + + private final S3Object s3Object; + private final S3ObjectInputStream is; + private boolean isClosed = false; + + /** + * Create an input stream safely from + * @param s3Object + */ + public S3ObjectInputStreamWrapper(S3Object s3Object) { + this.s3Object = s3Object; + this.is = this.s3Object.getObjectContent(); + } + + public int hashCode() { + return this.is.hashCode(); + } + + public boolean equals(Object obj) { + return this.is.equals(obj); + } + + public String toString() { + return this.is.toString(); + } + + public int read() throws IOException { + return this.is.read(); + } + + public int read(byte[] b) throws IOException { + return this.is.read(b); + } + + public int read(byte[] b, int off, int len) throws IOException { + return this.is.read(b, off, len); + } + + public long skip(long n) throws IOException { + return this.is.skip(n); + } + + public int available() throws IOException { + return this.is.available(); + } + + public boolean markSupported() { + return this.is.markSupported(); + } + + public synchronized void mark(int readlimit) { + this.is.mark(readlimit); + } + + public synchronized void reset() throws IOException { + this.is.reset(); + } + + public void close() throws IOException { + ensureEverythingIsReleased(); + } + + public void ensureEverythingIsReleased() { + if(this.isClosed) + return; + + + try { + // ensure that the S3 Object is closed properly. + this.s3Object.close(); + } catch(Throwable e) { + LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage()); + } + + + try { + // Abort the stream + this.is.abort(); + } + catch(Throwable e) { + LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage()); + } + + // close the input Stream Safely + closeSafely(this.is); + + // This corrects the issue with Open HTTP connections + closeSafely(this.s3Object); + this.isClosed = true; + } + + private static void closeSafely(Closeable is) { + try { + if(is != null) + is.close(); + } catch(Exception e) { + e.printStackTrace(); + LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", e.getMessage()); + } + } + + protected void finalize( ) throws Throwable + { + try { + // If there is an accidental leak where the user did not close, call this on the classes destructor + ensureEverythingIsReleased(); + super.finalize(); + } catch(Exception e) { + // this should never be called, just being very cautious + LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", e.getMessage()); + } + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java new file mode 100644 index 0000000000..08fc7748e2 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.s3; + +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Map; + +/** + * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly. The stream is written to the + * in memory ByteArrayOutPutStream before it is finally written to Amazon S3. The size the file is allowed to become + * is directly controlled by the S3PersistWriter. + */ +public class S3OutputStreamWrapper extends OutputStream +{ + private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class); + + private final AmazonS3Client amazonS3Client; + private final String bucketName; + private final String path; + private final String fileName; + private ByteArrayOutputStream outputStream; + private final Map metaData; + private boolean isClosed = false; + + /** + * Create an OutputStream Wrapper + * @param amazonS3Client + * The Amazon S3 Client which will be handling the file + * @param bucketName + * The Bucket Name you are wishing to write to. + * @param path + * The path where the object will live + * @param fileName + * The fileName you ware wishing to write. + * @param metaData + * Any meta data that is to be written along with the object + * @throws IOException + * If there is an issue creating the stream, this + */ + public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map metaData) throws IOException { + this.amazonS3Client = amazonS3Client; + this.bucketName = bucketName; + this.path = path; + this.fileName = fileName; + this.metaData = metaData; + this.outputStream = new ByteArrayOutputStream(); + } + + public void write(int b) throws IOException { + this.outputStream.write(b); + } + + public void write(byte[] b) throws IOException { + this.outputStream.write(b); + } + + public void write(byte[] b, int off, int len) throws IOException { + this.outputStream.write(b, off, len); + } + + public void flush() throws IOException { + this.outputStream.flush(); + } + + /** + * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3. + * @throws IOException + * Exception thrown from the FileOutputStream + */ + public void close() throws IOException { + if(!isClosed) + { + try + { + this.addFile(); + this.outputStream.close(); + this.outputStream = null; + } + catch(Exception e) { + e.printStackTrace(); + LOGGER.warn("There was an error adding the temporaryFile to S3"); + } + finally { + // we are done here. + this.isClosed = true; + } + } + } + + private void addFile() throws Exception { + + InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray()); + int contentLength = outputStream.size(); + + TransferManager transferManager = new TransferManager(amazonS3Client); + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setExpirationTime(DateTime.now().plusDays(365*3).toDate()); + metadata.setContentLength(contentLength); + + metadata.addUserMetadata("writer", "org.apache.streams"); + + for(String s : metaData.keySet()) + metadata.addUserMetadata(s, metaData.get(s)); + + String fileNameToWrite = path + fileName; + Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata); + try { + upload.waitForUploadResult(); + + is.close(); + transferManager.shutdownNow(false); + LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName); + } catch (Exception e) { + // No Op + } + + + } + + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java new file mode 100644 index 0000000000..5c7413eb15 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Queues; +import org.apache.streams.core.*; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable { + + private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class); + public final static String STREAMS_ID = "S3PersistReader"; + protected final static char DELIMITER = '\t'; + + private S3ReaderConfiguration s3ReaderConfiguration; + private AmazonS3Client amazonS3Client; + private ObjectMapper mapper = new ObjectMapper(); + private Collection files; + private ExecutorService executor; + protected volatile Queue persistQueue; + + protected DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected DatumStatusCounter countersCurrent = new DatumStatusCounter(); + + public AmazonS3Client getAmazonS3Client() { + return this.amazonS3Client; + } + + public S3ReaderConfiguration getS3ReaderConfiguration() { + return this.s3ReaderConfiguration; + } + + public String getBucketName() { + return this.s3ReaderConfiguration.getBucket(); + } + + public StreamsResultSet readNew(BigInteger sequence) { + return null; + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + return null; + } + + public DatumStatusCounter getDatumStatusCounter() { + return countersTotal; + } + + public Collection getFiles() { + return this.files; + } + + public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) { + this.s3ReaderConfiguration = s3ReaderConfiguration; + } + + public void prepare(Object configurationObject) { + // Connect to S3 + synchronized (this) + { + // Create the credentials Object + AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey()); + + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase())); + + // We want path style access + S3ClientOptions clientOptions = new S3ClientOptions(); + clientOptions.setPathStyleAccess(true); + + this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + this.amazonS3Client.setS3ClientOptions(clientOptions); + } + + final ListObjectsRequest request = new ListObjectsRequest() + .withBucketName(this.s3ReaderConfiguration.getBucket()) + .withPrefix(s3ReaderConfiguration.getReaderPath()) + .withMaxKeys(50); + + + ObjectListing listing = this.amazonS3Client.listObjects(request); + + this.files = new ArrayList(); + + /** + * If you can list files that are in this path, then you must be dealing with a directory + * if you cannot list files that are in this path, then you are most likely dealing with + * a simple file. + */ + if(listing.getCommonPrefixes().size() > 0) { + // Handle the 'directory' use case + do + { + for (String file : listing.getCommonPrefixes()) + this.files.add(file); + + // get the next batch. + listing = this.amazonS3Client.listNextBatchOfObjects(listing); + } while (listing.isTruncated()); + } + else { + // handle the single file use-case + this.files.add(s3ReaderConfiguration.getReaderPath()); + } + + if(this.files.size() <= 0) + LOGGER.error("There are no files to read"); + + this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue(10000)); + this.executor = Executors.newSingleThreadExecutor(); + } + + public void cleanUp() { + // no Op + } + + public StreamsResultSet readAll() { + startStream(); + return new StreamsResultSet(persistQueue); + } + + public void startStream() { + LOGGER.debug("startStream"); + executor.submit(new S3PersistReaderTask(this)); + } + + public StreamsResultSet readCurrent() { + + StreamsResultSet current; + + synchronized( S3PersistReader.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + persistQueue.clear(); + } + return current; + } + +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java new file mode 100644 index 0000000000..73763e68da --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.s3; + +import com.google.common.base.Strings; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.util.ComponentUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.InputStreamReader; + +public class S3PersistReaderTask implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class); + + private S3PersistReader reader; + + public S3PersistReaderTask(S3PersistReader reader) { + this.reader = reader; + } + + @Override + public void run() { + + for(String file : reader.getFiles()) { + + // Create our buffered reader + S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file)); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); + LOGGER.info("Reading: {} ", file); + + String line = ""; + try { + while((line = bufferedReader.readLine()) != null) { + if( !Strings.isNullOrEmpty(line) ) { + reader.countersCurrent.incrementAttempt(); + String[] fields = line.split(Character.toString(reader.DELIMITER)); + StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); + ComponentUtils.offerUntilSuccess(entry, reader.persistQueue); + reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); + } + } + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn(e.getMessage()); + reader.countersCurrent.incrementStatus(DatumStatus.FAIL); + } + + LOGGER.info("Completed: " + file); + + try { + closeSafely(file, is); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + } + + private static void closeSafely(String file, Closeable closeable) { + try { + closeable.close(); + } catch(Exception e) { + LOGGER.error("There was an issue closing file: {}", file); + } + } +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java new file mode 100644 index 0000000000..058f7487ba --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import org.apache.streams.core.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable +{ + public final static String STREAMS_ID = "S3PersistWriter"; + + private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class); + + private final static char DELIMITER = '\t'; + + private ObjectMapper objectMapper; + private AmazonS3Client amazonS3Client; + private S3WriterConfiguration s3WriterConfiguration; + private final List writtenFiles = new ArrayList(); + + private final AtomicLong totalBytesWritten = new AtomicLong(); + private AtomicLong bytesWrittenThisFile = new AtomicLong(); + + private final AtomicInteger totalRecordsWritten = new AtomicInteger(); + private AtomicInteger fileLineCounter = new AtomicInteger(); + + private Map objectMetaData = new HashMap() {{ + put("line[0]", "id"); + put("line[1]", "timeStamp"); + put("line[2]", "metaData"); + put("line[3]", "document"); + }}; + + private OutputStreamWriter currentWriter = null; + + public AmazonS3Client getAmazonS3Client() { + return this.amazonS3Client; + } + + public S3WriterConfiguration getS3WriterConfiguration() { + return this.s3WriterConfiguration; + } + + public List getWrittenFiles() { + return this.writtenFiles; + } + + public Map getObjectMetaData() { + return this.objectMetaData; + } + + public ObjectMapper getObjectMapper() { + return this.objectMapper; + } + + public void setObjectMapper(ObjectMapper mapper) { + this.objectMapper = mapper; + } + + public void setObjectMetaData(Map val) { + this.objectMetaData = val; + } + + /** + * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use. + * @param amazonS3Client + * If you have an existing amazonS3Client, it wont' bother to create another one + * @param s3WriterConfiguration + * Configuration of the write paths and instructions are still required. + */ + public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) { + this.amazonS3Client = amazonS3Client; + this.s3WriterConfiguration = s3WriterConfiguration; + } + + public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) { + this.s3WriterConfiguration = s3WriterConfiguration; + } + + @Override + public void write(StreamsDatum streamsDatum) { + + synchronized (this) { + // Check to see if we need to reset the file that we are currently working with + if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) { + try { + LOGGER.info("Resetting the file"); + this.currentWriter = resetFile(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + String line = convertResultToString(streamsDatum); + + try { + this.currentWriter.write(line); + } catch (IOException e) { + e.printStackTrace(); + } + + // add the bytes we've written + int recordSize = line.getBytes().length; + this.totalBytesWritten.addAndGet(recordSize); + this.bytesWrittenThisFile.addAndGet(recordSize); + + // increment the record count + this.totalRecordsWritten.incrementAndGet(); + this.fileLineCounter.incrementAndGet(); + } + + } + + private synchronized OutputStreamWriter resetFile() throws Exception { + // this will keep it thread safe, so we don't create too many files + if(this.fileLineCounter.get() == 0 && this.currentWriter != null) + return this.currentWriter; + + closeAndDestroyWriter(); + + // Create the path for where the file is going to live. + try { + // generate a file name + String fileName = this.s3WriterConfiguration.getWriterFilePrefix() + + (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv"; + + // create the output stream + OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client, + this.s3WriterConfiguration.getBucket(), + this.s3WriterConfiguration.getWriterPath(), + fileName, + this.objectMetaData); + + // reset the counter + this.fileLineCounter = new AtomicInteger(); + this.bytesWrittenThisFile = new AtomicLong(); + + // add this to the list of written files + writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName); + + // Log that we are creating this file + LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName); + + // return the output stream + return new OutputStreamWriter(outputStream); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + throw e; + } + } + + private synchronized void closeAndDestroyWriter() { + // if there is a current writer, we must close it first. + if (this.currentWriter != null) { + this.safeFlush(this.currentWriter); + this.closeSafely(this.currentWriter); + this.currentWriter = null; + + // Logging of information to alert the user to the activities of this class + LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1)); + } + } + + private synchronized void closeSafely(Writer writer) { + if(writer != null) { + try { + writer.flush(); + writer.close(); + } catch(Exception e) { + // noOp + } + LOGGER.debug("File Closed"); + } + } + + private void safeFlush(Flushable flushable) { + // This is wrapped with a ByteArrayOutputStream, so this is really safe. + if(flushable != null) { + try { + flushable.flush(); + } catch(IOException e) { + // noOp + } + } + } + + private String convertResultToString(StreamsDatum entry) + { + String metadata = null; + + try { + metadata = objectMapper.writeValueAsString(entry.getMetadata()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + String documentJson = null; + try { + documentJson = objectMapper.writeValueAsString(entry.getDocument()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + // Save the class name that it came from + entry.metadata.put("class", entry.getDocument().getClass().getName()); + + if(Strings.isNullOrEmpty(documentJson)) + return null; + else + return entry.getId() + DELIMITER + // [0] = Unique id of the verbatim + entry.getTimestamp() + DELIMITER + // [1] = Timestamp of the item + metadata + DELIMITER + // [2] = Metadata of the item + documentJson + "\n"; // [3] = The actual object + } + + public void prepare(Object configurationObject) { + // Connect to S3 + synchronized (this) { + + // if the user has chosen to not set the object mapper, then set a default object mapper for them. + if(this.objectMapper == null) + this.objectMapper = new ObjectMapper(); + + // Create the credentials Object + if(this.amazonS3Client == null) { + AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey()); + + ClientConfiguration clientConfig = new ClientConfiguration(); + clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toUpperCase())); + + // We want path style access + S3ClientOptions clientOptions = new S3ClientOptions(); + clientOptions.setPathStyleAccess(true); + + this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); + this.amazonS3Client.setS3ClientOptions(clientOptions); + } + } + } + + public void cleanUp() { + closeAndDestroyWriter(); + } + + public DatumStatusCounter getDatumStatusCounter() { + DatumStatusCounter counters = new DatumStatusCounter(); + counters.incrementAttempt(this.totalRecordsWritten.get()); + counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get()); + return counters; + } +} diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json new file mode 100644 index 0000000000..863668f485 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json @@ -0,0 +1,25 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3Configuration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "key": { + "type": "string", + "description": "Your Amazon Key" + }, + "secretKey": { + "type": "string", + "description": "Your Amazon Secret Key" + }, + "bucket": { + "type": "string", + "description": "The AWS bucket you want to write to" + }, + "protocol": { + "type": "string", + "description": "Whether you are using HTTP or HTTPS" + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json new file mode 100644 index 0000000000..2959b3dd76 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3ReaderConfiguration.json @@ -0,0 +1,14 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3ReaderConfiguration", + "extends": {"$ref":"S3Configuration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "readerPath": { + "type": "string", + "description": "Path below root path" + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json new file mode 100644 index 0000000000..f43087bb23 --- /dev/null +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json @@ -0,0 +1,28 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.s3.S3WriterConfiguration", + "extends": {"$ref":"S3Configuration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "writerPath": { + "type": "string", + "description": "Path " + }, + "writerFilePrefix": { + "type": "string", + "description": "File Prefix" + }, + "maxFileSize": { + "type": "integer", + "default" : 20, + "description": "If files are elected to be 'chunked' which they are by default, this is the maximum size of that file before the byte array stream is vacated and the file is created." + }, + "chunk": { + "type": "boolean", + "default" : true, + "description": "Whether you want the file chunked inside of a folder or not" + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 3c27b8c639..8a41ca51a2 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -48,11 +48,6 @@ org.apache.streams streams-config - - org.apache.streams - streams-util - ${project.version} - com.google.guava guava @@ -73,9 +68,8 @@ org.twitter4j twitter4j-core - 3.0.5 + [4.0,) - @@ -118,7 +112,9 @@ true true + src/main/jsonschema/com/twitter/TwitterConfiguration.json src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json + src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json src/main/jsonschema/com/twitter/Delete.json src/main/jsonschema/com/twitter/Retweet.json src/main/jsonschema/com/twitter/tweet.json diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java index 9bf2d9a00a..7c7ef1b46b 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java @@ -1,12 +1,11 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigException; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.twitter.TwitterBasicAuthConfiguration; -import org.apache.streams.twitter.TwitterOAuthConfiguration; -import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,19 +17,18 @@ public class TwitterStreamConfigurator { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class); + private final static ObjectMapper mapper = new ObjectMapper(); - public static TwitterStreamConfiguration detectConfiguration(Config twitter) { - TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration(); - - twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint")); + public static TwitterConfiguration detectTwitterConfiguration(Config config) { + TwitterConfiguration twitterConfiguration = new TwitterConfiguration(); try { Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth"); TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new TwitterBasicAuthConfiguration(); twitterBasicAuthConfiguration.setUsername(basicauth.getString("username")); twitterBasicAuthConfiguration.setPassword(basicauth.getString("password")); - twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration); + twitterConfiguration.setBasicauth(twitterBasicAuthConfiguration); } catch( ConfigException ce ) {} try { @@ -40,27 +38,44 @@ public static TwitterStreamConfiguration detectConfiguration(Config twitter) { twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret")); twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken")); twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret")); - twitterStreamConfiguration.setOauth(twitterOAuthConfiguration); + twitterConfiguration.setOauth(twitterOAuthConfiguration); } catch( ConfigException ce ) {} + twitterConfiguration.setEndpoint(config.getString("endpoint")); + + return twitterConfiguration; + } + + public static TwitterStreamConfiguration detectConfiguration(Config config) { + + TwitterStreamConfiguration twitterStreamConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterStreamConfiguration.class); + try { - twitterStreamConfiguration.setTrack(twitter.getStringList("track")); + twitterStreamConfiguration.setTrack(config.getStringList("track")); } catch( ConfigException ce ) {} try { + // create the array List follows = Lists.newArrayList(); - for( Integer id : twitter.getIntList("follow")) - follows.add(new Long(id)); + // add the ids of the people we want to 'follow' + for(Integer id : config.getIntList("follow")) + follows.add((long)id); + // set the array twitterStreamConfiguration.setFollow(follows); + } catch( ConfigException ce ) {} - twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level")); - twitterStreamConfiguration.setWith(twitter.getString("with")); - twitterStreamConfiguration.setReplies(twitter.getString("replies")); + twitterStreamConfiguration.setFilterLevel(config.getString("filter-level")); + twitterStreamConfiguration.setWith(config.getString("with")); + twitterStreamConfiguration.setReplies(config.getString("replies")); twitterStreamConfiguration.setJsonStoreEnabled("true"); twitterStreamConfiguration.setIncludeEntities("true"); return twitterStreamConfiguration; } + public static TwitterUserInformationConfiguration detectTwitterUserInformationConfiguration(Config config) { + return mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class); + } + } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index 3df7d02ce8..b1785e526e 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -1,6 +1,5 @@ package org.apache.streams.twitter.provider; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index b9551ada48..b456fa4268 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -1,12 +1,7 @@ package org.apache.streams.twitter.provider; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.collect.Queues; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; @@ -15,18 +10,21 @@ import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.reflect.generics.reflectiveObjects.NotImplementedException; import twitter4j.*; import twitter4j.conf.ConfigurationBuilder; -import twitter4j.json.DataObjectFactory; import java.io.Serializable; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Created by sblackmon on 12/10/13. @@ -49,17 +47,11 @@ public void setConfig(TwitterStreamConfiguration config) { this.config = config; } - protected volatile Queue providerQueue = new LinkedBlockingQueue(); + protected final Queue providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue(500)); - protected Twitter client; + protected int idsCount; protected Iterator ids; - ListenableFuture providerTaskComplete; -// -// public BlockingQueue getInQueue() { -// return inQueue; -// } - protected ListeningExecutorService executor; protected DateTime start; @@ -74,6 +66,7 @@ private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int public TwitterTimelineProvider() { Config config = StreamsConfigurator.config.getConfig("twitter"); this.config = TwitterStreamConfigurator.detectConfiguration(config); + } public TwitterTimelineProvider(TwitterStreamConfiguration config) { @@ -95,68 +88,32 @@ public Queue getProviderQueue() { return this.providerQueue; } -// public void run() { -// -// LOGGER.info("{} Running", STREAMS_ID); -// -// while( ids.hasNext() ) { -// Long currentId = ids.next(); -// LOGGER.info("Provider Task Starting: {}", currentId); -// captureTimeline(currentId); -// } -// -// LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID); -// -// client.shutdown(); -// -// LOGGER.info("{} Exiting", STREAMS_ID); -// -// while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) { -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) {} -// } -// } - @Override public void startStream() { // no op } - private void captureTimeline(long currentId) { + protected void captureTimeline(long currentId) { Paging paging = new Paging(1, 200); List statuses = null; - boolean KeepGoing = true; - boolean hadFailure = false; do { + Twitter client = getTwitterClient(); int keepTrying = 0; // keep trying to load, give it 5 attempts. //while (keepTrying < 10) while (keepTrying < 1) { - try { statuses = client.getUserTimeline(currentId, paging); - for (Status tStat : statuses) - { -// if( provider.start != null && -// provider.start.isAfter(new DateTime(tStat.getCreatedAt()))) -// { -// // they hit the last date we wanted to collect -// // we can now exit early -// KeepGoing = false; -// } - // emit the record - String json = DataObjectFactory.getRawJSON(tStat); - - providerQueue.offer(new StreamsDatum(json)); - + for (Status tStat : statuses) { + String json = TwitterObjectFactory.getRawJSON(tStat); + ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue); } paging.setPage(paging.getPage() + 1); @@ -166,19 +123,36 @@ private void captureTimeline(long currentId) { catch(TwitterException twitterException) { keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); } - catch(Exception e) - { - hadFailure = true; + catch(Exception e) { keepTrying += TwitterErrorHandler.handleTwitterError(client, e); } - finally - { - // Shutdown the twitter to release the resources - client.shutdown(); - } } } - while ((statuses != null) && (statuses.size() > 0) && KeepGoing); + while (shouldContinuePulling(statuses)); + } + + private Map userPullInfo; + + protected boolean shouldContinuePulling(List statuses) { + return (statuses != null) && (statuses.size() > 0); + } + + private void sleep() + { + Thread.yield(); + try { + // wait one tenth of a millisecond + Thread.yield(); + Thread.sleep(new Random().nextInt(2)); + Thread.yield(); + } + catch(IllegalArgumentException e) { + // passing in static values, this will never happen + } + catch(InterruptedException e) { + // noOp, there must have been an issue sleeping + } + Thread.yield(); } public StreamsResultSet readCurrent() { @@ -244,21 +218,19 @@ public void prepare(Object o) { executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); Preconditions.checkNotNull(config.getOauth().getAccessToken()); Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getFollow()); - Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true); - Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true); - + idsCount = config.getFollow().size(); ids = config.getFollow().iterator(); + } + protected Twitter getTwitterClient() + { String baseUrl = "https://api.twitter.com:443/1.1/"; ConfigurationBuilder builder = new ConfigurationBuilder() @@ -266,23 +238,18 @@ public void prepare(Object o) { .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) .setOAuthAccessToken(config.getOauth().getAccessToken()) .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) - .setIncludeEntitiesEnabled(includeEntitiesEnabled) - .setJSONStoreEnabled(jsonStoreEnabled) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) .setAsyncNumThreads(3) .setRestBaseURL(baseUrl) .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setIncludeRTsEnabled(Boolean.TRUE) .setPrettyDebugEnabled(Boolean.TRUE); - client = new TwitterFactory(builder.build()).getInstance(); - + return new TwitterFactory(builder.build()).getInstance(); } @Override public void cleanUp() { - - client.shutdown(); - shutdownAndAwaitTermination(executor); } } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java index 9619f4fed6..9a1d4e71f8 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java @@ -74,19 +74,12 @@ public void run() { hadFailure = true; keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e); } - finally - { - // Shutdown the twitter to release the resources - twitter.shutdown(); - } } } while ((statuses != null) && (statuses.size() > 0) && KeepGoing); LOGGER.info("Provider Finished. Cleaning up..."); - twitter.shutdown(); - LOGGER.info("Provider Exiting"); } diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java new file mode 100644 index 0000000000..049c3bb617 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.twitter.provider; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.apache.streams.util.ComponentUtils; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; +import twitter4j.Twitter; +import twitter4j.TwitterException; +import twitter4j.TwitterFactory; +import twitter4j.User; +import twitter4j.conf.ConfigurationBuilder; +import twitter4j.json.DataObjectFactory; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + +public class TwitterUserInformationProvider implements StreamsProvider, Serializable +{ + + public static final String STREAMS_ID = "TwitterUserInformationProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); + + + private TwitterUserInformationConfiguration twitterUserInformationConfiguration; + + private Class klass; + protected volatile Queue providerQueue = new LinkedBlockingQueue(); + + public TwitterUserInformationConfiguration getConfig() { return twitterUserInformationConfiguration; } + + public void setConfig(TwitterUserInformationConfiguration config) { this.twitterUserInformationConfiguration = config; } + + protected Iterator idsBatches; + protected Iterator screenNameBatches; + + protected ListeningExecutorService executor; + + protected DateTime start; + protected DateTime end; + + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public TwitterUserInformationProvider() { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) { + this.twitterUserInformationConfiguration = config; + } + + public TwitterUserInformationProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + this.klass = klass; + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config, Class klass) { + this.twitterUserInformationConfiguration = config; + this.klass = klass; + } + + public Queue getProviderQueue() { + return this.providerQueue; + } + + @Override + public void startStream() { + // no op + } + + + private void loadBatch(Long[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + long[] toQuery = new long[ids.length]; + for(int i = 0; i < ids.length; i++) + toQuery[i] = ids[i]; + + for (User tStat : client.lookupUsers(toQuery)) { + String json = DataObjectFactory.getRawJSON(tStat); + ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + private void loadBatch(String[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + for (User tStat : client.lookupUsers(ids)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + public StreamsResultSet readCurrent() { + + Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); + + LOGGER.info("readCurrent"); + + while(idsBatches.hasNext()) + loadBatch(idsBatches.next()); + + while(screenNameBatches.hasNext()) + loadBatch(screenNameBatches.next()); + + + LOGGER.info("Finished. Cleaning up..."); + + LOGGER.info("Providing {} docs", providerQueue.size()); + + StreamsResultSet result = new StreamsResultSet(providerQueue); + + LOGGER.info("Exiting"); + + return result; + + } + + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public void prepare(Object o) { + + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerKey()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessToken()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getInfo()); + + List screenNames = new ArrayList(); + List screenNameBatches = new ArrayList(); + + List ids = new ArrayList(); + List idsBatches = new ArrayList(); + + for(String s : twitterUserInformationConfiguration.getInfo()) { + if(s != null) + { + String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); + + // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the + // screen name list + try { + ids.add(Long.parseLong(potentialScreenName)); + } catch (Exception e) { + screenNames.add(potentialScreenName); + } + + // Twitter allows for batches up to 100 per request, but you cannot mix types + + if(ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new Long[ids.size()])); + // reset the Ids + ids = new ArrayList(); + } + + if(screenNames.size() >= 100) { + // add the batch + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + // reset the Ids + screenNames = new ArrayList(); + } + } + } + + + if(ids.size() > 0) + idsBatches.add(ids.toArray(new Long[ids.size()])); + + if(screenNames.size() > 0) + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + + this.idsBatches = idsBatches.iterator(); + this.screenNameBatches = screenNameBatches.iterator(); + } + + protected Twitter getTwitterClient() + { + String baseUrl = "https://api.twitter.com:443/1.1/"; + + ConfigurationBuilder builder = new ConfigurationBuilder() + .setOAuthConsumerKey(twitterUserInformationConfiguration.getOauth().getConsumerKey()) + .setOAuthConsumerSecret(twitterUserInformationConfiguration.getOauth().getConsumerSecret()) + .setOAuthAccessToken(twitterUserInformationConfiguration.getOauth().getAccessToken()) + .setOAuthAccessTokenSecret(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret()) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) + .setAsyncNumThreads(3) + .setRestBaseURL(baseUrl) + .setIncludeMyRetweetEnabled(Boolean.TRUE) + .setPrettyDebugEnabled(Boolean.TRUE); + + return new TwitterFactory(builder.build()).getInstance(); + } + + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } +} diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json new file mode 100644 index 0000000000..9e22b93317 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json @@ -0,0 +1,70 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.TwitterConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "protocol": { + "type": "string", + "description": "The protocol" + }, + "host": { + "type": "string", + "description": "The host" + }, + "port": { + "type": "integer", + "description": "The port" + }, + "version": { + "type": "string", + "description": "The version" + }, + "endpoint": { + "type": "string", + "description": "The endpoint" + }, + "jsonStoreEnabled": { + "default" : true, + "type": "string" + }, + "oauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "appName": { + "type": "string" + }, + "consumerKey": { + "type": "string" + }, + "consumerSecret": { + "type": "string" + }, + "accessToken": { + "type": "string" + }, + "accessTokenSecret": { + "type": "string" + } + } + }, + "basicauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + } + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json index c1a0d0c6ab..2ff73627de 100644 --- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json @@ -3,34 +3,12 @@ "$schema": "http://json-schema.org/draft-03/schema", "id": "#", "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration", + "extends": {"$ref":"TwitterConfiguration.json"}, "javaInterfaces": ["java.io.Serializable"], "properties": { - "protocol": { - "type": "string", - "description": "The protocol" - }, - "host": { - "type": "string", - "description": "The host" - }, - "port": { - "type": "integer", - "description": "The port" - }, - "version": { - "type": "string", - "description": "The version" - }, - "endpoint": { - "type": "string", - "description": "The endpoint" - }, "includeEntities": { "type": "string" }, - "jsonStoreEnabled": { - "type": "string" - }, "truncated": { "type": "boolean" }, @@ -59,43 +37,6 @@ "items": { "type": "string" } - }, - "oauth": { - "type": "object", - "dynamic": "true", - "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "appName": { - "type": "string" - }, - "consumerKey": { - "type": "string" - }, - "consumerSecret": { - "type": "string" - }, - "accessToken": { - "type": "string" - }, - "accessTokenSecret": { - "type": "string" - } - } - }, - "basicauth": { - "type": "object", - "dynamic": "true", - "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "username": { - "type": "string" - }, - "password": { - "type": "string" - } - } } } } \ No newline at end of file diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json new file mode 100644 index 0000000000..afd203f90d --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json @@ -0,0 +1,17 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration", + "extends": {"$ref":"TwitterConfiguration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "info": { + "type": "array", + "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream", + "items": { + "type": "string" + } + } + } +} \ No newline at end of file diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java index b8bfe1a21a..31ddfce590 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java @@ -6,6 +6,8 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.processor.TwitterTypeConverter; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; @@ -21,6 +23,8 @@ import java.io.InputStreamReader; import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java index 0b254b6292..c1827dfd7b 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/LocalStreamProcessMonitorThread.java @@ -7,7 +7,7 @@ import java.lang.management.MemoryUsage; import java.util.concurrent.Executor; -public class LocalStreamProcessMonitorThread implements Runnable +public class LocalStreamProcessMonitorThread implements StatusCounterMonitorRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamProcessMonitorThread.class); @@ -22,10 +22,16 @@ public LocalStreamProcessMonitorThread(Executor executor, int delayInSeconds) { this.seconds = delayInSeconds; } + @Override public void shutdown(){ this.run = false; } + @Override + public boolean isRunning() { + return this.run; + } + @Override public void run() { diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java new file mode 100644 index 0000000000..ee6e102e89 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorRunnable.java @@ -0,0 +1,6 @@ +package org.apache.streams.local.tasks; + +public interface StatusCounterMonitorRunnable extends Runnable { + void shutdown(); + boolean isRunning(); +} diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java index c6febbe3ba..7579209bf4 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StatusCounterMonitorThread.java @@ -4,7 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StatusCounterMonitorThread implements Runnable +public class StatusCounterMonitorThread implements StatusCounterMonitorRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(StatusCounterMonitorThread.class); @@ -19,10 +19,16 @@ public StatusCounterMonitorThread(DatumStatusCountable task, int delayInSeconds) this.seconds = delayInSeconds; } + @Override public void shutdown(){ this.run = false; } + @Override + public boolean isRunning() { + return this.run; + } + @Override public void run() {