diff --git a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java index 8af70ed..db695d4 100644 --- a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java @@ -337,4 +337,80 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { assertTrue(event.data.keySet().contains("key0")); assertTrue(event.data.values().contains("v3")); } + + @Test + public void testReconnectAfterBufferFull() throws Exception { + final CountDownLatch bufferFull = new CountDownLatch(1); + + // start mock fluentd + int port = MockFluentd.randomPort(); // Use a random port available + final List elist = new ArrayList(); + final MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() { + public void process(MessagePack msgpack, Socket socket) throws IOException { + try { + BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + Unpacker unpacker = msgpack.createUnpacker(in); + while (true) { + Event e = unpacker.read(Event.class); + elist.add(e); + } + } catch (EOFException e) { + // ignore + } finally { + socket.close(); + } + } + }); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(new Runnable() { + @Override + public void run() { + try { + bufferFull.await(20, TimeUnit.SECONDS); + fluentd.start(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + // start asyncSenders + Sender asyncSender = new AsyncRawSocketSender("localhost", port); + String tag = "tag"; + int i; + for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer + Map record = new HashMap(); + record.put("num", i); + record.put("str", "name" + i); + + if (bufferFull.getCount() > 0) { + // Fill the sender's buffer + if (!asyncSender.emit(tag, record)) { + // Buffer full. Need to recover the fluentd + bufferFull.countDown(); + Thread.sleep(2000); + } + } + else { + // Flush the sender's buffer after the fluentd starts + asyncSender.emit(tag, record); + break; + } + } + + // close sender sockets + asyncSender.close(); + + // wait for unpacking event data on fluentd + Thread.sleep(2000); + + // close mock server sockets + fluentd.close(); + + // check data + assertEquals(0, bufferFull.getCount()); + // check elist size. But, it cannot detect correct elist size because async sender runs independently. + assert(elist.size() < i + 5 || elist.size() >= i + 5); + } }