Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implemented (experimental) sparse record format, which means that mostly-empty records are efficiently encoded
  • Loading branch information
justinsb committed Nov 8, 2009
1 parent 1d40a42 commit 7f6ad25
Show file tree
Hide file tree
Showing 13 changed files with 232 additions and 21 deletions.
39 changes: 31 additions & 8 deletions src/java/org/apache/avro/Schema.java
Expand Up @@ -64,13 +64,21 @@ public abstract class Schema {
FACTORY.enableParserFeature(JsonParser.Feature.ALLOW_COMMENTS);
FACTORY.setCodec(MAPPER);
}


/**
 * Encoding styles for records: how the fields of a record are laid out when
 * the record is serialized.
 */
public enum Encoding {
  /** Every field is written, one after another, in schema order. */
  DEFAULT,

  /**
   * Experimental: a bit mask of the fields that are present is written
   * first, followed only by those fields.  Used for binary data only.
   */
  SPARSE
}

/** The type of a schema. */
public enum Type {
  RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES,
  INT(0), LONG(0L), FLOAT(0f), DOUBLE(0.0), BOOLEAN(false), NULL;

  /** Lower-case form of the constant's name. */
  private final String name;

  /**
   * Value assumed for a field of this type that is absent from a sparsely
   * encoded record; null for types that have no such value.
   */
  private final Object defaultValue;

  /** Creates a type that has no default value. */
  private Type() { this(null); }

  /** Creates a type whose absent fields read back as defaultValue. */
  private Type(Object defaultValue) {
    // Use a fixed locale: under e.g. a Turkish default locale "INT" would
    // otherwise lower-case to a name containing a dotless i.
    this.name = this.name().toLowerCase(java.util.Locale.ENGLISH);
    this.defaultValue = defaultValue;
  }

  /** Returns the default value for this type, or null if it has none. */
  public Object defaultValue() { return defaultValue; }
}

