Added exceptions when encountering TTL and Timestamp mappings on ES 6+
InitializationUtils validates that these settings are not present when
running against ES 6+, and logs deprecation warnings if they are
present when running against ES 5.x and below. Added @Deprecated tags
to the TTL and Timestamp metadata enums (in MetadataExtractor and in
the Spark RDD package). When the metadata tags are used with Spark
RDDs and the MetadataExtractor, a dummy field extractor that throws
exceptions is returned for TTL and Timestamp when running against
ES 6+.

Fixes #986
jbaiera committed Jun 7, 2017
1 parent 9803e81 commit 0986a78
Showing 9 changed files with 168 additions and 6 deletions.
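
For reference, the user-visible effect of the new validation against an ES 6.x cluster can be sketched as follows, mirroring the tests added in this commit (TestSettings is the repository's test helper):

    Settings set = new TestSettings();
    set.setInternalVersion(EsMajorVersion.V_6_X);
    set.setProperty(ConfigurationOptions.ES_MAPPING_TTL, "1000");
    // Throws EsHadoopIllegalArgumentException asking for the setting's removal:
    InitializationUtils.validateSettingsForWriting(set);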
@@ -0,0 +1,24 @@
package org.elasticsearch.hadoop;

/**
 * Denotes an operation that is not allowed to be performed, often due to the feature support of
 * the version of Elasticsearch being used.
 */
public class EsHadoopUnsupportedOperationException extends EsHadoopException {

    public EsHadoopUnsupportedOperationException() {
        super();
    }

    public EsHadoopUnsupportedOperationException(String message, Throwable cause) {
        super(message, cause);
    }

    public EsHadoopUnsupportedOperationException(String message) {
        super(message);
    }

    public EsHadoopUnsupportedOperationException(Throwable cause) {
        super(cause);
    }
}
@@ -46,6 +46,8 @@

public abstract class InitializationUtils {

    private static final Log LOG = LogFactory.getLog(InitializationUtils.class);

    public static void checkIdForOperation(Settings settings) {
        String operation = settings.getOperation();

@@ -272,8 +274,33 @@ public static void validateSettingsForReading(Settings settings) {
    public static void validateSettingsForWriting(Settings settings) {
        EsMajorVersion version = settings.getInternalVersionOrThrow();

-        if (version.onOrAfter(EsMajorVersion.V_6_X) && StringUtils.hasText(settings.getUpdateScriptFile())) {
-            throw new EsHadoopIllegalArgumentException("Cannot use file scripts on ES 6.x and above. Please use stored scripts.");
+        // Things that were removed in 6.x and forward
+        if (version.onOrAfter(EsMajorVersion.V_6_X)) {
+            // File Scripts
+            if (StringUtils.hasText(settings.getUpdateScriptFile())) {
+                throw new EsHadoopIllegalArgumentException("Cannot use file scripts on ES 6.x and above. Please use " +
+                        "stored scripts with [" + ConfigurationOptions.ES_UPDATE_SCRIPT_STORED + "] instead.");
+            }
+
+            // Timestamp and TTL in index/updates
+            if (StringUtils.hasText(settings.getMappingTimestamp())) {
+                throw new EsHadoopIllegalArgumentException("Cannot use timestamps on index/update requests in ES 6.x " +
+                        "and above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TIMESTAMP + "] setting.");
+            }
+            if (StringUtils.hasText(settings.getMappingTtl())) {
+                throw new EsHadoopIllegalArgumentException("Cannot use TTL on index/update requests in ES 6.x and " +
+                        "above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TTL + "] setting.");
+            }
+        } else {
+            if (StringUtils.hasText(settings.getMappingTtl())) {
+                LOG.warn("Setting [" + ConfigurationOptions.ES_MAPPING_TTL + "] is deprecated! Support for [ttl] on " +
+                        "indexing and update requests has been removed in ES 6.x and above!");
+            }
+            if (StringUtils.hasText(settings.getMappingTimestamp())) {
+                LOG.warn("Setting [" + ConfigurationOptions.ES_MAPPING_TIMESTAMP + "] is deprecated! Support for " +
+                        "[timestamp] on indexing and update requests has been removed in ES 6.x and above!");
+            }
+
        }
    }

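Against ES 5.x and below the same settings still validate, but now emit a deprecation warning; a minimal sketch (EsMajorVersion.V_5_X is assumed here, it does not appear in this diff):

    Settings set = new TestSettings();
    set.setInternalVersion(EsMajorVersion.V_5_X); // assumed 5.x version constant
    set.setProperty(ConfigurationOptions.ES_MAPPING_TTL, "1000");
    // Passes, but logs: WARN Setting [es.mapping.ttl] is deprecated! ...
    InitializationUtils.validateSettingsForWriting(set);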
@@ -32,8 +32,19 @@ enum Metadata {
    ID("_id"),
    PARENT("_parent"),
    ROUTING("_routing"),

    /**
     * @deprecated TTL is not allowed on index or update requests in Elasticsearch 6.x
     */
    @Deprecated
    TTL("_ttl"),

    /**
     * @deprecated Timestamp is not allowed on index or update requests in Elasticsearch 6.x
     */
    @Deprecated
    TIMESTAMP("_timestamp"),

    VERSION("_version"),
    VERSION_TYPE("_version_type"),
@@ -20,14 +20,21 @@

import java.util.EnumMap;

import org.elasticsearch.hadoop.EsHadoopUnsupportedOperationException;
import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
import org.elasticsearch.hadoop.util.EsMajorVersion;

// specific implementation that relies on basic field extractors that are computed
// lazily per entity. Both the pool and extractors are meant to be reused.
public abstract class PerEntityPoolingMetadataExtractor implements MetadataExtractor {

    protected EsMajorVersion version;
    protected Object entity;

    public PerEntityPoolingMetadataExtractor(EsMajorVersion version) {
        this.version = version;
    }

    private static class StaticFieldExtractor implements FieldExtractor {
        private Object field;
        private boolean needsInit = true;
@@ -47,6 +54,28 @@ public boolean needsInit() {
        }
    }

    /**
     * A special field extractor meant to be used for metadata fields that are supported in
     * some versions of Elasticsearch, but not others. When a metadata field is unsupported
     * for the configured version of Elasticsearch, this extractor is returned instead of
     * the regular one, and it throws an exception if the unsupported metadata tag is used.
     */
    private static class UnsupportedMetadataFieldExtractor extends StaticFieldExtractor {
        private Metadata unsupportedMetadata;
        private EsMajorVersion version;

        public UnsupportedMetadataFieldExtractor(Metadata unsupportedMetadata, EsMajorVersion version) {
            this.unsupportedMetadata = unsupportedMetadata;
            this.version = version;
        }

        @Override
        public Object field(Object target) {
            throw new EsHadoopUnsupportedOperationException("Unsupported metadata tag [" + unsupportedMetadata.getName()
                    + "] for Elasticsearch version [" + version.toString() + "]. Bailing out...");
        }
    }

    private final EnumMap<Metadata, StaticFieldExtractor> pool = new EnumMap<Metadata, StaticFieldExtractor>(Metadata.class);

    public void reset() {
@@ -63,7 +92,7 @@ public FieldExtractor get(Metadata metadata) {
            return null;
        }
        if (fieldExtractor == null) {
-            fieldExtractor = new StaticFieldExtractor();
+            fieldExtractor = createExtractorFor(metadata);
        }
        if (fieldExtractor.needsInit()) {
            fieldExtractor.setField(value);
@@ -73,6 +102,23 @@ public FieldExtractor get(Metadata metadata) {
        return fieldExtractor;
    }

    /**
     * If a metadata tag is unsupported for this version of Elasticsearch, returns a field
     * extractor that throws an exception when used; otherwise returns a regular static extractor.
     */
    private StaticFieldExtractor createExtractorFor(Metadata metadata) {
        // Boot metadata tags that are not supported in this version of Elasticsearch
        if (version.onOrAfter(EsMajorVersion.V_6_X)) {
            // 6.0 removed support for TTL and Timestamp metadata on index and update requests.
            switch (metadata) {
                case TTL: // Fall through
                case TIMESTAMP:
                    return new UnsupportedMetadataFieldExtractor(metadata, version);
            }
        }

        return new StaticFieldExtractor();
    }

    public abstract Object getValue(Metadata metadata);

    public void setObject(Object entity) {
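Note that the pool defers the failure to extraction time: get() still returns an extractor, and the exception only fires when the unsupported field is actually extracted. A rough sketch of that behavior (the anonymous subclass is hypothetical, for illustration only):

    // Hypothetical subclass; getValue() supplies the entity's TTL value.
    PerEntityPoolingMetadataExtractor extractor = new PerEntityPoolingMetadataExtractor(EsMajorVersion.V_6_X) {
        @Override
        public Object getValue(Metadata metadata) {
            return metadata == Metadata.TTL ? "23" : null;
        }
    };

    FieldExtractor fe = extractor.get(Metadata.TTL); // returns UnsupportedMetadataFieldExtractor on 6.x
    Object value = fe.field(new Object());           // throws EsHadoopUnsupportedOperationException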
@@ -20,6 +20,7 @@

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Test;

@@ -118,4 +119,20 @@ public void testValidateMultipleScripts() throws Exception {
        set.setProperty(ES_UPDATE_SCRIPT_STORED, "test");
        validateSettings(set);
    }

    @Test(expected = EsHadoopIllegalArgumentException.class)
    public void testValidateWriteV6PlusTTLRemoved() throws Exception {
        Settings set = new TestSettings();
        set.setInternalVersion(EsMajorVersion.V_6_X);
        set.setProperty(ES_MAPPING_TTL, "1000");
        validateSettingsForWriting(set);
    }

    @Test(expected = EsHadoopIllegalArgumentException.class)
    public void testValidateWriteV6PlusTimestampRemoved() throws Exception {
        Settings set = new TestSettings();
        set.setInternalVersion(EsMajorVersion.V_6_X);
        set.setProperty(ES_MAPPING_TIMESTAMP, "1000");
        validateSettingsForWriting(set);
    }
}
@@ -31,6 +31,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkException
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
import org.elasticsearch.hadoop.EsHadoopUnsupportedOperationException
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_INDEX_AUTO_CREATE
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_INDEX_READ_MISSING_AS_EMPTY
@@ -50,6 +51,7 @@ import org.elasticsearch.hadoop.util.TestSettings
import org.elasticsearch.hadoop.util.TestUtils
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.rdd.Metadata.ID
import org.elasticsearch.spark.rdd.Metadata.TTL
import org.elasticsearch.spark.rdd.Metadata.VERSION
import org.elasticsearch.spark.serialization.Bean
import org.elasticsearch.spark.serialization.ReflectionUtils
@@ -282,6 +284,33 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
        assertThat(RestUtils.get(target + "/_search?"), containsString("SFO"))
    }

    @Test(expected = classOf[EsHadoopUnsupportedOperationException])
    def testEsRDDWriteWithUnsupportedMapping() {
        EsAssume.versionOnOrAfter(EsMajorVersion.V_6_X, "TTL only removed in v6 and up.")

        val doc1 = Map("one" -> null, "two" -> Set("2"), "three" -> (".", "..", "..."), "number" -> 1)
        val doc2 = Map("OTP" -> "Otopeni", "SFO" -> "San Fran", "number" -> 2)

        val target = wrapIndex("spark-test-scala-dyn-id-write-fail/data")

        val metadata1 = Map(ID -> 5)
        val metadata2 = Map(ID -> 6, TTL -> "23")

        assertEquals(5, metadata1.getOrElse(ID, null))
        assertEquals(6, metadata2.getOrElse(ID, null))

        val pairRDD = sc.makeRDD(Seq((metadata1, doc1), (metadata2, doc2)))

        try {
            pairRDD.saveToEsWithMeta(target, cfg)
        } catch {
            case s: SparkException => throw s.getCause
            case t: Throwable => throw t
        }

        fail("Should not have ingested TTL on ES 6.x+")
    }

    @Test
    def testEsRDDWriteWithMappingExclude() {
        val trip1 = Map("reason" -> "business", "airport" -> "SFO")
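The same failure surfaces through Spark's Java API as well; a hedged sketch assuming the JavaEsSpark helper and a local JavaSparkContext (neither appears in this diff):

    JavaSparkContext jsc = new JavaSparkContext("local", "ttl-meta-sketch");

    Map<Metadata, Object> meta = new HashMap<Metadata, Object>();
    meta.put(Metadata.ID, 1);
    meta.put(Metadata.TTL, "23"); // deprecated; rejected against ES 6.x+

    Map<String, Object> doc = new HashMap<String, Object>();
    doc.put("airport", "SFO");

    JavaPairRDD<Map<Metadata, Object>, Map<String, Object>> pairRdd =
            jsc.parallelizePairs(Collections.singletonList(
                    new Tuple2<Map<Metadata, Object>, Map<String, Object>>(meta, doc)));

    // Fails with EsHadoopUnsupportedOperationException (as the cause of the SparkException):
    JavaEsSpark.saveToEsWithMeta(pairRdd, "spark-test-java-ttl-write-fail/data");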
@@ -52,7 +52,7 @@ private[spark] class EsRDDWriter[T: ClassTag](val serializedSettings: String,
        settings
    }

-    lazy val metaExtractor = new ScalaMetadataExtractor()
+    lazy val metaExtractor = new ScalaMetadataExtractor(settings.getInternalVersionOrThrow)

    def write(taskContext: TaskContext, data: Iterator[T]) {
        val writer = RestService.createWriter(settings, taskContext.partitionId, -1, log)
@@ -22,7 +22,15 @@ public enum Metadata {
    ID,
    PARENT,
    ROUTING,
    /**
     * @deprecated TTL is not allowed on index or update requests in Elasticsearch 6.x
     */
    @Deprecated
    TTL,
    /**
     * @deprecated Timestamp is not allowed on index or update requests in Elasticsearch 6.x
     */
    @Deprecated
    TIMESTAMP,
    VERSION,
    VERSION_TYPE;
@@ -22,12 +22,12 @@ import java.util.EnumMap
import java.util.{Map => JMap}

import scala.collection.{Map => SMap}

import org.elasticsearch.hadoop.serialization.bulk.MetadataExtractor.{Metadata => InternalMetadata}
import org.elasticsearch.hadoop.serialization.bulk.PerEntityPoolingMetadataExtractor
+import org.elasticsearch.hadoop.util.EsMajorVersion
import org.elasticsearch.spark.rdd.{Metadata => SparkMetadata}

-private[spark] class ScalaMetadataExtractor extends PerEntityPoolingMetadataExtractor {
+private[spark] class ScalaMetadataExtractor(version: EsMajorVersion) extends PerEntityPoolingMetadataExtractor(version) {

    override def getValue(metadata: InternalMetadata): AnyRef = {
        val sparkEnum = ScalaMetadataExtractor.toSparkEnum(metadata)
