diff --git a/cli/pom.xml b/cli/pom.xml index a8eaf2d9f4..8914df73c7 100644 --- a/cli/pom.xml +++ b/cli/pom.xml @@ -41,5 +41,13 @@ commons-cli commons-cli + + org.apache.uniffle + rss-internal-client + + + org.apache.httpcomponents + httpclient + diff --git a/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java new file mode 100644 index 0000000000..5f973e100c --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.api; + +import org.apache.uniffle.client.RestClient; +import org.apache.uniffle.client.UniffleRestClient; + +import java.util.HashMap; +import java.util.Map; + +public class AdminRestApi { + private UniffleRestClient client; + + private static final String API_BASE_PATH = "admin"; + + private AdminRestApi() {} + + public AdminRestApi(UniffleRestClient client) { + this.client = client; + } + + public String refreshAccessChecker(String checker) { + String path = String.format("/api/server/%s/%s", API_BASE_PATH, "refresh/accessChecker"); + Map params = new HashMap<>(); + params.put("class", checker); + return this.getClient().get(path, params, null); + } + + private RestClient getClient() { + return this.client.getHttpClient(); + } +} diff --git a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java index 6e7631acf2..d649317e10 100644 --- a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java +++ b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java @@ -20,6 +20,8 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.uniffle.api.AdminRestApi; +import org.apache.uniffle.client.UniffleRestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,69 +30,109 @@ public class UniffleCLI extends AbstractCustomCommandLine { - private static final Logger LOG = LoggerFactory.getLogger(UniffleCLI.class); - private final Options allOptions; - private final Option uniffleClientCli; - private final Option uniffleAdminCli; - private final Option help; - - public UniffleCLI(String shortPrefix, String longPrefix) { - allOptions = new Options(); - uniffleClientCli = new Option(shortPrefix + "c", longPrefix + "cli", - true, "This is an client cli command that will print args."); - uniffleAdminCli = new Option(shortPrefix + "a", longPrefix + "admin", - true, "This is an admin command that will print args."); - help = new Option(shortPrefix + "h", longPrefix + "help", - false, "Help for the Uniffle CLI."); - allOptions.addOption(uniffleClientCli); - allOptions.addOption(uniffleAdminCli); - allOptions.addOption(help); - } - - public int run(String[] args) throws UniffleCliArgsException { - final CommandLine cmd = parseCommandLineOptions(args, true); - - if (cmd.hasOption(help.getOpt())) { - printUsage(); - return 0; + private static final Logger LOG = LoggerFactory.getLogger(UniffleCLI.class); + private final Options allOptions; + private final Option uniffleClientCli; + private final Option uniffleAdminCli; + private final Option checkerClass; + private final Option help; + private final Option coordServer; + private final Option coordPort; + private final Option refreshAccessCli; + protected UniffleRestClient client; + + public UniffleCLI(String shortPrefix, String longPrefix) { + allOptions = new Options(); + uniffleClientCli = new Option(shortPrefix + "c", longPrefix + "cli", + true, "This is an client cli command that will print args."); + uniffleAdminCli = new Option(shortPrefix + "a", longPrefix + "admin", + true, "This is an admin command that will print args."); + refreshAccessCli = new Option(shortPrefix + "rc", longPrefix + "refreshChecker", + false, "This is an admin command that will refresh access checker."); + checkerClass = new Option(shortPrefix + "ac", longPrefix + "checkerClass", + true, "This is an admin command that will refresh access checker."); + help = new Option(shortPrefix + "h", longPrefix + "help", + false, "Help for the Uniffle CLI."); + coordServer = new Option(shortPrefix + "s", longPrefix + "host", + true, "This is coordinator server host."); + coordPort = new Option(shortPrefix + "p", longPrefix + "port", + true, "This is coordinator server port."); + allOptions.addOption(uniffleClientCli); + allOptions.addOption(uniffleAdminCli); + allOptions.addOption(coordServer); + allOptions.addOption(coordPort); + allOptions.addOption(refreshAccessCli); + allOptions.addOption(help); + } + + public int run(String[] args) throws UniffleCliArgsException { + final CommandLine cmd = parseCommandLineOptions(args, true); + + if (cmd.hasOption(help.getOpt())) { + printUsage(); + return 0; + } + + if (cmd.hasOption(uniffleClientCli.getOpt())) { + String cliArgs = cmd.getOptionValue(uniffleClientCli.getOpt()); + System.out.println("uniffle-client-cli : " + cliArgs); + return 0; + } + + if (cmd.hasOption(uniffleAdminCli.getOpt())) { + String cliArgs = cmd.getOptionValue(uniffleAdminCli.getOpt()); + System.out.println("uniffle-admin-cli : " + cliArgs); + return 0; + } + if (cmd.hasOption(coordServer.getOpt()) && cmd.hasOption(coordPort.getOpt())) { + String host = cmd.getOptionValue(coordServer.getOpt()).trim(); + int port = Integer.parseInt(cmd.getOptionValue(coordPort.getOpt()).trim()); + String hostUrl = String.format("http://%s:%d", host, port); + client = UniffleRestClient.builder(hostUrl).build(); + } + + if (cmd.hasOption(refreshAccessCli.getOpt())) { + String checker = cmd.getOptionValue(checkerClass.getOpt()); + LOG.info(refreshAccessChecker(checker)); + return 0; + } + + return 1; + } + + private String refreshAccessChecker(String checker) throws UniffleCliArgsException { + if (client == null) { + throw new UniffleCliArgsException("Missing Coordinator host address and grpc port parameters."); + } + AdminRestApi adminRestApi = new AdminRestApi(client); + return adminRestApi.refreshAccessChecker(checker); } - if (cmd.hasOption(uniffleClientCli.getOpt())) { - String cliArgs = cmd.getOptionValue(uniffleClientCli.getOpt()); - System.out.println("uniffle-client-cli : " + cliArgs); - return 0; + @Override + public void addRunOptions(Options baseOptions) { + baseOptions.addOption(uniffleClientCli); + baseOptions.addOption(uniffleAdminCli); + baseOptions.addOption(refreshAccessCli); + baseOptions.addOption(checkerClass); + baseOptions.addOption(coordServer); + baseOptions.addOption(coordPort); } - if (cmd.hasOption(uniffleAdminCli.getOpt())) { - String cliArgs = cmd.getOptionValue(uniffleAdminCli.getOpt()); - System.out.println("uniffle-admin-cli : " + cliArgs); - return 0; + @Override + public void addGeneralOptions(Options baseOptions) { + baseOptions.addOption(help); } - return 1; - } - - @Override - public void addRunOptions(Options baseOptions) { - baseOptions.addOption(uniffleClientCli); - baseOptions.addOption(uniffleAdminCli); - } - - @Override - public void addGeneralOptions(Options baseOptions) { - baseOptions.addOption(help); - } - - public static void main(String[] args) { - int retCode; - try { - final UniffleCLI cli = new UniffleCLI("", ""); - retCode = cli.run(args); - } catch (UniffleCliArgsException e) { - retCode = AbstractCustomCommandLine.handleCliArgsException(e, LOG); - } catch (Exception e) { - retCode = AbstractCustomCommandLine.handleError(e, LOG); + public static void main(String[] args) { + int retCode; + try { + final UniffleCLI cli = new UniffleCLI("", ""); + retCode = cli.run(args); + } catch (UniffleCliArgsException e) { + retCode = AbstractCustomCommandLine.handleCliArgsException(e, LOG); + } catch (Exception e) { + retCode = AbstractCustomCommandLine.handleError(e, LOG); + } + System.exit(retCode); } - System.exit(retCode); - } } diff --git a/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java b/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java new file mode 100644 index 0000000000..85036527bb --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client; + +import org.apache.http.client.config.RequestConfig; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.ssl.SSLContexts; +import org.apache.uniffle.client.exception.UniffleRestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; + +public class HttpClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(HttpClientFactory.class); + + public static CloseableHttpClient createHttpClient(RestClientConf conf) { + RequestConfig requestConfig = + RequestConfig.custom() + .setSocketTimeout(conf.getSocketTimeout()) + .setConnectTimeout(conf.getConnectTimeout()) + .build(); + SSLConnectionSocketFactory sslSocketFactory; + try { + TrustStrategy acceptingTrustStrategy = (cert, authType) -> true; + SSLContext sslContext = + SSLContexts.custom().loadTrustMaterial(null, acceptingTrustStrategy).build(); + sslSocketFactory = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE); + } catch (Exception e) { + LOG.error("Error: ", e); + throw new UniffleRestException("Failed to create HttpClient", e); + } + + return HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig) + .setSSLSocketFactory(sslSocketFactory) + .build(); + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClient.java b/cli/src/main/java/org/apache/uniffle/client/RestClient.java new file mode 100644 index 0000000000..023ca2b32d --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/RestClient.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client; + +import java.util.Map; + +/** A underlying http client interface for common rest request. */ +public interface RestClient extends AutoCloseable { + T get(String path, Map params, Class type, String authHeader); + + String get(String path, Map params, String authHeader); + +} diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClientConf.java b/cli/src/main/java/org/apache/uniffle/client/RestClientConf.java new file mode 100644 index 0000000000..2224c28a4b --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/RestClientConf.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client; + +public class RestClientConf { + private int maxAttempts; + private int attemptWaitTime; + private int socketTimeout; + private int connectTimeout; + + public int getMaxAttempts() { + return maxAttempts; + } + + public void setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + } + + public int getAttemptWaitTime() { + return attemptWaitTime; + } + + public void setAttemptWaitTime(int attemptWaitTime) { + this.attemptWaitTime = attemptWaitTime; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java new file mode 100644 index 0000000000..12eeac3a48 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.HttpResponseException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.apache.uniffle.client.exception.RetryableUniffleRestException; +import org.apache.uniffle.client.exception.UniffleRestException; +import org.apache.uniffle.util.JsonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +public class RestClientImpl implements RestClient { + + private static final Logger LOG = LoggerFactory.getLogger(RestClientImpl.class); + + private CloseableHttpClient httpclient; + + private String baseUrl; + + public RestClientImpl(String baseUrl, CloseableHttpClient httpclient) { + this.httpclient = httpclient; + this.baseUrl = baseUrl; + } + + @Override + public void close() throws Exception { + if (httpclient != null) { + httpclient.close(); + } + } + + @Override + public T get(String path, Map params, Class type, String authHeader) { + String responseJson = get(path, params, authHeader); + return JsonUtil.fromJson(responseJson, type); + } + + @Override + public String get(String path, Map params, String authHeader) { + return doRequest(buildURI(path, params), authHeader, RequestBuilder.get()); + } + + private String doRequest(URI uri, String authHeader, RequestBuilder requestBuilder) { + String response; + try { + if (requestBuilder.getFirstHeader(HttpHeaders.CONTENT_TYPE) == null) { + requestBuilder.setHeader( + HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + } + if (StringUtils.isNotBlank(authHeader)) { + requestBuilder.setHeader(HttpHeaders.AUTHORIZATION, authHeader); + } + HttpUriRequest httpRequest = requestBuilder.setUri(uri).build(); + + LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri); + + ResponseHandler responseHandler = + resp -> { + int status = resp.getStatusLine().getStatusCode(); + HttpEntity entity = resp.getEntity(); + String entityStr = entity != null ? EntityUtils.toString(entity) : null; + if (status >= 200 && status < 300) { + return entityStr; + } else { + throw new HttpResponseException(status, entityStr); + } + }; + + response = httpclient.execute(httpRequest, responseHandler); + LOG.debug("Response: {}", response); + } catch (ConnectException | ConnectTimeoutException | NoHttpResponseException e) { + // net exception can be retried by connecting to other Kyuubi server + throw new RetryableUniffleRestException("Api request failed for " + uri.toString(), e); + } catch (UniffleRestException rethrow) { + throw rethrow; + } catch (Exception e) { + LOG.error("Error: ", e); + throw new UniffleRestException("Api request failed for " + uri.toString(), e); + } + + return response; + } + + private URI buildURI(String path) { + return buildURI(path, null); + } + + private URI buildURI(String path, Map params) { + URI uri; + try { + String url = StringUtils.isNotBlank(path) ? this.baseUrl + "/" + path : this.baseUrl; + URIBuilder builder = new URIBuilder(url); + + if (!params.isEmpty()) { + for (Map.Entry entry : params.entrySet()) { + if (entry.getValue() != null) { + builder.addParameter(entry.getKey(), entry.getValue().toString()); + } + } + } + uri = builder.build(); + } catch (URISyntaxException e) { + throw new UniffleRestException("invalid URI.", e); + } + return uri; + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java b/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java new file mode 100644 index 0000000000..db5b967d45 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client; + +import org.apache.http.impl.client.CloseableHttpClient; + +public class UniffleRestClient implements AutoCloseable, Cloneable { + + private RestClient restClient; + + private RestClientConf conf; + + private String hostUrl; + @Override + public void close() throws Exception { + if (restClient != null) { + restClient.close(); + } + } + + @Override + public UniffleRestClient clone() { + UniffleRestClient uniffleRestClient = new UniffleRestClient(); + uniffleRestClient.conf = this.conf; + return uniffleRestClient; + } + + public String getHostUrl() { + return hostUrl; + } + + public void setHostUrl(String hostUrl) { + this.hostUrl = hostUrl; + } + + private UniffleRestClient() { + } + + private UniffleRestClient(Builder builder) { + this.hostUrl = builder.hostUrl; + + RestClientConf conf = new RestClientConf(); + conf.setConnectTimeout(builder.connectTimeout); + conf.setSocketTimeout(builder.socketTimeout); + conf.setMaxAttempts(builder.maxAttempts); + conf.setAttemptWaitTime(builder.attemptWaitTime); + this.conf = conf; + CloseableHttpClient httpclient = HttpClientFactory.createHttpClient(conf); + this.restClient = new RestClientImpl(hostUrl, httpclient); + } + + public RestClient getHttpClient() { + return restClient; + } + + public RestClientConf getConf() { + return conf; + } + + public static Builder builder(String hostUrl) { + return new Builder(hostUrl); + } + + public static class Builder { + + private String hostUrl; + + // 2 minutes + private int socketTimeout = 2 * 60 * 1000; + + // 30s + private int connectTimeout = 30 * 1000; + + private int maxAttempts = 3; + + // 3s + private int attemptWaitTime = 3 * 1000; + + public Builder(String hostUrl) { + this.hostUrl = hostUrl; + } + + public Builder socketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + return this; + } + + public Builder connectionTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + public Builder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public Builder attemptWaitTime(int attemptWaitTime) { + this.attemptWaitTime = attemptWaitTime; + return this; + } + + public UniffleRestClient build() { + return new UniffleRestClient(this); + } + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/exception/RetryableUniffleRestException.java b/cli/src/main/java/org/apache/uniffle/client/exception/RetryableUniffleRestException.java new file mode 100644 index 0000000000..b6cc9700cf --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/exception/RetryableUniffleRestException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client.exception; + +/** + * A retryable exception which is thrown by underlying rest client. The call side can do retry by + * catching this exception. + */ +public class RetryableUniffleRestException extends UniffleRestException { + public RetryableUniffleRestException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/exception/UniffleRestException.java b/cli/src/main/java/org/apache/uniffle/client/exception/UniffleRestException.java new file mode 100644 index 0000000000..10b3cec268 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/exception/UniffleRestException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client.exception; + +public class UniffleRestException extends RuntimeException { + + public UniffleRestException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/cli/src/main/java/org/apache/uniffle/opt/CoordinatorServerOpt.java b/cli/src/main/java/org/apache/uniffle/opt/CoordinatorServerOpt.java new file mode 100644 index 0000000000..882da79485 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/opt/CoordinatorServerOpt.java @@ -0,0 +1,27 @@ +package org.apache.uniffle.opt; + +public class CoordinatorServerOpt { + private String host; + private Integer port; + + public CoordinatorServerOpt(String host, Integer port) { + this.host = host; + this.port = port; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } +} diff --git a/cli/src/main/java/org/apache/uniffle/util/JsonUtil.java b/cli/src/main/java/org/apache/uniffle/util/JsonUtil.java new file mode 100644 index 0000000000..d74ea2e315 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/util/JsonUtil.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.util; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.uniffle.client.exception.UniffleRestException; + +public final class JsonUtil { + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + public static String toJson(Object object) { + try { + return MAPPER.writeValueAsString(object); + } catch (Exception e) { + throw new UniffleRestException( + String.format("Failed to convert object(%s) to json", object), e); + } + } + + public static T fromJson(String json, Class clazz) { + try { + return MAPPER.readValue(json, clazz); + } catch (Exception e) { + throw new UniffleRestException( + String.format("Failed to convert json string(%s) to %s", json, clazz.getName()), e); + } + } +} diff --git a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java index 207538ab53..c143e56377 100644 --- a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java +++ b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java @@ -17,18 +17,18 @@ package org.apache.uniffle.cli; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; - +import org.apache.uniffle.UniffleCliArgsException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.apache.uniffle.UniffleCliArgsException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; + public class UniffleTestCLI { private UniffleCLI uniffleCLI; diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 3fb185f489..28e11a602a 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -19,6 +19,7 @@ import io.prometheus.client.CollectorRegistry; import org.apache.hadoop.conf.Configuration; +import org.apache.uniffle.coordinator.web.servlet.admin.RefreshCheckerServlet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; @@ -201,6 +202,9 @@ private void registerRESTAPI() throws Exception { jettyServer.addServlet( new CancelDecommissionServlet(this), "/api/server/cancelDecommission"); + jettyServer.addServlet( + new RefreshCheckerServlet(this), + "/api/server/admin/refreshChecker"); } private void registerMetrics() throws Exception { diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java index 677890c5ec..88586bda8f 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java @@ -27,4 +27,10 @@ public abstract class AbstractAccessChecker implements AccessChecker { protected AbstractAccessChecker(AccessManager accessManager) throws Exception { } + + + @Override + public void refreshAccessChecker() { + + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java index d8b827a9e2..fe869393e2 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java @@ -101,6 +101,11 @@ public AccessCheckResult check(AccessInfo accessInfo) { return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); } + @Override + public void refreshAccessChecker() { + updateAccessCandidates(); + } + @Override public void close() { if (updateAccessCandidatesSES != null) { diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java index 71a380a213..53235383a0 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java @@ -34,4 +34,6 @@ public interface AccessChecker extends Closeable { * @return access check result */ AccessCheckResult check(AccessInfo accessInfo); + + void refreshAccessChecker(); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java new file mode 100644 index 0000000000..9ab5d6338f --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.servlet.admin; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.CoordinatorServer; +import org.apache.uniffle.coordinator.ServerNode; +import org.apache.uniffle.coordinator.access.checker.AccessChecker; +import org.apache.uniffle.coordinator.web.Response; +import org.apache.uniffle.coordinator.web.servlet.BaseServlet; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + + +public class RefreshCheckerServlet extends BaseServlet { + private final CoordinatorServer coordinator; + + public RefreshCheckerServlet(CoordinatorServer coordinator) { + this.coordinator = coordinator; + } + + @Override + protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) { + List checkers = coordinator.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS); + String accessClass = req.getParameter("class"); + if (CollectionUtils.isEmpty(checkers)) { + return Response.fail("Access checkers is empty, will not update any checkers."); + } + try { + Class aClass = Class.forName(accessClass); + Optional checker = coordinator.getAccessManager().getAccessCheckers().stream() + .filter(aClass::isInstance) + .findAny(); + if (!checker.isPresent()) { + return Response.fail(String.format("Access checkers %s is none exist, will not update any checkers.", accessClass)); + } + checker.get().refreshAccessChecker(); + } catch (ClassNotFoundException e) { + return Response.fail(String.format("Access checker class %s is not found.", accessClass)); + } + return Response.success(null); + } +} diff --git a/integration-test/common/pom.xml b/integration-test/common/pom.xml index abdb0de044..b8b705dd9e 100644 --- a/integration-test/common/pom.xml +++ b/integration-test/common/pom.xml @@ -50,6 +50,11 @@ shuffle-server test + + org.apache.uniffle + cli + test + org.apache.uniffle shuffle-server diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java index 4e167e04b4..1f29d33bcf 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java @@ -69,6 +69,11 @@ public AccessCheckResult check(AccessInfo accessInfo) { return new AccessCheckResult(false, ""); } + @Override + public void refreshAccessChecker() { + + } + @Override public void close() throws IOException { // ignore. diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java new file mode 100644 index 0000000000..dfb056d7a8 --- /dev/null +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java @@ -0,0 +1,30 @@ +package org.apache.uniffle.test; + +import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.metrics.TestUtils; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CoordinatorAdminServiceTest extends CoordinatorAdminTestBase { + + private static final String URL_PREFIX = "http://127.0.0.1:12345/api/"; + private static final String ADMIN_URL = URL_PREFIX + "server/admin/refresh/accessChecker"; + + @Test + public void test() throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + coordinatorConf.setString(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY.key(), "BASIC"); + String accessChecker = "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"; + coordinatorConf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), accessChecker); + createCoordinatorServer(coordinatorConf); + startServers(); +//CoordinatorConf.JETTY_HTTP_PORT + while (true){ + Thread.sleep(100000); + } +// assertEquals(adminRestApi.refreshAccessChecker(accessChecker),""); +// shutdownServers(); + } +} diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminTestBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminTestBase.java new file mode 100644 index 0000000000..b59fc7e1d3 --- /dev/null +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminTestBase.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.test; + +import org.apache.uniffle.api.AdminRestApi; +import org.apache.uniffle.client.UniffleRestClient; +import org.junit.jupiter.api.BeforeEach; + +public class CoordinatorAdminTestBase extends IntegrationTestBase { + + protected AdminRestApi adminRestApi; + + @BeforeEach + public void createClient() { + String hostUrl = String.format("http://%s:%d", LOCALHOST, COORDINATOR_PORT_1); + adminRestApi = new AdminRestApi(UniffleRestClient.builder(hostUrl).build()); + } +} diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java index c5a8b5073b..6a5601943a 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java @@ -25,6 +25,7 @@ import java.util.Map; import com.google.common.collect.Lists; +import org.apache.uniffle.common.config.RssBaseConf; import org.junit.jupiter.api.AfterAll; import org.apache.uniffle.common.util.RssUtils; diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java index 943ecd4091..0485a772a9 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java @@ -52,6 +52,9 @@ public class ServletTest extends IntegrationTestBase { private static final String URL_PREFIX = "http://127.0.0.1:12345/api/"; private static final String NODES_URL = URL_PREFIX + "server/nodes"; + + private static final String ADMIN_URL = URL_PREFIX + "server/admin/refresh/accessChecker"; + private static final String DECOMMISSION_URL = URL_PREFIX + "server/decommission"; private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX + "server/cancelDecommission"; private static CoordinatorServer coordinatorServer; @@ -154,4 +157,18 @@ public void testDecommissionServlet() throws Exception { assertEquals(0, response.getCode()); assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus()); } + + @Test + public void testAdminServlet() throws Exception { + CoordinatorConf coordinatorConf = getCoordinatorConf(); + coordinatorConf.setString(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY.key(), "BASIC"); + String accessChecker = "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"; + coordinatorConf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), accessChecker); + createCoordinatorServer(coordinatorConf); + startServers(); + shutdownServers(); + String content = TestUtils.httpGet(ADMIN_URL+"?"); + Response> response = objectMapper.readValue(content, new TypeReference>>() { + }); + } } diff --git a/pom.xml b/pom.xml index 582a44e4e9..d85ed3d435 100644 --- a/pom.xml +++ b/pom.xml @@ -269,6 +269,12 @@ shuffle-server ${project.version} + + + org.apache.uniffle + cli + ${project.version} + org.apache.uniffle coordinator @@ -467,6 +473,18 @@ + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + commons-logging + commons-logging + + + + com.fasterxml.jackson.core jackson-core