Skip to content

Commit

Permalink
Use custom Transport Factory to set Transport message and frame size (a…
Browse files Browse the repository at this point in the history
…pache#3737)

Use a custom TFramedTransport.Factory implementation so that when getTransport
is called, the frame and message size are set on the underlying configuration.

This is a workaround for https://issues.apache.org/jira/browse/THRIFT-5732

Fixes apache#3731

Also:
* Throw EOFException when TTransportException type is END_OF_FILE
* Refactor ThriftMaxFrameSizeIT to cover testing messages bigger and
  smaller than the configured value and also to use separate mini dirs
  for each test
* Include stack trace in TabletServerBatchWriter log message for debugging
* Add default value for timeout.factor in Wait class to avoid error message
  and 24 timeout default in IDEs when the system property isn't set

---------

Co-authored-by: Christopher Tubbs <ctubbsii@apache.org>
  • Loading branch information
2 people authored and Ed Coleman committed Sep 11, 2023
1 parent 1d7a99e commit 132ab20
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.EOFException;
import java.io.IOException;
import java.time.Duration;
import java.util.AbstractMap.SimpleImmutableEntry;
Expand Down Expand Up @@ -407,7 +408,8 @@ public void run() {
failures.putAll(tsFailures);
}
}

} catch (EOFException e) {
fatalException = e;
} catch (IOException e) {
if (!TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown()) {
synchronized (failures) {
Expand Down Expand Up @@ -910,6 +912,13 @@ static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Ra
} catch (TTransportException e) {
log.debug("Server : {} msg : {}", server, e.getMessage());
timeoutTracker.errorOccured();
if (e.getType() == TTransportException.END_OF_FILE) {
// END_OF_FILE is used in TEndpointTransport when the
// maxMessageSize has been reached.
EOFException eof = new EOFException(e.getMessage());
eof.addSuppressed(e);
throw eof;
}
throw new IOException(e);
} catch (ThriftSecurityException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public void send(TabletServerMutations<Mutation> tsm)
span.end();
}
} catch (IOException e) {
log.debug("failed to send mutations to {} : {}", location, e.getMessage());
log.debug("failed to send mutations to {}", location, e);

HashSet<TableId> tables = new HashSet<>();
for (KeyExtent ke : mutationBatch.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc;

import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;

/**
* This is a workaround for the issue reported in https://issues.apache.org/jira/browse/THRIFT-5732
* and can be removed once that issue is fixed.
*/
public class AccumuloTFramedTransportFactory extends TFramedTransport.Factory {

private final int maxMessageSize;

public AccumuloTFramedTransportFactory(int maxMessageSize) {
super(maxMessageSize);
this.maxMessageSize = maxMessageSize;
}

@Override
public TTransport getTransport(TTransport base) throws TTransportException {
// The input parameter "base" is typically going to be a TSocket implementation
// that represents a connection between two Accumulo endpoints (client-server,
// or server-server). The base transport has a maxMessageSize which defaults to
// 100MB. The FramedTransport that is created by this factory adds a header to
// the message with payload size information. The FramedTransport has a default
// frame size of 16MB, but the TFramedTransport constructor sets the frame size
// to the frame size set on the underlying transport ("base" in this case").
// According to current Thrift docs, a message has to fit into 1 frame, so the
// frame size will be set to the value that is lower. Prior to this class being
// created, we were only setting the frame size, so messages were capped at 100MB
// because that's the default maxMessageSize. Here we are setting the maxMessageSize
// and maxFrameSize to the same value on the "base" transport so that when the
// TFramedTransport object is created, it ends up using the values that we want.
base.getConfiguration().setMaxFrameSize(maxMessageSize);
base.getConfiguration().setMaxMessageSize(maxMessageSize);
return super.getTransport(base);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,8 +62,8 @@ public class ThriftUtil {
private static final Logger log = LoggerFactory.getLogger(ThriftUtil.class);

private static final TraceProtocolFactory protocolFactory = new TraceProtocolFactory();
private static final TFramedTransport.Factory transportFactory =
new TFramedTransport.Factory(Integer.MAX_VALUE);
private static final AccumuloTFramedTransportFactory transportFactory =
new AccumuloTFramedTransportFactory(Integer.MAX_VALUE);
private static final Map<Integer,TTransportFactory> factoryCache = new HashMap<>();

public static final String GSSAPI = "GSSAPI", DIGEST_MD5 = "DIGEST-MD5";
Expand Down Expand Up @@ -186,7 +185,7 @@ public static synchronized TTransportFactory transportFactory(long maxFrameSize)
int maxFrameSize1 = (int) maxFrameSize;
TTransportFactory factory = factoryCache.get(maxFrameSize1);
if (factory == null) {
factory = new TFramedTransport.Factory(maxFrameSize1);
factory = new AccumuloTFramedTransportFactory(maxFrameSize1);
factoryCache.put(maxFrameSize1, factory);
}
return factory;
Expand Down
132 changes: 132 additions & 0 deletions core/src/test/java/org/apache/accumulo/core/rpc/ThriftUtilTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.junit.jupiter.api.Test;

public class ThriftUtilTest {

public static final int FRAME_HDR_SIZE = 4;
public static final int MB1 = 1 * 1024 * 1024;
public static final int MB10 = 10 * 1024 * 1024;
public static final int MB100 = 100 * 1024 * 1024;
public static final int GB = 1 * 1024 * 1024 * 1024;

@Test
public void testDefaultTFramedTransportFactory() throws TTransportException {

// This test confirms that the default maxMessageSize in Thrift is 100MB
// even when we set the frame size to be 1GB

TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(1024));

TFramedTransport.Factory factory = new TFramedTransport.Factory(GB);
TTransport framedTransport = factory.getTransport(underlyingTransport);

assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), GB);
assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB100);
}

@Test
public void testAccumuloTFramedTransportFactory() throws TTransportException {

// This test confirms that our custom FramedTransportFactory sets the max
// message size and max frame size to the value that we want.

TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(1024));

AccumuloTFramedTransportFactory factory = new AccumuloTFramedTransportFactory(GB);
TTransport framedTransport = factory.getTransport(underlyingTransport);

assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), GB);
assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), GB);
}