private final Type type;
Expand All @@ -94,15 +102,15 @@ public static Schema create(Type type) {

/** Create an anonymous record schema. */
public static Schema createRecord(LinkedHashMap<String,Field> fields) {
  // Anonymous records have no name or namespace, are never errors, and
  // leave the record encoding unspecified (null).
  Schema result = createRecord(null, null, false, null);
  result.setFields(fields);
  return result;
}

/** Create a named record schema. */
public static Schema createRecord(String name, String namespace,
boolean isError) {
return new RecordSchema(name, namespace, isError);
boolean isError, RecordSchema.Encoding encoding) {
return new RecordSchema(name, namespace, isError, encoding);
}

/** Create an enum schema. */
Expand Down Expand Up @@ -139,6 +147,11 @@ public Map<String, Field> getFields() {
throw new AvroRuntimeException("Not a record: "+this);
}

/**
 * If this is a record, returns its encoding style.  This base
 * implementation always throws, reporting that the schema is not a record.
 */
public Encoding getEncoding() {
  String message = "Not a record: " + this;
  throw new AvroRuntimeException(message);
}

/** If this is a record, enumerate its field names and their schemas. */
public Iterable<Map.Entry<String,Schema>> getFieldSchemas() {
throw new AvroRuntimeException("Not a record: "+this);
Expand Down Expand Up @@ -355,10 +368,13 @@ private static class RecordSchema extends NamedSchema {
private Map<String,Field> fields;
private Iterable<Map.Entry<String,Schema>> fieldSchemas;
private final boolean isError;
/** Encoding style given for this record; null when none was specified. */
private final Encoding encoding;

/**
 * Creates a record (or error) schema.
 *
 * @param name     the record's name
 * @param space    the record's namespace
 * @param isError  whether this record is an error
 * @param encoding the record's encoding style, or null if unspecified
 */
public RecordSchema(String name, String space, boolean isError, Encoding encoding) {
  super(Type.RECORD, name, space);
  this.isError = isError;
  // Stored as given (possibly null) so that an unspecified encoding stays
  // distinguishable from an explicit one.
  this.encoding = encoding;
}
/** Returns this record's encoding style, or DEFAULT when none was given. */
public Encoding getEncoding() {
  if (encoding == null) {
    return Encoding.DEFAULT;
  }
  return encoding;
}
public boolean isError() { return isError; }
public Map<String, Field> getFields() { return fields; }
public Iterable<Map.Entry<String, Schema>> getFieldSchemas() {
Expand Down Expand Up @@ -410,6 +426,8 @@ void toJson(Names names, JsonGenerator gen) throws IOException {
gen.writeStartObject();
gen.writeStringField("type", isError?"error":"record");
writeName(names, gen);
if (encoding != null)
gen.writeStringField("encoding", encoding.name().toLowerCase());
gen.writeFieldName("fields");
fieldsToJson(names, gen);
gen.writeEndObject();
Expand Down Expand Up @@ -703,9 +721,14 @@ static Schema parse(JsonNode schema, Names names) {
names.space(space); // set default namespace
}
if (type.equals("record") || type.equals("error")) { // record
JsonNode encodingNode = schema.get("encoding");
RecordSchema.Encoding encoding = null;
if (encodingNode != null)
encoding = RecordSchema.Encoding.valueOf(encodingNode.getTextValue().toUpperCase());

LinkedHashMap<String,Field> fields = new LinkedHashMap<String,Field>();
RecordSchema result =
new RecordSchema(name, space, type.equals("error"));
new RecordSchema(name, space, type.equals("error"), encoding);
if (name != null) names.add(result);
JsonNode fieldsNode = schema.get("fields");
if (fieldsNode == null || !fieldsNode.isArray())
Expand Down
63 changes: 55 additions & 8 deletions src/java/org/apache/avro/generic/GenericDatumReader.java
Expand Up @@ -29,6 +29,7 @@
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Encoding;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.DatumReader;
Expand Down Expand Up @@ -126,6 +127,15 @@ private Schema resolveExpected(Schema actual, Schema expected) {
throw new AvroTypeException("Expected "+expected+", found "+actual);
}

/**
 * Chooses the record encoding to use when reading data for the given record
 * schema from the given decoder.  The schema's own encoding is used, except
 * that SPARSE falls back to DEFAULT when the decoder is not binary.
 */
protected static Encoding determineEncoding(Schema schema, Decoder in) {
  Encoding declared = schema.getEncoding();
  boolean nonBinary = !in.isBinary();
  if (nonBinary && declared == Schema.Encoding.SPARSE) {
    // We only use sparse for binary encodings
    return Schema.Encoding.DEFAULT;
  }
  return declared;
}

/** Called to read a record instance. May be overridden for alternate record
* representations.*/
protected Object readRecord(Object old, Schema actual, Schema expected,
Expand All @@ -139,20 +149,40 @@ protected Object readRecord(Object old, Schema actual, Schema expected,
// all fields not in expected should be removed by newRecord.
Object record = newRecord(old, expected);
int size = 0;
for (Map.Entry<String, Field> entry : actual.getFields().entrySet()) {
Map<String, Field> actualFieldMap = actual.getFields();
byte[] sparseFieldMask = null;
switch (determineEncoding(actual, in)) {
case DEFAULT:
break;
case SPARSE:
sparseFieldMask = new byte[(actualFieldMap.size() + 7) / 8];
in.readFixed(sparseFieldMask);
break;
default:
throw new IllegalStateException();
}
for (Map.Entry<String, Field> entry : actualFieldMap.entrySet()) {
String fieldName = entry.getKey();
Field actualField = entry.getValue();
int actualIndex = actualField.pos();
Field expectedField =
expected == actual ? actualField : expectedFields.get(entry.getKey());
if (expectedField == null) {
skip(actualField.schema(), in);
if (sparseFieldMask == null || (0 != (sparseFieldMask[actualIndex / 8] & (1 << (actualIndex % 8))))) {
skip(actualField.schema(), in);
}
continue;
}
int fieldPosition = expectedField.pos();
Object oldDatum =
(old != null) ? getField(record, fieldName, fieldPosition) : null;
addField(record, fieldName, fieldPosition,
read(oldDatum,actualField.schema(),expectedField.schema(), in));
Object fieldValue = null;
if (sparseFieldMask == null || (0 != (sparseFieldMask[actualIndex / 8] & (1 << (actualIndex % 8))))) {
fieldValue = read(oldDatum,actualField.schema(),expectedField.schema(), in);
} else {
fieldValue = actualField.schema().getType().defaultValue();
}
addField(record, fieldName, fieldPosition,fieldValue);
size++;
}
if (expectedFields.size() > size) { // not all fields set
Expand Down Expand Up @@ -416,10 +446,27 @@ protected Object readBytes(Object old, Decoder in) throws IOException {
/** Skip an instance of a schema. */
public static void skip(Schema schema, Decoder in) throws IOException {
switch (schema.getType()) {
case RECORD:
for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
skip(entry.getValue(), in);
break;
case RECORD: {
byte[] sparseFieldMask = null;
switch (determineEncoding(schema, in)) {
case DEFAULT:
break;
case SPARSE:
sparseFieldMask = new byte[(schema.getFields().size() + 7) / 8];
in.readFixed(sparseFieldMask);
break;
default:
throw new IllegalStateException();
}
int i = 0;
for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
if (sparseFieldMask == null || (0 != (sparseFieldMask[i / 8] & (1 << (i % 8))))) {
skip(entry.getValue(), in);
}
i++;
}
}
break;
case ENUM:
in.readInt();
break;
Expand Down
65 changes: 62 additions & 3 deletions src/java/org/apache/avro/generic/GenericDatumWriter.java
Expand Up @@ -19,13 +19,16 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.util.Utf8;
Expand Down Expand Up @@ -85,9 +88,65 @@ protected void write(Schema schema, Object datum, Encoder out)
* representations.*/
protected void writeRecord(Schema schema, Object datum, Encoder out)
throws IOException {
for (Entry<String, Field> entry : schema.getFields().entrySet()) {
Field field = entry.getValue();
write(field.schema(), getField(datum, entry.getKey(), field.pos()), out);
Schema.Encoding encoding = schema.getEncoding();
if (!out.isBinary() && encoding == Schema.Encoding.SPARSE) {
// If we're writing JSON, the sparse encoding doesn't make any sense...
encoding = Schema.Encoding.DEFAULT;
}

switch (encoding) {
case DEFAULT:
for (Entry<String, Field> entry : schema.getFields().entrySet()) {
Field field = entry.getValue();
write(field.schema(), getField(datum, entry.getKey(), field.pos()), out);
}
break;

case SPARSE:
/* Use a header which is a bit mask of fields that are present */
Map<String, Field> fields = schema.getFields();
Object[] fieldValues = new Object[fields.size()];
byte[] fieldPresentMask = new byte[(fields.size() + 7) / 8];
int i = 0;
for (Entry<String, Field> entry : fields.entrySet()) {
Field field = entry.getValue();
Object fieldDatum = getField(datum, entry.getKey(), field.pos());
boolean isEmpty = true;
if (fieldDatum == null) {
isEmpty = true;
} else {
Type fieldType = field.schema().getType();
switch (fieldType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case BOOLEAN:
isEmpty = fieldDatum.equals(fieldType.defaultValue());
break;
case NULL:
isEmpty = true;
break;
}
}
if (!isEmpty) fieldPresentMask[i / 8] |= 1 << (i % 8);
fieldValues[i] = fieldDatum;
i++;
}
out.writeFixed(fieldPresentMask);
i = 0;
for (Entry<String, Field> entry : fields.entrySet()) {
if (0 == (fieldPresentMask[i / 8] & (1 << (i % 8)))) continue;

Field field = entry.getValue();
Object fieldDatum = fieldValues[i];
write(field.schema(), fieldDatum, out);
i++;
}
break;

default:
throw new IllegalArgumentException();
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/avro/io/BinaryDecoder.java
Expand Up @@ -297,5 +297,8 @@ public long skipMap() throws IOException {
/** Reads an index, which is encoded as an int. */
public int readIndex() throws IOException {
  final int index = readInt();
  return index;
}

/** Always true: the binary decoder reads binary data. */
@Override
public boolean isBinary() {
  return true;
}
}

5 changes: 5 additions & 0 deletions src/java/org/apache/avro/io/BinaryEncoder.java
Expand Up @@ -187,6 +187,11 @@ public void writeIndex(int unionIndex) throws IOException {
encodeLong(unionIndex, out);
}

/** Always true: the binary encoder writes binary data. */
@Override
public boolean isBinary() { return true; }

protected static void encodeLong(long n, OutputStream o) throws IOException {
n = (n << 1) ^ (n >> 63); // move sign to low-order bit
while ((n & ~0x7F) != 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/avro/io/Decoder.java
Expand Up @@ -39,6 +39,8 @@
*/

public abstract class Decoder {
/**
 * Are we reading binary?
 *
 * @return true if this decoder reads binary data, false otherwise (for
 *         example when reading JSON)
 */
public abstract boolean isBinary();

/** Start reading against a different input stream. Stateful
* subclasses will reset their states to their initial state.
Expand Down
4 changes: 3 additions & 1 deletion src/java/org/apache/avro/io/Encoder.java
Expand Up @@ -42,7 +42,9 @@
* @see Decoder
*/
public abstract class Encoder {

/**
 * Are we writing binary?
 *
 * @return true if this encoder writes binary data, false otherwise (for
 *         example when writing JSON)
 */
public abstract boolean isBinary();

/** Redirect output (and reset the parser state if we're checking). */
public abstract void init(OutputStream out) throws IOException;

Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/avro/io/JsonDecoder.java
Expand Up @@ -357,6 +357,9 @@ public int readIndex() throws IOException {
return n;
}

/** Always false: the JSON decoder does not read binary data. */
@Override
public boolean isBinary() {
  return false;
}

public Symbol doAction(Symbol input, Symbol top) throws IOException {
if (top instanceof Symbol.FieldAdjustAction) {
Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/avro/io/JsonEncoder.java
Expand Up @@ -235,5 +235,10 @@ public Symbol doAction(Symbol input, Symbol top) throws IOException {
}
return Symbol.CONTINUE;
}

/** Always false: the JSON encoder does not write binary data. */
@Override
public boolean isBinary() { return false; }
}

3 changes: 3 additions & 0 deletions src/java/org/apache/avro/io/ValidatingDecoder.java
Expand Up @@ -223,6 +223,9 @@ public int readIndex() throws IOException {
return result;
}

/** Reports whatever {@code in} reports. */
@Override
public boolean isBinary() {
  final boolean binary = in.isBinary();
  return binary;
}

public Symbol doAction(Symbol input, Symbol top) throws IOException {
return Symbol.CONTINUE;
}
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/avro/io/ValidatingEncoder.java
Expand Up @@ -195,6 +195,11 @@ public Symbol doAction(Symbol input, Symbol top) throws IOException {
return Symbol.CONTINUE;
}

/** Reports whatever {@code out} reports. */
@Override
public boolean isBinary() { return out.isBinary(); }

/** Have we written at least one item into the current collection? */
protected final boolean isTopEmpty() {
return isEmpty.get(pos);
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/avro/reflect/ReflectData.java
Expand Up @@ -284,7 +284,8 @@ else if (type instanceof ParameterizedType) {
LinkedHashMap<String,Schema.Field> fields =
new LinkedHashMap<String,Schema.Field>();
schema = Schema.createRecord(name, space,
Throwable.class.isAssignableFrom(c));
Throwable.class.isAssignableFrom(c),
null);
if (!names.containsKey(fullName))
names.put(fullName, schema);
for (Field field : c.getDeclaredFields())
Expand Down

0 comments on commit 7f6ad25

Please sign in to comment.