add new commands infrastructure to Hive
relates to elastic#69
costin committed Oct 31, 2013
1 parent e8cee57 commit e375e60
Showing 8 changed files with 183 additions and 64 deletions.
@@ -42,12 +42,12 @@ public ESHiveRecordWriter(Configuration cfg) {

@Override
public void write(Writable w) throws IOException {
if (w instanceof FastBytesWritable) {
FastBytesWritable fbw = ((FastBytesWritable) w);
client.writeToIndex(fbw.getBytes(), fbw.getLength());
if (w instanceof HiveEntityWritable) {
HiveEntityWritable hew = ((HiveEntityWritable) w);
client.writeToIndex(hew.getBytes(), hew.getLength(), hew.getId());
}
else {
throw new IllegalArgumentException(String.format("Unexpected type; expected [%s], received [%s]", FastBytesWritable.class, w));
throw new IllegalArgumentException(String.format("Unexpected type; expected [%s], received [%s]", HiveEntityWritable.class, w));
}
}
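For context, a minimal sketch of building the Writable this method now expects (method names come from the HiveEntityWritable diff below; the values and the recordWriter variable are illustrative only):

// Illustrative only: constructing the Writable that write(Writable) accepts.
HiveEntityWritable hew = new HiveEntityWritable();
byte[] json = "{\"name\":\"Radiohead\"}".getBytes(StringUtils.UTF_8);
hew.setContent(json, json.length);
hew.setId("1".getBytes(StringUtils.UTF_8));
recordWriter.write(hew); // hands bytes, length and id to the rest client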

22 changes: 17 additions & 5 deletions src/main/java/org/elasticsearch/hadoop/hive/ESSerDe.java
@@ -37,10 +37,15 @@
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.serialization.ContentBuilder;
import org.elasticsearch.hadoop.serialization.IdExtractor;
import org.elasticsearch.hadoop.serialization.ValueWriter;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

@SuppressWarnings("deprecation")
public class ESSerDe implements SerDe {
@@ -51,16 +56,20 @@ public class ESSerDe implements SerDe {
private BytesArray scratchPad = new BytesArray(512);
private ValueWriter<HiveType> valueWriter;
private HiveType hiveType = new HiveType(null, null);
private FastBytesWritable result = new FastBytesWritable();
private HiveEntityWritable result = new HiveEntityWritable();
private StructTypeInfo structTypeInfo;
private FieldAlias alias;
private IdExtractor idExtractor;

@Override
public void initialize(Configuration conf, Properties tbl) throws SerDeException {
inspector = HiveUtils.structObjectInspector(tbl);
structTypeInfo = HiveUtils.typeInfo(inspector);
alias = HiveUtils.alias(tbl);
valueWriter = new HiveValueWriter(alias);
Settings settings = SettingsManager.loadFrom(tbl);
idExtractor = (StringUtils.hasText(settings.getMappingId()) ?
ObjectUtils.<IdExtractor> instantiate(settings.getMappingIdExtractorClassName(), settings) : null);
}

@Override
@@ -85,21 +94,24 @@ public SerDeStats getSerDeStats() {

@Override
public Class<? extends Writable> getSerializedClass() {
return FastBytesWritable.class;
return HiveEntityWritable.class;
}

@Override
public Writable serialize(Object data, ObjectInspector objInspector) throws SerDeException {
// serialize the type directly to json
// serialize the type directly to json (to avoid converting to Writable and then serializing)
scratchPad.reset();
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(scratchPad);

hiveType.setObjectInspector(objInspector);
hiveType.setObject(data);

ContentBuilder.generate(bos, valueWriter).value(hiveType).flush().close();

result.set(scratchPad.bytes(), scratchPad.size());
result.setContent(scratchPad.bytes(), scratchPad.size());
if (idExtractor != null) {
String id = idExtractor.id(hiveType);
result.setId(id.getBytes(StringUtils.UTF_8));
}
return result;
}
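How the mapping id gets configured is not visible in this excerpt. A hypothetical wiring sketch, assuming the table property behind settings.getMappingId() is named es.mapping.id (that key is an assumption, not something this diff shows):

// Hypothetical configuration sketch; "es.mapping.id" is an assumed key,
// inferred from settings.getMappingId() and not confirmed by this diff.
Properties tbl = new Properties();
tbl.setProperty("es.mapping.id", "id"); // take the document id from the "id" field
Settings settings = SettingsManager.loadFrom(tbl);
if (StringUtils.hasText(settings.getMappingId())) {
    // initialize() above instantiates the extractor the same way
    IdExtractor idExtractor = ObjectUtils.<IdExtractor> instantiate(
            settings.getMappingIdExtractorClassName(), settings);
}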

@@ -23,56 +23,55 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.elasticsearch.hadoop.util.StringUtils;

/**
* Replacement of {@link BytesWritable} that allows direct access to the underlying byte array without copying.
* Used to wrap already JSON-serialized Hive entities.
*/
public class FastBytesWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {
public class HiveEntityWritable extends BinaryComparable implements WritableComparable<BinaryComparable> {

private int size;
private byte[] bytes;
private byte[] id = new byte[0];

public FastBytesWritable() {
public HiveEntityWritable() {
bytes = null;
}

/**
* Create a BytesWritable using the byte array as the initial value.
* @param bytes This array becomes the backing storage for the object.
*/
public FastBytesWritable(byte[] bytes, int size) {
set(bytes, size);
public int getLength() {
return size + id.length;
}

/**
* Get the current size of the buffer.
*/
public int getLength() {
return size;
public byte[] getBytes() {
return bytes;
}

public void set(byte[] bytes, int size) {
public void setContent(byte[] bytes, int size) {
this.bytes = bytes;
this.size = size;
}
/**
* Get the data from the BytesWritable.
* @return The data is only valid between 0 and getLength() - 1.
*/
public byte[] getBytes() {
return bytes;

public void setId(byte[] id) {
this.id = id;
}

public byte[] getId() {
return id;
}

// inherit javadoc
public void readFields(DataInput in) throws IOException {
size = in.readInt();
in.readFully(bytes, 0, size);
in.readFully(id, 0, id.length);
}

// inherit javadoc
public void write(DataOutput out) throws IOException {
out.writeInt(size);
out.write(bytes, 0, size);
out.write(id, 0, id.length);
}

public int hashCode() {
@@ -83,7 +82,7 @@ public int hashCode() {
* Are the two byte sequences equal?
*/
public boolean equals(Object right_obj) {
if (right_obj instanceof FastBytesWritable)
if (right_obj instanceof HiveEntityWritable)
return super.equals(right_obj);
return false;
}
@@ -92,26 +91,20 @@ public boolean equals(Object right_obj) {
* Generate the stream of bytes as hex pairs separated by ' '.
*/
public String toString() {
StringBuffer sb = new StringBuffer(3 * size);
for (int idx = 0; idx < size; idx++) {
// if not the first, put a blank separator in
if (idx != 0) {
sb.append(' ');
}
String num = Integer.toHexString(0xff & bytes[idx]);
// if it is only one digit, add a leading 0.
if (num.length() < 2) {
sb.append('0');
}
sb.append(num);
StringBuilder sb = new StringBuilder();
if (id != null && id.length > 0) {
sb.append("id[");
sb.append(new String(id, 0, id.length, StringUtils.UTF_8));
sb.append("]=");
}
sb.append(new String(bytes, 0, size, StringUtils.UTF_8));
return sb.toString();
}


public static class Comparator extends WritableComparator {
public Comparator() {
super(FastBytesWritable.class);
super(HiveEntityWritable.class);
}

/**
@@ -123,6 +116,6 @@ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
}

static { // register this comparator
WritableComparator.define(FastBytesWritable.class, new Comparator());
WritableComparator.define(HiveEntityWritable.class, new Comparator());
}
}
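Note that write(DataOutput) above never records the id length, so readFields can only succeed when the reading side already holds correctly sized buffers. A self-describing variant (an alternative sketch, not the committed code) would serialize both lengths:

// Alternative sketch, not the committed code: also serialize the id length,
// so a freshly constructed instance can deserialize the stream on its own.
public void write(DataOutput out) throws IOException {
    out.writeInt(size);
    out.write(bytes, 0, size);
    out.writeInt(id.length);
    out.write(id, 0, id.length);
}

public void readFields(DataInput in) throws IOException {
    size = in.readInt();
    bytes = new byte[size]; // allocate instead of assuming a pre-sized buffer
    in.readFully(bytes, 0, size);
    id = new byte[in.readInt()];
    in.readFully(id, 0, id.length);
}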
54 changes: 54 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/hive/HiveIdExtractor.java
@@ -0,0 +1,54 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.elasticsearch.hadoop.hive;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.IdExtractor;
import org.elasticsearch.hadoop.serialization.SettingsAware;
import org.elasticsearch.hadoop.util.Assert;

public class HiveIdExtractor implements IdExtractor, SettingsAware {

private String id;

@Override
public String id(Object target) {
if (target instanceof HiveType) {
HiveType type = (HiveType) target;
ObjectInspector inspector = type.getObjectInspector();
if (inspector instanceof StructObjectInspector) {
StructObjectInspector soi = (StructObjectInspector) inspector;
StructField field = soi.getStructFieldRef(id);
ObjectInspector foi = field.getFieldObjectInspector();
Assert.isTrue(foi.getCategory() == ObjectInspector.Category.PRIMITIVE,
String.format("Id field [%s] needs to be a primitive; found [%s]", id, foi.getTypeName()));

// expecting a Writable - simply do a toString
return soi.getStructFieldData(target, field).toString();
}
}

return null;
}

@Override
public void setSettings(Settings settings) {
id = settings.getMappingId().trim().toLowerCase();
}
}
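A short usage sketch of the extractor (illustrative; settings and hiveType stand in for the objects ESSerDe builds above):

// Illustrative usage, mirroring how ESSerDe drives the extractor.
HiveIdExtractor extractor = new HiveIdExtractor();
extractor.setSettings(settings);       // caches the configured mapping id field name
String docId = extractor.id(hiveType); // null unless target is a HiveType backed by a struct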
@@ -31,7 +31,7 @@

/**
* Main value writer for Hive. However, since Hive expects a Writable type to be passed to the record writer,
* the raw JSON data needs to be wrapped (and unwrapped by {@link FastBytesWritable}).
* the raw JSON data needs to be wrapped (and unwrapped by {@link HiveEntityWritable}).
*/
public class HiveValueWriter implements ValueWriter<HiveType> {
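The wrap-and-unwrap flow the javadoc refers to, condensed from lines shown elsewhere in this commit (variable names as they appear in the respective files):

// Condensed flow: HiveValueWriter produces JSON bytes, HiveEntityWritable
// wraps them, and BufferedRestClient receives the unwrapped payload plus id.
ContentBuilder.generate(bos, valueWriter).value(hiveType).flush().close(); // ESSerDe.serialize
result.setContent(scratchPad.bytes(), scratchPad.size());                  // wrap
client.writeToIndex(hew.getBytes(), hew.getLength(), hew.getId());         // unwrap and ship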

@@ -30,6 +30,7 @@
import org.elasticsearch.hadoop.serialization.BulkCommands;
import org.elasticsearch.hadoop.serialization.Command;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.SerializedObject;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.BytesArray;

@@ -47,7 +48,7 @@ public class BufferedRestClient implements Closeable {
private int dataEntries = 0;
private boolean requiresRefreshAfterBulk = false;
private boolean executedBulkWrite = false;

private SerializedObject objectAlreadySerialized;
private boolean writeInitialized = false;

private RestClient client;
@@ -73,6 +74,7 @@ private void lazyInitWriting() {
writeInitialized = true;

data.bytes(new byte[settings.getBatchSizeInBytes()], 0);
objectAlreadySerialized = new SerializedObject();
bufferEntriesThreshold = settings.getBatchSizeInEntries();
requiresRefreshAfterBulk = settings.getBatchRefreshAfterWrite();

@@ -113,18 +115,24 @@ public void writeToIndex(Object object) throws IOException {
* @param data as a byte array
* @param size the length to use from the given array
* @param id the document id as a byte array
*/
public void writeToIndex(byte[] data, int size) throws IOException {
public void writeToIndex(byte[] data, int size, byte[] id) throws IOException {
Assert.hasText(index, "no index given");
Assert.notNull(data, "no data given");
Assert.isTrue(size > 0, "no data given");

throw new UnsupportedOperationException();
lazyInitWriting();

objectAlreadySerialized.data = data;
objectAlreadySerialized.size = size;
objectAlreadySerialized.id = id;

doWriteToIndex(objectAlreadySerialized);
}

private void doWriteToIndex(Object object) throws IOException {
int entrySize = command.prepare(object);

// make some space first
// check space first
if (entrySize + data.size() > data.capacity()) {
flushBatch();
}
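SerializedObject itself is not shown in this excerpt; judging by the fields assigned in writeToIndex above, its shape is roughly the following (an assumption, not the committed class):

// Assumed shape of SerializedObject, inferred from the assignments above;
// the actual class is not visible in this excerpt.
public class SerializedObject {
    public byte[] data; // document already serialized to JSON upstream
    public int size;    // number of valid bytes in data
    public byte[] id;   // optional document id; may be null
}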
