Skip to content

Commit

Permalink
Expose ReachedEndOfTopic in Reader/Consumer API (#9381)
Browse files Browse the repository at this point in the history
Fixes #2687

### Motivation
Expose ReachedEndOfTopic in Reader/Consumer API

### Modifications

Add a request of type `isEndOfTopic` to check if consumer/reader has reached end of topic and send response based on result.
  • Loading branch information
MarvinCai committed Feb 22, 2021
1 parent 18eb2e5 commit 65fed8b
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.websocket.stats.ProxyTopicStat;
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ConsumerStats;
import org.apache.pulsar.websocket.stats.ProxyTopicStat.ProducerStats;
import org.awaitility.Awaitility;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
Expand Down Expand Up @@ -193,6 +194,74 @@ public void socketTest() throws Exception {
}
}

@Test(timeOut = 10000)
public void socketTestEndOfTopic() throws Exception {
final String topic = "my-property/my-ns/my-topic8";
final String subscription = "my-sub";
final String consumerUri = String.format(
"ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
proxyServer.getListenPortHTTP().get(), topic, subscription
);
final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", proxyServer.getListenPortHTTP().get(), topic);

URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);

WebSocketClient consumeClient = new WebSocketClient();
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

try {
consumeClient.start();
ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
log.info("Connecting to : {}", consumeUri);

// let it connect
assertTrue(consumerFuture.get().isOpen());

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
assertTrue(producerFuture.get().isOpen());
// Send 30 message in total.
produceSocket.sendMessage(20);
// Send 10 permits, should receive 10 message
consumeSocket.sendPermits(10);
Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 10));
consumeSocket.isEndOfTopic();
// Wait till get response
Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
assertEquals(consumeSocket.getBuffer().size(), 11));
// Assert not reach end of topic yet
assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}");

// Send 20 more permits, should receive all message
consumeSocket.sendPermits(20);
// 31 includes previous of end of topic request.
Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 31));
consumeSocket.isEndOfTopic();
// Wait till get response
Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 32));
// Assert not reached end of topic.
assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":false}");

admin.topics().terminateTopicAsync(topic).get();
consumeSocket.isEndOfTopic();
// Wait till get response
Awaitility.await().atMost(500, TimeUnit.MILLISECONDS).untilAsserted(() ->
assertEquals(consumeSocket.getReceivedMessagesCount(), 33));
// Assert reached end of topic.
assertEquals(consumeSocket.getBuffer().get(consumeSocket.getBuffer().size() - 1), "{\"endOfTopic\":true}");
} finally {
stopWebSocketClient(consumeClient, produceClient);
}
}

@Test
public void unsubscribeTest() throws Exception {
final String namespace = "my-property/my-ns";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
Expand Down Expand Up @@ -73,12 +74,16 @@ public void onConnect(Session session) throws InterruptedException {
public synchronized void onMessage(String msg) throws JsonParseException, IOException {
receivedMessages.incrementAndGet();
JsonObject message = new Gson().fromJson(msg, JsonObject.class);
JsonObject ack = new JsonObject();
String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
consumerBuffer.add(messageId);
ack.add("messageId", new JsonPrimitive(messageId));
// Acking the proxy
this.getRemote().sendString(ack.toString());
if (message.get(X_PULSAR_MESSAGE_ID) != null) {
JsonObject ack = new JsonObject();
String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
consumerBuffer.add(messageId);
ack.add("messageId", new JsonPrimitive(messageId));
// Acking the proxy
this.getRemote().sendString(ack.toString());
} else {
consumerBuffer.add(message.toString());
}
}

public void sendPermits(int nbPermits) throws IOException {
Expand All @@ -94,6 +99,12 @@ public void unsubscribe() throws IOException {
this.getRemote().sendString(message.toString());
}

public void isEndOfTopic() throws IOException {
JsonObject message = new JsonObject();
message.add("type", new JsonPrimitive("isEndOfTopic"));
this.getRemote().sendString(message.toString());
}

public RemoteEndpoint getRemote() {
return this.session.getRemote();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;

import io.netty.util.concurrent.CompleteFuture;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand All @@ -50,6 +53,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.EndOfTopicResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
Expand Down Expand Up @@ -215,6 +219,8 @@ public void onWebSocketText(String message) {
handlePermit(command);
} else if ("unsubscribe".equals(command.type)) {
handleUnsubscribe(command);
} else if ("isEndOfTopic".equals(command.type)) {
handleEndOfTopic();
} else {
handleAck(command);
}
Expand All @@ -224,6 +230,34 @@ public void onWebSocketText(String message) {
}
}

// Check and notify consumer if reached end of topic.
private void handleEndOfTopic() {
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(
new EndOfTopicResponse(consumer.hasReachedEndOfTopic()));
getSession().getRemote()
.sendString(msg, new WriteCallback() {
@Override
public void writeFailed(Throwable th) {
log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}", consumer.getTopic(),
subscription, getRemote().getInetSocketAddress().toString(), th.getMessage());
}

@Override
public void writeSuccess() {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] End of topic message is delivered successfully to {} ",
consumer.getTopic(), subscription, getRemote().getInetSocketAddress().toString());
}
}
});
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to generate end of topic response: {}", consumer.getTopic(), e.getMessage());
} catch (Exception e) {
log.warn("[{}] Failed to send end of topic response: {}", consumer.getTopic(), e.getMessage());
}
}

