Skip to content

Commit

Permalink
[admin-client] 'rebalance' command for functions worker apache#13169
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi committed Dec 7, 2021
1 parent f8d21d8 commit ccdf476
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,16 @@ public interface Worker {
* @return
*/
CompletableFuture<Map<String, Collection<String>>> getAssignmentsAsync();

/**
* Triggers a rebalance of functions to workers.
* @throws PulsarAdminException
*/
void rebalance() throws PulsarAdminException;

/**
* Triggers a rebalance of functions to workersasynchronously..
* @throws PulsarAdminException
*/
CompletableFuture<Void> rebalanceAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.concurrent.TimeoutException;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -242,4 +244,24 @@ public void failed(Throwable throwable) {
});
return future;
}

@Override
public void rebalance() throws PulsarAdminException {
try {
rebalanceAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> rebalanceAsync() {
final WebTarget path = worker.path("rebalance");
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,25 @@ void runCmd() throws Exception {
}
}


@Parameters(commandDescription = "Triggers a rebalance of functions to workers")
class Rebalance extends BaseCommand {

@Override
void runCmd() throws Exception {
admin.worker().rebalance();
print("Rebalance command sent successfully");
}
}

public CmdFunctionWorker(PulsarAdmin admin) throws PulsarClientException {
super("functions-worker", admin);
jcommander.addCommand("function-stats", new FunctionsStats());
jcommander.addCommand("monitoring-metrics", new CmdMonitoringMetrics());
jcommander.addCommand("get-cluster", new GetCluster());
jcommander.addCommand("get-cluster-leader", new GetClusterLeader());
jcommander.addCommand("get-function-assignments", new GetFunctionAssignments());
jcommander.addCommand("rebalance", new Rebalance());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Worker;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class TestCmdFunctionWorker {

private PulsarAdmin pulsarAdmin;

private CmdFunctionWorker cmdFunctionWorker;

private Worker worker;

@BeforeMethod
public void setup() throws Exception {
pulsarAdmin = mock(PulsarAdmin.class);
worker = mock(Worker.class);
when(pulsarAdmin.worker()).thenReturn(worker);

cmdFunctionWorker = spy(new CmdFunctionWorker(pulsarAdmin));
}

@Test
public void testCmdRebalance() throws Exception {
cmdFunctionWorker.run(new String[]{"rebalance"});
verify(pulsarAdmin.worker(), times(1)).rebalance();
}
}

0 comments on commit ccdf476

Please sign in to comment.