
Relocation of shards causes bulk indexing client to hang #1839

Closed
snazy opened this issue Apr 3, 2012 · 6 comments


snazy commented Apr 3, 2012

I have set up 4 big servers (lots of cores, lots of disks, lots of RAM), each running an Elasticsearch node.
One client reads rows from a database and continuously submits indexing requests to the cluster. The indexing requests are bundled into bulk requests of 2500 requests each.
The index has 32 shards.
My client uses the Java client API.

So far so good.

I just wanted to know what happens if I shut down a node and restart it.
Shutdown works fine (except: see below).
Restart works fine...
...until the cluster starts to relocate shards.

When a bulk request "hits" a shard that is being relocated, the client hangs forever.

I have tried several networking settings and both the transport client and the node client - nothing helped.

One thing worked around the issue for me.
Previously, the code was:

            Client client = (TransportClient) ...;
            BulkRequestBuilder bulk = client.prepareBulk();
            for (int i = 0; i < 2500; i++) {
                IndexRequestBuilder request = buildIndexingRequest();
                request = request.setReplicationType(ReplicationType.ASYNC); // no effect
                bulk.add(request);
            }
            BulkResponse response = bulk.execute().actionGet(); // <--- NEVER RETURNS IF A SHARD IS BEING RELOCATED
            if (response.hasFailures()) {
                // some error handling...
            }

When I use actionGet(timeout) instead, the method throws an ElasticSearchTimeoutException in this situation and I can submit the bulk request again:

                    while (true) {
                        try {
                            response = bulk.execute().actionGet(getRetryTimeout()); // <--- TIMES OUT IF A SHARD IS BEING RELOCATED
                            break;
                        }
                        catch (ElasticSearchTimeoutException timeout) {
                            warning("TIMEOUT", timeout, null, null);
                        }
                    }

In such a situation I see no activity in the Elasticsearch threads and none in "my" calling thread - it just waits forever in org.elasticsearch.common.util.concurrent.BaseFuture.Sync#acquireSharedInterruptibly.

None of the cluster log files indicate an error.

I do not know if this behaviour affects searches.
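
For completeness: to keep that retry loop from spinning forever if the cluster never recovers, I would cap the retries and back off between attempts. A minimal sketch - the cap and backoff values are arbitrary, and getRetryTimeout() and warning(...) are the same helpers as in the loop above:

                    int attempts = 0;
                    final int maxRetries = 10; // arbitrary cap - tune as needed
                    BulkResponse response = null;
                    while (true) {
                        try {
                            response = bulk.execute().actionGet(getRetryTimeout());
                            break;
                        }
                        catch (ElasticSearchTimeoutException timeout) {
                            warning("TIMEOUT", timeout, null, null);
                            if (++attempts >= maxRetries)
                                throw timeout; // give up and let the caller decide
                            try {
                                Thread.sleep(1000L * attempts); // linear backoff before retrying
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt(); // preserve the interrupt flag
                                throw timeout;
                            }
                        }
                    }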


snazy commented Apr 3, 2012

Verified with Elasticsearch 0.18.7 and 0.19.1.


kimchy commented Apr 4, 2012

Hey, can you write a standalone test case, along with the scenario that recreates it (i.e. start 4 nodes, restart one node while the test case is bulk indexing data)? It would help speed things up in finding where the problem is.


snazy commented Apr 5, 2012

OK


snazy commented Apr 5, 2012

OK - here it is.

Just edit and execute the class (it needs a dependency on the elasticsearch 0.19.1 jar).

  1. Set up a cluster with 4 nodes
  2. Start the class (it will automatically re-create the index and mapping)
  3. Wait until the index has about 1000000 docs (so that relocating the shards takes some time)
  4. Gracefully stop one node (I did it using elasticsearch head's SHUTDOWN functionality)
  5. The main class still runs
  6. Restart the stopped node
  7. The node appears (without any shards)
  8. Some shards get relocated (purple color in elasticsearch head; see the polling sketch after this list)
  9. Bang - the indexer hangs ... forever
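
By the way, to watch step 8 without elasticsearch head, a small polling loop against the cluster health API shows when relocation kicks in. Just a sketch - watchRelocation is a hypothetical helper, and the accessor names are how I remember the 0.19-era Java client, so treat them as assumptions:

    // Hypothetical helper: polls cluster health once a second and prints the
    // number of relocating shards. Assumes `client` is the TransportClient
    // built in doit() below; needs an import of
    // org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse.
    static void watchRelocation(TransportClient client) throws InterruptedException {
        while (true) {
            ClusterHealthResponse health =
                    client.admin().cluster().prepareHealth(NAME).execute().actionGet();
            System.out.println(new Date() + " : status=" + health.getStatus()
                    + ", relocating shards=" + health.getRelocatingShards());
            Thread.sleep(1000);
        }
    }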

Here's the code:

    package org.elasticsearch.issue1839;

    import org.elasticsearch.action.bulk.BulkRequestBuilder;
    import org.elasticsearch.action.index.IndexRequestBuilder;
    import org.elasticsearch.client.IndicesAdminClient;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.indices.IndexMissingException;
    import org.elasticsearch.node.NodeBuilder;

    import java.util.Date;
    import java.util.Random;

    public final class Main {

        public static void main(String[] args) {
            new Main().doit();
        }

        static final int THREADS = 4;
        static final String CLUSTER = "elasticsearch";
        static final String NAME = "issue1839";

        private void doit() {

            // NodeBuilder is used only to create a settings builder;
            // the actual connection goes through a TransportClient.
            NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder().client(true);
            ImmutableSettings.Builder settings = nodeBuilder.settings();
            settings = settings.
                    put("cluster.name", CLUSTER).//
                    put("client.transport.sniff", "false").//
                    put("transport.tcp.compress", true).//
                    put("transport.tcp.connect_timeout", "10s").//
                    put("network.tcp.keep_alive", "true").//
                    put("network.tcp.send_buffer_size", "64k").//
                    put("network.tcp.receive_buffer_size", "64k");

            TransportClient client = new TransportClient(settings);

            client.addTransportAddress(new InetSocketTransportAddress("10.40.101.211", 9300));
            client.addTransportAddress(new InetSocketTransportAddress("10.40.101.212", 9300));
            client.addTransportAddress(new InetSocketTransportAddress("10.40.101.213", 9300));
            client.addTransportAddress(new InetSocketTransportAddress("10.40.101.214", 9300));

            IndicesAdminClient indicesAdmin = client.admin().indices();

            // Re-create the index from scratch.
            try {
                indicesAdmin.prepareDelete(NAME).execute().actionGet();
            }
            catch (IndexMissingException ignore) {
                //
            }

            String indexSettings = "{\"index\": {" +//
                    "    \"number_of_shards\" : \"32\"," +//
                    "    \"number_of_replicas\" : \"1\"" +//
                    "  }" +//
                    '}';
            indicesAdmin.prepareCreate(NAME).setSettings(indexSettings).execute().actionGet();

            String indexMapping = "{\"" + NAME + "\": {" +//
                    "  \"properties\": {" +//
                    "    \"my_text_1\" : {" +//
                    "      \"type\" : \"string\"," +//
                    "      \"store\" : \"yes\"," +//
                    "      \"index\" : \"analyzed\"," +//
                    "      \"include_in_all\" : \"true\"" +//
                    "    }," +//
                    "    \"my_text_2\" : {" +//
                    "      \"type\" : \"string\"," +//
                    "      \"store\" : \"yes\"," +//
                    "      \"index\" : \"analyzed\"," +//
                    "      \"include_in_all\" : \"true\"" +//
                    "    }," +//
                    "    \"my_text_3\" : {" +//
                    "      \"type\" : \"string\"," +//
                    "      \"store\" : \"yes\"," +//
                    "      \"index\" : \"analyzed\"," +//
                    "      \"include_in_all\" : \"true\"" +//
                    "    }," +//
                    "    \"when\" : {" +//
                    "      \"type\" : \"date\"," +//
                    "      \"store\" : \"yes\"," +//
                    "      \"include_in_all\" : \"true\"" +//
                    "    }" +//
                    "  }" +// properties
                    '}' +// issue1839
                    '}';
            indicesAdmin.preparePutMapping(NAME).setType(NAME).setSource(indexMapping).execute().actionGet();

            for (int n = 0; n < THREADS; n++)
                new Thread(new Indexer(client), "indexer#" + n).start();

            while (true)
                try {
                    Thread.sleep(500);
                }
                catch (InterruptedException e) {
                    break;
                }
        }

        // Each thread endlessly submits bulk requests of 2500 generated documents.
        @SuppressWarnings("UseOfSystemOutOrSystemErr")
        static final class Indexer implements Runnable {
            private final TransportClient client;
            private final Random rand = new Random(System.currentTimeMillis() + System.nanoTime());

            Indexer(TransportClient client) {
                this.client = client;
            }

            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(10);
                    }
                    catch (InterruptedException e) {
                        break;
                    }

                    BulkRequestBuilder bulk = client.prepareBulk();
                    for (int n = 0; n < 2500; n++)
                        bulk.add(createIndexRequest());
                    System.out.println(new Date().toString() + " : " + Thread.currentThread().getName() + " indexing 2500 docs");
                    bulk.execute().actionGet();
                    System.out.println(new Date().toString() + " : " + Thread.currentThread().getName() + " indexed 2500 docs");
                }
            }

            private IndexRequestBuilder createIndexRequest() {
                StringBuilder document = new StringBuilder().//
                        append('{').//
                        append("  \"").append(NAME).append("\" : {").//
                        append("    \"my_text_1\" : \"").append(createSomeText()).append("\", ").//
                        append("    \"my_text_2\" : \"").append(createSomeText()).append("\", ").//
                        append("    \"my_text_3\" : \"").append(createSomeText()).append("\", ").//
                        append("    \"when\" : \"").append(System.currentTimeMillis()).append("\" ").//
                        append("  }").//
                        append('}');
                return client.prepareIndex(NAME, NAME).setSource(document.toString());
            }

            // Random uppercase "words" plus a trailing random number.
            private String createSomeText() {
                StringBuilder text = new StringBuilder();
                for (int n = 0; n < rand.nextInt(40) + 3; n++) {
                    for (int m = 0; m < rand.nextInt(15) + 3; m++)
                        text.append((char) (rand.nextInt(26) + 65));
                    text.append(' ');
                }
                return text.append(rand.nextInt(10000000)).toString();
            }
        }
    }


kimchy commented Apr 5, 2012

Hi, thanks for the recreation, I managed to recreate it locally as well. I found the problem: it revolves around not properly handling the relocation of a primary shard just when the shard it was relocated from gets closed. I will post a fix to both the 0.19 and master branches (closing this issue in the commit, so we can keep track of the change). If you can check it yourself as well, that would be great.

kimchy closed this as completed in 824b0bd on Apr 5, 2012

snazy commented Apr 5, 2012

Cool - that was fast :-)

I'll try it when 0.19.3 is released, so I can roll back my "timeout loop".
