Skip to content

Commit

Permalink
- 注释FileChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
gogodjzhu committed Aug 23, 2019
1 parent b1d6732 commit 66001f6
Show file tree
Hide file tree
Showing 18 changed files with 437 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,22 +60,32 @@ public CheckpointRebuilder(List<File> logFiles,
this.queue = queue;
}

/**
* fastReplay实际执行, 将log文件还原为queue队列
* @return
* @throws IOException
* @throws Exception
*/
public boolean rebuild() throws IOException, Exception {
LOG.info("Attempting to fast replay the log files.");
List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
for (File logFile : logFiles) {
try {
// 依赖文件实例化顺序Reader
logReaders.add(LogFileFactory.getSequentialReader(logFile, null));
} catch(EOFException e) {
LOG.warn("Ignoring " + logFile + " due to EOF", e);
}
}
// 全局的最新事务号和写入序号, 全局递增
long transactionIDSeed = 0;
long writeOrderIDSeed = 0;
try {
for (LogFile.SequentialReader log : logReaders) {
LogRecord entry;
int fileID = log.getLogFileID();
// 顺序解析文件, 得到LogRecord, 再根据不同的Event类型在内存中重做整个过程. 最
// 终得到已commit, 未take保留在channel中的put Event 的指针放回queue内存队列
while ((entry = log.next()) != null) {
int offset = entry.getOffset();
TransactionEventRecord record = entry.getEvent();
Expand Down Expand Up @@ -104,6 +114,8 @@ public boolean rebuild() throws IOException, Exception {
uncommittedPuts.get(record.getTransactionID());
if (puts != null) {
for (ComparableFlumeEventPointer put : puts) {
// pending队列是已commit take的Event, 即还未落地commit文件就已经
// 被commit take消费
if (!pendingTakes.remove(put)) {
committedPuts.add(put);
}
Expand All @@ -114,6 +126,9 @@ public boolean rebuild() throws IOException, Exception {
uncommittedTakes.get(record.getTransactionID());
if (takes != null) {
for (ComparableFlumeEventPointer take : takes) {
// commit Take Event, 将对应的Event从commitPuts中移除(此Event即被完整消费)
// 如果在commitPuts中没找到对应的event, 将其放入pendingTakes.
// TODO 可以take未commit的event? 难道put/take Event写入文件不一定严格有序
if (!committedPuts.remove(take)) {
pendingTakes.add(take);
}
Expand Down Expand Up @@ -141,10 +156,14 @@ public boolean rebuild() throws IOException, Exception {
reader.close();
}
}
// 最后根据日志序号对全部剩余的commitPut做排序, 放回到queue, replay结束
Set<ComparableFlumeEventPointer> sortedPuts =
Sets.newTreeSet(committedPuts);
int count = 0;
for (ComparableFlumeEventPointer put : sortedPuts) {
/** 注意queue保存的是 {@link FlumeEventPointer}, 它的属性只包含日志文件和event
* 对应的offset 而不是整个event实体
* */
queue.addTail(put.pointer);
count++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public void readFields(DataInput in) throws IOException {
super.readFields(in);
type = in.readShort();
}

/**
* 将本实例编译为protobuf字节流写入到输出流, 报文格式见:
* {@linkplain Log#commit(long, short)}
*/
@Override
void writeProtos(OutputStream out) throws IOException {
ProtosFactory.Commit.Builder commitBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@
import com.google.common.collect.SetMultimap;


/**
* event-checkpoint解析工具, checkpoint文件映射成内存中的LongBuffer
* {@link this#elementsBuffer}来操作读写
*
* 目前有两个不同版本的实现, V3比V2不同在于将元数据从checkpoint文件独立出来保存成一个meta
* 文件, 这有利于修改channel的格式. 具体可看 https://reviews.apache.org/r/6680/
*/
abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
private static final Logger LOG = LoggerFactory
.getLogger(EventQueueBackingStoreFile.class);
Expand All @@ -57,6 +64,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
protected static final int CHECKPOINT_INCOMPLETE = 1;

protected LongBuffer elementsBuffer;
// overwriteMap保存当前已经commit的Event, eventId->EventPtr
protected final Map<Integer, Long> overwriteMap = new HashMap<Integer, Long>();
protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap();
protected final MappedByteBuffer mappedBuffer;
Expand All @@ -73,6 +81,15 @@ protected EventQueueBackingStoreFile(int capacity, String name,
this(capacity, name, checkpointFile, null, false);
}

/**
* 从checkpoint文件恢复状态, 可能因为多种情况导致checkpoint恢复失败:
* 1. capacity修改后, checkpoint还保持原来capacity计算的大小, 抛出异常
* 2. checkpoint文件标记的版本跟当前版本不一致, 抛出异常
* 3. checkpoint的状态标记为incomplete, 抛出异常
* 以上三种情况抛出的异常均为BadCheckpointException, 这会在系统重启的时候被
* {@link Log#replay()}捕捉, 进一步选择替代checkpoint的方法启动系统
* @throws BadCheckpointException
*/
protected EventQueueBackingStoreFile(int capacity, String name,
File checkpointFile, File checkpointBackupDir,
boolean backupCheckpoint) throws IOException,
Expand All @@ -83,6 +100,9 @@ protected EventQueueBackingStoreFile(int capacity, String name,
this.backupDir = checkpointBackupDir;
checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
// 当checkpointFile的长度为0, 即此尚未使用时, 根据capacity(channel.capacity)计
// 算文件所需空间并初始化, 往后就不再修改. 所以修改capacity时, 可能导致下一句if判断
// 抛出异常
if(checkpointFileHandle.length() == 0) {
allocate(checkpointFile, totalBytes);
checkpointFileHandle.seek(INDEX_VERSION * Serialization.SIZE_OF_LONG);
Expand All @@ -108,6 +128,13 @@ protected EventQueueBackingStoreFile(int capacity, String name,
throw new BadCheckpointException("Invalid version: " + version + " " +
name + ", expected " + getVersion());
}
/**
* 执行checkpoint的时候会将未commit的(inflight)Event序列化保存到checkpoint文件中,
* 开始执行checkpoint动作时会在marker位置写入{@link CHECKPOINT_INCOMPLETE}, 执行结束后会写入{@link CHECKPOINT_COMPLETE}
* 见{@link FlumeEventQueue#checkpoint(boolean)}
*
* 当发现marker标记不为complete时抛出BadCheckpointException异常
*/
long checkpointComplete = elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
if(checkpointComplete != (long) CHECKPOINT_COMPLETE) {
throw new BadCheckpointException("Checkpoint was not completed correctly,"
Expand Down Expand Up @@ -228,6 +255,7 @@ void beginCheckpoint() throws IOException {
}
}
// Start checkpoint
// 往checkpoint文件写入开始checkpoint的标记, 结束的时候还会写相应的结束标记
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
mappedBuffer.force();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@

import com.google.common.base.Preconditions;

/**
* V2版本的event-checkpoint解析工具, 此版本在checkpoint文件的头部保存整个checkpoint的
* 元数据, 格式为:
* | 属性 | index |
* | writeOrder | 0 | 占2个字节, long型, 保存event序号
* | indexSize | 2 | 占1个字节, int型, 保存当前checkpoint queue中的event数量
* | indexHead | 3 | 占1个字节, int型, 保存当前checkpoint queue中的head
* | logFileIDReferenceCounts | (INDEX_ACTIVE_LOG ... INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS) | 每个logFile占2字节, 高位字节int保存fileID,
* 低位字节int保存当前checkpoint文件共有的Event数量
*/
final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;

/**
* V3版本的event-checkpoint解析工具, 使用单独的meta文件保存checkpoint的元数据
* 元数据文件以protobuf格式保存, 对应的定义在filechannel.proto文件中
*/
final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
private static final Logger LOG = LoggerFactory
.getLogger(EventQueueBackingStoreFileV3.class);
Expand All @@ -49,8 +53,10 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint);
Preconditions.checkArgument(capacity > 0,
"capacity must be greater than 0 " + capacity);
// checkpoint文件名 + .meta = 元数据文件名
metaDataFile = Serialization.getMetaDataFile(checkpointFile);
LOG.info("Starting up with " + checkpointFile + " and " + metaDataFile);
//
if(metaDataFile.exists()) {
FileInputStream inputStream = new FileInputStream(metaDataFile);
try {
Expand All @@ -74,6 +80,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
LOG.warn(msg);
throw new BadCheckpointException(msg);
}
// 将 checkpoint元数据恢复
WriteOrderOracle.setSeed(logWriteOrderID);
setLogWriteOrderID(logWriteOrderID);
setSize(checkpoint.getQueueSize());
Expand Down Expand Up @@ -103,11 +110,13 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
ProtosFactory.Checkpoint.Builder checkpointBuilder =
ProtosFactory.Checkpoint.newBuilder();
checkpointBuilder.setVersion(getVersion());
// 如果没有checkpoint文件, 一下几个值均为初始0
checkpointBuilder.setQueueHead(getHead());
checkpointBuilder.setQueueSize(getSize());
checkpointBuilder.setWriteOrderID(getLogWriteOrderID());
FileOutputStream outputStream = new FileOutputStream(metaDataFile);
try {
// protobuf字节流写入新文件
checkpointBuilder.build().writeDelimitedTo(outputStream);
outputStream.getChannel().force(true);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,14 +422,24 @@ Log getLog() {
/**
* Transaction backed by a file. This transaction supports either puts
* or takes but not both.
* 基于文件实现的事务, 同一个事务内只能做读或者写当中的一个
*/
static class FileBackedTransaction extends BasicTransactionSemantics {
// 事务take/put队列, 保存一个事务内未提交的临时event
private final LinkedBlockingDeque<FlumeEventPointer> takeList;
private final LinkedBlockingDeque<FlumeEventPointer> putList;
private final long transactionID;
// channel空间不足时, 等待的时间
private final int keepAlive;
/**
* log 和 queue的关系
*
*/
// 事务持久化工具对象
private final Log log;
// 持久化的event队列(包括已commit和未commit的)
private final FlumeEventQueue queue;
// 内存中(未commit)的event队列信号量, 初始值配置为capacity
private final Semaphore queueRemaining;
private final String channelNameDescriptor;
private final ChannelCounter channelCounter;
Expand All @@ -453,8 +463,14 @@ private String getStateAsString() {
return String.valueOf(getState());
}

/**
* 事务内put, 使用protobuf将event编码后落地到文件
* @param event
* @throws InterruptedException
*/
@Override
protected void doPut(Event event) throws InterruptedException {
// put操作递增
channelCounter.incrementEventPutAttemptCount();
if(putList.remainingCapacity() == 0) {
throw new ChannelException("Put queue for FileBackedTransaction " +
Expand All @@ -464,6 +480,7 @@ protected void doPut(Event event) throws InterruptedException {
}
// this does not need to be in the critical section as it does not
// modify the structure of the log or queue.
// 等待channel腾出空间容纳新的event
if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("The channel has reached it's capacity. "
+ "This might be the result of a sink on the channel having too "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
* header and circular queue semantics. The header of the queue
* contains the timestamp of last sync, the queue size and
* the head position.
*
* Channel内event缓存队列, 包括已commit的的Event保存在{@link this#backingStore},
* 未commit的Event分别保存在{@link this#inflightTakes} 和 {@link this#inflightPuts}
*/
final class FlumeEventQueue {
private static final Logger LOG = LoggerFactory
Expand Down Expand Up @@ -233,6 +236,12 @@ private void set(int index, long value) {
backingStore.put(index, value);
}

/**
* 在指定index位置添加value, 会导致整体位移
* @param index
* @param value
* @return
*/
protected boolean add(int index, long value) {
if (index < 0 || index > backingStore.getSize()) {
throw new IndexOutOfBoundsException(String.valueOf(index)
Expand Down Expand Up @@ -332,8 +341,16 @@ synchronized void close() throws IOException {
* A representation of in flight events which have not yet been committed.
* None of the methods are thread safe, and should be called from thread
* safe methods only.
*
* 封装了所有尚未commit的event, 主要属性包括:
* - inflightEvents 保存Event的指针, 以事务id为key
* - inflightFileIDs 保存Event持久化所在的文件id, 以事务id为key
* - file, fileChannel 当前对象持久化的文件
*
* 存储event的格式可以参考 {@link InflightEventWrapper#serializeAndWrite()}
*/
class InflightEventWrapper {
// SetMultimap 类似于一个 Map<Key, Set<Value>>
private SetMultimap<Long, Long> inflightEvents = HashMultimap.create();
// Both these are volatile for safe publication, they are never accessed by
// more than 1 thread at a time.
Expand Down Expand Up @@ -373,6 +390,7 @@ public boolean completeTransaction(Long transactionID) {

/**
* Add an event pointer to the inflights list.
* 添加某事务未commitEvent到inflight map
* @param transactionID
* @param pointer
*/
Expand All @@ -387,6 +405,9 @@ public void addEvent(Long transactionID, Long pointer){
* Serialize the set of in flights into a byte longBuffer.
* @return Returns the checksum of the buffer that is being
* asynchronously written to disk.
* 将event刷入文件, 格式为:
* | checkSum | 事务号txnID | event数量 | event1, event2 ... event n |
* 其中checkSum是对整个事务计算的
*/
public void serializeAndWrite() throws Exception {
Collection<Long> values = inflightEvents.values();
Expand All @@ -407,12 +428,14 @@ public void serializeAndWrite() throws Exception {
+ values.size()) * 8) //Event pointers
+ 16; //Checksum
//There is no real need of filling the channel with 0s, since we
//will write the exact nummber of bytes as expected file size.
//will write the exact number of bytes as expected file size.
file.setLength(expectedFileSize);
Preconditions.checkState(file.length() == expectedFileSize,
"Expected File size of inflight events file does not match the "
+ "current file size. Checkpoint is incomplete.");
file.seek(0);
// buffer 开辟一段内存空间, 存储所有inflight的event, 格式为:
// | 事务号txnID | event数量 | event1, event2 ... event n |
final ByteBuffer buffer = ByteBuffer.allocate(expectedFileSize);
LongBuffer longBuffer = buffer.asLongBuffer();
for (Long txnID : inflightEvents.keySet()) {
Expand All @@ -427,9 +450,12 @@ public void serializeAndWrite() throws Exception {
longBuffer.put(written);
}
byte[] checksum = digest.digest(buffer.array());
// checksum写入 文件
file.write(checksum);
buffer.position(0);
// 将buffer结果写入 文件
fileChannel.write(buffer);
// 强制刷入硬盘
fileChannel.force(true);
syncRequired = false;
} catch (IOException ex) {
Expand Down
Loading

0 comments on commit 66001f6

Please sign in to comment.