Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added new entrypoint for reader to websocket proxy #620

Merged
merged 5 commits into from
Aug 23, 2017

Conversation

hrsakai
Copy link
Contributor

@hrsakai hrsakai commented Aug 2, 2017

Motivation

no reader entrypoint on websocket proxy .

Modifications

added reader entrypoint to websocket proxy.

Result

we can use reader through websocket proxy

@hrsakai hrsakai added the type/feature The PR added a new feature or issue requested a new feature label Aug 2, 2017
// /ws/reader/persistent/my-property/my-cluster/my-ns/my-topic
checkArgument(parts.size() == 8, "Invalid topic name format");
checkArgument(parts.get(1).equals("ws"));
checkArgument(parts.get(3).equals("persistent"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about non-persistent topics? I think we should support them as well in the WebSocket interface

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support non-persistent topics and delete ReaderHandler#checkRequestURI() because
AbstractWebSocketHandler class check the same.

@merlimat
Copy link
Contributor

merlimat commented Aug 2, 2017

retest this please

@merlimat merlimat added this to the 1.20.0-incubating milestone Aug 2, 2017
@hrsakai
Copy link
Contributor Author

hrsakai commented Aug 3, 2017

retest this please

1 similar comment
@hrsakai
Copy link
Contributor Author

hrsakai commented Aug 3, 2017

retest this please

service.getExecutor().execute(() -> receiveMessage());
}
}).exceptionally(exception -> {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a debug log here?

}
updateDeliverMsgStat(msgSize);
int pending = pendingMessages.getAndDecrement();
if (pending >= maxPendingMessages) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be tricky.. here, in reader we are keep pushing messages to the client which may blow up client-memory. so, should we follow the same semantic as normal consumer where initially proxy will send queue-size messages to client and on every acknowledgement/readNext proxy send one more message.??

@hrsakai
Copy link
Contributor Author

hrsakai commented Aug 11, 2017

@rdhabalia
Sorry for my late reply, I added warn log and call receiveMessage() on every readNext().
Please check them.

service.getExecutor().execute(() -> receiveMessage());
} else {
// Resume delivery
receiveMessage();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should't we stop delivery if pending reached to maxPendingMessages else it will keep sending messages to the client ?
Should we do the similar like ConsumerHandler, after reaching maxPendingMessages, proxy-reader will send more message when it receives a new readNext request from client, same way consumer receives ack-request.? If that seems feasible solution then we can document the semantic into websocket-reader-doc as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia
Please let me confirm.
I will try to change readerHandler like the following codes.
Does this match what you mean ?

                            public void writeSuccess() {
                                 if (log.isDebugEnabled()) {
                                     log.debug("[{}/{}] message is delivered successfully to {} ", reader.getTopic(),
                                             subscription, getRemote().getInetSocketAddress().toString());
                                 }
                                 updateDeliverMsgStat(msgSize);
 -                               pendingMessages.getAndDecrement();
                             }
.
.
.
              int pending = pendingMessages.incrementAndGet();
              if (pending < maxPendingMessages) {
                  // Start next read in a separate thread to avoid recursion
                  service.getExecutor().execute(() -> receiveMessage());
 -            } else {
 -                // Resume delivery
 -                receiveMessage();
              }
.
.
.
   @Override
    public void onWebSocketText(String message) {
        super.onWebSocketText(message);

+        int pending = pendingMessages.getAndDecrement();
+        if (pending >= maxPendingMessages) {
+            // Resume delivery
+            receiveMessage();
+        }
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this would control message-delivery to the client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your confirmation.
I will try to implement it.

@hrsakai
Copy link
Contributor Author

hrsakai commented Aug 17, 2017

@rdhabalia
please check again and next pull request, I will update websocket docs.

Copy link
Contributor

@rdhabalia rdhabalia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 LGTM

Copy link
Contributor

@yush1ga yush1ga left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@yush1ga yush1ga merged commit ccb4d92 into apache:master Aug 23, 2017
@hrsakai hrsakai deleted the reader branch June 17, 2020 00:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants