From 7cef9792bc7456c4c2d582cda7e7fa816804854f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Fri, 22 Apr 2016 18:12:23 +0900 Subject: [PATCH] Try to add reconnect after buffer full test case for async sender --- .../sender/TestAsyncRawSocketSender.java | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java index 8af70ed..fcec035 100644 --- a/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java +++ b/src/test/java/org/fluentd/logger/sender/TestAsyncRawSocketSender.java @@ -337,4 +337,80 @@ public void process(MessagePack msgpack, Socket socket) throws IOException { assertTrue(event.data.keySet().contains("key0")); assertTrue(event.data.values().contains("v3")); } + + @Test + public void testReconnectAfterBufferFull() throws Exception { + final CountDownLatch bufferFull = new CountDownLatch(1); + + // start mock fluentd + int port = MockFluentd.randomPort(); // Use a random port available + final List elist = new ArrayList(); + final MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() { + public void process(MessagePack msgpack, Socket socket) throws IOException { + try { + BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + Unpacker unpacker = msgpack.createUnpacker(in); + while (true) { + Event e = unpacker.read(Event.class); + elist.add(e); + } + } catch (EOFException e) { + // ignore + } finally { + socket.close(); + } + } + }); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(new Runnable() { + @Override + public void run() { + try { + bufferFull.await(20, TimeUnit.SECONDS); + fluentd.start(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + // start asyncSenders + Sender asyncSender = new AsyncRawSocketSender("localhost", port); + String tag = "tag"; + int i; + for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer + Map record = new HashMap(); + record.put("num", i); + record.put("str", "name" + i); + + if (bufferFull.getCount() > 0) { + // Fill the sender's buffer + if (!asyncSender.emit(tag, record)) { + // Buffer full. Need to recover the fluentd + bufferFull.countDown(); + Thread.sleep(2000); + } + } + else { + // Flush the sender's buffer after the fluentd starts + asyncSender.emit(tag, record); + break; + } + } + + // close sender sockets + asyncSender.close(); + + // wait for unpacking event data on fluentd + Thread.sleep(2000); + + // close mock server sockets + fluentd.close(); + + // check data + assertEquals(0, bufferFull.getCount()); + // check elist size. But, it cannot detect correct elist size because async sender runs independently. + assert(i - 5 <= elist.size()|| elist.size() < i + 5); + } }