forked from apache/cassandra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ReadCommand.java
1815 lines (1541 loc) · 82 KB
/
ReadCommand.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.db.monitoring.MonitorableImpl;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.RTBoundCloser;
import org.apache.cassandra.db.transform.RTBoundValidator;
import org.apache.cassandra.db.transform.RTBoundValidator.Stage;
import org.apache.cassandra.db.transform.StoppingTransformation;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.IndexNotAvailableException;
import org.apache.cassandra.io.ForwardingVersionedSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.UnknownIndexException;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
* General interface for storage-engine read commands (common to both range and
* single partition commands).
* <p>
* This contains all the information needed to do a local read.
*/
public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
{
// Artificial per-row delay, in milliseconds, read once from the "cassandra.test.read_iteration_delay_ms"
// system property (default "0"). CheckForAbort only sleeps when this value is strictly positive.
private static final int TEST_ITERATION_DELAY_MILLIS = Integer.parseInt(System.getProperty("cassandra.test.read_iteration_delay_ms", "0"));
protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
// Serializer for the 3.0+ format; its methods assert that the version is at least MessagingService.VERSION_30.
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
// READ verb: picks 'legacyReadCommandSerializer' for pre-3.0 messaging versions and 'serializer' otherwise.
// Once pre-3.0 backward compatibility is dropped, this can go away and 'serializer' can be used directly.
public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>()
{
    @Override
    protected IVersionedSerializer<ReadCommand> delegate(int version)
    {
        if (version >= MessagingService.VERSION_30)
            return serializer;
        return legacyReadCommandSerializer;
    }
};
// RANGE_SLICE verb: picks 'legacyRangeSliceCommandSerializer' for pre-3.0 messaging versions and 'serializer' otherwise.
// Once pre-3.0 backward compatibility is dropped, this can go away and 'serializer' can be used directly.
public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>()
{
    @Override
    protected IVersionedSerializer<ReadCommand> delegate(int version)
    {
        if (version >= MessagingService.VERSION_30)
            return serializer;
        return legacyRangeSliceCommandSerializer;
    }
};
// PAGED_RANGE verb: picks 'legacyPagedRangeCommandSerializer' for pre-3.0 messaging versions and 'serializer' otherwise.
// Once pre-3.0 backward compatibility is dropped, this can go away and 'serializer' can be used directly.
public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>()
{
    @Override
    protected IVersionedSerializer<ReadCommand> delegate(int version)
    {
        if (version >= MessagingService.VERSION_30)
            return serializer;
        return legacyPagedRangeCommandSerializer;
    }
};
// Serializers for the pre-3.0 formats. The forwarding serializers (readSerializer, rangeSliceSerializer,
// pagedRangeSerializer) delegate to these when the version is lower than MessagingService.VERSION_30.
public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
// Concrete kind of this command; Serializer writes its ordinal and uses it to pick the selection deserializer.
private final Kind kind;
// Metadata of the table queried.
private final CFMetaData metadata;
// Time, in seconds, used as "now" for the whole query.
private final int nowInSec;
private final ColumnFilter columnFilter;
private final RowFilter rowFilter;
private final DataLimits limits;
private final boolean isDigestQuery;
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
// Not final: it can be changed after construction through setDigestVersion().
private int digestVersion;
private final boolean isForThrift;
// Metadata of the index chosen for this query; null when no index is used.
@Nullable
private final IndexMetadata index;
/**
 * Builds a concrete {@code ReadCommand} from the fields shared by every command kind plus
 * whatever kind-specific data remains in the input. Each {@link Kind} carries one instance.
 */
protected abstract static class SelectionDeserializer
{
    /**
     * Reads the kind-specific part of a command from {@code in} and assembles the command
     * from it and from the already-deserialized common fields passed as arguments.
     *
     * @throws IOException if reading from {@code in} fails
     */
    public abstract ReadCommand deserialize(DataInputPlus in, int version,
                                            boolean isDigest, int digestVersion, boolean isForThrift,
                                            CFMetaData metadata, int nowInSec,
                                            ColumnFilter columnFilter, RowFilter rowFilter,
                                            DataLimits limits, IndexMetadata index) throws IOException;
}
/**
 * The concrete kinds of read command, each paired with the deserializer for its kind-specific part.
 * <p>
 * The declaration order matters: {@code Serializer} writes {@code ordinal()} as the first byte of a
 * serialized command and reads it back with {@code Kind.values()[...]}, so reordering or inserting
 * constants would change the serialized form.
 */
protected enum Kind
{
SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer);
// Deserializer used by Serializer.deserialize() once the common fields have been read.
private final SelectionDeserializer selectionDeserializer;
Kind(SelectionDeserializer selectionDeserializer)
{
this.selectionDeserializer = selectionDeserializer;
}
}
/**
 * Stores the state common to every read command. No validation or copying is done here;
 * each argument is assigned to the field of the same name.
 */
protected ReadCommand(Kind kind,
                      boolean isDigestQuery,
                      int digestVersion,
                      boolean isForThrift,
                      CFMetaData metadata,
                      int nowInSec,
                      ColumnFilter columnFilter,
                      RowFilter rowFilter,
                      DataLimits limits,
                      IndexMetadata index)
{
    // what is being read
    this.kind = kind;
    this.metadata = metadata;
    this.nowInSec = nowInSec;

    // how the result is selected and bounded
    this.columnFilter = columnFilter;
    this.rowFilter = rowFilter;
    this.limits = limits;
    this.index = index;

    // flags describing the request itself
    this.isDigestQuery = isDigestQuery;
    this.digestVersion = digestVersion;
    this.isForThrift = isForThrift;
}
/**
 * Writes the kind-specific part of this command. {@code Serializer.serialize()} calls this last,
 * after the fields common to all commands have been written.
 */
protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
/**
 * Size of the kind-specific part of this command, as added up by {@code Serializer.serializedSize()}.
 */
protected abstract long selectionSerializedSize(int version);
// NOTE(review): presumably true when the command can only ever read a single partition -- confirm against the implementations.
public abstract boolean isLimitedToOnePartition();
/**
 * Creates a new <code>ReadCommand</code> instance that uses the provided limits.
 *
 * @param newLimits the new limits
 * @return a new <code>ReadCommand</code> with the updated limits
 */
public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);
/**
 * Gives access to the metadata of the table this command reads from.
 *
 * @return the metadata for the table queried.
 */
public CFMetaData metadata()
{
    return this.metadata;
}
/**
 * The time in seconds to use as "now" for this query.
 * <p>
 * A single value is used as "now" for the whole query so that the same data is not considered
 * live at one point and expired at another while the query runs, which would be buggy
 * (it would throw off counting, amongst other things).
 *
 * @return the time (in seconds) to use as "now".
 */
public int nowInSec()
{
    return this.nowInSec;
}
/**
 * The configured timeout for this command.
 * <p>
 * NOTE(review): the time unit is not visible from this declaration -- confirm against the implementations.
 *
 * @return the configured timeout for this command.
 */
public abstract long getTimeout();
/**
 * The filter describing which (non-PK) columns the query must return.
 *
 * @return which columns must be fetched by this query.
 */
public ColumnFilter columnFilter()
{
    return this.columnFilter;
}
/**
 * Filters/restrictions on CQL rows.
 * <p>
 * These are the restrictions that are not directly handled by the
 * {@code ClusteringIndexFilter}. More specifically, this includes any non-PK column
 * restrictions, and can include some PK column restrictions when those can't be
 * satisfied entirely by the clustering index filter (because not all clustering columns
 * have been restricted, for instance). If there are secondary indexes on the table,
 * one of these restrictions might be handled by a secondary index.
 *
 * @return the filter holding the expression that rows must satisfy.
 */
public RowFilter rowFilter()
{
    return this.rowFilter;
}
/**
 * The limits that apply to this query.
 *
 * @return the limits set on this query.
 */
public DataLimits limits()
{
    return this.limits;
}
/**
 * Tells whether this command asks for a digest rather than for data.
 *
 * @return whether this query is a digest query.
 */
