/
CommitLog.java
542 lines (474 loc) · 17.6 KB
/
CommitLog.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import com.github.tjake.ICRC32;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CRC32Factory;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MBeanWrapper;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
/*
* Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
* successfully recover data that was not stored to disk via the Memtable.
*/
public class CommitLog implements CommitLogMBean
{
// Logger shared by every CommitLog instance.
private static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
// Shared instance, produced by construct(), which also registers it as an MBean and starts it.
public static final CommitLog instance = CommitLog.construct();
// we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
// empty segments when writing large records
private final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
// Segment manager: add() allocates space through it, and the active segments are read from it.
public final CommitLogSegmentManager allocator;
// Archiver handed to the constructor; used during recover() and read by the archive/restore getters.
public final CommitLogArchiver archiver;
// Metrics object; the constructor attaches it to the executor and the allocator.
final CommitLogMetrics metrics;
// Sync service; the constructor picks the batch or the periodic implementation from the configured sync mode.
final AbstractCommitLogService executor;
// Current compression configuration; volatile, and replaced wholesale by resetConfiguration().
volatile Configuration configuration;
// Location string handed to the constructor (construct() passes DatabaseDescriptor.getCommitLogLocation()).
final public String location;
/**
 * Builds the commit log for the configured commit log location, registers it as an MBean
 * and starts it.
 *
 * @return the commit log returned by {@link #start()}
 */
static private CommitLog construct()
{
    String commitLogLocation = DatabaseDescriptor.getCommitLogLocation();
    CommitLogArchiver commitLogArchiver = CommitLogArchiver.construct();
    CommitLog commitLog = new CommitLog(commitLogLocation, commitLogArchiver);
    MBeanWrapper.instance.registerMBean(commitLog, "org.apache.cassandra.db:type=Commitlog");
    return commitLog.start();
}
/**
 * Wires up a commit log without starting it; call {@link #start()} afterwards.
 *
 * The sync service and the segment manager both receive {@code this} while the object is
 * still being built, so the assignment order below is kept deliberately.
 *
 * @param location the location string recorded in {@link #location}
 * @param archiver the archiver this commit log will use
 */
@VisibleForTesting
CommitLog(String location, CommitLogArchiver archiver)
{
    this.location = location;
    this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
    DatabaseDescriptor.createAllDirectories();
    this.archiver = archiver;
    this.metrics = new CommitLogMetrics();

    // batch sync mode gets the batch service, every other mode the periodic one
    if (DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch)
        this.executor = new BatchCommitLogService(this);
    else
        this.executor = new PeriodicCommitLogService(this);

    this.allocator = new CommitLogSegmentManager(this);

    // register metrics
    this.metrics.attach(this.executor, this.allocator);
}
/**
 * Starts the sync service and then the segment manager.
 *
 * @return this commit log, so the call can be chained
 */
CommitLog start()
{
    this.executor.start();
    this.allocator.start();
    return this;
}
/**
 * Perform recovery on commit logs located in the directory specified by the config file.
 *
 * Every replayable file is first offered to the archiver, then any archive is restored, and
 * finally the files present at that point are sorted, replayed and handed back to the allocator
 * for recycling. Reserve segment creation is enabled once this has run.
 *
 * A commit log directory that cannot be listed is treated as containing no files instead of
 * failing with a NullPointerException.
 *
 * @return the number of mutations replayed; 0 if recovery has already taken place
 * @throws IOException if one of the recovery steps fails with an I/O error
 */
public int recover() throws IOException
{
    // If createReserveSegments is already flipped, the CLSM is running and recovery has already taken place.
    if (allocator.createReserveSegments)
        return 0;

    FilenameFilter unmanagedFilesFilter = new FilenameFilter()
    {
        public boolean accept(File dir, String name)
        {
            // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
            // until after recover was finished.  this turns out to be fragile; it is less error-prone to go
            // ahead and allow writes before recover(), and just skip active segments when we do.
            return CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
        }
    };

    // submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904
    for (File file : listReplayableFiles(unmanagedFilesFilter))
    {
        archiver.maybeArchive(file.getPath(), file.getName());
        archiver.maybeWaitForArchiving(file.getName());
    }

    assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore";
    archiver.maybeRestoreArchive();

    // list again: restoring an archive may have changed what is in the directory
    File[] files = listReplayableFiles(unmanagedFilesFilter);
    int replayed = 0;
    if (files.length == 0)
    {
        logger.info("No commitlog files found; skipping replay");
    }
    else
    {
        Arrays.sort(files, new CommitLogSegmentFileComparator());
        logger.info("Replaying {}", StringUtils.join(files, ", "));
        replayed = recover(files);
        logger.info("Log replay complete, {} replayed mutations", replayed);

        for (File f : files)
            allocator.recycleSegment(f);
    }

    allocator.enableReserveSegmentCreation();
    return replayed;
}

/**
 * Lists the files of the configured commit log directory accepted by the given filter.
 * File.listFiles returns null when the path is not a directory or an I/O error occurs;
 * that case is logged and mapped to an empty array so callers never dereference null.
 */
private static File[] listReplayableFiles(FilenameFilter filter)
{
    File directory = new File(DatabaseDescriptor.getCommitLogLocation());
    File[] listed = directory.listFiles(filter);
    if (listed == null)
    {
        logger.warn("Unable to list commit log directory {}; treating it as empty", directory);
        return new File[0];
    }
    return listed;
}
/**
 * Replays the given commit log files through a freshly constructed replayer and waits for
 * the resulting writes.
 *
 * @param clogs the commit log files to replay
 * @return the number of mutations replayed, as reported by the replayer
 * @throws IOException if the replay fails with an I/O error
 */
public int recover(File... clogs) throws IOException
{
    CommitLogReplayer replayer = CommitLogReplayer.construct(this);
    replayer.recover(clogs);
    int replayedCount = replayer.blockForWrites();
    return replayedCount;
}
/**
 * Replays a single commit log file and waits for the resulting writes.
 *
 * @param path path of the commit log file to replay
 * @throws IOException if the replay fails with an I/O error
 */
public void recover(String path) throws IOException
{
    CommitLogReplayer replayer = CommitLogReplayer.construct(this);
    File segmentFile = new File(path);
    // NOTE(review): the meaning of the boolean flag is defined by CommitLogReplayer - confirm there
    replayer.recover(segmentFile, false);
    replayer.blockForWrites();
}
/**
 * Reads the context of the segment the allocator is currently allocating from.
 *
 * @return a ReplayPosition which, if >= one returned from add(), implies add() was started
 * (but not necessarily finished) prior to this call
 */
public ReplayPosition getContext()
{
    CommitLogSegment activeSegment = allocator.allocatingFrom();
    return activeSegment.getContext();
}
/**
 * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining.
 *
 * @param droppedCfs ids forwarded unchanged to the allocator's forceRecycleAll
 *                   (presumably the ids of dropped column families - confirm in CommitLogSegmentManager)
 */
public void forceRecycleAllSegments(Iterable<UUID> droppedCfs)
{
    this.allocator.forceRecycleAll(droppedCfs);
}
/**
 * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining.
 * Passes an empty list of ids to the allocator.
 */
public void forceRecycleAllSegments()
{
    List<UUID> noDroppedCfs = Collections.<UUID>emptyList();
    allocator.forceRecycleAll(noDroppedCfs);
}
/**
 * Forces a disk flush on the commit log files that need it. Blocking.
 *
 * Walks the active segments in the order the allocator returns them and syncs each one.
 *
 * @param syncAllSegments when false, the walk stops at the first segment whose id is greater
 *                        than the id of the segment being allocated from when this call began
 */
public void sync(boolean syncAllSegments)
{
    CommitLogSegment allocatingSegment = allocator.allocatingFrom();
    for (CommitLogSegment candidate : allocator.getActiveSegments())
    {
        // evaluated in this order so the ids are only compared when a partial sync was requested
        boolean pastAllocatingSegment = !syncAllSegments && candidate.id > allocatingSegment.id;
        if (pastAllocatingSegment)
            return;

        candidate.sync();
    }
}
/**
 * Preempts the CLExecutor, telling it to sync immediately
 */
public void requestExtraSync()
{
    this.executor.requestExtraSync();
}
/**
 * Add a Mutation to the commit log.
 *
 * The entry written into the allocated buffer is: the serialized size as an int, a CRC value
 * taken after feeding those 4 bytes to the checksum, the serialized mutation, and a second CRC
 * value taken after feeding the mutation bytes to the same checksum object.
 *
 * @param mutation the Mutation to add to the log
 * @return the replay position of the allocation that holds the mutation
 * @throws IllegalArgumentException if the serialized mutation plus the per-entry overhead
 *                                  exceeds MAX_MUTATION_SIZE
 */
public ReplayPosition add(Mutation mutation)
{
    assert mutation != null;

    final long payloadSize = Mutation.serializer.serializedSize(mutation, MessagingService.current_version);
    final long entrySize = payloadSize + ENTRY_OVERHEAD_SIZE;
    if (entrySize > MAX_MUTATION_SIZE)
    {
        String complaint = String.format("Mutation of %s bytes is too large for the maximum size of %s",
                                         entrySize, MAX_MUTATION_SIZE);
        throw new IllegalArgumentException(complaint);
    }

    final Allocation allocation = allocator.allocate(mutation, (int) entrySize);
    final ICRC32 crc = CRC32Factory.instance.create();
    final ByteBuffer target = allocation.getBuffer();
    try (BufferedDataOutputStreamPlus out = new DataOutputBufferFixed(target))
    {
        // checksummed length
        out.writeInt((int) payloadSize);
        crc.update(target, target.position() - 4, 4);
        target.putInt(crc.getCrc());

        // remember where the mutation bytes begin so they can be checksummed after serialization
        int payloadStart = target.position();
        // checksummed mutation
        Mutation.serializer.serialize(mutation, out, MessagingService.current_version);
        crc.update(target, payloadStart, (int) payloadSize);
        target.putInt(crc.getCrc());
    }
    catch (IOException e)
    {
        throw new FSWriteError(e, allocation.getSegment().getPath());
    }
    finally
    {
        // the allocation is marked written whether or not the write succeeded
        allocation.markWritten();
    }

    executor.finishWriteFor(allocation);
    return allocation.getReplayPosition();
}
/**
 * Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position
 * given. Discards any commit log segments that are no longer used.
 *
 * @param cfId    the column family ID that was flushed
 * @param context the replay position of the flush
 */
public void discardCompletedSegments(final UUID cfId, final ReplayPosition context)
{
    logger.trace("discard completed log segments for {}, table {}", context, cfId);

    // Go thru the active segment files, which are ordered oldest to newest, marking the
    // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed
    // in the arguments. Any segments that become unused after they are marked clean will be
    // recycled or discarded.
    Iterator<CommitLogSegment> remaining = allocator.getActiveSegments().iterator();
    while (remaining.hasNext())
    {
        CommitLogSegment current = remaining.next();
        current.markClean(cfId, context);

        if (!current.isUnused())
        {
            // the last segment of the iteration is reported as the active one
            String activeMarker = remaining.hasNext() ? "" : " active";
            logger.trace("Not safe to delete{} commit log segment {}; dirty is {}",
                         activeMarker, current, current.dirtyString());
        }
        else
        {
            logger.trace("Commit log segment {} is unused", current);
            allocator.recycleSegment(current);
        }

        // Don't mark or try to delete any newer segments once we've reached the one containing the
        // position of the flush.
        if (current.contains(context))
            break;
    }
}
/**
 * @return the archiver's archiveCommand value
 */
@Override
public String getArchiveCommand()
{
    return this.archiver.archiveCommand;
}
/**
 * @return the archiver's restoreCommand value
 */
@Override
public String getRestoreCommand()
{
    return this.archiver.restoreCommand;
}
/**
 * @return the archiver's restoreDirectories value
 */
@Override
public String getRestoreDirectories()
{
    return this.archiver.restoreDirectories;
}
/**
 * @return the archiver's restorePointInTime value
 */
@Override
public long getRestorePointInTime()
{
    return this.archiver.restorePointInTime;
}
/**
 * @return the string form of the archiver's precision value
 */
@Override
public String getRestorePrecision()
{
    return this.archiver.precision.toString();
}
/**
 * Collects the names of the allocator's active segments, in iteration order.
 *
 * @return a new list holding one name per active segment
 */
public List<String> getActiveSegmentNames()
{
    List<String> names = new ArrayList<>();
    Iterator<CommitLogSegment> segments = allocator.getActiveSegments().iterator();
    while (segments.hasNext())
        names.add(segments.next().getName());
    return names;
}
/**
 * Copies the keys of the archiver's pending-archive map.
 *
 * @return a new list holding the keys present in archivePending at the time of the call
 */
public List<String> getArchivingSegmentNames()
{
    List<String> pendingNames = new ArrayList<>(archiver.archivePending.keySet());
    return pendingNames;
}
/**
 * Adds up the content size reported by every active segment.
 *
 * @return the sum of contentSize() over the allocator's active segments
 */
@Override
public long getActiveContentSize()
{
    long total = 0;
    Iterator<CommitLogSegment> segments = allocator.getActiveSegments().iterator();
    while (segments.hasNext())
        total += segments.next().contentSize();
    return total;
}
/**
 * @return the on-disk size reported by the allocator
 */
@Override
public long getActiveOnDiskSize()
{
    return this.allocator.onDiskSize();
}
/**
 * Computes, for every active segment, its on-disk size divided by its content size.
 *
 * @return a new sorted map from segment name to that ratio
 */
@Override
public Map<String, Double> getActiveSegmentCompressionRatios()
{
    Map<String, Double> ratiosByName = new TreeMap<>();
    for (CommitLogSegment segment : allocator.getActiveSegments())
    {
        String name = segment.getName();
        // multiplying by 1.0 first makes the division a floating-point one
        double ratio = 1.0 * segment.onDiskSize() / segment.contentSize();
        ratiosByName.put(name, ratio);
    }
    return ratiosByName;
}
/**
 * Shuts down the threads used by the commit log, blocking until completion.
 * The sync service is shut down and awaited first, then the segment manager.
 * TODO this should accept a timeout, and throw TimeoutException
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void shutdownBlocking() throws InterruptedException
{
    // sync service first
    this.executor.shutdown();
    this.executor.awaitTermination();

    // then the segment manager
    this.allocator.shutdown();
    this.allocator.awaitTermination();
}
/**
 * FOR TESTING PURPOSES. See CommitLogAllocator.
 * Stops the commit log, reloads its configuration and restarts it.
 *
 * @param deleteSegments passed through to {@link #stopUnsafe(boolean)}
 * @return the value returned by {@link #restartUnsafe()}
 * @throws IOException if the restart fails with an I/O error
 */
public int resetUnsafe(boolean deleteSegments) throws IOException
{
    stopUnsafe(deleteSegments);
    resetConfiguration();
    int recovered = restartUnsafe();
    return recovered;
}
/**
 * FOR TESTING PURPOSES. See CommitLogAllocator.
 * Shuts down the sync service and waits for it, then stops the segment manager and resets
 * the replay limit of CommitLogSegment.
 *
 * @param deleteSegments passed through to the segment manager's stopUnsafe
 * @throws RuntimeException wrapping the InterruptedException if the wait is interrupted; the
 *                          thread's interrupt status is restored before throwing
 */
public void stopUnsafe(boolean deleteSegments)
{
    executor.shutdown();
    try
    {
        executor.awaitTermination();
    }
    catch (InterruptedException e)
    {
        // restore the interrupt status so code further up the stack can still observe the interruption
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    allocator.stopUnsafe(deleteSegments);
    CommitLogSegment.resetReplayLimit();
}
/**
 * FOR TESTING PURPOSES.
 * Replaces the current configuration with one built from the currently configured
 * commit log compression.
 */
public void resetConfiguration()
{
    ParameterizedClass compression = DatabaseDescriptor.getCommitLogCompression();
    this.configuration = new Configuration(compression);
}
/**
 * FOR TESTING PURPOSES. See CommitLogAllocator
 * Starts the segment manager, restarts the sync service and then runs recovery.
 *
 * @return the number of mutations replayed by {@link #recover()}, or 0 if recovery
 *         failed with an FSWriteError
 * @throws IOException if recovery fails with an I/O error
 */
public int restartUnsafe() throws IOException
{
    allocator.start();
    executor.restartUnsafe();

    int recovered;
    try
    {
        recovered = recover();
    }
    catch (FSWriteError e)
    {
        // Workaround for a class of races that keeps showing up on Windows tests.
        // stop/start/reset path on Windows with segment deletion is very touchy/brittle
        // and the timing keeps getting screwed up. Rather than chasing our tail further
        // or rewriting the CLSM, just report that we didn't recover anything back up
        // the chain. This will silence most intermittent test failures on Windows
        // and appropriately fail tests that expected segments to be recovered that
        // were not.
        recovered = 0;
    }
    return recovered;
}
/**
 * Used by tests.
 *
 * @return the number of active segments (segments with unflushed data in them)
 */
public int activeSegments()
{
    int activeCount = allocator.getActiveSegments().size();
    return activeCount;
}
/**
 * Applies the configured commit failure policy to an error.
 *
 * The throwable is first handed to JVMStabilityInspector. The die and stop policies then stop
 * the transports and continue into the stop_commit handling, which logs the error and reports
 * false. The ignore policy logs the error and reports true.
 *
 * @param message description of the failure, used in the log entry
 * @param t       the error that was raised
 * @return true under the ignore policy, false under die, stop and stop_commit
 * @throws AssertionError if the configured policy is none of the handled values
 */
@VisibleForTesting
public static boolean handleCommitError(String message, Throwable t)
{
    JVMStabilityInspector.inspectCommitLogThrowable(t);

    switch (DatabaseDescriptor.getCommitFailurePolicy())
    {
        // Needed here for unit tests to not fail on default assertion
        case die:
        case stop:
            StorageService.instance.stopTransports();
            //$FALL-THROUGH$
        case stop_commit:
        {
            String terminating = String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy());
            logger.error(terminating, t);
            return false;
        }
        case ignore:
        {
            logger.error(message, t);
            return true;
        }
        default:
            throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
    }
}
/**
 * Immutable holder for the commit log compression settings: the configured compressor class
 * and the compressor instance created from it, both null when no compressor class is given.
 */
public static final class Configuration
{
    /** The configured compressor class; null when no compressor class was supplied. */
    private final ParameterizedClass compressorClass;

    /** The compressor built from {@link #compressorClass}; null when that is null. */
    private final ICompressor compressor;

    /**
     * @param compressorClass the compressor class to instantiate, or null for no compressor
     */
    public Configuration(ParameterizedClass compressorClass)
    {
        this.compressorClass = compressorClass;
        if (compressorClass == null)
            this.compressor = null;
        else
            this.compressor = CompressionParameters.createCompressor(compressorClass);
    }

    /**
     * Tells whether a compressor is present.
     * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
     */
    public boolean useCompression()
    {
        return this.compressor != null;
    }

    /**
     * Gives access to the compressor instance.
     * @return the compressor used to compress the segments, or null when there is none
     */
    public ICompressor getCompressor()
    {
        return this.compressor;
    }

    /**
     * Gives access to the compressor class this configuration was built from.
     * @return the compressor class, or null when none was supplied
     */
    public ParameterizedClass getCompressorClass()
    {
        return this.compressorClass;
    }

    /**
     * Names the compressor in use.
     * @return the simple class name of the compressor, or "none" when there is no compressor
     */
    public String getCompressorName()
    {
        if (!useCompression())
            return "none";

        return compressor.getClass().getSimpleName();
    }
}
}