/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.Pair;

import static com.google.common.collect.Iterables.filter;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;

public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
{
    private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);

    private final TimeWindowCompactionStrategyOptions options;
    protected volatile int estimatedRemainingTasks;
    private final Set<SSTableReader> sstables = new HashSet<>();
    private long lastExpiredCheck;
    private long highestWindowSeen;

    // This is accessed in both the threading context of compaction / repair and also JMX
    private volatile Map<Long, Integer> sstableCountByBuckets = Collections.emptyMap();

    public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
    {
        super(cfs, options);
        this.estimatedRemainingTasks = 0;
        this.options = new TimeWindowCompactionStrategyOptions(options);

        String[] tsOpts = { UNCHECKED_TOMBSTONE_COMPACTION_OPTION, TOMBSTONE_COMPACTION_INTERVAL_OPTION, TOMBSTONE_THRESHOLD_OPTION };
        if (Arrays.stream(tsOpts).map(o -> options.get(o)).filter(Objects::nonNull).anyMatch(v -> !v.equals("false")))
        {
            logger.debug("Enabling tombstone compactions for TWCS");
        }
        else
        {
            logger.debug("Disabling tombstone compactions for TWCS");
            disableTombstoneCompactions = true;
        }
    }
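
    /*
     * Illustrative sketch (not part of the original source): an options map that keeps the
     * single-sstable tombstone compactions above enabled. The window option keys are consumed by
     * TimeWindowCompactionStrategyOptions and the tombstone keys by AbstractCompactionStrategy;
     * the key strings and values below are given from memory and should be treated as assumptions.
     *
     *     Map<String, String> opts = new HashMap<>();
     *     opts.put("compaction_window_unit", "HOURS");
     *     opts.put("compaction_window_size", "6");
     *     opts.put("unchecked_tombstone_compaction", "true"); // any non-"false" value for one of the
     *                                                         // tombstone options keeps them enabled
     */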

    @Override
    @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
    {
        List<SSTableReader> previousCandidate = null;
        while (true)
        {
            List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);

            if (latestBucket.isEmpty())
                return null;

            // Already tried acquiring references without success. It means there is a race with
            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
            if (latestBucket.equals(previousCandidate))
            {
                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
                            " unless it happens frequently, in which case it must be reported. Will retry later.",
                            latestBucket);
                return null;
            }

            LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
            if (modifier != null)
                return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps);
            previousCandidate = latestBucket;
        }
    }

    /**
     * Find the next set of sstables to compact in the background.
     *
     * @param gcBefore throw away tombstones older than this
     * @return the sstables of the most interesting non-expired bucket, plus any fully expired sstables
     */
    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
    {
        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
            return Collections.emptyList();

        Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));

        // Find fully expired SSTables. Those will be included no matter what.
        Set<SSTableReader> expired = Collections.emptySet();

        if (currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
        {
            logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting,
                                                                   options.ignoreOverlaps ? Collections.emptySet() : cfs.getOverlappingLiveSSTables(uncompacting),
                                                                   gcBefore, options.ignoreOverlaps);
            lastExpiredCheck = currentTimeMillis();
        }
        else
        {
            logger.debug("TWCS skipping check for fully expired SSTables");
        }

        Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));

        List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
        if (!expired.isEmpty())
        {
            logger.debug("Including expired sstables: {}", expired);
            compactionCandidates.addAll(expired);
        }

        return compactionCandidates;
    }

    private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
    {
        List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
        if (mostInteresting != null)
        {
            return mostInteresting;
        }

        // If there is no sstable to compact in the standard way, try compacting a single sstable whose
        // droppable tombstone ratio is greater than the threshold.
        List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
        for (SSTableReader sstable : nonExpiringSSTables)
        {
            if (worthDroppingTombstones(sstable, gcBefore))
                sstablesWithTombstones.add(sstable);
        }
        if (sstablesWithTombstones.isEmpty())
            return Collections.emptyList();

        return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
    }

    private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
    {
        Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
        // Update the highest window seen, if necessary
        if (buckets.right > this.highestWindowSeen)
            this.highestWindowSeen = buckets.right;

        NewestBucket mostInteresting = newestBucket(buckets.left,
                                                    cfs.getMinimumCompactionThreshold(),
                                                    cfs.getMaximumCompactionThreshold(),
                                                    options.stcsOptions,
                                                    this.highestWindowSeen);

        this.estimatedRemainingTasks = mostInteresting.estimatedRemainingTasks;
        this.sstableCountByBuckets = buckets.left.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> buckets.left.get(k).size()));

        if (!mostInteresting.sstables.isEmpty())
            return mostInteresting.sstables;
        return null;
    }

    @Override
    public synchronized void addSSTable(SSTableReader sstable)
    {
        sstables.add(sstable);
    }

    @Override
    public synchronized void removeSSTable(SSTableReader sstable)
    {
        sstables.remove(sstable);
    }

    @Override
    protected synchronized Set<SSTableReader> getSSTables()
    {
        return ImmutableSet.copyOf(sstables);
    }

    /**
     * Find the lower and upper bounds of the time window containing the given timestamp, for the
     * given window unit and size.
     *
     * Both the input timestamp and the returned bounds are in milliseconds; the caller should adjust
     * to the table's timestamp resolution accordingly.
     */
    public static Pair<Long,Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
    {
        long lowerTimestamp;
        long upperTimestamp;
        long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);

        switch (windowTimeUnit)
        {
            case MINUTES:
                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60L * windowTimeSize));
                upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
                break;
            case HOURS:
                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600L * windowTimeSize));
                upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
                break;
            case DAYS:
            default:
                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400L * windowTimeSize));
                upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
                break;
        }

        return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
                           TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
    }

    /**
     * Group files with similar max timestamp into buckets.
     *
     * @param files the sstables to group; each is bucketed by its max timestamp
     * @param sstableWindowUnit unit of the compaction window (minutes, hours or days)
     * @param sstableWindowSize number of units per window
     * @param timestampResolution resolution of the sstables' timestamps (e.g. microseconds)
     * @return A pair, where the left element is the bucket representation (map of window start timestamp to sstablereaders), and the right is the highest window start timestamp seen
     */
    @VisibleForTesting
    static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
    {
        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();

        long maxTimestamp = 0;
        // Build a hash multimap to represent the buckets:
        // each sstable is added to the time bucket whose key is the sstable's max timestamp
        // rounded down to the containing window's lower bound.
        for (SSTableReader f : files)
        {
            assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
            long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
            Pair<Long,Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
            buckets.put(bounds.left, f);
            if (bounds.left > maxTimestamp)
                maxTimestamp = bounds.left;
        }

        logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
        return Pair.create(buckets, maxTimestamp);
    }
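
    /*
     * Illustrative sketch (assumed values, not part of the original source): with a MICROSECONDS
     * timestamp resolution and 1-DAY windows, an sstable whose max cell timestamp is
     * 1_000_000_000_000_000L microseconds is first converted to milliseconds and then bucketed, e.g.
     *
     *     long tStamp = TimeUnit.MILLISECONDS.convert(1_000_000_000_000_000L, TimeUnit.MICROSECONDS); // 1_000_000_000_000L ms
     *     long bucketKey = getWindowBoundsInMillis(TimeUnit.DAYS, 1, tStamp).left;                    // start of that day, in ms
     *
     * All sstables whose max timestamps fall within the same day share that bucketKey in the
     * returned multimap.
     */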

    static final class NewestBucket
    {
        /** The sstables that should be compacted next */
        final List<SSTableReader> sstables;

        /** The number of tasks estimated */
        final int estimatedRemainingTasks;

        NewestBucket(List<SSTableReader> sstables, int estimatedRemainingTasks)
        {
            this.sstables = sstables;
            this.estimatedRemainingTasks = estimatedRemainingTasks;
        }

        @Override
        public String toString()
        {
            return String.format("sstables: %s, estimated remaining tasks: %d", sstables, estimatedRemainingTasks);
        }
    }

    /**
     * @param buckets buckets (keyed by window start) from which to return the newest bucket within thresholds; iterated here from newest to oldest.
     * @param minThreshold minimum number of sstables in a bucket to qualify.
     * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
     * @param stcsOptions STCS options used to size-tier the sstables of the current (newest) window.
     * @param now the start of the newest window seen, in milliseconds; buckets at or after this are treated as the current window.
     * @return a bucket (list) of sstables to compact.
     */
    @VisibleForTesting
    static NewestBucket newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions, long now)
    {
        // If the current bucket has at least minThreshold SSTables, choose that one.
        // For any other bucket, at least 2 SSTables is enough.
        // In any case, limit to maxThreshold SSTables.
        List<SSTableReader> sstables = Collections.emptyList();
        int estimatedRemainingTasks = 0;

        TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());

        Iterator<Long> it = allKeys.descendingIterator();
        while (it.hasNext())
        {
            Long key = it.next();
            Set<SSTableReader> bucket = buckets.get(key);
            logger.trace("Key {}, now {}", key, now);
            if (bucket.size() >= minThreshold && key >= now)
            {
                // If we're in the newest bucket, we'll use STCS to prioritize sstables
                List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
                List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
                List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);

                // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
                if (!stcsInterestingBucket.isEmpty())
                {
                    double remaining = bucket.size() - maxThreshold;
                    estimatedRemainingTasks += 1 + (remaining > minThreshold ? Math.ceil(remaining / maxThreshold) : 0);
                    if (sstables.isEmpty())
                    {
                        logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
                        sstables = stcsInterestingBucket;
                    }
                    else
                    {
                        logger.trace("First window of bucket is eligible but not selected: data files {} , options {}", pairs, stcsOptions);
                    }
                }
            }
            else if (bucket.size() >= 2 && key < now)
            {
                double remaining = bucket.size() - maxThreshold;
                estimatedRemainingTasks += 1 + (remaining > minThreshold ? Math.ceil(remaining / maxThreshold) : 0);
                if (sstables.isEmpty())
                {
                    logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
                    sstables = trimToThreshold(bucket, maxThreshold);
                }
                else
                {
                    logger.trace("bucket size {} >= 2 and not in current bucket, eligible but not selected: {}", bucket.size(), bucket);
                }
            }
            else
            {
                logger.trace("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
            }
        }

        return new NewestBucket(sstables, estimatedRemainingTasks);
    }
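
    /*
     * Worked example (illustrative, not from the original source): with minThreshold = 4 and
     * maxThreshold = 32, an older (non-current) bucket holding 100 sstables contributes
     *
     *     remaining = 100 - 32 = 68
     *     estimatedRemainingTasks += 1 + Math.ceil(68 / 32.0)   // = 1 + 3 = 4
     *
     * i.e. one task for the bucket selected now plus roughly three follow-up tasks, while only the
     * 32 smallest sstables of that bucket (see trimToThreshold below) are compacted in this pass.
     */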

    /**
     * @param bucket set of sstables
     * @param maxThreshold maximum number of sstables in a single compaction task.
     * @return A bucket trimmed to the maxThreshold smallest sstables.
     */
    @VisibleForTesting
    static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
    {
        List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);

        // Trim the largest sstables off the end to meet the maxThreshold
        Collections.sort(ssTableReaders, SSTableReader.sizeComparator);

        return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
    }
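
    /*
     * Illustrative sketch (assumed sizes, not from the original source): given a bucket of five
     * sstables of roughly 5, 10, 20, 30 and 40 MB,
     *
     *     List<SSTableReader> picked = trimToThreshold(bucket, 3); // keeps the three smallest sstables
     *
     * sorts them smallest-first and keeps the ~5, ~10 and ~20 MB sstables; the two largest are left
     * for a later compaction pass.
     */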

    @Override
    @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
    public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
    {
        Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
        if (Iterables.isEmpty(filteredSSTables))
            return null;
        LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
        if (txn == null)
            return null;
        return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, gcBefore, options.ignoreOverlaps));
    }

    /**
     * TWCS should not group sstables for anticompaction - this can mix new and old data
     */
    @Override
    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
    {
        Collection<Collection<SSTableReader>> groups = new ArrayList<>(sstablesToGroup.size());
        for (SSTableReader sstable : sstablesToGroup)
        {
            groups.add(Collections.singleton(sstable));
        }
        return groups;
    }

    @Override
    @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
    public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
    {
        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined

        LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
        if (modifier == null)
        {
            logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
            return null;
        }

        return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps).setUserDefined(true);
    }

    public int getEstimatedRemainingTasks()
    {
        return this.estimatedRemainingTasks;
    }

    public long getMaxSSTableBytes()
    {
        return Long.MAX_VALUE;
    }

    public Map<Long, Integer> getSSTableCountByBuckets()
    {
        return sstableCountByBuckets;
    }

    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
    {
        Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
        uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);

        uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
        uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());

        return uncheckedOptions;
    }

    public String toString()
    {
        return String.format("TimeWindowCompactionStrategy[%s/%s]",
                             cfs.getMinimumCompactionThreshold(),
                             cfs.getMaximumCompactionThreshold());
    }
}