Skip to content

Commit

Permalink
add id awareness to Pig
Browse files Browse the repository at this point in the history
relates to elastic#69
  • Loading branch information
costin committed Oct 31, 2013
1 parent 819f5d3 commit 97ac095
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 10 deletions.
Expand Up @@ -83,6 +83,9 @@ private void init(TableDesc tableDesc) {

// NB: ESSerDe is already initialized at this stage
// NB: the value writer is not needed by Hive but it's set for consistency and debugging purposes

InitializationUtils.checkIdForOperation(settings);

SerializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
SerializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
InitializationUtils.setIdExtractorIfNotSet(settings, HiveIdExtractor.class, log);
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
Expand All @@ -43,9 +44,10 @@
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.mr.ESOutputFormat;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.serialization.SerializationUtils;
import org.elasticsearch.hadoop.util.IOUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.ObjectUtils;

/**
* Pig storage for reading and writing data into an ElasticSearch index.
Expand Down Expand Up @@ -78,17 +80,19 @@ public class ESStorage extends LoadFunc implements StoreFuncInterface, StoreMeta
private PigTuple pigTuple;

public ESStorage() {
this(null);
this(new String[0]);
}

public ESStorage(String configuration) {
if (StringUtils.hasText(configuration)) {
public ESStorage(String... configuration) {
if (!ObjectUtils.isEmpty(configuration)) {
try {
properties = new Properties();
// replace ; with line separators
properties.load(new StringReader(configuration.replace(';', '\n')));
for (String string : configuration) {
// replace ; with line separators
properties.load(new StringReader(string));
}
} catch (IOException ex) {
throw new IllegalArgumentException("Cannot parse options " + properties, ex);
throw new IllegalArgumentException("Cannot parse options " + Arrays.toString(configuration), ex);
}
}
}
Expand Down Expand Up @@ -122,8 +126,11 @@ public void setStoreLocation(String location, Job job) throws IOException {
// Prepares the settings used for writing to ElasticSearch: merges the Hadoop job
// configuration with the user-supplied properties and the target resource (index/type),
// validates the id configuration and registers the Pig-specific components.
private void init(String location, Job job) {
Settings settings = SettingsManager.loadFrom(job.getConfiguration()).merge(properties).setResource(location);
boolean changed = false;
// fail fast if the configured write operation and es.mapping.id setting are inconsistent
InitializationUtils.checkIdForOperation(settings);

// register the Pig value writer/reader and id extractor only if none were set already
changed |= SerializationUtils.setValueWriterIfNotSet(settings, PigValueWriter.class, log);
changed |= SerializationUtils.setValueReaderIfNotSet(settings, PigValueReader.class, log);
changed |= InitializationUtils.setIdExtractorIfNotSet(settings, PigIdExtractor.class, log);
// NOTE(review): 'changed' is accumulated but never read - settings are saved
// unconditionally; confirm whether the save was meant to be conditional
settings.save();
}

Expand Down
61 changes: 61 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/pig/PigIdExtractor.java
@@ -0,0 +1,61 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.elasticsearch.hadoop.pig;

import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.IdExtractor;
import org.elasticsearch.hadoop.serialization.SettingsAware;
import org.elasticsearch.hadoop.util.Assert;

/**
 * Extracts the document id from a {@link PigTuple} based on the field name
 * configured through {@code es.mapping.id}.
 */
public class PigIdExtractor implements IdExtractor, SettingsAware {

    // name of the field whose value is used as the document id;
    // normalized (trimmed, lower-cased) in setSettings, null when not configured
    private String id;

    /**
     * Looks up the configured id field in the tuple schema and returns its value.
     *
     * @param target object to extract the id from; only {@link PigTuple} is supported
     * @return the field value as a String, or {@code null} when no id is configured,
     *         the target is not a PigTuple, the field is absent from the schema or its value is null
     * @throws IllegalStateException if the tuple value cannot be retrieved
     */
    @Override
    public String id(Object target) {
        if (id == null || !(target instanceof PigTuple)) {
            return null;
        }

        PigTuple pt = (PigTuple) target;
        ResourceFieldSchema[] fields = pt.getSchema().getSchema().getFields();

        for (int i = 0; i < fields.length; i++) {
            ResourceFieldSchema field = fields[i];
            // compare case-insensitively: the configured id was lower-cased in setSettings
            // while Pig preserves the original case of field names, so a case-sensitive
            // equals would silently miss mixed-case fields
            if (id.equalsIgnoreCase(field.getName())) {
                byte type = field.getType();
                Assert.isTrue(
                        DataType.isAtomic(type),
                        String.format("Unsupported data type [%s] for id [%s]; use only 'primitives'", DataType.findTypeName(type), id));

                try {
                    Object value = pt.getTuple().get(i);
                    // treat a null field value as "no id" instead of throwing an NPE
                    return (value != null ? value.toString() : null);
                } catch (ExecException ex) {
                    throw new IllegalStateException(String.format("Cannot retrieve id field [%s]", id), ex);
                }
            }
        }

        return null;
    }

    @Override
    public void setSettings(Settings settings) {
        String mappingId = settings.getMappingId();
        // guard against a missing es.mapping.id setting (the original chained call would NPE)
        id = (mappingId != null ? mappingId.trim().toLowerCase() : null);
    }
}
8 changes: 8 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/util/ObjectUtils.java
Expand Up @@ -50,4 +50,12 @@ public static <T> T instantiate(String className, ClassLoader loader, Settings s

return obj;
}

/**
 * Indicates whether the given byte array is null or contains no elements.
 *
 * @param array array to check (may be null)
 * @return true for a null or zero-length array, false otherwise
 */
public static boolean isEmpty(byte[] array) {
    if (array == null) {
        return true;
    }
    return array.length == 0;
}

/**
 * Indicates whether the given String array is null or contains no elements.
 *
 * @param array array to check (may be null)
 * @return true for a null or zero-length array, false otherwise
 */
public static boolean isEmpty(String[] array) {
    if (array == null) {
        return true;
    }
    return array.length == 0;
}
}
Expand Up @@ -17,16 +17,19 @@

