Ben McCann edited this page Mar 19, 2015 · 56 revisions

Welcome to the elasticsearch-river-mongodb wiki!

Introduction

This river uses MongoDB as its data source. It supports attachments (GridFS).
The main branch supports ElasticSearch 1.4.2, MongoDB 3.0.0 and TokuMX 1.5.1.

Design

The current implementation monitors the oplog collection in the local database. Make sure a replica set is enabled [0]; master/slave replication is not supported. Monitoring is done with a tailable cursor [1]. All operations are supported: insert, update and delete.

Normal Mongo Document Implementation

A Mongo document is stored in ElasticSearch without transformation. The document stored in ElasticSearch uses the same id as in MongoDB.
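
To illustrate how oplog operations could map to index operations, here is a minimal Python sketch. It is not the river's code. It assumes the usual oplog entry shape (op is "i", "u", "d" or "n"; o holds the document or the modifiers; o2 holds the _id of an updated document). The function name oplog_to_action and the returned action names are hypothetical, and moving _id out of the body to serve as the ElasticSearch id is an assumption.

```python
def oplog_to_action(entry):
    """Translate one oplog entry into a hypothetical indexing action."""
    op = entry["op"]
    if op == "i":
        # Insert: the full document is in "o". Assumption: _id becomes the
        # ElasticSearch document id and the remaining fields are the body.
        doc = dict(entry["o"])
        doc_id = str(doc.pop("_id"))
        return {"action": "index", "id": doc_id, "source": doc}
    if op == "u":
        # Update: "o" may hold only modifiers such as $set, so this sketch
        # just reports which id changed; the caller re-reads the document.
        return {"action": "reindex", "id": str(entry["o2"]["_id"])}
    if op == "d":
        # Delete: "o" holds the _id of the removed document.
        return {"action": "delete", "id": str(entry["o"]["_id"])}
    # Other entries (for example "n", a no-op) are skipped.
    return None


insert = {"op": "i", "ns": "testmongo.documents",
          "o": {"_id": "4f244b45", "title": "hello"}}
print(oplog_to_action(insert))
```

Using the MongoDB _id as the ElasticSearch id keeps later update and delete entries addressable without a lookup table.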

GridFS Mongo Document Implementation

A GridFS Mongo document (with large binary content) must be transformed before it is stored in ElasticSearch. This requires the mapper-attachment plugin [2] to be installed in ElasticSearch. The following mapping is used to support GridFS Mongo documents in ElasticSearch:

  • Mapping for attachment
    {
      "testindex": {
        "files": {
          "properties": {
            "content": {
              "path": "full",
              "type": "attachment",
              "fields": {
                "content": { "type": "string" },
                "author": { "type": "string" },
                "title": { "type": "string" },
                "keywords": { "type": "string" },
                "date": {
                  "format": "dateOptionalTime",
                  "type": "date"
                },
                "content_type": { "type": "string" }
              }
            },
            "chunkSize": { "type": "long" },
            "md5": { "type": "string" },
            "length": { "type": "long" },
            "filename": { "type": "string" },
            "contentType": { "type": "string" },
            "uploadDate": {
              "format": "dateOptionalTime",
              "type": "date"
            },
            "metadata": { "type": "object" }
          }
        }
      }
    }
  • content.content is the base64-encoded binary content.
  • content.title is the GridFS filename property.
  • content.content_type is the GridFS content type property.
  • chunkSize, md5, length, filename, contentType and uploadDate map to the properties of the same name in the GridFS MongoDB document.
  • metadata (optional) maps the metadata properties of the GridFS MongoDB document.
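
The field layout described above can be sketched in Python as follows. This only mirrors the bullets; it is not the river's transformation code, and the exact body the river sends to ElasticSearch may differ. The function name gridfs_to_es_document and its arguments are hypothetical.

```python
import base64


def gridfs_to_es_document(file_record, data):
    """Build a document laid out as in the mapping notes.

    file_record: dict of GridFS file properties (filename, md5, ...).
    data: the raw file bytes.
    """
    doc = {
        "content": {
            # content.content is the base64-encoded binary content.
            "content": base64.b64encode(data).decode("ascii"),
            # content.title and content.content_type come from GridFS.
            "title": file_record.get("filename"),
            "content_type": file_record.get("contentType"),
        }
    }
    # These properties are copied under the same name.
    for key in ("chunkSize", "md5", "length", "filename",
                "contentType", "uploadDate"):
        if key in file_record:
            doc[key] = file_record[key]
    # metadata is optional.
    if "metadata" in file_record:
        doc["metadata"] = file_record["metadata"]
    return doc


record = {"filename": "test-large-document.pdf",
          "contentType": "application/pdf",
          "chunkSize": 262144, "length": 5}
print(gridfs_to_es_document(record, b"hello")["content"]["title"])
```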

Install Guide

How to install

The plugin depends on elasticsearch-mapper-attachments.

  • To install that plugin, run:
    %ES_HOME%\bin\plugin.bat -install elasticsearch/elasticsearch-mapper-attachments/2.4.1

Using plugin.bat

  • Simply run:
    %ES_HOME%\bin\plugin.bat -i com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb/2.0.6
  • Other options:
    • The plugin can be installed directly from its Maven Central URL. Releases have been available in the Maven repository since 1.6.5:
      %ES_HOME%\bin\plugin.bat -url https://oss.sonatype.org/content/repositories/releases/com/github/richardwilly98/elasticsearch/elasticsearch-river-mongodb/{river.version}/elasticsearch-river-mongodb-{river.version}.zip -install river-mongodb
    • Old releases (up to 1.6.5) are available in Github downloads:
      %ES_HOME%\bin\plugin.bat -install richardwilly98/elasticsearch-river-mongodb/1.6.5
  • Install issues
    The plugin command-line syntax has changed several times across ES versions. See [10] for the syntax that matches your version.
  • Restart ElasticSearch.

Manually

  • Create the path $ES_HOME\plugins\river-mongodb
  • Copy mongo-java-driver-{driver.version}.jar, elasticsearch-river-mongodb-{river.version}.jar and the site folder into $ES_HOME\plugins\river-mongodb.
  • Restart ElasticSearch.

Configuration

Create a new river for each MongoDB collection that should be indexed by ElasticSearch.

  • Replace ${es.river.name}, ${mongo.db.name}, ${mongo.collection.name}, ${mongo.is.gridfs.collection}, ${es.index.name} and ${es.type.name} with the correct values. The servers, options and credentials parameters are optional.
    • ${es.river.name} is the Elasticsearch river name.
    • servers is an array of mongo instances. If it is not specified, ${mongo.instance1.host} defaults to localhost and ${mongo.instance1.port} defaults to 27017.
    • options defines additional river options:
      • Mongo options settings used by the driver (only secondary_read_preference is implemented).
      • drop_collection removes all documents associated with the index type when the collection is dropped from MongoDB.
      • exclude_fields removes unwanted fields from MongoDB documents before they are indexed in ES. See example [6].
      • include_fields keeps only the specified fields from MongoDB documents before they are indexed in ES. This option is mutually exclusive with exclude_fields.
      • include_collection includes the collection name in the indexed document; ${mongo.include.collection} is the attribute name. See example [8].
      • import_all_collections imports all collections of the specified database. The ${mongo.collection.name} value is ignored in that case.
      • initial_timestamp sets the timestamp for the initial document import. See example [9].
      • skip_initial_import skips the initial import (using collection data) and uses the oplog.rs collection directly. The default is false.
      • store_statistics stores statistics about indexed documents in ES under _river/${es.river.name}.
    • credentials is an array of the credentials required by the databases. db can be 'admin' or 'local'. The credentials are used to connect to local and ${mongo.db.name}. See example [4].
    • Deprecated (use the bulk processor settings instead): in index, a throttle size can be defined with ${es.throttle.size}; the default value is 500.
    • Deprecated (use the bulk processor settings instead): in index, a bulk update size can be defined with ${es.bulk.size}; the default value is 100.
    • In index, the bulk processor settings can be changed: ${es.bulk.actions} (default 1000), ${es.bulk.size} (default 5mb), ${es.bulk.concurrent.requests} (default 50) and ${es.bulk.flush.interval} (default 10ms).
  • In mongodb, a custom filter can be added in ${mongo.filter}. For more details see [5].
  • From shell execute the command:
$ curl -XPUT "localhost:9200/_river/${es.river.name}/_meta" -d '
{
  "type": "mongodb",
  "mongodb": { 
    "servers":
    [
      { "host": ${mongo.instance1.host}, "port": ${mongo.instance1.port} },
      { "host": ${mongo.instance2.host}, "port": ${mongo.instance2.port} }
    ],
    "options": { 
      "secondary_read_preference" : true, 
      "drop_collection": ${mongo.drop.collection}, 
      "exclude_fields": ${mongo.exclude.fields},
      "include_fields": ${mongo.include.fields},
      "include_collection": ${mongo.include.collection},
      "import_all_collections": ${mongo.import.all.collections},
      "initial_timestamp": {
        "script_type": ${mongo.initial.timestamp.script.type},
        "script": ${mongo.initial.timestamp.script}
      },
      "skip_initial_import" : ${mongo.skip.initial.import},
      "store_statistics" : ${mongo.store.statistics}
    },
    "credentials":
    [
      { "db": "local", "user": ${mongo.local.user}, "password": ${mongo.local.password} },
      { "db": "admin", "user": ${mongo.db.user}, "password": ${mongo.db.password} }
    ],
    "db": ${mongo.db.name}, 
    "collection": ${mongo.collection.name}, 
    "gridfs": ${mongo.is.gridfs.collection},
    "filter": ${mongo.filter}
  }, 
  "index": { 
    "name": ${es.index.name}, 
    "throttle_size": ${es.throttle.size},
    "bulk_size": ${es.bulk.size},
    "type": ${es.type.name},
    "bulk": {
      "actions": ${es.bulk.actions},
      "size": ${es.bulk.size},
      "concurrent_requests": ${es.bulk.concurrent.requests},
      "flush_interval": ${es.bulk.flush.interval}
    }
  }
}'
  • Example:
$ curl -XPUT "localhost:9200/_river/mongogridfs/_meta" -d'
{
  "type": "mongodb",
    "mongodb": {
      "db": "testmongo", 
      "collection": "files", 
      "gridfs": true
    },
    "index": {
      "name": "testmongo", 
      "type": "files"
    }
}'
  • Get river settings:
$ curl -XGET "localhost:9200/_river/${es.river.name}/_meta"
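
As an alternative to filling in the template by hand, the river settings can be generated so that the JSON is always well formed. The following is a minimal Python sketch, not part of the plugin. The helper names build_river_meta and river_meta_url are hypothetical, only a few of the settings are covered, and it assumes include_fields and exclude_fields take a JSON array of field names.

```python
import json


def build_river_meta(db, collection, index_name, type_name, gridfs=False,
                     include_fields=None, exclude_fields=None):
    """Return the _meta body for one river as a dict."""
    # The two field options are mutually exclusive.
    if include_fields and exclude_fields:
        raise ValueError(
            "include_fields and exclude_fields are mutually exclusive")
    mongodb = {"db": db, "collection": collection, "gridfs": gridfs}
    options = {}
    if include_fields:
        # Assumption: the option takes an array of field names.
        options["include_fields"] = list(include_fields)
    if exclude_fields:
        options["exclude_fields"] = list(exclude_fields)
    if options:
        mongodb["options"] = options
    return {
        "type": "mongodb",
        "mongodb": mongodb,
        "index": {"name": index_name, "type": type_name},
    }


def river_meta_url(river_name, host="localhost:9200"):
    """URL to PUT (create) or GET (read back) the river settings."""
    return "http://%s/_river/%s/_meta" % (host, river_name)


meta = build_river_meta("testmongo", "files", "testmongo", "files",
                        gridfs=True)
print(river_meta_url("mongogridfs"))
print(json.dumps(meta, indent=2))
```

The printed body can be passed to curl -XPUT with -d, as in the commands above.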

Validation

  • Import a PDF file using the mongofiles utility.
    %MONGO_HOME%\bin\mongofiles.exe --host localhost:27017 --db testmongo --collection files put test-large-document.pdf
    connected to: localhost:27017
    added file: 
    {
        _id: ObjectId('4f244b4528a039f8f1178fdd'), 
        filename: "test-large-document.pdf", 
        chunkSize: 262144, 
        uploadDate: new Date(1327778630447), 
        md5: "8ae3c6998db4ebbaf69421464e0c3ff9", 
        length: 50255626 
    }
    done!
  • Retrieve the indexed document by its id:
    $ curl -XGET "localhost:9200/testmongo/files/4f244b4528a039f8f1178fdd?pretty=true"
  • Search the index by content. The imported PDF contains the word 'Reference':
    $ curl -XGET "localhost:9200/testmongo/files/_search?q=Reference&pretty=true"
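
The two checks above can also be scripted. The following Python sketch builds the same URLs and interprets the responses. The helper names are hypothetical, and it assumes the ElasticSearch 1.x response shapes: a found flag in the get response and an integer hits.total in the search response. No request is sent here; fetching the URLs is left to the caller.

```python
import json
from urllib.parse import quote, urlencode


def document_url(index, doc_type, doc_id, host="localhost:9200"):
    """URL that retrieves one indexed document by id."""
    return "http://%s/%s/%s/%s?pretty=true" % (
        host, index, doc_type, quote(doc_id, safe=""))


def search_url(index, doc_type, query, host="localhost:9200"):
    """URL that runs a query-string search; the query is URL-encoded."""
    params = urlencode([("q", query), ("pretty", "true")])
    return "http://%s/%s/%s/_search?%s" % (host, index, doc_type, params)


def document_found(body):
    """True if a get response reports the document as found."""
    return bool(json.loads(body).get("found"))


def hit_count(body):
    """Number of hits reported by a search response."""
    return json.loads(body)["hits"]["total"]


print(document_url("testmongo", "files", "4f244b4528a039f8f1178fdd"))
print(search_url("testmongo", "files", "Reference"))
```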

Troubleshooting

  • Enable logging in $ES_HOME\config\logging.yml (/etc/elasticsearch/logging.yml on Ubuntu):
    logger:
      river.mongodb: TRACE
      rest.action: TRACE
      org.elasticsearch.river.mongodb: TRACE

Then restart ES.
When reporting a problem, please attach the ES log file to the GitHub issue.

Features

Sharded collection

The plugin should point to one or more mongos instances. It automatically discovers the available shards by reading the config.shards collection.
It creates one thread per shard to monitor that shard's oplog.rs.
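
The one-thread-per-shard arrangement can be sketched in Python as follows. This is an illustration, not the plugin's implementation. It assumes config.shards documents carry _id and host fields, with host written either as replicaSet/host1:port,host2:port or as a single host:port. The names parse_shard_host and start_shard_monitors are hypothetical, and the monitor callable stands in for the code that would tail oplog.rs.

```python
import threading


def parse_shard_host(host):
    """Split a shard host string into (replica_set, member_list)."""
    if "/" in host:
        replica_set, members = host.split("/", 1)
    else:
        replica_set, members = None, host
    return replica_set, members.split(",")


def start_shard_monitors(shards, monitor):
    """Start one thread per shard; each runs monitor(id, rs, members)."""
    threads = []
    for shard in shards:
        replica_set, members = parse_shard_host(shard["host"])
        thread = threading.Thread(
            target=monitor,
            args=(shard["_id"], replica_set, members),
            name="oplog-" + shard["_id"],
            daemon=True,
        )
        thread.start()
        threads.append(thread)
    return threads


def print_monitor(shard_id, replica_set, members):
    # Placeholder: a real monitor would tail oplog.rs on these members.
    print("monitoring", shard_id, replica_set, members)


shards = [{"_id": "shard0000", "host": "rs0/a:27017,b:27017"},
          {"_id": "shard0001", "host": "rs1/c:27017"}]
for t in start_shard_monitors(shards, print_monitor):
    t.join()
```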

Script filters

This feature has been tested with the lang-javascript and lang-groovy plugins.

  • To install the JavaScript plugin, run:
    %ES_HOME%\bin\plugin.bat -install elasticsearch/elasticsearch-lang-javascript/2.4.1

    If required, install the Groovy plugin (it has been included in ES since 1.3.x):
    %ES_HOME%\bin\plugin.bat -install elasticsearch/elasticsearch-lang-groovy/{groovy.version}

The new attributes “scriptType” and “script” should be added to the “mongodb” attribute. “scriptType” is optional with the JavaScript plugin. It should be set to groovy with the Groovy plugin.

  • Example:

Assuming the document in MongoDB has an attribute “title_from_mongo” and this attribute should be mapped to the attribute “title”:

$ curl -XPUT "localhost:9200/_river/mongoscriptfilter/_meta" -d'
{
  "type": "mongodb",
    "mongodb": {
      "db": "testmongo", 
      "collection": "documents", 
      "script": "ctx.document.title = ctx.document.title_from_mongo; delete ctx.document.title_from_mongo;"
    },
    "index": {
      "name": "testmongo", 
      "type": "documents"
    }
}'

Assuming the document in MongoDB has an attribute “state”, the document should be deleted from the ES index if its value is 'CLOSED':

$ curl -XPUT "localhost:9200/_river/mongoscriptfilter/_meta" -d'
{
  "type": "mongodb",
    "mongodb": {
      "db": "testmongo", 
      "collection": "documents", 
      "script": "if( ctx.document.state == 'CLOSED' ) { ctx.deleted = true; }"
    },
    "index": {
      "name": "testmongo", 
      "type": "documents"
    }
}'

Assuming the document in MongoDB has an attribute “score”, only documents with score >= 100 should be indexed:

$ curl -XPUT "localhost:9200/_river/mongoscriptfilter/_meta" -d'
{
  "type": "mongodb",
    "mongodb": {
      "db": "testmongo", 
      "collection": "documents", 
      "script": "if( ctx.document.score < 100 ) { ctx.ignore = true; }"
    },
    "index": {
      "name": "testmongo", 
      "type": "documents"
    }
}'

Examples for the Groovy plugin are available in [7].
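
To make the effect of ctx.document, ctx.deleted and ctx.ignore concrete, the three scripts above can be emulated in Python. This is only a model of the behaviour described in this section: the river runs JavaScript or Groovy, not Python. The function names are hypothetical, and checking ignore before deleted is an assumption.

```python
def rename_title(ctx):
    # Mirrors: title = title_from_mongo; delete title_from_mongo
    doc = ctx["document"]
    if "title_from_mongo" in doc:
        doc["title"] = doc.pop("title_from_mongo")


def delete_closed(ctx):
    # Mirrors: if state == 'CLOSED' then ctx.deleted = true
    if ctx["document"].get("state") == "CLOSED":
        ctx["deleted"] = True


def ignore_low_score(ctx):
    # Mirrors: if score < 100 then ctx.ignore = true
    score = ctx["document"].get("score")
    if score is not None and score < 100:
        ctx["ignore"] = True


def run_filter(document, script):
    """Apply one script and report what would happen to the document."""
    ctx = {"document": dict(document), "deleted": False, "ignore": False}
    script(ctx)
    if ctx["ignore"]:       # assumption: ignore is checked first
        return "ignore", None
    if ctx["deleted"]:
        return "delete", None
    return "index", ctx["document"]


print(run_filter({"title_from_mongo": "Hello"}, rename_title))
print(run_filter({"state": "CLOSED"}, delete_closed))
print(run_filter({"score": 42}, ignore_low_score))
```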

Resources

[0] http://www.mongodb.org/display/DOCS/Replica+Set+Tutorial

[1] http://www.mongodb.org/display/DOCS/Tailable+Cursors

[2] http://www.elasticsearch.org/guide/reference/mapping/attachment-type.html

[3] Ubuntu install script for MongoDB and ElasticSearch

[4] Example river settings with replica set and database authentication

[5] MongoDB custom filter

[6] Example river settings with exclude fields

[7] Example river settings using Groovy script

[8] Example river settings with include collection

[9] Example river settings with initial timestamp

[10] Installing plugin