Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload (apache#10248)

(cherry picked from commit 54523bb)
eolivelli committed May 12, 2021
1 parent ec3ad85 commit ff59244
Showing 6 changed files with 137 additions and 89 deletions.
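
For context, here is a minimal consumer-side sketch of the path this commit hardens, assuming an illustrative broker URL ("pulsar://localhost:6650") and illustrative topic and subscription names. With Schema.AUTO_CONSUME() the topic schema may not have been downloaded yet by the time Message.getValue() is called, which is exactly where MessageImpl now forces AutoConsumeSchema to fetch it before decoding the payload.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class AutoConsumeExample {
    public static void main(String[] args) throws Exception {
        // illustrative service URL, topic and subscription names
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscribe();

        Message<GenericRecord> msg = consumer.receive();
        // getValue() is the call that must make sure the AutoConsumeSchema
        // has downloaded the topic schema before decoding the payload
        GenericRecord record = msg.getValue();
        System.out.println(record.getSchemaType() + " -> " + record.getNativeObject());

        consumer.acknowledge(msg);
        consumer.close();
        client.close();
    }
}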

@@ -49,6 +49,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class MessageImpl<T> implements Message<T> {
@@ -279,10 +280,18 @@ public byte[] getSchemaVersion() {
}
}

private SchemaInfo getSchemaInfo() {
if (schema instanceof AutoConsumeSchema) {
((AutoConsumeSchema) schema).fetchSchemaIfNeeded();
}
return schema.getSchemaInfo();
}

@Override
public T getValue() {
checkNotNull(msgMetadataBuilder);
if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
SchemaInfo schemaInfo = getSchemaInfo();
if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) {
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
} else {

@@ -21,7 +21,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
@@ -30,7 +29,11 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.google.common.base.Preconditions.checkState;

@@ -77,27 +80,7 @@ public boolean supportSchemaVersioning() {

@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
if (schema == null) {
SchemaInfo schemaInfo = null;
try {
schemaInfo = schemaInfoProvider.getLatestSchema().get();
if (schemaInfo == null) {
// schemaless topic
schemaInfo = BytesSchema.of().getSchemaInfo();
}
} catch (InterruptedException | ExecutionException e ) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.error("Can't get last schema for topic {} use AutoConsumeSchema", topicName);
throw new SchemaSerializationException(e.getCause());
}
// schemaInfo null means that there is no schema attached to the topic.
schema = generateSchema(schemaInfo);
schema.setSchemaInfoProvider(schemaInfoProvider);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName, schemaInfo.getSchemaDefinition());
}
fetchSchemaIfNeeded();
ensureSchemaInitialized();
return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
}
@@ -157,7 +140,7 @@ public Optional<Object> getNativeSchema() {
}
}

private Schema<?> generateSchema(SchemaInfo schemaInfo) {
private static Schema<?> generateSchema(SchemaInfo schemaInfo) {
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
final boolean useProvidedSchemaAsReaderSchema = false;
@@ -260,4 +243,47 @@ public static GenericRecord wrapPrimitiveObject(Object value, SchemaType type, b
public Schema<?> getInternalSchema() {
return schema;
}

/**
* It may happen that the schema is not loaded yet when we need it, for instance in order to call getSchemaInfo().
* We cannot perform the fetch inside getSchemaInfo() itself, because getSchemaInfo() is called in many
* places and doing so would introduce deadlocks.
*/
public void fetchSchemaIfNeeded() throws SchemaSerializationException {
if (schema == null) {
if (schemaInfoProvider == null) {
throw new SchemaSerializationException("Can't get accurate schema information for topic " + topicName +
"using AutoConsumeSchema because SchemaInfoProvider is not set yet");
} else {
SchemaInfo schemaInfo = null;
try {
schemaInfo = schemaInfoProvider.getLatestSchema().get();
if (schemaInfo == null) {
// schemaless topic
schemaInfo = BytesSchema.of().getSchemaInfo();
}
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.error("Can't get last schema for topic {} using AutoConsumeSchema", topicName);
throw new SchemaSerializationException(e.getCause());
}
// schemaInfo null means that there is no schema attached to the topic.
schema = generateSchema(schemaInfo);
schema.setSchemaInfoProvider(schemaInfoProvider);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName, schemaInfo.getSchemaDefinition());
}
}
}

@Override
public String toString() {
if (schema != null && schema.getSchemaInfo() != null) {
return "AUTO_CONSUME(schematype=" + schema.getSchemaInfo().getType() + ")";
} else {
return "AUTO_CONSUME(uninitialized)";
}
}
}
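
The KEY_VALUE branch added to MessageImpl.getValue() and the new INLINE/SEPARATED KeyValue test cases below depend on this lazily fetched schema info. The following hypothetical helper (not part of this commit) sketches how a KEY_VALUE payload decoded through AUTO_CONSUME can be unwrapped, assuming the record's native object is the KeyValue pair, which is what the sink test in this commit expects.

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;

public final class KeyValueRecords {

    private KeyValueRecords() {
    }

    public static KeyValue<?, ?> unwrap(Message<GenericRecord> msg) {
        // getValue() goes through MessageImpl.getSchemaInfo(), which now calls
        // AutoConsumeSchema.fetchSchemaIfNeeded() before decoding
        GenericRecord record = msg.getValue();
        if (record.getSchemaType() == SchemaType.KEY_VALUE) {
            return (KeyValue<?, ?>) record.getNativeObject();
        }
        throw new IllegalArgumentException("Not a KEY_VALUE message: " + record.getSchemaType());
    }
}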

@@ -36,7 +36,7 @@ public void open(Map<String, Object> config, SinkContext sourceContext) throws E
}

public void write(Record<GenericObject> record) {

log.info("topic {}", record.getTopicName().orElse(null));
log.info("properties {}", record.getProperties());
log.info("received record {} {}", record, record.getClass());
log.info("schema {}", record.getSchema());
@@ -65,6 +65,8 @@ public void write(Record<GenericObject> record) {
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());

record.ack();
}

@Override
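
The test sink now acknowledges every record it receives (record.ack()) after logging it, so the consumed messages are acked back to Pulsar. A stripped-down sketch of the same Sink<GenericObject> contract follows; the class name and log format are illustrative, not part of this commit.

import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

@Slf4j
public class LoggingGenericObjectSink implements Sink<GenericObject> {

    @Override
    public void open(Map<String, Object> config, SinkContext context) {
        // no configuration needed for this sketch
    }

    @Override
    public void write(Record<GenericObject> record) {
        log.info("topic {} schemaType {} value {}",
                record.getTopicName().orElse(null),
                record.getValue().getSchemaType(),
                record.getValue().getNativeObject());
        // acknowledge the input message once it has been handled
        record.ack();
    }

    @Override
    public void close() {
    }
}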

@@ -21,6 +21,7 @@
import lombok.Builder;
import lombok.Cleanup;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@@ -31,6 +32,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
@@ -43,6 +45,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
import static org.testng.Assert.assertEquals;
@@ -55,15 +59,14 @@
@Slf4j
public class PulsarGenericObjectSinkTest extends PulsarStandaloneTestSuite {

@Getter
private static final class SinkSpec<T> {
final String outputTopicName;
final String sinkName;
final Schema<T> schema;
final T testValue;

public SinkSpec(String outputTopicName, String sinkName, Schema<T> schema, T testValue) {
public SinkSpec(String outputTopicName, Schema<T> schema, T testValue) {
this.outputTopicName = outputTopicName;
this.sinkName = sinkName;
this.schema = schema;
this.testValue = testValue;
}
@@ -89,90 +92,82 @@ public void testGenericObjectSink() throws Exception {
.serviceUrl(container.getPlainTextServiceUrl())
.build();

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build();

// we are not using a parametrized test in order to save resources
// we create N sinks, send the records and verify each sink
// sinks execution happens in parallel
// we create one sink that listens on multiple topics, send the records and verify the sink
List<SinkSpec> specs = Arrays.asList(
new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"),
new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
new SinkSpec("test-kv-sink-input-string-" + randomName(8), Schema.STRING, "foo"),
new SinkSpec("test-kv-sink-input-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8),
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8),
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
);
// submit all sinks
for (SinkSpec spec : specs) {
submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
}
// check all sinks
for (SinkSpec spec : specs) {
// get sink info
getSinkInfoSuccess(spec.sinkName);
getSinkStatus(spec.sinkName);
}

final int numRecords = 10;
final int numRecordsPerTopic = 2;

String sinkName = "genericobject-sink";
String topicNames = specs
.stream()
.map(SinkSpec::getOutputTopicName)
.collect(Collectors.joining(","));
submitSinkConnector(sinkName, topicNames, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
// get sink info
getSinkInfoSuccess(sinkName);
getSinkStatus(sinkName);


for (SinkSpec spec : specs) {

@Cleanup Producer<Object> producer = client.newProducer(spec.schema)
.topic(spec.outputTopicName)
.create();
for (int i = 0; i < numRecords; i++) {
for (int i = 0; i < numRecordsPerTopic; i++) {
MessageId messageId = producer.newMessage()
.value(spec.testValue)
.property("expectedType", spec.schema.getSchemaInfo().getType().toString())
.property("recordNumber", i + "")
.send();
log.info("sent message {} {} with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
}
}

// wait that all sinks processed all records without errors
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
for (SinkSpec spec : specs) {
try {
log.info("waiting for sink {}", spec.sinkName);
for (int i = 0; i < 120; i++) {
SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
if (instance.getStatus().numWrittenToSink >= numRecords) {
break;
}
assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
Thread.sleep(1000);
}

SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
log.info("sink {} is okay", spec.sinkName);
} finally {
dumpSinkLogs(spec);
// wait that sink processed all records without errors

try {
log.info("waiting for sink {}", sinkName);

for (int i = 0; i < 120; i++) {
SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
log.info("sink {} status {}", sinkName, status);
assertEquals(status.getInstances().size(), 1);
SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
if (instance.getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size()
|| instance.getStatus().numSinkExceptions > 0
|| instance.getStatus().numSystemExceptions > 0
|| instance.getStatus().numRestarts > 0) {
break;
}
Thread.sleep(1000);
}
}


for (SinkSpec spec : specs) {
deleteSink(spec.sinkName);
getSinkInfoNotFound(spec.sinkName);
SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
log.info("sink {} status {}", sinkName, status);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecordsPerTopic * specs.size());
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
log.info("sink {} is okay", sinkName);
} finally {
dumpFunctionLogs(sinkName);
}
}

private void dumpSinkLogs(SinkSpec spec) {
try {
String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log";
String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
return IOUtils.toString(inputStream, "utf-8");
});
log.info("Sink {} logs {}", spec.sinkName, logs);
} catch (Throwable err) {
log.info("Cannot download sink {} logs", spec.sinkName, err);
}
deleteSink(sinkName);
getSinkInfoNotFound(sinkName);
}

private void submitSinkConnector(String sinkName,

@@ -40,4 +40,5 @@ public void tearDownCluster() throws Exception {
public String getTestName() {
return "pulsar-standalone-suite";
}

}

@@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.testcontainers.containers.Network;
@@ -90,4 +91,18 @@ protected void stopCluster() throws Exception {
network.close();
}



protected void dumpFunctionLogs(String name) {
try {
String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log";
String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
return IOUtils.toString(inputStream, "utf-8");
});
log.info("Function {} logs {}", name, logs);
} catch (Throwable err) {
log.info("Cannot download {} logs", name, err);
}
}

}
