Some code refactor for better performance of Avro-Extension #4092

Merged (5 commits, Apr 25, 2017)
@@ -53,8 +53,13 @@ public InputRow parse(ByteBuffer input)
return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false, false);
}

protected static InputRow parseGenericRecord(GenericRecord record, ParseSpec parseSpec, List<String> dimensions,
boolean fromPigAvroStorage, boolean binaryAsString)
protected static InputRow parseGenericRecord(
GenericRecord record,
ParseSpec parseSpec,
List<String> dimensions,
boolean fromPigAvroStorage,
boolean binaryAsString
)
{
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage, binaryAsString);
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
@@ -36,7 +36,7 @@
import org.schemarepo.json.GsonJsonUtil;
import org.schemarepo.json.JsonUtil;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AvroExtensionsModule implements DruidModule
@@ -46,7 +46,7 @@ public AvroExtensionsModule() {}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.asList(
return Collections.singletonList(
new SimpleModule("AvroInputRowParserModule")
.registerSubtypes(
new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
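The import swap above pairs with the change from Arrays.asList to Collections.singletonList: when exactly one Jackson module is registered, singletonList returns an immutable single-element list and skips the varargs array that Arrays.asList would allocate. A minimal standalone sketch of the pattern (the class name here is illustrative, not part of the patch):

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;

import java.util.Collections;
import java.util.List;

// Hypothetical example class showing the single-module registration pattern.
class SingleModuleProvider
{
  public List<? extends Module> getJacksonModules()
  {
    // Only one element, so singletonList is the cheaper and clearer choice.
    return Collections.singletonList(new SimpleModule("AvroInputRowParserModule"));
  }
}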
@@ -18,6 +18,7 @@
*/
package io.druid.data.input.avro;

import io.druid.java.util.common.logger.Logger;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroJob;
@@ -31,8 +32,6 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import io.druid.java.util.common.logger.Logger;

import java.io.IOException;

public class AvroValueInputFormat extends FileInputFormat<NullWritable, GenericRecord>
@@ -55,13 +54,10 @@ public RecordReader<NullWritable, GenericRecord> createRecordReader(
String schemaFilePath = context.getConfiguration().get(CONF_INPUT_VALUE_SCHEMA_PATH);
if (StringUtils.isNotBlank(schemaFilePath)) {
log.info("Using file: %s as reader schema.", schemaFilePath);
FSDataInputStream inputStream = FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath));
try {
try (FSDataInputStream inputStream =
FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath))) {
readerSchema = new Schema.Parser().parse(inputStream);
}
finally {
inputStream.close();
}
}
}

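The hunk above replaces the manual try/finally close of the schema file's stream with try-with-resources, so the FSDataInputStream is closed even when schema parsing throws. A simplified, self-contained sketch of that handling (the helper class and method names are illustrative only, not from the patch):

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Hypothetical helper, not part of the patch.
final class SchemaFileReader
{
  // try-with-resources closes the stream automatically, including when
  // Schema.Parser#parse throws, replacing the explicit finally { close(); }.
  static Schema readSchema(Configuration conf, String schemaFilePath) throws IOException
  {
    try (FSDataInputStream in = FileSystem.get(conf).open(new Path(schemaFilePath))) {
      return new Schema.Parser().parse(in);
    }
  }
}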
@@ -48,6 +48,7 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder

private final Schema schemaObj;
private final Map<String, Object> schema;
private final DatumReader<GenericRecord> reader;

@JsonCreator
public InlineSchemaAvroBytesDecoder(
@@ -61,14 +62,16 @@ public InlineSchemaAvroBytesDecoder(
String schemaStr = mapper.writeValueAsString(schema);

LOGGER.info("Schema string [%s]", schemaStr);
schemaObj = new Schema.Parser().parse(schemaStr);
this.schemaObj = new Schema.Parser().parse(schemaStr);
this.reader = new GenericDatumReader<>(this.schemaObj);
}

//For UT only
@VisibleForTesting
InlineSchemaAvroBytesDecoder(Schema schemaObj)
{
this.schemaObj = schemaObj;
this.reader = new GenericDatumReader<>(schemaObj);
this.schema = null;
}

@@ -81,9 +84,7 @@ public Map<String, Object> getSchema()
@Override
public GenericRecord parse(ByteBuffer bytes)
{
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
try {
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
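The main performance change in this decoder is visible above: the GenericDatumReader is now constructed once in the constructor and reused across calls, instead of being rebuilt inside every parse(ByteBuffer), and the ByteBufferInputStream is managed with try-with-resources. A condensed sketch of the resulting shape, with a hypothetical class name and the EOFException-to-ParseException handling trimmed:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.ByteBufferInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;

// Hypothetical, simplified decoder illustrating the cached-reader pattern.
final class CachedReaderDecoder
{
  private final DatumReader<GenericRecord> reader;

  CachedReaderDecoder(Schema schema)
  {
    // Built once per decoder instance rather than once per record.
    this.reader = new GenericDatumReader<>(schema);
  }

  GenericRecord parse(ByteBuffer bytes) throws IOException
  {
    // try-with-resources closes the stream even if decoding fails.
    try (ByteBufferInputStream in = new ByteBufferInputStream(Collections.singletonList(bytes))) {
      return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
    }
  }
}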
@@ -72,7 +72,6 @@ public InlineSchemasAvroBytesDecoder(

Map<String, Object> schema = e.getValue();
String schemaStr = mapper.writeValueAsString(schema);
;

LOGGER.info("Schema string [%s] = [%s]", id, schemaStr);
schemaObjs.put(id, new Schema.Parser().parse(schemaStr));
@@ -116,10 +115,8 @@ public GenericRecord parse(ByteBuffer bytes)
throw new ParseException("Failed to find schema for id [%s]", schemaId);
}

try {
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));

DatumReader<GenericRecord> reader = new GenericDatumReader<>(schemaObj);
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
@@ -47,7 +47,8 @@ public SchemaRegistryBasedAvroBytesDecoder(
}

//For UT only
@VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
@VisibleForTesting
SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
{
this.registry = registry;
}
@@ -63,7 +64,8 @@ public GenericRecord parse(ByteBuffer bytes)
Schema schema = registry.getByID(id);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
} catch (Exception e) {
}
catch (Exception e) {
throw new ParseException(e, "Fail to decode avro message!");
}
}
@@ -81,7 +83,6 @@ public boolean equals(Object o)
SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;

return registry != null ? registry.equals(that.registry) : that.registry == null;

}

@Override
@@ -52,7 +52,7 @@ public SchemaRepoBasedAvroBytesDecoder(
{
this.subjectAndIdConverter = subjectAndIdConverter;
this.schemaRepository = schemaRepository;
typedRepository = new TypedSchemaRepository<ID, Schema, SUBJECT>(
this.typedRepository = new TypedSchemaRepository<>(
schemaRepository,
subjectAndIdConverter.getIdConverter(),
new AvroSchemaConverter(false),
@@ -77,9 +77,8 @@ public GenericRecord parse(ByteBuffer bytes)
{
Pair<SUBJECT, ID> subjectAndId = subjectAndIdConverter.getSubjectAndId(bytes);
Schema schema = typedRepository.getSchema(subjectAndId.lhs, subjectAndId.rhs);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
try {
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (EOFException eof) {
@@ -20,9 +20,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import io.druid.java.util.common.Pair;

import org.schemarepo.api.converter.Converter;
import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter;
Expand Down Expand Up @@ -51,7 +49,7 @@ public Avro1124SubjectAndIdConverter(@JsonProperty("topic") String topic)
@Override
public Pair<String, Integer> getSubjectAndId(ByteBuffer payload)
{
return new Pair<String, Integer>(topic, payload.getInt());
return new Pair<>(topic, payload.getInt());
}

@Override
@@ -20,9 +20,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import io.druid.java.util.common.Pair;

import org.schemarepo.api.converter.Converter;

import java.nio.ByteBuffer;