Skip to content

Commit

Permalink
keep tailers lazy to avoid grabbing file descriptors where unnecessary
Browse files Browse the repository at this point in the history
  • Loading branch information
krisskross committed Nov 2, 2014
1 parent df8c331 commit 60b15a4
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions core/src/main/java/org/deephacks/logbuffers/LogBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,49 +45,48 @@ public class LogBuffer {

private final Logger logger;

/**
* system tmp dir
*/
/** system tmp dir */
private static final String TMP_DIR = System.getProperty("java.io.tmpdir");

/**
* default path used by log files if not specified
*/
/** default path used by log files if not specified */
private static final String DEFAULT_BASE_PATH = TMP_DIR + "/logbuffer";

/**
* optional executor used only by scheduled tailing
*/
/** optional executor used only by scheduled tailing */
private ScheduledExecutorService cachedExecutor;

/**
* log writer
*/
/** log writer */
private final AppenderHolder appenderHolder;

/**
* log reader
*/
private final TailerHolder tailerHolder;
/** log reader */
private TailerHolder tailerHolder;

/**
* path where log buffer files are stored
*/
/** path where log buffer files are stored */
private String basePath;

private ConcurrentHashMap<Class<?>, LogBufferTail<?>> tails = new ConcurrentHashMap<>();

private LogSerializers serializers;
private long lastTailerIndex;
private Object firstIndex;

private final DateRanges ranges;

protected LogBuffer(Builder builder) throws IOException {
Preconditions.checkNotNull(builder.ranges, "choose a range");
this.basePath = builder.basePath.or(DEFAULT_BASE_PATH);
this.logger = Logger.getLogger(LogBuffer.class.getName() + "." + checkNotNull(basePath + "/writer"));
this.appenderHolder = new AppenderHolder(basePath + "/data", builder.ranges);
this.tailerHolder = new TailerHolder(basePath + "/data", builder.ranges);
this.serializers = builder.serializers;
this.ranges = builder.ranges;
}

// keep tailers lazy to avoid grabbing file descriptors where unnecessary
private void initalizeTailerHolder() {
if (this.tailerHolder == null) {
synchronized (this) {
if (tailerHolder == null) {
this.tailerHolder = new TailerHolder(basePath + "/data", ranges);
}
}
}
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -149,11 +148,13 @@ private LogRaw internalWrite(byte[] content, long type) throws IOException {
* @throws IOException
*/
public List<LogRaw> select(long fromIndex) throws IOException {
initalizeTailerHolder();
long lastIndex = tailerHolder.getLatestStopIndex();
return select(fromIndex, lastIndex);
}

Optional<LogRaw> get(long index) throws IOException {
initalizeTailerHolder();
synchronized (tailerHolder) {
return LogRaw.read(tailerHolder, index);
}
Expand All @@ -163,6 +164,7 @@ Optional<LogRaw> get(long index) throws IOException {
* Get the next forward index of specified type.
*/
public <T> Optional<LogRaw> getNext(Class<T> cls, long index) throws IOException {
initalizeTailerHolder();
synchronized (tailerHolder) {
Optional<Long> optional = peekType(index);
if (!optional.isPresent()) {
Expand All @@ -187,13 +189,15 @@ public <T> Optional<LogRaw> getNext(Class<T> cls, long index) throws IOException
* Only reads the timestamp in order to avoid serialization overhead.
*/
Optional<Long> peekTimestamp(long index) throws IOException {
initalizeTailerHolder();
synchronized (tailerHolder) {
List<Tailer> tailers = tailerHolder.getTailersBetweenIndex(index, index);
return LogRaw.peekTimestamp(tailers.get(0), index);
}
}

Optional<Long> peekType(long index) throws IOException {
initalizeTailerHolder();
synchronized (tailerHolder) {
return LogRaw.peekType(tailerHolder, index);
}
Expand All @@ -218,6 +222,7 @@ Optional<LogRaw> getLatestWrite() throws IOException {
* @throws IOException
*/
public Long findStartTimeIndex(long startTime) throws IOException {
initalizeTailerHolder();
long writeIndex = getWriteIndex();
synchronized (tailerHolder) {
long index = tailerHolder.binarySearchAfterTime(startTime).get().getIndex();
Expand Down Expand Up @@ -248,6 +253,7 @@ public Long findStartTimeIndex(long startTime) throws IOException {
*/
public List<LogRaw> select(long fromIndex, long toIndex) throws IOException {
Preconditions.checkArgument(fromIndex <= toIndex, "from must be less than to");
initalizeTailerHolder();
synchronized (tailerHolder) {
ListIterator<Tailer> tailers = tailerHolder.getTailersBetweenIndex(fromIndex, toIndex).listIterator();
List<LogRaw> result = new ArrayList<>();
Expand Down Expand Up @@ -303,6 +309,7 @@ public List<LogRaw> selectBackward(long fromTimeMs, long toTimeMs) throws IOExce
*/
public List<LogRaw> selectForward(long fromTimeMs, long toTimeMs) throws IOException {
Preconditions.checkArgument(fromTimeMs <= toTimeMs, "from must be less than to");
initalizeTailerHolder();
LinkedList<LogRaw> messages = new LinkedList<>();
ListIterator<Tailer> tailers = tailerHolder.getTailersBetweenTime(fromTimeMs, toTimeMs).listIterator();
if (!tailers.hasNext()) {
Expand Down Expand Up @@ -350,6 +357,7 @@ public List<LogRaw> selectForward(long fromTimeMs, long toTimeMs) throws IOExcep
* @throws IOException
*/
public <T> Logs<T> select(Class<T> type, long fromIndex) throws IOException {
initalizeTailerHolder();
return select(type, fromIndex, tailerHolder.getLatestStopIndex());
}

Expand Down Expand Up @@ -459,7 +467,9 @@ public synchronized void close() throws IOException {
if (cachedExecutor != null) {
cachedExecutor.shutdown();
}
tailerHolder.close();
if (tailerHolder != null) {
tailerHolder.close();
}
}
}

Expand Down Expand Up @@ -550,6 +560,7 @@ public <T> long getReadIndex(Class<? extends Tail<T>> cls) throws IOException {
}

public long getFirstIndex() {
initalizeTailerHolder();
return tailerHolder.getFirstIndex();
}

Expand Down

0 comments on commit 60b15a4

Please sign in to comment.