import java.util.Date;

import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.integration.Provisioner;
import org.elasticsearch.hadoop.rest.RestClient;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

/**
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class PigSaveTest {

static PigWrapper pig;
Expand Down Expand Up @@ -116,9 +119,63 @@ public void testEmptyComplexStructures() throws Exception {
String script =
"REGISTER "+ Provisioner.ESHADOOP_TESTING_JAR + ";" +
"A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);" +
"B = FOREACH A GENERATE (), [], {} ;" +
"AL = LIMIT A 10;" +
"B = FOREACH AL GENERATE (), [], {};" +
"STORE B INTO 'pig/emptyconst' USING org.elasticsearch.hadoop.pig.ESStorage();";

pig.executeScript(script);
}

// Writes documents using the 'create' operation with the 'id' field as the document id.
@Test
public void testCreateWithId() throws Exception {
    StringBuilder query = new StringBuilder();
    query.append("REGISTER ").append(Provisioner.ESHADOOP_TESTING_JAR).append(";");
    query.append("A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);");
    query.append("B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;");
    query.append("STORE B INTO 'pig/createwithid' USING org.elasticsearch.hadoop.pig.ESStorage('");
    query.append(ConfigurationOptions.ES_WRITE_OPERATION).append("=create','");
    query.append(ConfigurationOptions.ES_MAPPING_ID).append("=id');");
    pig.executeScript(query.toString());
}

// Re-runs testCreateWithId against the same target; since 'create' rejects documents
// whose id already exists, the second run must fail with an IllegalStateException.
// NOTE(review): relies on method ordering (@FixMethodOrder NAME_ASCENDING) so that
// testCreateWithId has already populated the index - confirm this stays true if tests are renamed.
@Test(expected = IllegalStateException.class)
public void testCreateWithIdShouldFailOnDuplicate() throws Exception {
testCreateWithId();
}

// An 'update' operation without an es.mapping.id setting is invalid and must fail.
@Test(expected = Exception.class)
public void testUpdateWithoutId() throws Exception {
    StringBuilder query = new StringBuilder();
    query.append("REGISTER ").append(Provisioner.ESHADOOP_TESTING_JAR).append(";");
    query.append("A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);");
    query.append("B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;");
    query.append("STORE B INTO 'pig/updatewoid' USING org.elasticsearch.hadoop.pig.ESStorage('");
    query.append(ConfigurationOptions.ES_WRITE_OPERATION).append("=update');");
    pig.executeScript(query.toString());
}

// Writes documents using the 'update' operation with the 'id' field as the document id.
@Test
public void testUpdateWithId() throws Exception {
    StringBuilder query = new StringBuilder();
    query.append("REGISTER ").append(Provisioner.ESHADOOP_TESTING_JAR).append(";");
    query.append("A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);");
    query.append("B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;");
    query.append("STORE B INTO 'pig/update' USING org.elasticsearch.hadoop.pig.ESStorage('");
    query.append(ConfigurationOptions.ES_WRITE_OPERATION).append("=update','");
    query.append(ConfigurationOptions.ES_MAPPING_ID).append("=id');");
    pig.executeScript(query.toString());
}

// An 'update' against documents that do not exist, with upsert disabled, must fail.
@Test(expected = IllegalStateException.class)
public void testUpdateWithoutUpsert() throws Exception {
    StringBuilder query = new StringBuilder();
    query.append("REGISTER ").append(Provisioner.ESHADOOP_TESTING_JAR).append(";");
    query.append("A = LOAD 'src/test/resources/artists.dat' USING PigStorage() AS (id:long, name:chararray, url:chararray, picture: chararray);");
    query.append("B = FOREACH A GENERATE id, name, TOBAG(url, picture) AS links;");
    query.append("STORE B INTO 'pig/updatewoupsert' USING org.elasticsearch.hadoop.pig.ESStorage('");
    query.append(ConfigurationOptions.ES_WRITE_OPERATION).append("=update','");
    query.append(ConfigurationOptions.ES_MAPPING_ID).append("=id','");
    query.append(ConfigurationOptions.ES_UPSERT_DOC).append("=false');");
    pig.executeScript(query.toString());
}
}
Expand Up @@ -16,12 +16,14 @@
package org.elasticsearch.hadoop.integration.pig;

import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Properties;

import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.elasticsearch.hadoop.integration.HdpBootstrap;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.TestSettings;
Expand Down Expand Up @@ -68,8 +70,14 @@ public void stop() {

public void executeScript(String script) throws Exception {
pig.registerScript(new ByteArrayInputStream(script.getBytes()));
pig.executeBatch();
List<ExecJob> executeBatch = pig.executeBatch();
for (ExecJob execJob : executeBatch) {
if (execJob.getStatus() == ExecJob.JOB_STATUS.FAILED) {
throw new IllegalStateException("Pig execution failed");
}
}
pig.discardBatch();
pig.setBatchOn();

}
}

0 comments on commit 97ac095

Please sign in to comment.