Skip to content

Commit

Permalink
Avoid scheduling heartbeat function if owner-worker not available (#2613
Browse files Browse the repository at this point in the history
)

* Avoid scheduling heartbeat function if owner-worker not available

* fix: heartbeat function place

* add heartbeat check at scheduler-manager

* create separate list for heartbeat functions
  • Loading branch information
rdhabalia committed Oct 2, 2018
1 parent 8191fcf commit 044ba40
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 34 deletions.
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;

/**
* A simple implementation of leader election using a pulsar topic.
Expand Down Expand Up @@ -239,6 +240,10 @@ public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
if (!currentMembership.contains(workerId)) {
for (Function.Assignment assignmentEntry : assignmentEntries.values()) {
Function.Instance instance = assignmentEntry.getInstance();
// avoid scheduling-trigger for heartbeat-function if owner-worker is not up
if (checkHeartBeatFunction(instance) != null) {
continue;
}
if (!this.unsignedFunctionDurations.containsKey(instance)) {
this.unsignedFunctionDurations.put(instance, currentTimeMs);
}
Expand Down
Expand Up @@ -18,17 +18,23 @@
*/
package org.apache.pulsar.functions.worker;

import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.CompressionType;
Expand All @@ -37,11 +43,14 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.Instance;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -70,6 +79,8 @@ public class SchedulerManager implements AutoCloseable {

AtomicBoolean isCompactionNeeded = new AtomicBoolean(false);
private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60;
public static final String HEARTBEAT_TENANT = "pulsar-function";
public static final String HEARTBEAT_NAMESPACE = "heartbeat";

public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) {
this.workerConfig = workerConfig;
Expand Down Expand Up @@ -122,8 +133,8 @@ private void scheduleCompaction(ScheduledExecutorService executor, long schedule
@VisibleForTesting
public void invokeScheduler() {

List<String> currentMembership = this.membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList());
Set<String> currentMembership = this.membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());

List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged());
Expand Down Expand Up @@ -164,12 +175,12 @@ public void invokeScheduler() {
.entrySet().stream()
.flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()).collect(Collectors.toList());

List<Function.Instance> needsAssignment = this.getUnassignedFunctionInstances(workerIdToAssignments,
Pair<List<Function.Instance>, List<Assignment>> unassignedInstances = this.getUnassignedFunctionInstances(workerIdToAssignments,
allInstances);

List<Assignment> assignments = this.scheduler.schedule(
needsAssignment, currentAssignments, currentMembership);

List<Assignment> assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership);
assignments.addAll(unassignedInstances.getRight());
if (log.isDebugEnabled()) {
log.debug("New assignments computed: {}", assignments);
}
Expand Down Expand Up @@ -238,10 +249,11 @@ public static List<Function.Instance> computeInstances(FunctionMetaData function
return functionInstances;
}

