55
import io .netty .util .Timer ;
55
import io .netty .util .Timer ;
56
import io .netty .util .concurrent .Future ;
56
import io .netty .util .concurrent .Future ;
57
import io .netty .util .concurrent .GenericFutureListener ;
57
import io .netty .util .concurrent .GenericFutureListener ;
58
- import java .nio .ByteBuffer ;
59
import java .time .Duration ;
58
import java .time .Duration ;
60
import java .util .AbstractMap ;
59
import java .util .AbstractMap ;
61
import java .util .ArrayList ;
60
import java .util .ArrayList ;
65
import java .util .concurrent .CancellationException ;
64
import java .util .concurrent .CancellationException ;
66
import java .util .concurrent .CompletableFuture ;
65
import java .util .concurrent .CompletableFuture ;
67
import java .util .concurrent .CompletionStage ;
66
import java .util .concurrent .CompletionStage ;
68
- import java .util .concurrent .ConcurrentMap ;
69
import java .util .concurrent .CopyOnWriteArrayList ;
67
import java .util .concurrent .CopyOnWriteArrayList ;
70
import java .util .concurrent .TimeUnit ;
68
import java .util .concurrent .TimeUnit ;
71
import net .jcip .annotations .ThreadSafe ;
69
import net .jcip .annotations .ThreadSafe ;
74
72
75
/** Handles the lifecycle of the preparation of a CQL statement. */
73
/** Handles the lifecycle of the preparation of a CQL statement. */
76
@ ThreadSafe
74
@ ThreadSafe
77
- public abstract class CqlPrepareHandlerBase implements Throttled {
75
+ public class CqlPrepareHandler implements Throttled {
78
76
79
- private static final Logger LOG = LoggerFactory .getLogger (CqlPrepareHandlerBase .class );
77
+ private static final Logger LOG = LoggerFactory .getLogger (CqlPrepareHandler .class );
80
78
81
private final long startTimeNanos ;
79
private final long startTimeNanos ;
82
private final String logPrefix ;
80
private final String logPrefix ;
83
private final PrepareRequest request ;
81
private final PrepareRequest request ;
84
- private final ConcurrentMap <ByteBuffer , DefaultPreparedStatement > preparedStatementsCache ;
85
private final DefaultSession session ;
82
private final DefaultSession session ;
86
private final InternalDriverContext context ;
83
private final InternalDriverContext context ;
87
private final DriverExecutionProfile executionProfile ;
84
private final DriverExecutionProfile executionProfile ;
@@ -100,9 +97,8 @@ public abstract class CqlPrepareHandlerBase implements Throttled {
100
// We don't use a map because nodes can appear multiple times.
97
// We don't use a map because nodes can appear multiple times.
101
private volatile List <Map .Entry <Node , Throwable >> errors ;
98
private volatile List <Map .Entry <Node , Throwable >> errors ;
102
99
103
- protected CqlPrepareHandlerBase (
100
+ protected CqlPrepareHandler (
104
PrepareRequest request ,
101
PrepareRequest request ,
105
- ConcurrentMap <ByteBuffer , DefaultPreparedStatement > preparedStatementsCache ,
106
DefaultSession session ,
102
DefaultSession session ,
107
InternalDriverContext context ,
103
InternalDriverContext context ,
108
String sessionLogPrefix ) {
104
String sessionLogPrefix ) {
@@ -112,7 +108,6 @@ protected CqlPrepareHandlerBase(
112
LOG .trace ("[{}] Creating new handler for prepare request {}" , logPrefix , request );
108
LOG .trace ("[{}] Creating new handler for prepare request {}" , logPrefix , request );
113
109
114
this .request = request ;
110
this .request = request ;
115
- this .preparedStatementsCache = preparedStatementsCache ;
116
this .session = session ;
111
this .session = session ;
117
this .context = context ;
112
this .context = context ;
118
this .executionProfile = Conversions .resolveExecutionProfile (request , context );
113
this .executionProfile = Conversions .resolveExecutionProfile (request , context );
@@ -171,6 +166,10 @@ public void onThrottleReady(boolean wasDelayed) {
171
sendRequest (null , 0 );
166
sendRequest (null , 0 );
172
}
167
}
173
168
169
+ public CompletableFuture <PreparedStatement > handle () {
170
+ return result ;
171
+ }
172
+
174
private Timeout scheduleTimeout (Duration timeoutDuration ) {
173
private Timeout scheduleTimeout (Duration timeoutDuration ) {
175
if (timeoutDuration .toNanos () > 0 ) {
174
if (timeoutDuration .toNanos () > 0 ) {
176
return this .timer .newTimeout (
175
return this .timer .newTimeout (
@@ -221,7 +220,7 @@ private void recordError(Node node, Throwable error) {
221
// Use a local variable to do only a single single volatile read in the nominal case
220
// Use a local variable to do only a single single volatile read in the nominal case
222
List <Map .Entry <Node , Throwable >> errorsSnapshot = this .errors ;
221
List <Map .Entry <Node , Throwable >> errorsSnapshot = this .errors ;
223
if (errorsSnapshot == null ) {
222
if (errorsSnapshot == null ) {
224
- synchronized (CqlPrepareHandlerBase .this ) {
223
+ synchronized (CqlPrepareHandler .this ) {
225
errorsSnapshot = this .errors ;
224
errorsSnapshot = this .errors ;
226
if (errorsSnapshot == null ) {
225
if (errorsSnapshot == null ) {
227
this .errors = errorsSnapshot = new CopyOnWriteArrayList <>();
226
this .errors = errorsSnapshot = new CopyOnWriteArrayList <>();
@@ -236,58 +235,28 @@ private void setFinalResult(Prepared prepared) {
236
// Whatever happens below, we're done with this stream id
235
// Whatever happens below, we're done with this stream id
237
throttler .signalSuccess (this );
236
throttler .signalSuccess (this );
238
237
239
- DefaultPreparedStatement newStatement =
238
+ DefaultPreparedStatement preparedStatement =
240
Conversions .toPreparedStatement (prepared , request , context );
239
Conversions .toPreparedStatement (prepared , request , context );
241
240
242
- DefaultPreparedStatement cachedStatement = cache (newStatement );
241
+ session
243
-
242
+ .getRepreparePayloads ()
244
- if (cachedStatement != newStatement ) {
243
+ .put (preparedStatement .getId (), preparedStatement .getRepreparePayload ());
245
- // The statement already existed in the cache, assume it's because the client called
244
+ if (prepareOnAllNodes ) {
246
- // prepare() twice, and therefore it's already been prepared on other nodes.
245
+ prepareOnOtherNodes ()
247
- result .complete (cachedStatement );
246
+ .thenRun (
248
- } else {
247
+ () -> {
249
- session
248
+ LOG .trace (
250
- .getRepreparePayloads ()
249
+ "[{}] Done repreparing on other nodes, completing the request" , logPrefix );
251
- .put (cachedStatement .getId (), cachedStatement .getRepreparePayload ());
250
+ result .complete (preparedStatement );
252
- if (prepareOnAllNodes ) {
251
+ })
253
- prepareOnOtherNodes ()
252
+ .exceptionally (
254
- .thenRun (
253
+ error -> {
255
- () -> {
254
+ result .completeExceptionally (error );
256
- LOG .trace (
255
+ return null ;
257
- "[{}] Done repreparing on other nodes, completing the request" , logPrefix );
256
+ });
258
- result .complete (cachedStatement );
259
- })
260
- .exceptionally (
261
- error -> {
262
- result .completeExceptionally (error );
263
- return null ;
264
- });
265
- } else {
266
- LOG .trace ("[{}] Prepare on all nodes is disabled, completing the request" , logPrefix );
267
- result .complete (cachedStatement );
268
- }
269
- }
270
- }
271
-
272
- private DefaultPreparedStatement cache (DefaultPreparedStatement preparedStatement ) {
273
- DefaultPreparedStatement previous =
274
- preparedStatementsCache .putIfAbsent (preparedStatement .getId (), preparedStatement );
275
- if (previous != null ) {
276
- LOG .warn (
277
- "Re-preparing already prepared query. "
278
- + "This is generally an anti-pattern and will likely affect performance. "
279
- + "The cached version of the PreparedStatement will be returned, which may use "
280
- + "different bound statement execution parameters (CL, timeout, etc.) from the "
281
- + "current session.prepare call. Consider preparing the statement only once. "
282
- + "Query='{}'" ,
283
- preparedStatement .getQuery ());
284
-
285
- // The one object in the cache will get GCed once it's not referenced by the client anymore
286
- // since we use a weak reference. So we need to make sure that the instance we do return to
287
- // the user is the one that is in the cache.
288
- return previous ;
289
} else {
257
} else {
290
- return preparedStatement ;
258
+ LOG .trace ("[{}] Prepare on all nodes is disabled, completing the request" , logPrefix );
259
+ result .complete (preparedStatement );
291
}
260
}
292
}
261
}
293
262
0 commit comments