Skip to content

Commit

Permalink
[ServerCnx] Close connection after receiving unexpected SendCommand (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Nov 13, 2021
1 parent 2c4d913 commit ba58095
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Expand Up @@ -1352,7 +1352,9 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());

if (producerFuture == null || !producerFuture.isDone() || producerFuture.isCompletedExceptionally()) {
log.warn("[{}] Producer had already been closed: {}", remoteAddress, send.getProducerId());
log.warn("[{}] Received message, but the producer is not ready : {}. Closing the connection.",
remoteAddress, send.getProducerId());
close();
return;
}

Expand Down
Expand Up @@ -648,6 +648,28 @@ public void testSendCommand() throws Exception {
channel.finish();
}

@Test(timeOut = 30000)
public void testSendCommandBeforeCreatingProducer() throws Exception {
resetChannel();
setChannelConnected();

// test SEND before producer is created
MessageMetadata messageMetadata = new MessageMetadata()
.setPublishTime(System.currentTimeMillis())
.setProducerName("prod-name")
.setSequenceId(0);
ByteBuf data = Unpooled.buffer(1024);

ByteBuf clientCommand = ByteBufPair.coalesce(Commands.newSend(1, 0, 1,
ChecksumType.None, messageMetadata, data));
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();

// Then expect channel to close
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> !channel.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testUseSameProducerName() throws Exception {
resetChannel();
Expand Down

0 comments on commit ba58095

Please sign in to comment.