private List<Function.Instance> getUnassignedFunctionInstances(
private Pair<List<Function.Instance>, List<Assignment>> getUnassignedFunctionInstances(
Map<String, Map<String, Assignment>> currentAssignments, Map<String, Function.Instance> functionInstances) {

List<Function.Instance> unassignedFunctionInstances = new LinkedList<>();
List<Assignment> heartBeatAssignments = Lists.newArrayList();
Map<String, Assignment> assignmentMap = new HashMap<>();
for (Map<String, Assignment> entry : currentAssignments.values()) {
assignmentMap.putAll(entry);
Expand All @@ -250,11 +262,17 @@ private List<Function.Instance> getUnassignedFunctionInstances(
for (Map.Entry<String, Function.Instance> instanceEntry : functionInstances.entrySet()) {
String fullyQualifiedInstanceId = instanceEntry.getKey();
Function.Instance instance = instanceEntry.getValue();
String heartBeatWorkerId = checkHeartBeatFunction(instance);
if (heartBeatWorkerId != null) {
heartBeatAssignments
.add(Assignment.newBuilder().setInstance(instance).setWorkerId(heartBeatWorkerId).build());
continue;
}
if (!assignmentMap.containsKey(fullyQualifiedInstanceId)) {
unassignedFunctionInstances.add(instance);
}
}
return unassignedFunctionInstances;
return ImmutablePair.of(unassignedFunctionInstances, heartBeatAssignments);
}

@Override
Expand All @@ -265,4 +283,14 @@ public void close() {
log.warn("Failed to shutdown scheduler manager assignment producer", e);
}
}

/**
 * Tells whether the given instance belongs to a heartbeat function and, if so, which
 * name it carries.
 *
 * <p>An instance is treated as a heartbeat function when the tenant of its function
 * details equals {@link #HEARTBEAT_TENANT} and the namespace equals
 * {@link #HEARTBEAT_NAMESPACE}.
 *
 * <p>NOTE(review): if {@code Instance} is a protobuf-generated message, its getters
 * presumably never return {@code null}, which would make the null guards below purely
 * defensive - confirm.
 *
 * @param funInstance the function instance to inspect
 * @return the function name taken from the instance's function details when the
 *         instance is a heartbeat function, otherwise {@code null}
 */
public static String checkHeartBeatFunction(Instance funInstance) {
    if (funInstance.getFunctionMetaData() == null) {
        return null;
    }
    FunctionDetails details = funInstance.getFunctionMetaData().getFunctionDetails();
    if (details == null) {
        return null;
    }
    // Both tenant and namespace have to match the reserved heartbeat values.
    if (!HEARTBEAT_TENANT.equals(details.getTenant())) {
        return null;
    }
    if (!HEARTBEAT_NAMESPACE.equals(details.getNamespace())) {
        return null;
    }
    return details.getName();
}
}
Expand Up @@ -22,9 +22,20 @@
import org.apache.pulsar.functions.proto.Function.Instance;

import java.util.List;
import java.util.Set;

/**
* Contract for components that turn unassigned function instances into worker assignments.
*/
public interface IScheduler {

List<Assignment> schedule(List<Instance> unassignedFunctionInstances,
List<Assignment> currentAssignments, List<String> workers);
/**
* Computes assignments that place the given unassigned function instances on workers.
*
* NOTE(review): the caller in SchedulerManager appends further assignments to the returned
* list, so implementations presumably need to return a mutable list - confirm.
*
* @param unassignedFunctionInstances
* all function instances that do not have an assignment yet
* @param currentAssignments
* assignments that already exist
* @param workers
* ids of the workers that assignments may be placed on
* @return the assignments computed by the scheduler
*/
List<Assignment> schedule(List<Instance> unassignedFunctionInstances, List<Assignment> currentAssignments,
Set<String> workers);
}
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.functions.worker.scheduler;

import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Instance;

import com.google.common.collect.Lists;
Expand All @@ -28,18 +27,16 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Set;

public class RoundRobinScheduler implements IScheduler {

public static final String HEARTBEAT_TENANT = "pulsar-function";
public static final String HEARTBEAT_NAMESPACE = "heartbeat";

@Override
public List<Assignment> schedule(List<Instance> unassignedFunctionInstances, List<Assignment>
currentAssignments, List<String> workers) {
public List<Assignment> schedule(List<Instance> unassignedFunctionInstances,
List<Assignment> currentAssignments, Set<String> workers) {

Map<String, List<Assignment>> workerIdToAssignment = new HashMap<>();
List<Assignment> newAssignments = Lists.newArrayList();

for (String workerId : workers) {
workerIdToAssignment.put(workerId, new LinkedList<>());
Expand All @@ -49,10 +46,8 @@ public List<Assignment> schedule(List<Instance> unassignedFunctionInstances, Lis
workerIdToAssignment.get(existingAssignment.getWorkerId()).add(existingAssignment);
}

List<Assignment> newAssignments = Lists.newArrayList();
for (Instance unassignedFunctionInstance : unassignedFunctionInstances) {
String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance);
String workerId = heartBeatWorkerId != null ? heartBeatWorkerId : findNextWorker(workerIdToAssignment);
String workerId = findNextWorker(workerIdToAssignment);
Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance)
.setWorkerId(workerId).build();
workerIdToAssignment.get(workerId).add(newAssignment);
Expand All @@ -62,16 +57,6 @@ public List<Assignment> schedule(List<Instance> unassignedFunctionInstances, Lis
return newAssignments;
}

/**
 * Looks at the function details of the given instance and reports the function name
 * when the instance is a heartbeat function, i.e. when its tenant equals
 * {@code HEARTBEAT_TENANT} and its namespace equals {@code HEARTBEAT_NAMESPACE}.
 *
 * @param funInstance the function instance to inspect
 * @return the function name for a heartbeat function, otherwise {@code null}
 */
private static String checkHeartBeatFunction(Instance funInstance) {
    boolean detailsPresent = funInstance.getFunctionMetaData() != null
            && funInstance.getFunctionMetaData().getFunctionDetails() != null;
    if (!detailsPresent) {
        return null;
    }

    FunctionDetails functionDetails = funInstance.getFunctionMetaData().getFunctionDetails();
    boolean heartBeat = HEARTBEAT_TENANT.equals(functionDetails.getTenant())
            && HEARTBEAT_NAMESPACE.equals(functionDetails.getNamespace());
    if (heartBeat) {
        return functionDetails.getName();
    }
    return null;
}

private String findNextWorker(Map<String, List<Assignment>> workerIdToAssignment) {
String targetWorkerId = null;
int least = Integer.MAX_VALUE;
Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -359,4 +360,78 @@ public void testCheckFailuresSomeUnassigned() throws Exception {
verify(schedulerManager, times(1)).schedule();
verify(functionRuntimeManager, times(0)).removeAssignments(any());
}

// Verifies MembershipManager.checkFailures when the only assignment held by a worker missing
// from the membership is a heartbeat function: no scheduling is requested, no assignments are
// removed, and nothing is recorded in unsignedFunctionDurations.
@Test
public void testHeartBeatFunctionWorkerDown() throws Exception {
// Configuration for the local worker ("worker-1").
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setRescheduleTimeoutMs(30000);
// The scheduler manager is a pure mock so calls to schedule() can be counted at the end.
SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mockPulsarClient();
// Stub the reader builder chain so that creating a reader hands back a mock Reader.
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).readCompacted(true);
doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create();
// Worker service mock that exposes the stubbed client, the config and a mock admin.
WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();

// Spy on a real FunctionRuntimeManager so assignments can be set and removeAssignments verified.
FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
mock(MembershipManager.class),
mock(ConnectorsManager.class)
));
FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
// Spy on a real MembershipManager: checkFailures runs for real, membership is stubbed below.
MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient()));

List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
// worker-2 is deliberately left out of the membership list to make it unavailable

Mockito.doReturn(workerInfoList).when(membershipManager).getCurrentMembership();

// function1 is a regular function.
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder().setParallelism(1)
.setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();

// function2 uses the heartbeat tenant and namespace and is named after the missing worker-2.
Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setParallelism(1)
.setTenant(SchedulerManager.HEARTBEAT_TENANT)
.setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE).setName("worker-2"))
.build();

