Skip to content

Commit

Permalink
WebSocket proxy should not make a consumer/producer when authorizatio…
Browse files Browse the repository at this point in the history
…n is failed (#448)
  • Loading branch information
Yuki Shiga authored and merlimat committed Jun 12, 2017
1 parent 019e5f3 commit 10ac6c1
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
Expand Up @@ -63,28 +63,33 @@ public void onWebSocketConnect(Session session) {
if (service.isAuthenticationEnabled()) { if (service.isAuthenticationEnabled()) {
try { try {
authRole = service.getAuthenticationService().authenticateHttpRequest(request); authRole = service.getAuthenticationService().authenticateHttpRequest(request);
log.info("[{}] Authenticated WebSocket producer {} on topic {}", session.getRemoteAddress(), authRole, log.info("[{}] Authenticated WebSocket client {} on topic {}", session.getRemoteAddress(), authRole,
topic); topic);


} catch (AuthenticationException e) { } catch (AuthenticationException e) {
log.warn("[{}] Failed to authenticated WebSocket producer {} on topic {}: {}", log.warn("[{}] Failed to authenticated WebSocket client {} on topic {}: {}",
session.getRemoteAddress(), authRole, topic, e.getMessage()); session.getRemoteAddress(), authRole, topic, e.getMessage());
close(WebSocketError.AuthenticationError); close(WebSocketError.AuthenticationError);
return; return;
} }
} }


if (service.isAuthorizationEnabled()) { if (service.isAuthorizationEnabled()) {
final String role = authRole; try {
isAuthorized(authRole).thenApply(isAuthorized -> { if (!isAuthorized(authRole)) {
if(!isAuthorized) { log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole,
log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), role,
topic); topic);
close(WebSocketError.NotAuthorizedError); close(WebSocketError.NotAuthorizedError);
return;
} }
return null; } catch (Exception e) {
}); log.warn("[{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}",
session.getRemoteAddress(), authRole, topic, e.getMessage());
close(WebSocketError.UnknownError);
return;
}
} }
createClient(session);
} }


@Override @Override
Expand Down Expand Up @@ -125,8 +130,6 @@ protected String checkAuthentication() {
return null; return null;
} }


protected abstract CompletableFuture<Boolean> isAuthorized(String authRole);

private String extractTopicName(HttpServletRequest request) { private String extractTopicName(HttpServletRequest request) {
String uri = request.getRequestURI(); String uri = request.getRequestURI();
List<String> parts = Splitter.on("/").splitToList(uri); List<String> parts = Splitter.on("/").splitToList(uri);
Expand All @@ -143,5 +146,9 @@ private String extractTopicName(HttpServletRequest request) {
return dn.toString(); return dn.toString();
} }


protected abstract Boolean isAuthorized(String authRole) throws Exception;

protected abstract void createClient(Session session);

private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class); private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class);
} }
Expand Up @@ -87,9 +87,7 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request) {
} }


@Override @Override
public void onWebSocketConnect(Session session) { protected void createClient(Session session) {
super.onWebSocketConnect(session);

try { try {
this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf); this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf);
this.service.addConsumer(this); this.service.addConsumer(this);
Expand Down Expand Up @@ -247,8 +245,8 @@ private ConsumerConfiguration getConsumerConfiguration() {
} }


@Override @Override
protected CompletableFuture<Boolean> isAuthorized(String authRole) { protected Boolean isAuthorized(String authRole) throws Exception {
return service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topic), authRole); return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole);
} }


private static String extractSubscription(HttpServletRequest request) { private static String extractSubscription(HttpServletRequest request) {
Expand Down
Expand Up @@ -88,14 +88,14 @@ public void close() throws IOException {
} }


@Override @Override
public void onWebSocketConnect(Session session) { protected void createClient(Session session) {
super.onWebSocketConnect(session);

try { try {
ProducerConfiguration conf = getProducerConfiguration(); ProducerConfiguration conf = getProducerConfiguration();
this.producer = service.getPulsarClient().createProducer(topic, conf); this.producer = service.getPulsarClient().createProducer(topic, conf);
this.service.addProducer(this); this.service.addProducer(this);
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}] Failed in creating producer on topic {}", session.getRemoteAddress(),
topic, e);
close(FailedToCreateProducer, e.getMessage()); close(FailedToCreateProducer, e.getMessage());
} }
} }
Expand Down Expand Up @@ -176,8 +176,9 @@ public long getMsgPublishedCounter() {
return MSG_PUBLISHED_COUNTER_UPDATER.get(this); return MSG_PUBLISHED_COUNTER_UPDATER.get(this);
} }


protected CompletableFuture<Boolean> isAuthorized(String authRole) { @Override
return service.getAuthorizationManager().canProduceAsync(DestinationName.get(topic), authRole); protected Boolean isAuthorized(String authRole) throws Exception {
return service.getAuthorizationManager().canProduce(DestinationName.get(topic), authRole);
} }


private void sendAckResponse(ProducerAck response) { private void sendAckResponse(ProducerAck response) {
Expand Down

0 comments on commit 10ac6c1

Please sign in to comment.