Skip to content

Commit

Permalink
Fixing issues with checking for index, type and document existence.
Browse files Browse the repository at this point in the history
In Elasticsearch 5.5 the old, deprecated way of checking for existence appears to have been removed.
Added new methods for checking the existence of indices, of types within an index,
and of documents within those types. These calls should be explicit about what
they are checking.

When initializing and checking whether the index exists, we now need to ensure
that the ES version has been discovered first. When testing, make sure the
extended REST client has the version set on its settings. Ignore routing data
in the parent-child tests when running against 1.x.

(cherry picked from commit b8d2657)
  • Loading branch information
jbaiera committed Jul 6, 2017
1 parent 8065795 commit 2f4e3a5
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 24 deletions.
52 changes: 45 additions & 7 deletions mr/src/itest/java/org/elasticsearch/hadoop/mr/RestUtils.java
Expand Up @@ -19,15 +19,19 @@
package org.elasticsearch.hadoop.mr;

import java.io.IOException;
import java.util.List;

import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Request;
import org.elasticsearch.hadoop.rest.Response;
import org.elasticsearch.hadoop.rest.RestClient;
import org.elasticsearch.hadoop.rest.RestClient.Health;
import org.elasticsearch.hadoop.serialization.dto.mapping.Field;
import org.elasticsearch.hadoop.util.ByteSequence;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.IOUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.TestSettings;
import org.elasticsearch.hadoop.util.TestUtils;
import org.elasticsearch.hadoop.util.unit.TimeValue;
Expand All @@ -38,8 +42,18 @@ public class RestUtils {

public static class ExtendedRestClient extends RestClient {

private static EsMajorVersion TEST_VERSION = null;

/**
 * Stamps the given settings with the Elasticsearch major version used by the tests.
 * The version is looked up through {@code TestUtils.getEsVersion()} the first time this
 * is called and kept in {@code TEST_VERSION} for later calls.
 *
 * @param settings settings to update in place
 * @return the same {@code settings} instance, now carrying the internal version
 */
private static Settings withVersion(Settings settings) {
    EsMajorVersion version = TEST_VERSION;
    if (version == null) {
        // first use: resolve the version and remember it
        version = TestUtils.getEsVersion();
        TEST_VERSION = version;
    }
    settings.setInternalVersion(version);
    return settings;
}

public ExtendedRestClient() {
super(new TestSettings());
super(withVersion(new TestSettings()));
}

@Override
Expand All @@ -64,15 +78,22 @@ public String refresh(String index) throws IOException {
}
}

public static void putMapping(String index, byte[] content) throws Exception {
public static void putMapping(String index, String type, byte[] content) throws Exception {
RestClient rc = new ExtendedRestClient();
BytesArray bs = new BytesArray(content);
rc.putMapping(index, index + "/_mapping", bs.bytes());
rc.putMapping(index, index + "/" + type + "/_mapping", bs.bytes());
rc.close();
}

public static void createMultiTypeIndex(String index) throws Exception {
put(index, "{\"settings\":{\"index.mapping.single_type\":false}}".getBytes());
/**
 * Installs a mapping on a resource given as a single {@code index/type} string.
 * The string is split on "/" and handed to {@link #putMapping(String, String, byte[])}.
 *
 * @param indexAndType target resource; must split on "/" into exactly two parts
 * @param content mapping body to send
 * @throws IllegalArgumentException if {@code indexAndType} does not split into exactly two parts
 * @throws Exception whatever the three-argument overload throws
 * @deprecated use putMapping(index, type, content) instead
 */
@Deprecated
public static void putMapping(String indexAndType, byte[] content) throws Exception {
    List<String> parts = StringUtils.tokenize(indexAndType, "/");
    if (parts.size() != 2) {
        throw new IllegalArgumentException("Expected index/type, got [" + indexAndType + "] instead.");
    }
    putMapping(parts.get(0), parts.get(1), content);
}

public static Field getMapping(String index) throws Exception {
Expand All @@ -89,6 +110,13 @@ public static String get(String index) throws Exception {
return str;
}

/**
 * Reads a mapping definition from the classpath and installs it on the given index and type.
 *
 * @param index index to put the mapping on
 * @param type type the mapping applies to
 * @param location classpath location of the mapping file, resolved through the class loader of {@code RestUtils}
 * @throws IllegalArgumentException if no resource exists at {@code location}
 * @throws Exception whatever reading the resource or the byte-array overload throws
 */
public static void putMapping(String index, String type, String location) throws Exception {
    java.io.InputStream mapping = RestUtils.class.getClassLoader().getResourceAsStream(location);
    if (mapping == null) {
        // getResourceAsStream signals "not found" with null; fail here with the offending location
        // instead of passing null on to the stream reader
        throw new IllegalArgumentException("Mapping resource [" + location + "] was not found on the classpath");
    }
    putMapping(index, type, TestUtils.fromInputStream(mapping));
}

/**
 * Reads a mapping definition from the classpath and installs it through the
 * two-argument byte-array overload of {@code putMapping}.
 *
 * @param index target resource, passed unchanged to the byte-array overload
 * @param location classpath location of the mapping file, resolved through the class loader of {@code RestUtils}
 * @throws IllegalArgumentException if no resource exists at {@code location}
 * @throws Exception whatever reading the resource or the byte-array overload throws
 * @deprecated use putMapping(index, type, location) instead
 */
@Deprecated
public static void putMapping(String index, String location) throws Exception {
    java.io.InputStream mapping = RestUtils.class.getClassLoader().getResourceAsStream(location);
    if (mapping == null) {
        // getResourceAsStream signals "not found" with null; fail here with the offending location
        // instead of passing null on to the stream reader
        throw new IllegalArgumentException("Mapping resource [" + location + "] was not found on the classpath");
    }
    putMapping(index, TestUtils.fromInputStream(mapping));
}
Expand Down Expand Up @@ -136,7 +164,17 @@ public static void refresh(String string) throws Exception {

public static boolean exists(String string) throws Exception {
ExtendedRestClient rc = new ExtendedRestClient();
boolean result = rc.exists(string);
boolean result;
List<String> parts = StringUtils.tokenize(string, "/");
if (parts.size() == 1) {
result = rc.indexExists(string);
} else if (parts.size() == 2) {
result = rc.typeExists(parts.get(0), parts.get(1));
} else if (parts.size() == 3) {
result = rc.documentExists(parts.get(0), parts.get(1), parts.get(2));
} else {
throw new RuntimeException("Invalid exists path : " + string);
}
rc.close();
return result;
}
Expand All @@ -147,4 +185,4 @@ public static boolean touch(String index) throws IOException {
rc.close();
return result;
}
}
}
Expand Up @@ -256,6 +256,8 @@ private void init(Configuration cfg) throws IOException {
Settings settings = HadoopSettingsManager.loadFrom(cfg);
Assert.hasText(settings.getResourceWrite(), String.format("No resource ['%s'] (index/query/location) specified", ES_RESOURCE));

// Need to discover the ESVersion before checking if index exists.
InitializationUtils.discoverEsVersion(settings, log);
InitializationUtils.checkIdForOperation(settings);
InitializationUtils.checkIndexExistence(settings);

Expand Down
Expand Up @@ -70,7 +70,7 @@ public static void checkIndexStatus(Settings settings) {
Resource readResource = new Resource(settings, true);

try {
if (bootstrap.exists(readResource.indexAndType())) {
if (bootstrap.indexExists(readResource.index())) {
RestClient.Health status = bootstrap.getHealth(readResource.index());
if (status == RestClient.Health.RED) {
throw new EsHadoopIllegalStateException("Index specified [" + readResource.index() + "] is either red or " +
Expand Down
48 changes: 34 additions & 14 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
Expand Up @@ -542,29 +542,49 @@ public boolean deleteScroll(String scrollId) {
return (res.status() == HttpStatus.OK ? true : false);
}

public boolean exists(String indexOrType) {
/**
 * Checks whether a document exists by probing the path {@code index/type/id}.
 *
 * @param index index holding the document
 * @param type type of the document
 * @param id document id
 * @return {@code true} if the existence probe on that path reports success
 */
public boolean documentExists(String index, String type, String id) {
    String documentPath = index + "/" + type + "/" + id;
    return exists(documentPath);
}

/**
 * Checks whether a type exists on an index. The probed path depends on the cluster version:
 * {@code index/_mapping/type} when the internal version is 5.x or later,
 * {@code index/type} otherwise.
 *
 * @param index index to look in
 * @param type type to look for
 * @return {@code true} if the existence probe on the chosen path reports success
 */
public boolean typeExists(String index, String type) {
    boolean useMappingEndpoint = internalVersion.onOrAfter(EsMajorVersion.V_5_X);
    String typePath = useMappingEndpoint
            ? index + "/_mapping/" + type
            : index + "/" + type;
    return exists(typePath);
}

/**
 * Checks whether an index exists by probing the index name as the request path.
 *
 * @param index index name to probe
 * @return {@code true} if the existence probe reports success
 */
public boolean indexExists(String index) {
    return this.exists(index);
}

/**
 * Sends a HEAD request for the given path, tolerating a not-found answer, and reports
 * whether the response status was OK.
 *
 * @param indexOrType request path to probe
 * @return {@code true} only when the response status equals {@code HttpStatus.OK}
 */
private boolean exists(String indexOrType) {
    Response headResponse = executeNotFoundAllowed(new SimpleRequest(HEAD, null, indexOrType));
    return headResponse.status() == HttpStatus.OK;
}

public boolean touch(String index) {
String target = index;
int slash = index.indexOf("/");
if (slash > 0) {
// Remove the type if one is given...
target = index.substring(0, slash);
}
if (!exists(target)) {
try {
Response response = execute(PUT, target, true);
return response.hasSucceeded();
} catch (EsHadoopInvalidRequest invalidRequest) {
if (StringUtils.hasText(invalidRequest.getMessage()) && !invalidRequest.getMessage().contains("already exists")) {
throw new EsHadoopIllegalStateException(invalidRequest.getMessage());
if (!indexExists(index)) {
Response response = execute(PUT, index, false);

if (response.hasFailed()) {
String msg = null;
// try to parse the answer
try {
msg = parseContent(response.body(), "error");
} catch (Exception ex) {
// can't parse message, move on
}

if (StringUtils.hasText(msg) && !msg.contains("IndexAlreadyExistsException")) {
throw new EsHadoopIllegalStateException(msg);
}
}
return response.hasSucceeded();
}
return false;
}
Expand Down
Expand Up @@ -385,7 +385,11 @@ Scroll scroll(String scrollId, ScrollReader reader) throws IOException {
public boolean indexExists(boolean read) {
Resource res = (read ? resourceR : resourceW);
// cheap hit
boolean exists = client.exists(res.indexAndType());
boolean exists = client.indexExists(res.index());
if (exists && StringUtils.hasText(res.type())) {
exists = client.typeExists(res.index(), res.type());
}

// could be a _all or a pattern which is valid for read
// try again by asking the mapping - could be expensive
if (!exists && read) {
Expand Down Expand Up @@ -470,7 +474,7 @@ public void delete() {

public boolean isEmpty(boolean read) {
Resource res = (read ? resourceR : resourceW);
boolean exists = client.exists(res.indexAndType());
boolean exists = client.indexExists(res.index());
return (exists ? count(read) <= 0 : true);
}

Expand Down
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.hadoop.Provisioner;
import org.elasticsearch.hadoop.QueryTestParams;
import org.elasticsearch.hadoop.mr.RestUtils;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -43,6 +45,7 @@ public class AbstractPigReadAsJsonTest extends AbstractPigTests {
private static int testInstance = 0;
private static String previousQuery;
private boolean readMetadata;
private EsMajorVersion testVersion;

@Parameters
public static Collection<Object[]> queries() {
Expand All @@ -54,6 +57,7 @@ public static Collection<Object[]> queries() {
public AbstractPigReadAsJsonTest(String query, boolean metadata) {
this.query = query;
this.readMetadata = metadata;
this.testVersion = TestUtils.getEsVersion();

if (!query.equals(previousQuery)) {
previousQuery = query;
Expand Down Expand Up @@ -218,6 +222,9 @@ public void testParentChild() throws Exception {
if (readMetadata) {
doc1.add(",\"_metadata\":{\"_index\":\"json-pig-pc\",\"_type\":\"child\",\"_id\":\"");
doc1.add("\",\"_score\":");
if (testVersion.onOrAfter(EsMajorVersion.V_2_X)) {
doc1.add("\"_routing\":\"12\",\"_parent\":\"12\"");
}
}

List<String> doc2 = Lists.newArrayList(
Expand All @@ -226,6 +233,9 @@ public void testParentChild() throws Exception {
if (readMetadata) {
doc2.add(",\"_metadata\":{\"_index\":\"json-pig-pc\",\"_type\":\"child\",\"_id\":\"");
doc2.add("\",\"_score\":");
if (testVersion.onOrAfter(EsMajorVersion.V_2_X)) {
doc2.add("\"_routing\":\"918\",\"_parent\":\"918\"");
}
}

List<String> doc3 = Lists.newArrayList(
Expand All @@ -234,6 +244,9 @@ public void testParentChild() throws Exception {
if (readMetadata) {
doc3.add(",\"_metadata\":{\"_index\":\"json-pig-pc\",\"_type\":\"child\",\"_id\":\"");
doc3.add("\",\"_score\":");
if (testVersion.onOrAfter(EsMajorVersion.V_2_X)) {
doc3.add("\"_routing\":\"982\",\"_parent\":\"982\"");
}
}

assertThat(results, stringContainsInOrder(doc1));
Expand Down
Expand Up @@ -34,6 +34,9 @@ import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.rest.InitializationUtils

object EsSpark {

@transient private[this] val LOG = LogFactory.getLog(EsSpark.getClass)

//
// Load methods
//
Expand Down Expand Up @@ -96,6 +99,8 @@ object EsSpark {
val config = new PropertiesSettings().load(sparkCfg.save())
config.merge(cfg.asJava)

// Need to discover the EsVersion here before checking if the index exists
InitializationUtils.discoverEsVersion(config, LOG)
InitializationUtils.checkIdForOperation(config)
InitializationUtils.checkIndexExistence(config)

Expand Down
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.spark.sql

import org.apache.commons.logging.LogFactory

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.Map
Expand All @@ -36,6 +38,8 @@ object EsSparkSQL {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

@transient private[this] val LOG = LogFactory.getLog(EsSparkSQL.getClass)

def esDF(sc: SQLContext): DataFrame = esDF(sc, Map.empty[String, String])
def esDF(sc: SQLContext, resource: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource))
def esDF(sc: SQLContext, resource: String, query: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))
Expand Down Expand Up @@ -68,6 +72,8 @@ object EsSparkSQL {
val esCfg = new PropertiesSettings().load(sparkCfg.save())
esCfg.merge(cfg.asJava)

// Need to discover es version before checking index existence
InitializationUtils.discoverEsVersion(esCfg, LOG)
InitializationUtils.checkIdForOperation(esCfg)
InitializationUtils.checkIndexExistence(esCfg)

Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.spark.sql

import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SQLContext
Expand All @@ -39,6 +40,8 @@ object EsSparkSQL {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

@transient private[this] val LOG = LogFactory.getLog(EsSparkSQL.getClass)

//
// Read
//
Expand Down Expand Up @@ -90,6 +93,8 @@ object EsSparkSQL {
val esCfg = new PropertiesSettings().load(sparkCfg.save())
esCfg.merge(cfg.asJava)

// Need to discover ES Version before checking index existence
InitializationUtils.discoverEsVersion(esCfg, LOG)
InitializationUtils.checkIdForOperation(esCfg)
InitializationUtils.checkIndexExistence(esCfg)

Expand Down

0 comments on commit 2f4e3a5

Please sign in to comment.