-
Notifications
You must be signed in to change notification settings - Fork 875
/
ArrayBackedResultSet.java
515 lines (433 loc) · 22.7 KB
/
ArrayBackedResultSet.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
/*
* Copyright (C) 2012-2017 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.driver.core;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.DriverInternalError;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
/**
* Default implementation of a result set, backed by an ArrayDeque of ArrayList.
*/
abstract class ArrayBackedResultSet implements ResultSet {
// Logger used by this class; note that it is registered under the ResultSet interface, not under
// ArrayBackedResultSet itself.
private static final Logger logger = LoggerFactory.getLogger(ResultSet.class);
// Shared empty row queue handed to the result sets created by empty().
private static final Queue<List<ByteBuffer>> EMPTY_QUEUE = new ArrayDeque<List<ByteBuffer>>(0);
// Column definitions returned by getColumnDefinitions(). Not final: MultiPage.prepareNextRow()
// reassigns it when a fetched page carries its own metadata.
protected volatile ColumnDefinitions metadata;
// Token factory forwarded to ArrayBackedRow.fromData(); null when fromMessage() is called without
// a session, and for result sets created by empty().
protected final Token.Factory tokenFactory;
// Value returned by wasApplied(); computed once in the constructor by checkWasApplied().
private final boolean wasApplied;
// Protocol version forwarded to ArrayBackedRow.fromData(); null for result sets created by empty().
protected final ProtocolVersion protocolVersion;
// Codec registry associated with this result set; null for result sets created by empty().
protected final CodecRegistry codecRegistry;
/**
 * Initializes the state shared by the result set implementations nested in this class.
 *
 * @param metadata        the column definitions of the rows.
 * @param tokenFactory    the token factory handed to the rows (may be {@code null}).
 * @param firstRow        the first row of the result, or {@code null} if there is none; it is only
 *                        inspected to compute the value returned by {@link #wasApplied()}.
 * @param protocolVersion the protocol version used to decode values.
 * @param codecRegistry   the codec registry associated with the result.
 */
private ArrayBackedResultSet(ColumnDefinitions metadata, Token.Factory tokenFactory, List<ByteBuffer> firstRow, ProtocolVersion protocolVersion, CodecRegistry codecRegistry) {
    // The "[applied]" flag is derived from the constructor arguments only, so it can be computed up front.
    this.wasApplied = checkWasApplied(firstRow, metadata, protocolVersion);
    this.tokenFactory = tokenFactory;
    this.codecRegistry = codecRegistry;
    this.protocolVersion = protocolVersion;
    this.metadata = metadata;
}
/**
 * Builds the result set matching a RESULT response.
 * <p>
 * ROWS responses yield a single-page or multi-page result set depending on whether the response
 * carries a paging state; VOID, SET_KEYSPACE, SCHEMA_CHANGE and unknown kinds yield an empty result
 * set; PREPARED is rejected with a {@link RuntimeException}.
 *
 * @param msg             the response to convert.
 * @param session         the session that executed the request (may be {@code null}, in which case
 *                        the result set has no token factory).
 * @param protocolVersion the protocol version in use.
 * @param info            the execution info to enrich (may be {@code null}).
 * @param statement       the statement that was executed.
 * @return the result set for {@code msg}.
 */
static ArrayBackedResultSet fromMessage(Responses.Result msg, SessionManager session, ProtocolVersion protocolVersion, ExecutionInfo info, Statement statement) {
    switch (msg.kind) {
        case ROWS: {
            Responses.Result.Rows rowsMsg = (Responses.Result.Rows) msg;
            // Metadata handling needs the statement that was really executed, not a wrapper around it.
            Statement unwrapped = (statement instanceof StatementWrapper)
                    ? ((StatementWrapper) statement).getWrappedStatement()
                    : statement;

            ColumnDefinitions definitions = resolveColumnDefinitions(rowsMsg, unwrapped, protocolVersion);
            assert definitions != null;

            Token.Factory factory = null;
            if (session != null) {
                factory = session.getCluster().manager.metadata.tokenFactory();
            }

            ByteBuffer pagingState = rowsMsg.metadata.pagingState;
            info = update(info, rowsMsg, session, pagingState, protocolVersion, definitions.codecRegistry, statement);

            // info can be null only for internal calls, but we don't page those. We assert
            // this explicitly because MultiPage implementation doesn't support info == null.
            assert pagingState == null || info != null;

            if (pagingState == null) {
                return new SinglePage(definitions, factory, protocolVersion, definitions.codecRegistry, rowsMsg.data, info);
            }
            return new MultiPage(definitions, factory, protocolVersion, definitions.codecRegistry, rowsMsg.data, info, pagingState, session);
        }

        case VOID:
        case SET_KEYSPACE:
        case SCHEMA_CHANGE: {
            ExecutionInfo updated = update(info, msg, session, null, protocolVersion, null, statement);
            return empty(updated);
        }

        case PREPARED:
            throw new RuntimeException("Prepared statement received when a ResultSet was expected");

        default: {
            logger.error("Received unknown result type '{}'; returning empty result set", msg.kind);
            ExecutionInfo updated = update(info, msg, session, null, protocolVersion, null, statement);
            return empty(updated);
        }
    }
}

/**
 * Determines the column definitions to use for a ROWS response, refreshing the result metadata
 * cached on the prepared statement when the response carries a new metadata id.
 *
 * @param rowsMsg         the ROWS response.
 * @param unwrapped       the executed statement, already unwrapped from any {@code StatementWrapper}.
 * @param protocolVersion the protocol version in use (only used by an assertion).
 * @return the column definitions of the response if present, otherwise the ones stored on the
 * prepared statement.
 */
private static ColumnDefinitions resolveColumnDefinitions(Responses.Result.Rows rowsMsg, Statement unwrapped, ProtocolVersion protocolVersion) {
    ColumnDefinitions fromResponse = rowsMsg.metadata.columns;
    if (fromResponse == null) {
        // If result set metadata is not present, it means the request had SKIP_METADATA set, the driver
        // only ever does that for bound statements.
        BoundStatement bound = (BoundStatement) unwrapped;
        return bound.preparedStatement().getPreparedId().resultSetMetadata.variables;
    }

    // Otherwise, always use the response's metadata.
    // In addition, if a new id is present it means we're executing a bound statement with protocol v5,
    // the schema changed server-side, and we need to update the prepared statement (see
    // CASSANDRA-10786).
    MD5Digest freshMetadataId = rowsMsg.metadata.metadataId;
    assert !(unwrapped instanceof BoundStatement) ||
            ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(protocolVersion) ||
            freshMetadataId == null;
    if (freshMetadataId != null) {
        BoundStatement bound = (BoundStatement) unwrapped;
        PreparedId preparedId = bound.preparedStatement().getPreparedId();
        // Extra test for CASSANDRA-13992: conditional updates yield a different result set depending on
        // whether the update was applied or not, so the prepared statement must never have result
        // metadata, and we should always execute with skip_metadata = false.
        // However the server sends a new_metadata_id in the response, so make sure we ignore it if the
        // prepared statement did not have metadata in the first place.
        // TODO remove the "if" (i.e. always assign resultSetMetadata) if CASSANDRA-13992 gets fixed before 4.0.0 GA
        if (preparedId.resultSetMetadata.variables != null) {
            preparedId.resultSetMetadata = new PreparedId.PreparedMetadata(freshMetadataId, fromResponse);
        }
    }
    return fromResponse;
}
/**
 * Derives a new execution info carrying the details of a response.
 *
 * @param info            the execution info to start from; when {@code null}, {@code null} is returned.
 * @param msg             the response providing the tracing id and the warnings.
 * @param session         the session handed to the {@link QueryTrace} when the response has a tracing id.
 * @param pagingState     the paging state to record (may be {@code null}).
 * @param protocolVersion the protocol version to record.
 * @param codecRegistry   the codec registry to record (may be {@code null}).
 * @param statement       the statement to record.
 * @return the result of {@code info.with(...)}, or {@code null} if {@code info} is {@code null}.
 */
private static ExecutionInfo update(ExecutionInfo info, Responses.Result msg, SessionManager session,
        ByteBuffer pagingState, ProtocolVersion protocolVersion, CodecRegistry codecRegistry,
        Statement statement) {
    if (info != null) {
        QueryTrace trace = null;
        UUID tracingId = msg.getTracingId();
        if (tracingId != null) {
            trace = new QueryTrace(tracingId, session);
        }
        return info.with(trace, msg.warnings, pagingState, statement, protocolVersion, codecRegistry);
    }
    return null;
}
/**
 * Creates a result set that has no columns and no rows.
 *
 * @param info the execution info to expose through the result set.
 * @return a single-page result set backed by the shared empty queue.
 */
private static ArrayBackedResultSet empty(ExecutionInfo info) {
    // We could pass the protocol version but we know we won't need it, so bogus (null) values are
    // passed for everything that is only used to decode rows.
    Token.Factory noTokenFactory = null;
    ProtocolVersion noProtocolVersion = null;
    CodecRegistry noCodecRegistry = null;
    return new SinglePage(ColumnDefinitions.EMPTY, noTokenFactory, noProtocolVersion, noCodecRegistry, EMPTY_QUEUE, info);
}
/**
 * Returns the column definitions currently associated with this result set.
 */
@Override
public ColumnDefinitions getColumnDefinitions() {
    return this.metadata;
}
/**
 * Consumes every remaining row of this result set and returns them as a list.
 *
 * @return the remaining rows, or an empty list if the result set is already exhausted.
 */
@Override
public List<Row> all() {
    if (!isExhausted()) {
        // We may have more than 'getAvailableWithoutFetching' results but we won't have less, and
        // at least in the single page case this will be exactly the size we want so ...
        List<Row> collected = new ArrayList<Row>(getAvailableWithoutFetching());
        Iterator<Row> remaining = iterator();
        while (remaining.hasNext()) {
            collected.add(remaining.next());
        }
        return collected;
    }
    return Collections.emptyList();
}
/**
 * Returns an iterator that consumes the rows of this result set: {@code hasNext()} is the negation
 * of {@link #isExhausted()} and {@code next()} delegates to {@link #one()}. Removal is not supported.
 */
@Override
public Iterator<Row> iterator() {
    return new Iterator<Row>() {
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Row next() {
            // Delegates to the enclosing result set, so iterating consumes its rows.
            return one();
        }

        @Override
        public boolean hasNext() {
            boolean exhausted = isExhausted();
            return !exhausted;
        }
    };
}
/**
 * Returns the flag computed at construction time from the first row of the result
 * (see {@code checkWasApplied}).
 */
@Override
public boolean wasApplied() {
    return this.wasApplied;
}
/**
 * Describes this result set by its exhaustion status and its column definitions.
 * Note that this calls {@link #isExhausted()}.
 */
@Override
public String toString() {
    return "ResultSet[ exhausted: " + isExhausted() + ", " + metadata + ']';
}
/**
 * Result set whose rows all come from one queue supplied at construction time: it never fetches
 * anything and exposes a single execution info.
 */
private static class SinglePage extends ArrayBackedResultSet {

