Skip to content

Commit

Permalink
fixing and refactoring function status (#3102)
Browse files Browse the repository at this point in the history
* fixing and refactoring function status

* further refactoring

* cleaning up
  • Loading branch information
jerrypeng committed Dec 3, 2018
1 parent c7440f9 commit d4794bd
Show file tree
Hide file tree
Showing 10 changed files with 796 additions and 431 deletions.
Expand Up @@ -669,9 +669,8 @@ public void testPulsarFunctionStatus() throws Exception {
}
}, 5, 200);

FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
FunctionStatus functionStatus = functionRuntimeManager.getFunctionStatus(tenant, namespacePortion,
functionName, null);
FunctionStatus functionStatus = admin.functions().getFunctionStatus(tenant, namespacePortion,
functionName);

int numInstances = functionStatus.getNumInstances();
assertEquals(numInstances, 1);
Expand Down
Expand Up @@ -18,26 +18,10 @@
*/
package org.apache.pulsar.functions.worker;

import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand All @@ -46,24 +30,35 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.*;

import com.google.common.annotations.VisibleForTesting;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Reflections;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

/**
* This class manages all aspects of function assignments and the running of function assignments for this worker
*/
Expand Down Expand Up @@ -590,207 +585,6 @@ public FunctionStats getFunctionStats(String tenant, String namespace, String fu
return functionStats.calculateOverall();
}

/**
 * Get the status of a single function instance.
 *
 * <p>If the instance is assigned to this worker, the status is read from the local
 * runtime spawner. If it is assigned to a different worker, no status is returned;
 * instead a {@link WebApplicationException} carrying a temporary redirect to the
 * owning worker is thrown.
 *
 * @param tenant the tenant the function belongs to
 * @param namespace the namespace the function belongs to
 * @param functionName the function name
 * @param instanceId the function instance id
 * @param uri the request URI used as the base of the redirect to the owning worker;
 *            when null and the instance lives on another worker, an internal server
 *            error is raised instead of a redirect
 * @return the status of the function instance; a "not running" status is returned when
 *         the function has no assignment, when the assigned worker is not a current
 *         member, or when this worker holds no runtime info for the instance
 * @throws WebApplicationException to redirect the caller to the owning worker, or with
 *         an internal server error when no {@code uri} is available for the redirect
 * @throws RuntimeException if fetching the status from the local runtime fails or is
 *         interrupted
 */
public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
        String tenant, String namespace,
        String functionName, int instanceId, URI uri) {
    // Externally managed runtimes are looked up with instance id -1 instead of the
    // requested id (presumably one assignment covers all instances -- confirm).
    final int assignmentInstanceId = runtimeFactory.externallyManaged() ? -1 : instanceId;
    final Assignment assignment = this.findAssignment(tenant, namespace, functionName, assignmentInstanceId);
    if (assignment == null) {
        return notScheduledInstanceStatus();
    }

    final String assignedWorkerId = assignment.getWorkerId();
    final String workerId = this.workerConfig.getWorkerId();

    if (!assignedWorkerId.equals(workerId)) {
        // Another worker owns the instance: locate it in the current membership.
        WorkerInfo workerInfo = null;
        for (WorkerInfo entry : this.membershipManager.getCurrentMembership()) {
            if (assignedWorkerId.equals(entry.getWorkerId())) {
                workerInfo = entry;
            }
        }
        if (workerInfo == null) {
            return notScheduledInstanceStatus();
        }
        if (uri == null) {
            // Without the request URI there is nothing to build a redirect from.
            throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
        }
        URI redirect = UriBuilder.fromUri(uri)
                .host(workerInfo.getWorkerHostname())
                .port(workerInfo.getPort())
                .build();
        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
    }

    // This worker owns the instance.
    FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
            = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
    functionInstanceStatusData.setWorkerId(assignedWorkerId);

    FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
            org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
    if (functionRuntimeInfo == null) {
        // The lookup can come back empty (e.g. the assignment is known but no runtime
        // info has been registered for it); report "not running" instead of failing
        // with a NullPointerException.
        functionInstanceStatusData.setRunning(false);
        return functionInstanceStatusData;
    }

    // Read the spawner once so the null check and the use refer to the same object.
    RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
    if (runtimeSpawner == null) {
        functionInstanceStatusData.setRunning(false);
        if (functionRuntimeInfo.getStartupException() != null) {
            functionInstanceStatusData.setError(functionRuntimeInfo.getStartupException().getMessage());
        }
        return functionInstanceStatusData;
    }

    final InstanceCommunication.FunctionStatus status;
    try {
        status = runtimeSpawner.getFunctionStatus(instanceId).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }

    functionInstanceStatusData.setRunning(status.getRunning());
    functionInstanceStatusData.setError(status.getFailureException());
    functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
    functionInstanceStatusData.setNumReceived(status.getNumReceived());
    functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
    functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());
    functionInstanceStatusData.setLatestUserExceptions(
            toExceptionInformationList(status.getLatestUserExceptionsList()));
    functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
    functionInstanceStatusData.setLatestSystemExceptions(
            toExceptionInformationList(status.getLatestSystemExceptionsList()));
    functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
    functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
    return functionInstanceStatusData;
}

