diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 6140220fee6af..4845c2d8e1f2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -65,6 +65,7 @@ import org.apache.pulsar.websocket.stats.ProxyTopicStat; import org.apache.pulsar.websocket.stats.ProxyTopicStat.ConsumerStats; import org.apache.pulsar.websocket.stats.ProxyTopicStat.ProducerStats; +import org.awaitility.Awaitility; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeException; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; @@ -193,6 +194,74 @@ public void socketTest() throws Exception { } } + @Test(timeOut = 10000) + public void socketTestEndOfTopic() throws Exception { + final String topic = "my-property/my-ns/my-topic8"; + final String subscription = "my-sub"; + final String consumerUri = String.format( + "ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared", + proxyServer.getListenPortHTTP().get(), topic, subscription + ); + final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", proxyServer.getListenPortHTTP().get(), topic); + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + WebSocketClient consumeClient = new WebSocketClient(); + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + WebSocketClient produceClient = new WebSocketClient(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + + try { + consumeClient.start(); + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + Future consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + log.info("Connecting to : {}", consumeUri); + + // let it connect + assertTrue(consumerFuture.get().isOpen()); + + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + produceClient.start(); + Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + assertTrue(producerFuture.get().isOpen()); + // Send 30 message in total. + produceSocket.sendMessage(20); + // Send 10 permits, should receive 10 message + consumeSocket.sendPermits(10); + Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() -> + assertEquals(consumeSocket.getReceivedMessagesCount(), 10)); + consumeSocket.isEndOfTopic(); + // Wait till get response + Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() -> + assertEquals(consumeSocket.getBuffer().size(), 11)); + // Assert not reach end of topic yet + assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}"); + + // Send 20 more permits, should receive all message + consumeSocket.sendPermits(20); + // 31 includes previous of end of topic request. + Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() -> + assertEquals(consumeSocket.getReceivedMessagesCount(), 31)); + consumeSocket.isEndOfTopic(); + // Wait till get response + Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() -> + assertEquals(consumeSocket.getReceivedMessagesCount(), 32)); + // Assert not reached end of topic. + assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}"); + + admin.topics().terminateTopicAsync(topic).get(); + consumeSocket.isEndOfTopic(); + // Wait till get response + Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() -> + assertEquals(consumeSocket.getReceivedMessagesCount(), 33)); + // Assert reached end of topic. + assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":true}"); + } finally { + stopWebSocketClient(consumeClient, produceClient); + } + } + @Test public void unsubscribeTest() throws Exception { final String namespace = "my-property/my-ns"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java index a7fd1682371d8..e1ddaaeacb9da 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.StringUtils; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; @@ -73,12 +74,16 @@ public void onConnect(Session session) throws InterruptedException { public synchronized void onMessage(String msg) throws JsonParseException, IOException { receivedMessages.incrementAndGet(); JsonObject message = new Gson().fromJson(msg, JsonObject.class); - JsonObject ack = new JsonObject(); - String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString(); - consumerBuffer.add(messageId); - ack.add("messageId", new JsonPrimitive(messageId)); - // Acking the proxy - this.getRemote().sendString(ack.toString()); + if (message.get(X_PULSAR_MESSAGE_ID) != null) { + JsonObject ack = new JsonObject(); + String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString(); + consumerBuffer.add(messageId); + ack.add("messageId", new JsonPrimitive(messageId)); + // Acking the proxy + this.getRemote().sendString(ack.toString()); + } else { + consumerBuffer.add(message.toString()); + } } public void sendPermits(int nbPermits) throws IOException { @@ -94,6 +99,12 @@ public void unsubscribe() throws IOException { this.getRemote().sendString(message.toString()); } + public void isEndOfTopic() throws IOException { + JsonObject message = new JsonObject(); + message.add("type", new JsonPrimitive("isEndOfTopic")); + this.getRemote().sendString(message.toString()); + } + public RemoteEndpoint getRemote() { return this.session.getRemote(); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 0f60bc36b81c4..8d60b557cee2f 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.util.Base64; import java.util.List; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -34,6 +35,8 @@ import javax.servlet.http.HttpServletRequest; +import io.netty.util.concurrent.CompleteFuture; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -50,6 +53,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ConsumerCommand; import org.apache.pulsar.websocket.data.ConsumerMessage; +import org.apache.pulsar.websocket.data.EndOfTopicResponse; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; @@ -215,6 +219,8 @@ public void onWebSocketText(String message) { handlePermit(command); } else if ("unsubscribe".equals(command.type)) { handleUnsubscribe(command); + } else if ("isEndOfTopic".equals(command.type)) { + handleEndOfTopic(); } else { handleAck(command); } @@ -224,6 +230,34 @@ public void onWebSocketText(String message) { } } + // Check and notify consumer if reached end of topic. + private void handleEndOfTopic() { + try { + String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString( + new EndOfTopicResponse(consumer.hasReachedEndOfTopic())); + getSession().getRemote() + .sendString(msg, new WriteCallback() { + @Override + public void writeFailed(Throwable th) { + log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}", consumer.getTopic(), + subscription, getRemote().getInetSocketAddress().toString(), th.getMessage()); + } + + @Override + public void writeSuccess() { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] End of topic message is delivered successfully to {} ", + consumer.getTopic(), subscription, getRemote().getInetSocketAddress().toString()); + } + } + }); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to generate end of topic response: {}", consumer.getTopic(), e.getMessage()); + } catch (Exception e) { + log.warn("[{}] Failed to send end of topic response: {}", consumer.getTopic(), e.getMessage()); + } + } + private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException { consumer.unsubscribe(); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index c34c0c0c8e616..bcd5239d63f3e 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -38,7 +38,9 @@ import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.websocket.data.ConsumerCommand; import org.apache.pulsar.websocket.data.ConsumerMessage; +import org.apache.pulsar.websocket.data.EndOfTopicResponse; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; @@ -187,6 +189,17 @@ public void onWebSocketConnect(Session session) { public void onWebSocketText(String message) { super.onWebSocketText(message); + try { + ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class); + if ("isEndOfTopic".equals(command.type)) { + handleEndOfTopic(); + return; + } + } catch (IOException e) { + log.warn("Failed to deserialize message id: {}", message, e); + close(WebSocketError.FailedToDeserializeFromJSON); + } + // We should have received an ack // but reader doesn't send an ack to broker here because already reader did @@ -197,6 +210,34 @@ public void onWebSocketText(String message) { } } + // Check and notify reader if reached end of topic. + private void handleEndOfTopic() { + try { + String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString( + new EndOfTopicResponse(reader.hasReachedEndOfTopic())); + getSession().getRemote() + .sendString(msg, new WriteCallback() { + @Override + public void writeFailed(Throwable th) { + log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}", reader.getTopic(), + subscription, getRemote().getInetSocketAddress().toString(), th.getMessage()); + } + + @Override + public void writeSuccess() { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] End of topic message is delivered successfully to {} ", + reader.getTopic(), subscription, getRemote().getInetSocketAddress().toString()); + } + } + }); + } catch (JsonProcessingException e) { + log.warn("[{}] Failed to generate end of topic response: {}", reader.getTopic(), e.getMessage()); + } catch (Exception e) { + log.warn("[{}] Failed to send end of topic response: {}", reader.getTopic(), e.getMessage()); + } + } + @Override public void close() throws IOException { if (reader != null) { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/EndOfTopicResponse.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/EndOfTopicResponse.java new file mode 100644 index 0000000000000..dbb85f8430489 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/EndOfTopicResponse.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Represent result of request to check if we've reached end of topic. + */ +@Data +@AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) +public class EndOfTopicResponse { + // If reach end of topic. + public boolean endOfTopic; +} diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md index b1790025659d9..fac8c8dacbd74 100644 --- a/site2/docs/client-libraries-websocket.md +++ b/site2/docs/client-libraries-websocket.md @@ -227,6 +227,28 @@ Key | Type | Required? | Explanation NB: in this mode it's possible to acknowledge messages in a different connection. +#### Check if reach end of topic + +Consumer can check if it has reached end of topic by sending `isEndOfTopic` request. + +**Request** +```json +{ + "type": "isEndOfTopic" +} +``` + +Key | Type | Required? | Explanation +:---|:-----|:----------|:----------- +`type`| string | yes | Type of command. Must be `isEndOfTopic` + +**Response** +```json +{ + "endOfTopic": "true/false" + } +``` + ### Reader endpoint The reader endpoint requires you to specify a tenant, namespace, and topic in the URL: @@ -282,6 +304,29 @@ Key | Type | Required? | Explanation :---|:-----|:----------|:----------- `messageId`| string | yes | Message ID of the processed message +#### Check if reach end of topic + +Consumer can check if it has reached end of topic by sending `isEndOfTopic` request. + +**Request** +```json +{ + "type": "isEndOfTopic" +} +``` + +Key | Type | Required? | Explanation +:---|:-----|:----------|:----------- +`type`| string | yes | Type of command. Must be `isEndOfTopic` + +**Response** +```json +{ + "endOfTopic": "true/false" + } +``` + + ### Error codes