align Hive writer with new command serialization
costin committed Nov 7, 2013
1 parent c632783 commit 1bdbdc5
Showing 8 changed files with 44 additions and 108 deletions.
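
In short, ESSerDe now renders the complete bulk entry up front through the new Command API, so the Hive record writer no longer passes raw content bytes plus a separate document id to the REST client; it simply forwards the pre-serialized entry. A minimal sketch of the new write path, assuming the ESHiveRecordWriter and BufferedRestClient APIs exactly as they appear in this diff (the `client` field is presumed to be the BufferedRestClient held by the parent ESRecordWriter):

// Sketch only - mirrors the ESHiveRecordWriter.write(Writable) change below.
private BytesArray ba = new BytesArray(0);            // scratch buffer, reused across records

public void write(Writable w) throws IOException {
    if (w instanceof HiveEntityWritable) {
        HiveEntityWritable hew = (HiveEntityWritable) w;
        // point the scratch buffer at the already-serialized bulk entry...
        ba.bytes(hew.getBytes(), hew.getLength());
        // ...and pass it through untouched; the id is already embedded in the entry
        client.writeProcessedToIndex(ba);
    }
    else {
        throw new IllegalArgumentException("expected a HiveEntityWritable, got " + w);
    }
}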
@@ -27,6 +27,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.elasticsearch.hadoop.mr.ESOutputFormat;
import org.elasticsearch.hadoop.util.BytesArray;

/**
* Hive specific OutputFormat.
@@ -36,6 +37,8 @@ public class ESHiveOutputFormat extends ESOutputFormat implements HiveOutputForm

static class ESHiveRecordWriter extends ESOutputFormat.ESRecordWriter implements RecordWriter {

private BytesArray ba = new BytesArray(0);

public ESHiveRecordWriter(Configuration cfg) {
super(cfg);
}
@@ -49,7 +52,8 @@ public void write(Writable w) throws IOException {

if (w instanceof HiveEntityWritable) {
HiveEntityWritable hew = ((HiveEntityWritable) w);
client.writeToIndex(hew.getBytes(), hew.getLength(), hew.getId());
ba.bytes(hew.getBytes(), hew.getLength());
client.writeProcessedToIndex(ba);
}
else {
throw new IllegalArgumentException(String.format("Unexpected type; expected [%s], received [%s]", HiveEntityWritable.class, w));
37 changes: 14 additions & 23 deletions src/main/java/org/elasticsearch/hadoop/hive/ESSerDe.java
@@ -22,6 +22,8 @@
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -37,41 +39,38 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.elasticsearch.hadoop.cfg.PropertiesSettings;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.serialization.ContentBuilder;
import org.elasticsearch.hadoop.serialization.FieldExtractor;
import org.elasticsearch.hadoop.serialization.ValueWriter;
import org.elasticsearch.hadoop.serialization.BulkCommands;
import org.elasticsearch.hadoop.serialization.Command;
import org.elasticsearch.hadoop.serialization.SerializationUtils;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

@SuppressWarnings("deprecation")
public class ESSerDe implements SerDe {

private Properties tableProperties;
private static Log log = LogFactory.getLog(ESSerDe.class);

private Properties tableProperties;
private StructObjectInspector inspector;

// serialization artifacts
private BytesArray scratchPad = new BytesArray(512);
private ValueWriter<HiveType> valueWriter;
private HiveType hiveType = new HiveType(null, null);
private HiveEntityWritable result = new HiveEntityWritable();
private StructTypeInfo structTypeInfo;
private FieldAlias alias;
private FieldExtractor idExtractor;
private Command command;

private boolean writeInitialized = false;

@Override
public void initialize(Configuration conf, Properties tbl) throws SerDeException {
inspector = HiveUtils.structObjectInspector(tbl);
structTypeInfo = HiveUtils.typeInfo(inspector);
alias = HiveUtils.alias(tbl);

alias = HiveUtils.alias(new PropertiesSettings(tbl));
this.tableProperties = tbl;
}

@@ -106,17 +105,11 @@ public Writable serialize(Object data, ObjectInspector objInspector) throws SerD

// serialize the type directly to json (to avoid converting to Writable and then serializing)
scratchPad.reset();
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(scratchPad);

hiveType.setObjectInspector(objInspector);
hiveType.setObject(data);
ContentBuilder.generate(bos, valueWriter).value(hiveType).flush().close();

command.write(hiveType).write(scratchPad);
result.setContent(scratchPad.bytes(), scratchPad.size());
if (idExtractor != null) {
String id = idExtractor.field(hiveType);
result.setId(id.getBytes(StringUtils.UTF_8));
}
return result;
}

@@ -126,12 +119,10 @@ private void lazyInitializeWrite() {
}
writeInitialized = true;
Settings settings = SettingsManager.loadFrom(tableProperties);
// TODO: externalize
valueWriter = new HiveValueWriter(alias);
InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, null);

idExtractor = (StringUtils.hasText(settings.getMappingId()) ?
ObjectUtils.<FieldExtractor> instantiate(settings.getMappingIdExtractorClassName(), settings) : null);
SerializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, log);
this.command = BulkCommands.create(settings);
}


@@ -33,15 +33,13 @@ public class HiveEntityWritable extends BinaryComparable implements WritableComp

private int size;
private byte[] bytes;
private byte[] id;
private int idSize;

public HiveEntityWritable() {
bytes = null;
}

public int getLength() {
return size + idSize;
return size;
}

public byte[] getBytes() {
@@ -53,33 +51,14 @@ public void setContent(byte[] bytes, int size) {
this.size = size;
}

public void setId(byte[] id) {
this.id = id;
this.size += idSize;
}

public byte[] getId() {
return id;
}

// inherit javadoc
public void readFields(DataInput in) throws IOException {
size = in.readInt();
in.readFully(bytes, 0, size);
idSize = in.readInt();
if (idSize != 0) {
in.readFully(id, 0, idSize);
}
}

// inherit javadoc
public void write(DataOutput out) throws IOException {
out.writeInt(size);
out.write(bytes, 0, size);
out.writeInt(idSize);
if (idSize > 0) {
out.write(id, 0, idSize);
}
}

public int hashCode() {
@@ -100,11 +79,6 @@ public boolean equals(Object right_obj) {
*/
public String toString() {
StringBuilder sb = new StringBuilder();
if (idSize > 0) {
sb.append("id[");
sb.append(new String(id, 0, idSize, StringUtils.UTF_8));
sb.append("]=");
}
sb.append(new String(bytes, 0, size, StringUtils.UTF_8));
return sb.toString();
}
10 changes: 3 additions & 7 deletions src/main/java/org/elasticsearch/hadoop/hive/HiveUtils.java
@@ -73,8 +73,8 @@ static StructTypeInfo typeInfo(StructObjectInspector inspector) {
return (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(inspector);
}

static FieldAlias alias(Properties tableProperties) {
List<String> aliases = StringUtils.tokenize(tableProperties.getProperty(HiveConstants.MAPPING_NAMES), ",");
static FieldAlias alias(Settings settings) {
List<String> aliases = StringUtils.tokenize(settings.getProperty(HiveConstants.MAPPING_NAMES), ",");

Map<String, String> aliasMap = new LinkedHashMap<String, String>();

@@ -93,7 +93,7 @@ static FieldAlias alias(Properties tableProperties) {
}

// add default aliases for serialization (_colX -> mapping name)
Map<String, String> columnMap = columnMap(tableProperties);
Map<String, String> columnMap = columnMap(settings);

for (Entry<String, String> entry : columnMap.entrySet()) {
String columnName = entry.getKey();
@@ -112,10 +112,6 @@ static FieldAlias alias(Properties tableProperties) {
return new FieldAlias(aliasMap);
}

static Map<String, String> columnMap(Properties tableProperties) {
return columnMap(tableProperties.getProperty(HiveConstants.COLUMNS));
}

static Map<String, String> columnMap(Settings settings) {
return columnMap(settings.getProperty(HiveConstants.COLUMNS));
}
18 changes: 11 additions & 7 deletions src/main/java/org/elasticsearch/hadoop/hive/HiveValueWriter.java
@@ -26,27 +26,25 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.Writable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.serialization.SettingsAware;
import org.elasticsearch.hadoop.serialization.ValueWriter;

/**
* Main value writer for Hive. However, since Hive expects a Writable type to be passed to the record reader,
* the raw JSON data needs to be wrapped (and unwrapped by {@link HiveEntityWritable}).
*/
public class HiveValueWriter implements ValueWriter<HiveType> {
public class HiveValueWriter implements SettingsAware, ValueWriter<HiveType> {

private final boolean writeUnknownTypes;
private final ValueWriter<Writable> writableWriter;
private final FieldAlias alias;
private FieldAlias alias;

public HiveValueWriter() {
this(new FieldAlias());
}

public HiveValueWriter(FieldAlias alias) {
this.writeUnknownTypes = false;
this.writableWriter = new HiveWritableValueWriter(false);
this.alias = alias;
this.alias = new FieldAlias();
}

@Override
@@ -129,7 +127,13 @@ private boolean write(Object data, ObjectInspector oi, Generator generator) {
return true;
}


protected boolean handleUnknown(Object value, ObjectInspector oi, Generator generator) {
return false;
}

@Override
public void setSettings(Settings settings) {
alias = HiveUtils.alias(settings);
}
}
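
HiveValueWriter thus loses its FieldAlias constructor: ESSerDe merely registers the class via SerializationUtils.setValueWriterIfNotSet, and the writer obtains its aliases through the new SettingsAware callback once it has been created. A small sketch of that wiring, with hypothetical variable names and the API as shown in this diff:

// Assumed: `settings` are the job settings derived from the Hive table properties.
HiveValueWriter writer = new HiveValueWriter();   // only the no-arg constructor remains
writer.setSettings(settings);                     // internally: alias = HiveUtils.alias(settings)
// From here on the writer resolves Hive column names through the FieldAlias
// built from the mapping-names setting and the table's column list.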
@@ -30,7 +30,6 @@
import org.elasticsearch.hadoop.serialization.BulkCommands;
import org.elasticsearch.hadoop.serialization.Command;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.SerializedObject;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.BytesRef;
@@ -50,7 +49,7 @@ public class BufferedRestClient implements Closeable {
private int dataEntries = 0;
private boolean requiresRefreshAfterBulk = false;
private boolean executedBulkWrite = false;
private SerializedObject objectAlreadySerialized;
private BytesRef objectAlreadySerialized;
private boolean writeInitialized = false;

private RestClient client;
@@ -76,7 +75,7 @@ private void lazyInitWriting() {
writeInitialized = true;

data.bytes(new byte[settings.getBatchSizeInBytes()], 0);
objectAlreadySerialized = new SerializedObject();
objectAlreadySerialized = new BytesRef();
bufferEntriesThreshold = settings.getBatchSizeInEntries();
requiresRefreshAfterBulk = settings.getBatchRefreshAfterWrite();

@@ -108,7 +107,7 @@ public void writeToIndex(Object object) throws IOException {
Assert.notNull(object, "no object data given");

lazyInitWriting();
doWriteToIndex(object);
doWriteToIndex(command.write(object));
}

/**
@@ -117,23 +116,17 @@ public void writeToIndex(Object object) throws IOException {
* @param data as a byte array
* @param size the length to use from the given array
*/
public void writeToIndex(byte[] data, int size, byte[] id) throws IOException {
public void writeProcessedToIndex(BytesArray ba) throws IOException {
Assert.hasText(index, "no index given");
Assert.notNull(data, "no data given");
Assert.isTrue(size > 0, "no data given");
Assert.notNull(ba, "no data given");
Assert.isTrue(ba.size() > 0, "no data given");

lazyInitWriting();

objectAlreadySerialized.data = data;
objectAlreadySerialized.size = size;
objectAlreadySerialized.id = id;

objectAlreadySerialized.add(ba);
doWriteToIndex(objectAlreadySerialized);
}

private void doWriteToIndex(Object object) throws IOException {
BytesRef payload = command.write(object);

private void doWriteToIndex(BytesRef payload) throws IOException {
// check space first
if (payload.size() > data.available()) {
flushBatch();
Expand All @@ -142,7 +135,6 @@ private void doWriteToIndex(Object object) throws IOException {
payload.write(data);

dataEntries++;

if (bufferEntriesThreshold > 0 && dataEntries >= bufferEntriesThreshold) {
flushBatch();
}
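
Seen from the client side, both entry points now funnel into the same doWriteToIndex(BytesRef) buffering logic: writeToIndex(Object) first serializes the object through the bulk Command, while writeProcessedToIndex(BytesArray) accepts entries that were already rendered (as the Hive writer above now does). A hedged usage sketch; `client`, `document`, `serializedEntry` and its length are hypothetical:

// Plain object: the client serializes it itself via command.write(object).
client.writeToIndex(document);

// Pre-serialized bulk entry (e.g. produced by ESSerDe): wrap the bytes and pass through.
BytesArray ba = new BytesArray(0);
ba.bytes(serializedEntry, serializedEntryLength);
client.writeProcessedToIndex(ba);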

This file was deleted.

@@ -17,6 +17,7 @@

import java.util.Properties;

import org.elasticsearch.hadoop.cfg.PropertiesSettings;
import org.junit.Test;

import static org.junit.Assert.*;
@@ -27,7 +28,7 @@ public class FieldAliasTest {
public void testFieldMap() throws Exception {
Properties tableProperties = new Properties();
tableProperties.put("es.column.aliases", "timestamp:@timestamp , foo:123foo");
FieldAlias alias = HiveUtils.alias(tableProperties);
FieldAlias alias = HiveUtils.alias(new PropertiesSettings(tableProperties));
assertEquals("@timestamp", alias.toES("timestamp"));
assertEquals("123foo", alias.toES("foo"));
assertEquals("bar", alias.toES("BaR"));