/**
 * Build the status returned when the function has no assignment or its assigned
 * worker is not part of the current membership.
 */
private static FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData notScheduledInstanceStatus() {
    FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
            = new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
    functionInstanceStatusData.setRunning(false);
    functionInstanceStatusData.setError("Function has not been scheduled");
    return functionInstanceStatusData;
}

/**
 * Copy the timestamp and exception string of each runtime-reported exception entry
 * into an {@link ExceptionInformation}, preserving order.
 */
private static List<ExceptionInformation> toExceptionInformationList(
        Iterable<? extends InstanceCommunication.FunctionStatus.ExceptionInformation> exceptionEntries) {
    List<ExceptionInformation> exceptionInformationList = new LinkedList<>();
    for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : exceptionEntries) {
        ExceptionInformation exceptionInformation = new ExceptionInformation();
        exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
        exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
        exceptionInformationList.add(exceptionInformation);
    }
    return exceptionInformationList;
}

/**
 * Collect the status of every instance of a function into one {@link FunctionStatus}.
 *
 * <p>When the function has no assignments, the result reports the parallelism from the
 * function metadata as the number of instances and zero running instances. For an
 * externally managed runtime only the first assignment is consulted: if this worker owns
 * it, one status per instance (0 .. parallelism-1) is gathered locally; otherwise the
 * caller is redirected to the owning worker. For all other runtimes each assignment is
 * resolved locally when owned by this worker and through the function admin client
 * otherwise.
 *
 * @param tenant the tenant the function belongs to
 * @param namespace the namespace the function belongs to
 * @param functionName the function name
 * @param uri the request URI used to build a redirect to the owning worker; when null
 *            and a redirect is required, an internal server error is raised instead
 * @return the aggregated function status, including per-instance statuses and the
 *         number of running instances
 * @throws PulsarAdminException
 */