    private final ExecutionInfo info;
    private final Queue<List<ByteBuffer>> rows;

    private SinglePage(ColumnDefinitions metadata,
                       Token.Factory tokenFactory,
                       ProtocolVersion protocolVersion,
                       CodecRegistry codecRegistry,
                       Queue<List<ByteBuffer>> rows,
                       ExecutionInfo info) {
        // The head of the queue (if any) is only peeked at, to compute wasApplied().
        super(metadata, tokenFactory, rows.peek(), protocolVersion, codecRegistry);
        this.rows = rows;
        this.info = info;
    }

    /** Removes the head of the row queue and wraps it into a row. */
    @Override
    public Row one() {
        List<ByteBuffer> rowData = rows.poll();
        return ArrayBackedRow.fromData(metadata, tokenFactory, protocolVersion, rowData);
    }

    /** A single page is exhausted as soon as its row queue is empty. */
    @Override
    public boolean isExhausted() {
        return rows.isEmpty();
    }

    /** Every remaining row is available without fetching. */
    @Override
    public int getAvailableWithoutFetching() {
        return rows.size();
    }

    /** Always {@code true}: there is never anything more to fetch. */
    @Override
    public boolean isFullyFetched() {
        return true;
    }

    /** Nothing to fetch, so this completes immediately with this result set. */
    @Override
    public ListenableFuture<ResultSet> fetchMoreResults() {
        ResultSet self = this;
        return Futures.immediateFuture(self);
    }

