/
JournalTailer.java
133 lines (119 loc) · 4.43 KB
/
JournalTailer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/
package alluxio.master.journal;
import alluxio.master.Master;
import alluxio.master.journal.ufs.ReadOnlyUfsJournal;
import alluxio.proto.journal.Journal.JournalEntry;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import javax.annotation.concurrent.NotThreadSafe;
/**
 * This class tails the journal for a master. It will process the journal checkpoint, and then
 * process all existing completed logs.
 *
 * <p>Typical usage is to call {@link #processJournalCheckpoint(boolean)} once, followed by
 * {@link #processNextJournalLogs()} for as long as {@link #isValid()} returns true.
 */
@NotThreadSafe
public final class JournalTailer {
  private static final Logger LOG = LoggerFactory.getLogger(JournalTailer.class);

  /** The master to apply all the journal entries to. */
  private final Master mMaster;
  /** The journal reader to read journal entries. */
  private final JournalReader mReader;
  /** This keeps track of the latest sequence number seen in the journal entries. */
  private long mLatestSequenceNumber = 0;

  /**
   * Creates a new instance of {@link JournalTailer}.
   *
   * @param master the master to apply the journal entries to, must not be null
   * @param journal the journal to tail, must not be null and must be a {@link ReadOnlyUfsJournal}
   */
  public JournalTailer(Master master, Journal journal) {
    mMaster = Preconditions.checkNotNull(master, "master");
    // Fail with a descriptive message instead of an anonymous NPE on getNewReader().
    Preconditions.checkNotNull(journal, "journal");
    mReader = ((ReadOnlyUfsJournal) journal).getNewReader();
  }

  /**
   * @return true if this tailer is valid, false otherwise
   */
  public boolean isValid() {
    return mReader.isValid();
  }

  /**
   * @return true if the checkpoint exists
   */
  public boolean checkpointExists() {
    try {
      // Only the success or failure of this call matters; the returned timestamp is not used.
      mReader.getCheckpointLastModifiedTimeMs();
      return true;
    } catch (IOException ignored) {
      // Deliberate: any I/O failure while querying the checkpoint is reported as "no checkpoint".
      return false;
    }
  }

  /**
   * @return the sequence number of the latest entry in the journal read so far
   */
  public long getLatestSequenceNumber() {
    return mLatestSequenceNumber;
  }

  /**
   * Loads and (optionally) processes the journal checkpoint.
   *
   * <p>The checkpoint stream is always closed before this method returns, including when applying
   * the checkpoint to the master fails.
   *
   * @param applyToMaster if true, apply all the checkpoint events to the master. Otherwise, simply
   *        open the checkpoint.
   * @throws IOException if an I/O error occurs
   */
  public void processJournalCheckpoint(boolean applyToMaster) throws IOException {
    // Load the checkpoint.
    LOG.info("{}: Loading checkpoint.", mMaster.getName());
    // The checkpoint stream must be retrieved before retrieving any log streams, because the
    // journal reader verifies that the checkpoint was read before the log streams.
    JournalInputStream is = mReader.getCheckpointInputStream();
    try {
      if (applyToMaster) {
        // Only apply the checkpoint to the master, if specified.
        mMaster.processJournalCheckpoint(is);
      }
      // update the latest sequence number seen.
      mLatestSequenceNumber = is.getLatestSequenceNumber();
    } finally {
      // Release the stream even if the master fails while applying the checkpoint.
      is.close();
    }
  }

  /**
   * Processes all the completed journal logs. This method will return when it processes the last
   * completed log.
   *
   * <p>{@link #processJournalCheckpoint(boolean)} must have been called previously.
   *
   * <p>Each log stream is closed before moving on, including when reading or applying an entry
   * fails part-way through a log.
   *
   * @return the number of logs processed
   * @throws IOException if an I/O error occurs
   */
  public int processNextJournalLogs() throws IOException {
    int numFilesProcessed = 0;
    while (mReader.isValid()) {
      // Process the new completed log, if it exists.
      JournalInputStream inputStream = mReader.getNextInputStream();
      if (inputStream == null) {
        // No further completed log is available right now.
        return numFilesProcessed;
      }
      LOG.info("{}: Processing a completed log.", mMaster.getName());
      try {
        JournalEntry entry;
        while ((entry = inputStream.getNextEntry()) != null) {
          mMaster.processJournalEntry(entry);
          // update the latest sequence number seen.
          mLatestSequenceNumber = inputStream.getLatestSequenceNumber();
        }
      } finally {
        // Release the stream even if reading or applying an entry fails.
        inputStream.close();
      }
      numFilesProcessed++;
      LOG.info("{}: Finished processing the log.", mMaster.getName());
    }
    // The loop only falls through when the reader reports itself as no longer valid.
    LOG.info("{}: The checkpoint is out of date and must be reloaded.", mMaster.getName());
    return numFilesProcessed;
  }
}