Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support writing general records to Pulsar sink #9590

Merged
merged 7 commits into from
Mar 14, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

name: CI - Integration - Function State
name: CI - Integration - Function & IO
on:
pull_request:
branches:
Expand Down Expand Up @@ -78,4 +78,4 @@ jobs:
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests

- name: run integration tests
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-function-state.xml -DintegrationTests -DredirectTestOutputToFile=false
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-function.xml -DintegrationTests -DredirectTestOutputToFile=false
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
Expand All @@ -39,12 +38,15 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.SinkRecord;
Expand Down Expand Up @@ -93,10 +95,10 @@ private interface PulsarSinkProcessor<T> {
void close() throws Exception;
}

private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap<>();
protected Schema schema;
protected Crypto crypto;
protected Crypto crypto;

protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) {
this.schema = schema;
Expand Down Expand Up @@ -153,11 +155,16 @@ protected Producer<T> getProducer(String destinationTopic, Schema schema) {
protected Producer<T> getProducer(String producerId, String producerName, String topicName, Schema schema) {
return publishProducers.computeIfAbsent(producerId, s -> {
try {
return createProducer(
log.info("Initializing producer {} on topic {} with schema {}",
producerName, topicName, schema);
Producer<T> producer = createProducer(
client,
topicName,
producerName,
schema != null ? schema : this.schema);
log.info("Initialized producer {} on topic {} with schema {}: {} -> {}",
producerName, topicName, schema, producerId, producer);
return producer;
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e);
Expand Down Expand Up @@ -209,13 +216,21 @@ public Function<Throwable, Void> getPublishErrorHandler(SinkRecord<T> record, bo
class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase {
public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) {
super(schema, crypto);
// initialize default topic
try {
publishProducers.put(pulsarSinkConfig.getTopic(),
if (!(schema instanceof AutoConsumeSchema)) {
// initialize default topic
try {
publishProducers.put(pulsarSinkConfig.getTopic(),
createProducer(client, pulsarSinkConfig.getTopic(), null, schema));
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e); }
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e);
}
} else {
if (log.isDebugEnabled()) {
log.debug("The Pulsar producer is not initialized until the first record is"
+ " published for `AUTO_CONSUME` schema.");
}
}
}

@Override
Expand Down Expand Up @@ -400,7 +415,16 @@ Schema<T> initializeSchema() throws ClassNotFoundException {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
if (GenericRecord.class.isAssignableFrom(typeArg)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie initially you pointed out that working here in PulsarSink is not the right way, but we should only work on TopicSchema
#9481 (comment)

In fact I believe that in my PR #9481 I took the right way, driven by your suggestions.

I believe that this change is not enough to support my needs.

BTW if the integration test I added to #9481 works with this patch then we can converge to a good solution.
My goal is to get that use case to work, in the best way for the project for the mid/long term.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli Yes and no on my original comment.

My original comment is to make sure we returned the write schema information via TopicSchema. Because we are using AUTO_CONSUME in the PulsarSink to indicate GenericRecords are published to the Pulsar topic. AUTO_CONSUME can be used by both source and sink. In order to not impact sources, I didn't add the logic in TopicSchema. Instead, I added it in PulsarSink to make it more explicit, which results in one line of similar change as your initial change. But that doesn't mean your original and current implementation is in the right direction.

The main problem of your previous and current implementation on #9481 is you are trying to hijack the existing AVRO implementation to introduce the support of lazy schema initialization. The lazy schema initialization is already implemented as part of multi-schema write support. So you don't need to add such a hack.

consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
SchemaType configuredSchemaType = SchemaType.valueOf(pulsarSinkConfig.getSchemaType());
if (SchemaType.AUTO_CONSUME != configuredSchemaType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to add consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType()); in this branch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The schema type is already overwritten in line 419. This is just to log an info message to indicate that the schema type has been overwritten to AUTO_CONSUME.

log.info("The configured schema type {} is not able to write GenericRecords."
+ " So overwrite the schema type to be {}", configuredSchemaType, SchemaType.AUTO_CONSUME);
}
} else {
consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
}
return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
consumerConfig, false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -51,12 +53,21 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessorBase;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.SinkContext;
import org.testng.Assert;
Expand Down Expand Up @@ -252,6 +263,62 @@ public void testComplexOuputType() throws PulsarClientException {
}
}

/**
 * Verifies that {@code PulsarSink#initializeSchema()} resolves to an
 * {@link AutoConsumeSchema} whenever the sink's type class is
 * {@link GenericRecord}, regardless of the configured serde class or
 * configured schema type (absent, default serde, a non-matching AVRO
 * type, or an explicit AUTO_CONSUME type).
 */
@Test
public void testInitializeSchema() throws Exception {
    PulsarClient pulsarClient = getPulsarClient();

    // generic record type (no serde and no schema type)
    PulsarSinkConfig pulsarSinkConfig = getPulsarConfigs();
    pulsarSinkConfig.setSerdeClassName(null);
    pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
    assertInitializesToAutoConsumeSchema(pulsarClient, pulsarSinkConfig);

    // generic record type (default serde and no schema type)
    pulsarSinkConfig = getPulsarConfigs();
    pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
    assertInitializesToAutoConsumeSchema(pulsarClient, pulsarSinkConfig);

    // generic record type (no serde and wrong schema type):
    // the configured AVRO type cannot write GenericRecords, so the sink
    // is expected to override it with AUTO_CONSUME
    pulsarSinkConfig = getPulsarConfigs();
    pulsarSinkConfig.setSerdeClassName(null);
    pulsarSinkConfig.setSchemaType(SchemaType.AVRO.toString());
    pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
    assertInitializesToAutoConsumeSchema(pulsarClient, pulsarSinkConfig);

    // generic record type (no serde and AUTO_CONSUME schema type)
    pulsarSinkConfig = getPulsarConfigs();
    pulsarSinkConfig.setSerdeClassName(null);
    pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
    pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
    assertInitializesToAutoConsumeSchema(pulsarClient, pulsarSinkConfig);

    // generic record type (default serde and AUTO_CONSUME schema type)
    pulsarSinkConfig = getPulsarConfigs();
    pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
    pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
    assertInitializesToAutoConsumeSchema(pulsarClient, pulsarSinkConfig);
}

/**
 * Builds a {@code PulsarSink} from the given config and asserts that the
 * schema it initializes is an {@link AutoConsumeSchema}.
 */
private static void assertInitializesToAutoConsumeSchema(
        PulsarClient pulsarClient, PulsarSinkConfig pulsarSinkConfig) throws Exception {
    PulsarSink sink = new PulsarSink(
            pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
            Thread.currentThread().getContextClassLoader());
    Schema<?> schema = sink.initializeSchema();
    assertTrue(schema instanceof AutoConsumeSchema);
}

@Test
public void testSinkAndMessageRouting() throws Exception {

Expand Down Expand Up @@ -415,6 +482,105 @@ public Optional<Long> getRecordSequence() {
}
}

/** Generic records must be publishable under ATMOST_ONCE processing guarantees. */
@Test
public void testWriteGenericRecordsAtMostOnce() throws Exception {
    testWriteGenericRecords(ProcessingGuarantees.ATMOST_ONCE);
}

/** Generic records must be publishable under ATLEAST_ONCE processing guarantees. */
@Test
public void testWriteGenericRecordsAtLeastOnce() throws Exception {
    testWriteGenericRecords(ProcessingGuarantees.ATLEAST_ONCE);
}

/** Generic records must be publishable under EFFECTIVELY_ONCE processing guarantees. */
@Test
public void testWriteGenericRecordsEOS() throws Exception {
    testWriteGenericRecords(ProcessingGuarantees.EFFECTIVELY_ONCE);
}

/**
 * Exercises writing {@link GenericRecord}s through a {@code PulsarSink}
 * configured with the given processing guarantees.
 *
 * <p>Checks that: (1) the sink selects the processor implementation matching
 * the guarantee level; (2) no producer is eagerly created for the default
 * topic; (3) after writing a record destined for a specific topic, a producer
 * is registered under the expected key (topic name, or topic + partition-id
 * for effectively-once); and (4) the producer was built once for that topic
 * with the record's schema.
 *
 * @param guarantees the processing-guarantee mode to configure the sink with
 * @throws Exception if sink construction, open, or write fails
 */
private void testWriteGenericRecords(ProcessingGuarantees guarantees) throws Exception {
    String defaultTopic = "default";

    PulsarSinkConfig sinkConfig = getPulsarConfigs();
    sinkConfig.setTopic(defaultTopic);
    // GenericRecord type class triggers the multi-schema (AUTO_CONSUME) write path
    sinkConfig.setTypeClassName(GenericRecord.class.getName());
    sinkConfig.setProcessingGuarantees(guarantees);

    PulsarClient client = getPulsarClient();
    PulsarSink pulsarSink = new PulsarSink(
            client, sinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
            Thread.currentThread().getContextClassLoader());

    pulsarSink.open(new HashMap<>(), mock(SinkContext.class));

    // The processor implementation must match the configured guarantee level.
    if (ProcessingGuarantees.ATMOST_ONCE == guarantees) {
        assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtMostOnceProcessor);
    } else if (ProcessingGuarantees.ATLEAST_ONCE == guarantees) {
        assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtLeastOnceProcessor);
    } else {
        assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkEffectivelyOnceProcessor);
    }
    PulsarSinkProcessorBase processor = (PulsarSinkProcessorBase) pulsarSink.pulsarSinkProcessor;
    // With a GenericRecord sink, no producer should be created for the
    // default topic before the first record is written.
    assertFalse(processor.publishProducers.containsKey(defaultTopic));

    String[] topics = { "topic-1", "topic-2", "topic-3" };
    for (String topic : topics) {

        // Build a fresh AVRO-backed generic schema and a record for this topic.
        RecordSchemaBuilder builder = SchemaBuilder.record("MyRecord");
        builder.field("number").type(SchemaType.INT32);
        builder.field("text").type(SchemaType.STRING);
        GenericSchema<GenericRecord> schema = Schema.generic(builder.build(SchemaType.AVRO));

        GenericRecordBuilder recordBuilder = schema.newRecordBuilder();
        recordBuilder.set("number", 1);
        recordBuilder.set("text", topic);

        GenericRecord genericRecord = recordBuilder.build();

        // Anonymous Record routing the value to this iteration's topic,
        // carrying the schema, a partition id, and a record sequence.
        SinkRecord<GenericRecord> record = new SinkRecord<>(new Record<GenericRecord>() {

            @Override
            public Optional<String> getDestinationTopic() {
                return Optional.of(topic);
            }

            @Override
            public Schema<GenericRecord> getSchema() {
                return schema;
            }

            @Override
            public GenericRecord getValue() {
                return genericRecord;
            }

            @Override
            public Optional<String> getPartitionId() {
                return Optional.of(topic + "-id-1");
            }

            @Override
            public Optional<Long> getRecordSequence() {
                return Optional.of(1L);
            }
        }, genericRecord);

        pulsarSink.write(record);

        if (ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees) {
            // Effectively-once keys producers by "<topic>-<partitionId>".
            assertTrue(processor.publishProducers.containsKey(String.format("%s-%s-id-1", topic, topic)));
        } else {
            // Other guarantee levels key producers by topic name only.
            assertTrue(processor.publishProducers.containsKey(topic));
        }
        // NOTE(review): verify(client.newProducer(), ...) itself calls the
        // mock, so these verifications are order/count sensitive.
        verify(client.newProducer(), times(1))
                .topic(argThat(
                        otherTopic -> topic != null ? topic.equals(otherTopic) : defaultTopic.equals(otherTopic)));

        verify(client, times(1))
                .newProducer(argThat(
                        otherSchema -> Objects.equals(otherSchema, schema)));
    }
}

private Optional<String> getTopicOptional(String topic) {
if (topic != null) {
return Optional.of(topic);
Expand Down