Skip to content

Commit

Permalink
Need to reinitialize certain components for externally managed runtim…
Browse files Browse the repository at this point in the history
…es when moving functions (#5007)

(cherry picked from commit 27a9f62)
  • Loading branch information
jerrypeng authored and jiazhai committed Aug 28, 2019
1 parent 057e7d2 commit 166cf44
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
Expand Up @@ -247,9 +247,19 @@ public void start() throws Exception {
throw e;
}

if (channel == null && stub == null) {
setupGrpcChannelIfNeeded();
}

@Override
public void reinitialize() {
setupGrpcChannelIfNeeded();
}

private synchronized void setupGrpcChannelIfNeeded() {
if (channel == null || stub == null) {
channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];

String jobName = createJobName(instanceConfig.getFunctionDetails());
for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) {
String address = getServiceUrl(jobName, jobNamespace, i);
Expand Down Expand Up @@ -332,16 +342,16 @@ public CompletableFuture<Void> resetMetrics() {
@Override
public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
if (instanceId < 0 || instanceId >= stub.length) {
if (stub == null) {
retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
return retval;
}
}
if (stub == null) {
retval.completeExceptionally(new RuntimeException("Not alive"));
return retval;
}

if (instanceId < 0 || instanceId >= stub.length) {
retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
return retval;
}

ListenableFuture<InstanceCommunication.MetricsData> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build());
Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
@Override
Expand Down
Expand Up @@ -31,6 +31,10 @@ public interface Runtime {

void start() throws Exception;

default void reinitialize() {

}

void join() throws Exception;

void stop() throws Exception;
Expand Down
Expand Up @@ -215,9 +215,6 @@ public void initialize() {
}
}
}
// start assignment tailer
this.functionAssignmentTailer.start();

} catch (Exception e) {
log.error("Failed to initialize function runtime manager: ", e.getMessage(), e);
throw new RuntimeException(e);
Expand All @@ -229,7 +226,6 @@ public void initialize() {
*/
public void start() {
log.info("/** Starting Function Runtime Manager **/");
log.info("Initialize metrics sink...");
log.info("Starting function assignment tailer...");
this.functionAssignmentTailer.start();
}
Expand Down Expand Up @@ -629,7 +625,6 @@ private void updateAssignment(Assignment assignment) {
// changes to the function meta data of the instance

if (runtimeFactory.externallyManaged()) {

// change in metadata thus need to potentially restart
if (!assignment.getInstance().equals(existingAssignment.getInstance())) {
//stop function
Expand Down Expand Up @@ -657,6 +652,8 @@ private void updateAssignment(Assignment assignment) {
RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner(
assignment.getInstance(),
assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
// re-initialize if necessary
runtimeSpawner.getRuntime().reinitialize();
newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);

this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
Expand Down
Expand Up @@ -648,7 +648,8 @@ public void testExternallyManagedRuntimeUpdate() throws Exception {
doNothing().when(kubernetesRuntimeFactory).setupClient();
doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();

doReturn(mock(KubernetesRuntime.class)).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class);
doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());

FunctionActioner functionActioner = spy(new FunctionActioner(
workerConfig,
Expand Down Expand Up @@ -743,7 +744,10 @@ public void testExternallyManagedRuntimeUpdate() throws Exception {
instance.getInstanceId());
Assert.assertTrue(
functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory);

Assert.assertTrue(
functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime() != null);

verify(kubernetesRuntime, times(1)).reinitialize();
}
}

0 comments on commit 166cf44

Please sign in to comment.