Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose ReachedEndOfTopic in Reader/Consumer API #9381

Merged
merged 7 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;

import io.netty.util.concurrent.CompleteFuture;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand All @@ -50,6 +53,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.EndOfTopicResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
Expand Down Expand Up @@ -84,6 +88,8 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
private volatile long msgDeliveredCounter = 0;
private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter");
//default interval for checking if end of topic has been reached is 10 min
private static long END_OF_TOPIC_CHECK_INTERVAL = 10 * 60;
MarvinCai marked this conversation as resolved.
Show resolved Hide resolved

public ConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
Expand Down Expand Up @@ -215,6 +221,8 @@ public void onWebSocketText(String message) {
handlePermit(command);
} else if ("unsubscribe".equals(command.type)) {
handleUnsubscribe(command);
} else if ("isEndOfTopic".equals(command.type)) {
handleEndOfTopic();
} else {
handleAck(command);
}
Expand All @@ -224,6 +232,34 @@ public void onWebSocketText(String message) {
}
}

// Check and notify consumer if reached end of topic.
private void handleEndOfTopic() {
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(
new EndOfTopicResponse(consumer.hasReachedEndOfTopic()));
getSession().getRemote()
.sendString(msg, new WriteCallback() {
@Override
public void writeFailed(Throwable th) {
log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}", consumer.getTopic(),
subscription, getRemote().getInetSocketAddress().toString(), th.getMessage());
}

@Override
public void writeSuccess() {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] End of topic message is delivered successfully to {} ",
consumer.getTopic(), subscription, getRemote().getInetSocketAddress().toString());
}
}
});
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to generate end of topic response: {}", consumer.getTopic(), e.getMessage());
} catch (Exception e) {
log.warn("[{}] Failed to send end of topic response: {}", consumer.getTopic(), e.getMessage());
}
}

private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException {
consumer.unsubscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.EndOfTopicResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
Expand Down Expand Up @@ -187,6 +189,17 @@ public void onWebSocketConnect(Session session) {
public void onWebSocketText(String message) {
super.onWebSocketText(message);

try {
ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
if ("isEndOfTopic".equals(command.type)) {
handleEndOfTopic();
return;
}
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}

// We should have received an ack
// but reader doesn't send an ack to broker here because already reader did

Expand All @@ -197,6 +210,34 @@ public void onWebSocketText(String message) {
}
}

// Check and notify reader if reached end of topic.
private void handleEndOfTopic() {
try {
String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(
new EndOfTopicResponse(reader.hasReachedEndOfTopic()));
getSession().getRemote()
.sendString(msg, new WriteCallback() {
@Override
public void writeFailed(Throwable th) {
log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}", reader.getTopic(),
subscription, getRemote().getInetSocketAddress().toString(), th.getMessage());
}

@Override
public void writeSuccess() {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] End of topic message is delivered successfully to {} ",
reader.getTopic(), subscription, getRemote().getInetSocketAddress().toString());
}
}
});
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to generate end of topic response: {}", reader.getTopic(), e.getMessage());
} catch (Exception e) {
log.warn("[{}] Failed to send end of topic response: {}", reader.getTopic(), e.getMessage());
}
}

@Override
public void close() throws IOException {
if (reader != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.pulsar.websocket.data;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Represent result of request to check if we've reached end of topic.
*/
@Data
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class EndOfTopicResponse {
// If reach end of topic.
public boolean endOfTopic;
}
45 changes: 45 additions & 0 deletions site2/docs/client-libraries-websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,28 @@ Key | Type | Required? | Explanation

NB: in this mode it's possible to acknowledge messages in a different connection.

#### Check if reach end of topic

Consumer can check if it has reached end of topic by sending `isEndOfTopic` request.

**Request**
```json
{
"type": "isEndOfTopic"
}
```

Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`type`| string | yes | Type of command. Must be `isEndOfTopic`

**Response**
```json
{
"endOfTopic": "true/false"
}
```

### Reader endpoint

The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:
Expand Down Expand Up @@ -282,6 +304,29 @@ Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`messageId`| string | yes | Message ID of the processed message

#### Check if reach end of topic

Consumer can check if it has reached end of topic by sending `isEndOfTopic` request.

**Request**
```json
{
"type": "isEndOfTopic"
}
```

Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`type`| string | yes | Type of command. Must be `isEndOfTopic`

**Response**
```json
{
"endOfTopic": "true/false"
}
```



### Error codes

Expand Down