From 044ba40650a67262fad4936914d3c31602dc552d Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Tue, 2 Oct 2018 15:00:02 -0700 Subject: [PATCH] Avoid scheduling heartbeat function if owner-worker not available (#2613) * Avoid scheduling heartbeat function if owner-worker not available * fix: heartbeat function place * add heartbeat check at scheduler-manager * create separate list for heartbeat functions --- .../functions/worker/MembershipManager.java | 5 ++ .../functions/worker/SchedulerManager.java | 44 +++++++++-- .../worker/scheduler/IScheduler.java | 15 +++- .../worker/scheduler/RoundRobinScheduler.java | 25 ++----- .../worker/MembershipManagerTest.java | 75 +++++++++++++++++++ .../worker/SchedulerManagerTest.java | 8 +- 6 files changed, 138 insertions(+), 34 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index 03b9aead87e78..e4969cf11495c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -46,6 +46,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; +import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction; /** * A simple implementation of leader election using a pulsar topic. 
@@ -239,6 +240,10 @@ public void checkFailures(FunctionMetaDataManager functionMetaDataManager, if (!currentMembership.contains(workerId)) { for (Function.Assignment assignmentEntry : assignmentEntries.values()) { Function.Instance instance = assignmentEntry.getInstance(); + // avoid scheduling-trigger for heartbeat-function if owner-worker is not up + if (checkHeartBeatFunction(instance) != null) { + continue; + } if (!this.unsignedFunctionDurations.containsKey(instance)) { this.unsignedFunctionDurations.put(instance, currentTimeMs); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 608b2fcd4ad28..69725adce4f49 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -18,17 +18,23 @@ */ package org.apache.pulsar.functions.worker; +import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction; + import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.CompressionType; @@ -37,11 +43,14 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.proto.Function; import 
org.apache.pulsar.functions.proto.Function.Assignment; +import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.proto.Function.FunctionMetaData; +import org.apache.pulsar.functions.proto.Function.Instance; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.worker.scheduler.IScheduler; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -70,6 +79,8 @@ public class SchedulerManager implements AutoCloseable { AtomicBoolean isCompactionNeeded = new AtomicBoolean(false); private static final long DEFAULT_ADMIN_API_BACKOFF_SEC = 60; + public static final String HEARTBEAT_TENANT = "pulsar-function"; + public static final String HEARTBEAT_NAMESPACE = "heartbeat"; public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient, PulsarAdmin admin, ScheduledExecutorService executor) { this.workerConfig = workerConfig; @@ -122,8 +133,8 @@ private void scheduleCompaction(ScheduledExecutorService executor, long schedule @VisibleForTesting public void invokeScheduler() { - List currentMembership = this.membershipManager.getCurrentMembership() - .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList()); + Set currentMembership = this.membershipManager.getCurrentMembership() + .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet()); List allFunctions = this.functionMetaDataManager.getAllFunctionMetaData(); Map allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged()); @@ -164,12 +175,12 @@ public void invokeScheduler() { .entrySet().stream() .flatMap(stringMapEntry -> stringMapEntry.getValue().values().stream()).collect(Collectors.toList()); - List needsAssignment = this.getUnassignedFunctionInstances(workerIdToAssignments, + Pair, List> unassignedInstances = 
this.getUnassignedFunctionInstances(workerIdToAssignments, allInstances); - List assignments = this.scheduler.schedule( - needsAssignment, currentAssignments, currentMembership); - + List assignments = this.scheduler.schedule(unassignedInstances.getLeft(), currentAssignments, currentMembership); + assignments.addAll(unassignedInstances.getRight()); + if (log.isDebugEnabled()) { log.debug("New assignments computed: {}", assignments); } @@ -238,10 +249,11 @@ public static List computeInstances(FunctionMetaData function return functionInstances; } - private List getUnassignedFunctionInstances( + private Pair, List> getUnassignedFunctionInstances( Map> currentAssignments, Map functionInstances) { List unassignedFunctionInstances = new LinkedList<>(); + List heartBeatAssignments = Lists.newArrayList(); Map assignmentMap = new HashMap<>(); for (Map entry : currentAssignments.values()) { assignmentMap.putAll(entry); @@ -250,11 +262,17 @@ private List getUnassignedFunctionInstances( for (Map.Entry instanceEntry : functionInstances.entrySet()) { String fullyQualifiedInstanceId = instanceEntry.getKey(); Function.Instance instance = instanceEntry.getValue(); + String heartBeatWorkerId = checkHeartBeatFunction(instance); + if (heartBeatWorkerId != null) { + heartBeatAssignments + .add(Assignment.newBuilder().setInstance(instance).setWorkerId(heartBeatWorkerId).build()); + continue; + } if (!assignmentMap.containsKey(fullyQualifiedInstanceId)) { unassignedFunctionInstances.add(instance); } } - return unassignedFunctionInstances; + return ImmutablePair.of(unassignedFunctionInstances, heartBeatAssignments); } @Override @@ -265,4 +283,14 @@ public void close() { log.warn("Failed to shutdown scheduler manager assignment producer", e); } } + + public static String checkHeartBeatFunction(Instance funInstance) { + if (funInstance.getFunctionMetaData() != null + && funInstance.getFunctionMetaData().getFunctionDetails() != null) { + FunctionDetails funDetails = 
funInstance.getFunctionMetaData().getFunctionDetails(); + return HEARTBEAT_TENANT.equals(funDetails.getTenant()) + && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null; + } + return null; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java index 30fd123aa7060..e19ca71a44842 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/IScheduler.java @@ -22,9 +22,20 @@ import org.apache.pulsar.functions.proto.Function.Instance; import java.util.List; +import java.util.Set; public interface IScheduler { - List schedule(List unassignedFunctionInstances, - List currentAssignments, List workers); + /** + * Scheduler schedules assignments to appropriate workers and adds into #resultAssignments + * + * @param unassignedFunctionInstances + * all unassigned instances + * @param currentAssignments + * current assignments + * @param workers + * @return + */ + List schedule(List unassignedFunctionInstances, List currentAssignments, + Set workers); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java index 817962d4f1b2b..9347ed9fda53c 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java @@ -19,7 +19,6 @@ package org.apache.pulsar.functions.worker.scheduler; import org.apache.pulsar.functions.proto.Function.Assignment; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; 
import org.apache.pulsar.functions.proto.Function.Instance; import com.google.common.collect.Lists; @@ -28,18 +27,16 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Set; public class RoundRobinScheduler implements IScheduler { - public static final String HEARTBEAT_TENANT = "pulsar-function"; - public static final String HEARTBEAT_NAMESPACE = "heartbeat"; - @Override - public List schedule(List unassignedFunctionInstances, List - currentAssignments, List workers) { + public List schedule(List unassignedFunctionInstances, + List currentAssignments, Set workers) { Map> workerIdToAssignment = new HashMap<>(); + List newAssignments = Lists.newArrayList(); for (String workerId : workers) { workerIdToAssignment.put(workerId, new LinkedList<>()); @@ -49,10 +46,8 @@ public List schedule(List unassignedFunctionInstances, Lis workerIdToAssignment.get(existingAssignment.getWorkerId()).add(existingAssignment); } - List newAssignments = Lists.newArrayList(); for (Instance unassignedFunctionInstance : unassignedFunctionInstances) { - String heartBeatWorkerId = checkHeartBeatFunction(unassignedFunctionInstance); - String workerId = heartBeatWorkerId != null ? 
heartBeatWorkerId : findNextWorker(workerIdToAssignment); + String workerId = findNextWorker(workerIdToAssignment); Assignment newAssignment = Assignment.newBuilder().setInstance(unassignedFunctionInstance) .setWorkerId(workerId).build(); workerIdToAssignment.get(workerId).add(newAssignment); @@ -62,16 +57,6 @@ public List schedule(List unassignedFunctionInstances, Lis return newAssignments; } - private static String checkHeartBeatFunction(Instance funInstance) { - if (funInstance.getFunctionMetaData() != null - && funInstance.getFunctionMetaData().getFunctionDetails() != null) { - FunctionDetails funDetails = funInstance.getFunctionMetaData().getFunctionDetails(); - return HEARTBEAT_TENANT.equals(funDetails.getTenant()) - && HEARTBEAT_NAMESPACE.equals(funDetails.getNamespace()) ? funDetails.getName() : null; - } - return null; - } - private String findNextWorker(Map> workerIdToAssignment) { String targetWorkerId = null; int least = Integer.MAX_VALUE; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index c849fd36de70a..a2fb6207a36ff 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.testng.Assert; @@ -359,4 +360,78 @@ public void testCheckFailuresSomeUnassigned() throws Exception { verify(schedulerManager, times(1)).schedule(); verify(functionRuntimeManager, times(0)).removeAssignments(any()); } + + @Test + public void 
testHeartBeatFunctionWorkerDown() throws Exception { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setRescheduleTimeoutMs(30000); + SchedulerManager schedulerManager = mock(SchedulerManager.class); + PulsarClient pulsarClient = mockPulsarClient(); + ReaderBuilder readerBuilder = mock(ReaderBuilder.class); + doReturn(readerBuilder).when(pulsarClient).newReader(); + doReturn(readerBuilder).when(readerBuilder).topic(anyString()); + doReturn(readerBuilder).when(readerBuilder).readCompacted(true); + doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); + doReturn(mock(Reader.class)).when(readerBuilder).create(); + WorkerService workerService = mock(WorkerService.class); + doReturn(pulsarClient).when(workerService).getClient(); + doReturn(workerConfig).when(workerService).getWorkerConfig(); + doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); + + FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( + workerConfig, + workerService, + mock(Namespace.class), + mock(MembershipManager.class), + mock(ConnectorsManager.class) + )); + FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); + MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient())); + + List workerInfoList = new LinkedList<>(); + workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000)); + // make worker-2 unavailable + //workerInfoList.add(WorkerInfo.of("worker-2", "host-2", 8001)); + + Mockito.doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails( + 
Function.FunctionDetails.newBuilder().setParallelism(1) + .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setFunctionDetails(Function.FunctionDetails.newBuilder().setParallelism(1) + .setTenant(SchedulerManager.HEARTBEAT_TENANT) + .setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE).setName("worker-2")) + .build(); + + List metaDataList = new LinkedList<>(); + metaDataList.add(function1); + metaDataList.add(function2); + + Mockito.doReturn(metaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1").setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2 = Function.Assignment.newBuilder() + .setWorkerId("worker-2").setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + // add existing assignments + functionRuntimeManager.setAssignment(assignment1); + functionRuntimeManager.setAssignment(assignment2); + + membershipManager.checkFailures(functionMetaDataManager, functionRuntimeManager, schedulerManager); + + verify(schedulerManager, times(0)).schedule(); + verify(functionRuntimeManager, times(0)).removeAssignments(any()); + Assert.assertEquals(membershipManager.unsignedFunctionDurations.size(), 0); + } + } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 649cd3ef7353c..d3e0118bb8b71 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -589,14 +589,14 @@ public 
void testHeartbeatFunction() throws Exception { final String workerId2 = "host-workerId-2"; Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId1) - .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE) - .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1)) + .setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE) + .setTenant(SchedulerManager.HEARTBEAT_TENANT).setParallelism(1)) .setVersion(version).build(); Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() .setFunctionDetails(Function.FunctionDetails.newBuilder().setName(workerId2) - .setNamespace(RoundRobinScheduler.HEARTBEAT_NAMESPACE) - .setTenant(RoundRobinScheduler.HEARTBEAT_TENANT).setParallelism(1)) + .setNamespace(SchedulerManager.HEARTBEAT_NAMESPACE) + .setTenant(SchedulerManager.HEARTBEAT_TENANT).setParallelism(1)) .setVersion(version).build(); functionMetaDataList.add(function1); functionMetaDataList.add(function2);