private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException {
consumer.unsubscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.EndOfTopicResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
Expand Down Expand Up @@ -187,6 +189,17 @@ public void onWebSocketConnect(Session session) {
public void onWebSocketText(String message) {
super.onWebSocketText(message);

try {
ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
if ("isEndOfTopic".equals(command.type)) {
handleEndOfTopic();
return;
}
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}

// We should have received an ack
// but reader doesn't send an ack to broker here because already reader did

Expand All @@ -197,6 +210,34 @@ public void onWebSocketText(String message) {
}
}

// Check and notify reader if reached end of topic.
private void handleEndOfTopic() {
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(
new EndOfTopicResponse(reader.hasReachedEndOfTopic()));
getSession().getRemote()
.sendString(msg, new WriteCallback() {
@Override
public void writeFailed(Throwable th) {
log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}", reader.getTopic(),
subscription, getRemote().getInetSocketAddress().toString(), th.getMessage());
}

@Override
public void writeSuccess() {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] End of topic message is delivered successfully to {} ",
reader.getTopic(), subscription, getRemote().getInetSocketAddress().toString());
}
}
});
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to generate end of topic response: {}", reader.getTopic(), e.getMessage());
} catch (Exception e) {
log.warn("[{}] Failed to send end of topic response: {}", reader.getTopic(), e.getMessage());
}
}

@Override
public void close() throws IOException {
if (reader != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.data;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Represent result of request to check if we've reached end of topic.
*/
@Data
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EndOfTopicResponse {
// If reach end of topic.
public boolean endOfTopic;
}
45 changes: 45 additions & 0 deletions site2/docs/client-libraries-websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,28 @@ Key | Type | Required? | Explanation

NB: in this mode it's possible to acknowledge messages in a different connection.

#### Check if reach end of topic

Consumer can check if it has reached end of topic by sending `isEndOfTopic` request.

**Request**
```json
{
"type": "isEndOfTopic"
}
```

Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`type`| string | yes | Type of command. Must be `isEndOfTopic`

**Response**
```json
{
"endOfTopic": "true/false"
}
```

### Reader endpoint

The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:
Expand Down Expand Up @@ -282,6 +304,29 @@ Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`messageId`| string | yes | Message ID of the processed message

#### Check if reach end of topic

Consumer can check if it has reached end of topic by sending `isEndOfTopic` request.

**Request**
```json
{
"type": "isEndOfTopic"
}
```

Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`type`| string | yes | Type of command. Must be `isEndOfTopic`

**Response**
```json
{
"endOfTopic": "true/false"
}
```



### Error codes

Expand Down

0 comments on commit 65fed8b

Please sign in to comment.