From e375e60c1ea85e29b760be1795b9ec500e1d6f53 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Wed, 30 Oct 2013 18:39:48 +0200 Subject: [PATCH] add new commands infrastructure to Hive relates to #69 --- .../hadoop/hive/ESHiveOutputFormat.java | 8 +-- .../elasticsearch/hadoop/hive/ESSerDe.java | 22 +++++-- ...sWritable.java => HiveEntityWritable.java} | 63 +++++++++---------- .../hadoop/hive/HiveIdExtractor.java | 54 ++++++++++++++++ .../hadoop/hive/HiveValueWriter.java | 2 +- .../hadoop/rest/BufferedRestClient.java | 16 +++-- .../hadoop/serialization/AbstractCommand.java | 52 ++++++++++----- .../serialization/SerializedObject.java | 30 +++++++++ 8 files changed, 183 insertions(+), 64 deletions(-) rename src/main/java/org/elasticsearch/hadoop/hive/{FastBytesWritable.java => HiveEntityWritable.java} (65%) create mode 100644 src/main/java/org/elasticsearch/hadoop/hive/HiveIdExtractor.java create mode 100644 src/main/java/org/elasticsearch/hadoop/serialization/SerializedObject.java diff --git a/src/main/java/org/elasticsearch/hadoop/hive/ESHiveOutputFormat.java b/src/main/java/org/elasticsearch/hadoop/hive/ESHiveOutputFormat.java index 77e8e4739e2e48..b4a469daf7ef56 100644 --- a/src/main/java/org/elasticsearch/hadoop/hive/ESHiveOutputFormat.java +++ b/src/main/java/org/elasticsearch/hadoop/hive/ESHiveOutputFormat.java @@ -42,12 +42,12 @@ public ESHiveRecordWriter(Configuration cfg) { @Override public void write(Writable w) throws IOException { - if (w instanceof FastBytesWritable) { - FastBytesWritable fbw = ((FastBytesWritable) w); - client.writeToIndex(fbw.getBytes(), fbw.getLength()); + if (w instanceof HiveEntityWritable) { + HiveEntityWritable hew = ((HiveEntityWritable) w); + client.writeToIndex(hew.getBytes(), hew.getLength(), hew.getId()); } else { - throw new IllegalArgumentException(String.format("Unexpected type; expected [%s], received [%s]", FastBytesWritable.class, w)); + throw new IllegalArgumentException(String.format("Unexpected type; expected [%s], received [%s]", HiveEntityWritable.class, w)); } } diff --git a/src/main/java/org/elasticsearch/hadoop/hive/ESSerDe.java b/src/main/java/org/elasticsearch/hadoop/hive/ESSerDe.java index 3ad2501bc9d5d5..5d55cf49a7cb1d 100644 --- a/src/main/java/org/elasticsearch/hadoop/hive/ESSerDe.java +++ b/src/main/java/org/elasticsearch/hadoop/hive/ESSerDe.java @@ -37,10 +37,15 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.elasticsearch.hadoop.cfg.Settings; +import org.elasticsearch.hadoop.cfg.SettingsManager; import org.elasticsearch.hadoop.serialization.ContentBuilder; +import org.elasticsearch.hadoop.serialization.IdExtractor; import org.elasticsearch.hadoop.serialization.ValueWriter; import org.elasticsearch.hadoop.util.BytesArray; import org.elasticsearch.hadoop.util.FastByteArrayOutputStream; +import org.elasticsearch.hadoop.util.ObjectUtils; +import org.elasticsearch.hadoop.util.StringUtils; @SuppressWarnings("deprecation") public class ESSerDe implements SerDe { @@ -51,9 +56,10 @@ public class ESSerDe implements SerDe { private BytesArray scratchPad = new BytesArray(512); private ValueWriter valueWriter; private HiveType hiveType = new HiveType(null, null); - private FastBytesWritable result = new FastBytesWritable(); + private HiveEntityWritable result = new HiveEntityWritable(); private StructTypeInfo structTypeInfo; private FieldAlias alias; + private IdExtractor idExtractor; @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { @@ -61,6 +67,9 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException structTypeInfo = HiveUtils.typeInfo(inspector); alias = HiveUtils.alias(tbl); valueWriter = new HiveValueWriter(alias); + Settings settings = SettingsManager.loadFrom(tbl); + idExtractor = (StringUtils.hasText(settings.getMappingId()) ? + ObjectUtils. instantiate(settings.getMappingIdExtractorClassName(), settings) : null); } @Override @@ -85,21 +94,24 @@ public SerDeStats getSerDeStats() { @Override public Class getSerializedClass() { - return FastBytesWritable.class; + return HiveEntityWritable.class; } @Override public Writable serialize(Object data, ObjectInspector objInspector) throws SerDeException { - // serialize the type directly to json + // serialize the type directly to json (to avoid converting to Writable and then serializing) scratchPad.reset(); FastByteArrayOutputStream bos = new FastByteArrayOutputStream(scratchPad); hiveType.setObjectInspector(objInspector); hiveType.setObject(data); - ContentBuilder.generate(bos, valueWriter).value(hiveType).flush().close(); - result.set(scratchPad.bytes(), scratchPad.size()); + result.setContent(scratchPad.bytes(), scratchPad.size()); + if (idExtractor != null) { + String id = idExtractor.id(hiveType); + result.setId(id.getBytes(StringUtils.UTF_8)); + } return result; } diff --git a/src/main/java/org/elasticsearch/hadoop/hive/FastBytesWritable.java b/src/main/java/org/elasticsearch/hadoop/hive/HiveEntityWritable.java similarity index 65% rename from src/main/java/org/elasticsearch/hadoop/hive/FastBytesWritable.java rename to src/main/java/org/elasticsearch/hadoop/hive/HiveEntityWritable.java index 3a19cc7d3756a1..0ebeceb5d7d15c 100644 --- a/src/main/java/org/elasticsearch/hadoop/hive/FastBytesWritable.java +++ b/src/main/java/org/elasticsearch/hadoop/hive/HiveEntityWritable.java @@ -23,56 +23,55 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; +import org.elasticsearch.hadoop.util.StringUtils; /** * Replacement of {@link BytesWritable} that allows direct access to the underlying byte array without copying. + * Used to wrap already json serialized hive entities. */ -public class FastBytesWritable extends BinaryComparable implements WritableComparable { +public class HiveEntityWritable extends BinaryComparable implements WritableComparable { private int size; private byte[] bytes; + private byte[] id = new byte[0]; - public FastBytesWritable() { + public HiveEntityWritable() { bytes = null; } - /** - * Create a BytesWritable using the byte array as the initial value. - * @param bytes This array becomes the backing storage for the object. - */ - public FastBytesWritable(byte[] bytes, int size) { - set(bytes, size); + public int getLength() { + return size + id.length; } - /** - * Get the current size of the buffer. - */ - public int getLength() { - return size; + public byte[] getBytes() { + return bytes; } - public void set(byte[] bytes, int size) { + public void setContent(byte[] bytes, int size) { this.bytes = bytes; this.size = size; } - /** - * Get the data from the BytesWritable. - * @return The data is only valid between 0 and getLength() - 1. - */ - public byte[] getBytes() { - return bytes; + + public void setId(byte[] id) { + this.id = id; + } + + public byte[] getId() { + return id; } // inherit javadoc public void readFields(DataInput in) throws IOException { size = in.readInt(); in.readFully(bytes, 0, size); + in.readFully(id, 0, id.length); } // inherit javadoc public void write(DataOutput out) throws IOException { out.writeInt(size); out.write(bytes, 0, size); + out.write(id, 0, id.length); } public int hashCode() { @@ -83,7 +82,7 @@ public int hashCode() { * Are the two byte sequences equal? */ public boolean equals(Object right_obj) { - if (right_obj instanceof FastBytesWritable) + if (right_obj instanceof HiveEntityWritable) return super.equals(right_obj); return false; } @@ -92,26 +91,20 @@ public boolean equals(Object right_obj) { * Generate the stream of bytes as hex pairs separated by ' '. */ public String toString() { - StringBuffer sb = new StringBuffer(3 * size); - for (int idx = 0; idx < size; idx++) { - // if not the first, put a blank separator in - if (idx != 0) { - sb.append(' '); - } - String num = Integer.toHexString(0xff & bytes[idx]); - // if it is only one digit, add a leading 0. - if (num.length() < 2) { - sb.append('0'); - } - sb.append(num); + StringBuilder sb = new StringBuilder(); + if (id != null && id.length > 0) { + sb.append("id["); + sb.append(new String(id, 0, id.length, StringUtils.UTF_8)); + sb.append("]="); } + sb.append(new String(bytes, 0, size, StringUtils.UTF_8)); return sb.toString(); } public static class Comparator extends WritableComparator { public Comparator() { - super(FastBytesWritable.class); + super(HiveEntityWritable.class); } /** @@ -123,6 +116,6 @@ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { } static { // register this comparator - WritableComparator.define(FastBytesWritable.class, new Comparator()); + WritableComparator.define(HiveEntityWritable.class, new Comparator()); } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/hadoop/hive/HiveIdExtractor.java b/src/main/java/org/elasticsearch/hadoop/hive/HiveIdExtractor.java new file mode 100644 index 00000000000000..a38c145e0637ec --- /dev/null +++ b/src/main/java/org/elasticsearch/hadoop/hive/HiveIdExtractor.java @@ -0,0 +1,54 @@ +/* + * Copyright 2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elasticsearch.hadoop.hive; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.elasticsearch.hadoop.cfg.Settings; +import org.elasticsearch.hadoop.serialization.IdExtractor; +import org.elasticsearch.hadoop.serialization.SettingsAware; +import org.elasticsearch.hadoop.util.Assert; + +public class HiveIdExtractor implements IdExtractor, SettingsAware { + + private String id; + + @Override + public String id(Object target) { + if (target instanceof HiveType) { + HiveType type = (HiveType) target; + ObjectInspector inspector = type.getObjectInspector(); + if (inspector instanceof StructObjectInspector) { + StructObjectInspector soi = (StructObjectInspector) inspector; + StructField field = soi.getStructFieldRef(id); + ObjectInspector foi = field.getFieldObjectInspector(); + Assert.isTrue(foi.getCategory() == ObjectInspector.Category.PRIMITIVE, + String.format("Id field [%s] needs to be a primitive; found [%s]", id, foi.getTypeName())); + + // expecting a writeable - simply do a toString + return soi.getStructFieldData(target, field).toString(); + } + } + + return null; + } + + @Override + public void setSettings(Settings settings) { + id = settings.getMappingId().trim().toLowerCase(); + } +} diff --git a/src/main/java/org/elasticsearch/hadoop/hive/HiveValueWriter.java b/src/main/java/org/elasticsearch/hadoop/hive/HiveValueWriter.java index d030fd80c4b7c3..dadc7f1d505320 100644 --- a/src/main/java/org/elasticsearch/hadoop/hive/HiveValueWriter.java +++ b/src/main/java/org/elasticsearch/hadoop/hive/HiveValueWriter.java @@ -31,7 +31,7 @@ /** * Main value writer for hive. However since Hive expects a Writable type to be passed to the record reader, - * the raw JSON data needs to be wrapped (and unwrapped by {@link FastBytesWritable}). + * the raw JSON data needs to be wrapped (and unwrapped by {@link HiveEntityWritable}). */ public class HiveValueWriter implements ValueWriter { diff --git a/src/main/java/org/elasticsearch/hadoop/rest/BufferedRestClient.java b/src/main/java/org/elasticsearch/hadoop/rest/BufferedRestClient.java index f719e8d2113347..b7d19552071f2e 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/BufferedRestClient.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/BufferedRestClient.java @@ -30,6 +30,7 @@ import org.elasticsearch.hadoop.serialization.BulkCommands; import org.elasticsearch.hadoop.serialization.Command; import org.elasticsearch.hadoop.serialization.ScrollReader; +import org.elasticsearch.hadoop.serialization.SerializedObject; import org.elasticsearch.hadoop.util.Assert; import org.elasticsearch.hadoop.util.BytesArray; @@ -47,7 +48,7 @@ public class BufferedRestClient implements Closeable { private int dataEntries = 0; private boolean requiresRefreshAfterBulk = false; private boolean executedBulkWrite = false; - + private SerializedObject objectAlreadySerialized; private boolean writeInitialized = false; private RestClient client; @@ -73,6 +74,7 @@ private void lazyInitWriting() { writeInitialized = true; data.bytes(new byte[settings.getBatchSizeInBytes()], 0); + objectAlreadySerialized = new SerializedObject(); bufferEntriesThreshold = settings.getBatchSizeInEntries(); requiresRefreshAfterBulk = settings.getBatchRefreshAfterWrite(); @@ -113,18 +115,24 @@ public void writeToIndex(Object object) throws IOException { * @param data as a byte array * @param size the length to use from the given array */ - public void writeToIndex(byte[] data, int size) throws IOException { + public void writeToIndex(byte[] data, int size, byte[] id) throws IOException { Assert.hasText(index, "no index given"); Assert.notNull(data, "no data given"); Assert.isTrue(size > 0, "no data given"); - throw new UnsupportedOperationException(); + lazyInitWriting(); + + objectAlreadySerialized.data = data; + objectAlreadySerialized.size = size; + objectAlreadySerialized.id = id; + + doWriteToIndex(objectAlreadySerialized); } private void doWriteToIndex(Object object) throws IOException { int entrySize = command.prepare(object); - // make some space first + // check space first if (entrySize + data.size() > data.capacity()) { flushBatch(); } diff --git a/src/main/java/org/elasticsearch/hadoop/serialization/AbstractCommand.java b/src/main/java/org/elasticsearch/hadoop/serialization/AbstractCommand.java index f7b0e332a4c060..0e6fd9e093f16a 100644 --- a/src/main/java/org/elasticsearch/hadoop/serialization/AbstractCommand.java +++ b/src/main/java/org/elasticsearch/hadoop/serialization/AbstractCommand.java @@ -40,8 +40,8 @@ abstract class AbstractCommand implements Command { AbstractCommand(Settings settings) { this.valueWriter = ObjectUtils.instantiate(settings.getSerializerValueWriterClassName(), settings); - this.idExtractor = (StringUtils.hasText(settings.getMappingId()) ? - ObjectUtils. instantiate(settings.getMappingIdExtractorClassName(), settings) : null); + this.idExtractor = (StringUtils.hasText(settings.getMappingId()) ? ObjectUtils. instantiate( + settings.getMappingIdExtractorClassName(), settings) : null); if (log.isTraceEnabled()) { log.trace(String.format("Instantiated value writer [%s]", valueWriter)); @@ -53,12 +53,21 @@ abstract class AbstractCommand implements Command { @Override public int prepare(Object object) { - String id = extractId(object); - int entrySize = 0; - if (StringUtils.hasText(id)) { - idAsBytes = id.getBytes(StringUtils.UTF_8); + if (object instanceof SerializedObject) { + SerializedObject so = ((SerializedObject) object); + idAsBytes = so.id; + } + + else { + String id = extractId(object); + if (StringUtils.hasText(id)) { + idAsBytes = id.getBytes(StringUtils.UTF_8); + } + } + + if (idAsBytes != null) { entrySize += headerPrefix().length; entrySize += idAsBytes.length; entrySize += headerSuffix().length; @@ -68,21 +77,27 @@ public int prepare(Object object) { throw new IllegalArgumentException(String.format( "Operation [%s] requires an id but none was given/found", this.toString())); } - - idAsBytes = null; - entrySize += header().length; + else { + entrySize += header().length; + } } if (isSourceRequired()) { - serialize(object); - // add the trailing \n - entrySize += scratchPad.size() + 1; + if (object instanceof SerializedObject) { + entrySize += ((SerializedObject) object).size; + } + else { + serialize(object); + // add the trailing \n + entrySize += scratchPad.size(); + } + // trailing \n + entrySize++; } return entrySize; } - @Override public void write(Object object, BytesArray buffer) { writeActionAndMetadata(object, buffer); @@ -120,8 +135,15 @@ protected boolean isSourceRequired() { protected void writeSource(Object object, BytesArray buffer) { // object was serialized - just write it down - System.arraycopy(scratchPad.bytes(), 0, buffer.bytes(), buffer.size(), scratchPad.size()); - buffer.increment(scratchPad.size()); + if (object instanceof SerializedObject) { + SerializedObject so = (SerializedObject) object; + System.arraycopy(so.data, 0, buffer.bytes(), buffer.size(), so.size); + buffer.increment(so.size); + } + else { + System.arraycopy(scratchPad.bytes(), 0, buffer.bytes(), buffer.size(), scratchPad.size()); + buffer.increment(scratchPad.size()); + } } private void writeTrailingReturn(BytesArray buffer) { diff --git a/src/main/java/org/elasticsearch/hadoop/serialization/SerializedObject.java b/src/main/java/org/elasticsearch/hadoop/serialization/SerializedObject.java new file mode 100644 index 00000000000000..d649d57891a942 --- /dev/null +++ b/src/main/java/org/elasticsearch/hadoop/serialization/SerializedObject.java @@ -0,0 +1,30 @@ +/* + * Copyright 2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.elasticsearch.hadoop.serialization; + +/** + * Simple container used for indicating to a {@link Command}, the data has been processed already. + */ +public class SerializedObject { + + public byte[] data; + public int size; + public byte[] id; + + public int totalSize() { + return size + (id != null ? id.length : 0); + } +}