Skip to content

Commit

Permalink
Fixing Function get status (#188)
Browse files Browse the repository at this point in the history
* Fixing Function get status

* cleaning up

* adding back comment

* removing unneccessary dep
  • Loading branch information
jerrypeng authored and sijie committed Mar 4, 2018
1 parent dfb333f commit 79a3b78
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 78 deletions.
1 change: 0 additions & 1 deletion pulsar-functions/conf/example.yml
Expand Up @@ -26,5 +26,4 @@ userConfig:
"PublishTopic" : "persistent://sample/standalone/ns1/test_result"

sinkTopic: "persistent://sample/standalone/ns1/test_result"
outputSerdeClassName: "org.apache.pulsar.functions.api.utils.Utf8StringSerDe"
autoAck: true
1 change: 1 addition & 0 deletions pulsar-functions/conf/function_worker.yml
Expand Up @@ -18,6 +18,7 @@
#

workerId: standalone
workerHostname: localhost
workerPort: 6750
zookeeperServers: localhost:2181
functionMetadataTopicName: metadata
Expand Down
Expand Up @@ -19,23 +19,33 @@
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.Request.AssignmentsUpdate;
import org.apache.pulsar.functions.runtime.container.FunctionContainerFactory;
import org.apache.pulsar.functions.runtime.container.ProcessFunctionContainerFactory;
import org.apache.pulsar.functions.runtime.container.ThreadFunctionContainerFactory;
import org.apache.pulsar.functions.runtime.metrics.MetricsSink;
import org.apache.pulsar.functions.runtime.spawner.Spawner;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

/**
Expand Down Expand Up @@ -70,10 +80,13 @@ public class FunctionRuntimeManager implements AutoCloseable{

private FunctionContainerFactory functionContainerFactory;

private MembershipManager membershipManager;


public FunctionRuntimeManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
Namespace dlogNamespace) throws Exception {
Namespace dlogNamespace,
MembershipManager membershipManager) throws Exception {
this.workerConfig = workerConfig;

Reader reader = pulsarClient.createReader(
Expand Down Expand Up @@ -107,6 +120,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
this.functionActioner = new FunctionActioner(this.workerConfig, functionContainerFactory,
this.metricsSink, this.workerConfig.getMetricsConfig().getMetricsCollectionInterval(),
dlogNamespace, actionQueue);

this.membershipManager = membershipManager;
}

public synchronized Map<String, Map<String, Assignment>> getCurrentAssignments() {
Expand All @@ -131,6 +146,81 @@ public void start() {
this.functionAssignmentTailer.start();
}

public InstanceCommunication.FunctionStatus getFunctionStatus(String tenant, String namespace, String functionName) {
Map<String, Map<String, Function.Assignment>> currentAssignments = this.getCurrentAssignments();

log.info("this.getCurrentAssignments(): {}", this.getCurrentAssignments());
String workerId = this.workerConfig.getWorkerId();

Function.Assignment assignment = this.findAssignment(tenant, namespace, functionName);
if (assignment == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException("Function has not been scheduled");
return functionStatusBuilder.build();
}

InstanceCommunication.FunctionStatus functionStatus = null;
// If I am running worker
if (assignment.getWorkerId().equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
FunctionConfigUtils.getFullyQualifiedName(tenant, namespace, functionName));
Spawner spawner = functionRuntimeInfo.getSpawner();
if (spawner != null) {
try {
functionStatus = functionRuntimeInfo.getSpawner().getFunctionStatus().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
} else {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
if (functionRuntimeInfo.getStartupException() != null) {
functionStatusBuilder.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
}
functionStatus = functionStatusBuilder.build();
}
} else {
// query other worker

List<MembershipManager.WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
MembershipManager.WorkerInfo workerInfo = null;
for (MembershipManager.WorkerInfo entry: workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder
= InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
functionStatusBuilder.setFailureException("Function has not been scheduled");
return functionStatusBuilder.build();
}

Client client = ClientBuilder.newClient();

// TODO: implement authentication/authorization
String jsonResponse = client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/status",
workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName))
.request(MediaType.TEXT_PLAIN)
.get(String.class);

InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
try {
JsonFormat.parser().merge(jsonResponse, functionStatusBuilder);
} catch (InvalidProtocolBufferException e) {
log.warn("Got invalid function status response from {}", workerInfo, e);
throw new RuntimeException(e);
}
functionStatus = functionStatusBuilder.build();
}

return functionStatus;
}

public synchronized void processAssignmentUpdate(MessageId messageId, AssignmentsUpdate assignmentsUpdate) {
if (assignmentsUpdate.getVersion() > this.currentAssignmentVersion) {

Expand Down Expand Up @@ -244,12 +334,11 @@ void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
}
}

private Assignment findAssignment(Assignment assignment) {
private Assignment findAssignment(String tenant, String namespace, String functionName) {
String fullyQualifiedName
= FunctionConfigUtils.getFullyQualifiedName(tenant, namespace, functionName);
for (Map.Entry<String, Map<String, Assignment>> entry : this.workerIdToAssignments.entrySet()) {
String workerId = entry.getKey();
Map<String, Assignment> assignmentMap = entry.getValue();
String fullyQualifiedName
= FunctionConfigUtils.getFullyQualifiedName(assignment.getFunctionMetaData().getFunctionConfig());
Assignment existingAssignment = assignmentMap.get(fullyQualifiedName);
if (existingAssignment != null) {
return existingAssignment;
Expand All @@ -258,6 +347,14 @@ private Assignment findAssignment(Assignment assignment) {
return null;
}

private Assignment findAssignment(Assignment assignment) {
return findAssignment(
assignment.getFunctionMetaData().getFunctionConfig().getTenant(),
assignment.getFunctionMetaData().getFunctionConfig().getNamespace(),
assignment.getFunctionMetaData().getFunctionConfig().getName()
);
}

@VisibleForTesting
void setAssignment(Assignment assignment) {
if (!this.workerIdToAssignments.containsKey(assignment.getWorkerId())) {
Expand Down
Expand Up @@ -22,6 +22,10 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -57,7 +61,8 @@ public class MembershipManager implements AutoCloseable {
consumer = client.subscribe(workerConfig.getClusterCoordinationTopic(), COORDINATION_TOPIC_SUBSCRIPTION,
new ConsumerConfiguration()
.setSubscriptionType(SubscriptionType.Failover)
.setConsumerName(workerConfig.getWorkerId()));
.setConsumerName(String.format("%s:%s:%d", workerConfig.getWorkerId(),
workerConfig.getWorkerHostname(), workerConfig.getWorkerPort())));
this.workerConfig = workerConfig;
this.schedulerManager = schedulerManager;
}
Expand Down Expand Up @@ -106,21 +111,24 @@ private void receiveOne(MessageIdImpl endMsgId, CompletableFuture<Boolean> final
});
}

public List<String> getCurrentMembership() {
public List<WorkerInfo> getCurrentMembership() {

List<String> workerIds = new LinkedList<>();
List<WorkerInfo> workerIds = new LinkedList<>();
PersistentTopicStats persistentTopicStats = null;
PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
try {
persistentTopicStats = pulsarAdmin.persistentTopics().getStats(
this.workerConfig.getClusterCoordinationTopic());
} catch (PulsarAdminException e) {
e.printStackTrace();
log.error("Failled to get status of coordinate topic {}",
this.workerConfig.getClusterCoordinationTopic(), e);
throw new RuntimeException(e);
}

for (ConsumerStats consumerStats : persistentTopicStats.subscriptions
.get(COORDINATION_TOPIC_SUBSCRIPTION).consumers) {
workerIds.add(consumerStats.consumerName);
WorkerInfo workerInfo = WorkerInfo.parseFrom(consumerStats.consumerName);
workerIds.add(workerInfo);
}
return workerIds;
}
Expand All @@ -141,4 +149,26 @@ public void close() throws PulsarClientException {
}
}

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@ToString
public static class WorkerInfo {
private String workerId;
private String workerHostname;
private int port;

public static WorkerInfo parseFrom(String str) {
String[] tokens = str.split(":");
if (tokens.length != 3) {
throw new IllegalArgumentException("Invalid string to parse WorkerInfo");
}

String workerId = tokens[0];
String workerHostname = tokens[1];
int port = Integer.parseInt(tokens[2]);

return new WorkerInfo(workerId, workerHostname, port);
}
}

}
Expand Up @@ -42,7 +42,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collector;
import java.util.stream.Collectors;

@Slf4j
Expand Down Expand Up @@ -101,8 +100,8 @@ public void schedule() {
}

public void invokeScheduler() {
List<String> currentMembership = this.membershipManager.getCurrentMembership();

List<String> currentMembership = this.membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList());

List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();

Expand All @@ -118,6 +117,7 @@ public void invokeScheduler() {
needsAssignment, currentAssignments, currentMembership);

log.info("New Assignment computed: {}", assignments);

long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder()
.setVersion(assignmentVersion)
Expand Down
Expand Up @@ -133,13 +133,13 @@ protected void doStartImpl() {
this.functionMetaDataManager = new FunctionMetaDataManager(
this.workerConfig, this.schedulerManager, this.client);

// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
this.workerConfig, this.client, this.dlogNamespace);

//create membership manager
this.membershipManager = new MembershipManager(this.workerConfig, this.schedulerManager, this.client);

// create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager(
this.workerConfig, this.client, this.dlogNamespace, this.membershipManager);

// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager);
Expand Down
Expand Up @@ -43,6 +43,7 @@ public class WorkerConfig implements Serializable {
private static final long serialVersionUID = 1L;

private String workerId;
private String workerHostname;
private int workerPort;
private String zookeeperServers;
private String functionMetadataTopicName;
Expand Down

0 comments on commit 79a3b78

Please sign in to comment.