Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functions metadata compaction #7377

Merged
merged 39 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
bf11f0c
Function workers re-direct call update requests to the leader
Jun 11, 2020
691d054
Fixed test
Jun 11, 2020
cfb203d
tests pass
Jun 11, 2020
f814544
Working version
Jun 11, 2020
b0a7f28
Fix test
Jun 11, 2020
e612318
Merge remote-tracking branch 'apache/master' into functions_leader_ex…
Jun 11, 2020
ea53753
Merge branch 'master' into functions_leader_executor
Jun 12, 2020
c02274a
Short circuit update
Jun 12, 2020
93c54e1
Fix test
Jun 12, 2020
fd8766e
Fix test
Jun 12, 2020
b5b1752
Fix tests
Jun 12, 2020
d3403c7
Added one more catch
Jun 12, 2020
cc3646e
Added one more catch
Jun 12, 2020
6b8d6d2
Seperated internal and external errors
Jun 12, 2020
aeb51ca
Fix test
Jun 13, 2020
527aea2
Address feedback
Jun 18, 2020
dcc4cd2
Do not expose updateOnLeader to functions
Jun 18, 2020
f8f2d34
hide api
Jun 18, 2020
476b692
hide api
Jun 18, 2020
ad73099
Merge branch 'master' into functions_leader_executor
Jun 20, 2020
acd999f
removed duplicate comments
Jun 20, 2020
a256bc9
Do leadership changes in function metadata manager
Jun 20, 2020
8fc89c3
make the function sync
Jun 21, 2020
6f21176
Added more comments
Jun 23, 2020
f381eee
Merge branch 'master' into functions_leader_executor
Jun 23, 2020
43eaf86
Throw error
Jun 23, 2020
c28e122
Changed name
Jun 23, 2020
fb86d0a
address comments
Jun 23, 2020
a61e674
Deleted unused classes
Jun 24, 2020
d719626
Rework metadata manager
Jun 24, 2020
3b824e9
Working
Jun 24, 2020
c5328d1
Fix test
Jun 24, 2020
abed1f7
A better way for test
Jun 24, 2020
7bb5bc0
Address feedback
Jun 25, 2020
25f0c67
Merge branch 'master' into functions_metadata_compaction
Jun 25, 2020
62ea750
Merge branch 'master' into functions_metadata_compaction
Jun 26, 2020
198aca2
Added an option to compact function metadata topic
Jun 27, 2020
a84866d
Address feedback
Jun 30, 2020
be0936f
Incorporate feedback
Jun 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The pulsar topic used for storing function metadata"
)
private String functionMetadataTopicName;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "Should the metadata topic be compacted?"
)
private Boolean useCompactedMetadataTopic = false;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The web service url for function workers"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,26 @@ public static String getFullyQualifiedName(String tenant, String namespace, Stri
return String.format("%s/%s/%s", tenant, namespace, functionName);
}

public static String extractTenantFromFullyQualifiedName(String fqfn) {
return extractFromFullyQualifiedName(fqfn, 0);
}

public static String extractNamespaceFromFullyQualifiedName(String fqfn) {
return extractFromFullyQualifiedName(fqfn, 1);
}

public static String extractNameFromFullyQualifiedName(String fqfn) {
return extractFromFullyQualifiedName(fqfn, 2);
}

private static String extractFromFullyQualifiedName(String fqfn, int index) {
String[] parts = fqfn.split("/");
if (parts.length >= 3) {
return parts[index];
}
throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
}

