Skip to content

Commit

Permalink
Added cloudant input connector
Browse files Browse the repository at this point in the history
  • Loading branch information
tfennelly committed Oct 11, 2013
1 parent 4abe5e6 commit 614cc78
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 70 deletions.
1 change: 1 addition & 0 deletions cloudant/build.gradle
@@ -1,5 +1,6 @@
// Compile-scope dependencies of the Cloudant connector module. Every artifact
// is a FoxWeave library resolved at the shared ${foxweaveVersion}.
dependencies {
compile ("com.foxweave:foxweave-api:${foxweaveVersion}");
compile ("com.foxweave:foxweave-commons-common-data:${foxweaveVersion}");
compile ("com.foxweave:foxweave-commons-common-json-streaming:${foxweaveVersion}");
}

@@ -0,0 +1,54 @@
/*
* Copyright (C) 2012 FoxWeave, Ireland.
*
* All rights reserved.
*/
package com.foxweave.connector.cloudant;

import com.foxweave.codec.Base64Coder;
import com.foxweave.data.component.ConfigUtil;
import com.foxweave.pipeline.component.AbstractPipelineComponent;
import com.foxweave.pipeline.component.ComponentConfigurationException;
import com.foxweave.pipeline.lifecycle.Configurable;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.URI;
import org.apache.commons.httpclient.URIException;
import org.json.JSONObject;

/**
 * Base class for the Cloudant connectors.
 * <p>
 * Reads the server URL, database name and account credentials from the
 * component configuration and derives from them the request URI (database URL
 * plus the subclass-specific REST resource) and the Base64 encoded
 * {@code username:password} credentials used for HTTP Basic authentication.
 */
public abstract class AbstractCloudantConnector extends AbstractPipelineComponent implements Configurable<JSONObject> {

    /** Configuration key holding the Cloudant server URL. */
    public static final String URL = "cloudant_server_url";
    /** Configuration key holding the Cloudant database name. */
    public static final String DATABASE_NAME = "cloudant_database_name";
    /** Account parameter name holding the user name. */
    public static final String USER_NAME = "accountName";
    /** Account parameter name holding the password. */
    public static final String PASSWORD = "password";

    /** HTTP client shared by all requests issued by this connector instance. */
    protected HttpClient httpClient = new HttpClient();
    /** Fully resolved URI of the REST resource; set by {@link #setConfiguration(JSONObject)}. */
    protected URI requestURI;
    /** Base64 encoded {@code username:password}; set by {@link #setConfiguration(JSONObject)}. */
    protected String encodedAuthCredentials;

    /**
     * Configures the request URI and the authentication credentials.
     *
     * @param config the component configuration.
     * @throws ComponentConfigurationException if the server URL or database name
     *         is missing/blank, or if the resulting URL is not a valid URI.
     */
    @Override
    public void setConfiguration(final JSONObject config) throws ComponentConfigurationException {
        // optString() yields "" for a missing key, so blank covers both "absent" and "empty".
        String serverURL = config.optString(URL).trim();
        String dbName = config.optString(DATABASE_NAME).trim();

        // Fail fast: without these the concatenation below would silently build
        // a relative or double-slash URL that only fails once a request is made.
        if (serverURL.length() == 0) {
            throw new ComponentConfigurationException("Invalid Cloudant configuration. '" + URL + "' not specified.");
        }
        if (dbName.length() == 0) {
            throw new ComponentConfigurationException("Invalid Cloudant configuration. '" + DATABASE_NAME + "' not specified.");
        }

        // Join server URL and database name with exactly one '/'.
        String dbUrl;
        if (serverURL.endsWith("/")) {
            dbUrl = serverURL + dbName;
        } else {
            dbUrl = serverURL + "/" + dbName;
        }

        String cloudantUrl = dbUrl + getRESTResource();
        try {
            // 'false' => the string is not yet escaped; URI performs the escaping.
            this.requestURI = new URI(cloudantUrl, false);
        } catch (URIException e) {
            throw new ComponentConfigurationException("Unexpected exception. Invalid Cloudant URL '" + cloudantUrl + "'.", e);
        }

        // NOTE(review): if getAccountParam can return null, string concatenation
        // would encode the literal text "null" - confirm ConfigUtil's contract.
        String username = ConfigUtil.getAccountParam(USER_NAME, config);
        String password = ConfigUtil.getAccountParam(PASSWORD, config);
        encodedAuthCredentials = Base64Coder.encodeString(username + ":" + password);
    }

