From 97ac095801f93ea927164e4358a2db06a079a335 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Thu, 31 Oct 2013 21:27:56 +0200 Subject: [PATCH] add id awareness to Pig relates to #69 --- .../hadoop/hive/ESStorageHandler.java | 3 + .../elasticsearch/hadoop/pig/ESStorage.java | 21 ++++--- .../hadoop/pig/PigIdExtractor.java | 61 +++++++++++++++++++ .../hadoop/util/ObjectUtils.java | 8 +++ .../hadoop/integration/pig/PigSaveTest.java | 61 ++++++++++++++++++- .../hadoop/integration/pig/PigWrapper.java | 10 ++- 6 files changed, 154 insertions(+), 10 deletions(-) create mode 100644 src/main/java/org/elasticsearch/hadoop/pig/PigIdExtractor.java diff --git a/src/main/java/org/elasticsearch/hadoop/hive/ESStorageHandler.java b/src/main/java/org/elasticsearch/hadoop/hive/ESStorageHandler.java index 7892bfed10bea7..6d7e3209581a6b 100644 --- a/src/main/java/org/elasticsearch/hadoop/hive/ESStorageHandler.java +++ b/src/main/java/org/elasticsearch/hadoop/hive/ESStorageHandler.java @@ -83,6 +83,9 @@ private void init(TableDesc tableDesc) { // NB: ESSerDe is already initialized at this stage // NB: the value writer is not needed by Hive but it's set for consistency and debugging purposes + + InitializationUtils.checkIdForOperation(settings); + SerializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log); SerializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log); InitializationUtils.setIdExtractorIfNotSet(settings, HiveIdExtractor.class, log); diff --git a/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java b/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java index d165413ba61943..abcb4d4645e904 100644 --- a/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java +++ b/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.io.StringReader; +import java.util.Arrays; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; @@ -43,9 +44,10 @@ import org.elasticsearch.hadoop.cfg.Settings; import org.elasticsearch.hadoop.cfg.SettingsManager; import org.elasticsearch.hadoop.mr.ESOutputFormat; +import org.elasticsearch.hadoop.rest.InitializationUtils; import org.elasticsearch.hadoop.serialization.SerializationUtils; import org.elasticsearch.hadoop.util.IOUtils; -import org.elasticsearch.hadoop.util.StringUtils; +import org.elasticsearch.hadoop.util.ObjectUtils; /** * Pig storage for reading and writing data into an ElasticSearch index. @@ -78,17 +80,19 @@ public class ESStorage extends LoadFunc implements StoreFuncInterface, StoreMeta private PigTuple pigTuple; public ESStorage() { - this(null); + this(new String[0]); } - public ESStorage(String configuration) { - if (StringUtils.hasText(configuration)) { + public ESStorage(String... configuration) { + if (!ObjectUtils.isEmpty(configuration)) { try { properties = new Properties(); - // replace ; with line separators - properties.load(new StringReader(configuration.replace(';', '\n'))); + for (String string : configuration) { + // replace ; with line separators + properties.load(new StringReader(string)); + } } catch (IOException ex) { - throw new IllegalArgumentException("Cannot parse options " + properties, ex); + throw new IllegalArgumentException("Cannot parse options " + Arrays.toString(configuration), ex); } } } @@ -122,8 +126,11 @@ public void setStoreLocation(String location, Job job) throws IOException { private void init(String location, Job job) { Settings settings = SettingsManager.loadFrom(job.getConfiguration()).merge(properties).setResource(location); boolean changed = false; + InitializationUtils.checkIdForOperation(settings); + changed |= SerializationUtils.setValueWriterIfNotSet(settings, PigValueWriter.class, log); changed |= SerializationUtils.setValueReaderIfNotSet(settings, PigValueReader.class, log); + changed |= InitializationUtils.setIdExtractorIfNotSet(settings, PigIdExtractor.class, log); settings.save(); } diff --git a/src/main/java/org/elasticsearch/hadoop/pig/PigIdExtractor.java b/src/main/java/org/elasticsearch/hadoop/pig/PigIdExtractor.java new file mode 100644 index 00000000000000..f81aeaa669750c --- /dev/null +++ b/src/main/java/org/elasticsearch/hadoop/pig/PigIdExtractor.java @@ -0,0 +1,61 @@ +/* + * Copyright 2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elasticsearch.hadoop.pig; + +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataType; +import org.elasticsearch.hadoop.cfg.Settings; +import org.elasticsearch.hadoop.serialization.IdExtractor; +import org.elasticsearch.hadoop.serialization.SettingsAware; +import org.elasticsearch.hadoop.util.Assert; + +public class PigIdExtractor implements IdExtractor, SettingsAware { + + private String id; + + @Override + public String id(Object target) { + if (target instanceof PigTuple) { + PigTuple pt = (PigTuple) target; + ResourceFieldSchema[] fields = pt.getSchema().getSchema().getFields(); + + + for (int i = 0; i < fields.length; i++) { + ResourceFieldSchema field = fields[i]; + if (id.equals(field.getName())) { + byte type = field.getType(); + Assert.isTrue( + DataType.isAtomic(type), + String.format("Unsupported data type [%s] for id [%s]; use only 'primitives'", DataType.findTypeName(type), id)); + + try { + return pt.getTuple().get(i).toString(); + } catch (ExecException ex) { + throw new IllegalStateException(String.format("Cannot retrieve id field [%s]", id), ex); + } + } + } + } + + return null; + } + + @Override + public void setSettings(Settings settings) { + id = settings.getMappingId().trim().toLowerCase(); + } +} diff --git a/src/main/java/org/elasticsearch/hadoop/util/ObjectUtils.java b/src/main/java/org/elasticsearch/hadoop/util/ObjectUtils.java index bd08fe1a9e1769..0fd40b8b2066bc 100644 --- a/src/main/java/org/elasticsearch/hadoop/util/ObjectUtils.java +++ b/src/main/java/org/elasticsearch/hadoop/util/ObjectUtils.java @@ -50,4 +50,12 @@ public static T instantiate(String className, ClassLoader loader, Settings s return obj; } + + public static boolean isEmpty(byte[] array) { + return (array == null || array.length == 0); + } + + public static boolean isEmpty(String[] array) { + return (array == null || array.length == 0); + } } diff --git a/src/test/java/org/elasticsearch/hadoop/integration/pig/PigSaveTest.java b/src/test/java/org/elasticsearch/hadoop/integration/pig/PigSaveTest.java index f1e33441f664e7..9dd190a65257cf 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/pig/PigSaveTest.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/pig/PigSaveTest.java @@ -17,16 +17,19 @@ import java.util.Date; +import org.elasticsearch.hadoop.cfg.ConfigurationOptions; import org.elasticsearch.hadoop.integration.Provisioner; import org.elasticsearch.hadoop.rest.RestClient; import org.elasticsearch.hadoop.util.TestSettings; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; +import org.junit.FixMethodOrder; import org.junit.Test; +import org.junit.runners.MethodSorters; /** */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class PigSaveTest { static PigWrapper pig; @@ -116,9 +119,63 @@ public void testEmptyComplexStructures() throws Exception { String script = "REGISTER "+ Provisioner.ESHADOOP_TESTING_JAR + ";" + "A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);" + - "B = FOREACH A GENERATE (), [], {} ;" + + "AL = LIMIT A 10;" + + "B = FOREACH AL GENERATE (), [], {};" + "STORE B INTO 'pig/emptyconst' USING org.elasticsearch.hadoop.pig.ESStorage();"; pig.executeScript(script); } + + @Test + public void testCreateWithId() throws Exception { + String script = + "REGISTER "+ Provisioner.ESHADOOP_TESTING_JAR + ";" + + "A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);" + + "B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;" + + "STORE B INTO 'pig/createwithid' USING org.elasticsearch.hadoop.pig.ESStorage('" + + ConfigurationOptions.ES_WRITE_OPERATION + "=create','" + + ConfigurationOptions.ES_MAPPING_ID + "=id');"; + pig.executeScript(script); + } + + @Test(expected = IllegalStateException.class) + public void testCreateWithIdShouldFailOnDuplicate() throws Exception { + testCreateWithId(); + } + + @Test(expected = Exception.class) + public void testUpdateWithoutId() throws Exception { + String script = + "REGISTER "+ Provisioner.ESHADOOP_TESTING_JAR + ";" + + "A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);" + + "B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;" + + "STORE B INTO 'pig/updatewoid' USING org.elasticsearch.hadoop.pig.ESStorage('" + + ConfigurationOptions.ES_WRITE_OPERATION + "=update');"; + pig.executeScript(script); + } + + @Test + public void testUpdateWithId() throws Exception { + String script = + "REGISTER "+ Provisioner.ESHADOOP_TESTING_JAR + ";" + + "A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);" + + "B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;" + + "STORE B INTO 'pig/update' USING org.elasticsearch.hadoop.pig.ESStorage('" + + ConfigurationOptions.ES_WRITE_OPERATION + "=update','" + + ConfigurationOptions.ES_MAPPING_ID + "=id');"; + pig.executeScript(script); + } + + @Test(expected = IllegalStateException.class) + public void testUpdateWithoutUpsert() throws Exception { + String script = + "REGISTER "+ Provisioner.ESHADOOP_TESTING_JAR + ";" + + "A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);" + + "B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;" + + "STORE B INTO 'pig/updatewoupsert' USING org.elasticsearch.hadoop.pig.ESStorage('" + + ConfigurationOptions.ES_WRITE_OPERATION + "=update','" + + ConfigurationOptions.ES_MAPPING_ID + "=id','" + + ConfigurationOptions.ES_UPSERT_DOC + "=false');"; + pig.executeScript(script); + } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/hadoop/integration/pig/PigWrapper.java b/src/test/java/org/elasticsearch/hadoop/integration/pig/PigWrapper.java index 59754d0407338a..0bbe8a6a76c3e9 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/pig/PigWrapper.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/pig/PigWrapper.java @@ -16,12 +16,14 @@ package org.elasticsearch.hadoop.integration.pig; import java.io.ByteArrayInputStream; +import java.util.List; import java.util.Properties; import org.apache.commons.logging.LogFactory; import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.executionengine.ExecJob; import org.elasticsearch.hadoop.integration.HdpBootstrap; import org.elasticsearch.hadoop.util.StringUtils; import org.elasticsearch.hadoop.util.TestSettings; @@ -68,8 +70,14 @@ public void stop() { public void executeScript(String script) throws Exception { pig.registerScript(new ByteArrayInputStream(script.getBytes())); - pig.executeBatch(); + List executeBatch = pig.executeBatch(); + for (ExecJob execJob : executeBatch) { + if (execJob.getStatus() == ExecJob.JOB_STATUS.FAILED) { + throw new IllegalStateException("Pig execution failed"); + } + } pig.discardBatch(); pig.setBatchOn(); + } }