public FunctionStatus getFunctionStatus(String tenant, String namespace,
                                        String functionName, URI uri)
        throws PulsarAdminException {

    final Collection<Assignment> functionAssignments =
            this.findFunctionAssignments(tenant, namespace, functionName);
    final FunctionStatus aggregate = new FunctionStatus();

    // Nothing scheduled yet: report the configured parallelism with nothing running.
    if (functionAssignments.isEmpty()) {
        Function.FunctionMetaData metaData =
                workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, functionName);
        aggregate.setNumInstances(metaData.getFunctionDetails().getParallelism());
        aggregate.setNumRunning(0);
        return aggregate;
    }

    // TODO refactor the code for externally managed.
    if (runtimeFactory.externallyManaged()) {
        final Assignment firstAssignment = functionAssignments.iterator().next();
        final boolean ownedByThisWorker =
                this.workerConfig.getWorkerId().equals(firstAssignment.getWorkerId());

        if (!ownedByThisWorker) {
            // Look up the hostname/port of the worker that owns the function.
            WorkerInfo owner = null;
            for (WorkerInfo member : this.membershipManager.getCurrentMembership()) {
                if (firstAssignment.getWorkerId().equals(member.getWorkerId())) {
                    owner = member;
                }
            }

            if (owner == null) {
                Function.FunctionMetaData metaData =
                        workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, functionName);
                aggregate.setNumInstances(metaData.getFunctionDetails().getParallelism());
                aggregate.setNumRunning(0);
                return aggregate;
            }

            if (uri != null) {
                URI ownerUri = UriBuilder.fromUri(uri)
                        .host(owner.getWorkerHostname())
                        .port(owner.getPort())
                        .build();
                throw new WebApplicationException(Response.temporaryRedirect(ownerUri).build());
            }
            throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
        }

        final int instanceCount =
                firstAssignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
        for (int id = 0; id < instanceCount; ++id) {
            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData data =
                    getFunctionInstanceStatus(tenant, namespace, functionName, id, null);
            FunctionStatus.FunctionInstanceStatus perInstance = new FunctionStatus.FunctionInstanceStatus();
            perInstance.setInstanceId(id);
            perInstance.setStatus(data);
            aggregate.addInstance(perInstance);
        }
    } else {
        for (Assignment current : functionAssignments) {
            final boolean ownedByThisWorker =
                    this.workerConfig.getWorkerId().equals(current.getWorkerId());

            FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData data;
            if (!ownedByThisWorker) {
                // Ask the owning worker through the admin client.
                data = this.functionAdmin.functions().getFunctionStatus(
                        current.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
                        current.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
                        current.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
                        current.getInstance().getInstanceId());
            } else {
                data = getFunctionInstanceStatus(
                        tenant, namespace, functionName, current.getInstance().getInstanceId(), null);
            }

            FunctionStatus.FunctionInstanceStatus perInstance = new FunctionStatus.FunctionInstanceStatus();
            perInstance.setInstanceId(current.getInstance().getInstanceId());
            perInstance.setStatus(data);
            aggregate.addInstance(perInstance);
        }
    }

    // Derive the totals from the collected per-instance statuses.
    aggregate.setNumInstances(aggregate.instances.size());
    for (FunctionStatus.FunctionInstanceStatus perInstance : aggregate.getInstances()) {
        if (perInstance.getStatus().isRunning()) {
            aggregate.numRunning++;
        }
    }
    return aggregate;
}

/**
* Process an assignment update from the assignment topic
* @param newAssignment the assignment
Expand Down Expand Up @@ -990,8 +784,11 @@ public void close() throws Exception {
}
}

private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
/**
 * Look up the runtime info registered under a fully qualified instance id while
 * holding this object's monitor.
 *
 * @param fullyQualifiedInstanceId the fully qualified id of the function instance
 * @return whatever {@code getFunctionRuntimeInfoInternal} yields for the id
 */
public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
    final FunctionRuntimeInfo runtimeInfo = getFunctionRuntimeInfoInternal(fullyQualifiedInstanceId);
    return runtimeInfo;
}

/**
 * Read the entry stored in {@code functionRuntimeInfoMap} for the given id. This method
 * itself takes no lock.
 *
 * @param fullyQualifiedInstanceId the fully qualified id of the function instance
 * @return the mapped runtime info (presumably null when nothing is mapped -- confirm
 *         against the map's type)
 */
private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String fullyQualifiedInstanceId) {
    final FunctionRuntimeInfo mapped = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
    return mapped;
}
}

0 comments on commit d4794bd

Please sign in to comment.