    /**
     * Returns the REST resource path appended to the database URL to form the
     * request URI (for example {@code "/_changes"}).
     *
     * @return the resource path, appended verbatim to the database URL.
     */
    protected abstract String getRESTResource();
}
@@ -0,0 +1,180 @@
package com.foxweave.connector.cloudant;

import com.foxweave.exception.FoxWeaveException;
import com.foxweave.io.StreamUtils;
import com.foxweave.json.streaming.JSONObjectCallback;
import com.foxweave.json.streaming.JSONStreamer;
import com.foxweave.pipeline.component.PollContext;
import com.foxweave.pipeline.component.PollingInputConnector;
import com.foxweave.pipeline.exchange.EntityState;
import com.foxweave.pipeline.exchange.Exchange;
import com.foxweave.pipeline.exchange.ExchangeFactory;
import com.foxweave.pipeline.exchange.Message;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.methods.GetMethod;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * Polling input connector that reads documents from a Cloudant database's
 * {@code _changes} feed and sends each newly created document into the
 * pipeline as a message.
 * <p>
 * The sequence id of the last successfully sent change is kept in the
 * pipeline scoped cache and passed as the {@code since} parameter of the next
 * poll.
 */
public class CloudantInputConnector extends AbstractCloudantConnector implements PollingInputConnector {

    private static final Logger logger = LoggerFactory.getLogger(CloudantInputConnector.class);

    /** Pipeline scoped cache key under which the last processed sequence id is stored. */
    private static final String LAST_SEQ_CACHE_KEY = CloudantInputConnector.class.getName() + "#LAST_SEQ_CACHE_KEY";

    private ExchangeFactory exchangeFactory;
    private PollContext pollContext;

    @Override
    protected String getRESTResource() {
        return "/_changes";
    }

    @Override
    public void setExchangeFactory(ExchangeFactory exchangeFactory) {
        this.exchangeFactory = exchangeFactory;
    }

    @Override
    public void setPollContext(PollContext pollContext) {
        this.pollContext = pollContext;
    }

    /**
     * Polls the {@code _changes} feed for changes after the last stored
     * sequence id and streams the resulting documents into the pipeline.
     * <p>
     * When no sequence id has been stored yet: a sync pipeline only records
     * the current last sequence id and returns without reading documents;
     * any other pipeline reads from sequence {@code "0"}.
     *
     * @throws Exception on HTTP or stream processing failure.
     */
    @Override
    public void poll() throws Exception {
        String lastSeqId = getLastSequenceId();

        if (lastSeqId == null) {
            if (getPipelineContext().isSync()) {
                initializeLastSyncSeqId();
                return;
            } else {
                lastSeqId = "0";
            }
        }

        GetMethod method = new GetMethod(requestURI.toString());
        try {
            NameValuePair[] queryParams = new NameValuePair[3];
            queryParams[0] = new NameValuePair("include_docs", "true");
            queryParams[1] = new NameValuePair("descending", "false");
            queryParams[2] = new NameValuePair("since", lastSeqId);

            method.setQueryString(queryParams);
            method.setRequestHeader("Authorization", "Basic " + encodedAuthCredentials);

            int status = httpClient.executeMethod(method);
            if (status != 200) {
                // Previously ignored silently; surface it so a failing poll is diagnosable.
                logger.warn("Unexpected HTTP status {} polling Cloudant '_changes' feed ({}).", status, requestURI);
                return;
            }

            InputStream dataStream = method.getResponseBodyAsStream();
            if (dataStream != null) {
                streamChanges(dataStream, method.getResponseCharSet());
            }
        } finally {
            method.releaseConnection();
        }
    }

    /**
     * Streams the {@code results} array of a {@code _changes} response through
     * a {@link CloudantDocHandler}, always ending the exchange and closing the
     * streams afterwards.
     *
     * @param dataStream the response body stream; closed by this method.
     * @param charEnc the response character encoding, or null for UTF-8.
     * @throws Exception on stream processing failure.
     */
    private void streamChanges(InputStream dataStream, String charEnc) throws Exception {
        try {
            if (charEnc == null) {
                charEnc = "UTF-8";
            }

            InputStreamReader dataStreamReader = new InputStreamReader(dataStream, charEnc);
            try {
                CloudantDocHandler callback = new CloudantDocHandler();
                try {
                    JSONStreamer jsonStreamer = new JSONStreamer(callback, "results");

                    jsonStreamer.stream(dataStreamReader);
                } finally {
                    // End the exchange even if streaming failed part way through.
                    callback.endExchange();
                }
            } finally {
                StreamUtils.safeClose(dataStreamReader);
            }
        } finally {
            StreamUtils.safeClose(dataStream);
        }
    }