public static Class<?> getTypeArg(String className, Class<?> funClass, ClassLoader classLoader)
throws ClassNotFoundException {
Class<?> loadedClass = classLoader.loadClass(className);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionCommon;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -68,7 +69,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
// Note that this variable serves a double duty. A non-null value
// implies we are the leader, while a null value means we are not the leader
private Producer exclusiveLeaderProducer;
private MessageId lastMessageSeen = MessageId.earliest;
@Getter
private volatile MessageId lastMessageSeen = MessageId.earliest;

private static final String versionTag = "version";

@Getter
private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
Expand Down Expand Up @@ -206,14 +210,30 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
} else {
needsScheduling = processUpdate(functionMetaData);
}
Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
.setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
.setFunctionMetaData(functionMetaData)
.setWorkerId(workerConfig.getWorkerId())
.setRequestId(UUID.randomUUID().toString())
.build();
byte[] toWrite;
if (workerConfig.getUseCompactedMetadataTopic()) {
if (delete) {
toWrite = "".getBytes();
} else {
toWrite = functionMetaData.toByteArray();
}
} else {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
.setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
.setFunctionMetaData(functionMetaData)
.setWorkerId(workerConfig.getWorkerId())
.setRequestId(UUID.randomUUID().toString())
.build();
toWrite = serviceRequest.toByteArray();
}
try {
lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage()
.value(toWrite)
.property(versionTag, Long.toString(functionMetaData.getVersion()));
if (workerConfig.getUseCompactedMetadataTopic()) {
builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
}
lastMessageSeen = builder.send();
} catch (Exception e) {
log.error("Could not write into Function Metadata topic", e);
throw new IllegalStateException("Internal Error updating function at the leader", e);
Expand Down Expand Up @@ -290,26 +310,48 @@ public synchronized void giveupLeadership() {
*/
public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
try {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}
switch (serviceRequest.getServiceRequestType()) {
case UPDATE:
this.processUpdate(serviceRequest.getFunctionMetaData());
break;
case DELETE:
this.proccessDeregister(serviceRequest.getFunctionMetaData());
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
if (workerConfig.getUseCompactedMetadataTopic()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change this check to if a key exists or not? This creates an avenue in which a existing cluster can transition to use a compacted metadata topic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think thats a good idea. I would rather have worker fail here unless specifically configured to have compaction enabled or disabled.

processCompactedMetaDataTopicMessage(message);
} else {
processUncompactedMetaDataTopicMessage(message);
}
} catch (IllegalArgumentException e) {
// Its ok. Nothing much we can do about it
}
lastMessageSeen = message.getMessageId();
}

private void processUncompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}
switch (serviceRequest.getServiceRequestType()) {
case UPDATE:
this.processUpdate(serviceRequest.getFunctionMetaData());
break;
case DELETE:
this.proccessDeregister(serviceRequest.getFunctionMetaData());
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
}
}

private void processCompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
long version = Long.valueOf(message.getProperty(versionTag));
String tenant = FunctionCommon.extractTenantFromFullyQualifiedName(message.getKey());
String namespace = FunctionCommon.extractNamespaceFromFullyQualifiedName(message.getKey());
String functionName = FunctionCommon.extractNameFromFullyQualifiedName(message.getKey());
if (message.getData() == null || message.getData().length == 0) {
// this is a delete message
this.proccessDeregister(tenant, namespace, functionName, version);
} else {
FunctionMetaData functionMetaData = FunctionMetaData.parseFrom(message.getData());
this.processUpdate(functionMetaData);
}
}

/**
* Private methods for internal use. Should not be used outside of this class
*/
Expand All @@ -336,25 +378,29 @@ private boolean containsFunctionMetaData(String tenant, String namespace, String

@VisibleForTesting
synchronized boolean proccessDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {

String functionName = deregisterRequestFs.getFunctionDetails().getName();
String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
return proccessDeregister(tenant, namespace, functionName, deregisterRequestFs.getVersion());
}

synchronized boolean proccessDeregister(String tenant, String namespace,
String functionName, long version) throws IllegalArgumentException {

boolean needsScheduling = false;

log.debug("Process deregister request: {}", deregisterRequestFs);
log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version);

// Check if we still have this function. Maybe already deleted by someone else
if (this.containsFunctionMetaData(deregisterRequestFs)) {
if (this.containsFunctionMetaData(tenant, namespace, functionName)) {
// check if request is outdated
if (!isRequestOutdated(deregisterRequestFs)) {
if (!isRequestOutdated(tenant, namespace, functionName, version)) {
this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
needsScheduling = true;
} else {
if (log.isDebugEnabled()) {
log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName,
deregisterRequestFs.getVersion());
version);
}
throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
}
Expand Down Expand Up @@ -393,9 +439,14 @@ synchronized boolean processUpdate(FunctionMetaData updateRequestFs) throws Ille

private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(functionDetails.getTenant())
.get(functionDetails.getNamespace()).get(functionDetails.getName());
return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion();
return isRequestOutdated(functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), requestFunctionMetaData.getVersion());
}

private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) {
FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant)
.get(namespace).get(functionName);
return currentFunctionMetaData.getVersion() >= version;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;

