/
PureeBufferedOutput.java
112 lines (90 loc) · 2.95 KB
/
PureeBufferedOutput.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.cookpad.puree.outputs;
import com.cookpad.puree.PureeLogger;
import com.cookpad.puree.async.AsyncResult;
import com.cookpad.puree.internal.PureeVerboseRunnable;
import com.cookpad.puree.internal.RetryableTaskRunner;
import com.cookpad.puree.storage.EnhancedPureeStorage;
import com.cookpad.puree.storage.Records;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.ParametersAreNonnullByDefault;
/**
 * Base class for outputs that buffer logs in storage and emit them in batches.
 *
 * <p>Each received log is filtered and inserted into storage. A flush takes the
 * storage lock, reads up to {@code conf.getLogsPerRequest()} records of this
 * output's type and hands their JSON payloads to {@link #emit(List, AsyncResult)}.
 * Records are deleted from storage only after the subclass reports success.
 *
 * <p>{@code conf}, {@code storage}, {@code type()} and {@code applyFilters(...)}
 * are not declared in this class; presumably they are inherited from
 * {@code PureeOutput} (confirm against that class).
 */
@ParametersAreNonnullByDefault
public abstract class PureeBufferedOutput extends PureeOutput {

    /**
     * Runner that triggers {@link #flush()}; created in {@link #initialize(PureeLogger)}
     * from the configured flush interval and maximum retry count.
     */
    RetryableTaskRunner flushTask;

    /** Executor obtained from the logger; storage inserts and flushes are submitted to it. */
    ScheduledExecutorService executor;

    public PureeBufferedOutput() {
    }

    /**
     * Initializes the parent output, then captures the logger's executor and
     * builds the flush task on top of it.
     *
     * @param logger the logger this output is attached to
     */
    @Override
    public void initialize(PureeLogger logger) {
        super.initialize(logger);
        executor = logger.getExecutor();
        flushTask = new RetryableTaskRunner(new Runnable() {
            @Override
            public void run() {
                flush();
            }
        }, conf.getFlushIntervalMillis(), conf.getMaxRetryCount(), executor);
    }

    /**
     * Queues the log for filtering and insertion into storage, then asks the
     * flush task to start.
     *
     * <p>The filter/insert step runs on the executor; a log for which
     * {@code applyFilters} returns {@code null} is dropped. The flush task is
     * asked to start from the calling thread whether or not the log is kept.
     *
     * @param jsonLog the serialized log to buffer
     */
    @Override
    public void receive(final String jsonLog) {
        executor.execute(new PureeVerboseRunnable(new Runnable() {
            @Override
            public void run() {
                String filteredLog = applyFilters(jsonLog);
                if (filteredLog != null) {
                    storage.insert(type(), filteredLog);
                }
            }
        }));
        flushTask.tryToStart();
    }

    /** Schedules {@link #flushSync()} on the executor. */
    @Override
    public void flush() {
        executor.execute(new PureeVerboseRunnable(new Runnable() {
            @Override
            public void run() {
                flushSync();
            }
        }));
    }

    /**
     * Performs one flush on the calling thread.
     *
     * <ol>
     *   <li>If the storage lock cannot be taken, a retry is requested and the
     *       method returns.</li>
     *   <li>Old records are purged (see {@link #purgeRecordsFromStorage()}) and
     *       one batch is selected.</li>
     *   <li>An empty batch releases the lock and resets the flush task.</li>
     *   <li>Otherwise the batch is passed to {@link #emit(List, AsyncResult)};
     *       the callback deletes the records on success or requests a retry on
     *       failure, and releases the lock in both cases.</li>
     * </ol>
     *
     * <p>The lock is released exactly once per successful {@code lock()}, even
     * when purge, select, {@code emit} or a callback step throws. An exception
     * raised before the callback has been signalled is handled like a failed
     * flush (unlock, then request a retry) and is rethrown to the caller.
     */
    public void flushSync() {
        if (!storage.lock()) {
            flushTask.retryLater();
            return;
        }

        // Flips to true the moment some path takes responsibility for unlocking,
        // so the callback and the exception handler can never both release.
        final AtomicBoolean lockReleased = new AtomicBoolean(false);
        try {
            purgeRecordsFromStorage();
            final Records records = getRecordsFromStorage();

            if (records.isEmpty()) {
                lockReleased.set(true);
                storage.unlock();
                flushTask.reset();
                return;
            }

            final List<String> jsonLogs = records.getJsonLogs();
            emit(jsonLogs, new AsyncResult() {
                @Override
                public void success() {
                    if (!lockReleased.compareAndSet(false, true)) {
                        return; // result already reported; ignore duplicates
                    }
                    try {
                        flushTask.reset();
                        storage.delete(records);
                    } finally {
                        storage.unlock();
                    }
                }

                @Override
                public void fail() {
                    if (!lockReleased.compareAndSet(false, true)) {
                        return; // result already reported; ignore duplicates
                    }
                    try {
                        flushTask.retryLater();
                    } finally {
                        storage.unlock();
                    }
                }
            });
        } catch (RuntimeException e) {
            // Nothing has released the lock yet: do it here so later flushes are
            // not blocked, and ask for a retry as the fail() path would.
            if (lockReleased.compareAndSet(false, true)) {
                storage.unlock();
                flushTask.retryLater();
            }
            throw e;
        }
    }

    /** Selects at most {@code conf.getLogsPerRequest()} records of this output's type. */
    private Records getRecordsFromStorage() {
        return storage.select(type(), conf.getLogsPerRequest());
    }

    /**
     * Sends a batch of logs. Implementations report the outcome through
     * {@code result}: {@code success()} causes the batch to be deleted from
     * storage, {@code fail()} causes a retry to be requested. Only the first
     * signal is acted upon.
     *
     * @param jsonLogs the JSON payloads of the selected records
     * @param result   callback used to report the outcome of the emission
     */
    public abstract void emit(List<String> jsonLogs, final AsyncResult result);

    /**
     * Deletes records of this output's type using {@code conf.getPurgeAgeMillis()}
     * as the age argument. Does nothing when the storage is not an
     * {@link EnhancedPureeStorage} or when the configured purge age is negative.
     */
    private void purgeRecordsFromStorage() {
        if (!(storage instanceof EnhancedPureeStorage) || conf.getPurgeAgeMillis() < 0) {
            return;
        }
        ((EnhancedPureeStorage) storage).delete(type(), conf.getPurgeAgeMillis());
    }

    /**
     * Single-log emission is intentionally a no-op; buffered outputs emit via
     * {@link #emit(List, AsyncResult)} instead.
     *
     * <p>NOTE(review): this likely implements an abstract {@code emit(String)}
     * declared in {@code PureeOutput} - confirm, and add {@code @Override} if so.
     *
     * @param jsonLog ignored
     */
    public void emit(String jsonLog) {
        // do nothing
    }
}