JAVA-2326: Reduce memory allocations during queries execution #1293

Closed
wants to merge 4 commits into from
5 changes: 5 additions & 0 deletions changelog/README.md
@@ -1,5 +1,10 @@
## Changelog

### 3.7.3 (In progress)

- [improvement] JAVA-2326: Reduce memory allocations in Flusher.run, RequestHandler and flags decoding logic


### 3.7.2

- [bug] JAVA-2249: Stop stripping trailing zeros in ByteOrderedTokens.
@@ -731,7 +731,8 @@ ResponseHandler write(
throw new ConnectionException(address, "Connection has been closed");
}

logger.trace("{}, stream {}, writing request {}", this, request.getStreamId(), request);
if (logger.isTraceEnabled())
logger.trace("{}, stream {}, writing request {}", this, request.getStreamId(), request);
Contributor

Just to make sure I understand, you are trying to avoid the array allocation for the varargs arguments?

Author

@netudima, Oct 26, 2019

Array allocation in such places is usually optimized away by escape analysis. The main issue I see with this kind of trace logging is the hidden auto-boxing of the int streamId.

The main goal of all the changes is to reduce overall memory allocation on the application side (and correspondingly to reduce GC pauses for the application). I am not expecting a visible change in average request latency or throughput. A small CPU reduction may come as a bonus, since slightly less work is done.
Added a comment to https://datastax-oss.atlassian.net/browse/JAVA-2326
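
The boxing point above can be illustrated with a minimal sketch. `TinyLogger` is a hypothetical stand-in for an SLF4J-style logger (it only counts calls), and `GuardedTraceSketch` and its method names are made up for this example; none of it is driver code. Without the guard, the `int` argument is boxed and the call is made even though trace is disabled. With the guard, both are skipped.

```java
import java.util.Arrays;

/** Hypothetical stand-in for an SLF4J-style logger; it only counts calls. */
final class TinyLogger {
  private final boolean traceEnabled;
  int traceCalls; // how many times trace(...) was entered

  TinyLogger(boolean traceEnabled) {
    this.traceEnabled = traceEnabled;
  }

  boolean isTraceEnabled() {
    return traceEnabled;
  }

  void trace(String format, Object... args) {
    // By the time we get here the caller has already boxed any primitive
    // arguments and built the Object[] (unless the JIT optimized them away).
    traceCalls++;
    if (traceEnabled) System.out.println(format + " " + Arrays.toString(args));
  }
}

final class GuardedTraceSketch {
  // Unguarded: streamId is boxed to an Integer before the call is made.
  static void writeUnguarded(TinyLogger logger, int streamId) {
    logger.trace("stream {}, writing request", streamId);
  }

  // Guarded: with trace disabled, the call and its argument setup are skipped.
  static void writeGuarded(TinyLogger logger, int streamId) {
    if (logger.isTraceEnabled()) logger.trace("stream {}, writing request", streamId);
  }

  public static void main(String[] args) {
    TinyLogger off = new TinyLogger(false);
    for (int streamId = 0; streamId < 1000; streamId++) writeUnguarded(off, streamId);
    System.out.println("unguarded trace() calls: " + off.traceCalls);

    off.traceCalls = 0;
    for (int streamId = 0; streamId < 1000; streamId++) writeGuarded(off, streamId);
    System.out.println("guarded trace() calls: " + off.traceCalls);
  }
}
```

The sketch counts calls rather than allocations, so it shows only that the guarded path never reaches the boxing site. Whether the JIT would have eliminated the boxing anyway depends on the JVM and is not something this example measures.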

writer.incrementAndGet();

if (DISABLE_COALESCING) {
@@ -787,8 +788,9 @@ public void run() {
}
});
} else {
logger.trace(
"{}, stream {}, request sent successfully", Connection.this, request.getStreamId());
if (logger.isTraceEnabled())
logger.trace(
"{}, stream {}, request sent successfully", Connection.this, request.getStreamId());
}
}
};
@@ -1103,11 +1105,11 @@ public void run() {
}
}

// Always flush what we have (don't artificially delay to try to coalesce more messages)
for (Channel channel : channels) channel.flush();
channels.clear();

if (doneWork) {
// Always flush what we have (don't artificially delay to try to coalesce more messages)
for (Channel channel : channels) channel.flush();
Contributor

Makes sense, although I'm not very comfortable changing this logic.

Author

The main trigger for the change is to avoid creating HashSet iterator objects on the flusher's empty spin loops, when there is nothing to flush.
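
A minimal sketch of that effect, under stated assumptions: `CountingSet` is a hypothetical probe that counts `iterator()` calls, `Runnable` stands in for a channel, and `FlushLoopSketch` is not the driver's `Flusher`. A for-each loop over a `HashSet` asks for an iterator even when the set is empty, so guarding the loop with `doneWork` removes that call on idle passes.

```java
import java.util.HashSet;
import java.util.Iterator;

/** HashSet that counts iterator() calls; a hypothetical probe, not driver code. */
final class CountingSet<E> extends HashSet<E> {
  int iteratorCalls;

  @Override
  public Iterator<E> iterator() {
    iteratorCalls++; // each for-each loop over the set lands here
    return super.iterator();
  }
}

final class FlushLoopSketch {
  // Before: every pass iterates the set, even when nothing was queued.
  static void passUnguarded(CountingSet<Runnable> channels) {
    for (Runnable channel : channels) channel.run();
    channels.clear();
  }

  // After: idle passes skip the loop, so no iterator is requested.
  static void passGuarded(CountingSet<Runnable> channels, boolean doneWork) {
    if (doneWork) {
      for (Runnable channel : channels) channel.run();
      channels.clear();
    }
  }

  public static void main(String[] args) {
    CountingSet<Runnable> channels = new CountingSet<>();
    for (int i = 0; i < 100; i++) passUnguarded(channels);
    System.out.println("unguarded iterator() calls: " + channels.iteratorCalls);

    channels.iteratorCalls = 0;
    for (int i = 0; i < 100; i++) passGuarded(channels, false);
    System.out.println("guarded iterator() calls: " + channels.iteratorCalls);
  }
}
```

The guard is only equivalent to the old behavior if the set can be non-empty only when `doneWork` is true. The sketch assumes this and does not check it against the driver.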

channels.clear();

runsWithNoWork = 0;
} else {
// either reschedule or cancel
@@ -179,9 +179,10 @@ enum Flag {
WARNING,
USE_BETA;

private static final Flag[] values = Flag.values();

static EnumSet<Flag> deserialize(int flags) {
EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
Flag[] values = Flag.values();
Contributor

Are you sure this gives us any perf improvement? I would expect the implementation of Enum.values() to be highly optimized.

Author

The values() method of an enum creates a new copy of the values array on each call, because Java has no immutable arrays (https://dzone.com/articles/memory-hogging-enumvalues-method).
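
As a minimal sketch of the same idea: `DemoFlag` and its constants are hypothetical and only mirror the shape of the `Flag` enums in this diff. Two calls to `values()` return two distinct arrays, while a `static final` copy is created once and reused by `deserialize`.

```java
import java.util.EnumSet;

/** Hypothetical enum mirroring the shape of the Flag enums in the diff. */
enum DemoFlag {
  FIRST,
  SECOND,
  THIRD;

  // Cached once at class initialization; values() would clone the array on every call.
  private static final DemoFlag[] VALUES = values();

  static EnumSet<DemoFlag> deserialize(int flags) {
    EnumSet<DemoFlag> set = EnumSet.noneOf(DemoFlag.class);
    // Bit n of the mask maps to the constant with ordinal n.
    for (int n = 0; n < VALUES.length; n++) {
      if ((flags & (1 << n)) != 0) set.add(VALUES[n]);
    }
    return set;
  }
}

final class EnumValuesSketch {
  public static void main(String[] args) {
    // Each call hands back a fresh array, so the references differ.
    System.out.println("same array? " + (DemoFlag.values() == DemoFlag.values()));
    System.out.println(DemoFlag.deserialize(0b101));
  }
}
```

The cached array must stay private and must never be modified, since every caller now shares it.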

for (int n = 0; n < 8; n++) {
if ((flags & (1 << n)) != 0) set.add(values[n]);
}
@@ -15,7 +15,6 @@
*/
package com.datastax.driver.core;

import com.codahale.metrics.Timer;
import com.datastax.driver.core.exceptions.BootstrappingException;
import com.datastax.driver.core.exceptions.BusyConnectionException;
import com.datastax.driver.core.exceptions.BusyPoolException;
@@ -67,7 +66,7 @@ class RequestHandler {
private static final QueryLogger QUERY_LOGGER = QueryLogger.builder().build();
static final String DISABLE_QUERY_WARNING_LOGS = "com.datastax.driver.DISABLE_QUERY_WARNING_LOGS";

final String id;
private volatile String id;

private final SessionManager manager;
private final Callback callback;
@@ -83,15 +82,13 @@ class RequestHandler {
private volatile List<Host> triedHosts;
private volatile ConcurrentMap<InetSocketAddress, Throwable> errors;

private final Timer.Context timerContext;
Contributor

So you are just trying to avoid the allocation of a Timer.Context object, right?

Author

yes, trying to avoid the extra object allocation
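
The pattern behind this change can be sketched as follows. `TinyTimer` is a hypothetical stand-in for a metrics-library timer that accepts an elapsed duration, and `RequestTimingSketch` is not the driver's `RequestHandler`. The start time is kept in a primitive `long` field that the handler needs anyway, and the elapsed time is reported once on completion, so no per-request context object is created.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical minimal timer; stands in for a metrics-library timer. */
final class TinyTimer {
  long count;
  long totalNanos;

  void update(long duration, TimeUnit unit) {
    count++;
    totalNanos += unit.toNanos(duration);
  }
}

final class RequestTimingSketch {
  private final TinyTimer timer;
  // A primitive field on the handler itself; no separate timing object is allocated.
  private final long startTime = System.nanoTime();

  RequestTimingSketch(TinyTimer timer) {
    this.timer = timer;
  }

  void complete() {
    // Report the elapsed time directly instead of stopping a context object.
    timer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
  }

  public static void main(String[] args) {
    TinyTimer timer = new TinyTimer();
    new RequestTimingSketch(timer).complete();
    System.out.println("recorded requests: " + timer.count);
  }
}
```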

private final long startTime;

private final AtomicBoolean isDone = new AtomicBoolean();
private final AtomicInteger executionIndex = new AtomicInteger();

public RequestHandler(SessionManager manager, Callback callback, Statement statement) {
this.id = Long.toString(System.identityHashCode(this));
if (logger.isTraceEnabled()) logger.trace("[{}] {}", id, statement);
if (logger.isTraceEnabled()) logger.trace("[{}] {}", getId(), statement);
this.manager = manager;
this.callback = callback;
this.scheduler = manager.cluster.manager.connectionFactory.timer;
@@ -114,7 +111,6 @@ public RequestHandler(SessionManager manager, Callback callback, Statement state
&& statement.isIdempotentWithDefault(manager.configuration().getQueryOptions());
this.statement = statement;

this.timerContext = metricsEnabled() ? metrics().getRequestsTimer().time() : null;
this.startTime = System.nanoTime();
}

@@ -129,6 +125,14 @@ void cancel() {
cancelPendingExecutions(null);
}

String getId() {
// atomicity is not required to set the value
Contributor

This method could be called concurrently from different speculative executions. Are you saying atomicity is not required because, even if two threads enter the method at the same time, the effect is deterministic and idempotent, i.e. the field id will eventually be set to Long.toString(System.identityHashCode(this))?

Author

Yes, you are right: the result is deterministic, and a double evaluation is not a problem if it happens (an atomic construct, like AtomicLong, would be more expensive here).
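
The lazy initialization discussed in this thread can be sketched in isolation. `LazyIdSketch` is a hypothetical class, and reading the field into a local variable once is a small variation on the diff, not the PR's exact code. The field is volatile for visibility, but there is no lock or compare-and-set: racing threads may each compute the value, and every one of them computes an equal string.

```java
final class LazyIdSketch {
  // volatile gives visibility across threads; no lock or CAS is involved.
  private volatile String id;

  String getId() {
    String result = id; // single volatile read on the fast path
    if (result == null) {
      // Racing threads may both get here, but they compute equal strings,
      // so it does not matter whose write lands last.
      result = Long.toString(System.identityHashCode(this));
      id = result;
    }
    return result;
  }

  public static void main(String[] args) {
    LazyIdSketch handler = new LazyIdSketch();
    System.out.println("id computed on first use: " + handler.getId());
  }
}
```

This works only because the computed value depends on nothing but `this`. If the initializer had side effects or could return different results per call, a lock or an atomic reference would be needed.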

if (id == null) {
id = Long.toString(System.identityHashCode(this));
}
return id;
}

private void startNewExecution() {
if (isDone.get()) return;

@@ -143,7 +147,7 @@ private void startNewExecution() {
private void scheduleExecution(long delayMillis) {
if (isDone.get() || delayMillis < 0) return;
if (logger.isTraceEnabled())
logger.trace("[{}] Schedule next speculative execution in {} ms", id, delayMillis);
logger.trace("[{}] Schedule next speculative execution in {} ms", getId(), delayMillis);
if (delayMillis == 0) {
// kick off request immediately
scheduleExecutionImmediately();
@@ -189,16 +193,17 @@ private void setFinalResult(
SpeculativeExecution execution, Connection connection, Message.Response response) {
if (!isDone.compareAndSet(false, true)) {
if (logger.isTraceEnabled())
logger.trace("[{}] Got beaten to setting the result", execution.id);
logger.trace("[{}] Got beaten to setting the result", execution.getId());
return;
}

if (logger.isTraceEnabled()) logger.trace("[{}] Setting final result", execution.id);
if (logger.isTraceEnabled()) logger.trace("[{}] Setting final result", execution.getId());

cancelPendingExecutions(execution);

try {
if (timerContext != null) timerContext.stop();
if (metricsEnabled())
metrics().getRequestsTimer().update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

ExecutionInfo info;
int speculativeExecutions = executionIndex.get() - 1;
@@ -257,16 +262,17 @@ private void setFinalException(
SpeculativeExecution execution, Connection connection, Exception exception) {
if (!isDone.compareAndSet(false, true)) {
if (logger.isTraceEnabled())
logger.trace("[{}] Got beaten to setting final exception", execution.id);
logger.trace("[{}] Got beaten to setting final exception", execution.getId());
return;
}

if (logger.isTraceEnabled()) logger.trace("[{}] Setting final exception", execution.id);
if (logger.isTraceEnabled()) logger.trace("[{}] Setting final exception", execution.getId());

cancelPendingExecutions(execution);

try {
if (timerContext != null) timerContext.stop();
if (metricsEnabled())
metrics().getRequestsTimer().update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
} finally {
callback.onException(connection, exception, System.nanoTime() - startTime, /*unused*/ 0);
}
@@ -324,7 +330,7 @@ void onSet(
* informs the RequestHandler, which will decide what to do
*/
class SpeculativeExecution implements Connection.ResponseCallback {
final String id;
private volatile String id;
private final Message.Request request;
private final int position;
private volatile Host current;
@@ -344,11 +350,18 @@ class SpeculativeExecution implements Connection.ResponseCallback {
private volatile Connection.ResponseHandler connectionHandler;

SpeculativeExecution(Message.Request request, int position) {
this.id = RequestHandler.this.id + "-" + position;
this.request = request;
this.position = position;
this.queryStateRef = new AtomicReference<QueryState>(QueryState.INITIAL);
if (logger.isTraceEnabled()) logger.trace("[{}] Starting", id);
if (logger.isTraceEnabled()) logger.trace("[{}] Starting", getId());
}

String getId() {
// atomicity is not required to set the value
if (id == null) {
id = RequestHandler.this.getId() + "-" + position;
}
return id;
}

void findNextHostAndQuery() {
@@ -387,7 +400,7 @@ private boolean query(final Host host) {
HostConnectionPool pool = manager.pools.get(host);
if (pool == null || pool.isClosed()) return false;

if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", id, host);
if (logger.isTraceEnabled()) logger.trace("[{}] Querying node {}", getId(), host);

if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true))
scheduleExecution(speculativeExecutionPlan.nextExecution(host));
@@ -530,7 +543,7 @@ private void processRetryDecision(
if (logger.isDebugEnabled())
logger.debug(
"[{}] Doing retry {} for query {} at consistency {}",
id,
getId(),
retriesByPolicy,
statement,
retryDecision.getRetryConsistencyLevel());
@@ -559,7 +572,7 @@ private void retry(final boolean retryCurrent, ConsistencyLevel newConsistencyLe
}

private void logError(InetSocketAddress address, Throwable exception) {
logger.debug("[{}] Error querying {} : {}", id, address, exception.toString());
logger.debug("[{}] Error querying {} : {}", getId(), address, exception.toString());
if (errors == null) {
synchronized (RequestHandler.this) {
if (errors == null) {
@@ -580,7 +593,7 @@ void cancel() {
return;
} else if (previous.inProgress
&& queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_IN_PROGRESS)) {
if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while in progress", id);
if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while in progress", getId());
// The connectionHandler should be non-null, but we might miss the update if we're racing
// with write().
// If it's still null, this will be handled by re-checking queryStateRef at the end of
@@ -598,7 +611,7 @@ void cancel() {
return;
} else if (!previous.inProgress
&& queryStateRef.compareAndSet(previous, QueryState.CANCELLED_WHILE_COMPLETE)) {
if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while complete", id);
if (logger.isTraceEnabled()) logger.trace("[{}] Cancelled while complete", getId());
Host queriedHost = current;
if (queriedHost != null && statement != Statement.DEFAULT) {
manager.cluster.manager.reportQuery(
@@ -393,9 +393,10 @@ private enum Flag {
NO_METADATA,
METADATA_CHANGED;

private static final Flag[] values = Flag.values();

static EnumSet<Flag> deserialize(int flags) {
EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
Flag[] values = Flag.values();
for (int n = 0; n < values.length; n++) {
if ((flags & (1 << n)) != 0) set.add(values[n]);
}