Skip to content

Commit

Permalink
Import of commit 339913c by Ry Biesemeyer <yaauie@users.noreply.githu…
Browse files Browse the repository at this point in the history
…b.com>

acks: send sequence-0 ack for 0-window batches (logstash-plugins#479)

* acks: send sequence-0 ack for 0-window batches

Ensures that at least one ACK is sent in reply to each window, including
those with 0-length, by emitting an empty batch when we receive a 0-window
frame and sending a sequence-0 ACK in reply when we process an empty batch.

This enables upstream users of the protocol to (ab)use a 0-length window
frame and an _ensured_ in-protocol response to validate that we are alive.

Note: 0-sequence ACKs are already used as application-level keep-alives
between when the `ConnectionHandler` receives bytes and when `BeatsHandler`
finishes processing a batch, and therefore should already be handled
gracefully by any existing clients.
  • Loading branch information
fbacchella committed Sep 27, 2023
1 parent daf456a commit 76bb18a
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/main/java/org/logstash/beats/BeatsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelRead0(ChannelHandlerContext ctx, Batch batch) {
logger.debug("{}", () -> format("Received a new payload"));
try {
if (batch.isEmpty()) {
logger.debug("Sending 0-seq ACK for empty batch");
writeAck(ctx, batch.getProtocol(), 0);
}
for (Message message : batch) {
logger.debug("{}", () -> format("Sending a new message for the listener, sequence: " + message.getSequence()));
messageListener.onNewMessage(ctx, message);
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/logstash/beats/BeatsParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
if (!batch.isEmpty()) {
logger.warn("New window size received but the current batch was not complete, sending the current batch");
batchComplete(out);
} else if (batch.getBatchSize() == 0) {
logger.debug("New window size 0 received, sending an empty batch");
out.add(batch);
batchComplete(out);
}

transition(States.READ_HEADER);
Expand Down
25 changes: 19 additions & 6 deletions src/test/java/org/logstash/beats/BeatsHandlerTest.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package org.logstash.beats;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Before;
import org.junit.Test;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Before;
import org.junit.Test;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -112,7 +113,19 @@ public void testAcksLastMessageInBatch() {
assertEquals(messageCount, spyListener.getLastMessages().size());
Ack ack = embeddedChannel.readOutbound();
assertEquals(Protocol.VERSION_1, ack.getProtocol());
assertEquals(ack.getSequence(), startSequenceNumber + messageCount - 1);
assertEquals(startSequenceNumber + messageCount - 1, ack.getSequence());
embeddedChannel.close();
}

@Test
public void testAcksZeroSequenceForEmptyBatch() {
EmbeddedChannel embeddedChannel = new EmbeddedChannel(new BeatsHandler(spyListener));
embeddedChannel.writeInbound(new V2Batch());
assertEquals(0, spyListener.getLastMessages().size());
Ack ack = embeddedChannel.readOutbound();
assertEquals(Protocol.VERSION_2, ack.getProtocol());
assertEquals(0, ack.getSequence());
embeddedChannel.close();
}

}
41 changes: 41 additions & 0 deletions src/test/java/org/logstash/beats/BeatsParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class BeatsParserTest {

Expand Down Expand Up @@ -138,6 +139,46 @@ public void testCompressedEncodingDecodingFields() {
decodedBatch.release();
}

@Test
public void testV1EmptyWindowEmitsEmptyBatch() {
Batch decodedBatch = decodeBatch(new V1Batch());

assertNotNull(decodedBatch);
assertTrue(decodedBatch.isEmpty());
assertEquals(0, decodedBatch.getBatchSize());
assertEquals(0, decodedBatch.size());
}

@Test
public void testV2EmptyWindowEmitsEmptyBatch() {
Batch decodedBatch = decodeBatch(new V2Batch());

assertNotNull(decodedBatch);
assertTrue(decodedBatch.isEmpty());
assertEquals(0, decodedBatch.getBatchSize());
assertEquals(0, decodedBatch.size());
}

@Test
public void testV1CompressedFrameEmptyWindowEmitsEmptyBatch() {
Batch decodedBatch = decodeCompressedBatch(new V1Batch());

assertNotNull(decodedBatch);
assertTrue(decodedBatch.isEmpty());
assertEquals(0, decodedBatch.getBatchSize());
assertEquals(0, decodedBatch.size());
}

@Test
public void testV2CompressedFrameEmptyWindowEmitsEmptyBatch() {
Batch decodedBatch = decodeCompressedBatch(new V2Batch());

assertNotNull(decodedBatch);
assertTrue(decodedBatch.isEmpty());
assertEquals(0, decodedBatch.getBatchSize());
assertEquals(0, decodedBatch.size());
}

@Test
public void testOversizedFields() {
thrown.expectCause(isA(BeatsParser.InvalidFrameProtocolException.class));
Expand Down

0 comments on commit 76bb18a

Please sign in to comment.