Skip to content

Commit

Permalink
[REST] Fixing update/upsert script backwards compatibility issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaiera committed Aug 26, 2016
1 parent 075098a commit 53c07e0
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 29 deletions.
12 changes: 7 additions & 5 deletions hive/src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java
Expand Up @@ -42,15 +42,14 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobTracker;
import org.elasticsearch.hadoop.cfg.HadoopSettingsManager;
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.FieldAlias;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.*;

public class EsSerDe extends AbstractSerDe {

Expand All @@ -67,6 +66,7 @@ public class EsSerDe extends AbstractSerDe {
private final HiveBytesArrayWritable result = new HiveBytesArrayWritable();
private StructTypeInfo structTypeInfo;
private FieldAlias alias;
private EsMajorVersion version;
private BulkCommand command;

private boolean writeInitialized = false;
Expand All @@ -83,6 +83,8 @@ public void initialize(Configuration conf, Properties tbl, Properties partitionP
settings = (cfg != null ? HadoopSettingsManager.loadFrom(cfg).merge(tbl) : HadoopSettingsManager.loadFrom(tbl));
alias = HiveUtils.alias(settings);

version = InitializationUtils.discoverEsVersion(settings, log);

HiveUtils.fixHive13InvalidComments(settings, tbl);
this.tableProperties = tbl;

Expand Down Expand Up @@ -149,7 +151,7 @@ private void lazyInitializeWrite() {
InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, log);
InitializationUtils.setBytesConverterIfNeeded(settings, HiveBytesConverter.class, log);
this.command = BulkCommands.create(settings, null);
this.command = BulkCommands.create(settings, null, version);
}


Expand Down
Expand Up @@ -124,7 +124,7 @@ private void lazyInitWriting() {
bufferEntriesThreshold = settings.getBatchSizeInEntries();
requiresRefreshAfterBulk = settings.getBatchRefreshAfterWrite();

this.command = BulkCommands.create(settings, metaExtractor);
this.command = BulkCommands.create(settings, metaExtractor, client.internalVersion);
}
}

Expand Down
Expand Up @@ -21,13 +21,14 @@
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.util.EsMajorVersion;

/**
* Handles the instantiation of bulk commands.
*/
public abstract class BulkCommands {

public static BulkCommand create(Settings settings, MetadataExtractor metaExtractor) {
public static BulkCommand create(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {

String operation = settings.getOperation();
BulkFactory factory = null;
Expand All @@ -39,10 +40,10 @@ else if (ConfigurationOptions.ES_OPERATION_INDEX.equals(operation)) {
factory = new IndexBulkFactory(settings, metaExtractor);
}
else if (ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation)) {
factory = new UpdateBulkFactory(settings, metaExtractor);
factory = new UpdateBulkFactory(settings, metaExtractor, version);
}
else if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation)) {
factory = new UpdateBulkFactory(settings, true, metaExtractor);
factory = new UpdateBulkFactory(settings, true, metaExtractor, version);
}
else {
throw new EsHadoopIllegalArgumentException("Unknown operation " + operation);
Expand Down
Expand Up @@ -24,34 +24,46 @@
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.bulk.MetadataExtractor.Metadata;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.StringUtils;

class UpdateBulkFactory extends AbstractBulkFactory {

private final int RETRY_ON_FAILURE;
private final String RETRY_HEADER;
private final String SCRIPT;
private final String SCRIPT_LANG;

private final String SCRIPT_5X;
private final String SCRIPT_LANG_5X;

private final String SCRIPT_1X;
private final String SCRIPT_LANG_1X;

private final boolean HAS_SCRIPT, HAS_LANG;
private final boolean UPSERT;

public UpdateBulkFactory(Settings settings, MetadataExtractor metaExtractor) {
this(settings, false, metaExtractor);
private final EsMajorVersion esMajorVersion;

public UpdateBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion esMajorVersion) {
this(settings, false, metaExtractor, esMajorVersion);
}

public UpdateBulkFactory(Settings settings, boolean upsert, MetadataExtractor metaExtractor) {
public UpdateBulkFactory(Settings settings, boolean upsert, MetadataExtractor metaExtractor, EsMajorVersion esMajorVersion) {
super(settings, metaExtractor);

this.esMajorVersion = esMajorVersion;

UPSERT = upsert;
RETRY_ON_FAILURE = settings.getUpdateRetryOnConflict();
RETRY_HEADER = "\"_retry_on_conflict\":" + RETRY_ON_FAILURE + "";

HAS_SCRIPT = StringUtils.hasText(settings.getUpdateScript());
HAS_LANG = StringUtils.hasText(settings.getUpdateScriptLang());

SCRIPT_LANG = ",\"lang\":\"" + settings.getUpdateScriptLang() + "\"";
SCRIPT = "{\"script\":{\"inline\":\"" + settings.getUpdateScript() + "\"";
SCRIPT_LANG_5X = ",\"lang\":\"" + settings.getUpdateScriptLang() + "\"";
SCRIPT_5X = "{\"script\":{\"inline\":\"" + settings.getUpdateScript() + "\"";

SCRIPT_LANG_1X = "\"lang\":\"" + settings.getUpdateScriptLang() + "\",";
SCRIPT_1X = "\"script\":\"" + settings.getUpdateScript() + "\"";
}

@Override
Expand All @@ -75,10 +87,55 @@ protected void writeObjectHeader(List<Object> list) {

Object paramExtractor = getExtractorOrDynamicValue(Metadata.PARAMS, getParamExtractor());

if (esMajorVersion.after(EsMajorVersion.V_1_X)) {
writeStrictFormatting(list, paramExtractor);
} else {
writeLegacyFormatting(list, paramExtractor);
}
}

/**
* Script format meant for versions 1.x to 2.x. Required format for 1.x and below.
* @param list Consumer of snippets
* @param paramExtractor Extracts parameters from documents or constants
*/
private void writeLegacyFormatting(List<Object> list, Object paramExtractor) {
if (paramExtractor != null) {
list.add("{\"params\":");
list.add(paramExtractor);
list.add(",");
}
else {
list.add("{");
}

if (HAS_SCRIPT) {
if (HAS_LANG) {
list.add(SCRIPT_LANG_1X);
}
list.add(SCRIPT_1X);
if (UPSERT) {
list.add(",\"upsert\":");
}
}
else {
if (UPSERT) {
list.add("\"doc_as_upsert\":true,");
}
list.add("\"doc\":");
}
}

/**
* Script format meant for versions 2.x to 5.x. Required format for 5.x and above.
* @param list Consumer of snippets
* @param paramExtractor Extracts parameters from documents or constants
*/
private void writeStrictFormatting(List<Object> list, Object paramExtractor) {
if (HAS_SCRIPT) {
list.add(SCRIPT);
list.add(SCRIPT_5X);
if (HAS_LANG) {
list.add(SCRIPT_LANG);
list.add(SCRIPT_LANG_5X);
}
if (paramExtractor != null) {
list.add(",\"params\":");
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Before;
Expand All @@ -52,21 +53,36 @@ public class CommandTest {
private final String operation;
private boolean noId = false;
private boolean jsonInput = false;
private final EsMajorVersion version;

@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ ConfigurationOptions.ES_OPERATION_INDEX, false },
{ ConfigurationOptions.ES_OPERATION_CREATE, false },
{ ConfigurationOptions.ES_OPERATION_UPDATE, false }, { ConfigurationOptions.ES_OPERATION_INDEX, true },
{ ConfigurationOptions.ES_OPERATION_CREATE, true }, { ConfigurationOptions.ES_OPERATION_UPDATE, true },

{ ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_1_X },
{ ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_1_X },
{ ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_1_X },
{ ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_1_X },
{ ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_1_X },
{ ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_1_X },
{ ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_2_X },
{ ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_2_X },
{ ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_2_X },
{ ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_2_X },
{ ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_2_X },
{ ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_2_X },
{ ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_5_X },
{ ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_5_X },
{ ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_5_X },
{ ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_5_X },
{ ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_5_X },
{ ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_5_X },
});
}

public CommandTest(String operation, boolean jsonInput) {
public CommandTest(String operation, boolean jsonInput, EsMajorVersion version) {
this.operation = operation;
this.jsonInput = jsonInput;
this.version = version;
}

@Before
Expand Down Expand Up @@ -184,8 +200,27 @@ public void testIdMandatory() throws Exception {
}

@Test
public void testUpdateOnlyScript() throws Exception {
public void testUpdateOnlyScript5X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
assumeTrue(version.after(EsMajorVersion.V_1_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes");
set.setProperty(ConfigurationOptions.ES_UPDATE_RETRY_ON_CONFLICT, "3");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = 3");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");

create(set).write(data).copyTo(ba);
String result =
"{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" +
"{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
assertEquals(result, ba.toString());
}

@Test
public void testUpdateOnlyScript1X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
assumeTrue(version.onOrBefore(EsMajorVersion.V_1_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes");
Expand All @@ -196,13 +231,14 @@ public void testUpdateOnlyScript() throws Exception {
create(set).write(data).copyTo(ba);
String result =
"{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" +
"{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
"{\"lang\":\"groovy\",\"script\":\"counter = 3\"}\n";
assertEquals(result, ba.toString());
}

@Test
public void testUpdateOnlyParamScript() throws Exception {
public void testUpdateOnlyParamScript5X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
assumeTrue(version.after(EsMajorVersion.V_1_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
Expand All @@ -214,17 +250,36 @@ public void testUpdateOnlyParamScript() throws Exception {

String result =
"{\"" + operation + "\":{\"_id\":1}}\n" +
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";

assertEquals(result, ba.toString());
}

@Test
public void testUpdateOnlyParamScript1X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
assumeTrue(version.onOrBefore(EsMajorVersion.V_1_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter = param1; anothercounter = param2");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");

create(set).write(data).copyTo(ba);

String result =
"{\"" + operation + "\":{\"_id\":1}}\n" +
"{\"params\":{\"param1\":1,\"param2\":1},\"lang\":\"groovy\",\"script\":\"counter = param1; anothercounter = param2\"}\n";
assertEquals(result, ba.toString());
}


private BulkCommand create(Settings settings) {
if (!StringUtils.hasText(settings.getResourceWrite())) {
settings.setProperty(ConfigurationOptions.ES_WRITE_OPERATION, operation);
}
return BulkCommands.create(settings, null);
return BulkCommands.create(settings, null, version);
}

private Settings settings() {
Expand Down

0 comments on commit 53c07e0

Please sign in to comment.