Skip to content

Commit

Permalink
Support writing general records to Pulsar sink (apache#9590)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie authored and Miguelez committed Mar 16, 2021
1 parent 59357b1 commit 03b3a26
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

name: CI - Integration - Function State
name: CI - Integration - Function & IO
on:
pull_request:
branches:
Expand Down Expand Up @@ -93,4 +93,4 @@ jobs:

- name: run integration tests
if: steps.docs.outputs.changed_only == 'no'
run: ./build/run_integration_group.sh FUNCTION_STATE
run: ./build/run_integration_group.sh FUNCTION
4 changes: 2 additions & 2 deletions build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ test_group_cli() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-auth.xml -DintegrationTests
}

test_group_function_state() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-function-state.xml -DintegrationTests
test_group_function() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-function.xml -DintegrationTests
}

test_group_messaging() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
Expand All @@ -39,12 +38,15 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.SinkRecord;
Expand Down Expand Up @@ -93,10 +95,10 @@ private interface PulsarSinkProcessor<T> {
void close() throws Exception;
}

private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap<>();
protected Schema schema;
protected Crypto crypto;
protected Crypto crypto;

protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) {
this.schema = schema;
Expand Down Expand Up @@ -153,11 +155,16 @@ protected Producer<T> getProducer(String destinationTopic, Schema schema) {
protected Producer<T> getProducer(String producerId, String producerName, String topicName, Schema schema) {
return publishProducers.computeIfAbsent(producerId, s -> {
try {
return createProducer(
log.info("Initializing producer {} on topic {} with schema {}",
producerName, topicName, schema);
Producer<T> producer = createProducer(
client,
topicName,
producerName,
schema != null ? schema : this.schema);
log.info("Initialized producer {} on topic {} with schema {}: {} -> {}",
producerName, topicName, schema, producerId, producer);
return producer;
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e);
Expand Down Expand Up @@ -209,13 +216,21 @@ public Function<Throwable, Void> getPublishErrorHandler(SinkRecord<T> record, bo
class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase {
public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) {
super(schema, crypto);
// initialize default topic
try {
publishProducers.put(pulsarSinkConfig.getTopic(),
if (!(schema instanceof AutoConsumeSchema)) {
// initialize default topic
try {
publishProducers.put(pulsarSinkConfig.getTopic(),
createProducer(client, pulsarSinkConfig.getTopic(), null, schema));
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e); }
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e);
}
} else {
if (log.isDebugEnabled()) {
log.debug("The Pulsar producer is not initialized until the first record is"
+ " published for `AUTO_CONSUME` schema.");
}
}
}

@Override
Expand Down Expand Up @@ -400,7 +415,16 @@ Schema<T> initializeSchema() throws ClassNotFoundException {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
if (GenericRecord.class.isAssignableFrom(typeArg)) {
consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
SchemaType configuredSchemaType = SchemaType.valueOf(pulsarSinkConfig.getSchemaType());
if (SchemaType.AUTO_CONSUME != configuredSchemaType) {
log.info("The configured schema type {} is not able to write GenericRecords."
+ " So overwrite the schema type to be {}", configuredSchemaType, SchemaType.AUTO_CONSUME);
}
} else {
consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
}
return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
consumerConfig, false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -51,12 +53,21 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionConfig.ProcessingGuarantees;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessorBase;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.SinkContext;
import org.testng.Assert;
Expand Down Expand Up @@ -252,6 +263,62 @@ public void testComplexOuputType() throws PulsarClientException {
}
}

@Test
public void testInitializeSchema() throws Exception {
PulsarClient pulsarClient = getPulsarClient();

// generic record type (no serde and no schema type)
PulsarSinkConfig pulsarSinkConfig = getPulsarConfigs();
pulsarSinkConfig.setSerdeClassName(null);
pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
PulsarSink sink = new PulsarSink(
pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());
Schema<?> schema = sink.initializeSchema();
assertTrue(schema instanceof AutoConsumeSchema);

// generic record type (default serde and no schema type)
pulsarSinkConfig = getPulsarConfigs();
pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
sink = new PulsarSink(
pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());
schema = sink.initializeSchema();
assertTrue(schema instanceof AutoConsumeSchema);

// generic record type (no serde and wrong schema type)
pulsarSinkConfig = getPulsarConfigs();
pulsarSinkConfig.setSerdeClassName(null);
pulsarSinkConfig.setSchemaType(SchemaType.AVRO.toString());
pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
sink = new PulsarSink(
pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());
schema = sink.initializeSchema();
assertTrue(schema instanceof AutoConsumeSchema);

// generic record type (no serde and AUTO_CONSUME schema type)
pulsarSinkConfig = getPulsarConfigs();
pulsarSinkConfig.setSerdeClassName(null);
pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
sink = new PulsarSink(
pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());
schema = sink.initializeSchema();
assertTrue(schema instanceof AutoConsumeSchema);

// generic record type (default serde and AUTO_CONSUME schema type)
pulsarSinkConfig = getPulsarConfigs();
pulsarSinkConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
pulsarSinkConfig.setTypeClassName(GenericRecord.class.getName());
sink = new PulsarSink(
pulsarClient, pulsarSinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());
schema = sink.initializeSchema();
assertTrue(schema instanceof AutoConsumeSchema);
}

@Test
public void testSinkAndMessageRouting() throws Exception {

Expand Down Expand Up @@ -415,6 +482,105 @@ public Optional<Long> getRecordSequence() {
}
}

@Test
public void testWriteGenericRecordsAtMostOnce() throws Exception {
testWriteGenericRecords(ProcessingGuarantees.ATMOST_ONCE);
}

@Test
public void testWriteGenericRecordsAtLeastOnce() throws Exception {
testWriteGenericRecords(ProcessingGuarantees.ATLEAST_ONCE);
}

@Test
public void testWriteGenericRecordsEOS() throws Exception {
testWriteGenericRecords(ProcessingGuarantees.EFFECTIVELY_ONCE);
}

private void testWriteGenericRecords(ProcessingGuarantees guarantees) throws Exception {
String defaultTopic = "default";

PulsarSinkConfig sinkConfig = getPulsarConfigs();
sinkConfig.setTopic(defaultTopic);
sinkConfig.setTypeClassName(GenericRecord.class.getName());
sinkConfig.setProcessingGuarantees(guarantees);

PulsarClient client = getPulsarClient();
PulsarSink pulsarSink = new PulsarSink(
client, sinkConfig, new HashMap<>(), mock(ComponentStatsManager.class),
Thread.currentThread().getContextClassLoader());

pulsarSink.open(new HashMap<>(), mock(SinkContext.class));

if (ProcessingGuarantees.ATMOST_ONCE == guarantees) {
assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtMostOnceProcessor);
} else if (ProcessingGuarantees.ATLEAST_ONCE == guarantees) {
assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtLeastOnceProcessor);
} else {
assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkEffectivelyOnceProcessor);
}
PulsarSinkProcessorBase processor = (PulsarSinkProcessorBase) pulsarSink.pulsarSinkProcessor;
assertFalse(processor.publishProducers.containsKey(defaultTopic));

String[] topics = { "topic-1", "topic-2", "topic-3" };
for (String topic : topics) {

RecordSchemaBuilder builder = SchemaBuilder.record("MyRecord");
builder.field("number").type(SchemaType.INT32);
builder.field("text").type(SchemaType.STRING);
GenericSchema<GenericRecord> schema = Schema.generic(builder.build(SchemaType.AVRO));

GenericRecordBuilder recordBuilder = schema.newRecordBuilder();
recordBuilder.set("number", 1);
recordBuilder.set("text", topic);

GenericRecord genericRecord = recordBuilder.build();

SinkRecord<GenericRecord> record = new SinkRecord<>(new Record<GenericRecord>() {

@Override
public Optional<String> getDestinationTopic() {
return Optional.of(topic);
}

@Override
public Schema<GenericRecord> getSchema() {
return schema;
}

@Override
public GenericRecord getValue() {
return genericRecord;
}

@Override
public Optional<String> getPartitionId() {
return Optional.of(topic + "-id-1");
}

@Override
public Optional<Long> getRecordSequence() {
return Optional.of(1L);
}
}, genericRecord);

pulsarSink.write(record);

if (ProcessingGuarantees.EFFECTIVELY_ONCE == guarantees) {
assertTrue(processor.publishProducers.containsKey(String.format("%s-%s-id-1", topic, topic)));
} else {
assertTrue(processor.publishProducers.containsKey(topic));
}
verify(client.newProducer(), times(1))
.topic(argThat(
otherTopic -> topic != null ? topic.equals(otherTopic) : defaultTopic.equals(otherTopic)));

verify(client, times(1))
.newProducer(argThat(
otherSchema -> Objects.equals(otherSchema, schema)));
}
}

private Optional<String> getTopicOptional(String topic) {
if (topic != null) {
return Optional.of(topic);
Expand Down

0 comments on commit 03b3a26

Please sign in to comment.