public boolean isDigestQuery()
{
    return this.isDigestQuery;
}
/**
 * The digest version requested, which is only meaningful for digest queries.
 *
 * @return the requested digest version if the query is a digest. Otherwise, this can return
 * anything.
 */
public int digestVersion()
{
    return this.digestVersion;
}
/**
 * Sets the digest version, for when a digest for that command is requested.
 * <p>
 * This can be set independently of the command being a digest query: it lets the command
 * carry the digest version even if setIsDigestQuery is only ever called on some copy of it.
 *
 * @param digestVersion the version to use for the digest if this command is used for a digest query.
 * @return this read command.
 */
public ReadCommand setDigestVersion(int digestVersion)
{
    // mutates this instance in place and hands it back so that calls can be chained
    this.digestVersion = digestVersion;
    return this;
}
/**
 * Tells whether this command was issued for thrift.
 *
 * @return whether this query is for thrift.
 */
public boolean isForThrift()
{
    return this.isForThrift;
}
/**
 * The metadata of the index chosen for this query, if any.
 *
 * @return index (metadata) chosen for this query, or {@code null} when there is none.
 */
@Nullable
public IndexMetadata indexMetadata()
{
    return this.index;
}
/**
 * The clustering index filter this command uses for the provided key.
 * <p>
 * Note that this method should only be called on a key actually queried by this command.
 * In practice, this will almost always return the same filter, but for the sake of
 * paging, the filter on the first key of a range command might be slightly different.
 *
 * @param key a partition key queried by this command.
 *
 * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
 */
public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
/**
 * Returns a copy of this command.
 *
 * @return a copy of this command.
 */
public abstract ReadCommand copy();
/**
 * Returns a copy of this command with isDigestQuery set to true.
 *
 * @return a copy of this command that is a digest query.
 */
public abstract ReadCommand copyAsDigestQuery();
/**
 * Produces the raw result iterator for this command. {@code executeLocally()} calls this when
 * no index searcher is used for the query.
 */
protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
/**
 * Value passed by {@code withoutPurgeableTombstones()} to the {@code PurgeFunction} it creates.
 */
protected abstract int oldestUnrepairedTombstone();
/**
 * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
 *
 * @return {@code true} if the underlying {@code ClusteringIndexFilter} is reversed, {@code false} otherwise.
 */
public abstract boolean isReversed();
/**
 * Builds the response for this command from the provided result: a digest response when this
 * is a digest query, a data response otherwise.
 *
 * @param iterator the result of the command.
 * @return the response wrapping (or digesting) {@code iterator}.
 */
public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
{
    // Check the RT marker sequence first: every open must be followed by a close, both ends must carry
    // equal deletion times, and no partition may end with a dangling RT bound.
    UnfilteredPartitionIterator validated = RTBoundValidator.validate(iterator, Stage.PROCESSED, true);

    if (isDigestQuery())
        return ReadResponse.createDigestResponse(validated, this);

    return ReadResponse.createDataResponse(validated, this);
}
/**
 * Serialized size of the index metadata of this command, or 0 when the command has no index.
 */
long indexSerializedSize(int version)
{
    if (index == null)
        return 0;

    return IndexMetadata.serializer.serializedSize(index, version);
}
/**
 * Looks up, in the index manager of {@code cfs}, the index matching this command's index metadata.
 *
 * @return the result of that lookup, or {@code null} when this command has no index metadata.
 */
public Index getIndex(ColumnFamilyStore cfs)
{
    if (index == null)
        return null;

    return cfs.indexManager.getIndex(index);
}
/**
 * Asks the index manager of {@code table} for the best index for {@code rowFilter}.
 *
 * @return the metadata of that index, or {@code null} if the table has no index, the filter
 * is empty, or the index manager finds no suitable index.
 */
static IndexMetadata findIndex(CFMetaData table, RowFilter rowFilter)
{
    // nothing to look for: skip opening the store altogether
    if (table.getIndexes().isEmpty() || rowFilter.isEmpty())
        return null;

    Index best = Keyspace.openAndGetStore(table).indexManager.getBestIndexFor(rowFilter);
    if (best == null)
        return null;

    return best.getIndexMetadata();
}
/**
 * If the index manager for the CFS determines that there's an applicable
 * 2i that can be used to execute this command, calls its (optional)
 * validation method to check that nothing in this command's parameters
 * violates the implementation-specific validation rules.
 */
