Skip to content

Commit

Permalink
Refactor worker to make sure that FunctionActioner is just an actione…
Browse files Browse the repository at this point in the history
…r. Also consolidate all info in worker in FunctionRuntimeInfo (#88)
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent f4130f8 commit 5650574
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 147 deletions.
Expand Up @@ -33,5 +33,5 @@ public enum Action {
}

private Action action;
private FunctionMetaData functionMetaData;
private FunctionRuntimeInfo functionRuntimeInfo;
}
Expand Up @@ -22,15 +22,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

import java.io.File;
import java.io.FileOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

Expand All @@ -42,25 +39,13 @@
@Slf4j
public class FunctionActioner implements AutoCloseable {

@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
@Slf4j
public static class AssignmentInfo {
private FunctionMetaData functionMetaData;
private Spawner spawner;
}

private final WorkerConfig workerConfig;
private final LimitsConfig limitsConfig;
private final FunctionContainerFactory functionContainerFactory;
private final Namespace dlogNamespace;
private LinkedBlockingQueue<FunctionAction> actionQueue;
private volatile boolean running;
private Thread actioner;
private final Map<String, AssignmentInfo> assignments = new HashMap<>();

public FunctionActioner(WorkerConfig workerConfig, LimitsConfig limitsConfig,
FunctionContainerFactory functionContainerFactory,
Expand All @@ -79,13 +64,13 @@ public FunctionActioner(WorkerConfig workerConfig, LimitsConfig limitsConfig,
if (action == null) continue;
if (action.getAction() == FunctionAction.Action.START) {
try {
startFunction(action.getFunctionMetaData());
startFunction(action.getFunctionRuntimeInfo());
} catch (Exception ex) {
log.info("Error starting function", ex);
action.getFunctionMetaData().setStartupException(ex);
action.getFunctionRuntimeInfo().getFunctionMetaData().setStartupException(ex);
}
} else {
stopFunction(action.getFunctionMetaData());
stopFunction(action.getFunctionRuntimeInfo());
}
} catch (InterruptedException ex) {
}
Expand All @@ -108,7 +93,8 @@ public void join() throws InterruptedException {
actioner.join();
}

private void startFunction(FunctionMetaData functionMetaData) throws Exception {
private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionMetaData();
log.info("Starting function {} ...", functionMetaData.getFunctionConfig().getName());
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
Expand All @@ -133,33 +119,18 @@ private void startFunction(FunctionMetaData functionMetaData) throws Exception {
Spawner spawner = Spawner.createSpawner(functionMetaData.getFunctionConfig(), limitsConfig,
pkgFile.getAbsolutePath(), functionContainerFactory);

AssignmentInfo assignmentInfo = new AssignmentInfo();
assignmentInfo.setFunctionMetaData(functionMetaData);
assignmentInfo.setSpawner(spawner);
assignments.put(functionMetaData.getFunctionConfig().getFullyQualifiedName(), assignmentInfo);
functionRuntimeInfo.setSpawner(spawner);
spawner.start();
}

private boolean stopFunction(FunctionMetaData functionMetaData) {
private boolean stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionMetaData();
log.info("Stopping function {}...", functionMetaData.getFunctionConfig().getName());
AssignmentInfo assignmentInfo = assignments.get(functionMetaData.getFunctionConfig().getFullyQualifiedName());
if (assignmentInfo != null && assignmentInfo.getSpawner() != null) {
assignmentInfo.getSpawner().close();
assignmentInfo.setSpawner(null);
if (functionRuntimeInfo.getSpawner() != null) {
functionRuntimeInfo.getSpawner().close();
functionRuntimeInfo.setSpawner(null);
return true;
}
return false;
}

public boolean containsAssignment(FunctionConfig functionConfig) {
return assignments.containsKey(functionConfig.getFullyQualifiedName());
}

public Spawner getSpawner(FunctionConfig functionConfig) {
if (!containsAssignment(functionConfig)) {
return null;
} else {
return assignments.get(functionConfig.getFullyQualifiedName()).getSpawner();
}
}
}
Expand Up @@ -30,7 +30,6 @@
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

@Data
@Setter
Expand Down
Expand Up @@ -27,20 +27,19 @@
import org.apache.pulsar.functions.runtime.worker.request.DeregisterRequest;
import org.apache.pulsar.functions.runtime.worker.request.MarkerRequest;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequest;
import org.apache.pulsar.functions.runtime.worker.request.ServiceRequestManager;
import org.apache.pulsar.functions.runtime.worker.request.UpdateRequest;

@Slf4j
public class FunctionMetaDataTopicTailer
implements java.util.function.Consumer<Message>, Function<Throwable, Void>, AutoCloseable {

private final FunctionMetaDataManager functionMetaDataManager;
private final FunctionRuntimeManager functionRuntimeManager;
private final Reader reader;

public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManager,
public FunctionMetaDataTopicTailer(FunctionRuntimeManager functionRuntimeManager,
Reader reader)
throws PulsarClientException {
this.functionMetaDataManager = functionMetaDataManager;
this.functionRuntimeManager = functionRuntimeManager;
this.reader = reader;
}

Expand All @@ -51,7 +50,7 @@ public void start() {

public void initialize() {
log.info("Initializing Metadata state...");
this.functionMetaDataManager.sendIntializationMarker();
this.functionRuntimeManager.sendIntializationMarker();
}

private void receiveOne() {
Expand Down Expand Up @@ -87,13 +86,13 @@ public void accept(Message msg) {

switch(serviceRequest.getRequestType()) {
case MARKER:
this.functionMetaDataManager.processInitializeMarker((MarkerRequest) serviceRequest);
this.functionRuntimeManager.processInitializeMarker((MarkerRequest) serviceRequest);
break;
case UPDATE:
this.functionMetaDataManager.processUpdate((UpdateRequest) serviceRequest);
this.functionRuntimeManager.processUpdate((UpdateRequest) serviceRequest);
break;
case DELETE:
this.functionMetaDataManager.proccessDeregister((DeregisterRequest) serviceRequest);
this.functionRuntimeManager.proccessDeregister((DeregisterRequest) serviceRequest);
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
Expand Down
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.runtime.worker;

import lombok.*;
import lombok.experimental.Accessors;
import org.apache.pulsar.functions.runtime.spawner.Spawner;

@Data
@Setter
@Getter
@Accessors(chain = true)
public class FunctionRuntimeInfo {

// function meta data
private FunctionMetaData functionMetaData;
// The associated runtime with it if any
private Spawner spawner;
}

0 comments on commit 5650574

Please sign in to comment.