    @Override
    public ExecutionInfo getExecutionInfo() {
        return this.info;
    }

    /** Returns a one-element list holding the only execution info of this result set. */
    @Override
    public List<ExecutionInfo> getAllExecutionInfo() {
        return Collections.singletonList(this.info);
    }
}
private static class MultiPage extends ArrayBackedResultSet {
// Rows of the page currently being consumed; replaced by prepareNextRow() once it is drained.
private Queue<List<ByteBuffer>> currentPage;
// Pages already received but not yet consumed; filled by the response callback of queryNextPage().
private final Queue<NextPage> nextPages = new ConcurrentLinkedQueue<NextPage>();
// Execution infos, one per page, in arrival order (the constructor adds the first one).
private final Deque<ExecutionInfo> infos = new LinkedBlockingDeque<ExecutionInfo>();
/*
* The fetching state of this result set. The fetchState will always be in one of
* the 3 following states:
* 1) fetchState is null: fetching is done, there is nothing more to fetch
* and no query in progress.
* 2) fetchState.nextStart is not null: there are more pages to fetch. In
* that case, inProgress is *guaranteed* to be null.
* 3) fetchState.inProgress is not null: a page is being fetched.
* In that case, nextStart is *guaranteed* to be null.
*
* Also note that while ResultSet doesn't pretend to be thread-safe, the actual
* fetch is done asynchronously and so we do need to be volatile below.
*/
private volatile FetchingState fetchState;
// Session used to build and execute the requests for the following pages.
private final SessionManager session;
/**
 * Creates a paged result set from its first page.
 *
 * @param metadata        the column definitions of the first page.
 * @param tokenFactory    the token factory handed to the rows.
 * @param protocolVersion the protocol version in use.
 * @param codecRegistry   the codec registry associated with the result.
 * @param rows            the rows of the first page.
 * @param info            the execution info of the first page.
 * @param pagingState     the paging state from which the next page must be fetched; must not be
 *                        {@code null} (asserted by {@code FetchingState}).
 * @param session         the session used to fetch the following pages.
 */
private MultiPage(ColumnDefinitions metadata,
                  Token.Factory tokenFactory,
                  ProtocolVersion protocolVersion,
                  CodecRegistry codecRegistry,
                  Queue<List<ByteBuffer>> rows,
                  ExecutionInfo info,
                  ByteBuffer pagingState,
                  SessionManager session) {
    // Note: as of Cassandra 2.1.0, it turns out that the result of a CAS update is never paged, so
    // we could hard-code the result of wasApplied in this class to "true". However, we can not be sure
    // that this will never change, so apply the generic check by peeking at the first row.
    super(metadata, tokenFactory, rows.peek(), protocolVersion, codecRegistry);
    this.session = session;
    this.currentPage = rows;
    // Initial state: nothing in flight, the next page starts at the supplied paging state.
    this.fetchState = new FetchingState(pagingState, null);
    this.infos.offer(info);
}
/**
 * Tells whether all rows have been consumed. This first calls {@code prepareNextRow()}, which may
 * block while the next page is fetched.
 */
@Override
public boolean isExhausted() {
    prepareNextRow();
    boolean nothingLeft = currentPage.isEmpty();
    return nothingLeft;
}
/**
 * Consumes the next row. This first calls {@code prepareNextRow()}, which may block while the next
 * page is fetched, then wraps the head of the current page into a row.
 */
@Override
public Row one() {
    prepareNextRow();
    List<ByteBuffer> rowData = currentPage.poll();
    return ArrayBackedRow.fromData(metadata, tokenFactory, protocolVersion, rowData);
}
/**
 * Counts the rows that can be consumed without fetching: those left in the current page plus those
 * of every page already queued.
 */
@Override
public int getAvailableWithoutFetching() {
    int count = currentPage.size();
    Iterator<NextPage> queued = nextPages.iterator();
    while (queued.hasNext()) {
        count += queued.next().data.size();
    }
    return count;
}
/**
 * Tells whether there is nothing more to fetch, i.e. whether the fetch state is {@code null}.
 */
@Override
public boolean isFullyFetched() {
    FetchingState snapshot = this.fetchState;
    return snapshot == null;
}
// Ensure that after the call the next row to consume is in 'currentPage', i.e. that
// 'currentPage' is empty IFF the ResultSet is fully exhausted.
// Pages already queued in 'nextPages' are consumed first; when none is queued and the fetch state
// is not null, this waits (uninterruptibly) for fetchMoreResults() to complete and then retries.
private void prepareNextRow() {
while (currentPage.isEmpty()) {
// Grab the current state now to get a consistent view in this iteration.
// NOTE(review): the snapshot is taken BEFORE polling 'nextPages'; the response callback queues the
// page before it updates 'fetchState', so this order looks deliberate -- confirm before reordering.
FetchingState fetchingState = this.fetchState;
NextPage nextPage = nextPages.poll();
if (nextPage != null) {
// A page only carries metadata when it changed; otherwise keep the current definitions.
if (nextPage.metadata != null) {
this.metadata = nextPage.metadata;
}
currentPage = nextPage.data;
// Loop again: the page that was just installed may itself be empty.
continue;
}
// No queued page and nothing left to fetch: the result set is exhausted.
if (fetchingState == null)
return;
// We need to know if there is more result, so fetch the next page and
// wait on it.
try {
// NOTE(review): judging by its name, this guards against blocking on a driver I/O thread -- confirm.
session.checkNotInEventLoop();
Uninterruptibles.getUninterruptibly(fetchMoreResults());
} catch (ExecutionException e) {
// NOTE(review): presumably rethrows the cause of the failed fetch instead of the wrapper -- confirm
// against DriverThrowables.propagateCause.
throw DriverThrowables.propagateCause(e);
}
}
}
/**
 * Triggers the fetch of the next page based on the current fetch state.
 *
 * @return a future on this result set; see {@code fetchMoreResults(FetchingState)} for when it completes.
 */
@Override
public ListenableFuture<ResultSet> fetchMoreResults() {
    FetchingState snapshot = this.fetchState;
    return fetchMoreResults(snapshot);
}
/**
 * Triggers the fetch of the next page according to the given fetch state.
 *
 * @param fetchState the state to act upon.
 * @return an already completed future if {@code fetchState} is {@code null}, the future of the
 * in-flight fetch if one is in progress, otherwise the future of a newly started fetch.
 */
private ListenableFuture<ResultSet> fetchMoreResults(FetchingState fetchState) {
    // Nothing more to fetch.
    if (fetchState == null) {
        return Futures.<ResultSet>immediateFuture(this);
    }

    // A fetch is already running: share its future rather than starting another one.
    ListenableFuture<ResultSet> pending = fetchState.inProgress;
    if (pending != null) {
        return pending;
    }

    ByteBuffer resumeFrom = fetchState.nextStart;
    assert resumeFrom != null;

    // Record the in-progress state before the query is sent.
    SettableFuture<ResultSet> pageFuture = SettableFuture.create();
    this.fetchState = new FetchingState(null, pageFuture);
    return queryNextPage(resumeFrom, pageFuture);
}
// Sends the request for the page starting at 'nextStart' and returns 'future', which the response
// callback completes: with this result set once the page has been queued, or with an exception on
// an error response, an unexpected response, or a failure while processing the response.
private ListenableFuture<ResultSet> queryNextPage(ByteBuffer nextStart, final SettableFuture<ResultSet> future) {
// The statement is taken from the head of 'infos', i.e. the execution info of the first page.
Statement statement = this.infos.peek().getStatement();
assert !(statement instanceof BatchStatement);
final Message.Request request = session.makeRequestMessage(statement, nextStart);
session.execute(new RequestHandler.Callback() {
@Override
public Message.Request request() {
return request;
}
// Intentionally a no-op: this callback does not keep a reference to the handler.
@Override
public void register(RequestHandler handler) {
}
@Override
public void onSet(Connection connection, Message.Response response, ExecutionInfo info, Statement statement, long latency) {
try {
switch (response.type) {
case RESULT:
Responses.Result rm = (Responses.Result) response;
if (rm.kind == Responses.Result.Kind.ROWS) {
Responses.Result.Rows rows = (Responses.Result.Rows) rm;
info = update(info, rm, MultiPage.this.session, rows.metadata.pagingState, protocolVersion, codecRegistry, statement);
// If the query is a prepared 'SELECT *', the metadata can change between pages
ColumnDefinitions newMetadata = null;
if (rows.metadata.metadataId != null) {
newMetadata = rows.metadata.columns;
assert statement instanceof BoundStatement;
BoundStatement bs = (BoundStatement) statement;
bs.preparedStatement().getPreparedId().resultSetMetadata =
new PreparedId.PreparedMetadata(rows.metadata.metadataId, rows.metadata.columns);
}
// Queue the page first, then publish the new fetch state (null when this was the last page).
MultiPage.this.nextPages.offer(new NextPage(newMetadata, rows.data));
MultiPage.this.fetchState = rows.metadata.pagingState == null ? null : new FetchingState(rows.metadata.pagingState, null);
} else if (rm.kind == Responses.Result.Kind.VOID) {
// We shouldn't really get a VOID message here but well, no harm in handling it I suppose
info = update(info, rm, MultiPage.this.session, null, protocolVersion, codecRegistry, statement);
MultiPage.this.fetchState = null;
} else {
logger.error("Received unknown result type '{}' during paging: ignoring message", rm.kind);
// This means we probably have a bad node, so defunct the connection
connection.defunct(new ConnectionException(connection.address, String.format("Got unexpected %s result response", rm.kind)));
future.setException(new DriverInternalError(String.format("Got unexpected %s result response from %s", rm.kind, connection.address)));
return;
}
// Reached for ROWS and VOID only: record the execution info of this page and complete the future.
MultiPage.this.infos.offer(info);
future.set(MultiPage.this);
break;
case ERROR:
future.setException(((Responses.Error) response).asException(connection.address));
break;
default:
// This means we probably have a bad node, so defunct the connection
connection.defunct(new ConnectionException(connection.address, String.format("Got unexpected %s response", response.type)));
future.setException(new DriverInternalError(String.format("Got unexpected %s response from %s", response.type, connection.address)));
break;
}
} catch (RuntimeException e) {
// A bug in the processing above would otherwise never reach the client, so forward it through the future
future.setException(new DriverInternalError("Unexpected error while processing response from " + connection.address, e));
}
}
// This is only called for internal calls, so don't bother with ExecutionInfo
@Override
public void onSet(Connection connection, Message.Response response, long latency, int retryCount) {
onSet(connection, response, null, null, latency);
}
@Override
public void onException(Connection connection, Exception exception, long latency, int retryCount) {
future.setException(exception);
}
@Override
public boolean onTimeout(Connection connection, long latency, int retryCount) {
// This won't be called directly since this will be wrapped by RequestHandler.
throw new UnsupportedOperationException();
}
@Override
public int retryCount() {
// This is only called for internal calls (i.e, when the callback is not wrapped in RequestHandler).
// There is no retry logic in that case, so the value does not really matter.
return 0;
}
}, statement);
return future;
}
/**
 * Returns the most recently recorded execution info, i.e. the last element of the info deque.
 */
@Override
public ExecutionInfo getExecutionInfo() {
    ExecutionInfo latest = infos.getLast();
    return latest;
}
/**
 * Returns a copy of the execution infos recorded so far, in the order they were recorded.
 */
@Override
public List<ExecutionInfo> getAllExecutionInfo() {
    List<ExecutionInfo> snapshot = new ArrayList<ExecutionInfo>(infos);
    return snapshot;
}
/**
 * Immutable snapshot of the paging progress: holds either the paging state from which the next
 * page starts, or the future of the fetch in progress -- exactly one of the two is non-null
 * (checked by an assertion).
 */
private static class FetchingState {

    public final ListenableFuture<ResultSet> inProgress;
    public final ByteBuffer nextStart;

    FetchingState(ByteBuffer nextStart, ListenableFuture<ResultSet> inProgress) {
        // Exactly one of the two must be provided.
        assert (nextStart != null) ^ (inProgress != null);
        this.inProgress = inProgress;
        this.nextStart = nextStart;
    }
}
/**
 * A fetched page waiting to be consumed: its rows, plus the column definitions that came with it
 * ({@code null} when the page did not bring new metadata).
 */
private static class NextPage {

    final Queue<List<ByteBuffer>> data;
    final ColumnDefinitions metadata;

    NextPage(ColumnDefinitions metadata, Queue<List<ByteBuffer>> data) {
        this.data = data;
        this.metadata = metadata;
    }
}
}
// Reads the "[applied]" column of the first row by hand, so that no ArrayBackedRow has to be
// instantiated only to be thrown away immediately.
// Returns true when there is no first row, when the column is absent, or when it is not a boolean
// (the query is then assumed not to be a conditional statement); returns false when the column
// holds a null or empty value; otherwise returns the decoded boolean.
private static boolean checkWasApplied(List<ByteBuffer> firstRow, ColumnDefinitions metadata, ProtocolVersion protocolVersion) {
    if (firstRow == null) {
        return true;
    }

    int[] appliedIndexes = metadata.findAllIdx("[applied]");
    if (appliedIndexes == null) {
        return true;
    }

    int appliedIdx = appliedIndexes[0];
    boolean isBooleanColumn = DataType.cboolean().equals(metadata.getType(appliedIdx));
    if (!isBooleanColumn) {
        return true;
    }

    // From here on the column exists and is a boolean: report its actual value.
    ByteBuffer raw = firstRow.get(appliedIdx);
    boolean hasValue = raw != null && raw.remaining() != 0;
    return hasValue && TypeCodec.cboolean().deserializeNoBoxing(raw, protocolVersion);
}
}