Permalink
Browse files

More replay preparation.

 * Implemented closedListeners in Archiver.
 * Added some descriptions to timber.proto.
 * Logserver will not attempt to send "sync" messages to it's handlers. These should be ignored.
 * MetadataHandler no longer writes a new file on startup, but will according to design only append if the file exists.
 * TimberClientPipelineFactory now holds only one instance of TimberClientHandler, so shutdown is called on the same instance which is started.
  • Loading branch information...
1 parent 311714d commit aec3432b72f84a920f94897d7b9e0a14e0938bc2 @acidmoose acidmoose committed Apr 22, 2013
@@ -51,6 +51,9 @@ message LogEvent {
required string source = 6;
// The type of log event.
+ // "T" == Text message (regular java log messages).
+ // "E" == Events.
+ // "C" == Control messages.
required string type = 7;
// Zero or more payloads associated with this log event.
@@ -19,6 +19,7 @@
public class TimberClientPipelineFactory implements ChannelPipelineFactory {
private TimberClient client;
private ClientBootstrap bootstrap;
+ private final TimberClientHandler handler;
// Use common ReconnectDelayManager across all TimberClientHandler instances.
private final ReconnectDelayManager reconnectDelayManager = new ReconnectDelayManager();
@@ -31,6 +32,7 @@
public TimberClientPipelineFactory(TimberClient client, ClientBootstrap bootstrap) {
this.client = client;
this.bootstrap = bootstrap;
+ handler = new TimberClientHandler(client, bootstrap, reconnectDelayManager);
}
@Override
@@ -42,7 +44,7 @@ public ChannelPipeline getPipeline() throws Exception {
p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
p.addLast("protobufEncoder", new ProtobufEncoder());
- p.addLast("handler", new TimberClientHandler(client, bootstrap, reconnectDelayManager));
+ p.addLast("handler", handler);
return p;
}
}
@@ -178,19 +178,31 @@ private void consumerLoop()
private void processEvent(LogEventQueueEntry entry) {
// Loop through the registered handlers and offer the event to them.
Timber.LogEvent event = entry.getLogEvent();
- for (LogEventHandler handler : handlers) {
- try {
- //TODO(acidmoose): If (sync message from log shipment) return;
- handler.handle(event);
-
- // Anything other than consistency level BESTEFFORT
- // means we are at a higher consistency level so we
- // have to flush.
- if (event.getConsistencyLevel() != Timber.ConsistencyLevel.BESTEFFORT) {
- handler.flush();
+
+ // If the message is a sync message from log shipment we do not
+ // need to handle it, but we still need to ACK it. See TimberClientHandler
+ // in Base for more information.
+ boolean shouldHandle = true;
+ if (event.hasId() &&
+ event.getType().equals("C") &&
+ event.getId().equals("sync")) {
+ shouldHandle = false;
+ }
+
+ if (shouldHandle) {
+ for (LogEventHandler handler : handlers) {
+ try {
+ handler.handle(event);
+
+ // Anything other than consistency level BESTEFFORT
+ // means we are at a higher consistency level so we
+ // have to flush.
+ if (event.getConsistencyLevel() != Timber.ConsistencyLevel.BESTEFFORT) {
+ handler.flush();
+ }
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Got exception while dispatching to " + handler.getName(), e);
}
- } catch (Exception e) {
- log.log(Level.WARNING, "Got exception while dispatching to " + handler.getName(), e);
}
}
@@ -113,7 +113,7 @@ private BufferedWriter getWriter(final File slotFile) throws IOException {
mdFile.createNewFile();
}
currentSlotFile = slotFile;
- currentWriter = new BufferedWriter(new FileWriter(mdFile));
+ currentWriter = new BufferedWriter(new FileWriter(mdFile, true));
writerLruCache.put(mdFile.getAbsolutePath(), currentWriter);
}
return currentWriter;
@@ -54,7 +54,6 @@ public void testSimple() throws Exception {
client.addAckEventListener(new TimberClient.AckEventListener() {
@Override
public void ackEventReceived(Timber.AckEvent ackEvent) {
- System.out.println(ackEvent.toString());
ackLatch.countDown();
assertEquals("baluba", ackEvent.getId(0));
}

0 comments on commit aec3432

Please sign in to comment.