Skip to content

Commit

Permalink
[apache#768] feat(cli): Cli method for blacklist update
Browse files Browse the repository at this point in the history
  • Loading branch information
beryllw committed May 26, 2023
1 parent 3e58805 commit 3c6fc01
Show file tree
Hide file tree
Showing 16 changed files with 293 additions and 6 deletions.
4 changes: 4 additions & 0 deletions cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,9 @@
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-internal-client</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.apache.uniffle.api;

import org.apache.uniffle.proto.RssProtos;

public interface CoordinatorAdminClient {

RssProtos.RefreshAccessCheckerResponse refreshAccessChecker(RssProtos.RefreshAccessCheckerRequest request);
}
33 changes: 33 additions & 0 deletions cli/src/main/java/org/apache/uniffle/cli/UniffleCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.uniffle.AbstractCustomCommandLine;
import org.apache.uniffle.UniffleCliArgsException;
import org.apache.uniffle.impl.grpc.CoordinatorAdminGrpcClient;

public class UniffleCLI extends AbstractCustomCommandLine {

Expand All @@ -33,17 +34,30 @@ public class UniffleCLI extends AbstractCustomCommandLine {
private final Option uniffleClientCli;
private final Option uniffleAdminCli;
private final Option help;
private final Option coordServer;
private final Option grpcPort;
private final Option refreshAccessCli;
protected CoordinatorAdminGrpcClient adminGrpcClient;

public UniffleCLI(String shortPrefix, String longPrefix) {
allOptions = new Options();
uniffleClientCli = new Option(shortPrefix + "c", longPrefix + "cli",
true, "This is an client cli command that will print args.");
uniffleAdminCli = new Option(shortPrefix + "a", longPrefix + "admin",
true, "This is an admin command that will print args.");
refreshAccessCli = new Option(shortPrefix + "rac", longPrefix + "refreshChecker",
true, "This is an admin command that will refresh access checker.");
help = new Option(shortPrefix + "h", longPrefix + "help",
false, "Help for the Uniffle CLI.");
coordServer = new Option(shortPrefix + "s", longPrefix + "server",
true, "This is coordinator server host.");
grpcPort = new Option(shortPrefix + "p", longPrefix + "port",
true, "This is coordinator server port.");
allOptions.addOption(uniffleClientCli);
allOptions.addOption(uniffleAdminCli);
allOptions.addOption(coordServer);
allOptions.addOption(grpcPort);
allOptions.addOption(refreshAccessCli);
allOptions.addOption(help);
}

Expand All @@ -66,14 +80,33 @@ public int run(String[] args) throws UniffleCliArgsException {
System.out.println("uniffle-admin-cli : " + cliArgs);
return 0;
}
if (cmd.hasOption(coordServer.getOpt()) && cmd.hasOption(grpcPort.getOpt())) {
String server = cmd.getOptionValue(coordServer.getOpt()).trim();
int port = Integer.parseInt(cmd.getOptionValue(grpcPort.getOpt()).trim());
adminGrpcClient = new CoordinatorAdminGrpcClient(server, port);
}

if (cmd.hasOption(refreshAccessCli.getOpt())) {
String checker = cmd.getOptionValue(refreshAccessCli.getOpt()).trim();
return refreshAccessChecker(checker);
}
return 1;
}

private int refreshAccessChecker(String checker) throws UniffleCliArgsException {
if (adminGrpcClient == null){
throw new UniffleCliArgsException("Missing Coordinator host address and grpc port parameters.");
}
return adminGrpcClient.refreshAccessChecker(checker);
}

@Override
public void addRunOptions(Options baseOptions) {
baseOptions.addOption(uniffleClientCli);
baseOptions.addOption(uniffleAdminCli);
baseOptions.addOption(refreshAccessCli);
baseOptions.addOption(coordServer);
baseOptions.addOption(grpcPort);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.apache.uniffle.impl.grpc;

import io.grpc.ManagedChannel;
import org.apache.uniffle.api.CoordinatorAdminClient;
import org.apache.uniffle.client.impl.grpc.GrpcClient;
import org.apache.uniffle.proto.CoordinatorAdminServerGrpc;
import org.apache.uniffle.proto.RssProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatorAdminGrpcClient extends GrpcClient implements CoordinatorAdminClient {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorAdminGrpcClient.class);
private CoordinatorAdminServerGrpc.CoordinatorAdminServerBlockingStub blockingStub;

public CoordinatorAdminGrpcClient(String host, int port) {
this(host, port, 3);
}

public CoordinatorAdminGrpcClient(String host, int port, int maxRetryAttempts) {
this(host, port, maxRetryAttempts, true);
}

public CoordinatorAdminGrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
super(host, port, maxRetryAttempts, usePlaintext);
blockingStub = CoordinatorAdminServerGrpc.newBlockingStub(channel);
}

@Override
public RssProtos.RefreshAccessCheckerResponse refreshAccessChecker(RssProtos.RefreshAccessCheckerRequest request) {
return blockingStub.refreshAccessChecker(request);
}

public int refreshAccessChecker(String checker) {
System.out.println("refresh " + checker);
RssProtos.RefreshAccessCheckerRequest rpcRequest = RssProtos.RefreshAccessCheckerRequest
.newBuilder()
.setAccessCheckerClass(checker)
.build();
RssProtos.RefreshAccessCheckerResponse response = refreshAccessChecker(rpcRequest);
RssProtos.RefreshAccessCheckerResponse.AdminCliStatus statusCode = response.getStatus();
switch (statusCode) {
case SUCCESS:
LOG.info("Refresh access checker {} succed.", checker);
break;
case FAILED:
LOG.error("Refresh access checker {} failed. Exception:{}", checker, response.getMsg());
break;
}
return statusCode.getNumber();
}
}
33 changes: 28 additions & 5 deletions cli/src/test/java/org/apache/uniffle/cli/UniffleTestCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@

package org.apache.uniffle.cli;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import org.apache.uniffle.UniffleCliArgsException;
import org.apache.uniffle.impl.grpc.CoordinatorAdminGrpcClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.uniffle.UniffleCliArgsException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class UniffleTestCLI {

Expand Down Expand Up @@ -83,4 +90,20 @@ public void testExampleCLI() throws UniffleCliArgsException, IOException {
dataOut.close();
dataErr.close();
}

@Test
public void testRefreshChecker() throws UniffleCliArgsException{
String[] args = {"-rac", "TestChecker"};
CoordinatorAdminGrpcClient grpcClient = mock(CoordinatorAdminGrpcClient.class);
when(grpcClient.refreshAccessChecker(any(String.class))).thenAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocationOnMock){
return 0;
}
});
uniffleCLI.adminGrpcClient = grpcClient;
assertEquals(0, uniffleCLI.run(args));
verify(grpcClient).refreshAccessChecker(any(String.class));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.apache.uniffle.coordinator;

