Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,16 @@ property, when available, is noted below.
consistency check during recovery process.
Default value is false.

* *largeRequestMaxBytes* :
(Java system property: **zookeeper.largeRequestMaxBytes**)
**New in 3.6.0:**
The maximum number of bytes of all inflight large request. The connection will be closed if a coming large request causes the limit exceeded. The default is 100 * 1024 * 1024.

* *largeRequestThreshold* :
(Java system property: **zookeeper.largeRequestThreshold**)
**New in 3.6.0:**
The size threshold after which a request is considered a large request. If it is -1, then all requests are considered small, effectively turning off large request throttling. The default is -1.

<a name="sc_clusterOptions"></a>

#### Cluster Options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ public void processRequest(Request request) {
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();

String lastOp = "NA";
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits
zks.decInProcess();
zks.requestFinished(request);
Code err = Code.OK;
Record rsp = null;
String path = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ private boolean readLength(SelectionKey k) throws IOException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
// checkRequestSize will throw IOException if request is rejected
zkServer.checkRequestSizeWhenReceivingMessage(len);
incomingBuffer = ByteBuffer.allocate(len);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ private void receiveMessage(ByteBuf message) {
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
// checkRequestSize will throw IOException if request is rejected
zkServer.checkRequestSizeWhenReceivingMessage(len);
bb = ByteBuffer.allocate(len);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon
*/
private boolean isLocalSession = false;

private int largeRequestSize = -1;

public boolean isLocalSession() {
return isLocalSession;
}
Expand All @@ -117,6 +119,14 @@ public void setLocalSession(boolean isLocalSession) {
this.isLocalSession = isLocalSession;
}

public void setLargeRequestSize(int size) {
largeRequestSize = size;
}

public int getLargeRequestSize() {
return largeRequestSize;
}

public Object getOwner() {
return owner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ private void dropRequest(Request request) {
// Note: this will close the connection
conn.setInvalid();
}
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits.
zks.requestFinished(request);
}

public void submitRequest(Request request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
STALE_REPLIES = metricsContext.getCounter("stale_replies");
REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
LARGE_REQUESTS_REJECTED = metricsContext.getCounter("large_requests_rejected");

NETTY_QUEUED_BUFFER = metricsContext.getSummary("netty_queued_buffer_capacity", DetailLevel.BASIC);

Expand Down Expand Up @@ -425,6 +426,7 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter STALE_REQUESTS_DROPPED;
public final Counter STALE_REPLIES;
public final Counter REQUEST_THROTTLE_WAIT_COUNT;
public final Counter LARGE_REQUESTS_REJECTED;

public final Summary NETTY_QUEUED_BUFFER;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,35 @@ protected enum State {
private RequestThrottler requestThrottler;
public static final String SNAP_COUNT = "zookeeper.snapCount";

/**
* This setting sets a limit on the total number of large requests that
* can be inflight and is designed to prevent ZooKeeper from accepting
* too many large requests such that the JVM runs out of usable heap and
* ultimately crashes.
*
* The limit is enforced by the {@link checkRequestSize(int, boolean)}
* method which is called by the connection layer ({@link NIOServerCnxn},
* {@link NettyServerCnxn}) before allocating a byte buffer and pulling
* data off the TCP socket. The limit is then checked again by the
* ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which
* also atomically updates {@link currentLargeRequestBytes}. The request is
* then marked as a large request, with the request size stored in the Request
* object so that it can later be decremented from {@link currentLargeRequestsBytes}.
*
* When a request is completed or dropped, the relevant code path calls the
* {@link requestFinished(Request)} method which performs the decrement if
* needed.
*/
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;

/**
* The size threshold after which a request is considered a large request
* and is checked against the large request byte limit.
*/
private volatile int largeRequestThreshold = -1;

private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);

void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
}
Expand Down Expand Up @@ -285,6 +314,8 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio

this.requestPathMetricsCollector = new RequestPathMetricsCollector();

this.initLargeRequestThrottlingSettings();

LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
Expand Down Expand Up @@ -1047,14 +1078,20 @@ public void submitRequestNow(Request si) {
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
// Update request accounting/throttling limits
requestFinished(si);
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
// Update request accounting/throttling limits
requestFinished(si);
}
}

Expand Down Expand Up @@ -1380,6 +1417,85 @@ static void setMaxBatchSize(int size) {
maxBatchSize = size;
}

private void initLargeRequestThrottlingSettings() {
setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes));
setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1));
}

public int getLargeRequestMaxBytes() {
return largeRequestMaxBytes;
}

public void setLargeRequestMaxBytes(int bytes) {
if (bytes <= 0) {
LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
} else {
largeRequestMaxBytes = bytes;
LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
}
}

public int getLargeRequestThreshold() {
return largeRequestThreshold;
}

public void setLargeRequestThreshold(int threshold) {
if (threshold == 0 || threshold < -1) {
LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
largeRequestThreshold = -1;
} else {
largeRequestThreshold = threshold;
LOG.info("The large request threshold is set to {}", largeRequestThreshold);
}
}

public int getLargeRequestBytes() {
return currentLargeRequestBytes.get();
}

private boolean isLargeRequest(int length) {
// The large request limit is disabled when threshold is -1
if (largeRequestThreshold == -1) {
return false;
}
return length > largeRequestThreshold;
}

public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
if (!isLargeRequest(length)) {
return true;
}
if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) {
return true;
} else {
ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
throw new IOException("Rejecting large request");
}

}

private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
if (!isLargeRequest(length)) {
return true;
}

int bytes = currentLargeRequestBytes.addAndGet(length);
if (bytes > largeRequestMaxBytes) {
currentLargeRequestBytes.addAndGet(-length);
ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
throw new IOException("Rejecting large request");
}
return true;
}

public void requestFinished(Request request) {
int largeRequestLength = request.getLargeRequestSize();
if (largeRequestLength != -1) {
currentLargeRequestBytes.addAndGet(-largeRequestLength);
}
}

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
Expand Down Expand Up @@ -1451,6 +1567,12 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE
cnxn.disableRecv();
} else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
int length = incomingBuffer.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
// Always treat packet from the client as a possible
// local request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,24 @@ public void setRequestStaleConnectionCheck(boolean check) {
Request.setStaleConnectionCheck(check);
}


///////////////////////////////////////////////////////////////////////////

public int getLargeRequestMaxBytes() {
return zks.getLargeRequestMaxBytes();
}

public void setLargeRequestMaxBytes(int bytes) {
zks.setLargeRequestMaxBytes(bytes);
}

///////////////////////////////////////////////////////////////////////////

public int getLargeRequestThreshold() {
return zks.getLargeRequestThreshold();
}

public void setLargeRequestThreshold(int threshold) {
zks.setLargeRequestThreshold(threshold);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ public interface ZooKeeperServerMXBean {
boolean getRequestStaleConnectionCheck();
void setRequestStaleConnectionCheck(boolean check);

int getLargeRequestMaxBytes();
void setLargeRequestMaxBytes(int bytes);

int getLargeRequestThreshold();
void setLargeRequestThreshold(int threshold);

/**
* Reset packet and latency statistics
*/
Expand Down
Loading