    /**
     * Requests the most recent change (descending, limit 1) and stores the
     * {@code last_seq} value of the response as the last sequence id, so that
     * subsequent polls only see changes made after this point.
     *
     * @throws IOException on HTTP failure.
     * @throws JSONException if the response is not the expected JSON.
     */
    private void initializeLastSyncSeqId() throws IOException, JSONException {
        GetMethod method = new GetMethod(requestURI.toString());
        try {
            NameValuePair[] queryParams = new NameValuePair[3];

            queryParams[0] = new NameValuePair("include_docs", "true");
            queryParams[1] = new NameValuePair("descending", "true");
            queryParams[2] = new NameValuePair("limit", "1");

            method.setQueryString(queryParams);
            method.setRequestHeader("Authorization", "Basic " + encodedAuthCredentials);

            int status = httpClient.executeMethod(method);
            if (status == 200) {
                JSONObject responseJSON = new JSONObject(method.getResponseBodyAsString());
                storeLastSequenceId(responseJSON.getString("last_seq"));
            } else {
                logger.warn("Unexpected HTTP status {} initializing last sequence id from Cloudant ({}).", status, requestURI);
            }
        } finally {
            // The connection must be handed back to the HttpClient connection manager.
            method.releaseConnection();
        }
    }

    private void storeLastSequenceId(String seq) {
        getPipelineContext().getPipelineScopedCache().put(LAST_SEQ_CACHE_KEY, seq);
    }

    private String getLastSequenceId() {
        return (String) getPipelineContext().getPipelineScopedCache().get(LAST_SEQ_CACHE_KEY);
    }

    /**
     * Receives each entry of the {@code _changes} "results" array and sends
     * the embedded document of every newly created entry as a message on a
     * lazily started exchange.
     */
    private class CloudantDocHandler implements JSONObjectCallback {

        /** Created on the first document to be sent; null if nothing was sent. */
        private Exchange exchange;

        /**
         * Handles one change entry.
         *
         * @param jsonObject the change entry.
         * @return true if streaming should continue, false to stop.
         */
        @Override
        public boolean onJSONObject(JSONObject jsonObject) {
            JSONObject doc = jsonObject.optJSONObject("doc");
            if (doc == null) {
                logger.warn("Cloudant '_changes' document without a 'doc' element? Potential JSONStreamer issue!");
                return pollContext.okayToContinue();
            }

            String rev = doc.optString("_rev", null);
            if (rev == null) {
                logger.warn("Cloudant '_changes' document without a '_rev' element? Potential JSONStreamer issue!");
                return pollContext.okayToContinue();
            }

            // For now, we're only processing new documents i.e. not supporting updates yet...
            // Revisions have the form "<generation>-<hash>", so match the whole
            // generation: a first-character test would also accept "10-...", "11-..." etc.
            // NOTE(review): the sequence id is not advanced for skipped entries,
            // so they are fetched again on the next poll - confirm this is intended.
            if (!rev.startsWith("1-")) {
                return pollContext.okayToContinue();
            }

            if (exchange == null) {
                exchange = exchangeFactory.newExchange();
                exchange.start();
            }

            Message message = exchange.newMessage();
            message.setPayload(doc);

            exchange.send(message);
            if (exchange.getState() != EntityState.OK) {
                // Stop without recording the sequence id of the failed entry.
                return false;
            }

            storeLastSequenceId(jsonObject.optString("seq"));

            return pollContext.okayToContinue();
        }

        /** Ends the exchange if one was started. */
        private void endExchange() {
            if (exchange != null) {
                exchange.end();
            }
        }

        @Override
        public void onException(Exception e) throws Exception {
            throw new FoxWeaveException("Error processing Cloudant '_changes' stream.", e);
        }
    }
}
Expand Up @@ -6,30 +6,24 @@
package com.foxweave.connector.cloudant;

import java.io.*;
import java.util.concurrent.atomic.AtomicInteger;

