A sample elasticsearch river able to stream json data in

Creating a streaming JSON river for data import

Author: Alexander Reelsen

Creating a river, which streams data in

Disclaimer I: You can get the source at github

Disclaimer II: This article is not an introduction to writing rivers in general!

Using rivers for automated data import into elasticsearch

One of the core questions of elasticsearch integration is how to index the data into elasticsearch. In most cases there will be an existing system, from which the data gets imported into elasticsearch.

A simple case would be to import the data from the database directly into elasticsearch (for example by using the JDBC river by Jörg Prante). However, this only works if all the data is stored in the database and nothing is calculated dynamically; prices, for example, might be calculated dynamically and not stored in the database. Furthermore, this implies that you have full access to the database.

This article covers a possible integration scenario for the case where you cannot index directly from your database and instead use a river, which imports new data via a JSON export from the main system.

The samples used here feature an ecommerce system using a SQL database, which uses elasticsearch as a product search engine. A concrete implementation of this river is running in such a setup at my current employer, lusini.de.

Functional requirements of any importing river

  • Update often: A product search engine needs to be updated whenever a product runs out of stock, its price, name or description changes, or a product is created or deleted. As most ecommerce stores do not need realtime product information (though it is a nice-to-have feature), it is sufficient to query the ecommerce system for changes every few seconds.
  • Incremental updates: This is very important. If you implement a river which queries the main ecommerce system every 30 seconds, you will definitely not want to do a full export every time and possibly import hundreds of thousands of products, but only export the products which have changed since the last update.
  • Handling deletes: A product must not be searchable anymore if it cannot be bought. For example, it might have run out of stock or simply have been deleted in the ecommerce system.
  • Bulk updates: You never know how many products are updated in one run. After a new product import, thousands of products need to be imported; if only one product ran out of stock due to a checkout, only one product needs to be updated. In conclusion, you should always use bulk updates in a river.
  • Streaming: As the number of imported products is not known, the river should stream the data in. This means that data is already being imported while it has not yet been read completely from the ecommerce server. You always have to think about whether this is what you really want: if the import fails, you might have already imported some data. This is practical for a product import like this one, but might be very impractical in other cases.

The request

The river has to query a certain endpoint in the ecommerce system which includes information about the last update, in order to only update the products as needed.

In order to support incremental updates, every request from the river to the ecommerce system has to include something like a timestamp, starting from which all changes should be exported. This can be a unix timestamp or a concrete date like these:

http://ecommerce.system/export?fromTimestamp=1348084755
http://ecommerce.system/export?fromTimestamp=20120920-165523

This timestamp has to be stored in the river configuration and updated after every data import. This way the river always knows when the last import happened, even if you stop the elasticsearch server or the river import functionality. It even works if the ecommerce system is temporarily not reachable, provided you take the timestamp from the data returned by the ecommerce system.
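
As a rough illustration of how such a request URL could be assembled from the stored timestamp, here is a minimal sketch. The class ExportUrl and its build() method are hypothetical names that do not appear in the river source, and treating a missing timestamp as a request for a full export is an assumption.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not part of the river source: builds the URL for the
// next incremental export request.
final class ExportUrl {

    private ExportUrl() {}

    // endpoint:   base URL of the export, e.g. "http://ecommerce.system/export"
    // parameter:  name of the query parameter, e.g. "fromTimestamp"
    // lastUpdate: timestamp of the previous import, or null if none is stored
    static String build(String endpoint, String parameter, String lastUpdate) {
        if (lastUpdate == null || lastUpdate.isEmpty()) {
            // Assumption: without a stored timestamp the endpoint exports everything
            return endpoint;
        }
        String separator;
        if (endpoint.endsWith("?") || endpoint.endsWith("&")) {
            separator = "";
        } else if (endpoint.contains("?")) {
            separator = "&";
        } else {
            separator = "?";
        }
        try {
            // Encode the value so that dates containing spaces or colons stay valid
            return endpoint + separator + parameter + "=" + URLEncoder.encode(lastUpdate, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }
}
```

Calling ExportUrl.build("http://ecommerce.system/export", "fromTimestamp", "1348084755") yields the first URL shown above.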

Sample JSON data

The JSON used to import or delete products should look like this in order to be parsed by the river.

{
  "timestamp" : "2012-09-20 16:55:23",
  "products" : [
    {
      "action" : "index",
      "product" : {
        "id"     : "5",
        "name"   : "IPhone 4s",
        "price"  : 400.00,
        "stock"  : 123
      }
    },
    {
      "action" : "index",
      "product" : {
        "id"     : "12",
        "name"   : "IPhone 5",
        "price"  : 700.00
      }
    },
    {
      "action" : "delete",
      "product" : {
        "id"     : "16"
      }
    }
  ]
}

You see the timestamp, which can simply be the time at which the river connected to the ecommerce system. Then there is a products array, which includes products to be indexed as well as those to be deleted. As you only need an ID for deletion, there is no need to submit the complete metadata of a product to be deleted; this data may not even exist anymore.

The river implementation

In order to support streaming and importing new data into elasticsearch the river needs two core features:

  • It has to stream the data and parse it while the request has not finished yet and the data is still flowing in
  • It needs a data structure where the slurper (the thread reading the data from the ecommerce system) can put new data in, while the indexer (the thread doing the actual indexing) can read from it.

Fortunately elasticsearch ships with some of these structures. The one used in this example is a TransferQueue. It is declared in the JsonRiver class like this:

private final TransferQueue<RiverProduct> stream = new LinkedTransferQueue<RiverProduct>();

The slurper uses this queue to add new products. A RiverProduct is a simple abstraction class, which includes an id needed for indexing, the action to be taken (index or delete) and the product itself, a simple hashmap.

In case a new product should be indexed, the slurper calls this code:

queue.add(RiverProduct.index(id, product));

Whereas for a deletion only an id is needed:

queue.add(RiverProduct.delete(id));
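
To make the shape of such an abstraction concrete, here is a minimal sketch of a RiverProduct-like class. It is not the class from the repository: the name RiverProductSketch, the Action enum and the field names are assumptions chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of a RiverProduct-like value class; all names here are assumptions.
final class RiverProductSketch {

    enum Action { INDEX, DELETE }

    final Action action;
    final String id;
    final Map<String, Object> product; // null for deletions

    private RiverProductSketch(Action action, String id, Map<String, Object> product) {
        this.action = action;
        this.id = id;
        this.product = product;
    }

    // An index entry needs both an id and the product data
    static RiverProductSketch index(String id, Map<String, Object> product) {
        if (id == null || product == null) {
            throw new IllegalArgumentException("index needs an id and a product");
        }
        Map<String, Object> copy = Collections.unmodifiableMap(new HashMap<String, Object>(product));
        return new RiverProductSketch(Action.INDEX, id, copy);
    }

    // A delete entry only needs the id
    static RiverProductSketch delete(String id) {
        if (id == null) {
            throw new IllegalArgumentException("delete needs an id");
        }
        return new RiverProductSketch(Action.DELETE, id, null);
    }
}
```

Static factory methods keep the two valid combinations (id plus data, or id only) explicit, so the indexer never has to deal with a half-filled entry.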

Reading json data from a stream

The slurper has a helper class called RiverImporter, which includes the logic to add new products to the queue after having read them from the stream. Adding products has been described above. The product is a simple Map<String,Object>. The first thing needed is an input stream that reads from a URL. You would want to use some nice HTTP client library like async-http-client for this, but after reading this post, it turned out that it does not really stream the data out. So sticking with a normal HttpURLConnection is sufficient in this case. Do not use HttpURLConnection if you need to configure features like HTTP basic authentication or have to support SSL with self-signed certificates (it works, but is not nice to configure).

As elasticsearch ships with the powerful Jackson JSON parsing library, you do not need any external libraries for streaming JSON in. This is the code used to add products while streaming JSON data in (from the RiverImporter class):

    public RiverProductImport executeImport(String lastIndexUpdate) {
        RiverProductImport result = new RiverProductImport();

        XContentParser parser = null;
        InputStream is = null;
        String timestamp = null;

        try {
            is = getConnectionInputstream(lastIndexUpdate);
            parser = JsonXContent.jsonXContent.createParser(is);

            String action = null;
            Map<String, Object> product = null;
            String id = null;

            @SuppressWarnings("unused")
            Token token = null;
            while ((token = parser.nextToken()) != null) {

                if ("timestamp".equals(parser.text())) {
                    token = parser.nextToken();
                    timestamp = parser.text();
                } else if ("action".equals(parser.text())) {
                    token = parser.nextToken();
                    action = parser.text();
                } else if ("id".equals(parser.text())) {
                    token = parser.nextToken();
                    id = parser.text();
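                } else if ("product".equals(parser.text())) {
                    token = parser.nextToken();
                    product = parser.map();
                    // map() consumes the whole product object, including an "id"
                    // nested inside it (as in the sample JSON), so fall back to that id
                    if (id == null && product.get("id") != null) {
                        id = product.get("id").toString();
                    }
                }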

                if ("delete".equals(action) && id != null) {
                    queue.add(RiverProduct.delete(id));
                    action = null;
                    id = null;
                    result.exportedProductCount++;
                }

                if ("index".equals(action) && product != null && id != null) {
                    queue.add(RiverProduct.index(id, product));
                    action = null;
                    product = null;
                    id = null;
                    result.exportedProductCount++;
                }
            }

            result.exportTimestamp = timestamp;

        } catch (IOException e) {
            logger.error("Could not get content with lastUpdatedTimestamp [{}]", e, lastIndexUpdate);
        } finally {
            Closeables.closeQuietly(is);
            if (parser != null) {
                parser.close();
            }
        }

        return result;
    }

Lots of stuff happening here, time to go through it.

After getting the input stream with the data in it, it is easy to create a JSON parser using this input stream.

You might recognize the action, product and id variables as parts of the RiverProduct. According to the JSON definition we saw above, a valid entry in the queue needs either a delete action and an id field, or an index action, an id field and the product data.

The next step is to parse every (well, almost every) token the JSON parser returns. We need to check whether each token is one of timestamp, action, id or product. In any of these cases we need to do something with the next token. If we receive

"timestamp" : "2012-09-23 16:45:23"

and parser.text() returns timestamp, we are not interested in this string itself, but in the next token, which is stored in the river configuration after the whole stream has been imported. The action and id fields are parsed in the same manner. The product, however, is a special case: when the parser detects a product token, the logic in the while loop reads the whole content of the product into a map instead of reading it token by token.

As you can see, this method includes the logic to put products in the queue, even though the parser has not yet finished processing the whole HTTP request – exactly what you want, because the indexer can now start importing, while the slurper is still getting data.

Creating a bulk request with products from the queue

The Indexer is part of the JsonRiver class and mainly consists of the run() method, which looks like this:

public void run() {
    while (!closed) {
        logger.debug("Indexer run() started");
        sw = new StopWatch().start();
        deletedDocuments = 0;
        insertedDocuments = 0;

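        bulk = null; // reset, so an interrupted take() never re-sends the previous bulk
        try {
            RiverProduct product = stream.take();
            bulk = client.prepareBulk();
            do {
                addProductToBulkRequest(product);
                // check the size limit before polling, so no product is taken from the queue and then dropped
            } while (deletedDocuments + insertedDocuments < RIVER_MAX_BULK_SIZE
                    && (product = stream.poll(250, TimeUnit.MILLISECONDS)) != null);
        } catch (InterruptedException e) {
            continue;
        } finally {
            // bulk is still null if take() was interrupted before a request was prepared
            if (bulk != null) {
                bulk.execute().actionGet();
            }
        }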
        logStatistics();
    }
}

The starting point here is the code in the try-catch block, which starts with stream.take(). This method blocks until someone puts a RiverProduct into the stream TransferQueue, which is done by the slurper after it has successfully read a RiverProduct from the input stream. After that a bulk request is prepared. The following do-while loop provides a very nice additional feature by setting useful break conditions. The loop is stopped and the bulk request is executed under two circumstances:

  • Either RIVER_MAX_BULK_SIZE is reached (for example, to store the products after 5000 of them have been read). This has two advantages. First, the products you read in first become searchable faster. Second, you do not have to keep all products in memory, but only as many as your bulk size allows. This ensures you will not run out of memory during a large import.
  • Or the TransferQueue is not filled with a new product for 250 milliseconds. There are several possible reasons for this. The normal one is that the import executed in the RiverImporter class and triggered by the slurper has finished. Unusual ones might be that the system the river pulls the data from is under such high load that it cannot write out the data fast enough, or that a network outage happens right while the river is reading data. The timeout ensures that no RiverProduct entities are lurking around in the in-memory queue instead of being written to elasticsearch.

The addProductToBulkRequest() method simply adds an IndexRequest or a DeleteRequest to the bulk request, depending on the content of the RiverProduct being processed.

The execution of the bulk request is done in the finally block to ensure it always gets written, even if elasticsearch or the river gets shut down.
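
The two break conditions can be isolated from elasticsearch entirely. The following is a minimal sketch of the same "block for the first element, then collect until the batch is full or the queue stays idle" pattern on a plain BlockingQueue. The class BatchDrainer and its nextBatch() method are hypothetical names, and returning the partial batch on interruption is a design choice of this sketch, not something taken from the river.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the batching loop; not part of the river source.
final class BatchDrainer {

    private BatchDrainer() {}

    // Blocks until at least one element is available, then keeps collecting until
    // maxSize elements are gathered or no element arrives within idleMillis.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long idleMillis) {
        List<T> batch = new ArrayList<T>();
        try {
            T item = queue.take(); // wait for the first element
            do {
                batch.add(item);
                // The size check comes first, so an element is never removed
                // from the queue without being added to the batch.
            } while (batch.size() < maxSize
                    && (item = queue.poll(idleMillis, TimeUnit.MILLISECONDS)) != null);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and hand back what was collected so far
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

With five queued elements and a maximum size of three, the first call returns three elements and the second call returns the remaining two once the idle timeout expires.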

That’s it basically – this is the streaming JSON river. No magic, just some code. :-)

Testing

There are three tests in the JsonRiverTest class. I have used the spark micro web framework for setting up a river endpoint. As the spark framework does not provide a way to create JSON data, the built-in elasticsearch JSON builder was used. The spark framework code in the setup() method checks for the lastUpdate parameter. If it is not set (which means this is the first request from the river), it returns two products to add and one to delete. If the parameter is set, it simply appends “1” to the given parameter and returns it as the timestamp without returning any products. This means the river has one shot to get the data imported right.

Furthermore the setup() method creates a new product with the id TODELETE, which is expected to be deleted, as exactly this product is marked as deleted in the response rendered by the spark server. In order to be sure that the product is in the index, when the river slurper runs for the first time, the products index is flushed after the product has been added.

The test tests for three important features of the river:

  • Creation of entities is working: testThatIndexingWorks()
  • Deletion of entities is working: testThatDeletionWorks()
  • The lastUpdatedTimestamp is updated in order to prevent full export on every execution: testThatLastUpdatedTimestampIsWritten()

Do yourself a favour and write tests with huge data in order to make sure you will not hit any memory boundaries in your live environment.

Building, packaging and including in elasticsearch

In case you want to play around with the river or include it in your elasticsearch installation (not recommended before changing it, see the last section), run:

git clone https://github.com/spinscale/elasticsearch-river-streaming-json.git
cd elasticsearch-river-streaming-json
mvn package
path/to/es/bin/plugin -install json-river -url target/releases/elasticsearch-river-json-0.0.1-SNAPSHOT.zip

You might want to execute mvn eclipse:eclipse as well in case you want to work on it in Eclipse.

Improvements

This river is intended to be a minimal example, so you can rip it off and make it suit your needs. I think it is not too useful to create a generic-hyper-can-import-everything-from-everywhere river, as every integration with another system tends to differ. So here are some suggestions for improvements.

Reading river configuration from river index

Every configuration value is hardcoded in the river, which is useless in practice. You want a different configuration per river setup, which means that the configuration has to be supplied when registering the river, like this:

curl -v -XPUT 'localhost:9200/_river/my_river/_meta' -d '{
  "type" : "json",
  "configuration" : {
    "index"          : "products",
    "type"           : "product",
    "max_bulk_size"  : 5000,
    "endpoint"       : "http://localhost:4567/data?",
    "parameter"      : "lastUpdate",
    "refresh"        : "30s",
    "active"         : true
  } 
}'

This setup would allow you to have an arbitrary number of JSON rivers reading into arbitrary indexes. You would just need to create more rivers.

Keep in mind that you have to reread the configuration before or after every run of your slurper and indexer threads, so that configuration changes are picked up not only on startup of the river.

It is also very useful to have an active boolean in your river configuration, so you can stop it, if you either do not need it, or you are expecting some sort of outage or maintenance in your main ecommerce system.

Reading a configuration item from the river works like this:

GetResponse response = client.prepareGet().setIndex("_river").setType("my_river").setId("_meta").execute().actionGet();
Map<String, Object> configuration = (Map<String, Object>) response.getSource().get("configuration");
String endpoint = configuration.get("endpoint").toString();
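
Once the configuration map has been fetched, it helps to convert it into typed values with fallbacks in one place. The following is a minimal sketch that works on a plain Map and needs no elasticsearch classes. The class RiverSettingsSketch is a hypothetical name, the keys are those of the registration example above, and the default values are assumptions.

```java
import java.util.Map;

// Hypothetical holder for typed river settings; defaults are assumptions.
final class RiverSettingsSketch {

    final String index;
    final String endpoint;
    final int maxBulkSize;
    final boolean active;

    private RiverSettingsSketch(String index, String endpoint, int maxBulkSize, boolean active) {
        this.index = index;
        this.endpoint = endpoint;
        this.maxBulkSize = maxBulkSize;
        this.active = active;
    }

    // Reads the "configuration" map of the river document, falling back to
    // defaults for missing keys
    static RiverSettingsSketch from(Map<String, Object> configuration) {
        return new RiverSettingsSketch(
                string(configuration, "index", "products"),
                string(configuration, "endpoint", "http://localhost:4567/data?"),
                Integer.parseInt(string(configuration, "max_bulk_size", "5000")),
                Boolean.parseBoolean(string(configuration, "active", "true")));
    }

    // JSON values may arrive as String, Integer or Boolean, so go through toString()
    private static String string(Map<String, Object> configuration, String key, String fallback) {
        Object value = configuration == null ? null : configuration.get(key);
        return value == null ? fallback : value.toString();
    }
}
```

Building a fresh settings object before every slurper run is one simple way to pick up configuration changes, including the active flag, without restarting the river.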

Use aliases

Another hint from the operational trenches is to use aliases when using rivers to index data. Aliases allow you to index into different indexes than those used to search. This is very useful in case you do a long lasting full export with your river. An example setup is like this

  • products is an alias to the index products1, which is full with data and currently used for live searches
  • You need a full export, which lasts 20 minutes. You cannot stop live search for 20 minutes or delete the index and wait for 20 minutes until all index data is back
  • You create an index products2, reconfigure your river to index into this index, delete the lastUpdated timestamp from this river's configuration, and wait until the export is done
  • You switch the products alias from products1 to products2
  • You can delete the products1 index now
  • Drawback: You will not get product updates in your live search as long as the full export is running (you could work around this by setting up another river, but that would generate more load on the system the data gets pulled from)
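
The alias switch itself should happen in a single request to the _aliases endpoint, so that the alias never points to no index or to both indexes. As a small illustration, the following sketch builds the JSON body for such a request. The class AliasSwitch is a hypothetical helper, and it assumes index and alias names that need no JSON escaping.

```java
// Hypothetical helper that builds the body of an _aliases request which moves
// an alias from one index to another in a single call.
final class AliasSwitch {

    private AliasSwitch() {}

    // Assumes alias and index names contain no characters that need JSON escaping
    static String body(String alias, String oldIndex, String newIndex) {
        return "{\"actions\":["
                + action("remove", oldIndex, alias) + ","
                + action("add", newIndex, alias)
                + "]}";
    }

    private static String action(String type, String index, String alias) {
        return "{\"" + type + "\":{\"index\":\"" + index + "\",\"alias\":\"" + alias + "\"}}";
    }
}
```

The string returned by AliasSwitch.body("products", "products1", "products2") can be sent with a POST request to localhost:9200/_aliases once the full export into products2 has finished.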

Thanks for reading and happy streaming. In case of questions you can reach me via github, twitter or mail.