diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java new file mode 100644 index 000000000000..d2301bed2cb0 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/io/ByteBufferInputStream.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.utils.io; + + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Objects; + +/** Warp a {@link ByteBuffer} as an {@link InputStream}. */ +public class ByteBufferInputStream extends InputStream { + private final ByteBuffer buffer; + + public ByteBufferInputStream(ByteBuffer buffer) { + this.buffer = buffer.asReadOnlyBuffer(); + } + + @Override + public int read() throws IOException { + if (!buffer.hasRemaining()) { + return -1; + } + return buffer.get() & 0xFF; + } + + @Override + public int read(@Nonnull byte[] array, int offset, int length) throws IOException { + assertArrayIndex(array, offset, length); + + if (length == 0) { + return 0; + } + final int remaining = buffer.remaining(); + if (remaining <= 0) { + return -1; + } + final int min = Math.min(remaining, length); + buffer.get(array, offset, min); + return min; + } + + static void assertArrayIndex(@Nonnull byte[] array, int offset, int length) { + Objects.requireNonNull(array, "array == null"); + if (offset < 0) { + throw new ArrayIndexOutOfBoundsException("offset = " + offset + " < 0"); + } else if (length < 0) { + throw new ArrayIndexOutOfBoundsException("length = " + length + " < 0"); + } + final int end = offset + length; + if (end < 0) { + throw new ArrayIndexOutOfBoundsException( + "Overflow: offset+length > Integer.MAX_VALUE, offset=" + offset + ", length=" + length); + } else if (end > array.length) { + throw new ArrayIndexOutOfBoundsException( + "offset+length > array.length = " + array.length + ", offset=" + offset + ", length=" + length); + } + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java index bd379c207a75..eb7ce0f699b0 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java @@ -17,22 +17,23 @@ package org.apache.hadoop.ozone.om.helpers; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMRequest; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos - .OMResponse; +import org.apache.hadoop.hdds.utils.io.ByteBufferInputStream; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; + /** - * Ratis helper methods for OM Ratis server and client. + * Helper methods for converting between proto 2 (OM) and proto 3 (Ratis) messages. */ public final class OMRatisHelper { private static final Logger LOG = LoggerFactory.getLogger( @@ -41,50 +42,47 @@ public final class OMRatisHelper { private OMRatisHelper() { } - static RaftPeerId getRaftPeerId(String omId) { - return RaftPeerId.valueOf(omId); - } - + /** Convert the given proto 2 request to a proto 3 {@link ByteString}. */ public static ByteString convertRequestToByteString(OMRequest request) { - byte[] requestBytes = request.toByteArray(); - return ByteString.copyFrom(requestBytes); + return UnsafeByteOperations.unsafeWrap(request.toByteString().asReadOnlyByteBuffer()); } - public static OMRequest convertByteStringToOMRequest(ByteString byteString) - throws InvalidProtocolBufferException { - byte[] bytes = byteString.toByteArray(); - return OMRequest.parseFrom(bytes); + /** Convert the given proto 3 {@link ByteString} to a proto 2 request. */ + public static OMRequest convertByteStringToOMRequest(ByteString bytes) throws IOException { + final ByteBuffer buffer = bytes.asReadOnlyByteBuffer(); + return OMRequest.parseFrom(new ByteBufferInputStream(buffer)); } + /** Convert the given proto 2 response to a proto 3 {@link ByteString}. */ public static Message convertResponseToMessage(OMResponse response) { - byte[] requestBytes = response.toByteArray(); - return Message.valueOf(ByteString.copyFrom(requestBytes)); + return () -> UnsafeByteOperations.unsafeWrap(response.toByteString().asReadOnlyByteBuffer()); } - public static OMResponse getOMResponseFromRaftClientReply( - RaftClientReply reply) throws InvalidProtocolBufferException { - byte[] bytes = reply.getMessage().getContent().toByteArray(); - return OMResponse.newBuilder(OMResponse.parseFrom(bytes)) + /** Convert the given proto 3 {@link ByteString} to a proto 2 response. */ + public static OMResponse convertByteStringToOMResponse(ByteString bytes) throws IOException { + final ByteBuffer buffer = bytes.asReadOnlyByteBuffer(); + return OMResponse.parseFrom(new ByteBufferInputStream(buffer)); + } + + /** Convert the given reply with proto 3 {@link ByteString} to a proto 2 response. */ + public static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply) throws IOException { + final OMResponse response = convertByteStringToOMResponse(reply.getMessage().getContent()); + if (reply.getReplierId().equals(response.getLeaderOMNodeId())) { + return response; + } + return OMResponse.newBuilder(response) .setLeaderOMNodeId(reply.getReplierId()) .build(); } - /** - * Convert StateMachineLogEntryProto to String. - * @param proto - {@link StateMachineLogEntryProto} - * @return String - */ + /** Convert the given {@link StateMachineLogEntryProto} to a short {@link String}. */ public static String smProtoToString(StateMachineLogEntryProto proto) { - StringBuilder builder = new StringBuilder(); try { - builder.append(TextFormat.shortDebugString( - OMRatisHelper.convertByteStringToOMRequest(proto.getLogData()))); - + final OMRequest request = convertByteStringToOMRequest(proto.getLogData()); + return TextFormat.shortDebugString(request); } catch (Throwable ex) { LOG.info("smProtoToString failed", ex); - builder.append("smProtoToString failed with"); - builder.append(ex.getMessage()); + return "Failed to smProtoToString: " + ex; } - return builder.toString(); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index f26a5e300aa4..54ba92d334b5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.ServiceException; import java.io.File; @@ -522,7 +521,7 @@ private OMResponse createOmResponseImpl(OMRequest omRequest, try { return OMRatisHelper.getOMResponseFromRaftClientReply(reply); - } catch (InvalidProtocolBufferException ex) { + } catch (IOException ex) { if (ex.getMessage() != null) { throw new ServiceException(ex.getMessage(), ex); } else {