import com.foxweave.codec.Base64Coder;
import com.foxweave.data.component.ConfigUtil;
import com.foxweave.exception.FoxWeaveException;
import com.foxweave.io.CharsetUtils;
import com.foxweave.io.FileUtils;
import com.foxweave.io.StreamUtils;
import com.foxweave.pipeline.component.AbstractPipelineComponent;
import com.foxweave.pipeline.component.ComponentConfigurationException;
import com.foxweave.pipeline.component.OutputConnector;
import com.foxweave.pipeline.component.listener.ExchangeLifecycleListener;
import com.foxweave.pipeline.exchange.Exchange;
import com.foxweave.pipeline.exchange.Message;
import com.foxweave.pipeline.lifecycle.Configurable;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.URI;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.methods.FileRequestEntity;
import org.apache.commons.httpclient.methods.PostMethod;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudantOutputConnector extends AbstractPipelineComponent implements ExchangeLifecycleListener, Configurable<JSONObject>, OutputConnector {
public class CloudantOutputConnector extends AbstractCloudantConnector implements ExchangeLifecycleListener, OutputConnector {

private static final Logger logger = LoggerFactory.getLogger(CloudantOutputConnector.class);
private static final String CLOUDANT_DEFAULT_MAX_BATCH_SIZE = "CLOUDANT_DEFAULT_MAX_BATCH_SIZE";
Expand All @@ -47,54 +41,33 @@ public class CloudantOutputConnector extends AbstractPipelineComponent implement
}
}

public static final String URL = "cloudant_server_url";
public static final String DATABASE_NAME = "cloudant_database_name";
public static final String USER_NAME = "accountName";
public static final String PASSWORD = "password";
public static final String MAX_BATCH_SIZE = "maxBatchSize";

private HttpClient httpClient;
private URI requestURI;
private String encodedAuthCredentials;
private File batchFile;
private int maxBatchSize = DEFAULT_MAX_BATCH_SIZE;
private AtomicInteger numDocsInBatch = new AtomicInteger(0);

public File getBatchFile() {
return batchFile;
}

@Override
public void setConfiguration(final JSONObject config) throws ComponentConfigurationException {
String serverURL = config.optString(URL);
String dbName = config.optString(DATABASE_NAME);
String dbUrl;

if (serverURL.endsWith("/")) {
dbUrl = serverURL + dbName;
} else {
dbUrl = serverURL + "/" + dbName;
}
try {
this.requestURI = new URI(dbUrl + "/_bulk_docs", false);
} catch (URIException e) {
throw new ComponentConfigurationException("");
}

String username = ConfigUtil.getAccountParam(USER_NAME, config);
String password = ConfigUtil.getAccountParam(PASSWORD, config);
encodedAuthCredentials = Base64Coder.encodeString(username + ":" + password);

super.setConfiguration(config);
maxBatchSize = config.optInt(MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_SIZE);
}

@Override
protected String getRESTResource() {
return "/_bulk_docs";
}

@Override
public void start() throws Exception {
batchFile = new File(getPipelineContext().getTempDir(), "cloudant/" + getComponentId() + "-batch.json");
if (!batchFile.getParentFile().exists()) {
batchFile.getParentFile().mkdirs();
}

httpClient = new HttpClient();
if (batchFile.exists()) {
sendBatch();
}
Expand Down Expand Up @@ -139,6 +112,7 @@ public synchronized Message send(final Message message) throws Exception {
}
batchFileWriter.write(json.toString());
batchFileWriter.flush();
numDocsInBatch.incrementAndGet();
} finally {
StreamUtils.safeClose(batchFileWriter);
}
Expand Down Expand Up @@ -170,11 +144,19 @@ public void writeRequest(OutputStream out) throws IOException {
}
};

if (numDocsInBatch.get() != 0) {
logger.debug("Sending {} documents to Cloudant ({}).", numDocsInBatch.get(), requestURI);
} else {
// Num docs unknown - probably a restart
logger.debug("Sending documents to Cloudant ({}).", requestURI);
}

postMethod.setURI(requestURI);
postMethod.setRequestHeader("Authorization", "Basic " + encodedAuthCredentials);
postMethod.setRequestEntity(requestEntity);
httpClient.executeMethod(postMethod);

logger.debug("Sending batch of documents to ");
if (postMethod.getStatusCode() >= 200 && postMethod.getStatusCode() < 300) {
batchFile.delete();
} else {
Expand Down

0 comments on commit 614cc78

Please sign in to comment.