@Test
public void testMessageSizeReadWriteSuccess() throws Exception {

// This test creates an 10MB buffer in memory as the underlying transport, then
// creates a TFramedTransport with a 1MB maxFrameSize and maxMessageSize. It then
// writes 1MB - 4 bytes (to account for the frame header) to the transport and
// reads the data back out.

TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(MB10));
AccumuloTFramedTransportFactory factory = new AccumuloTFramedTransportFactory(MB1);
TTransport framedTransport = factory.getTransport(underlyingTransport);
assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), MB1);
assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB1);

byte[] writeBuf = new byte[MB1 - FRAME_HDR_SIZE];
Arrays.fill(writeBuf, (byte) 1);
framedTransport.write(writeBuf);
framedTransport.flush();

assertEquals(MB1, underlyingTransport.getByteBuffer().position());
underlyingTransport.flip();
assertEquals(0, underlyingTransport.getByteBuffer().position());
assertEquals(MB1, underlyingTransport.getByteBuffer().limit());

byte[] readBuf = new byte[MB1];
framedTransport.read(readBuf, 0, MB1);
}

@Test
public void testMessageSizeWriteFailure() throws Exception {

// This test creates an 10MB buffer in memory as the underlying transport, then
// creates a TFramedTransport with a 1MB maxFrameSize and maxMessageSize. It then
// writes 1MB + 100 bytes to the transport, which fails as it's larger than the
// configured frame and message size.

TByteBuffer underlyingTransport = new TByteBuffer(ByteBuffer.allocate(MB10));
AccumuloTFramedTransportFactory factory = new AccumuloTFramedTransportFactory(MB1);
TTransport framedTransport = factory.getTransport(underlyingTransport);
assertEquals(framedTransport.getConfiguration().getMaxFrameSize(), MB1);
assertEquals(framedTransport.getConfiguration().getMaxMessageSize(), MB1);

// Write more than 1MB to the TByteBuffer, it's possible to write more data
// than allowed by the frame, it's enforced on the read.
final int ourSize = MB1 + 100;
byte[] writeBuf = new byte[ourSize];
Arrays.fill(writeBuf, (byte) 1);
framedTransport.write(writeBuf);
framedTransport.flush();

assertEquals(ourSize + FRAME_HDR_SIZE, underlyingTransport.getByteBuffer().position());
underlyingTransport.flip();
assertEquals(0, underlyingTransport.getByteBuffer().position());
assertEquals(ourSize + FRAME_HDR_SIZE, underlyingTransport.getByteBuffer().limit());

byte[] readBuf = new byte[ourSize];
var e =
assertThrows(TTransportException.class, () -> framedTransport.read(readBuf, 0, ourSize));
assertEquals("Frame size (" + ourSize + ") larger than max length (" + MB1 + ")!",
e.getMessage());
}
}
Loading

0 comments on commit 132ab20

Please sign in to comment.