diff --git a/plugins/river/couchdb/.gitignore b/plugins/river/couchdb/.gitignore new file mode 100644 index 0000000000000..3af0ccb687b45 --- /dev/null +++ b/plugins/river/couchdb/.gitignore @@ -0,0 +1 @@ +/data diff --git a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java index e722b6ff54636..75baf58521256 100644 --- a/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java +++ b/plugins/river/couchdb/src/main/java/org/elasticsearch/river/couchdb/CouchdbRiver.java @@ -62,6 +62,7 @@ /** * @author kimchy (shay.banon) + * @author dadoonet (David Pilato) for attachments filter */ public class CouchdbRiver extends AbstractRiverComponent implements River { @@ -75,6 +76,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { private final String couchFilter; private final String couchFilterParamsUrl; private final String basicAuth; + private final boolean couchIgnoreAttachements; private final String indexName; private final String typeName; @@ -116,6 +118,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { } else { couchFilterParamsUrl = null; } + couchIgnoreAttachements = XContentMapValues.nodeBooleanValue(couchSettings.get("ignore_attachments"), false); if (couchSettings.containsKey("user") && couchSettings.containsKey("password")) { String user = couchSettings.get("user").toString(); String password = couchSettings.get("password").toString(); @@ -135,6 +138,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River { couchDb = "db"; couchFilter = null; couchFilterParamsUrl = null; + couchIgnoreAttachements = false; basicAuth = null; script = null; } @@ -214,7 +218,9 @@ private String processLine(String s, BulkRequestBuilder bulk) { // Ignore design documents if (id.startsWith("_design/")) { - logger.trace("ignoring design document {}", id); + 
if (logger.isTraceEnabled()) { + logger.trace("ignoring design document {}", id); + } return seq; } @@ -246,7 +252,25 @@ private String processLine(String s, BulkRequestBuilder bulk) { if (logger.isTraceEnabled()) { logger.trace("processing [index ]: [{}]/[{}]/[{}], source {}", index, type, id, doc); } - bulk.add(indexRequest(index).type(type).id(id).source(doc).routing(extractRouting(ctx))); + + // Remove _attachement from doc if needed + if (couchIgnoreAttachements) { + if (doc.containsKey("_attachments")) { + Map _attachments = (Map) doc + .get("_attachments"); + if (_attachments != null) { + doc.remove("_attachments"); + if (logger.isTraceEnabled()) { + logger.trace("_attachments found and removed from doc"); + } + } + } + } else { + // TODO by now, couchDB river does not really store attachments in Elastic Search but only attachments meta informations + // So we perhaps need to fully support attachments + } + + bulk.add(indexRequest(index).type(type).id(id).source(doc).routing(extractRouting(ctx))); } else { logger.warn("ignoring unknown change {}", s); } @@ -381,6 +405,7 @@ private class Slurper implements Runnable { file = file + couchFilterParamsUrl; } } + if (lastSeq != null) { file = file + "&since=" + lastSeq; } diff --git a/plugins/river/couchdb/src/test/java/org/elasticsearch/river/couchdb/CouchdbRiverAttachementTest.java b/plugins/river/couchdb/src/test/java/org/elasticsearch/river/couchdb/CouchdbRiverAttachementTest.java new file mode 100644 index 0000000000000..55fdce0b96107 --- /dev/null +++ b/plugins/river/couchdb/src/test/java/org/elasticsearch/river/couchdb/CouchdbRiverAttachementTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.river.couchdb; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.IndexMissingException; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; + +/** + * This is a simple test case for testing attachements removing.
+ * You need a CouchDB instance running on localhost:5984 with a database named mytest.<br>
+ * If you push documents with attachments into it, the attachments should be ignored by the river.
+ * @author dadoonet (David Pilato)
+ */
+public class CouchdbRiverAttachementTest {
+
+    public static void main(String[] args) throws Exception {
+        String host = "localhost";
+        String port = "5984";
+        String db = "mytest";
+        boolean ignoreAttachments = true;
+
+        Node node = NodeBuilder.nodeBuilder().settings(ImmutableSettings.settingsBuilder().put("gateway.type", "local")).node();
+        Thread.sleep(1000); // NOTE(review): presumably gives the local node time to finish starting - confirm
+        try {
+            node.client().admin().indices().delete(new DeleteIndexRequest("_river")).actionGet();
+        } catch (IndexMissingException ignored) {
+            // Index does not exist... Fine
+        }
+        Thread.sleep(1000);
+        try {
+            node.client().admin().indices().delete(new DeleteIndexRequest(db)).actionGet();
+        } catch (IndexMissingException ignored) {
+            // Index does not exist... Fine
+        }
+
+        XContentBuilder xb = jsonBuilder()
+                .startObject()
+                    .field("type", "couchdb")
+                    .startObject("couchdb")
+                        .field("host", host)
+                        .field("port", port)
+                        .field("db", db)
+                        .field("ignore_attachments", ignoreAttachments) // key must match the one CouchdbRiver reads from its couchdb settings
+                    .endObject()
+                .endObject();
+        node.client().prepareIndex("_river", db, "_meta").setSource(xb).execute().actionGet();
+
+        Thread.sleep(100000); // NOTE(review): presumably keeps the JVM alive while the river indexes - confirm
+    }
+}