Skip to content

Commit

Permalink
[Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler (a…
Browse files Browse the repository at this point in the history
…pache#15415)

* [Proxy] Remove unnecessary blocking DNS lookup in LookupProxyHandler

* Use existing code pattern for creating address

(cherry picked from commit 7373a51)
  • Loading branch information
lhotari committed May 4, 2022
1 parent fe78908 commit 5980cdc
Showing 1 changed file with 40 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,65 +217,52 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
}
}

/**
* Always get partition metadata from broker service.
*
*
**/
private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata,
long clientRequestId) {
TopicName topicName = TopicName.get(partitionMetadata.getTopic());
if (isBlank(brokerServiceURL)) {
service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topicName,
proxyConnection.clientAuthRole, proxyConnection.authenticationData).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}",
proxyConnection.clientAuthRole, topicName, metadata.partitions);
}
proxyConnection.ctx().writeAndFlush(
Commands.newPartitionMetadataResponse(metadata.partitions, clientRequestId));
}).exceptionally(ex -> {
log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, topicName,
ex.getMessage(), ex);
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
getServerError(ex), ex.getMessage(), clientRequestId));
return null;
});
} else {
URI brokerURI;
try {
brokerURI = new URI(brokerServiceURL);
} catch (URISyntaxException e) {
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
e.getMessage(), clientRequestId));
return;
}
InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());

if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
topicName.getPartitionedTopicName(), clientRequestId);
}
String serviceUrl = getServiceUrl(clientRequestId);
if (serviceUrl == null) {
log.warn("No available broker for {} to lookup partition metadata", topicName);
return;
}
InetSocketAddress addr = getAddr(serviceUrl, clientRequestId);
if (addr == null) {
return;
}

proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
// Connected to backend broker
long requestId = proxyConnection.newRequestId();
ByteBuf command;
command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
t.getMessage(), t);
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
t.getMessage(), clientRequestId));
} else {
proxyConnection.ctx().writeAndFlush(
Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
}
proxyConnection.getConnectionPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), clientRequestId));
return null;
});
if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'", addr,
topicName.getPartitionedTopicName(), clientRequestId);
}
proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
// Connected to backend broker
long requestId = proxyConnection.newRequestId();
ByteBuf command;
command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
t.getMessage(), t);
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
t.getMessage(), clientRequestId));
} else {
proxyConnection.ctx().writeAndFlush(
Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
}
proxyConnection.getConnectionPool().releaseConnection(clientCnx);
});
}).exceptionally(ex -> {
// Failed to connect to backend broker
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), clientRequestId));
return null;
});
}

public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
Expand Down

0 comments on commit 5980cdc

Please sign in to comment.