Skip to content
Permalink
Browse files

Optimize `ProtoBufSerializationProvider` by minimizing copying (#943)

Motivation:

Current implementation of `ProtoBufSerializationProvider` has a
few performance issues:
1. `ProtoDeserializer` may create an intermediate copy of the composite
buffers and then copy it again inside proto parser.
2. `DefaultSerializer.DEFAULT_SIZE_ESTIMATOR` inaccurately predicts
size for destination `Buffer`. For aggregated API we may use
`getSerializedSize()` to allocate enough bytes for destination
`Buffer`.
3. `MessageLite.writeTo(OutputStream)` creates an internal copy of
data that could be avoided if we create `CodedOutputStream` from
internal data storage of destination `Buffer`.

Modifications:

- Create optimized version of `CodedInputStream` for
`ProtoDeserializer` to do less copying during proto deserialization;
- Provide a number of `bytesEstimate` for serialization for aggregated
API;
- Create optimized version of `CodedOutputStream` for
`ProtoSerializer` to do less copying during proto serialization;

Results:

Less copying during protobuf serialization/deserialization leads to
improved throughput by 5-15% on the gRPC client and server when
they (de)serialize 16Kb payload body.
  • Loading branch information
idelpivnitskiy committed Feb 21, 2020
1 parent c585735 commit 1653fc7135d59bcb98608342ad73027e9ab441e2
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,16 +24,20 @@
import io.servicetalk.serialization.api.StreamingSerializer;
import io.servicetalk.serialization.api.TypeHolder;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

import static com.google.protobuf.CodedOutputStream.newInstance;
import static com.google.protobuf.UnsafeByteOperations.unsafeWrap;
import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static java.lang.Math.max;
import static java.util.Collections.emptyList;
@@ -146,7 +150,22 @@ private static boolean isCompressed(Buffer buffer) throws SerializationException

final T t;
try {
t = parser.parseFrom(toDeserialize.toNioBuffer(toDeserialize.readerIndex(), lengthOfData));
final CodedInputStream in;
if (toDeserialize.nioBufferCount() == 1) {
in = CodedInputStream.newInstance(toDeserialize.toNioBuffer(toDeserialize.readerIndex(),
lengthOfData));
} else {
// Aggregated payload body may consist of multiple Buffers. In this case,
// CompositeBuffer.toNioBuffer(idx, length) may return a single ByteBuffer (when requested
// length < components[0].length) or create a new ByteBuffer and copy multiple components
// into it. Later, proto parser will copy data from this temporary ByteBuffer again.
// To avoid unnecessary copying, we use newCodedInputStream(buffers, lengthOfData).
final ByteBuffer[] buffers = toDeserialize.toNioBuffers(toDeserialize.readerIndex(),
lengthOfData);
in = buffers.length == 1 ? CodedInputStream.newInstance(buffers[0]) :
newCodedInputStream(buffers, lengthOfData);
}
t = parser.parseFrom(in);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException(e);
}
@@ -186,6 +205,23 @@ private static boolean isCompressed(Buffer buffer) throws SerializationException
}
}

private static CodedInputStream newCodedInputStream(final ByteBuffer[] buffers, final int lengthOfData) {
// Because we allocated a new internal ByteBuffer that will never be mutated we may just wrap it and
// enable aliasing to avoid an extra copying inside parser for a deserialized message.
final CodedInputStream in = unsafeWrap(mergeByteBuffers(buffers, lengthOfData)).newCodedInput();
in.enableAliasing(true);
return in;
}

private static ByteBuffer mergeByteBuffers(final ByteBuffer[] buffers, final int lengthOfData) {
final ByteBuffer merged = ByteBuffer.allocate(lengthOfData);
for (ByteBuffer buf : buffers) {
merged.put(buf);
}
merged.flip();
return merged;
}

@Override
public boolean hasData() {
return accumulate.readableBytes() > 0;
@@ -226,16 +262,24 @@ public void serialize(final Object toSerialize, final Buffer destination) {
throw new SerializationException("Unknown type to serialize (expected MessageLite): " +
toSerialize.getClass().getName());
}
MessageLite msg = (MessageLite) toSerialize;
int size = msg.getSerializedSize();
final MessageLite msg = (MessageLite) toSerialize;
final int size = msg.getSerializedSize();
// TODO (nkant) : handle compression
destination.writeByte(0);
destination.writeInt(size);
try (OutputStream out = Buffer.asOutputStream(destination)) {
destination.ensureWritable(size);

final int writerIdx = destination.writerIndex();
final int writableBytes = destination.writableBytes();
final CodedOutputStream out = destination.hasArray() ?
newInstance(destination.array(), destination.arrayOffset() + writerIdx, writableBytes) :
newInstance(destination.toNioBuffer(writerIdx, writableBytes));
try {
msg.writeTo(out);
} catch (IOException e) {
throw new SerializationException(e);
}
destination.writerIndex(writerIdx + size);
}
}
}
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019-2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -140,7 +140,9 @@ public GrpcSerializationProvider build() {
}
}

private static final class ProtoHttpSerializer<T> implements HttpSerializer<T> {
private static final class ProtoHttpSerializer<T extends MessageLite> implements HttpSerializer<T> {
private static final int METADATA_SIZE = 5; // 1 byte for compression flag and 4 bytes for length of data

private final Serializer serializer;
private final GrpcMessageEncoding grpcMessageEncoding;
private final Class<T> type;
@@ -155,7 +157,7 @@ public GrpcSerializationProvider build() {
@Override
public Buffer serialize(final HttpHeaders headers, final T value, final BufferAllocator allocator) {
addContentHeaders(headers);
return serializer.serialize(value, allocator);
return serializer.serialize(value, allocator, METADATA_SIZE + value.getSerializedSize());
}

@Override

0 comments on commit 1653fc7

Please sign in to comment.
You can’t perform that action at this time.