Skip to content

Commit

Permalink
[proxy][functions] Issue #2154: proxy should be able to forward rest …
Browse files Browse the repository at this point in the history
…requests to function workers cluster (#2560)

*Motivation*

Function workers can be deployed as a separate cluster. If so, proxy is not able to forward the functions
related rest calls to the correct server.

*Changes*

Add two settings in proxy configuration to allow proxy configuring forwarding functions related rest calls
to function worker cluster.

*Tests*

Verified with changes in integration tests (manually). It is hard to add the integration tests based on
current integration tests. will add them in a separate PR.
  • Loading branch information
sijie committed Sep 14, 2018
1 parent 95fe84c commit 07d4226
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
5 changes: 5 additions & 0 deletions conf/proxy.conf
Expand Up @@ -23,6 +23,11 @@ zookeeperServers=
# Configuration store connection string (as a comma-separated list)
configurationStoreServers=

# If function workers are setup in a separate cluster, configure the following 2 settings
# to point to the function workers cluster
functionWorkerWebServiceURL=
functionWorkerWebServiceURLTLS=

# ZooKeeper session timeout (in milliseconds)
zookeeperSessionTimeoutMs=30000

Expand Down
Expand Up @@ -49,12 +49,15 @@ class AdminProxyHandler extends AsyncProxyServlet {
private final ProxyConfiguration config;
private final BrokerDiscoveryProvider discoveryProvider;
private final String brokerWebServiceUrl;
private final String functionWorkerWebServiceUrl;

AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
this.config = config;
this.discoveryProvider = discoveryProvider;
this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS()
: config.getBrokerWebServiceURL();
this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS()
: config.getFunctionWorkerWebServiceURL();
}

@Override
Expand Down Expand Up @@ -122,7 +125,16 @@ protected HttpClient newHttpClient() {
protected String rewriteTarget(HttpServletRequest request) {
StringBuilder url = new StringBuilder();

if (isBlank(brokerWebServiceUrl)) {
boolean isFunctionsRestRequest = false;
String requestUri = request.getRequestURI();
if (requestUri.startsWith("/admin/v2/functions")
|| requestUri.startsWith("/admin/functions")) {
isFunctionsRestRequest = true;
}

if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
url.append(functionWorkerWebServiceUrl);
} else if (isBlank(brokerWebServiceUrl)) {
try {
ServiceLookupData availableBroker = discoveryProvider.nextBroker();

Expand All @@ -148,7 +160,7 @@ protected String rewriteTarget(HttpServletRequest request) {
if (url.lastIndexOf("/") == url.length() - 1) {
url.deleteCharAt(url.lastIndexOf("/"));
}
url.append(request.getRequestURI());
url.append(requestUri);

String query = request.getQueryString();
if (query != null) {
Expand Down
Expand Up @@ -48,6 +48,10 @@ public class ProxyConfiguration implements PulsarConfiguration {
private String brokerWebServiceURL;
private String brokerWebServiceURLTLS;

// function worker web services
private String functionWorkerWebServiceURL;
private String functionWorkerWebServiceURLTLS;

// Port to use to server binary-proto request
private int servicePort = 6650;
// Port to use to server binary-proto-tls request
Expand Down Expand Up @@ -158,6 +162,14 @@ public void setBrokerWebServiceURLTLS(String brokerWebServiceURLTLS) {
this.brokerWebServiceURLTLS = brokerWebServiceURLTLS;
}

public String getFunctionWorkerWebServiceURL() {
return functionWorkerWebServiceURL;
}

public String getFunctionWorkerWebServiceURLTLS() {
return functionWorkerWebServiceURLTLS;
}

public String getZookeeperServers() {
return zookeeperServers;
}
Expand Down

0 comments on commit 07d4226

Please sign in to comment.