public void maybeValidateIndex()
{
    ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata);
    Index chosen = getIndex(cfs);
    if (chosen == null)
        return;

    chosen.validate(this);
}
/**
 * Executes this command on the local host.
 * <p>
 * The raw result comes either from {@code queryStorage()} or, when an index is used, from the
 * index searcher. It is then wrapped, in this order, with: range-tombstone bound validation,
 * abort tracking, purging of purgeable tombstones, metrics recording, row filtering, limits,
 * and finally a closer for range tombstones left open by the limits.
 *
 * @param executionController the execution controller spanning this command
 *
 * @return an iterator over the result of executing this command locally.
 * @throws IndexNotAvailableException if an index is selected for this command but is not queryable.
 */
@SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
                              // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
{
    long startTimeNanos = System.nanoTime();

    ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
    Index index = getIndex(cfs);

    Index.Searcher searcher = null;
    if (index != null)
    {
        if (!cfs.indexManager.isIndexQueryable(index))
            throw new IndexNotAvailableException(index);

        searcher = index.searcherFor(this);
        Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
    }

    UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController);

    try
    {
        // Every wrapping step, including this first one, happens inside the try so that the
        // iterator opened above is closed no matter which step fails.
        iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false);
        iterator = withStateTracking(iterator);
        iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs), Stage.PURGED, false);
        iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);

        // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
        // no point in checking it again.
        RowFilter filter = (null == searcher) ? rowFilter() : index.getPostIndexQueryFilter(rowFilter());

        /*
         * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
         * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
         * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
         * processing we do on it).
         */
        iterator = filter.filter(iterator, nowInSec());

        // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
        // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
        iterator = limits().filter(iterator, nowInSec(), selectsFullPartition());

        // because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter.
        return RTBoundCloser.close(iterator);
    }
    catch (RuntimeException | Error e)
    {
        iterator.close();
        throw e;
    }
}
/**
 * Records the latency of this command. The metrics transformation set up by
 * {@code withMetricsRecording()} calls this from {@code onClose()} with the nanoseconds
 * elapsed since the start time it was given.
 */
protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
/**
 * Executes this command locally and filters the unfiltered result through
 * {@code UnfilteredPartitionIterators.filter} using this command's "now".
 */
public PartitionIterator executeInternal(ReadExecutionController controller)
{
    UnfilteredPartitionIterator unfiltered = executeLocally(controller);
    return UnfilteredPartitionIterators.filter(unfiltered, nowInSec());
}
/**
 * Obtains, through {@code ReadExecutionController.forCommand}, an execution controller for this command.
 */
public ReadExecutionController executionController()
{
    ReadExecutionController controller = ReadExecutionController.forCommand(this);
    return controller;
}
/**
 * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
 * This also logs a warning, or throws a TombstoneOverwhelmingException, when the tombstone
 * thresholds are crossed.
 */
private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
{
    class MetricRecording extends Transformation<UnfilteredRowIterator>
    {
        private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
        private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();

        // thresholds are ignored for tables of the local system keyspaces
        private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().ksName);
        private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness();

        private int liveRows = 0;
        private int tombstones = 0;

        // key of the partition currently being iterated, kept for error and warning messages
        private DecoratedKey currentKey;

        @Override
        public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
        {
            currentKey = partition.partitionKey();
            return Transformation.apply(partition, this);
        }

        @Override
        public Row applyToStatic(Row row)
        {
            // static rows are counted exactly like regular rows
            return applyToRow(row);
        }

        /**
         * Updates the live row and tombstone counters for the given row.
         *
         * Tombstones come in two forms on rows:
         * - cells that aren't live anymore (either expired through TTL or deleted): 1 tombstone per cell
         * - rows that aren't live and have no cell (DELETEs performed on the primary key): 1 tombstone per row
         * A row is not counted as a tombstone of its own if any of its cells was already counted.
         */
        @Override
        public Row applyToRow(Row row)
        {
            boolean sawDeadCell = false;
            for (Cell cell : row.cells())
            {
                if (cell.isLive(ReadCommand.this.nowInSec()))
                    continue;

                countTombstone(row.clustering());
                sawDeadCell = true; // prevents counting one more tombstone below for the row itself
            }

            if (row.hasLiveData(ReadCommand.this.nowInSec(), enforceStrictLiveness))
            {
                ++liveRows;
                return row;
            }

            // Only primary key deletions are counted here.
            boolean primaryKeyDeleted = !row.primaryKeyLivenessInfo().isLive(ReadCommand.this.nowInSec())
                                        && row.hasDeletion(ReadCommand.this.nowInSec());
            if (primaryKeyDeleted && !sawDeadCell)
                countTombstone(row.clustering());

            return row;
        }

        @Override
        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
        {
            countTombstone(marker.clustering());
            return marker;
        }

        private void countTombstone(ClusteringPrefix clustering)
        {
            ++tombstones;
            if (!respectTombstoneThresholds || tombstones <= failureThreshold)
                return;

            String query = ReadCommand.this.toCQLString();
            Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
            throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
        }

        @Override
        public void onClose()
        {
            recordLatency(metric, System.nanoTime() - startTimeNanos);

            metric.tombstoneScannedHistogram.update(tombstones);
            metric.liveScannedHistogram.update(liveRows);

            boolean warnTombstones = respectTombstoneThresholds && tombstones > warningThreshold;
            if (warnTombstones)
            {
                String msg = String.format(
                    "Read %d live rows and %d tombstone cells for query %1.512s; token %s (see tombstone_warn_threshold)",
                    liveRows, tombstones, ReadCommand.this.toCQLString(), currentKey.getToken());
                ClientWarn.instance.warn(msg);
                logger.warn(msg);
            }

            String suffix = warnTombstones ? " (see tombstone_warn_threshold)" : "";
            Tracing.trace("Read {} live rows and {} tombstone cells{}", liveRows, tombstones, suffix);
        }
    }

    MetricRecording recorder = new MetricRecording();
    return Transformation.apply(iter, recorder);
}
protected class CheckForAbort extends StoppingTransformation<UnfilteredRowIterator>
{
long lastChecked = 0;
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
{
if (maybeAbort())
{
partition.close();
return null;
}
return Transformation.apply(partition, this);
}
protected Row applyToRow(Row row)
{
if (TEST_ITERATION_DELAY_MILLIS > 0)
maybeDelayForTesting();
return maybeAbort() ? null : row;
}
private boolean maybeAbort()
{
/**
* The value returned by ApproximateTime.currentTimeMillis() is updated only every
* {@link ApproximateTime.CHECK_INTERVAL_MS}, by default 10 millis. Since MonitorableImpl
* relies on ApproximateTime, we don't need to check unless the approximate time has elapsed.
*/
if (lastChecked == ApproximateTime.currentTimeMillis())
return false;
lastChecked = ApproximateTime.currentTimeMillis();
if (isAborted())
{
stop();
return true;
}
return false;
}
private void maybeDelayForTesting()
{
if (!metadata.ksName.startsWith("system"))
FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
}
}
/**
 * Wraps the provided iterator with a fresh {@code CheckForAbort} transformation.
 */
protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
{
    CheckForAbort abortCheck = new CheckForAbort();
    return Transformation.apply(iter, abortCheck);
}
/**
 * Creates a message for this command.
 *
 * @param version the version the message is created for.
 * @return the message carrying this command.
 */
public abstract MessageOut<ReadCommand> createMessage(int version);
/**
 * Appends the WHERE clause of this command to {@code sb}. {@code toCQLString()} calls this right
 * after the FROM part and before the limits.
 */
protected abstract void appendCQLWhereClause(StringBuilder sb);
// Skips purgeable tombstones. This is safe to do (post-merge of the memtable and sstable at least), it
// can save some bandwidth, and it avoids throwing a TombstoneOverwhelmingException because of purgeable
// tombstones (which are to some extent an artefact of compaction lagging behind, so counting them would
// be somewhat unintuitive).
protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
{
    final boolean thriftIterator = iterator.isForThrift();

    class WithoutPurgeableTombstones extends PurgeFunction
    {
        public WithoutPurgeableTombstones()
        {
            super(thriftIterator,
                  nowInSec(),
                  cfs.gcBefore(nowInSec()),
                  oldestUnrepairedTombstone(),
                  cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
                  cfs.metadata.enforceStrictLiveness());
        }

        @Override
        protected Predicate<Long> getPurgeEvaluator()
        {
            // the evaluator accepts every value it is given
            return timestamp -> true;
        }
    }

    PurgeFunction purger = new WithoutPurgeableTombstones();
    return Transformation.apply(iterator, purger);
}
/**
 * Recreates a CQL string corresponding to this query.
 * <p>
 * In general the returned string will not be exactly the original user string: there isn't
 * always a single syntax for a given query, and not all the needed information is available
 * (the non-PK columns queried are known, but not the PK ones, as they are all queried
 * internally). So this shouldn't be relied on too strongly, but it should be good enough for
 * debugging purposes, which is what it is for.
 */
public String toCQLString()
{
    StringBuilder cql = new StringBuilder("SELECT ");
    cql.append(columnFilter().toCQLString())
       .append(" FROM ")
       .append(metadata().ksName)
       .append('.')
       .append(metadata().cfName);

    appendCQLWhereClause(cql);

    // the limits are only printed when some are actually set
    if (limits() != DataLimits.NONE)
    {
        cql.append(' ');
        cql.append(limits());
    }

    return cql.toString();
}
// Monitorable interface: the name of a read command is its CQL string.
public String name()
{
    return this.toCQLString();
}
/**
 * Serializer for the 3.0+ format of a {@code ReadCommand}; every method asserts that the version is at
 * least {@code MessagingService.VERSION_30}.
 * <p>
 * Layout, in order: kind (1 byte, the {@code Kind} ordinal), flags (1 byte), digest version (unsigned
 * vint, only if the digest flag is set), table metadata, nowInSec (int), column filter, row filter,
 * limits, index metadata (only if the index flag is set), then the kind-specific selection.
 * The order of the statements in {@code serialize} and {@code deserialize} must stay in sync.
 */
private static class Serializer implements IVersionedSerializer<ReadCommand>
{
// Flags byte: bit 0x01 marks a digest query.
private static int digestFlag(boolean isDigest)
{
return isDigest ? 0x01 : 0;
}
private static boolean isDigest(int flags)
{
return (flags & 0x01) != 0;
}
// Flags byte: bit 0x02 marks a command issued for thrift.
private static int thriftFlag(boolean isForThrift)
{
return isForThrift ? 0x02 : 0;
}
private static boolean isForThrift(int flags)
{
return (flags & 0x02) != 0;
}
// Flags byte: bit 0x04 marks that index metadata is serialized with the command.
private static int indexFlag(boolean hasIndex)
{
return hasIndex ? 0x04 : 0;
}
private static boolean hasIndex(int flags)
{
return (flags & 0x04) != 0;
}
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
assert version >= MessagingService.VERSION_30;
// The kind is written as its ordinal, so the declaration order of Kind is part of the format.
out.writeByte(command.kind.ordinal());
out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(null != command.index));
// The digest version is only present for digest queries.
if (command.isDigestQuery())
out.writeUnsignedVInt(command.digestVersion());
CFMetaData.serializer.serialize(command.metadata(), out, version);
out.writeInt(command.nowInSec());
ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
RowFilter.serializer.serialize(command.rowFilter(), out, version);
DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator);
if (null != command.index)
IndexMetadata.serializer.serialize(command.index, out, version);
// The kind-specific part always comes last.
command.serializeSelection(out, version);
}
public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
{
assert version >= MessagingService.VERSION_30;
Kind kind = Kind.values()[in.readByte()];
int flags = in.readByte();
boolean isDigest = isDigest(flags);
boolean isForThrift = isForThrift(flags);
boolean hasIndex = hasIndex(flags);
// When the command is not a digest query no digest version was written, and 0 is used.
int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
int nowInSec = in.readInt();
ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator);
IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;
// The deserializer attached to the kind reads the rest of the input and builds the command.
return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
}
// Reads the index metadata, returning null (after logging) when the index is not known locally
// so that the read can proceed without the index.
private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
{
try
{
return IndexMetadata.serializer.deserialize(in, version, cfm);
}
catch (UnknownIndexException e)
{
logger.info("Couldn't find a defined index on {}.{} with the id {}. " +
"If an index was just created, this is likely due to the schema not " +
"being fully propagated. Local read will proceed without using the " +
"index. Please wait for schema agreement after index creation.",
cfm.ksName, cfm.cfName, e.indexId);
return null;
}
}
// Sums the sizes of the same components that serialize() writes.
public long serializedSize(ReadCommand command, int version)
{
assert version >= MessagingService.VERSION_30;
return 2 // kind + flags
+ (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
+ CFMetaData.serializer.serializedSize(command.metadata(), version)
+ TypeSizes.sizeof(command.nowInSec())
+ ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+ RowFilter.serializer.serializedSize(command.rowFilter(), version)
+ DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator)
+ command.selectionSerializedSize(version)
+ command.indexSerializedSize(version);
}
}
/**
 * The two legacy read types, each with the byte value used to represent it in serialized form.
 */