List<Function.FunctionMetaData> metaDataList = new LinkedList<>();
metaDataList.add(function1);
metaDataList.add(function2);

Mockito.doReturn(metaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
// function1 runs on the available worker-1; the heartbeat function is assigned to worker-2.
Function.Assignment assignment1 = Function.Assignment.newBuilder()
.setWorkerId("worker-1").setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function1).setInstanceId(0).build())
.build();
Function.Assignment assignment2 = Function.Assignment.newBuilder()
.setWorkerId("worker-2").setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function2).setInstanceId(0).build())
.build();

// add existing assignments
functionRuntimeManager.setAssignment(assignment1);
functionRuntimeManager.setAssignment(assignment2);

membershipManager.checkFailures(functionMetaDataManager, functionRuntimeManager, schedulerManager);

// The heartbeat assignment of the missing worker must not trigger any of these.
verify(schedulerManager, times(0)).schedule();
verify(functionRuntimeManager, times(0)).removeAssignments(any());
Assert.assertEquals(membershipManager.unsignedFunctionDurations.size(), 0);
}

}
Expand Up @@ -589,14 +589,14 @@ public void testHeartbeatFunction() throws Exception {
final String workerId2 = "host-workerId-2";
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId1)
.setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
.setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
.setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE)
.setTenant(SchedulerManager.HEARTBEAT_TENANT).setParallelism(1))
.setVersion(version).build();

Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId2)
.setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE)
.setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1))
.setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE)
.setTenant(SchedulerManager.HEARTBEAT_TENANT).setParallelism(1))
.setVersion(version).build();
functionMetaDataList.add(function1);
functionMetaDataList.add(function2);
Expand Down

0 comments on commit 044ba40

Please sign in to comment.