Skip to content

Commit

Permalink
Race condition function-runtime-manager read old assignments (#2437)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Aug 24, 2018
1 parent 225eeb7 commit 672a167
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
Expand Up @@ -335,6 +335,10 @@ public String getSubscription() {
return subscription; return subscription;
} }


public String getConsumerName() {
return this.consumerName;
}

/** /**
* Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
* active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function;
Expand All @@ -53,7 +54,7 @@
public class MembershipManager implements AutoCloseable, ConsumerEventListener { public class MembershipManager implements AutoCloseable, ConsumerEventListener {


private final String consumerName; private final String consumerName;
private final Consumer<byte[]> consumer; private final ConsumerImpl<byte[]> consumer;
private final WorkerConfig workerConfig; private final WorkerConfig workerConfig;
private PulsarAdmin pulsarAdminClient; private PulsarAdmin pulsarAdminClient;
private final CompletableFuture<Void> firstConsumerEventFuture; private final CompletableFuture<Void> firstConsumerEventFuture;
Expand All @@ -68,9 +69,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
@VisibleForTesting @VisibleForTesting
Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>(); Map<Function.Instance, Long> unsignedFunctionDurations = new HashMap<>();


MembershipManager(WorkerConfig workerConfig, PulsarClient client) MembershipManager(WorkerService service, PulsarClient client)
throws PulsarClientException { throws PulsarClientException {
this.workerConfig = workerConfig; this.workerConfig = service.getWorkerConfig();
consumerName = String.format( consumerName = String.format(
"%s:%s:%d", "%s:%s:%d",
workerConfig.getWorkerId(), workerConfig.getWorkerId(),
Expand All @@ -82,13 +83,15 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
// we don't produce any messages into this topic, we only use the `failover` subscription // we don't produce any messages into this topic, we only use the `failover` subscription
// to elect an active consumer as the leader worker. The leader worker will be responsible // to elect an active consumer as the leader worker. The leader worker will be responsible
// for scheduling snapshots for FMT and doing task assignment. // for scheduling snapshots for FMT and doing task assignment.
consumer = client.newConsumer() consumer = (ConsumerImpl<byte[]>) client.newConsumer()
.topic(workerConfig.getClusterCoordinationTopic()) .topic(workerConfig.getClusterCoordinationTopic())
.subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION) .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
.subscriptionType(SubscriptionType.Failover) .subscriptionType(SubscriptionType.Failover)
.consumerEventListener(this) .consumerEventListener(this)
.property(WORKER_IDENTIFIER, consumerName) .property(WORKER_IDENTIFIER, consumerName)
.subscribe(); .subscribe();

isLeader.set(checkLeader(service, consumer.getConsumerName()));
} }


@Override @Override
Expand Down Expand Up @@ -282,4 +285,19 @@ private PulsarAdmin getPulsarAdminClient() {
return this.pulsarAdminClient; return this.pulsarAdminClient;
} }


private boolean checkLeader(WorkerService service, String consumerName) {
try {
TopicStats stats = service.getBrokerAdmin().topics()
.getStats(service.getWorkerConfig().getClusterCoordinationTopic());
String activeConsumerName = stats != null
&& stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION) != null
? stats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName
: null;
return consumerName != null && consumerName.equalsIgnoreCase(activeConsumerName);
} catch (Exception e) {
log.warn("Failed to check leader {}", e.getMessage());
}
return false;
}

} }
Expand Up @@ -137,7 +137,7 @@ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
this.connectorsManager = new ConnectorsManager(workerConfig); this.connectorsManager = new ConnectorsManager(workerConfig);


//create membership manager //create membership manager
this.membershipManager = new MembershipManager(this.workerConfig, this.client); this.membershipManager = new MembershipManager(this, this.client);


// create function runtime manager // create function runtime manager
this.functionRuntimeManager = new FunctionRuntimeManager( this.functionRuntimeManager = new FunctionRuntimeManager(
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
Expand All @@ -68,7 +69,7 @@ public MembershipManagerTest() {
public void testConsumerEventListener() throws Exception { public void testConsumerEventListener() throws Exception {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class); PulsarClientImpl mockClient = mock(PulsarClientImpl.class);


Consumer<byte[]> mockConsumer = mock(Consumer.class); ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class);


when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
Expand All @@ -77,6 +78,8 @@ public void testConsumerEventListener() throws Exception {
when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder);


when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer);
WorkerService workerService = mock(WorkerService.class);
doReturn(workerConfig).when(workerService).getWorkerConfig();


AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>(); AtomicReference<ConsumerEventListener> listenerHolder = new AtomicReference<>();
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> { when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenAnswer(invocationOnMock -> {
Expand All @@ -89,7 +92,7 @@ public void testConsumerEventListener() throws Exception {


when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder);


MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockClient)); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockClient));
assertFalse(membershipManager.isLeader()); assertFalse(membershipManager.isLeader());
verify(mockClient, times(1)) verify(mockClient, times(1))
.newConsumer(); .newConsumer();
Expand All @@ -104,7 +107,7 @@ public void testConsumerEventListener() throws Exception {
private static PulsarClient mockPulsarClient() throws PulsarClientException { private static PulsarClient mockPulsarClient() throws PulsarClientException {
PulsarClientImpl mockClient = mock(PulsarClientImpl.class); PulsarClientImpl mockClient = mock(PulsarClientImpl.class);


Consumer<byte[]> mockConsumer = mock(Consumer.class); ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class);
ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class);


when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder);
Expand All @@ -124,14 +127,15 @@ private static PulsarClient mockPulsarClient() throws PulsarClientException {
@Test @Test
public void testCheckFailuresNoFailures() throws Exception { public void testCheckFailuresNoFailures() throws Exception {
SchedulerManager schedulerManager = mock(SchedulerManager.class); SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class); PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create(); doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class); WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient(); doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();


FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
Expand All @@ -142,7 +146,7 @@ public void testCheckFailuresNoFailures() throws Exception {
mock(ConnectorsManager.class) mock(ConnectorsManager.class)
)); ));
FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); MembershipManager membershipManager = spy(new MembershipManager(workerService, pulsarClient));


List<WorkerInfo> workerInfoList = new LinkedList<>(); List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000)); workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
Expand Down Expand Up @@ -193,14 +197,15 @@ public void testCheckFailuresNoFailures() throws Exception {
public void testCheckFailuresSomeFailures() throws Exception { public void testCheckFailuresSomeFailures() throws Exception {
workerConfig.setRescheduleTimeoutMs(30000); workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class); SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class); PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create(); doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class); WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient(); doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();


FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
Expand All @@ -212,7 +217,7 @@ public void testCheckFailuresSomeFailures() throws Exception {
)); ));


FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient()));


List<WorkerInfo> workerInfoList = new LinkedList<>(); List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000)); workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
Expand Down Expand Up @@ -287,14 +292,15 @@ public void testCheckFailuresSomeUnassigned() throws Exception {
workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setRescheduleTimeoutMs(30000); workerConfig.setRescheduleTimeoutMs(30000);
SchedulerManager schedulerManager = mock(SchedulerManager.class); SchedulerManager schedulerManager = mock(SchedulerManager.class);
PulsarClient pulsarClient = mock(PulsarClient.class); PulsarClient pulsarClient = mockPulsarClient();
ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class); ReaderBuilder<byte[]> readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString()); doReturn(readerBuilder).when(readerBuilder).topic(anyString());
doReturn(readerBuilder).when(readerBuilder).startMessageId(any()); doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
doReturn(mock(Reader.class)).when(readerBuilder).create(); doReturn(mock(Reader.class)).when(readerBuilder).create();
WorkerService workerService = mock(WorkerService.class); WorkerService workerService = mock(WorkerService.class);
doReturn(pulsarClient).when(workerService).getClient(); doReturn(pulsarClient).when(workerService).getClient();
doReturn(workerConfig).when(workerService).getWorkerConfig();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin(); doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();


FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager( FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
Expand All @@ -305,7 +311,7 @@ public void testCheckFailuresSomeUnassigned() throws Exception {
mock(ConnectorsManager.class) mock(ConnectorsManager.class)
)); ));
FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class); FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient())); MembershipManager membershipManager = spy(new MembershipManager(workerService, mockPulsarClient()));


List<WorkerInfo> workerInfoList = new LinkedList<>(); List<WorkerInfo> workerInfoList = new LinkedList<>();
workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000)); workerInfoList.add(WorkerInfo.of("worker-1", "host-1", 8000));
Expand Down

0 comments on commit 672a167

Please sign in to comment.