diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java index 92b6829ed7313..a8525e4da9843 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java @@ -217,65 +217,52 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part } } + /** + * Always get partition metadata from broker service. + * + * + **/ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) { TopicName topicName = TopicName.get(partitionMetadata.getTopic()); - if (isBlank(brokerServiceURL)) { - service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName, - proxyConnection.clientAuthRole, proxyConnection.authenticationData).thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Total number of partitions for topic {} is {}", - proxyConnection.clientAuthRole, topicName, metadata.partitions); - } - proxyConnection.ctx().writeAndFlush( - Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId)); - }).exceptionally(ex -> { - log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, topicName, - ex.getMessage(), ex); - proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse( - getServerError(ex), ex.getMessage(), clientRequestId)); - return null; - }); - } else { - URI brokerURI; - try { - brokerURI = new URI(brokerServiceURL); - } catch (URISyntaxException e) { - proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError, - e.getMessage(), clientRequestId)); - return; - } - InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort()); - if (log.isDebugEnabled()) { - log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, - topicName.getPartitionedTopicName(), clientRequestId); - } + String serviceUrl = getServiceUrl(clientRequestId); + if (serviceUrl == null) { + log.warn("No available broker for {} to lookup partition metadata", topicName); + return; + } + InetSocketAddress addr = getAddr(serviceUrl, clientRequestId); + if (addr == null) { + return; + } - proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { - // Connected to backend broker - long requestId = proxyConnection.newRequestId(); - ByteBuf command; - command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); - clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { - if (t != null) { - log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), - t.getMessage(), t); - proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t), - t.getMessage(), clientRequestId)); - } else { - proxyConnection.ctx().writeAndFlush( - Commands.newPartitionMetadataResponse(r.partitions, clientRequestId)); - } - proxyConnection.getConnectionPool().releaseConnection(clientCnx); - }); - }).exceptionally(ex -> { - // Failed to connect to backend broker - proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, - ex.getMessage(), clientRequestId)); - return null; - }); + if (log.isDebugEnabled()) { + log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr, + topicName.getPartitionedTopicName(), clientRequestId); } + proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> { + // Connected to backend broker + long requestId = proxyConnection.newRequestId(); + ByteBuf command; + command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); + clientCnx.newLookup(command, requestId).whenComplete((r, t) -> { + if (t != null) { + log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), + t.getMessage(), t); + proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t), + t.getMessage(), clientRequestId)); + } else { + proxyConnection.ctx().writeAndFlush( + Commands.newPartitionMetadataResponse(r.partitions, clientRequestId)); + } + proxyConnection.getConnectionPool().releaseConnection(clientCnx); + }); + }).exceptionally(ex -> { + // Failed to connect to backend broker + proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, + ex.getMessage(), clientRequestId)); + return null; + }); } public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {