From 1358f95deb52a9d96f69fcf71eaf2ba0684cc5f3 Mon Sep 17 00:00:00 2001
From: Sean Gallagher
Date: Tue, 1 Apr 2014 12:28:08 -0400
Subject: [PATCH 01/54] Author: Sean Gallagher Date: Tue Apr 1 12:28:00 2014

Added upgrade.asciidoc and links to it from setup.asciidoc
---
 README.textile                | 4 ++++
 docs/reference/setup.asciidoc | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/README.textile b/README.textile
index 9d8fba6b45d8b..86c4651a3255c 100644
--- a/README.textile
+++ b/README.textile
@@ -206,6 +206,10 @@ The distribution will be created under @target/releases@.
 See the "TESTING":TESTING.asciidoc file for more information about
 running the Elasticsearch test suite.
 
+h3. Upgrading to Elasticsearch 1.0?
+
+In order to ensure a smooth upgrade process from earlier versions of Elasticsearch (< 0.90.x), it is recommended to perform a full cluster restart. Please see the "Upgrading" section of the "setup reference":http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/setup.html.
+
 h1. License
diff --git a/docs/reference/setup.asciidoc b/docs/reference/setup.asciidoc
index cf413cd918a67..12324483f89c2 100644
--- a/docs/reference/setup.asciidoc
+++ b/docs/reference/setup.asciidoc
@@ -61,3 +61,5 @@ include::setup/as-a-service-win.asciidoc[]
 include::setup/dir-layout.asciidoc[]
 
 include::setup/repositories.asciidoc[]
+
+include::setup/upgrade.asciidoc[]

From e13d759ccf23ff453b5c0c072b36fb00f7366494 Mon Sep 17 00:00:00 2001
From: Sean Gallagher 
Date: Tue, 1 Apr 2014 12:30:15 -0400
Subject: [PATCH 02/54] Author: Sean Gallagher Date: Apr 1 2014

Added upgrade.asciidoc
---
 docs/reference/setup/upgrade.asciidoc | 72 +++++++++++++++++++++++++++
 1 file changed, 72 insertions(+)
 create mode 100644 docs/reference/setup/upgrade.asciidoc

diff --git a/docs/reference/setup/upgrade.asciidoc b/docs/reference/setup/upgrade.asciidoc
new file mode 100644
index 0000000000000..0d1300ffe6bd1
--- /dev/null
+++ b/docs/reference/setup/upgrade.asciidoc
@@ -0,0 +1,72 @@
+[[setup-upgrade]]
+== Upgrading Elasticsearch
+
+[float]
+=== Rolling Upgrades
+Within major releases (0.90.x to 0.90.x, 1.0 to all later releases), rolling upgrades are supported.  To perform a rolling upgrade:
+
+* Shut down a single node within the cluster.
+
+--------------------------------------------
+curl -XPOST 'http://localhost:9200/_cluster/nodes/_local/_shutdown'
+--------------------------------------------
+
+* Confirm that all shards are correctly reallocated to the remaining running nodes.
+
+* Back up the data directory on the stopped node (optional).  This step is performed in order to allow for a smoother rollback in the event of problems during the upgrade.
+
+* Upgrade the stopped node.  To upgrade using a zip or compressed tarball from elasticsearch.org:
+** Extract the zip or tarball to a new directory, usually in the same volume as the current Elasticsearch installation.  Do not overwrite the existing installation, as the downloaded archive will contain a default elasticsearch.yml file and will overwrite your existing configuration.
+** Copy the configuration files from the old Elasticsearch installation's config directory to the new Elasticsearch installation's config directory.  Copy data files if necessary.
+** The simplest solution for moving from one version to another is to have a symbolic link for 'elasticsearch' that points to the currently running version.  This link can be easily updated and will provide a stable access point to the most recent version.  Update this symbolic link if it is being used.
+
+To upgrade using a .deb or .rpm package:
+
+* Use rpm or deb to install the new package.  All files should be placed in their proper locations, and config files should not be overwritten.
+
+* Start the now upgraded node.  Observe that all shards are properly allocated back to the node and that it joins the cluster smoothly.
+
+Repeat this process for all remaining nodes.
+
+It may be possible to perform the upgrade by installing the new software while the service is running.  This would reduce downtime by ensuring the service was ready to run on the new version as soon as it is stopped on the node being upgraded.  This can be done by installing the new version in its own directory and using the symbolic link method outlined above.  It is important to test this procedure first to be sure that site-specific configuration data and production indices will not be overwritten during the upgrade process.
+
+[float]
+=== Upgrading from 0.90.x or earlier to 1.0 or later
+Elasticsearch releases prior to 1.0 and releases after 1.0 are not compatible with each other, so a rolling upgrade is not possible.  In order to upgrade a pre-1.0 system to 1.0 or later, a full cluster stop and start is required.  In order to perform this upgrade:
+
+* Disable shard reallocation (optional).  This is done to allow for a faster startup after cluster shutdown.  If this step is not performed, the nodes will immediately start trying to replicate shards to each other on startup and will spend a lot of time on wasted I/O.  With shard reallocation disabled, the nodes will join the cluster with their indices intact, without attempting to rebalance.  After startup is complete, reallocation will be turned back on.
+
+This syntax is from versions prior to 1.0:
+------------------------------------------------------------
+	curl -XPUT localhost:9200/_cluster/settings -d '{
+		"persistent" : {
+		"cluster.routing.allocation.disable_allocation" : true
+		}
+	}'
+------------------------------------------------------------
+
+* Stop all Elasticsearch services on all nodes in the cluster.
+------------------------------------------------------
+	curl -XPOST 'http://localhost:9200/_shutdown'
+------------------------------------------------------
+
+* Backup all data directories on all nodes (optional).  This step is performed to allow for a smoother rollback in the event of problems during the upgrade.
+
+* On the first node to be upgraded, extract the archive or install the new package as described above in the Rolling Upgrades section.  Repeat for all nodes.
+
+* After upgrading Elasticsearch on all nodes is complete, the cluster can be started by starting each node individually.
+** Start master-eligible nodes first, one at a time.  Verify that a quorum has been reached and a master has been elected before proceeding.
+** Start data and client nodes one at a time, verifying that they successfully join the cluster.
+
+* When the cluster is up and running, shard reallocation can be enabled.
+
+This syntax is from release 1.0 and later:
+------------------------------------------------------
+	curl -XPUT localhost:9200/_cluster/settings -d '{
+     		"persistent" : {
+         	"cluster.routing.allocation.enable" : "all"
+     		}
+ 	}'
+------------------------------------------------------
+
+The cluster upgrade can be streamlined by installing the software before stopping cluster services.  If this is done, testing must be performed to ensure that no production data or configuration files are overwritten prior to restart.

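The rolling-upgrade steps added above ask the operator to confirm that shards have been reallocated before moving to the next node, but no command is shown for that check. A minimal sketch of such a check, assuming the cluster health API available in both 0.90.x and 1.x (the timeout value is arbitrary and not taken from the patch):

[source,sh]
--------------------------------------------------
# Block until no shards are relocating, then inspect the overall cluster status.
curl -XGET 'http://localhost:9200/_cluster/health?wait_for_relocating_shards=0&timeout=5m&pretty'
--------------------------------------------------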
From 31f7696e653ba9ba6593843787c60144e3f316a1 Mon Sep 17 00:00:00 2001
From: Sean Gallagher 
Date: Fri, 4 Apr 2014 16:57:51 -0400
Subject: [PATCH 03/54] Add upgrade instructions Author: Sean Gallagher Date:
 4/4/14 Closes issue #5651

---
 README.textile                        |  2 +-
 docs/reference/setup/upgrade.asciidoc | 84 +++++++++++++++++++++++----
 2 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/README.textile b/README.textile
index 86c4651a3255c..fcf53bdba8c4d 100644
--- a/README.textile
+++ b/README.textile
@@ -206,7 +206,7 @@ The distribution will be created under @target/releases@.
 See the "TESTING":TESTING.asciidoc file for more information about
 running the Elasticsearch test suite.
 
-h3. Upgrading to Elasticsearch 1.0?
+h3. Upgrading to Elasticsearch 1.x?
 
 In order to ensure a smooth upgrade process from earlier versions of Elasticsearch (< 0.90.x), it is recommended to perform a full cluster restart. Please see the "Upgrading" section of the "setup reference":http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/setup.html.
 
diff --git a/docs/reference/setup/upgrade.asciidoc b/docs/reference/setup/upgrade.asciidoc
index 0d1300ffe6bd1..b52d8eeab215d 100644
--- a/docs/reference/setup/upgrade.asciidoc
+++ b/docs/reference/setup/upgrade.asciidoc
@@ -1,57 +1,116 @@
 [[setup-upgrade]]
-== Upgrading Elasticsearch
+== Upgrading
+
+Elasticsearch can usually be upgraded using a rolling upgrade process, resulting in no interruption of service.  This section details how to perform both rolling and restart upgrades.
+
+[float]
+[[backup]]
+=== Back Up Your Data!
+
+Before performing an upgrade, it's a good idea to back up the data on your system.  This will allow you to roll back in the event of a problem with the upgrade.  The upgrades sometimes include upgrades to the Lucene libraries used by Elasticsearch to access the index files, and after an index file has been updated to work with a new version of Lucene, it may not be accessible to the versions of Lucene present in earlier Elasticsearch releases.
+
+[float]
+==== 0.90.x and earlier
+
+To back up a running 0.90.x system, first disable index flushing.  This will prevent indices from being flushed to disk while the backup is in process:
+
+[source,sh]
+-----------------------------------
+$ curl -XPUT 'http://localhost:9200/_all/_settings' -d '{
+    "index": {
+        "translog.disable_flush": "true"
+    }
+}'
+-----------------------------------
+
+Then disable reallocation.  This will prevent the cluster from moving data files from one node to another while the backup is in process:
+
+[source,sh]
+-----------------------------------
+$ curl -XPUT 'http://localhost:9200/_cluster/settings' -d '{
+    "transient" : {
+        "cluster.routing.allocation.disable_allocation": "true"
+    }
+}'
+-----------------------------------
+
+After reallocation and index flushing are disabled, initiate a backup of Elasticsearch's data path using your favorite backup method (tar, storage array snapshots, backup software).  When the backup is complete and data no longer needs to be read from the Elasticsearch data path, reallocation and index flushing must be re-enabled:
+
+[source,sh]
+-----------------------------------
+$ curl -XPUT 'http://localhost:9200/_all/_settings' -d '{
+    "index": {
+        "translog.disable_flush": "false"
+    }
+}'
+
+$ curl -XPUT 'http://localhost:9200/_cluster/settings' -d '{
+    "transient" : {
+        "cluster.routing.allocation.disable_allocation": "false"
+    }
+}'
+-----------------------------------
+
+[float]
+==== 1.0 and later
+
+To back up a running 1.0 or later system, it is simplest to use the snapshot feature.  Complete instructions for backup and restore with snapshots are available http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-snapshots.html[here].
 
 [float]
+[[rolling-upgrades]]
 === Rolling Upgrades
-Within major releases (0.90.x to 0.90.x, 1.0 to all later releases), rolling upgrades are supported.  To perform a rolling upgrade:
+
+Within major releases after release 1.0, rolling upgrades are supported.  To perform a rolling upgrade:
 
 * Shut down a single node within the cluster.
 
+[source,sh]
 --------------------------------------------
 curl -XPOST 'http://localhost:9200/_cluster/nodes/_local/_shutdown'
 --------------------------------------------
 
 * Confirm that all shards are correctly reallocated to the remaining running nodes.
 
-* Back up the data directory on the stopped node (optional).  This step is performed in order to allow for a smoother rollback in the event of problems during the upgrade.
-
 * Upgrade the stopped node.  To upgrade using a zip or compressed tarball from elasticsearch.org:
 ** Extract the zip or tarball to a new directory, usually in the same volume as the current Elasticsearch installation.  Do not overwrite the existing installation, as the downloaded archive will contain a default elasticsearch.yml file and will overwrite your existing configuration.
-** Copy the configuration files from the old Elasticsearch installation's config directory to the new Elasticsearch installation's config directory.  Copy data files if necessary.
+** Copy the configuration files from the old Elasticsearch installation's config directory to the new Elasticsearch installation's config directory.  Copy data files if necessary.  If data files are not located within the tarball's extraction directory, they will not have to be moved.
 ** The simplest solution for moving from one version to another is to have a symbolic link for 'elasticsearch' that points to the currently running version.  This link can be easily updated and will provide a stable access point to the most recent version.  Update this symbolic link if it is being used.
 
-To upgrade using a .deb or .rpm package:
+* To upgrade using a .deb or .rpm package:
 
-* Use rpm or deb to install the new package.  All files should be placed in their proper locations, and config files should not be overwritten.
+** Use rpm or deb to install the new package.  All files should be placed in their proper locations, and config files should not be overwritten.
 
-* Start the now upgraded node.  Observe that all shards are properly allocated back to the node and that it joins the cluster smoothly.
+** Start the now upgraded node.  Observe that all shards are properly allocated back to the node and that it joins the cluster smoothly.
 
 Repeat this process for all remaining nodes.
 
 It may be possible to perform the upgrade by installing the new software while the service is running.  This would reduce downtime by ensuring the service was ready to run on the new version as soon as it is stopped on the node being upgraded.  This can be done by installing the new version in its own directory and using the symbolic link method outlined above.  It is important to test this procedure first to be sure that site-specific configuration data and production indices will not be overwritten during the upgrade process.
 
 [float]
+[[restart-upgrade]]
 === Upgrading from 0.90.x or earlier to 1.0 or later
+
 Elasticsearch releases prior to 1.0 and releases after 1.0 are not compatible with each other, so a rolling upgrade is not possible.  In order to upgrade a pre-1.0 system to 1.0 or later, a full cluster stop and start is required.  In order to perform this upgrade:
 
 * Disable shard reallocation (optional).  This is done to allow for a faster startup after cluster shutdown.  If this step is not performed, the nodes will immediately start trying to replicate shards to each other on startup and will spend a lot of time on wasted I/O.  With shard reallocation disabled, the nodes will join the cluster with their indices intact, without attempting to rebalance.  After startup is complete, reallocation will be turned back on.
 
 This syntax is from versions prior to 1.0:
-------------------------------------------------------------
+
+[source,sh]
+--------------------------------------------------
 	curl -XPUT localhost:9200/_cluster/settings -d '{
 		"persistent" : {
 		"cluster.routing.allocation.disable_allocation" : true
 		}
 	}'
-------------------------------------------------------------
+--------------------------------------------------
 
 * Stop all Elasticsearch services on all nodes in the cluster.
+[source,sh]
 ------------------------------------------------------
 	curl -XPOST 'http://localhost:9200/_shutdown'
 ------------------------------------------------------
 
-* Backup all data directories on all nodes (optional).  This step is performed to allow for a smoother rollback in the event of problems during the upgrade.
-
 * On the first node to be upgraded, extract the archive or install the new package as described above in the Rolling Upgrades section.  Repeat for all nodes.
 
 * After upgrading Elasticsearch on all nodes is complete, the cluster can be started by starting each node individually.
@@ -61,6 +120,7 @@ This syntax is from versions prior to 1.0:
 * When the cluster is up and running, shard reallocation can be enabled.
 
 This syntax is from release 1.0 and later:
+[source,sh]
 ------------------------------------------------------
 	curl -XPUT localhost:9200/_cluster/settings -d '{
      		"persistent" : {

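For the 1.0-and-later backups, the patch above defers to the snapshot documentation. A minimal sketch of that workflow, assuming a shared filesystem repository; the repository name, snapshot name, and path are placeholders rather than values from the patch:

[source,sh]
-----------------------------------
# Register a filesystem repository (the location must be reachable from every node),
# then take a snapshot and wait for it to complete.
curl -XPUT 'http://localhost:9200/_snapshot/my_backup' -d '{
    "type": "fs",
    "settings": {
        "location": "/mount/backups/my_backup"
    }
}'

curl -XPUT 'http://localhost:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true'
-----------------------------------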
From 124d370b5fc8dde000344f272e91f4f7b6d437b1 Mon Sep 17 00:00:00 2001
From: Simon Willnauer 
Date: Sat, 5 Apr 2014 22:18:38 +0200
Subject: [PATCH 04/54] [TEST] cleanup secondary cluster properly in Tribe
 tests.

---
 .../java/org/elasticsearch/tribe/TribeTests.java  | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java
index 8ea5453ce3e40..dbc0e5de2ec46 100644
--- a/src/test/java/org/elasticsearch/tribe/TribeTests.java
+++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java
@@ -66,19 +66,26 @@ public static void setupSecondCluster() throws Exception {
     @AfterClass
     public static void tearDownSecondCluster() {
         if (cluster2 != null) {
-            cluster2.afterTest();
-            cluster2.close();
-            cluster2 = null;
+            try {
+                cluster2.close();
+            } finally {
+                cluster2 = null;
+            }
         }
     }
 
     @After
     public void tearDownTribeNode() {
         if (cluster2 != null) {
-            cluster2.client().admin().indices().prepareDelete("_all").execute().actionGet();
+            try {
+                cluster2.wipe();
+            } finally {
+                cluster2.afterTest();
+            }
         }
         if (tribeNode != null) {
             tribeNode.close();
+            tribeNode = null;
         }
     }
 

From a5aafbb04c06d691be3e70ac53b7dfc24f84455e Mon Sep 17 00:00:00 2001
From: Simon Willnauer 
Date: Sat, 5 Apr 2014 22:35:55 +0200
Subject: [PATCH 05/54] [TEST] Prevent RelocationTests from going crazy when
 relocations take time

---
 .../java/org/elasticsearch/recovery/RelocationTests.java  | 5 +++--
 .../java/org/elasticsearch/test/BackgroundIndexer.java    | 8 ++++----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
index 314a097e708d6..dc3fa203488d1 100644
--- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java
+++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java
@@ -21,6 +21,7 @@
 
 import com.carrotsearch.hppc.IntOpenHashSet;
 import com.carrotsearch.hppc.procedures.IntProcedure;
+import com.carrotsearch.randomizedtesting.RandomizedTest;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
@@ -128,8 +129,8 @@ public void testRelocationWhileIndexingRandom() throws Exception {
             }
         }
 
-        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client())) {
-            final int numDocs = scaledRandomIntBetween(200, 2500);
+        final int numDocs = scaledRandomIntBetween(200, 2500);
+        try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), scaledRandomIntBetween(2, 5), true, numDocs * 2)) {
             logger.info("--> waiting for {} docs to be indexed ...", numDocs);
             waitForDocs(numDocs, indexer);
             logger.info("--> {} docs indexed", numDocs);
diff --git a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java
index a906789758bd1..fa432ce7042f5 100644
--- a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java
+++ b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java
@@ -54,10 +54,10 @@ public BackgroundIndexer(String index, String type, Client client) {
     }
 
     public BackgroundIndexer(String index, String type, Client client, int writerCount) {
-        this(index, type, client, writerCount, true);
+        this(index, type, client, writerCount, true, Integer.MAX_VALUE);
     }
 
-    public BackgroundIndexer(final String index, final String type, final Client client, final int writerCount, boolean autoStart) {
+    public BackgroundIndexer(final String index, final String type, final Client client, final int writerCount, boolean autoStart, final int maxNumDocs) {
 
         failures = new CopyOnWriteArrayList<>();
         writers = new Thread[writerCount];
@@ -73,7 +73,7 @@ public void run() {
                     try {
                         startLatch.await();
                         logger.info("**** starting indexing thread {}", indexerId);
-                        while (!stop.get()) {
+                        while (!stop.get() && indexCounter.get() < maxNumDocs) {  // step out once we reach the hard limit
                             if (batch) {
                                 int batchSize = RandomizedTest.getRandom().nextInt(20) + 1;
                                 BulkRequestBuilder bulkRequest = client.prepareBulk();
@@ -97,7 +97,7 @@ public void run() {
                                 indexCounter.incrementAndGet();
                             }
                         }
-                        logger.info("**** done indexing thread {}", indexerId);
+                        logger.info("**** done indexing thread {}  stop: {} numDocsIndexed: {} maxNumDocs: {}", indexerId, stop.get(), indexCounter.get(), maxNumDocs);
                     } catch (Throwable e) {
                         failures.add(e);
                         logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id);

From d26a9562312c97b27f25997b1cc683b77c6338f9 Mon Sep 17 00:00:00 2001
From: Shay Banon 
Date: Fri, 4 Apr 2014 20:44:12 +0200
Subject: [PATCH 06/54] releasable bytes output + use in transport / translog
 create a new releasable bytes output, that can be recycled, and use it in
 netty and the translog, 2 areas where the recycling will help nicely. Note,
 opted for statically typed enforced releasble bytes output, to make sure
 people take the extra care to control when the bytes reference are released. 
 Also, the mock page/array classes were fixed to not take into account
 potential recycling going during teardown, for example, on a shared cluster
 ping requests still happen, so recycling happen actively during teardown.
 closes #5691

---
 .../common/bytes/PagedBytesReference.java     |   8 +-
 .../bytes/ReleasableBytesReference.java       |  28 +++++
 .../bytes/ReleasablePagedBytesReference.java  |  46 +++++++
 .../common/io/ReleasableBytesStream.java      |  30 +++++
 .../common/io/stream/BytesStreamOutput.java   |  30 ++---
 .../stream/ReleasableBytesStreamOutput.java   |  48 ++++++++
 .../index/translog/fs/FsTranslog.java         |  25 ++--
 .../transport/netty/NettyTransport.java       | 114 ++++++++++--------
 .../netty/NettyTransportChannel.java          |  96 +++++++++------
 .../BenchmarkNettyLargeMessages.java          |   5 +-
 .../transport/TransportBenchmark.java         |   3 +-
 .../common/util/BigArraysTests.java           |   2 +-
 .../zen/ping/unicast/UnicastZenPingTests.java |   5 +-
 .../test/ElasticsearchTestCase.java           |   6 +-
 .../test/cache/recycler/MockBigArrays.java    |  32 ++++-
 .../cache/recycler/MockPageCacheRecycler.java |  32 ++++-
 .../recycler/MockPageCacheRecyclerModule.java |   1 -
 .../netty/SimpleNettyTransportTests.java      |   3 +-
 18 files changed, 379 insertions(+), 135 deletions(-)
 create mode 100644 src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java
 create mode 100644 src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java
 create mode 100644 src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java
 create mode 100644 src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java
 rename src/test/java/org/elasticsearch/{ => test}/cache/recycler/MockPageCacheRecycler.java (75%)

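The commit message above describes the intended discipline: acquire a releasable bytes output, hand its bytes off, and release the recycled pages exactly once. A minimal sketch of that pattern using the types introduced below; the class name, the `writeAndRelease` method, and the use of `BigArrays.NON_RECYCLING_INSTANCE` are illustrative assumptions, not code from this commit:

[source,java]
--------------------------------------------------
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;

// Illustrative sketch of the acquire/write/release pattern this commit enforces.
public class ReleasableBytesExample {

    static void writeAndRelease() throws Exception {
        // Real callers receive an injected, recycling BigArrays; the non-recycling
        // instance keeps this sketch self-contained.
        ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(BigArrays.NON_RECYCLING_INSTANCE);
        boolean released = false;
        try {
            out.writeInt(42); // stand-in for real serialization
            ReleasableBytesReference bytes = out.bytes();
            // ... hand `bytes` to a channel, a translog file, etc. ...
            Releasables.release(bytes); // return the pages exactly once
            released = true;
        } finally {
            if (!released) {
                Releasables.release(out.bytes()); // error path: still return the pages
            }
        }
    }
}
--------------------------------------------------

The try/finally bookkeeping mirrors what the diffs below do in FsTranslog and the Netty channels: on success the bytes are released by the caller or by a channel-future listener, otherwise the finally block returns them.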
diff --git a/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java
index 8ea6f8fc8ef51..664f041f61b1a 100644
--- a/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java
+++ b/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java
@@ -36,13 +36,17 @@
 import java.nio.channels.GatheringByteChannel;
 import java.util.Arrays;
 
-public final class PagedBytesReference implements BytesReference {
+/**
+ * A page based bytes reference, internally holding the bytes in a paged
+ * data structure.
+ */
+public class PagedBytesReference implements BytesReference {
 
     private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
     private static final int NIO_GATHERING_LIMIT = 524288;
 
     private final BigArrays bigarrays;
-    private final ByteArray bytearray;
+    protected final ByteArray bytearray;
     private final int offset;
     private final int length;
     private int hash = 0;
diff --git a/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java b/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java
new file mode 100644
index 0000000000000..749a83a67f42c
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import org.elasticsearch.common.lease.Releasable;
+
+/**
+ * A bytes reference that needs to be released once its usage is done.
+ */
+public interface ReleasableBytesReference extends BytesReference, Releasable {
+}
diff --git a/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java b/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java
new file mode 100644
index 0000000000000..f660cbd022d6b
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/bytes/ReleasablePagedBytesReference.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+
+/**
+ * An extension to {@link PagedBytesReference} that requires releasing its content. This
+ * class exists to make it explicit when a bytes reference needs to be released, and when not.
+ */
+public class ReleasablePagedBytesReference extends PagedBytesReference implements ReleasableBytesReference {
+
+    public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
+        super(bigarrays, bytearray, length);
+    }
+
+    public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int from, int length) {
+        super(bigarrays, bytearray, from, length);
+    }
+
+    @Override
+    public boolean release() throws ElasticsearchException {
+        Releasables.release(bytearray);
+        return true;
+    }
+}
diff --git a/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java b/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java
new file mode 100644
index 0000000000000..d6971b52d0c05
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/io/ReleasableBytesStream.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io;
+
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+
+/**
+ * A bytes stream that requires its bytes to be released once no longer used.
+ */
+public interface ReleasableBytesStream extends BytesStream {
+
+    ReleasableBytesReference bytes();
+}
diff --git a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java
index 899ecf2d87315..6f51f07a0d498 100644
--- a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java
+++ b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java
@@ -33,37 +33,31 @@
  */
 public class BytesStreamOutput extends StreamOutput implements BytesStream {
 
-    /**
-     * Factory/manager for our ByteArray
-     */
-    private final BigArrays bigarrays;
+    protected final BigArrays bigarrays;
 
-    /**
-     * The internal list of pages.
-     */
-    private ByteArray bytes;
+    protected ByteArray bytes;
+    protected int count;
 
     /**
-     * The number of valid bytes in the buffer.
-     */
-    private int count;
-
-    /**
-     * Create a nonrecycling {@link BytesStreamOutput} with 1 initial page acquired.
+     * Create a non recycling {@link BytesStreamOutput} with 1 initial page acquired.
      */
     public BytesStreamOutput() {
         this(BigArrays.PAGE_SIZE_IN_BYTES);
     }
 
     /**
-     * Create a nonrecycling {@link BytesStreamOutput} with enough initial pages acquired
-     * to satisfy the capacity given by {@link expectedSize}.
+     * Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired
+     * to satisfy the capacity given by expected size.
      * 
      * @param expectedSize the expected maximum size of the stream in bytes.
      */
     public BytesStreamOutput(int expectedSize) {
-        bigarrays = BigArrays.NON_RECYCLING_INSTANCE;
-        bytes = bigarrays.newByteArray(expectedSize);
+        this(expectedSize, BigArrays.NON_RECYCLING_INSTANCE);
+    }
+
+    protected BytesStreamOutput(int expectedSize, BigArrays bigarrays) {
+        this.bigarrays = bigarrays;
+        this.bytes = bigarrays.newByteArray(expectedSize);
     }
 
     @Override
diff --git a/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java
new file mode 100644
index 0000000000000..0ead43059f043
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
+import org.elasticsearch.common.io.ReleasableBytesStream;
+import org.elasticsearch.common.util.BigArrays;
+
+/**
+ * A bytes stream output that allows providing a {@link BigArrays} instance
+ * expecting it to require releasing its content ({@link #bytes()}) once done.
+ * 

+ * Please note, its is the responsibility of the caller to make sure the bytes + * reference do not "escape" and are released only once. + */ +public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream { + + public ReleasableBytesStreamOutput(BigArrays bigarrays) { + super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays); + } + + public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigarrays) { + super(expectedSize, bigarrays); + } + + @Override + public ReleasableBytesReference bytes() { + return new ReleasablePagedBytesReference(bigarrays, bytes, count); + } +} diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index d485e695905c9..528c5d66d5cc8 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -21,12 +21,14 @@ import jsr166y.ThreadLocalRandom; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; @@ -62,6 +64,7 @@ public void onRefreshSettings(Settings settings) { } private final IndexSettingsService indexSettingsService; + private final BigArrays bigArrays; private final ReadWriteLock rwl = new ReentrantReadWriteLock(); private final File[] locations; @@ -79,9 +82,10 @@ public void onRefreshSettings(Settings settings) { private final ApplySettings applySettings = new ApplySettings(); @Inject - public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv) { + public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, NodeEnvironment nodeEnv, BigArrays bigArrays) { super(shardId, indexSettings); this.indexSettingsService = indexSettingsService; + this.bigArrays = bigArrays; File[] shardLocations = nodeEnv.shardLocations(shardId); this.locations = new File[shardLocations.length]; for (int i = 0; i < shardLocations.length; i++) { @@ -101,6 +105,7 @@ public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, File l this.indexSettingsService = null; this.locations = new File[]{location}; FileSystemUtils.mkdirs(location); + this.bigArrays = BigArrays.NON_RECYCLING_INSTANCE; this.type = FsTranslogFile.Type.fromString(componentSettings.get("type", FsTranslogFile.Type.BUFFERED.name())); } @@ -335,8 +340,9 @@ public byte[] read(Location location) { @Override public Location add(Operation operation) throws TranslogException { rwl.readLock().lock(); + ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); + boolean released = false; try { - BytesStreamOutput out = new BytesStreamOutput(); out.writeInt(0); // marker for the size... 
TranslogStreams.writeTranslogOperation(out, operation); out.flush(); @@ -345,11 +351,11 @@ public Location add(Operation operation) throws TranslogException { int size = out.size(); out.seek(0); out.writeInt(size - 4); - + // seek back to end out.seek(size); - BytesReference bytes = out.bytes(); + ReleasableBytesReference bytes = out.bytes(); Location location = current.add(bytes); if (syncOnEachOperation) { current.sync(); @@ -362,11 +368,16 @@ public Location add(Operation operation) throws TranslogException { // ignore } } + Releasables.release(bytes); + released = true; return location; - } catch (Exception e) { + } catch (Throwable e) { throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e); } finally { rwl.readLock().unlock(); + if (!released) { + Releasables.release(out.bytes()); + } } } diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index eeaee80354888..deaf4dd098b04 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -27,15 +27,19 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.netty.NettyStaticSetup; import org.elasticsearch.common.netty.OpenChannelsHandler; +import org.elasticsearch.common.netty.ReleaseChannelFutureListener; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; @@ -45,6 +49,7 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.monitor.jvm.JvmInfo; @@ -140,6 +145,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem final ByteSizeValue maxCumulationBufferCapacity; final int maxCompositeBufferComponents; + final BigArrays bigArrays; + private final ThreadPool threadPool; private volatile OpenChannelsHandler serverOpenChannels; @@ -165,10 +172,11 @@ public class NettyTransport extends AbstractLifecycleComponent implem private final ReadWriteLock globalLock = new ReentrantReadWriteLock(); @Inject - public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, Version version) { + public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { super(settings); this.threadPool = threadPool; this.networkService = networkService; + this.bigArrays = bigArrays; this.version = version; if 
(settings.getAsBoolean("netty.epollBugWorkaround", false)) { @@ -547,58 +555,58 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St byte status = 0; status = TransportStatus.setRequest(status); - BytesStreamOutput bStream = new BytesStreamOutput(); - bStream.skip(NettyHeader.HEADER_SIZE); - StreamOutput stream = bStream; - // only compress if asked, and, the request is not bytes, since then only - // the header part is compressed, and the "body" can't be extracted as compressed - if (options.compress() && (!(request instanceof BytesTransportRequest))) { - status = TransportStatus.setCompress(status); - stream = CompressorFactory.defaultCompressor().streamOutput(stream); - } - stream = new HandlesStreamOutput(stream); - - // we pick the smallest of the 2, to support both backward and forward compatibility - // note, this is the only place we need to do this, since from here on, we use the serialized version - // as the version to use also when the node receiving this request will send the response with - Version version = Version.smallest(this.version, node.version()); - - stream.setVersion(version); - stream.writeString(action); - - ChannelBuffer buffer; - // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output - // that create paged channel buffers, but its tricky to know when to do it (where this option is - // more explicit). - if (request instanceof BytesTransportRequest) { - BytesTransportRequest bRequest = (BytesTransportRequest) request; - assert node.version().equals(bRequest.version()); - bRequest.writeThin(stream); - stream.close(); - ChannelBuffer headerBuffer = bStream.bytes().toChannelBuffer(); - ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer(); - // false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer.... - buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.of(headerBuffer, contentBuffer), false); - } else { - request.writeTo(stream); - stream.close(); - buffer = bStream.bytes().toChannelBuffer(); - } - NettyHeader.writeHeader(buffer, requestId, status, version); - targetChannel.write(buffer); - - // We handle close connection exception in the #exceptionCaught method, which is the main reason we want to add this future -// channelFuture.addListener(new ChannelFutureListener() { -// @Override public void operationComplete(ChannelFuture future) throws Exception { -// if (!future.isSuccess()) { -// // maybe add back the retry? 
-// TransportResponseHandler handler = transportServiceAdapter.remove(requestId); -// if (handler != null) { -// handler.handleException(new RemoteTransportException("Failed write request", new SendRequestTransportException(node, action, future.getCause()))); -// } -// } -// } -// }); + ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); + boolean addedReleaseListener = false; + try { + bStream.skip(NettyHeader.HEADER_SIZE); + StreamOutput stream = bStream; + // only compress if asked, and, the request is not bytes, since then only + // the header part is compressed, and the "body" can't be extracted as compressed + if (options.compress() && (!(request instanceof BytesTransportRequest))) { + status = TransportStatus.setCompress(status); + stream = CompressorFactory.defaultCompressor().streamOutput(stream); + } + stream = new HandlesStreamOutput(stream); + + // we pick the smallest of the 2, to support both backward and forward compatibility + // note, this is the only place we need to do this, since from here on, we use the serialized version + // as the version to use also when the node receiving this request will send the response with + Version version = Version.smallest(this.version, node.version()); + + stream.setVersion(version); + stream.writeString(action); + + ReleasableBytesReference bytes; + ChannelBuffer buffer; + // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output + // that create paged channel buffers, but its tricky to know when to do it (where this option is + // more explicit). + if (request instanceof BytesTransportRequest) { + BytesTransportRequest bRequest = (BytesTransportRequest) request; + assert node.version().equals(bRequest.version()); + bRequest.writeThin(stream); + stream.close(); + bytes = bStream.bytes(); + ChannelBuffer headerBuffer = bytes.toChannelBuffer(); + ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer(); + // false on gathering, cause gathering causes the NIO layer to combine the buffers into a single direct buffer.... 
+ buffer = new CompositeChannelBuffer(headerBuffer.order(), ImmutableList.of(headerBuffer, contentBuffer), false); + } else { + request.writeTo(stream); + stream.close(); + bytes = bStream.bytes(); + buffer = bytes.toChannelBuffer(); + } + NettyHeader.writeHeader(buffer, requestId, status, version); + ChannelFuture future = targetChannel.write(buffer); + ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); + future.addListener(listener); + addedReleaseListener = true; + } finally { + if (!addedReleaseListener) { + Releasables.release(bStream.bytes()); + } + } } @Override diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 5d50d4e3158bf..c915ccea7c63c 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -20,15 +20,19 @@ package org.elasticsearch.transport.netty; import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.ThrowableObjectOutputStream; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput; +import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.netty.ReleaseChannelFutureListener; import org.elasticsearch.transport.*; import org.elasticsearch.transport.support.TransportStatus; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; import java.io.IOException; import java.io.NotSerializableException; @@ -71,47 +75,69 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op byte status = 0; status = TransportStatus.setResponse(status); - BytesStreamOutput bStream = new BytesStreamOutput(); - bStream.skip(NettyHeader.HEADER_SIZE); - StreamOutput stream = bStream; - if (options.compress()) { - status = TransportStatus.setCompress(status); - stream = CompressorFactory.defaultCompressor().streamOutput(stream); - } - stream = new HandlesStreamOutput(stream); - stream.setVersion(version); - response.writeTo(stream); - stream.close(); + ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(transport.bigArrays); + boolean addedReleaseListener = false; + try { + bStream.skip(NettyHeader.HEADER_SIZE); + StreamOutput stream = bStream; + if (options.compress()) { + status = TransportStatus.setCompress(status); + stream = CompressorFactory.defaultCompressor().streamOutput(stream); + } + stream = new HandlesStreamOutput(stream); + stream.setVersion(version); + response.writeTo(stream); + stream.close(); - ChannelBuffer buffer = bStream.bytes().toChannelBuffer(); - NettyHeader.writeHeader(buffer, requestId, status, version); - channel.write(buffer); + ReleasableBytesReference bytes = bStream.bytes(); + ChannelBuffer buffer = bytes.toChannelBuffer(); + NettyHeader.writeHeader(buffer, requestId, status, version); + ChannelFuture future = channel.write(buffer); + ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); + future.addListener(listener); + addedReleaseListener = true; + } finally { + if (!addedReleaseListener) { + 
Releasables.release(bStream.bytes()); + } + } } @Override public void sendResponse(Throwable error) throws IOException { - BytesStreamOutput stream = new BytesStreamOutput(); + ReleasableBytesStreamOutput stream = new ReleasableBytesStreamOutput(transport.bigArrays); + boolean addedReleaseListener = false; try { - stream.skip(NettyHeader.HEADER_SIZE); - RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); - ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); - too.writeObject(tx); - too.close(); - } catch (NotSerializableException e) { - stream.reset(); - stream.skip(NettyHeader.HEADER_SIZE); - RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); - ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); - too.writeObject(tx); - too.close(); - } + try { + stream.skip(NettyHeader.HEADER_SIZE); + RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); + ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); + too.writeObject(tx); + too.close(); + } catch (NotSerializableException e) { + stream.reset(); + stream.skip(NettyHeader.HEADER_SIZE); + RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); + ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); + too.writeObject(tx); + too.close(); + } - byte status = 0; - status = TransportStatus.setResponse(status); - status = TransportStatus.setError(status); + byte status = 0; + status = TransportStatus.setResponse(status); + status = TransportStatus.setError(status); - ChannelBuffer buffer = stream.bytes().toChannelBuffer(); - NettyHeader.writeHeader(buffer, requestId, status, version); - channel.write(buffer); + ReleasableBytesReference bytes = stream.bytes(); + ChannelBuffer buffer = bytes.toChannelBuffer(); + NettyHeader.writeHeader(buffer, requestId, status, version); + ChannelFuture future = channel.write(buffer); + ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); + future.addListener(listener); + addedReleaseListener = true; + } finally { + if (!addedReleaseListener) { + Releasables.release(stream.bytes()); + } + } } } diff --git a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index 84bf8d9cc8def..2b355676c3da5 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.netty.NettyTransport; @@ -53,8 +54,8 @@ public static void main(String[] args) throws InterruptedException { NetworkService networkService = new NetworkService(settings); final ThreadPool threadPool = new 
ThreadPool(); - final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start(); - final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool, networkService, Version.CURRENT), threadPool).start(); + final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool).start(); + final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT), threadPool).start(); final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300), Version.CURRENT); // final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300)); diff --git a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java index 3c206ef61cc70..39a44e9a9552e 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/TransportBenchmark.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import org.elasticsearch.transport.local.LocalTransport; @@ -50,7 +51,7 @@ public Transport newTransport(Settings settings, ThreadPool threadPool) { NETTY { @Override public Transport newTransport(Settings settings, ThreadPool threadPool) { - return new NettyTransport(settings, threadPool, new NetworkService(ImmutableSettings.EMPTY), Version.CURRENT); + return new NettyTransport(settings, threadPool, new NetworkService(ImmutableSettings.EMPTY), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); } }; diff --git a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 1b82f010d6703..781ab143d9dcc 100644 --- a/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.common.util; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.cache.recycler.MockPageCacheRecycler; +import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.test.ElasticsearchTestCase; diff --git a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java index 7a553e2b3d8f5..b5b0b4c0b4649 100644 --- a/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; import 
org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.node.service.NodeService; @@ -55,13 +56,13 @@ public void testSimplePings() { ClusterName clusterName = new ClusterName("test"); NetworkService networkService = new NetworkService(settings); - NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, Version.CURRENT); + NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); final TransportService transportServiceA = new TransportService(transportA, threadPool).start(); final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress(); - NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, Version.CURRENT); + NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT); final TransportService transportServiceB = new TransportService(transportB, threadPool).start(); final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(), Version.CURRENT); diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java index 7402676deb04e..1d13ebc4b5737 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchTestCase.java @@ -30,7 +30,7 @@ import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TimeUnits; import org.elasticsearch.Version; -import org.elasticsearch.cache.recycler.MockPageCacheRecycler; +import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; import org.elasticsearch.common.logging.ESLogger; @@ -134,7 +134,7 @@ public void resetPageTracking() { } @After - public void ensureAllPagesReleased() { + public void ensureAllPagesReleased() throws Exception { MockPageCacheRecycler.ensureAllPagesAreReleased(); } @@ -145,7 +145,7 @@ public void resetArrayTracking() { } @After - public void ensureAllArraysReleased() { + public void ensureAllArraysReleased() throws Exception { MockBigArrays.ensureAllArraysAreReleased(); } diff --git a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java index af7c364725ed7..5d6b4b5b626e0 100644 --- a/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java +++ b/src/test/java/org/elasticsearch/test/cache/recycler/MockBigArrays.java @@ -21,12 +21,17 @@ import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.SeedUtils; +import com.google.common.base.Predicate; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.lucene.util.BytesRef; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.*; +import org.elasticsearch.test.ElasticsearchTestCase; +import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ConcurrentMap; @@ -56,12 +61,31 @@ public static void reset() { ACQUIRED_ARRAYS.clear(); } - public static void ensureAllArraysAreReleased() { + public static void ensureAllArraysAreReleased() throws Exception { if (DISCARD) { DISCARD = false; - } else if (ACQUIRED_ARRAYS.size() > 0) { - final Object cause = ACQUIRED_ARRAYS.entrySet().iterator().next().getValue(); - throw new RuntimeException(ACQUIRED_ARRAYS.size() + " arrays have not been released", cause instanceof Throwable ? (Throwable) cause : null); + } else { + final Map masterCopy = Maps.newHashMap(ACQUIRED_ARRAYS); + if (masterCopy.isEmpty()) { + return; + } + // not empty, we might be executing on a shared cluster that keeps on obtaining + // and releasing arrays, lets make sure that after a reasonable timeout, all master + // copy (snapshot) have been released + boolean success = ElasticsearchTestCase.awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return Sets.intersection(masterCopy.keySet(), ACQUIRED_ARRAYS.keySet()).isEmpty(); + } + }); + if (success) { + return; + } + masterCopy.keySet().retainAll(ACQUIRED_ARRAYS.keySet()); + if (!masterCopy.isEmpty()) { + final Object cause = masterCopy.entrySet().iterator().next().getValue(); + throw new RuntimeException(masterCopy.size() + " arrays have not been released", cause instanceof Throwable ? (Throwable) cause : null); + } } } diff --git a/src/test/java/org/elasticsearch/cache/recycler/MockPageCacheRecycler.java b/src/test/java/org/elasticsearch/test/cache/recycler/MockPageCacheRecycler.java similarity index 75% rename from src/test/java/org/elasticsearch/cache/recycler/MockPageCacheRecycler.java rename to src/test/java/org/elasticsearch/test/cache/recycler/MockPageCacheRecycler.java index f3904800c836e..2866c71e1bdfe 100644 --- a/src/test/java/org/elasticsearch/cache/recycler/MockPageCacheRecycler.java +++ b/src/test/java/org/elasticsearch/test/cache/recycler/MockPageCacheRecycler.java @@ -17,17 +17,22 @@ * under the License. 
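The rewritten `ensureAllArraysAreReleased` above (and the matching `ensureAllPagesAreReleased` below) snapshot the currently acquired entries and poll until every entry in that snapshot has been released, instead of failing immediately; this tolerates a shared cluster that keeps acquiring and releasing resources in the background. A standalone sketch of the same snapshot-and-poll check, using only JDK types and hypothetical names rather than the patch's Guava and `awaitBusy` helpers:

[source,java]
--------------------------------------------
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ResourceLeakChecker {

    // acquisition cause recorded per live resource (hypothetical registry)
    static final ConcurrentMap<Object, Throwable> ACQUIRED = new ConcurrentHashMap<>();

    public static void ensureAllReleased(long timeoutMillis) throws InterruptedException {
        // snapshot: only resources that were live at this point must be released
        final Map<Object, Throwable> snapshot = new HashMap<>(ACQUIRED);
        if (snapshot.isEmpty()) {
            return;
        }
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            Set<Object> stillLive = new HashSet<>(snapshot.keySet());
            stillLive.retainAll(ACQUIRED.keySet());
            if (stillLive.isEmpty()) {
                return; // everything from the snapshot has been released
            }
            Thread.sleep(100);
        }
        // timed out: report only the survivors from the snapshot
        snapshot.keySet().retainAll(ACQUIRED.keySet());
        if (!snapshot.isEmpty()) {
            Throwable cause = snapshot.values().iterator().next();
            throw new RuntimeException(snapshot.size() + " resources have not been released", cause);
        }
    }
}
--------------------------------------------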
*/ -package org.elasticsearch.cache.recycler; +package org.elasticsearch.test.cache.recycler; +import com.google.common.base.Predicate; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.recycler.Recycler.V; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.TestCluster; import org.elasticsearch.threadpool.ThreadPool; import java.lang.reflect.Array; +import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentMap; @@ -39,10 +44,27 @@ public static void reset() { ACQUIRED_PAGES.clear(); } - public static void ensureAllPagesAreReleased() { - if (ACQUIRED_PAGES.size() > 0) { - final Throwable t = ACQUIRED_PAGES.entrySet().iterator().next().getValue(); - throw new RuntimeException(ACQUIRED_PAGES.size() + " pages have not been released", t); + public static void ensureAllPagesAreReleased() throws Exception { + final Map masterCopy = Maps.newHashMap(ACQUIRED_PAGES); + if (masterCopy.isEmpty()) { + return; + } + // not empty, we might be executing on a shared cluster that keeps on obtaining + // and releasing pages, lets make sure that after a reasonable timeout, all master + // copy (snapshot) have been released + boolean success = ElasticsearchTestCase.awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + return Sets.intersection(masterCopy.keySet(), ACQUIRED_PAGES.keySet()).isEmpty(); + } + }); + if (success) { + return; + } + masterCopy.keySet().retainAll(ACQUIRED_PAGES.keySet()); + if (!masterCopy.isEmpty()) { + final Throwable t = masterCopy.entrySet().iterator().next().getValue(); + throw new RuntimeException(masterCopy.size() + " pages have not been released", t); } } diff --git a/src/test/java/org/elasticsearch/test/cache/recycler/MockPageCacheRecyclerModule.java b/src/test/java/org/elasticsearch/test/cache/recycler/MockPageCacheRecyclerModule.java index 3f5b2a4429778..339fb949699fb 100644 --- a/src/test/java/org/elasticsearch/test/cache/recycler/MockPageCacheRecyclerModule.java +++ b/src/test/java/org/elasticsearch/test/cache/recycler/MockPageCacheRecyclerModule.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.cache.recycler; -import org.elasticsearch.cache.recycler.MockPageCacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.common.inject.AbstractModule; diff --git a/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java b/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java index 17db10094780b..6215ffcae384c 100644 --- a/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/SimpleNettyTransportTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.AbstractSimpleTransportTests; import org.elasticsearch.transport.ConnectTransportException; @@ -37,7 +38,7 @@ protected MockTransportService build(Settings settings, Version version) { int startPort = 11000 + 
randomIntBetween(0, 255); int endPort = startPort + 10; settings = ImmutableSettings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build(); - MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), version), threadPool); + MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version), threadPool); transportService.start(); return transportService; } From 7b9df39800ff290daa339946e8d0a15d3591233b Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sun, 6 Apr 2014 00:41:24 +0200 Subject: [PATCH 07/54] [Test] Added better control over the number of documents indexed by BackgroundIndexer Used the new controls to reduce indexing activity in RelocationTests and RecoveryWhileUnderLoadTests Closes #5696 --- .../recovery/RecoveryWhileUnderLoadTests.java | 37 ++++-- .../recovery/RelocationTests.java | 9 +- .../elasticsearch/test/BackgroundIndexer.java | 120 ++++++++++++++++-- .../test/ElasticsearchIntegrationTest.java | 13 +- 4 files changed, 150 insertions(+), 29 deletions(-) diff --git a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java index 1980cfa7c6462..537c4ef6d82fa 100644 --- a/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java +++ b/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadTests.java @@ -58,23 +58,29 @@ public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception { assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); final int totalNumDocs = scaledRandomIntBetween(200, 20000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { - int waitFor = totalNumDocs / 10; + int waitFor = totalNumDocs / 10; + int extraDocs = waitFor; + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) { logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs / 10; + waitFor += extraDocs; + indexer.continueIndexing(extraDocs); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - waitFor += totalNumDocs / 10; logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs - waitFor; + indexer.continueIndexing(extraDocs); + logger.info("--> allow 2 nodes for index [test] ..."); // now start another node, while we index allowNodes("test", 2); @@ -108,22 +114,28 @@ public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws E assertAcked(prepareCreate("test", 1, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); final int totalNumDocs = scaledRandomIntBetween(200, 20000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { - int waitFor = totalNumDocs / 10; + int waitFor = totalNumDocs / 10; + int extraDocs = waitFor; + try (BackgroundIndexer indexer = new 
BackgroundIndexer("test", "type", client(), extraDocs)) { logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs / 10; + waitFor += extraDocs; + indexer.continueIndexing(extraDocs); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - waitFor += totalNumDocs / 10; logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + + extraDocs = totalNumDocs - waitFor; + indexer.continueIndexing(extraDocs); logger.info("--> allow 4 nodes for index [test] ..."); allowNodes("test", 4); @@ -156,24 +168,29 @@ public void recoverWhileUnderLoadWithNodeShutdown() throws Exception { assertAcked(prepareCreate("test", 2, settingsBuilder().put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1))); final int totalNumDocs = scaledRandomIntBetween(200, 20000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { - int waitFor = totalNumDocs / 10; + int waitFor = totalNumDocs / 10; + int extraDocs = waitFor; + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), extraDocs)) { logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); + extraDocs = totalNumDocs / 10; + waitFor += extraDocs; + indexer.continueIndexing(extraDocs); logger.info("--> flushing the index ...."); // now flush, just to make sure we have some data in the index, not just translog client().admin().indices().prepareFlush().execute().actionGet(); - waitFor += totalNumDocs / 10; logger.info("--> waiting for {} docs to be indexed ...", waitFor); waitForDocs(waitFor, indexer); indexer.assertNoFailures(); logger.info("--> {} docs indexed", waitFor); // now start more nodes, while we index + extraDocs = totalNumDocs - waitFor; + indexer.continueIndexing(extraDocs); logger.info("--> allow 4 nodes for index [test] ..."); allowNodes("test", 4); @@ -227,7 +244,7 @@ public void recoverWhileRelocating() throws Exception { final int numDocs = scaledRandomIntBetween(200, 50000); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client())) { + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type", client(), numDocs)) { for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) { indexer.assertNoFailures(); diff --git a/src/test/java/org/elasticsearch/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/recovery/RelocationTests.java index dc3fa203488d1..3f739353be125 100644 --- a/src/test/java/org/elasticsearch/recovery/RelocationTests.java +++ b/src/test/java/org/elasticsearch/recovery/RelocationTests.java @@ -21,7 +21,6 @@ import com.carrotsearch.hppc.IntOpenHashSet; import com.carrotsearch.hppc.procedures.IntProcedure; -import com.carrotsearch.randomizedtesting.RandomizedTest; import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -129,8 +128,8 @@ public void testRelocationWhileIndexingRandom() throws Exception { } } - final int numDocs = 
scaledRandomIntBetween(200, 2500); - try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), scaledRandomIntBetween(2, 5), true, numDocs * 2)) { + int numDocs = scaledRandomIntBetween(200, 2500); + try (BackgroundIndexer indexer = new BackgroundIndexer("test", "type1", client(), numDocs)) { logger.info("--> waiting for {} docs to be indexed ...", numDocs); waitForDocs(numDocs, indexer); logger.info("--> {} docs indexed", numDocs); @@ -142,6 +141,9 @@ public void testRelocationWhileIndexingRandom() throws Exception { int toNode = fromNode == 0 ? 1 : 0; fromNode += nodeShiftBased; toNode += nodeShiftBased; + numDocs = scaledRandomIntBetween(200, 1000); + logger.debug("--> Allow indexer to index [{}] documents", numDocs); + indexer.continueIndexing(numDocs); logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); client().admin().cluster().prepareReroute() .add(new MoveAllocationCommand(new ShardId("test", 0), nodes[fromNode], nodes[toNode])) @@ -154,6 +156,7 @@ public void testRelocationWhileIndexingRandom() throws Exception { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).setTimeout(ACCEPTABLE_RELOCATION_TIME).execute().actionGet(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); } logger.info("--> done relocations"); diff --git a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java index fa432ce7042f5..3326cb8d167dc 100644 --- a/src/test/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/src/test/java/org/elasticsearch/test/BackgroundIndexer.java @@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -48,21 +49,64 @@ public class BackgroundIndexer implements AutoCloseable { final AtomicLong idGenerator = new AtomicLong(); final AtomicLong indexCounter = new AtomicLong(); final CountDownLatch startLatch = new CountDownLatch(1); - + final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore + final Semaphore availableBudget = new Semaphore(0); + + /** + * Start indexing in the background using a random number of threads. + * + * @param index index name to index into + * @param type document type + * @param client client to use + */ public BackgroundIndexer(String index, String type, Client client) { - this(index, type, client, RandomizedTest.scaledRandomIntBetween(2, 5)); + this(index, type, client, -1); + } + + /** + * Start indexing in the background using a random number of threads. Indexing will be paused after numOfDocs docs has + * been indexed. + * + * @param index index name to index into + * @param type document type + * @param client client to use + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. 
+ */ + public BackgroundIndexer(String index, String type, Client client, int numOfDocs) { + this(index, type, client, numOfDocs, RandomizedTest.scaledRandomIntBetween(2, 5)); } - public BackgroundIndexer(String index, String type, Client client, int writerCount) { - this(index, type, client, writerCount, true, Integer.MAX_VALUE); + /** + * Start indexing in the background using a given number of threads. Indexing will be paused after numOfDocs docs has + * been indexed. + * + * @param index index name to index into + * @param type document type + * @param client client to use + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + * @param writerCount number of indexing threads to use + */ + public BackgroundIndexer(String index, String type, Client client, int numOfDocs, final int writerCount) { + this(index, type, client, numOfDocs, writerCount, true); } - public BackgroundIndexer(final String index, final String type, final Client client, final int writerCount, boolean autoStart, final int maxNumDocs) { + /** + * Start indexing in the background using a given number of threads. Indexing will be paused after numOfDocs docs has + * been indexed. + * + * @param index index name to index into + * @param type document type + * @param client client to use + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + * @param writerCount number of indexing threads to use + * @param autoStart set to true to start indexing as soon as all threads have been created. + */ + public BackgroundIndexer(final String index, final String type, final Client client, final int numOfDocs, final int writerCount, boolean autoStart) { failures = new CopyOnWriteArrayList<>(); writers = new Thread[writerCount]; stopLatch = new CountDownLatch(writers.length); - logger.info("--> starting {} indexing threads", writerCount); + logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs); for (int i = 0; i < writers.length; i++) { final int indexerId = i; final boolean batch = RandomizedTest.getRandom().nextBoolean(); @@ -73,9 +117,17 @@ public void run() { try { startLatch.await(); logger.info("**** starting indexing thread {}", indexerId); - while (!stop.get() && indexCounter.get() < maxNumDocs) { // step out once we reach the hard limit + while (!stop.get()) { if (batch) { int batchSize = RandomizedTest.getRandom().nextInt(20) + 1; + if (hasBudget.get()) { + batchSize = Math.max(Math.min(batchSize, availableBudget.availablePermits()), 1);// always try to get at least one + if (!availableBudget.tryAcquire(batchSize, 250, TimeUnit.MILLISECONDS)) { + // time out -> check if we have to stop. + continue; + } + + } BulkRequestBuilder bulkRequest = client.prepareBulk(); for (int i = 0; i < batchSize; i++) { id = idGenerator.incrementAndGet(); @@ -92,12 +144,17 @@ public void run() { } } else { + + if (hasBudget.get() && !availableBudget.tryAcquire(250, TimeUnit.MILLISECONDS)) { + // time out -> check if we have to stop. 
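The budget checks above, together with the `setBudget`, `pauseIndexing` and `continueIndexing` methods that follow, gate every write on a `Semaphore` so a test can dole out documents in controlled phases. A standalone sketch of this permit-budget pattern, with hypothetical names and a no-op write in place of the real client calls:

[source,java]
--------------------------------------------
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class BudgetedWriter implements Runnable {

    final AtomicBoolean stop = new AtomicBoolean(false);
    final AtomicBoolean hasBudget = new AtomicBoolean(false); // false means "no limit"
    final Semaphore budget = new Semaphore(0);
    final AtomicLong indexed = new AtomicLong();

    @Override
    public void run() {
        while (!stop.get()) {
            if (hasBudget.get()) {
                try {
                    // wait briefly for a permit so the loop can re-check the stop flag
                    if (!budget.tryAcquire(250, TimeUnit.MILLISECONDS)) {
                        continue;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            indexDocument(); // stand-in for the real bulk/single index request
            indexed.incrementAndGet();
        }
    }

    /** Allow numOfDocs more documents; a negative value removes the limit. */
    void continueIndexing(int numOfDocs) {
        if (numOfDocs >= 0) {
            hasBudget.set(true);
            budget.release(numOfDocs);
        } else {
            hasBudget.set(false);
        }
    }

    /** Stop handing out permits so the writers idle until the next budget. */
    void pauseIndexing() {
        budget.drainPermits();
        hasBudget.set(true);
    }

    void indexDocument() {
        // no-op in this sketch
    }
}
--------------------------------------------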
+ continue; + } id = idGenerator.incrementAndGet(); client.prepareIndex(index, type, Long.toString(id) + "-" + indexerId).setSource("test", "value" + id).get(); indexCounter.incrementAndGet(); } } - logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {} maxNumDocs: {}", indexerId, stop.get(), indexCounter.get(), maxNumDocs); + logger.info("**** done indexing thread {} stop: {} numDocsIndexed: {}", indexerId, stop.get(), indexCounter.get()); } catch (Throwable e) { failures.add(e); logger.warn("**** failed indexing thread {} on doc id {}", e, indexerId, id); @@ -110,20 +167,63 @@ public void run() { } if (autoStart) { - startLatch.countDown(); + start(numOfDocs); } } + private void setBudget(int numOfDocs) { + logger.debug("updating budget to [{}]", numOfDocs); + if (numOfDocs >= 0) { + hasBudget.set(true); + availableBudget.release(numOfDocs); + } else { + hasBudget.set(false); + } + + } + + /** Start indexing with no limit to the number of documents */ public void start() { + start(-1); + } + + /** + * Start indexing + * + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + */ + public void start(int numOfDocs) { + assert !stop.get() : "background indexer can not be started after it has stopped"; + setBudget(numOfDocs); startLatch.countDown(); } + /** Pausing indexing by setting current document limit to 0 */ + public void pauseIndexing() { + availableBudget.drainPermits(); + setBudget(0); + } + + /** Continue indexing after it has paused. No new document limit will be set */ + public void continueIndexing() { + continueIndexing(-1); + } + + /** + * Continue indexing after it has paused. + * + * @param numOfDocs number of document to index before pausing. Set to -1 to have no limit. + */ + public void continueIndexing(int numOfDocs) { + setBudget(numOfDocs); + } + + /** Stop all background threads **/ public void stop() throws InterruptedException { if (stop.get()) { return; } stop.set(true); - Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true)); assertNoFailures(); } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 80eddf841a313..e3acdaf6b0bf1 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -207,7 +207,7 @@ private static void initializeGlobalCluster() { } try { transportAddresses[i++] = new InetSocketTransportAddress(split[0], Integer.valueOf(split[1])); - } catch(NumberFormatException e) { + } catch (NumberFormatException e) { throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]"); } } @@ -319,7 +319,7 @@ public static ImmutableTestCluster immutableCluster() { } public static TestCluster cluster() { - if (!(currentCluster instanceof TestCluster)) { + if (!(currentCluster instanceof TestCluster)) { throw new UnsupportedOperationException("current test cluster is immutable"); } return (TestCluster) currentCluster; @@ -558,7 +558,7 @@ public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTim Predicate testDocs = new Predicate() { public boolean apply(Object o) { lastKnownCount[0] = indexer.totalIndexedDocs(); - if (lastKnownCount[0] > numDocs) { + if (lastKnownCount[0] >= numDocs) { long count = 
client().prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount(); if (count == lastKnownCount[0]) { // no progress - try to refresh for the next time @@ -569,7 +569,7 @@ public boolean apply(Object o) { } else { logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount[0], numDocs); } - return lastKnownCount[0] > numDocs; + return lastKnownCount[0] >= numDocs; } }; @@ -1088,7 +1088,7 @@ private static boolean runTestScopeLifecycle() { @Before public final void before() throws IOException { if (runTestScopeLifecycle()) { - beforeInternal(); + beforeInternal(); } } @@ -1139,7 +1139,8 @@ private final static void initializeSuiteScope() throws Exception { * * @see SuiteScopeTest */ - protected void setupSuiteScopeCluster() throws Exception {} + protected void setupSuiteScopeCluster() throws Exception { + } private static boolean isSuiteScope(Class clazz) { if (clazz == Object.class || clazz == ElasticsearchIntegrationTest.class) { From ade1d0ef571e8eb49ca9e75d16c712e92a3c79c2 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 17 Feb 2014 02:23:38 +0100 Subject: [PATCH 08/54] Added global ordinals (unique incremental numbering for terms) to fielddata. Added a terms aggregation implementations that work on global ordinals, which is also the default. Closes #5672 --- .../index-modules/fielddata.asciidoc | 52 ++ .../bucket/terms-aggregation.asciidoc | 17 +- .../common/lucene/TopReaderContextAware.java | 29 ++ .../index/engine/internal/InternalEngine.java | 1 + .../fielddata/AbstractIndexFieldData.java | 4 + .../index/fielddata/AtomicFieldData.java | 15 +- .../index/fielddata/FieldDataType.java | 2 - .../index/fielddata/IndexFieldData.java | 13 +- .../index/fielddata/IndexFieldDataCache.java | 50 +- .../fielddata/IndexFieldDataService.java | 15 +- .../index/fielddata/RamUsage.java | 30 ++ .../index/fielddata/ShardFieldData.java | 16 +- .../ordinals/GlobalOrdinalsBuilder.java | 35 ++ .../GlobalOrdinalsIndexFieldData.java | 201 ++++++++ .../InternalGlobalOrdinalsBuilder.java | 466 ++++++++++++++++++ .../plain/AbstractBytesIndexFieldData.java | 39 +- .../AtomicFieldDataWithOrdinalsTermsEnum.java | 149 ++++++ .../plain/BinaryDVIndexFieldData.java | 5 +- .../plain/BinaryDVNumericIndexFieldData.java | 5 +- .../plain/DisabledIndexFieldData.java | 3 +- .../plain/DocValuesIndexFieldData.java | 21 +- .../plain/DoubleArrayIndexFieldData.java | 3 +- .../plain/FSTBytesAtomicFieldData.java | 6 +- .../plain/FSTBytesIndexFieldData.java | 10 +- .../plain/FloatArrayIndexFieldData.java | 3 +- .../plain/GeoPointBinaryDVIndexFieldData.java | 9 +- .../GeoPointCompressedIndexFieldData.java | 3 +- .../GeoPointDoubleArrayIndexFieldData.java | 3 +- .../plain/NumericDVIndexFieldData.java | 5 +- .../plain/PackedArrayIndexFieldData.java | 3 +- .../plain/PagedBytesAtomicFieldData.java | 6 + .../plain/PagedBytesIndexFieldData.java | 13 +- .../plain/ParentChildIndexFieldData.java | 3 +- .../plain/SortedSetDVAtomicFieldData.java | 19 + .../SortedSetDVBytesAtomicFieldData.java | 1 + .../plain/SortedSetDVBytesIndexFieldData.java | 42 +- .../index/mapper/FieldMapper.java | 9 + .../elasticsearch/index/shard/ShardUtils.java | 8 + .../cache/IndicesFieldDataCache.java | 63 ++- .../cache/IndicesFieldDataCacheListener.java | 7 +- .../indices/warmer/IndicesWarmer.java | 18 + .../indices/warmer/InternalIndicesWarmer.java | 33 +- .../percolator/QueryCollector.java | 1 + .../elasticsearch/search/SearchService.java | 52 +- .../search/aggregations/AggregationPhase.java | 1 + 
.../bucket/BucketsAggregator.java | 18 + .../terms/AbstractStringTermsAggregator.java | 57 +++ .../GlobalOrdinalsStringTermsAggregator.java | 168 +++++++ .../bucket/terms/StringTermsAggregator.java | 13 +- .../bucket/terms/TermsAggregatorFactory.java | 89 ++-- .../support/AggregationContext.java | 12 + .../aggregations/support/ValuesSource.java | 41 +- .../aggregations/GlobalOrdinalsBenchmark.java | 277 +++++++++++ ...AggregationSearchAndIndexingBenchmark.java | 354 +++++++++++++ .../TermsAggregationSearchBenchmark.java | 91 +++- .../fielddata/AbstractFieldDataTests.java | 14 +- .../AbstractStringFieldDataTests.java | 156 ++++++ .../index/fielddata/DuelFieldDataTests.java | 52 ++ .../NoOrdinalsStringFieldDataTests.java | 11 + .../search/aggregations/RandomTests.java | 53 +- .../aggregations/bucket/StringTermsTests.java | 32 +- .../org/elasticsearch/test/TestCluster.java | 6 +- 62 files changed, 2738 insertions(+), 195 deletions(-) create mode 100644 src/main/java/org/elasticsearch/common/lucene/TopReaderContextAware.java create mode 100644 src/main/java/org/elasticsearch/index/fielddata/RamUsage.java create mode 100644 src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsBuilder.java create mode 100644 src/main/java/org/elasticsearch/index/fielddata/ordinals/GlobalOrdinalsIndexFieldData.java create mode 100644 src/main/java/org/elasticsearch/index/fielddata/ordinals/InternalGlobalOrdinalsBuilder.java create mode 100644 src/main/java/org/elasticsearch/index/fielddata/plain/AtomicFieldDataWithOrdinalsTermsEnum.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java create mode 100644 src/test/java/org/elasticsearch/benchmark/search/aggregations/GlobalOrdinalsBenchmark.java create mode 100644 src/test/java/org/elasticsearch/benchmark/search/aggregations/TermsAggregationSearchAndIndexingBenchmark.java diff --git a/docs/reference/index-modules/fielddata.asciidoc b/docs/reference/index-modules/fielddata.asciidoc index c958dcbc037e2..57afa043fa558 100644 --- a/docs/reference/index-modules/fielddata.asciidoc +++ b/docs/reference/index-modules/fielddata.asciidoc @@ -124,6 +124,41 @@ field data format. `doc_values`:: Computes and stores field data data-structures on disk at indexing time. +[float] +==== Global ordinals + +coming[1.2.0] + +Global ordinals are a data structure on top of field data that maintains an +incremental numbering for all the terms in field data in lexicographic order. +Each term has a unique number, and the number of term 'A' is lower than the number +of term 'B' when 'A' sorts before 'B'. Global ordinals are only supported on string fields. + +Field data on string fields also has ordinals, a unique numbering for all terms +in a particular segment and field. Global ordinals build on top of this +by providing a mapping between the segment ordinals and the global ordinals, +the latter being unique across the entire shard. + +Global ordinals can improve the execution time of search features that already use segment ordinals, +such as the terms aggregator. Often these search features +need to merge the per-segment ordinal results into a cross-segment terms result. With +global ordinals this mapping happens at field data load time instead of during each +query execution. 
With global ordinals, search features only need to resolve the actual +term when building the (shard) response; during the execution there is no need +to use the actual terms, because the unique numbering that global ordinals provide is +sufficient and improves the execution time. + +Global ordinals for a specified field are tied to all the segments of a shard (Lucene index), +whereas field data for a specific field is tied to a single segment. +For this reason global ordinals need to be rebuilt in their entirety once new segments +become visible. This one-time cost would be paid anyway without global ordinals, but +then it would be paid on each search execution instead. + +The loading time of global ordinals depends on the number of terms in a field, but in general +it is low, since the source field data has already been loaded. The memory overhead of global +ordinals is small because it is very efficiently compressed. Eager loading of global ordinals +can move the loading time from the first search request to the refresh itself. + [float] === Fielddata loading @@ -147,6 +182,23 @@ It is possible to force field data to be loaded and cached eagerly through the } -------------------------------------------------- +Global ordinals can also be eagerly loaded: + +[source,js] +-------------------------------------------------- +{ + category: { + type: "string", + fielddata: { + loading: "eager_global_ordinals" + } + } +} +-------------------------------------------------- + +With the above setting, both field data and global ordinals for a specific field +are eagerly loaded. + [float] ==== Disabling field data loading diff --git a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc index 09583e290c180..cea5409d8dde9 100644 --- a/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc +++ b/docs/reference/search/aggregations/bucket/terms-aggregation.asciidoc @@ -310,12 +310,15 @@ http://docs.oracle.com/javase/7/docs/api/java/util/regex/Pattern.html#UNIX_LINES ==== Execution hint -There are two mechanisms by which terms aggregations can be executed: either by using field values directly in order to aggregate -data per-bucket (`map`), or by using ordinals of the field values instead of the values themselves (`ordinals`). Although the -latter execution mode can be expected to be slightly faster, it is only available for use when the underlying data source exposes -those terms ordinals. Moreover, it may actually be slower if most field values are unique. Elasticsearch tries to have sensible -defaults when it comes to the execution mode that should be used, but in case you know that one execution mode may perform better -than the other one, you have the ability to "hint" it to Elasticsearch: +coming[1.2.0] The `global_ordinals` execution mode + +There are three mechanisms by which terms aggregations can be executed: by using field values directly in order to aggregate +data per-bucket (`map`), by using ordinals of the field values instead of the values themselves (`ordinals`), or by using global +ordinals of the field (`global_ordinals`). The latter is faster, especially for fields with many unique +values. However, it can be slower if only a few documents match, for example when a terms aggregator is nested in another +aggregator; this applies to both the `ordinals` and `global_ordinals` execution modes. 
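The documentation above describes global ordinals as a mapping from per-segment ordinals to a single shard-wide numbering. A standalone sketch of that mapping in plain Java, with made-up terms (not Elasticsearch code):

[source,java]
--------------------------------------------
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class GlobalOrdinalsSketch {
    public static void main(String[] args) {
        // terms per segment, already sorted the way segment ordinals are assigned
        List<List<String>> segments = Arrays.asList(
                Arrays.asList("apple", "kiwi", "pear"),   // segment 0: ordinals 0..2
                Arrays.asList("banana", "kiwi"));         // segment 1: ordinals 0..1

        // global ordinal = position of the term in the sorted union of all terms
        TreeSet<String> union = new TreeSet<>();
        for (List<String> terms : segments) {
            union.addAll(terms);
        }
        Map<String, Integer> globalOrd = new HashMap<>();
        int ord = 0;
        for (String term : union) {
            globalOrd.put(term, ord++);
        }

        // per-segment ordinal -> global ordinal lookup tables
        for (int seg = 0; seg < segments.size(); seg++) {
            List<String> terms = segments.get(seg);
            int[] segmentToGlobal = new int[terms.size()];
            for (int i = 0; i < terms.size(); i++) {
                segmentToGlobal[i] = globalOrd.get(terms.get(i));
            }
            System.out.println("segment " + seg + " -> " + Arrays.toString(segmentToGlobal));
        }
        // prints: segment 0 -> [0, 2, 3]  and  segment 1 -> [1, 2]
    }
}
--------------------------------------------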
Elasticsearch tries to have sensible +defaults when it comes to the execution mode that should be used, but in case you know that one execution mode may +perform better than the other one, you have the ability to "hint" it to Elasticsearch: [source,js] -------------------------------------------------- @@ -331,6 +334,6 @@ than the other one, you have the ability to "hint" it to Elasticsearch: } -------------------------------------------------- -<1> the possible values are `map` and `ordinals` +<1> the possible values are `map`, `ordinals` and `global_ordinals` Please note that Elasticsearch will ignore this execution hint if it is not applicable. diff --git a/src/main/java/org/elasticsearch/common/lucene/TopReaderContextAware.java b/src/main/java/org/elasticsearch/common/lucene/TopReaderContextAware.java new file mode 100644 index 0000000000000..ea11025319932 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/lucene/TopReaderContextAware.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.common.lucene; + +import org.apache.lucene.index.IndexReaderContext; + +/** + * + */ +public interface TopReaderContextAware { + + public void setNextReader(IndexReaderContext reader); +} diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index d069fd5525def..7fe8849ce27d4 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -1560,6 +1560,7 @@ public IndexSearcher newSearcher(IndexReader reader) throws IOException { new SimpleSearcher("warmer", newSearcher)); warmer.warm(context); } + warmer.warmTop(new IndicesWarmer.WarmerContext(shardId, searcher.getIndexReader())); } catch (Throwable e) { if (!closed) { logger.warn("failed to prepare/warm", e); diff --git a/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java index 34e1ef80a29ce..c370b9cc8a5c3 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/AbstractIndexFieldData.java @@ -53,6 +53,10 @@ public FieldMapper.Names getFieldNames() { return this.fieldNames; } + public FieldDataType getFieldDataType() { + return fieldDataType; + } + @Override public void clear() { cache.clear(fieldNames.indexName()); diff --git a/src/main/java/org/elasticsearch/index/fielddata/AtomicFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/AtomicFieldData.java index 9845608d664cb..cc3975ebdfccc 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/AtomicFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/AtomicFieldData.java @@ -19,10 +19,12 @@ package org.elasticsearch.index.fielddata; +import org.apache.lucene.index.TermsEnum; + /** * The thread safe {@link org.apache.lucene.index.AtomicReader} level cache of the data. */ -public interface AtomicFieldData