private enum LegacyType
{
    GET_BY_NAMES((byte)1),
    GET_SLICES((byte)2);

    public final byte serializedValue;

    LegacyType(byte b)
    {
        this.serializedValue = b;
    }

    /**
     * Maps a clustering index filter kind to a legacy type: SLICE filters map to
     * {@code GET_SLICES}, everything else to {@code GET_BY_NAMES}.
     */
    public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
    {
        if (kind == ClusteringIndexFilter.Kind.SLICE)
            return GET_SLICES;

        return GET_BY_NAMES;
    }

    /**
     * Maps a serialized byte back to a legacy type: 1 maps to {@code GET_BY_NAMES}, any
     * other value to {@code GET_SLICES}.
     */
    public static LegacyType fromSerializedValue(byte b)
    {
        if (b == 1)
            return GET_BY_NAMES;

        return GET_SLICES;
    }
}
/**
* Serializer for pre-3.0 RangeSliceCommands.
*/
private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
{
/**
 * Writes a non-paging partition range command in the pre-3.0 format.
 * <p>
 * Written in order: keyspace name, table name, timestamp in milliseconds, the filter (a type byte
 * followed by either a names filter or a slice filter with its limit and compositesToGroup),
 * the row filter, the key range, maxResults, countCQL3Rows and isPaging (always false).
 * The order of the writes defines the format and must not be changed.
 *
 * @param command the command to write; it is cast to {@code PartitionRangeReadCommand}.
 * @param out the output to write to.
 * @param version the messaging version, asserted to be lower than {@code MessagingService.VERSION_30}.
 * @throws IOException if writing to {@code out} fails.
 */
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
assert version < MessagingService.VERSION_30;
PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
assert !rangeCommand.dataRange().isPaging();
// convert pre-3.0 incompatible names filters to slice filters
rangeCommand = maybeConvertNamesToSlice(rangeCommand);
CFMetaData metadata = rangeCommand.metadata();
out.writeUTF(metadata.ksName);
out.writeUTF(metadata.cfName);
out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis
// begin DiskAtomFilterSerializer.serialize()
if (rangeCommand.isNamesQuery())
{
out.writeByte(1); // 0 for slices, 1 for names
ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
}
else
{
out.writeByte(0); // 0 for slices, 1 for names
// slice filter serialization
ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
// true when static columns are fetched but the requested slices do not select the static clustering
boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
out.writeBoolean(filter.isReversed());
// limit
DataLimits limits = rangeCommand.limits();
if (limits.isDistinct())
out.writeInt(1);
else
out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
// compositesToGroup: -1 for thrift limits and dense tables, -2 for DISTINCT queries that do not
// select statics, and the number of clustering columns otherwise
int compositesToGroup;
boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
compositesToGroup = -1;
else if (limits.isDistinct() && !selectsStatics)
compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490)
else
compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
out.writeInt(compositesToGroup);
}
serializeRowFilter(out, rangeCommand.rowFilter());
AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
// maxResults
out.writeInt(rangeCommand.limits().count());
// countCQL3Rows
if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT
out.writeBoolean(false);
else
out.writeBoolean(true);
// isPaging
out.writeBoolean(false);
}
/**
 * Reads a range command written in the pre-3.0 layout and rebuilds it through
 * {@code PartitionRangeReadCommand.create}.
 *
 * @param in the input positioned at the start of the legacy command
 * @param version the messaging version of the sender; asserted to be older than {@code VERSION_30}
 * @return the rebuilt range read command
 * @throws UnknownColumnFamilyException if the named table is not present in the local schema
 * @throws IOException if reading from {@code in} fails
 */
public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
{
    assert version < MessagingService.VERSION_30;

    String keyspace = in.readUTF();
    String columnFamily = in.readUTF();
    CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
    if (metadata == null)
    {
        String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
        throw new UnknownColumnFamilyException(message, null);
    }

    int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds

    ClusteringIndexFilter filter;
    ColumnFilter selection;
    // Defaults apply to names queries, which carry neither field on the wire.
    int compositesToGroup = 0;
    int perPartitionLimit = -1;
    byte readType = in.readByte(); // 0 for slices, 1 for names
    if (readType == 1)
    {
        Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
        selection = selectionAndFilter.left;
        filter = selectionAndFilter.right;
    }
    else
    {
        Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
        filter = p.left;
        perPartitionLimit = in.readInt();
        compositesToGroup = in.readInt();
        selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
    }

    RowFilter rowFilter = deserializeRowFilter(in, metadata);
    AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
    int maxResults = in.readInt();
    boolean countCQL3Rows = in.readBoolean(); // only used below to detect DISTINCT queries
    in.readBoolean(); // isPaging (not needed, but must be consumed)

    // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
    // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less
    // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use
    // that fact.
    boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);

    DataLimits limits;
    if (isDistinct)
        limits = DataLimits.distinctLimits(maxResults);
    else if (compositesToGroup == -1)
        limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
    else if (metadata.isStaticCompactTable())
        limits = DataLimits.legacyCompactStaticCqlLimits(maxResults);
    else
        limits = DataLimits.cqlLimits(maxResults);

    return PartitionRangeReadCommand.create(true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
}
/**
 * Writes a row filter as an expression count followed, for each expression, by its column
 * name, its operator and its index value.
 *
 * @param out the output to write to
 * @param rowFilter the filter whose expressions are written
 * @throws IOException if writing to {@code out} fails
 */
static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
{
    // Materialise the expressions up front: the count is written before any of them.
    ArrayList<RowFilter.Expression> expressions = Lists.newArrayList(rowFilter.iterator());
    int expressionCount = expressions.size();
    out.writeInt(expressionCount);

    for (int i = 0; i < expressionCount; i++)
    {
        RowFilter.Expression current = expressions.get(i);
        ByteBufferUtil.writeWithShortLength(current.column().name.bytes, out);
        current.operator().writeTo(out);
        ByteBufferUtil.writeWithShortLength(current.getIndexValue(), out);
    }
}
/**
 * Reads a row filter: an expression count followed, for each expression, by a column name,
 * an operator and an index value.
 *
 * @param in the input to read from
 * @param metadata the table metadata used to look up each column definition by name
 * @return {@code RowFilter.NONE} when the count is zero, otherwise a filter holding the
 *         expressions that were read
 * @throws IOException if reading from {@code in} fails
 */
static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
{
    int expressionCount = in.readInt();
    if (expressionCount == 0)
        return RowFilter.NONE;

    RowFilter result = RowFilter.create(expressionCount);
    for (int remaining = expressionCount; remaining > 0; remaining--)
    {
        // Read order mirrors the write order: name, operator, value.
        ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
        ColumnDefinition definition = metadata.getColumnDefinition(name);
        Operator operator = Operator.readFrom(in);
        ByteBuffer value = ByteBufferUtil.readWithShortLength(in);
        result.add(definition, operator, value);
    }
    return result;
}