import io.grpc.stub.StreamObserver;
import org.apache.commons.collections.CollectionUtils;
import org.apache.uniffle.coordinator.access.checker.AccessChecker;
import org.apache.uniffle.proto.CoordinatorAdminServerGrpc;
import org.apache.uniffle.proto.RssProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;

public class CoordinatorAdminGrpcService extends CoordinatorAdminServerGrpc.CoordinatorAdminServerImplBase {

private static final Logger LOG = LoggerFactory.getLogger(CoordinatorAdminGrpcService.class);

private final CoordinatorServer coordinatorServer;

public CoordinatorAdminGrpcService(CoordinatorServer coordinatorServer) {
this.coordinatorServer = coordinatorServer;
}

@Override
public void refreshAccessChecker(RssProtos.RefreshAccessCheckerRequest request, StreamObserver<RssProtos.RefreshAccessCheckerResponse> responseObserver) {
List<String> checkers = coordinatorServer.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS);
String accessClass = request.getAccessCheckerClass();
if (CollectionUtils.isEmpty(checkers)) {
LOG.warn("Access checkers is empty, will not update any checkers.");
return;
}
try {
Class<?> aClass = Class.forName(accessClass);
Optional<AccessChecker> checker = coordinatorServer.getAccessManager().getAccessCheckers().stream()
.filter(aClass::isInstance)
.findAny();
if (!checker.isPresent()) {
LOG.warn("Access checkers {} is none exist, will not update any checkers.", accessClass);
return;
}
checker.get().refreshAccessChecker();
} catch (ClassNotFoundException e) {
LOG.warn("Access checker class {} is not found.", accessClass, e);
}
final RssProtos.RefreshAccessCheckerResponse response = RssProtos.RefreshAccessCheckerResponse
.newBuilder()
.setStatus(RssProtos.RefreshAccessCheckerResponse.AdminCliStatus.SUCCESS)
.setMsg("")
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public ServerInterface getServer() {
return GrpcServer.Builder.newBuilder()
.conf(conf)
.grpcMetrics(coordinatorServer.getGrpcMetrics())
.addService(new CoordinatorGrpcService(coordinatorServer)).build();
.addService(new CoordinatorGrpcService(coordinatorServer))
.addService(new CoordinatorAdminGrpcService(coordinatorServer))
.build();
} else {
throw new UnsupportedOperationException("Unsupported server type " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ public abstract class AbstractAccessChecker implements AccessChecker {
protected AbstractAccessChecker(AccessManager accessManager) throws Exception {

}


@Override
public void refreshAccessChecker() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public AccessCheckResult check(AccessInfo accessInfo) {
return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
}

@Override
public void refreshAccessChecker() {
updateAccessCandidates();
}

@Override
public void close() {
if (updateAccessCandidatesSES != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public interface AccessChecker extends Closeable {
* @return access check result
*/
AccessCheckResult check(AccessInfo accessInfo);

void refreshAccessChecker();
}
5 changes: 5 additions & 0 deletions integration-test/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
<artifactId>shuffle-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>cli</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>shuffle-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public AccessCheckResult check(AccessInfo accessInfo) {
return new AccessCheckResult(false, "");
}

@Override
public void refreshAccessChecker() {

}

@Override
public void close() throws IOException {
// ignore.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.uniffle.test;

import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.proto.RssProtos;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class CoordinatorAdminServiceTest extends CoordinatorAdminTestBase {

@Test
public void test() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
coordinatorConf.setString(CoordinatorConf.COORDINATOR_ASSIGNMENT_STRATEGY.key(), "BASIC");
String accessChecker = "org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker";
coordinatorConf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), accessChecker);
createCoordinatorServer(coordinatorConf);
startServers();
assertEquals(coordinatorAdminClient.refreshAccessChecker(accessChecker),0);
shutdownServers();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import org.apache.uniffle.impl.grpc.CoordinatorAdminGrpcClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

public class CoordinatorAdminTestBase extends IntegrationTestBase {

protected CoordinatorAdminGrpcClient coordinatorAdminClient;

@BeforeEach
public void createClient() {
coordinatorAdminClient = new CoordinatorAdminGrpcClient(LOCALHOST, COORDINATOR_PORT_1);
}

@AfterEach
public void closeClient() {
if (coordinatorAdminClient != null) {
coordinatorAdminClient.close();
}
}
}
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,12 @@
<artifactId>shuffle-server</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.uniffle</groupId>
<artifactId>coordinator</artifactId>
Expand Down
Loading

0 comments on commit 3c6fc01

Please sign in to comment.