From 3655ec3f3e37bc230ca535f07781cbb694cf3e47 Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Wed, 7 Jun 2023 17:54:23 +0800 Subject: [PATCH] [#768] feat(cli): Cli method for blacklist update --- .../org/apache/uniffle/api/AdminRestApi.java | 8 +- .../org/apache/uniffle/cli/UniffleCLI.java | 155 +++++++++-------- .../uniffle/client/HttpClientFactory.java | 3 +- .../org/apache/uniffle/client/RestClient.java | 4 +- .../apache/uniffle/client/RestClientImpl.java | 159 +++++++++--------- .../uniffle/client/UniffleRestClient.java | 121 ++++++------- .../apache/uniffle/cli/AdminRestApiTest.java | 46 ++--- .../uniffle/cli/UniffleTestAdminCLI.java | 53 +++--- .../apache/uniffle/cli/UniffleTestCLI.java | 18 +- .../coordinator/CoordinatorServer.java | 25 ++- .../access/checker/AbstractAccessChecker.java | 4 +- .../checker/AccessCandidatesChecker.java | 8 +- .../access/checker/AccessChecker.java | 3 +- .../servlet/admin/RefreshCheckerServlet.java | 58 +++---- .../uniffle/test/AccessClusterTest.java | 26 ++- .../test/CoordinatorAdminServiceTest.java | 62 +++---- 16 files changed, 372 insertions(+), 381 deletions(-) diff --git a/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java index 073ed61197..1bf58b49a9 100644 --- a/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java +++ b/cli/src/main/java/org/apache/uniffle/api/AdminRestApi.java @@ -17,18 +17,18 @@ package org.apache.uniffle.api; -import org.apache.uniffle.client.RestClient; -import org.apache.uniffle.client.UniffleRestClient; - import java.util.HashMap; import java.util.Map; +import org.apache.uniffle.client.RestClient; +import org.apache.uniffle.client.UniffleRestClient; public class AdminRestApi { private UniffleRestClient client; private static final String API_BASE_PATH = "admin"; - private AdminRestApi() {} + private AdminRestApi() { + } public AdminRestApi(UniffleRestClient client) { this.client = client; diff --git a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java index 7d3d379df6..c15100a6b1 100644 --- a/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java +++ b/cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java @@ -20,58 +20,57 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.uniffle.AbstractCustomCommandLine; +import org.apache.uniffle.UniffleCliArgsException; import org.apache.uniffle.api.AdminRestApi; import org.apache.uniffle.client.UniffleRestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.uniffle.AbstractCustomCommandLine; -import org.apache.uniffle.UniffleCliArgsException; - public class UniffleCLI extends AbstractCustomCommandLine { - private static final Logger LOG = LoggerFactory.getLogger(UniffleCLI.class); - private final Options allOptions; - private final Option uniffleClientCli; - private final Option uniffleAdminCli; - private final Option checkerClass; - private final Option help; - private final Option coordServer; - private final Option coordPort; - private final Option refreshAccessCli; - protected UniffleRestClient client; - - public UniffleCLI(String shortPrefix, String longPrefix) { - allOptions = new Options(); - uniffleClientCli = new Option(shortPrefix + "c", longPrefix + "cli", - true, "This is an client cli command that will print args."); - uniffleAdminCli = new Option(shortPrefix + "a", longPrefix + "admin", - true, "This is an admin command that will print args."); - refreshAccessCli = new Option(shortPrefix + "rc", longPrefix + "refreshChecker", - false, "This is an admin command that will refresh access checker."); - checkerClass = new Option(shortPrefix + "cc", longPrefix + "checkerClass", - true, "This is the checker class that will be refreshed."); - help = new Option(shortPrefix + "h", longPrefix + "help", - false, "Help for the Uniffle CLI."); - coordServer = new Option(shortPrefix + "s", longPrefix + "host", - true, "This is coordinator server host."); - coordPort = new Option(shortPrefix + "p", longPrefix + "port", - true, "This is coordinator server port."); - allOptions.addOption(uniffleClientCli); - allOptions.addOption(uniffleAdminCli); - allOptions.addOption(coordServer); - allOptions.addOption(coordPort); - allOptions.addOption(refreshAccessCli); - allOptions.addOption(help); - } + private static final Logger LOG = LoggerFactory.getLogger(UniffleCLI.class); + private final Options allOptions; + private final Option uniffleClientCli; + private final Option uniffleAdminCli; + private final Option checkerClass; + private final Option help; + private final Option coordServer; + private final Option coordPort; + private final Option refreshAccessCli; + protected UniffleRestClient client; + + public UniffleCLI(String shortPrefix, String longPrefix) { + allOptions = new Options(); + uniffleClientCli = new Option(shortPrefix + "c", longPrefix + "cli", + true, "This is an client cli command that will print args."); + uniffleAdminCli = new Option(shortPrefix + "a", longPrefix + "admin", + true, "This is an admin command that will print args."); + refreshAccessCli = new Option(shortPrefix + "rc", longPrefix + "refreshChecker", + false, "This is an admin command that will refresh access checker."); + checkerClass = new Option(shortPrefix + "cc", longPrefix + "checkerClass", + true, "This is the checker class that will be refreshed."); + help = new Option(shortPrefix + "h", longPrefix + "help", + false, "Help for the Uniffle CLI."); + coordServer = new Option(shortPrefix + "s", longPrefix + "host", + true, "This is coordinator server host."); + coordPort = new Option(shortPrefix + "p", longPrefix + "port", + true, "This is coordinator server port."); + allOptions.addOption(uniffleClientCli); + allOptions.addOption(uniffleAdminCli); + allOptions.addOption(coordServer); + allOptions.addOption(coordPort); + allOptions.addOption(refreshAccessCli); + allOptions.addOption(help); + } - public UniffleCLI(String shortPrefix, String longPrefix, UniffleRestClient client) { - this(shortPrefix, longPrefix); - this.client = client; - } + public UniffleCLI(String shortPrefix, String longPrefix, UniffleRestClient client) { + this(shortPrefix, longPrefix); + this.client = client; + } - public int run(String[] args) throws UniffleCliArgsException { - final CommandLine cmd = parseCommandLineOptions(args, true); + public int run(String[] args) throws UniffleCliArgsException { + final CommandLine cmd = parseCommandLineOptions(args, true); if (cmd.hasOption(help.getOpt())) { printUsage(); @@ -84,46 +83,46 @@ public int run(String[] args) throws UniffleCliArgsException { return 0; } - if (cmd.hasOption(uniffleAdminCli.getOpt())) { - String cliArgs = cmd.getOptionValue(uniffleAdminCli.getOpt()); - System.out.println("uniffle-admin-cli : " + cliArgs); - return 0; - } - if (cmd.hasOption(coordServer.getOpt()) && cmd.hasOption(coordPort.getOpt())) { - String host = cmd.getOptionValue(coordServer.getOpt()).trim(); - int port = Integer.parseInt(cmd.getOptionValue(coordPort.getOpt()).trim()); - String hostUrl = String.format("http://%s:%d", host, port); - LOG.info("uniffle-admin-cli : coordinator server host {}, port {}.", host, port); - client = UniffleRestClient.builder(hostUrl).build(); - } - - if (cmd.hasOption(refreshAccessCli.getOpt())) { - String checker = cmd.getOptionValue(checkerClass.getOpt()); - LOG.info("uniffle-admin-cli : refresh access checker {}!", checker); - refreshAccessChecker(checker); - return 0; - } - - return 1; + if (cmd.hasOption(uniffleAdminCli.getOpt())) { + String cliArgs = cmd.getOptionValue(uniffleAdminCli.getOpt()); + System.out.println("uniffle-admin-cli : " + cliArgs); + return 0; + } + if (cmd.hasOption(coordServer.getOpt()) && cmd.hasOption(coordPort.getOpt())) { + String host = cmd.getOptionValue(coordServer.getOpt()).trim(); + int port = Integer.parseInt(cmd.getOptionValue(coordPort.getOpt()).trim()); + String hostUrl = String.format("http://%s:%d", host, port); + LOG.info("uniffle-admin-cli : coordinator server host {}, port {}.", host, port); + client = UniffleRestClient.builder(hostUrl).build(); } - private String refreshAccessChecker(String checker) throws UniffleCliArgsException { - if (client == null) { - throw new UniffleCliArgsException("Missing Coordinator host address and grpc port parameters."); - } - AdminRestApi adminRestApi = new AdminRestApi(client); - return adminRestApi.refreshAccessChecker(checker); + if (cmd.hasOption(refreshAccessCli.getOpt())) { + String checker = cmd.getOptionValue(checkerClass.getOpt()); + LOG.info("uniffle-admin-cli : refresh access checker {}!", checker); + refreshAccessChecker(checker); + return 0; } - @Override - public void addRunOptions(Options baseOptions) { - baseOptions.addOption(uniffleClientCli); - baseOptions.addOption(uniffleAdminCli); - baseOptions.addOption(refreshAccessCli); - baseOptions.addOption(checkerClass); - baseOptions.addOption(coordServer); - baseOptions.addOption(coordPort); + return 1; + } + + private String refreshAccessChecker(String checker) throws UniffleCliArgsException { + if (client == null) { + throw new UniffleCliArgsException("Missing Coordinator host address and grpc port parameters."); } + AdminRestApi adminRestApi = new AdminRestApi(client); + return adminRestApi.refreshAccessChecker(checker); + } + + @Override + public void addRunOptions(Options baseOptions) { + baseOptions.addOption(uniffleClientCli); + baseOptions.addOption(uniffleAdminCli); + baseOptions.addOption(refreshAccessCli); + baseOptions.addOption(checkerClass); + baseOptions.addOption(coordServer); + baseOptions.addOption(coordPort); + } @Override public void addGeneralOptions(Options baseOptions) { diff --git a/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java b/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java index 85036527bb..a3f41cb1ba 100644 --- a/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java +++ b/cli/src/main/java/org/apache/uniffle/client/HttpClientFactory.java @@ -17,6 +17,7 @@ package org.apache.uniffle.client; +import javax.net.ssl.SSLContext; import org.apache.http.client.config.RequestConfig; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; @@ -28,8 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; - public class HttpClientFactory { private static final Logger LOG = LoggerFactory.getLogger(HttpClientFactory.class); diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClient.java b/cli/src/main/java/org/apache/uniffle/client/RestClient.java index 0ae2b977b3..b867121dab 100644 --- a/cli/src/main/java/org/apache/uniffle/client/RestClient.java +++ b/cli/src/main/java/org/apache/uniffle/client/RestClient.java @@ -19,7 +19,9 @@ import java.util.Map; -/** A underlying http client interface for common rest request. */ +/** + * A underlying http client interface for common rest request. + */ public interface RestClient extends AutoCloseable, Cloneable { String get(String path, Map params, String authHeader); diff --git a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java index db3b5fba37..c75d780096 100644 --- a/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java +++ b/cli/src/main/java/org/apache/uniffle/client/RestClientImpl.java @@ -17,6 +17,10 @@ package org.apache.uniffle.client; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; @@ -34,98 +38,93 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.ConnectException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Map; - public class RestClientImpl implements RestClient { - private static final Logger LOG = LoggerFactory.getLogger(RestClientImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(RestClientImpl.class); - private CloseableHttpClient httpclient; + private CloseableHttpClient httpclient; - private String baseUrl; + private String baseUrl; - public RestClientImpl(String baseUrl, CloseableHttpClient httpclient) { - this.httpclient = httpclient; - this.baseUrl = baseUrl; - } + public RestClientImpl(String baseUrl, CloseableHttpClient httpclient) { + this.httpclient = httpclient; + this.baseUrl = baseUrl; + } - @Override - public void close() throws Exception { - if (httpclient != null) { - httpclient.close(); - } + @Override + public void close() throws Exception { + if (httpclient != null) { + httpclient.close(); } - - @Override - public String get(String path, Map params, String authHeader) { - return doRequest(buildURI(path, params), authHeader, RequestBuilder.get()); + } + + @Override + public String get(String path, Map params, String authHeader) { + return doRequest(buildURI(path, params), authHeader, RequestBuilder.get()); + } + + private String doRequest(URI uri, String authHeader, RequestBuilder requestBuilder) { + String response; + try { + if (requestBuilder.getFirstHeader(HttpHeaders.CONTENT_TYPE) == null) { + requestBuilder.setHeader( + HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); + } + if (StringUtils.isNotBlank(authHeader)) { + requestBuilder.setHeader(HttpHeaders.AUTHORIZATION, authHeader); + } + HttpUriRequest httpRequest = requestBuilder.setUri(uri).build(); + + LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri); + + ResponseHandler responseHandler = + resp -> { + int status = resp.getStatusLine().getStatusCode(); + HttpEntity entity = resp.getEntity(); + String entityStr = entity != null ? EntityUtils.toString(entity) : null; + if (status >= 200 && status < 300) { + return entityStr; + } else { + throw new HttpResponseException(status, entityStr); + } + }; + + response = httpclient.execute(httpRequest, responseHandler); + LOG.debug("Response: {}", response); + } catch (ConnectException | ConnectTimeoutException | NoHttpResponseException e) { + // net exception can be retried by connecting to other Kyuubi server + throw new UniffleRestException("Api request failed for " + uri.toString(), e); + } catch (UniffleRestException rethrow) { + throw rethrow; + } catch (Exception e) { + LOG.error("Error: ", e); + throw new UniffleRestException("Api request failed for " + uri.toString(), e); } - private String doRequest(URI uri, String authHeader, RequestBuilder requestBuilder) { - String response; - try { - if (requestBuilder.getFirstHeader(HttpHeaders.CONTENT_TYPE) == null) { - requestBuilder.setHeader( - HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType()); - } - if (StringUtils.isNotBlank(authHeader)) { - requestBuilder.setHeader(HttpHeaders.AUTHORIZATION, authHeader); - } - HttpUriRequest httpRequest = requestBuilder.setUri(uri).build(); - - LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri); - - ResponseHandler responseHandler = - resp -> { - int status = resp.getStatusLine().getStatusCode(); - HttpEntity entity = resp.getEntity(); - String entityStr = entity != null ? EntityUtils.toString(entity) : null; - if (status >= 200 && status < 300) { - return entityStr; - } else { - throw new HttpResponseException(status, entityStr); - } - }; - - response = httpclient.execute(httpRequest, responseHandler); - LOG.debug("Response: {}", response); - } catch (ConnectException | ConnectTimeoutException | NoHttpResponseException e) { - // net exception can be retried by connecting to other Kyuubi server - throw new UniffleRestException("Api request failed for " + uri.toString(), e); - } catch (UniffleRestException rethrow) { - throw rethrow; - } catch (Exception e) { - LOG.error("Error: ", e); - throw new UniffleRestException("Api request failed for " + uri.toString(), e); - } + return response; + } - return response; - } + private URI buildURI(String path) { + return buildURI(path, null); + } - private URI buildURI(String path) { - return buildURI(path, null); - } + private URI buildURI(String path, Map params) { + URI uri; + try { + String url = StringUtils.isNotBlank(path) ? this.baseUrl + "/" + path : this.baseUrl; + URIBuilder builder = new URIBuilder(url); - private URI buildURI(String path, Map params) { - URI uri; - try { - String url = StringUtils.isNotBlank(path) ? this.baseUrl + "/" + path : this.baseUrl; - URIBuilder builder = new URIBuilder(url); - - if (!params.isEmpty()) { - for (Map.Entry entry : params.entrySet()) { - if (entry.getValue() != null) { - builder.addParameter(entry.getKey(), entry.getValue().toString()); - } - } - } - uri = builder.build(); - } catch (URISyntaxException e) { - throw new UniffleRestException("invalid URI.", e); + if (!params.isEmpty()) { + for (Map.Entry entry : params.entrySet()) { + if (entry.getValue() != null) { + builder.addParameter(entry.getKey(), entry.getValue().toString()); + } } - return uri; + } + uri = builder.build(); + } catch (URISyntaxException e) { + throw new UniffleRestException("invalid URI.", e); } + return uri; + } } diff --git a/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java b/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java index 7321c8205f..3a6a104128 100644 --- a/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java +++ b/cli/src/main/java/org/apache/uniffle/client/UniffleRestClient.java @@ -21,84 +21,85 @@ public class UniffleRestClient implements AutoCloseable, Cloneable { - private RestClient restClient; + private RestClient restClient; - private RestClientConf conf; + private RestClientConf conf; - private String hostUrl; - @Override - public void close() throws Exception { - if (restClient != null) { - restClient.close(); - } - } + private String hostUrl; - private UniffleRestClient(Builder builder) { - this.hostUrl = builder.hostUrl; - - RestClientConf conf = new RestClientConf(); - conf.setConnectTimeout(builder.connectTimeout); - conf.setSocketTimeout(builder.socketTimeout); - conf.setMaxAttempts(builder.maxAttempts); - conf.setAttemptWaitTime(builder.attemptWaitTime); - this.conf = conf; - CloseableHttpClient httpclient = HttpClientFactory.createHttpClient(conf); - this.restClient = new RestClientImpl(hostUrl, httpclient); + @Override + public void close() throws Exception { + if (restClient != null) { + restClient.close(); } + } - public RestClient getHttpClient() { - return restClient; - } + private UniffleRestClient(Builder builder) { + this.hostUrl = builder.hostUrl; - public RestClientConf getConf() { - return conf; - } + RestClientConf conf = new RestClientConf(); + conf.setConnectTimeout(builder.connectTimeout); + conf.setSocketTimeout(builder.socketTimeout); + conf.setMaxAttempts(builder.maxAttempts); + conf.setAttemptWaitTime(builder.attemptWaitTime); + this.conf = conf; + CloseableHttpClient httpclient = HttpClientFactory.createHttpClient(conf); + this.restClient = new RestClientImpl(hostUrl, httpclient); + } - public static Builder builder(String hostUrl) { - return new Builder(hostUrl); - } + public RestClient getHttpClient() { + return restClient; + } - public static class Builder { + public RestClientConf getConf() { + return conf; + } - private String hostUrl; + public static Builder builder(String hostUrl) { + return new Builder(hostUrl); + } - // 2 minutes - private int socketTimeout = 2 * 60 * 1000; + public static class Builder { - // 30s - private int connectTimeout = 30 * 1000; + private String hostUrl; - private int maxAttempts = 3; + // 2 minutes + private int socketTimeout = 2 * 60 * 1000; - // 3s - private int attemptWaitTime = 3 * 1000; + // 30s + private int connectTimeout = 30 * 1000; - public Builder(String hostUrl) { - this.hostUrl = hostUrl; - } + private int maxAttempts = 3; - public Builder socketTimeout(int socketTimeout) { - this.socketTimeout = socketTimeout; - return this; - } + // 3s + private int attemptWaitTime = 3 * 1000; - public Builder connectionTimeout(int connectTimeout) { - this.connectTimeout = connectTimeout; - return this; - } + public Builder(String hostUrl) { + this.hostUrl = hostUrl; + } - public Builder maxAttempts(int maxAttempts) { - this.maxAttempts = maxAttempts; - return this; - } + public Builder socketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + return this; + } + + public Builder connectionTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + return this; + } - public Builder attemptWaitTime(int attemptWaitTime) { - this.attemptWaitTime = attemptWaitTime; - return this; - } + public Builder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public Builder attemptWaitTime(int attemptWaitTime) { + this.attemptWaitTime = attemptWaitTime; + return this; + } - public UniffleRestClient build() { - return new UniffleRestClient(this); - } + public UniffleRestClient build() { + return new UniffleRestClient(this); } + } } diff --git a/cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java b/cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java index ebd9404375..013b6890fe 100644 --- a/cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java +++ b/cli/src/test/java/org/apache/uniffle/cli/AdminRestApiTest.java @@ -17,6 +17,7 @@ package org.apache.uniffle.cli; +import java.util.Collections; import org.apache.uniffle.UniffleCliArgsException; import org.apache.uniffle.api.AdminRestApi; import org.apache.uniffle.client.UniffleRestClient; @@ -28,30 +29,29 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import java.util.Collections; - public class AdminRestApiTest { - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private UniffleRestClient uniffleRestClient; - - @InjectMocks - private AdminRestApi adminRestApi; - - @BeforeEach - public void setup() throws Exception { - MockitoAnnotations.initMocks(this); - } - @Test - public void testRunRefreshAccessChecker() throws UniffleCliArgsException { - - Mockito.when(uniffleRestClient.getHttpClient() - .get(Mockito.anyString(), Mockito.anyMap(), Mockito.anyString())) - .thenReturn("OK"); - String result = adminRestApi.refreshAccessChecker("Foo"); - Mockito.verify(uniffleRestClient.getHttpClient(), - Mockito.times(1)).get("/api/server/admin/refreshChecker", - Collections.singletonMap("class", "Foo"), null); - } + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private UniffleRestClient uniffleRestClient; + + @InjectMocks + private AdminRestApi adminRestApi; + + @BeforeEach + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testRunRefreshAccessChecker() throws UniffleCliArgsException { + + Mockito.when(uniffleRestClient.getHttpClient() + .get(Mockito.anyString(), Mockito.anyMap(), Mockito.anyString())) + .thenReturn("OK"); + String result = adminRestApi.refreshAccessChecker("Foo"); + Mockito.verify(uniffleRestClient.getHttpClient(), + Mockito.times(1)).get("/api/server/admin/refreshChecker", + Collections.singletonMap("class", "Foo"), null); + } } diff --git a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java index 46bdfbda91..79191a7e8d 100644 --- a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java +++ b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestAdminCLI.java @@ -17,41 +17,40 @@ package org.apache.uniffle.cli; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.Collections; import org.apache.uniffle.UniffleCliArgsException; import org.apache.uniffle.client.UniffleRestClient; import org.junit.jupiter.api.Test; import org.mockito.Answers; import org.mockito.Mockito; -import java.io.IOException; -import java.util.Collections; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - public class UniffleTestAdminCLI { - @Test - public void testAdminRefreshCLI() throws UniffleCliArgsException, IOException { - - UniffleRestClient client = Mockito.mock(UniffleRestClient.class, Answers.RETURNS_DEEP_STUBS); - Mockito.when(client.getHttpClient().get(Mockito.anyString(), Mockito.anyMap(), Mockito.anyString())) - .thenReturn("OK"); - UniffleCLI uniffleCLI = new UniffleCLI("", "", client); - String[] args = {"-rc", "-cc", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"}; - assertEquals(0, uniffleCLI.run(args)); - Mockito.verify(client.getHttpClient(), - Mockito.times(1)).get("/api/server/admin/refreshChecker", - Collections.singletonMap("class", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"), null); - } - - - @Test - public void testMissingClientCLI() throws UniffleCliArgsException, IOException { - UniffleCLI uniffleCLI = new UniffleCLI("", ""); - String[] args = {"-rc", "-cc", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"}; - assertThrows(UniffleCliArgsException.class, () -> uniffleCLI.run(args)); - } + @Test + public void testAdminRefreshCLI() throws UniffleCliArgsException, IOException { + + UniffleRestClient client = Mockito.mock(UniffleRestClient.class, Answers.RETURNS_DEEP_STUBS); + Mockito.when(client.getHttpClient().get(Mockito.anyString(), Mockito.anyMap(), Mockito.anyString())) + .thenReturn("OK"); + UniffleCLI uniffleCLI = new UniffleCLI("", "", client); + String[] args = {"-rc", "-cc", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"}; + assertEquals(0, uniffleCLI.run(args)); + Mockito.verify(client.getHttpClient(), + Mockito.times(1)).get("/api/server/admin/refreshChecker", + Collections.singletonMap("class", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"), null); + } + + + @Test + public void testMissingClientCLI() throws UniffleCliArgsException, IOException { + UniffleCLI uniffleCLI = new UniffleCLI("", ""); + String[] args = {"-rc", "-cc", "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"}; + assertThrows(UniffleCliArgsException.class, () -> uniffleCLI.run(args)); + } } diff --git a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java index 86f6a81ba6..82c6256429 100644 --- a/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java +++ b/cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java @@ -17,18 +17,16 @@ package org.apache.uniffle.cli; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; - +import org.apache.uniffle.UniffleCliArgsException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.apache.uniffle.UniffleCliArgsException; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - public class UniffleTestCLI { private UniffleCLI uniffleCLI; @@ -51,11 +49,11 @@ public void testHelp() throws UniffleCliArgsException, IOException { assertEquals(0, uniffleCLI.run(args1)); oldOutPrintStream.println(dataOut); assertTrue(dataOut.toString().contains( - "-a,--admin This is an admin command that will print args.")); + "-a,--admin This is an admin command that will print args.")); assertTrue(dataOut.toString().contains( - "-c,--cli This is an client cli command that will print args.")); + "-c,--cli This is an client cli command that will print args.")); assertTrue(dataOut.toString().contains( - "-h,--help Help for the Uniffle CLI.")); + "-h,--help Help for the Uniffle CLI.")); System.setOut(oldOutPrintStream); System.setErr(oldErrPrintStream); @@ -73,7 +71,7 @@ public void testExampleCLI() throws UniffleCliArgsException, IOException { System.setOut(new PrintStream(dataOut)); System.setErr(new PrintStream(dataErr)); - String[] args = {"-c","hello world"}; + String[] args = {"-c", "hello world"}; assertEquals(0, uniffleCLI.run(args)); oldOutPrintStream.println(dataOut); assertTrue(dataOut.toString().contains("uniffle-client-cli : hello world")); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 3239da0e71..31f482e21a 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -17,12 +17,14 @@ package org.apache.uniffle.coordinator; +import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE; +import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE; +import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL; +import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC; +import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE; + import io.prometheus.client.CollectorRegistry; import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import picocli.CommandLine; - import org.apache.uniffle.common.Arguments; import org.apache.uniffle.common.config.ReconfigurableBase; import org.apache.uniffle.common.config.RssConf; @@ -46,12 +48,9 @@ import org.apache.uniffle.coordinator.web.servlet.DecommissionServlet; import org.apache.uniffle.coordinator.web.servlet.NodesServlet; import org.apache.uniffle.coordinator.web.servlet.admin.RefreshCheckerServlet; - -import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE; -import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE; -import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL; -import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC; -import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; /** * The main entrance of coordinator service @@ -203,8 +202,8 @@ private void registerRESTAPI() throws Exception { new CancelDecommissionServlet(this), "/api/server/cancelDecommission"); jettyServer.addServlet( - new RefreshCheckerServlet(this), - "/api/server/admin/refreshChecker"); + new RefreshCheckerServlet(this), + "/api/server/admin/refreshChecker"); } private void registerMetrics() throws Exception { @@ -237,7 +236,7 @@ private void registerMetrics() throws Exception { new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true), "/prometheus/metrics/jvm"); - metricReporter = MetricReporterFactory.getMetricReporter(coordinatorConf, id); + metricReporter = MetricReporterFactory.getMetricReporter(coordinatorConf, id); if (metricReporter != null) { metricReporter.addCollectorRegistry(CoordinatorMetrics.getCollectorRegistry()); metricReporter.addCollectorRegistry(grpcMetrics.getCollectorRegistry()); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java index 43790bf581..09b2ab2f4b 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java @@ -20,12 +20,12 @@ import org.apache.uniffle.coordinator.AccessManager; /** - * Abstract class for checking the access info from the client-side. + * Abstract class for checking the access info from the client-side. */ public abstract class AbstractAccessChecker implements AccessChecker { protected AbstractAccessChecker(AccessManager accessManager) throws Exception { - + } @Override diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java index fe869393e2..0f43a27741 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessCandidatesChecker.java @@ -17,6 +17,7 @@ package org.apache.uniffle.coordinator.access.checker; +import com.google.common.collect.Sets; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Set; @@ -24,8 +25,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; @@ -34,9 +33,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; import org.apache.uniffle.common.util.Constants; @@ -46,6 +42,8 @@ import org.apache.uniffle.coordinator.access.AccessCheckResult; import org.apache.uniffle.coordinator.access.AccessInfo; import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * AccessCandidatesChecker maintain a list of candidate access id and update it periodically, diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java index 53235383a0..f633d467f0 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessChecker.java @@ -18,7 +18,6 @@ package org.apache.uniffle.coordinator.access.checker; import java.io.Closeable; - import org.apache.uniffle.coordinator.access.AccessCheckResult; import org.apache.uniffle.coordinator.access.AccessInfo; @@ -31,7 +30,7 @@ public interface AccessChecker extends Closeable { * Called when the AccessManager handle the access request. * * @param accessInfo access info of the client - * @return access check result + * @return access check result */ AccessCheckResult check(AccessInfo accessInfo); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java index aa16d24329..a5d12887e8 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/admin/RefreshCheckerServlet.java @@ -17,6 +17,10 @@ package org.apache.uniffle.coordinator.web.servlet.admin; +import java.util.List; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.apache.commons.collections.CollectionUtils; import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.coordinator.CoordinatorServer; @@ -24,38 +28,34 @@ import org.apache.uniffle.coordinator.web.Response; import org.apache.uniffle.coordinator.web.servlet.BaseServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.util.List; -import java.util.Optional; - public class RefreshCheckerServlet extends BaseServlet { - private final CoordinatorServer coordinator; + private final CoordinatorServer coordinator; - public RefreshCheckerServlet(CoordinatorServer coordinator) { - this.coordinator = coordinator; - } + public RefreshCheckerServlet(CoordinatorServer coordinator) { + this.coordinator = coordinator; + } - @Override - protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) { - List checkers = coordinator.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS); - String accessClass = req.getParameter("class"); - if (CollectionUtils.isEmpty(checkers)) { - return Response.fail("Access checkers is empty, will not update any checkers."); - } - try { - Class aClass = Class.forName(accessClass); - Optional checker = coordinator.getAccessManager().getAccessCheckers().stream() - .filter(aClass::isInstance) - .findAny(); - if (!checker.isPresent()) { - return Response.fail(String.format("Access checkers %s is none exist, will not update any checkers.", accessClass)); - } - checker.get().refreshAccessChecker(); - } catch (ClassNotFoundException e) { - return Response.fail(String.format("Access checker class %s is not found.", accessClass)); - } - return Response.success(null); + @Override + protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) { + List checkers = coordinator.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS); + String accessClass = req.getParameter("class"); + if (CollectionUtils.isEmpty(checkers)) { + return Response.fail("Access checkers is empty, will not update any checkers."); + } + try { + Class aClass = Class.forName(accessClass); + Optional checker = coordinator.getAccessManager().getAccessCheckers().stream() + .filter(aClass::isInstance) + .findAny(); + if (!checker.isPresent()) { + return Response.fail( + String.format("Access checkers %s is none exist, will not update any checkers.", accessClass)); + } + checker.get().refreshAccessChecker(); + } catch (ClassNotFoundException e) { + return Response.fail(String.format("Access checker class %s is not found.", accessClass)); } + return Response.success(null); + } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java index 1f29d33bcf..84764e07b7 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java @@ -17,6 +17,11 @@ package org.apache.uniffle.test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -26,12 +31,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Uninterruptibles; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - import org.apache.uniffle.client.api.CoordinatorClient; import org.apache.uniffle.client.factory.CoordinatorClientFactory; import org.apache.uniffle.client.request.RssAccessClusterRequest; @@ -46,9 +45,8 @@ import org.apache.uniffle.coordinator.access.checker.AccessChecker; import org.apache.uniffle.server.ShuffleServer; import org.apache.uniffle.server.ShuffleServerConf; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; public class AccessClusterTest extends CoordinatorTestBase { @@ -84,8 +82,8 @@ public void close() throws IOException { public void testUsingCustomExtraProperties() throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); coordinatorConf.setString( - "rss.coordinator.access.checkers", - "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"); + "rss.coordinator.access.checkers", + "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"); createCoordinatorServer(coordinatorConf); startServers(); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); @@ -130,9 +128,9 @@ public void test(@TempDir File tempDir) throws Exception { coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold", 2); coordinatorConf.setString("rss.coordinator.access.candidates.path", cfgFile.getAbsolutePath()); coordinatorConf.setString( - "rss.coordinator.access.checkers", - "org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker," - + "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"); + "rss.coordinator.access.checkers", + "org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker," + + "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"); createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java index 7abd88e6b0..47fa8e84e7 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java @@ -17,8 +17,11 @@ package org.apache.uniffle.test; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.HashMap; import org.apache.uniffle.api.AdminRestApi; import org.apache.uniffle.client.UniffleRestClient; import org.apache.uniffle.common.config.RssBaseConf; @@ -28,44 +31,41 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.HashMap; - -import static org.junit.jupiter.api.Assertions.assertEquals; - public class CoordinatorAdminServiceTest extends IntegrationTestBase { - private static final Integer JETTY_HTTP_PORT = 12345; - private static final String accessChecker = "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"; + private static final Integer JETTY_HTTP_PORT = 12345; + private static final String accessChecker = "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker"; - private ObjectMapper objectMapper = new ObjectMapper(); + private ObjectMapper objectMapper = new ObjectMapper(); - protected AdminRestApi adminRestApi; + protected AdminRestApi adminRestApi; - @BeforeAll - public static void setUp() throws Exception { - CoordinatorConf coordinatorConf = new CoordinatorConf(); - coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, JETTY_HTTP_PORT); - coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128); - coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, 12346); - coordinatorConf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), accessChecker); - createCoordinatorServer(coordinatorConf); - startServers(); - } + @BeforeAll + public static void setUp() throws Exception { + CoordinatorConf coordinatorConf = new CoordinatorConf(); + coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, JETTY_HTTP_PORT); + coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128); + coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, 12346); + coordinatorConf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), accessChecker); + createCoordinatorServer(coordinatorConf); + startServers(); + } - @BeforeEach - public void createClient() { - String hostUrl = String.format("http://%s:%d", LOCALHOST, JETTY_HTTP_PORT); - adminRestApi = new AdminRestApi(UniffleRestClient.builder(hostUrl).build()); - } + @BeforeEach + public void createClient() { + String hostUrl = String.format("http://%s:%d", LOCALHOST, JETTY_HTTP_PORT); + adminRestApi = new AdminRestApi(UniffleRestClient.builder(hostUrl).build()); + } - @Test - public void test() throws Exception { - String content = adminRestApi.refreshAccessChecker(accessChecker); - Response response = - objectMapper.readValue(content, new TypeReference>() {}); - HashMap serverList = response.getData(); - assertEquals(0, response.getCode()); - } + @Test + public void test() throws Exception { + String content = adminRestApi.refreshAccessChecker(accessChecker); + Response response = + objectMapper.readValue(content, new TypeReference>() { + }); + HashMap serverList = response.getData(); + assertEquals(0, response.getCode()); + } }