/
CommitLogSegment.java
620 lines (545 loc) · 20.6 KB
/
CommitLogSegment.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.commitlog;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Timer;
import com.github.tjake.ICRC32;
import org.apache.cassandra.utils.CRC32Factory;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
/*
 * A single commit log file on disk. Manages creation of the file and writing mutations to disk,
 * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
 * files are initially allocated to a fixed size and can grow to accommodate a larger value if necessary.
 */
public abstract class CommitLogSegment
{
    private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);

    // Base for segment ids: each new id is idBase + a monotonically increasing counter.
    // idBase is chosen in the static initializer so ids never collide with segments left on
    // disk by a previous process incarnation.
    private final static long idBase;
    private final static AtomicInteger nextId = new AtomicInteger(1);
    // Segments with an id below this limit predate this process and are candidates for replay;
    // segments created by this process (id >= replayLimitId) are not (see shouldReplay()).
    private static long replayLimitId;
    static
    {
        // Scan the existing commit log files so freshly generated ids are strictly greater than
        // any id already present on disk.
        // NOTE(review): listFiles() can return null if the directory is unreadable — TODO confirm
        // the commit log location is guaranteed to exist by this point.
        long maxId = Long.MIN_VALUE;
        for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
        {
            if (CommitLogDescriptor.isValid(file.getName()))
                maxId = Math.max(CommitLogDescriptor.fromFileName(file.getName()).id, maxId);
        }
        replayLimitId = idBase = Math.max(System.currentTimeMillis(), maxId + 1);
    }

    // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum)
    public static final int ENTRY_OVERHEAD_SIZE = 4 + 4 + 4;

    // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position])
    static final int SYNC_MARKER_SIZE = 4 + 4;

    // The OpOrder used to order appends wrt sync
    private final OpOrder appendOrder = new OpOrder();

    // Next free position in the buffer; advanced via CAS in allocate(int). Set past the buffer
    // capacity by discardUnusedTail() to permanently stop allocation.
    private final AtomicInteger allocatePosition = new AtomicInteger();

    // Everything before this offset has been synced and written. The SYNC_MARKER_SIZE bytes after
    // each sync are reserved, and point forwards to the next such offset. The final
    // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
    private volatile int lastSyncedOffset;

    // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
    // as the segment is being closed.
    // No need to be volatile as writes are protected by appendOrder barrier.
    private int endOfBuffer;

    // a signal for writers to wait on to confirm the log message they provided has been written to disk
    private final WaitQueue syncComplete = new WaitQueue();

    // a map of Cf->dirty position: the highest buffer position written on behalf of each CF,
    // raised monotonically by markDirty()
    private final NonBlockingHashMap<UUID, AtomicInteger> cfDirty = new NonBlockingHashMap<>(1024);

    // a map of Cf->clean position; this is used to permit marking Cfs clean whilst the log is still in use
    private final ConcurrentHashMap<UUID, AtomicInteger> cfClean = new ConcurrentHashMap<>();

    // unique, monotonically increasing id of this segment (see getNextId())
    public final long id;

    final File logFile;
    final FileChannel channel;
    final int fd;

    // the in-memory buffer mutations are serialized into; null once the segment is closed
    ByteBuffer buffer;

    final CommitLog commitLog;
    public final CommitLogDescriptor descriptor;

    /**
     * Factory: picks the segment implementation matching the commit log's compression configuration.
     */
    static CommitLogSegment createSegment(CommitLog commitLog)
    {
        return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog)
                                                        : new MemoryMappedSegment(commitLog);
    }

    /**
     * @return the next unique segment id (idBase plus an incrementing counter)
     */
    static long getNextId()
    {
        return idBase + nextId.getAndIncrement();
    }

    /**
     * Constructs a new segment file in the commit log location, named from a freshly allocated
     * segment id, and writes the descriptor header into the segment buffer.
     * (NOTE(review): an earlier version of this javadoc documented a {@code filePath} recycling
     * parameter that no longer exists — this constructor takes only the owning commit log.)
     *
     * @param commitLog the owning CommitLog, supplying the file location and compression configuration
     */
    CommitLogSegment(CommitLog commitLog)
    {
        this.commitLog = commitLog;
        id = getNextId();
        descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass());
        logFile = new File(commitLog.location, descriptor.fileName());

        try
        {
            channel = FileChannel.open(logFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
            fd = CLibrary.getfd(channel);
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, logFile);
        }

        buffer = createBuffer(commitLog);
        // write the header
        CommitLogDescriptor.writeHeader(buffer, descriptor);
        endOfBuffer = buffer.capacity();
        lastSyncedOffset = buffer.position();
        // reserve room for the first sync marker immediately after the header
        allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
    }

    // subclass hook: create the (mapped or heap/direct) buffer backing this segment
    abstract ByteBuffer createBuffer(CommitLog commitLog);

    /**
     * Allocate space in this buffer for the provided mutation, and return the allocated Allocation object.
     * Returns null if there is not enough space in this segment, and a new segment is needed.
     */
    @SuppressWarnings("resource") //we pass the op order around
    Allocation allocate(Mutation mutation, int size)
    {
        // the OpOrder group is closed by Allocation.markWritten() once the caller has serialized
        // the mutation into the returned buffer slice
        final OpOrder.Group opGroup = appendOrder.start();
        try
        {
            int position = allocate(size);
            if (position < 0)
            {
                opGroup.close();
                return null;
            }
            markDirty(mutation, position);
            return new Allocation(this, opGroup, position, (ByteBuffer) buffer.duplicate().position(position).limit(position + size));
        }
        catch (Throwable t)
        {
            opGroup.close();
            throw t;
        }
    }

    /**
     * @return true if the named segment file was created before this process started, i.e. should
     * be considered for replay on startup
     */
    static boolean shouldReplay(String name)
    {
        return CommitLogDescriptor.fromFileName(name).id < replayLimitId;
    }

    /**
     * FOR TESTING PURPOSES.
     */
    static void resetReplayLimit()
    {
        replayLimitId = getNextId();
    }

    // allocate bytes in the segment, or return -1 if not enough space
    private int allocate(int size)
    {
        // CAS loop: claim [prev, prev+size) or retry if another thread moved the position first
        while (true)
        {
            int prev = allocatePosition.get();
            int next = prev + size;
            if (next >= endOfBuffer)
                return -1;
            if (allocatePosition.compareAndSet(prev, next))
            {
                assert buffer != null;
                return prev;
            }
        }
    }

    // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
    void discardUnusedTail()
    {
        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
        // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
        // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
        // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
        // we duplicate the protection here so that the contract between discardUnusedTail() and sync() is more explicit.
        try (OpOrder.Group group = appendOrder.start())
        {
            while (true)
            {
                int prev = allocatePosition.get();

                // capacity + 1 is the sentinel that marks the segment as permanently closed to allocation
                int next = endOfBuffer + 1;
                if (prev >= next)
                {
                    // Already stopped allocating, might also be closed.
                    assert buffer == null || prev == buffer.capacity() + 1;
                    return;
                }
                if (allocatePosition.compareAndSet(prev, next))
                {
                    // Stopped allocating now. Can only succeed once, no further allocation or discardUnusedTail can succeed.
                    endOfBuffer = prev;
                    assert buffer != null && next == buffer.capacity() + 1;
                    return;
                }
            }
        }
    }

    /**
     * Wait for any appends or discardUnusedTail() operations started before this method was called
     */
    void waitForModifications()
    {
        // issue a barrier and wait for it
        appendOrder.awaitNewBarrier();
    }

    /**
     * Forces a disk flush for this segment file.
     */
    synchronized void sync()
    {
        boolean close = false;
        // check we have more work to do
        if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
            return;
        // Note: Even if the very first allocation of this sync section failed, we still want to enter this
        // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
        // this will always be entered when a mutation allocation has been attempted after the marker allocation
        // succeeded in the previous sync.
        assert buffer != null;  // Only close once.

        int startMarker = lastSyncedOffset;
        // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
        // the point at which we can safely consider records to have been completely written to.
        int nextMarker = allocate(SYNC_MARKER_SIZE);
        if (nextMarker < 0)
        {
            // Ensure no more of this CLS is writeable, and mark ourselves for closing.
            discardUnusedTail();
            close = true;

            // We use the buffer size as the synced position after a close instead of the end of the actual data
            // to make sure we only close the buffer once.
            // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
            nextMarker = buffer.capacity();
        }

        // Wait for mutations to complete as well as endOfBuffer to have been written.
        waitForModifications();
        int sectionEnd = close ? endOfBuffer : nextMarker;

        // Perform compression, writing to file and flush.
        write(startMarker, sectionEnd);

        // Signal the sync as complete.
        lastSyncedOffset = nextMarker;
        if (close)
            internalClose();
        syncComplete.signalAll();
    }

    /**
     * Writes a chained sync marker at {@code offset} in {@code buffer}: a pointer to the next
     * marker's buffer position, followed by a CRC computed over (segment id, {@code filePos}).
     */
    protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
    {
        ICRC32 crc = CRC32Factory.instance.create();
        crc.updateInt((int) (id & 0xFFFFFFFFL));
        crc.updateInt((int) (id >>> 32));
        crc.updateInt(filePos);
        buffer.putInt(offset, nextMarker);
        buffer.putInt(offset + 4, crc.getCrc());
    }

    // subclass hook: persist the buffer section [lastSyncedOffset, nextMarker) to disk
    abstract void write(int lastSyncedOffset, int nextMarker);

    /**
     * @return true while this segment still has unallocated space (i.e. discardUnusedTail() has not run
     * and the buffer is not exhausted)
     */
    public boolean isStillAllocating()
    {
        return allocatePosition.get() < endOfBuffer;
    }

    /**
     * Completely discards a segment file by deleting it. (Potentially blocking operation)
     */
    void discard(boolean deleteFile)
    {
        close();
        if (deleteFile)
            FileUtils.deleteWithConfirm(logFile);
        commitLog.allocator.addSize(-onDiskSize());
    }

    /**
     * @return the current ReplayPosition for this log segment
     */
    public ReplayPosition getContext()
    {
        return new ReplayPosition(id, allocatePosition.get());
    }

    /**
     * @return the file path to this segment
     */
    public String getPath()
    {
        return logFile.getPath();
    }

    /**
     * @return the file name of this segment
     */
    public String getName()
    {
        return logFile.getName();
    }

    /**
     * Blocks until everything written to this (closed-for-allocation) segment has been synced.
     * Uses the register-then-recheck pattern so a signal between the check and the await is not lost.
     */
    void waitForFinalSync()
    {
        while (true)
        {
            WaitQueue.Signal signal = syncComplete.register();
            if (lastSyncedOffset < endOfBuffer)
            {
                signal.awaitUninterruptibly();
            }
            else
            {
                signal.cancel();
                break;
            }
        }
    }

    /**
     * Blocks until the sync marker has advanced past {@code position}, i.e. the record written
     * there is durable on disk.
     *
     * @param position buffer position to wait for
     * @param waitingOnCommit optional timer metric measuring the wait; may be null
     */
    void waitForSync(int position, Timer waitingOnCommit)
    {
        while (lastSyncedOffset < position)
        {
            // register before re-checking so a concurrent signalAll() cannot be missed
            WaitQueue.Signal signal = waitingOnCommit != null ?
                                      syncComplete.register(waitingOnCommit.time()) :
                                      syncComplete.register();
            if (lastSyncedOffset < position)
                signal.awaitUninterruptibly();
            else
                signal.cancel();
        }
    }

    /**
     * Stop writing to this file, sync and close it. Does nothing if the file is already closed.
     */
    synchronized void close()
    {
        discardUnusedTail();
        sync();
        assert buffer == null;
    }

    /**
     * Close the segment file. Do not call from outside this class, use syncAndClose() instead.
     */
    protected void internalClose()
    {
        try
        {
            channel.close();
            // null buffer doubles as the "closed" flag checked by sync()/close()
            buffer = null;
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, getPath());
        }
    }

    /**
     * Records, for every column family touched by the mutation, that this segment contains data
     * for it up to (at least) the given allocated position.
     */
    void markDirty(Mutation mutation, int allocatedPosition)
    {
        for (ColumnFamily columnFamily : mutation.getColumnFamilies())
        {
            // check for deleted CFS
            CFMetaData cfm = columnFamily.metadata();
            if (cfm.isPurged())
                logger.error("Attempted to write commit log entry for unrecognized table: {}", columnFamily.id());
            else
                ensureAtleast(cfDirty, cfm.cfId, allocatedPosition);
        }
    }

    /**
     * Marks the ColumnFamily specified by cfId as clean for this log segment. If the
     * given context argument is contained in this file, it will only mark the CF as
     * clean if no newer writes have taken place.
     *
     * @param cfId    the column family ID that is now clean
     * @param context the optional clean offset
     */
    public synchronized void markClean(UUID cfId, ReplayPosition context)
    {
        if (!cfDirty.containsKey(cfId))
            return;
        if (context.segment == id)
            markClean(cfId, context.position);
        else if (context.segment > id)
            markClean(cfId, Integer.MAX_VALUE);  // a later segment covers us entirely: fully clean
    }

    private void markClean(UUID cfId, int position)
    {
        ensureAtleast(cfClean, cfId, position);
        removeCleanFromDirty();
    }

    // Monotonically raise map[cfId] to at least `value`, creating the entry if absent.
    // Lock-free: putIfAbsent resolves racing creators, then a CAS loop resolves racing raisers.
    private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, UUID cfId, int value)
    {
        AtomicInteger i = map.get(cfId);
        if (i == null)
        {
            AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger());
            if (i2 != null)
                i = i2;  // lost the race — adopt the winner's counter
        }
        while (true)
        {
            int cur = i.get();
            if (cur > value)
                break;
            if (i.compareAndSet(cur, value))
                break;
        }
    }

    // Drops every CF whose clean position has caught up with its dirty position from both maps.
    private void removeCleanFromDirty()
    {
        // if we're still allocating from this segment, don't touch anything since it can't be done thread-safely
        if (isStillAllocating())
            return;

        Iterator<Map.Entry<UUID, AtomicInteger>> iter = cfClean.entrySet().iterator();
        while (iter.hasNext())
        {
            Map.Entry<UUID, AtomicInteger> clean = iter.next();
            UUID cfId = clean.getKey();
            AtomicInteger cleanPos = clean.getValue();
            AtomicInteger dirtyPos = cfDirty.get(cfId);
            if (dirtyPos != null && dirtyPos.intValue() <= cleanPos.intValue())
            {
                cfDirty.remove(cfId);
                iter.remove();
            }
        }
    }

    /**
     * @return a collection of dirty CFIDs for this segment file.
     */
    public synchronized Collection<UUID> getDirtyCFIDs()
    {
        // fast path: with no clean markers (or no dirty CFs) the dirty key set is the answer
        if (cfClean.isEmpty() || cfDirty.isEmpty())
            return cfDirty.keySet();

        List<UUID> r = new ArrayList<>(cfDirty.size());
        for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet())
        {
            UUID cfId = dirty.getKey();
            AtomicInteger dirtyPos = dirty.getValue();
            AtomicInteger cleanPos = cfClean.get(cfId);
            if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue())
                r.add(dirty.getKey());
        }
        return r;
    }

    /**
     * @return true if this segment is unused and safe to recycle or delete
     */
    public synchronized boolean isUnused()
    {
        // if room to allocate, we're still in use as the active allocatingFrom,
        // so we don't want to race with updates to cfClean with removeCleanFromDirty
        if (isStillAllocating())
            return false;

        removeCleanFromDirty();
        return cfDirty.isEmpty();
    }

    /**
     * Check to see if a certain ReplayPosition is contained by this segment file.
     *
     * @param context the replay position to be checked
     * @return true if the replay position is contained by this segment file.
     */
    public boolean contains(ReplayPosition context)
    {
        return context.segment == id;
    }

    // For debugging, not fast
    public String dirtyString()
    {
        StringBuilder sb = new StringBuilder();
        for (UUID cfId : getDirtyCFIDs())
        {
            CFMetaData m = Schema.instance.getCFMetaData(cfId);
            sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId).append("), ");
        }
        return sb.toString();
    }

    // subclass hook: bytes this segment occupies on disk (may differ from buffer size when compressed)
    abstract public long onDiskSize();

    /**
     * @return the number of bytes synced so far, i.e. the durable content length of the segment
     */
    public long contentSize()
    {
        return lastSyncedOffset;
    }

    @Override
    public String toString()
    {
        return "CommitLogSegment(" + getPath() + ')';
    }

    /**
     * Orders commit log segment files by their descriptor id (i.e. creation order).
     */
    public static class CommitLogSegmentFileComparator implements Comparator<File>
    {
        public int compare(File f, File f2)
        {
            CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(f.getName());
            CommitLogDescriptor desc2 = CommitLogDescriptor.fromFileName(f2.getName());
            return Long.compare(desc.id, desc2.id);
        }
    }

    /**
     * A simple class for tracking information about the portion of a segment that has been allocated to a log write.
     * The constructor leaves the fields uninitialized for population by CommitlogManager, so that it can be
     * stack-allocated by escape analysis in CommitLog.add.
     */
    static class Allocation
    {
        private final CommitLogSegment segment;
        // the append-order group opened in CommitLogSegment.allocate(); closed by markWritten()
        private final OpOrder.Group appendOp;
        private final int position;
        // a slice of the segment buffer covering exactly this allocation
        private final ByteBuffer buffer;

        Allocation(CommitLogSegment segment, OpOrder.Group appendOp, int position, ByteBuffer buffer)
        {
            this.segment = segment;
            this.appendOp = appendOp;
            this.position = position;
            this.buffer = buffer;
        }

        CommitLogSegment getSegment()
        {
            return segment;
        }

        ByteBuffer getBuffer()
        {
            return buffer;
        }

        // markWritten() MUST be called once we are done with the segment or the CL will never flush
        // but must not be called more than once
        void markWritten()
        {
            appendOp.close();
        }

        // block until the segment has synced past this allocation (see waitForSync)
        void awaitDiskSync(Timer waitingOnCommit)
        {
            segment.waitForSync(position, waitingOnCommit);
        }

        public ReplayPosition getReplayPosition()
        {
            // buffer.limit() is the end of this allocation within the segment
            return new ReplayPosition(segment.id, buffer.limit());
        }
    }
}