From dd58995354ebe51d7dafc4d6aece1305591294fc Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Tue, 6 Jun 2023 20:40:05 +0800 Subject: [PATCH] [#768] feat(cli): Cli method for blacklist update --- cli/pom.xml | 8 ++ .../org/apache/uniffle/api/AdminRestApi.java | 30 +++++ .../org/apache/uniffle/cli/UniffleCLI.java | 111 ++++++++++++----- .../uniffle/client/HttpClientFactory.java | 41 +++++++ .../org/apache/uniffle/client/RestClient.java | 27 +++++ .../apache/uniffle/client/RestClientConf.java | 57 +++++++++ .../apache/uniffle/client/RestClientImpl.java | 114 ++++++++++++++++++ .../uniffle/client/UniffleRestClient.java | 87 +++++++++++++ .../exception/UniffleRestException.java | 25 ++++ .../apache/uniffle/cli/AdminRestApiTest.java | 40 ++++++ .../uniffle/cli/UniffleTestAdminCLI.java | 57 +++++++++ .../apache/uniffle/cli/UniffleTestCLI.java | 11 +- .../coordinator/CoordinatorServer.java | 4 + .../access/checker/AbstractAccessChecker.java | 5 + .../checker/AccessCandidatesChecker.java | 5 + .../access/checker/AccessChecker.java | 2 + .../servlet/admin/RefreshCheckerServlet.java | 44 +++++++ integration-test/common/pom.xml | 5 + .../uniffle/test/AccessClusterTest.java | 5 + .../test/CoordinatorAdminServiceTest.java | 54 +++++++++ pom.xml | 12 ++ 21 files changed, 708 insertions(+), 36 deletions(-) create mode 100644 cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java create mode 100644 cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java create mode 100644 cli/src/main/java/org/apache/uniffle/client/RestClient.java create mode 100644 cli/src/main/java/org/apache/uniffle/client/RestClientConf.java create mode 100644 cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java create mode 100644 cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java create mode 100644 cli/src/main/java/org/apache/uniffle/client/exception/UniffleRestException.java create mode 100644 cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java create mode 100644 cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java create mode 100644 integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java diff --git a/cli/pom.xml b/cli/pom.xml index a8eaf2d9f4..8914df73c7 100644 --- a/cli/pom.xml +++ b/cli/pom.xml @@ -41,5 +41,13 @@ commons-cli commons-cli + + org.apache.uniffle + rss-internal-client + + + org.apache.httpcomponents + httpclient + diff --git a/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java new file mode 100644 index 0000000000..cc91435a90 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java @@ -0,0 +1,30 @@ +package org.apache.uniffle.api; + +import org.apache.uniffle.client.RestClient; +import org.apache.uniffle.client.UniffleRestClient; + +import java.util.HashMap; +import java.util.Map; + +public class AdminRestApi { + private UniffleRestClient client; + + private static final String API_BASE_PATH = "admin"; + + private AdminRestApi() {} + + public AdminRestApi(UniffleRestClient client) { + this.client = client; + } + + public String refreshAccessChecker(String checker) { + String path = String.format("/api/server/%s/%s", API_BASE_PATH, "refreshChecker"); + Map params = new HashMap<>(); + params.put("class", checker); + return this.getClient().get(path, params, null); + } + + private RestClient getClient() { + return this.client.getHttpClient(); + } +} diff --git a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java index 6e7631acf2..cf16156c26 100644 --- a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java +++ b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java @@ -20,6 +20,8 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.uniffle.api.AdminRestApi; +import org.apache.uniffle.client.UniffleRestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,27 +30,48 @@ public class UniffleCLI extends AbstractCustomCommandLine { - private static final Logger LOG = LoggerFactory.getLogger(UniffleCLI.class); - private final Options allOptions; - private final Option uniffleClientCli; - private final Option uniffleAdminCli; - private final Option help; - - public UniffleCLI(String shortPrefix, String longPrefix) { - allOptions = new Options(); - uniffleClientCli = new Option(shortPrefix + "c", longPrefix + "cli", - true, "This is an client cli command that will print args."); - uniffleAdminCli = new Option(shortPrefix + "a", longPrefix + "admin", - true, "This is an admin command that will print args."); - help = new Option(shortPrefix + "h", longPrefix + "help", - false, "Help for the Uniffle CLI."); - allOptions.addOption(uniffleClientCli); - allOptions.addOption(uniffleAdminCli); - allOptions.addOption(help); - } + private static final Logger LOG = LoggerFactory.getLogger(UniffleCLI.class); + private final Options allOptions; + private final Option uniffleClientCli; + private final Option uniffleAdminCli; + private final Option checkerClass; + private final Option help; + private final Option coordServer; + private final Option coordPort; + private final Option refreshAccessCli; + protected UniffleRestClient client; + + public UniffleCLI(String shortPrefix, String longPrefix) { + allOptions = new Options(); + uniffleClientCli = new Option(shortPrefix + "c", longPrefix + "cli", + true, "This is an client cli command that will print args."); + uniffleAdminCli = new Option(shortPrefix + "a", longPrefix + "admin", + true, "This is an admin command that will print args."); + refreshAccessCli = new Option(shortPrefix + "rc", longPrefix + "refreshChecker", + false, "This is an admin command that will refresh access checker."); + checkerClass = new Option(shortPrefix + "cc", longPrefix + "checkerClass", + true, "This is an admin command that will refresh access checker."); + help = new Option(shortPrefix + "h", longPrefix + "help", + false, "Help for the Uniffle CLI."); + coordServer = new Option(shortPrefix + "s", longPrefix + "host", + true, "This is coordinator server host."); + coordPort = new Option(shortPrefix + "p", longPrefix + "port", + true, "This is coordinator server port."); + allOptions.addOption(uniffleClientCli); + allOptions.addOption(uniffleAdminCli); + allOptions.addOption(coordServer); + allOptions.addOption(coordPort); + allOptions.addOption(refreshAccessCli); + allOptions.addOption(help); + } - public int run(String[] args) throws UniffleCliArgsException { - final CommandLine cmd = parseCommandLineOptions(args, true); + public UniffleCLI(String shortPrefix, String longPrefix, UniffleRestClient client) { + this(shortPrefix, longPrefix); + this.client = client; + } + + public int run(String[] args) throws UniffleCliArgsException { + final CommandLine cmd = parseCommandLineOptions(args, true); if (cmd.hasOption(help.getOpt())) { printUsage(); @@ -61,20 +84,46 @@ public int run(String[] args) throws UniffleCliArgsException { return 0; } - if (cmd.hasOption(uniffleAdminCli.getOpt())) { - String cliArgs = cmd.getOptionValue(uniffleAdminCli.getOpt()); - System.out.println("uniffle-admin-cli : " + cliArgs); - return 0; + if (cmd.hasOption(uniffleAdminCli.getOpt())) { + String cliArgs = cmd.getOptionValue(uniffleAdminCli.getOpt()); + System.out.println("uniffle-admin-cli : " + cliArgs); + return 0; + } + if (cmd.hasOption(coordServer.getOpt()) && cmd.hasOption(coordPort.getOpt())) { + String host = cmd.getOptionValue(coordServer.getOpt()).trim(); + int port = Integer.parseInt(cmd.getOptionValue(coordPort.getOpt()).trim()); + String hostUrl = String.format("http://%s:%d", host, port); + LOG.info("uniffle-admin-cli : coordinator server host {}, port {}.", host, port); + client = UniffleRestClient.builder(hostUrl).build(); + } + + if (cmd.hasOption(refreshAccessCli.getOpt())) { + String checker = cmd.getOptionValue(checkerClass.getOpt()); + LOG.info("uniffle-admin-cli : refresh access checker {}!", checker); + refreshAccessChecker(checker); + return 0; + } + + return 1; } - return 1; - } + private String refreshAccessChecker(String checker) throws UniffleCliArgsException { + if (client == null) { + throw new UniffleCliArgsException("Missing Coordinator host address and grpc port parameters."); + } + AdminRestApi adminRestApi = new AdminRestApi(client); + return adminRestApi.refreshAccessChecker(checker); + } - @Override - public void addRunOptions(Options baseOptions) { - baseOptions.addOption(uniffleClientCli); - baseOptions.addOption(uniffleAdminCli); - } + @Override + public void addRunOptions(Options baseOptions) { + baseOptions.addOption(uniffleClientCli); + baseOptions.addOption(uniffleAdminCli); + baseOptions.addOption(refreshAccessCli); + baseOptions.addOption(checkerClass); + baseOptions.addOption(coordServer); + baseOptions.addOption(coordPort); + } @Override public void addGeneralOptions(Options baseOptions) { diff --git a/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java b/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java new file mode 100644 index 0000000000..533986a697 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java @@ -0,0 +1,41 @@ +package org.apache.uniffle.client; + +import org.apache.http.client.config.RequestConfig; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.ssl.SSLContexts; +import org.apache.uniffle.client.exception.UniffleRestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; + +public class HttpClientFactory { + private static final Logger LOG = LoggerFactory.getLogger(HttpClientFactory.class); + + public static CloseableHttpClient createHttpClient(RestClientConf conf) { + RequestConfig requestConfig = + RequestConfig.custom() + .setSocketTimeout(conf.getSocketTimeout()) + .setConnectTimeout(conf.getConnectTimeout()) + .build(); + SSLConnectionSocketFactory sslSocketFactory; + try { + TrustStrategy acceptingTrustStrategy = (cert, authType) -> true; + SSLContext sslContext = + SSLContexts.custom().loadTrustMaterial(null, acceptingTrustStrategy).build(); + sslSocketFactory = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE); + } catch (Exception e) { + LOG.error("Error: ", e); + throw new UniffleRestException("Failed to create HttpClient", e); + } + + return HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig) + .setSSLSocketFactory(sslSocketFactory) + .build(); + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClient.java b/cli/src/main/java/org/apache/uniffle/client/RestClient.java new file mode 100644 index 0000000000..0ae2b977b3 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/RestClient.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client; + +import java.util.Map; + +/** A underlying http client interface for common rest request. */ +public interface RestClient extends AutoCloseable, Cloneable { + + String get(String path, Map params, String authHeader); + +} diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClientConf.java b/cli/src/main/java/org/apache/uniffle/client/RestClientConf.java new file mode 100644 index 0000000000..2224c28a4b --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/RestClientConf.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client; + +public class RestClientConf { + private int maxAttempts; + private int attemptWaitTime; + private int socketTimeout; + private int connectTimeout; + + public int getMaxAttempts() { + return maxAttempts; + } + + public void setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + } + + public int getAttemptWaitTime() { + return attemptWaitTime; + } + + public void setAttemptWaitTime(int attemptWaitTime) { + this.attemptWaitTime = attemptWaitTime; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java new file mode 100644 index 0000000000..81645f0c34 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java @@ -0,0 +1,114 @@ +package org.apache.uniffle.client; + +import org.apache.commons.lang3.StringUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.NoHttpResponseException; +import org.apache.http.client.HttpResponseException; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.apache.uniffle.client.exception.UniffleRestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +public class RestClientImpl implements RestClient { + + private static final Logger LOG = LoggerFactory.getLogger(RestClientImpl.class); + + private CloseableHttpClient httpclient; + + private String baseUrl; + + public RestClientImpl(String baseUrl, CloseableHttpClient httpclient) { + this.httpclient = httpclient; + this.baseUrl = baseUrl; + } + + @Override + public void close() throws Exception { + if (httpclient != null) { + httpclient.close(); + } + } + + @Override + public String get(String path, Map params, String authHeader) { + return doRequest(buildURI(path, params), authHeader, RequestBuilder.get()); + } + + private String doRequest(URI uri, String authHeader, RequestBuilder requestBuilder) { + String response; + try { + if (requestBuilder.getFirstHeader(HttpHeaders.CONTENT_TYPE) == null) { + requestBuilder.setHeader( + HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + } + if (StringUtils.isNotBlank(authHeader)) { + requestBuilder.setHeader(HttpHeaders.AUTHORIZATION, authHeader); + } + HttpUriRequest httpRequest = requestBuilder.setUri(uri).build(); + + LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri); + + ResponseHandler responseHandler = + resp -> { + int status = resp.getStatusLine().getStatusCode(); + HttpEntity entity = resp.getEntity(); + String entityStr = entity != null ? EntityUtils.toString(entity) : null; + if (status >= 200 && status < 300) { + return entityStr; + } else { + throw new HttpResponseException(status, entityStr); + } + }; + + response = httpclient.execute(httpRequest, responseHandler); + LOG.debug("Response: {}", response); + } catch (ConnectException | ConnectTimeoutException | NoHttpResponseException e) { + // net exception can be retried by connecting to other Kyuubi server + throw new UniffleRestException("Api request failed for " + uri.toString(), e); + } catch (UniffleRestException rethrow) { + throw rethrow; + } catch (Exception e) { + LOG.error("Error: ", e); + throw new UniffleRestException("Api request failed for " + uri.toString(), e); + } + + return response; + } + + private URI buildURI(String path) { + return buildURI(path, null); + } + + private URI buildURI(String path, Map params) { + URI uri; + try { + String url = StringUtils.isNotBlank(path) ? this.baseUrl + "/" + path : this.baseUrl; + URIBuilder builder = new URIBuilder(url); + + if (!params.isEmpty()) { + for (Map.Entry entry : params.entrySet()) { + if (entry.getValue() != null) { + builder.addParameter(entry.getKey(), entry.getValue().toString()); + } + } + } + uri = builder.build(); + } catch (URISyntaxException e) { + throw new UniffleRestException("invalid URI.", e); + } + return uri; + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java b/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java new file mode 100644 index 0000000000..0e577e1873 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java @@ -0,0 +1,87 @@ +package org.apache.uniffle.client; + +import org.apache.http.impl.client.CloseableHttpClient; + +public class UniffleRestClient implements AutoCloseable, Cloneable { + + private RestClient restClient; + + private RestClientConf conf; + + private String hostUrl; + @Override + public void close() throws Exception { + if (restClient != null) { + restClient.close(); + } + } + + private UniffleRestClient(Builder builder) { + this.hostUrl = builder.hostUrl; + + RestClientConf conf = new RestClientConf(); + conf.setConnectTimeout(builder.connectTimeout); + conf.setSocketTimeout(builder.socketTimeout); + conf.setMaxAttempts(builder.maxAttempts); + conf.setAttemptWaitTime(builder.attemptWaitTime); + this.conf = conf; + CloseableHttpClient httpclient = HttpClientFactory.createHttpClient(conf); + this.restClient = new RestClientImpl(hostUrl, httpclient); + } + + public RestClient getHttpClient() { + return restClient; + } + + public RestClientConf getConf() { + return conf; + } + + public static Builder builder(String hostUrl) { + return new Builder(hostUrl); + } + + public static class Builder { + + private String hostUrl; + + // 2 minutes + private int socketTimeout = 2 * 60 * 1000; + + // 30s + private int connectTimeout = 30 * 1000; + + private int maxAttempts = 3; + + // 3s + private int attemptWaitTime = 3 * 1000; + + public Builder(String hostUrl) { + this.hostUrl = hostUrl; + } + + public Builder socketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + return this; + } + + public Builder connectionTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } + + public Builder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public Builder attemptWaitTime(int attemptWaitTime) { + this.attemptWaitTime = attemptWaitTime; + return this; + } + + public UniffleRestClient build() { + return new UniffleRestClient(this); + } + } +} diff --git a/cli/src/main/java/org/apache/uniffle/client/exception/UniffleRestException.java b/cli/src/main/java/org/apache/uniffle/client/exception/UniffleRestException.java new file mode 100644 index 0000000000..10b3cec268 --- /dev/null +++ b/cli/src/main/java/org/apache/uniffle/client/exception/UniffleRestException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.client.exception; + +public class UniffleRestException extends RuntimeException { + + public UniffleRestException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java b/cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java new file mode 100644 index 0000000000..41f85298d3 --- /dev/null +++ b/cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java @@ -0,0 +1,40 @@ +package org.apache.uniffle.cli; + +import org.apache.uniffle.UniffleCliArgsException; +import org.apache.uniffle.api.AdminRestApi; +import org.apache.uniffle.client.UniffleRestClient; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; + +public class AdminRestApiTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private UniffleRestClient uniffleRestClient; + + @InjectMocks + private AdminRestApi adminRestApi; + + @BeforeEach + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + } + @Test + public void testRunRefreshAccessChecker() throws UniffleCliArgsException { + + Mockito.when(uniffleRestClient.getHttpClient() + .get(Mockito.anyString(), Mockito.anyMap(), Mockito.anyString())) + .thenReturn("OK"); + String result = adminRestApi.refreshAccessChecker("Foo"); + Mockito.verify(uniffleRestClient.getHttpClient(), + Mockito.times(1)).get("/api/server/admin/refreshChecker", + Collections.singletonMap("class", "Foo"), null); + } + +} diff --git a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java new file mode 100644 index 0000000000..46bdfbda91 --- /dev/null +++ b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.cli; + +import org.apache.uniffle.UniffleCliArgsException; +import org.apache.uniffle.client.UniffleRestClient; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +public class UniffleTestAdminCLI { + + @Test + public void testAdminRefreshCLI() throws UniffleCliArgsException, IOException { + + UniffleRestClient client = Mockito.mock(UniffleRestClient.class, Answers.RETURNS_DEEP_STUBS); + Mockito.when(client.getHttpClient().get(Mockito.anyString(), Mockito.anyMap(), Mockito.anyString())) + .thenReturn("OK"); + UniffleCLI uniffleCLI = new UniffleCLI("", "", client); + String[] args = {"-rc", "-cc", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"}; + assertEquals(0, uniffleCLI.run(args)); + Mockito.verify(client.getHttpClient(), + Mockito.times(1)).get("/api/server/admin/refreshChecker", + Collections.singletonMap("class", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"), null); + } + + + @Test + public void testMissingClientCLI() throws UniffleCliArgsException, IOException { + UniffleCLI uniffleCLI = new UniffleCLI("", ""); + String[] args = {"-rc", "-cc", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"}; + assertThrows(UniffleCliArgsException.class, () -> uniffleCLI.run(args)); + } + +} diff --git a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java index 207538ab53..c81d424311 100644 --- a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java +++ b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java @@ -17,18 +17,18 @@ package org.apache.uniffle.cli; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; - +import org.apache.uniffle.UniffleCliArgsException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.apache.uniffle.UniffleCliArgsException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; + public class UniffleTestCLI { private UniffleCLI uniffleCLI; @@ -83,4 +83,5 @@ public void testExampleCLI() throws UniffleCliArgsException, IOException { dataOut.close(); dataErr.close(); } + } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 3fb185f489..3239da0e71 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -45,6 +45,7 @@ import org.apache.uniffle.coordinator.web.servlet.CancelDecommissionServlet; import org.apache.uniffle.coordinator.web.servlet.DecommissionServlet; import org.apache.uniffle.coordinator.web.servlet.NodesServlet; +import org.apache.uniffle.coordinator.web.servlet.admin.RefreshCheckerServlet; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE; @@ -201,6 +202,9 @@ private void registerRESTAPI() throws Exception { jettyServer.addServlet( new CancelDecommissionServlet(this), "/api/server/cancelDecommission"); + jettyServer.addServlet( + new RefreshCheckerServlet(this), + "/api/server/admin/refreshChecker"); } private void registerMetrics() throws Exception { diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java index 677890c5ec..43790bf581 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java @@ -27,4 +27,9 @@ public abstract class AbstractAccessChecker implements AccessChecker { protected AbstractAccessChecker(AccessManager accessManager) throws Exception { } + + @Override + public void refreshAccessChecker() { + + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java index d8b827a9e2..fe869393e2 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java @@ -101,6 +101,11 @@ public AccessCheckResult check(AccessInfo accessInfo) { return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); } + @Override + public void refreshAccessChecker() { + updateAccessCandidates(); + } + @Override public void close() { if (updateAccessCandidatesSES != null) { diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java index 71a380a213..53235383a0 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java @@ -34,4 +34,6 @@ public interface AccessChecker extends Closeable { * @return access check result */ AccessCheckResult check(AccessInfo accessInfo); + + void refreshAccessChecker(); } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java new file mode 100644 index 0000000000..50b1784588 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java @@ -0,0 +1,44 @@ +package org.apache.uniffle.coordinator.web.servlet.admin; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.CoordinatorServer; +import org.apache.uniffle.coordinator.access.checker.AccessChecker; +import org.apache.uniffle.coordinator.web.Response; +import org.apache.uniffle.coordinator.web.servlet.BaseServlet; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.util.List; +import java.util.Optional; + + +public class RefreshCheckerServlet extends BaseServlet { + private final CoordinatorServer coordinator; + + public RefreshCheckerServlet(CoordinatorServer coordinator) { + this.coordinator = coordinator; + } + + @Override + protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) { + List checkers = coordinator.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS); + String accessClass = req.getParameter("class"); + if (CollectionUtils.isEmpty(checkers)) { + return Response.fail("Access checkers is empty, will not update any checkers."); + } + try { + Class aClass = Class.forName(accessClass); + Optional checker = coordinator.getAccessManager().getAccessCheckers().stream() + .filter(aClass::isInstance) + .findAny(); + if (!checker.isPresent()) { + return Response.fail(String.format("Access checkers %s is none exist, will not update any checkers.", accessClass)); + } + checker.get().refreshAccessChecker(); + } catch (ClassNotFoundException e) { + return Response.fail(String.format("Access checker class %s is not found.", accessClass)); + } + return Response.success(null); + } +} diff --git a/integration-test/common/pom.xml b/integration-test/common/pom.xml index 8fb44c0fa7..952774a842 100644 --- a/integration-test/common/pom.xml +++ b/integration-test/common/pom.xml @@ -50,6 +50,11 @@ shuffle-server test + + org.apache.uniffle + cli + test + org.apache.uniffle shuffle-server diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java index 4e167e04b4..1f29d33bcf 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java @@ -69,6 +69,11 @@ public AccessCheckResult check(AccessInfo accessInfo) { return new AccessCheckResult(false, ""); } + @Override + public void refreshAccessChecker() { + + } + @Override public void close() throws IOException { // ignore. diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java new file mode 100644 index 0000000000..081d595595 --- /dev/null +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java @@ -0,0 +1,54 @@ +package org.apache.uniffle.test; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.uniffle.api.AdminRestApi; +import org.apache.uniffle.client.UniffleRestClient; +import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.web.Response; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class CoordinatorAdminServiceTest extends IntegrationTestBase { + + private static final Integer JETTY_HTTP_PORT = 12345; + private static final String accessChecker = "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"; + + private ObjectMapper objectMapper = new ObjectMapper(); + + protected AdminRestApi adminRestApi; + + + @BeforeAll + public static void setUp() throws Exception { + CoordinatorConf coordinatorConf = new CoordinatorConf(); + coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, JETTY_HTTP_PORT); + coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128); + coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, 12346); + coordinatorConf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), accessChecker); + createCoordinatorServer(coordinatorConf); + startServers(); + } + + + @BeforeEach + public void createClient() { + String hostUrl = String.format("http://%s:%d", LOCALHOST, JETTY_HTTP_PORT); + adminRestApi = new AdminRestApi(UniffleRestClient.builder(hostUrl).build()); + } + + @Test + public void test() throws Exception { + String content = adminRestApi.refreshAccessChecker(accessChecker); + Response response = + objectMapper.readValue(content, new TypeReference>() {}); + HashMap serverList = response.getData(); + assertEquals(0, response.getCode()); + } +} diff --git a/pom.xml b/pom.xml index 670e686238..ba85a4c562 100644 --- a/pom.xml +++ b/pom.xml @@ -269,6 +269,12 @@ shuffle-server ${project.version} + + + org.apache.uniffle + cli + ${project.version} + org.apache.uniffle coordinator @@ -471,6 +477,12 @@ + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + com.fasterxml.jackson.core jackson-core