@Slf4j
public class FunctionMetaDataTopicTailer
Expand Down Expand Up @@ -131,11 +130,14 @@ public void close() {

public static Reader createReader(WorkerConfig workerConfig, ReaderBuilder readerBuilder,
MessageId startMessageId) throws PulsarClientException {
return readerBuilder
ReaderBuilder builder = readerBuilder
.topic(workerConfig.getFunctionMetadataTopic())
.startMessageId(startMessageId)
.readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
.subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
.create();
.subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer");
if (workerConfig.getUseCompactedMetadataTopic()) {
builder = builder.readCompacted(true);
}
return builder.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class SchedulerManager implements AutoCloseable {
@Getter
private MessageId lastMessageProduced = null;

private MessageId metadataTopicLastMessage = MessageId.earliest;

public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
PulsarAdmin admin,
Expand Down Expand Up @@ -224,6 +226,13 @@ private void scheduleCompaction(ScheduledExecutorService executor, long schedule
isCompactionNeeded.set(false);
}
}, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);

executor.scheduleWithFixedDelay(() -> {
if (leaderService.isLeader() && metadataTopicLastMessage.compareTo(functionMetaDataManager.getLastMessageSeen()) != 0) {
metadataTopicLastMessage = functionMetaDataManager.getLastMessageSeen();
compactFunctionMetadataTopic();
}
}, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -337,6 +346,18 @@ private void compactAssignmentTopic() {
}
}

private void compactFunctionMetadataTopic() {
if (this.admin != null) {
try {
this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic());
} catch (PulsarAdminException e) {
log.error("Failed to trigger compaction", e);
scheduledExecutorService.schedule(() -> compactFunctionMetadataTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
TimeUnit.SECONDS);
}
}
}

private MessageId publishNewAssignment(Assignment assignment, boolean deleted) {
try {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -34,12 +35,25 @@

public class FunctionMetaDataManagerTest {

static byte[] producerByteArray;

private static PulsarClient mockPulsarClient() throws PulsarClientException {
ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
when(builder.topic(anyString())).thenReturn(builder);
when(builder.producerName(anyString())).thenReturn(builder);

when(builder.create()).thenReturn(mock(Producer.class));
Producer producer = mock(Producer.class);
TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
when(messageBuilder.key(anyString())).thenReturn(messageBuilder);
doAnswer(invocation -> {
Object arg0 = invocation.getArgument(0);
FunctionMetaDataManagerTest.producerByteArray = (byte[])arg0;
return messageBuilder;
}).when(messageBuilder).value(any());
when(messageBuilder.property(anyString(), anyString())).thenReturn(messageBuilder);
when(producer.newMessage()).thenReturn(messageBuilder);

when(builder.create()).thenReturn(producer);

PulsarClient client = mock(PulsarClient.class);
when(client.newProducer()).thenReturn(builder);
Expand Down Expand Up @@ -86,10 +100,20 @@ public void testListFunctions() throws PulsarClientException {
}

@Test
public void testUpdateIfLeaderFunction() throws PulsarClientException {
public void testUpdateIfLeaderFunctionWithoutCompaction() throws PulsarClientException {
testUpdateIfLeaderFunction(false);
}

@Test
public void testUpdateIfLeaderFunctionWithCompaction() throws PulsarClientException {
testUpdateIfLeaderFunction(true);
}

private void testUpdateIfLeaderFunction(boolean compact) throws PulsarClientException {

WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setUseCompactedMetadataTopic(compact);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
Expand All @@ -110,6 +134,11 @@ public void testUpdateIfLeaderFunction() throws PulsarClientException {
functionMetaDataManager.acquireLeadership();
// Now w should be able to really update
functionMetaDataManager.updateFunctionOnLeader(m1, false);
if (compact) {
Assert.assertTrue(Arrays.equals(m1.toByteArray(), producerByteArray));
} else {
Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
}

// outdated request
try {
Expand All @@ -119,15 +148,30 @@ public void testUpdateIfLeaderFunction() throws PulsarClientException {
Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
}
// udpate with new version
m1 = m1.toBuilder().setVersion(2).build();
functionMetaDataManager.updateFunctionOnLeader(m1, false);
Function.FunctionMetaData m2 = m1.toBuilder().setVersion(2).build();
functionMetaDataManager.updateFunctionOnLeader(m2, false);
if (compact) {
Assert.assertTrue(Arrays.equals(m2.toByteArray(), producerByteArray));
} else {
Assert.assertFalse(Arrays.equals(m2.toByteArray(), producerByteArray));
}
}

@Test
public void deregisterFunctionWithoutCompaction() throws PulsarClientException {
deregisterFunction(false);
}

@Test
public void deregisterFunction() throws PulsarClientException {
public void deregisterFunctionWithCompaction() throws PulsarClientException {
deregisterFunction(true);
}

private void deregisterFunction(boolean compact) throws PulsarClientException {
SchedulerManager mockedScheduler = mock(SchedulerManager.class);
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setUseCompactedMetadataTopic(compact);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mockedScheduler,
Expand Down Expand Up @@ -170,6 +214,11 @@ public void deregisterFunction() throws PulsarClientException {
m1 = m1.toBuilder().setVersion(2).build();
functionMetaDataManager.updateFunctionOnLeader(m1, true);
verify(mockedScheduler, times(2)).schedule();
if (compact) {
Assert.assertTrue(Arrays.equals("".getBytes(), producerByteArray));
} else {
Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
}
}

@Test
Expand Down