Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/main/java/org/fluentd/logger/sender/RawSocketSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,24 @@ protected boolean emit(Event event) {
return send(bytes);
}

private boolean flushBuffer() {
if (reconnector.enableReconnection(System.currentTimeMillis())) {
flush();
if (pendings.position() == 0) {
return true;
}
}

return false;
}

private synchronized boolean send(byte[] bytes) {
// buffering
if (pendings.position() + bytes.length > pendings.capacity()) {
LOG.error("Cannot send logs to " + server.toString());
return false;
if (!flushBuffer()) {
LOG.error("Cannot send logs to " + server.toString());
return false;
}
}
pendings.put(bytes);

Expand Down
75 changes: 75 additions & 0 deletions src/test/java/org/fluentd/logger/sender/TestRawSocketSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,79 @@ public void process(MessagePack msgpack, Socket socket) throws IOException {
assertTrue(event.data.keySet().contains("key0"));
assertTrue(event.data.values().contains("v3"));
}

@Test
public void testReconnectAfterBufferFull() throws Exception {
final CountDownLatch bufferFull = new CountDownLatch(1);

// start mock fluentd
int port = MockFluentd.randomPort(); // Use a random port available
final List<Event> elist = new ArrayList<Event>();
final MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() {
public void process(MessagePack msgpack, Socket socket) throws IOException {
try {
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
Unpacker unpacker = msgpack.createUnpacker(in);
while (true) {
Event e = unpacker.read(Event.class);
elist.add(e);
}
} catch (EOFException e) {
// ignore
} finally {
socket.close();
}
}
});

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
@Override
public void run() {
try {
bufferFull.await(20, TimeUnit.SECONDS);
fluentd.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

// start senders
Sender sender = new RawSocketSender("localhost", port);
String tag = "tag";
int i;
for (i = 0; i < 1000000; i++) { // Enough to fill the sender's buffer
Map<String, Object> record = new HashMap<String, Object>();
record.put("num", i);
record.put("str", "name" + i);

if (bufferFull.getCount() > 0) {
// Fill the sender's buffer
if (!sender.emit(tag, record)) {
// Buffer full. Need to recover the fluentd
bufferFull.countDown();
Thread.sleep(2000);
}
}
else {
// Flush the sender's buffer after the fluentd starts
sender.emit(tag, record);
break;
}
}

// close sender sockets
sender.close();

// wait for unpacking event data on fluentd
Thread.sleep(2000);

// close mock server sockets
fluentd.close();

// check data
assertEquals(0, bufferFull.getCount());
assertEquals(i, elist.size());
}
}
12 changes: 10 additions & 2 deletions src/test/java/org/fluentd/logger/util/MockFluentd.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ private static Object toObject(Unpacker u, Value v) {
}
}

private final int port;
private ServerSocket serverSocket;

private MockProcess process;

private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean finished = new AtomicBoolean(false);

public MockFluentd(int port, MockProcess mockProcess) throws IOException {
serverSocket = new ServerSocket(port);
public MockFluentd(int port, MockProcess mockProcess) {
this.port = port;
process = mockProcess;
}

Expand All @@ -112,6 +113,13 @@ public static int randomPort() throws IOException {


public void run() {
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Failed to start MockFluentd process", e);
}

_logger.debug("Started MockFluentd port:" + serverSocket.getLocalPort());

while (!finished.get()) {
Expand Down