Override RDD count with a fast ES call
This avoids having to instantiate all the documents, and it takes the associated query into account.

relates #526
costin committed Aug 21, 2015
1 parent 278972f commit eb371b7
Showing 9 changed files with 73 additions and 11 deletions.
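
To make the commit message concrete, here is a minimal sketch (not part of the commit) of what the "fast count" boils down to at the REST level: ask the _count endpoint how many documents match, send the query as the request body, and read the "count" field from the JSON reply. The host, resource and query below are illustrative assumptions; the commit itself issues a GET with a body via the new GetMethodWithBody, while this sketch uses POST, which _count also accepts, because plain java.net cannot attach a body to a GET.

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

import scala.io.Source

object FastCountSketch {
  def main(args: Array[String]): Unit = {
    // illustrative resource and query; in the commit the query comes from
    // QueryUtils.parseQuery(settings) and the resource from Resource.indexAndType()
    val query = """{"query":{"match_all":{}}}"""
    val url = new URL("http://localhost:9200/spark-test/docs/_count")

    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    // the commit sends a GET with a body (hence GetMethodWithBody); this sketch
    // uses POST instead, which the _count endpoint accepts as well
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(query.getBytes(StandardCharsets.UTF_8))

    val body = Source.fromInputStream(conn.getInputStream, "UTF-8").mkString
    // the reply looks like {"count":2,"_shards":{...}}; pull out the number,
    // falling back to -1 like RestClient.count does
    val count = """"count":(\d+)""".r.findFirstMatchIn(body)
      .map(_.group(1).toLong)
      .getOrElse(-1L)
    println("matching docs: " + count)
    conn.disconnect()
  }
}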
@@ -368,7 +368,7 @@ public InputStream scroll(String scrollId) {
try {
// use post instead of get to avoid some weird encoding issues (caused by the long URL)
InputStream is = execute(POST, "_search/scroll?scroll=" + scrollKeepAlive.toString(),
new BytesArray(scrollId.getBytes(StringUtils.UTF_8))).body();
new BytesArray(scrollId)).body();
stats.scrollTotal++;
return is;
} finally {
@@ -377,7 +377,7 @@ public InputStream scroll(String scrollId) {
}

public void deleteScroll(String scrollId) {
execute(DELETE, "_search/scroll", new BytesArray(scrollId.getBytes(StringUtils.UTF_8)), false);
execute(DELETE, "_search/scroll", new BytesArray(scrollId), false);
}

public boolean exists(String indexOrType) {
@@ -406,8 +406,9 @@ public boolean touch(String indexOrType) {
return false;
}

public long count(String indexAndType) {
Number count = (Number) get(indexAndType + "/_count", "count");
public long count(String indexAndType, ByteSequence query) {
Response response = execute(GET, indexAndType + "/_count", query);
Number count = (Number) parseContent(response.body(), "count");
return (count != null ? count.longValue() : -1);
}

@@ -500,7 +500,7 @@ public boolean isEmpty(boolean read) {

public long count(boolean read) {
Resource res = (read ? resourceR : resourceW);
return client.count(res.indexAndType());
return client.count(res.indexAndType(), QueryUtils.parseQuery(settings));
}

public boolean waitForYellow() {
@@ -359,13 +359,13 @@ public Response execute(Request request) throws IOException {

switch (request.method()) {
case DELETE:
http = new HttpDeleteWithBody();
http = new DeleteMethodWithBody();
break;
case HEAD:
http = new HeadMethod();
break;
case GET:
http = new GetMethod();
http = (request.body() == null ? new GetMethod() : new GetMethodWithBody());
break;
case POST:
http = new PostMethod();
@@ -20,11 +20,11 @@

import org.apache.commons.httpclient.methods.EntityEnclosingMethod;

public class HttpDeleteWithBody extends EntityEnclosingMethod {
public class DeleteMethodWithBody extends EntityEnclosingMethod {

public HttpDeleteWithBody() {}
public DeleteMethodWithBody() {}

public HttpDeleteWithBody(String uri) {
public DeleteMethodWithBody(String uri) {
super(uri);
}

@@ -0,0 +1,40 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.rest.commonshttp;

import org.apache.commons.httpclient.methods.EntityEnclosingMethod;

public class GetMethodWithBody extends EntityEnclosingMethod {

public GetMethodWithBody() {
// not supported out of the box for EntityEnclosingMethods
setFollowRedirects(false);
}

public GetMethodWithBody(String uri) {
super(uri);
// not supported out of the box for EntityEnclosingMethods
setFollowRedirects(false);
}

public String getName() {
return "GET";
}
}

@@ -83,6 +83,7 @@ public void testEsRDDWrite() throws Exception {
JavaEsSpark.saveToEs(javaRDD, target);
JavaEsSpark.saveToEs(javaRDD, ImmutableMap.of(ES_RESOURCE, target + "1"));

assertEquals(2, JavaEsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target));
String results = RestUtils.get(target + "/_search?");
assertThat(results, containsString("SFO"));
@@ -97,6 +98,8 @@ public void testEsRDDWriteWithMappingId() throws Exception {
JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(ImmutableList.of(doc1, doc2));
// eliminate with static import
JavaEsSpark.saveToEs(javaRDD, target, ImmutableMap.of(ES_MAPPING_ID, "number"));

assertEquals(2, JavaEsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target + "/1"));
assertTrue(RestUtils.exists(target + "/2"));
String results = RestUtils.get(target + "/_search?");
@@ -115,6 +118,7 @@ public void testEsRDDWriteWithDynamicMapping() throws Exception {
// eliminate with static import
JavaEsSpark.saveToEsWithMeta(pairRdd, target);

assertEquals(2, JavaEsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target + "/1"));
assertTrue(RestUtils.exists(target + "/2"));
String results = RestUtils.get(target + "/_search?");
@@ -134,6 +138,7 @@ public void testEsRDDWriteWithDynamicMappingBasedOnMaps() throws Exception {
// eliminate with static import
JavaEsSpark.saveToEsWithMeta(pairRDD, target);

assertEquals(2, JavaEsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target + "/1"));
assertTrue(RestUtils.exists(target + "/2"));
String results = RestUtils.get(target + "/_search?");
@@ -150,6 +155,7 @@ public void testEsRDDWriteWithMappingExclude() throws Exception {
JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(ImmutableList.of(doc1, doc2));
JavaEsSpark.saveToEs(javaRDD, target, ImmutableMap.of(ES_MAPPING_EXCLUDE, "airport"));

assertEquals(2, JavaEsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target));
assertThat(RestUtils.get(target + "/_search?"), containsString("business"));
assertThat(RestUtils.get(target + "/_search?"), containsString("participants"));
@@ -145,6 +145,7 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
sc.makeRDD(Seq(javaBean, caseClass2)).saveToEs(target, Map("es.mapping.id"->"id"))

assertTrue(RestUtils.exists(target))
assertEquals(3, EsSpark.esRDD(sc, target).count());
assertThat(RestUtils.get(target + "/_search?"), containsString(""))
}

@@ -156,6 +157,8 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
val target = "spark-test/scala-id-write"

sc.makeRDD(Seq(doc1, doc2)).saveToEs(target, Map(ES_MAPPING_ID -> "number"))

assertEquals(2, EsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target + "/1"))
assertTrue(RestUtils.exists(target + "/2"))

@@ -171,6 +174,7 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend

val pairRDD = sc.makeRDD(Seq((3, doc1), (4, doc2))).saveToEsWithMeta(target, cfg)

assertEquals(2, EsSpark.esRDD(sc, target).count());
assertTrue(RestUtils.exists(target + "/3"))
assertTrue(RestUtils.exists(target + "/4"))

@@ -3,7 +3,6 @@ package org.elasticsearch.spark.rdd;
import scala.collection.JavaConversions.collectionAsScalaIterable
import scala.collection.JavaConversions.mapAsJavaMap
import scala.reflect.ClassTag

import org.apache.commons.logging.LogFactory
import org.apache.spark.Partition
import org.apache.spark.SparkContext
@@ -12,6 +11,8 @@ import org.elasticsearch.hadoop.rest.RestService
import org.elasticsearch.hadoop.rest.RestService.PartitionDefinition
import org.elasticsearch.hadoop.util.ObjectUtils
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.rest.RestClient
import org.elasticsearch.hadoop.rest.RestRepository

private[spark] abstract class AbstractEsRDD[T: ClassTag](
@transient sc: SparkContext,
@@ -41,6 +42,15 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
override def checkpoint() {
// Do nothing. Elasticsearch RDD should not be checkpointed.
}

override def count(): Long = {
val repo = new RestRepository(esCfg)
try {
return repo.count(true)
} finally {
repo.close()
}
}

@transient private[spark] lazy val esCfg = {
val cfg = new SparkSettingsManager().load(sc.getConf).copy();
@@ -242,6 +242,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
if (overwrite) {
Utils.logger("org.elasticsearch.spark.sql.DataSource").info(s"Overwriting data for ${cfg.getResourceWrite}")

// perform a scan-scroll delete
val cfgCopy = cfg.copy()
InitializationUtils.setValueWriterIfNotSet(cfgCopy, classOf[JdkValueWriter], null)
cfgCopy.setProperty(ConfigurationOptions.ES_BATCH_FLUSH_MANUAL, "false")

3 comments on commit eb371b7

@MLnick (Contributor) commented on commit eb371b7, Oct 29, 2015

@costin I like the idea of a "fast count" method for an ES RDD... but overriding RDD.count seems to me like it could easily cause some problems for users. For example, a very common pattern in Spark when caching an RDD for use multiple times (e.g. machine learning or interactive analytics) is to call RDD.count on the cached RDD, in order to force all records to be materialized.

Would it not be better to have a separate method for this? Or at least be very explicit in the docs that count no longer does what you might expect...
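
For reference, a small sketch of the pattern described above (the resource name and Spark/ES settings are illustrative, not taken from the commit):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object CachedCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cached-count-sketch")
      .setMaster("local[*]")
      .set("es.nodes", "localhost:9200")
    val sc = new SparkContext(conf)

    // cache the RDD and call count() purely to force every record to be
    // materialized into the cache before the RDD is reused
    val rdd = EsSpark.esRDD(sc, "spark-test/docs").cache()
    rdd.count() // with the override this becomes an ES _count call and no
                // longer computes (or caches) the partitions

    sc.stop()
  }
}

With the standard RDD semantics, the count() above runs a job that reads every document and fills the cache; with the override it answers straight from ES, which is exactly the surprise being raised here.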

@costin (Member, Author) commented on commit eb371b7, Oct 29, 2015

@MLnick I see. I'll revert the change and implement the optimized count method separately, to preserve the existing semantics.
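
One possible shape for such a separate method, purely as an illustration (the name esCount and its placement inside AbstractEsRDD are assumptions, not necessarily what the follow-up change did): keep RDD.count untouched and expose the fast path under its own name, reusing the RestRepository call introduced in this commit.

// hypothetical sketch only -- standard RDD.count() semantics stay intact
def esCount(): Long = {
  val repo = new RestRepository(esCfg) // esCfg as defined in AbstractEsRDD above
  try {
    repo.count(true)
  } finally {
    repo.close()
  }
}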

@MLnick (Contributor) commented on commit eb371b7, Nov 4, 2015

@costin sounds good!
